In [None]:
pip install thop

In [1]:
# =========================
# Cell 1 — Setup + Data + Model
# =========================

import os, time, copy, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from sklearn.metrics import confusion_matrix, classification_report, precision_recall_fscore_support

# FLOPs (thop)
try:
    from thop import profile
    THOP_AVAILABLE = True
except Exception:
    THOP_AVAILABLE = False
    print("thop not available — FLOPs will be reported as N/A")


# Settings
DATA_ROOT = "/kaggle/input/plant-disease-dataset/Dataset_Final_V2_Split"
OUT_DIR = "/kaggle/working/plantanet_relu_final"
os.makedirs(OUT_DIR, exist_ok=True)

IMG_SIZE = 160
BATCH_SIZE = 32
EPOCHS = 100

# Config 3 (best from tuning)
LR = 1e-3
WEIGHT_DECAY = 1e-4
MIXUP_ALPHA = 0.2

NUM_WORKERS = 4
SEED = 42

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {DEVICE} | CUDA: {torch.cuda.is_available()} | GPUs: {torch.cuda.device_count()}")


# Reproducibility
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)


# Transforms
train_tfm = transforms.Compose([
    transforms.RandomResizedCrop(
        IMG_SIZE,
        scale=(0.9, 1.0),
        ratio=(0.95, 1.05)
    ),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(
        brightness=0.15,
        contrast=0.15,
        saturation=0.15
    ),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])

eval_tfm = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])


# Datasets & Dataloaders
train_dir = os.path.join(DATA_ROOT, "train")
val_dir   = os.path.join(DATA_ROOT, "val")
test_dir  = os.path.join(DATA_ROOT, "test")

assert os.path.exists(train_dir) and os.path.exists(val_dir), \
    f"Dataset folders not found under {DATA_ROOT}. Expected train/ val/."

train_ds = datasets.ImageFolder(train_dir, transform=train_tfm)
val_ds   = datasets.ImageFolder(val_dir,   transform=eval_tfm)

test_ds = datasets.ImageFolder(test_dir, transform=eval_tfm) if os.path.exists(test_dir) else None

train_loader = DataLoader(
    train_ds, batch_size=BATCH_SIZE, shuffle=True,
    num_workers=NUM_WORKERS, pin_memory=True
)
val_loader = DataLoader(
    val_ds, batch_size=BATCH_SIZE, shuffle=False,
    num_workers=NUM_WORKERS, pin_memory=True
)
test_loader = None
if test_ds is not None:
    test_loader = DataLoader(
        test_ds, batch_size=BATCH_SIZE, shuffle=False,
        num_workers=NUM_WORKERS, pin_memory=True
    )

class_names = train_ds.classes
NUM_CLASSES = len(class_names)
print(f"Classes={NUM_CLASSES} | Train={len(train_ds)}, Val={len(val_ds)}, Test={(len(test_ds) if test_ds else 0)}")


# Model definition — PlantaNet-ReLU
class DepthwiseSeparableConv(nn.Module):
    def __init__(self, in_ch, out_ch, stride=1):
        super().__init__()
        self.depth = nn.Conv2d(in_ch, in_ch, 3, stride, 1, groups=in_ch, bias=False)
        self.point = nn.Conv2d(in_ch, out_ch, 1, bias=False)
        self.norm = nn.GroupNorm(8 if out_ch % 8 == 0 else 4, out_ch)
        self.act = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.depth(x)
        x = self.point(x)
        x = self.norm(x)
        x = self.act(x)
        return x


class PlantaNetReLU(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        c1, c2, c3, c4 = 144, 224, 320, 448

        self.stem = nn.Sequential(
            nn.Conv2d(3, c1, 3, 1, 1, bias=False),
            nn.GroupNorm(8 if c1 % 8 == 0 else 4, c1),
            nn.ReLU(inplace=True),
        )

        self.block1 = nn.Sequential(
            DepthwiseSeparableConv(c1, c2, stride=2),
            nn.Dropout(0.15)
        )
        self.block2 = nn.Sequential(
            DepthwiseSeparableConv(c2, c3, stride=2),
            nn.Dropout(0.20)
        )
        self.block3 = nn.Sequential(
            DepthwiseSeparableConv(c3, c4, stride=2),
            nn.Dropout(0.25)
        )

        self.conv_extra = nn.Sequential(
            nn.Conv2d(c4, c4, 3, 1, 1, bias=False),
            nn.GroupNorm(8 if c4 % 8 == 0 else 4, c4),
            nn.ReLU(inplace=True),
            nn.Dropout(0.25),
        )

        self.gap = nn.AdaptiveAvgPool2d(1)

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(c4, 1024, bias=False),
            nn.GroupNorm(8 if 1024 % 8 == 0 else 4, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            nn.Linear(1024, num_classes),
        )

    def forward(self, x):
        x = self.stem(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.conv_extra(x)
        x = self.gap(x)
        x = self.classifier(x)
        return x


def count_parameters(model: nn.Module) -> int:
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


# Instantiate model & efficiency metrics
model = PlantaNetReLU(NUM_CLASSES)
params = count_parameters(model)
print(f"Model params: {params:,} ({params/1e6:.3f} M)")

# FLOPs
flops_M = None
if THOP_AVAILABLE:
    try:
        model_for_flops = PlantaNetReLU(NUM_CLASSES)
        dummy_cpu = torch.randn(1, 3, IMG_SIZE, IMG_SIZE)
        f, _ = profile(model_for_flops, inputs=(dummy_cpu,), verbose=False)
        flops_M = f / 1e6
    except Exception as e:
        print("FLOPs profiling failed:", e)

# Move model to device and DataParallel if possible
model = model.to(DEVICE)
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs with DataParallel.")
    model = nn.DataParallel(model)

# Inference time and model size
model.eval()
dummy = torch.randn(1, 3, IMG_SIZE, IMG_SIZE).to(DEVICE)
if DEVICE.type == "cuda":
    torch.cuda.synchronize()
start = time.time()
with torch.no_grad():
    for _ in range(30):
        _ = model(dummy)
if DEVICE.type == "cuda":
    torch.cuda.synchronize()
inf_ms = (time.time() - start) / 30 * 1000.0

tmp_path = os.path.join(OUT_DIR, "PlantaNet_ReLU_tmp.pth")
torch.save(model.state_dict(), tmp_path)
size_mb = os.path.getsize(tmp_path) / (1024 * 1024)
os.remove(tmp_path)

print(f"FLOPs={(round(flops_M,2) if flops_M else 'N/A')}M | Inference={inf_ms:.2f} ms | Size={size_mb:.2f} MB")


# Loss & optimizer (Config 3)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.Adamax(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)


Device: cuda | CUDA: True | GPUs: 2
Classes=51 | Train=95504, Val=20472, Test=20506
Model params: 2,579,955 (2.580 M)
Using 2 GPUs with DataParallel.
FLOPs=1213.9M | Inference=23.48 ms | Size=9.85 MB


In [3]:
# =========================
# Cell 2 — MixUp + Training + Test + Plots
# =========================

def mixup_data(x, y, alpha=MIXUP_ALPHA):
    """Returns mixed inputs, pairs of targets, and lambda."""
    if alpha <= 0:
        return x, y, y, 1.0
    lam = np.random.beta(alpha, alpha)
    batch_size = x.size(0)
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam


def mixup_criterion(criterion, preds, y_a, y_b, lam):
    return lam * criterion(preds, y_a) + (1 - lam) * criterion(preds, y_b)


def train_one_epoch(loader, mixup_alpha=MIXUP_ALPHA):
    model.train()
    running_loss = 0.0
    total = 0
    start_time = time.time()

    for inputs, labels in loader:
        inputs = inputs.to(DEVICE, non_blocking=True)
        labels = labels.to(DEVICE, non_blocking=True)
        optimizer.zero_grad()

        inputs_m, y_a, y_b, lam = mixup_data(inputs, labels, alpha=mixup_alpha)
        outputs = model(inputs_m)
        loss = mixup_criterion(criterion, outputs, y_a, y_b, lam)

        loss.backward()
        optimizer.step()

        bs = inputs.size(0)
        running_loss += loss.item() * bs
        total += bs

    return running_loss / total, time.time() - start_time


def eval_one_epoch(loader):
    model.eval()
    running_loss = 0.0
    running_correct = 0
    total = 0
    preds_all, labels_all = [], []
    start_time = time.time()

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(DEVICE, non_blocking=True)
            labels = labels.to(DEVICE, non_blocking=True)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            bs = inputs.size(0)
            running_loss += loss.item() * bs
            running_correct += torch.sum(preds == labels).item()
            total += bs

            preds_all.append(preds.cpu().numpy())
            labels_all.append(labels.cpu().numpy())

    preds_all = np.concatenate(preds_all) if preds_all else np.array([])
    labels_all = np.concatenate(labels_all) if labels_all else np.array([])
    return running_loss / total, running_correct / total, preds_all, labels_all, time.time() - start_time


# Main training loop
history = []
best_val_acc = 0.0
best_state = None
t0 = time.time()

print("\nTraining starts:")
for epoch in range(1, EPOCHS + 1):
    _mixup_loss, tr_time = train_one_epoch(train_loader, mixup_alpha=MIXUP_ALPHA)
    train_loss_clean, train_acc_clean, _, _, tr_eval_time = eval_one_epoch(train_loader)
    val_loss, val_acc, val_preds, val_labels, val_time = eval_one_epoch(val_loader)

    epoch_time = tr_time + tr_eval_time + val_time

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_state = copy.deepcopy(
            model.module.state_dict() if isinstance(model, nn.DataParallel) else model.state_dict()
        )
        torch.save(best_state, os.path.join(OUT_DIR, "PlantaNet_ReLU_best_weights.pth"))

    history.append({
        "epoch": epoch,
        "train_loss": float(train_loss_clean),
        "train_acc": float(train_acc_clean),
        "val_loss": float(val_loss),
        "val_acc": float(val_acc),
        "epoch_time_sec": float(epoch_time),
    })

    print(
        f"Epoch {epoch:03d}/{EPOCHS} | "
        f"train_loss={train_loss_clean:.4f} | train_acc={train_acc_clean:.4f} | "
        f"val_loss={val_loss:.4f} | val_acc={val_acc:.4f} | "
        f"time={epoch_time:.1f}s"
    )

print(f"\nTotal training time: {(time.time()-t0)/60:.2f} minutes")

hist_df = pd.DataFrame(history)
hist_df.to_csv(os.path.join(OUT_DIR, "PlantaNet_ReLU_history.csv"), index=False)


# Evaluate best model on test (if exists)
best_weights_path = os.path.join(OUT_DIR, "PlantaNet_ReLU_best_weights.pth")

if best_state is not None and os.path.exists(best_weights_path):
    eval_model = PlantaNetReLU(NUM_CLASSES)
    eval_model.load_state_dict(torch.load(best_weights_path, map_location=DEVICE), strict=False)
    eval_model = eval_model.to(DEVICE)
else:
    eval_model = model.module if isinstance(model, nn.DataParallel) else model
    eval_model = eval_model.to(DEVICE)

test_acc, prec, rec, f1 = None, None, None, None
y_true, y_pred = None, None

if test_loader is not None:
    eval_model.eval()
    preds_list, labels_list = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(DEVICE, non_blocking=True)
            labels = labels.to(DEVICE, non_blocking=True)
            outputs = eval_model(inputs)
            _, preds = torch.max(outputs, 1)
            preds_list.append(preds.cpu().numpy())
            labels_list.append(labels.cpu().numpy())

    y_pred = np.concatenate(preds_list)
    y_true = np.concatenate(labels_list)

    test_acc = float((y_pred == y_true).mean())
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="weighted", zero_division=0
    )

    print(
        f"\nTest results: TestAcc={test_acc:.4f}, "
        f"Precision={prec:.4f}, Recall={rec:.4f}, F1={f1:.4f}"
    )

    report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True, zero_division=0)
    pd.DataFrame(report).T.to_csv(os.path.join(OUT_DIR, "classification_report.csv"), index=False)

torch.save(eval_model.state_dict(), os.path.join(OUT_DIR, "PlantaNet_ReLU_final_weights.pth"))


# Plots
plt.figure(figsize=(8, 5))
plt.plot(hist_df["epoch"], hist_df["train_loss"], label="Train Loss")
plt.plot(hist_df["epoch"], hist_df["val_loss"], label="Val Loss")
plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.legend()
plt.title("Loss Curve"); plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "loss_curve.png"), dpi=150)
plt.close()

plt.figure(figsize=(8, 5))
plt.plot(hist_df["epoch"], hist_df["train_acc"], label="Train Acc")
plt.plot(hist_df["epoch"], hist_df["val_acc"], label="Val Acc")
plt.xlabel("Epoch"); plt.ylabel("Accuracy"); plt.legend()
plt.title("Accuracy Curve"); plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "accuracy_curve.png"), dpi=150)
plt.close()


# Confusion matrix if test exists
if y_true is not None:
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=False, cmap="Blues")
    plt.title("Confusion Matrix - PlantaNet_ReLU")
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, "confusion_matrix.png"), dpi=150)
    plt.close()


# Efficiency summary
summary = pd.DataFrame([{
    "Model": "PlantaNet_ReLU",
    "Params(M)": round(params / 1e6, 3),
    "FLOPs(M)": None if flops_M is None else round(flops_M, 2),
    "Inference(ms)": round(inf_ms, 2),
    "Size(MB)": round(size_mb, 2),
    "ValAcc(%)": round(best_val_acc * 100, 2),
    "TestAcc(%)": None if test_acc is None else round(test_acc * 100, 2),
    "Precision": None if prec is None else round(prec, 3),
    "Recall": None if rec is None else round(rec, 3),
    "F1": None if f1 is None else round(f1, 3),
}])

summary.to_csv(os.path.join(OUT_DIR, "PlantaNet_ReLU_efficiency_summary.csv"), index=False)
print("\nEfficiency summary:")
print(summary.to_string(index=False))



Training starts:
Epoch 001/100 | train_loss=1.3664 | train_acc=0.7744 | val_loss=1.3814 | val_acc=0.7649 | time=437.8s
Epoch 002/100 | train_loss=1.1221 | train_acc=0.8705 | val_loss=1.1191 | val_acc=0.8704 | time=413.3s
Epoch 003/100 | train_loss=0.9774 | train_acc=0.9250 | val_loss=0.9703 | val_acc=0.9264 | time=408.5s
Epoch 004/100 | train_loss=0.9243 | train_acc=0.9401 | val_loss=0.9203 | val_acc=0.9399 | time=412.0s
Epoch 005/100 | train_loss=0.8856 | train_acc=0.9558 | val_loss=0.8870 | val_acc=0.9526 | time=412.2s
Epoch 006/100 | train_loss=0.8532 | train_acc=0.9636 | val_loss=0.8536 | val_acc=0.9625 | time=410.7s
Epoch 007/100 | train_loss=0.8481 | train_acc=0.9657 | val_loss=0.8550 | val_acc=0.9630 | time=415.4s
Epoch 008/100 | train_loss=0.8277 | train_acc=0.9679 | val_loss=0.8233 | val_acc=0.9697 | time=413.3s
Epoch 009/100 | train_loss=0.8033 | train_acc=0.9806 | val_loss=0.8077 | val_acc=0.9789 | time=417.4s
Epoch 010/100 | train_loss=0.7935 | train_acc=0.9817 | val_loss=

In [4]:
# =========================
# Cell 3 — Grad-CAM (runs separately)
# =========================

print("\nIdentifying correctly classified samples for Grad-CAM...")

if test_ds is None:
    raise ValueError("No test dataset found. Put a test/ folder inside DATA_ROOT to run Grad-CAM.")

correct_samples = {cls: [] for cls in range(NUM_CLASSES)}
eval_model.eval()

with torch.no_grad():
    for path, label in test_ds.samples:
        img = Image.open(path).convert("RGB")
        tensor = eval_tfm(img).unsqueeze(0).to(DEVICE)
        out = eval_model(tensor)
        pred = out.argmax(1).item()
        if pred == label:
            correct_samples[label].append(path)

print("Done. Now generating Grad-CAM (one correctly classified image per class).")

gradcam_dir = os.path.join(OUT_DIR, "gradcam")
os.makedirs(gradcam_dir, exist_ok=True)

# Load weights safely
grad_model = PlantaNetReLU(NUM_CLASSES)

best_weights_path = os.path.join(OUT_DIR, "PlantaNet_ReLU_best_weights.pth")

if os.path.exists(best_weights_path):
    grad_model.load_state_dict(torch.load(best_weights_path, map_location=DEVICE), strict=False)
elif best_state is not None:
    grad_model.load_state_dict(best_state, strict=False)
else:
    print("No best weights found — Grad-CAM will use current model weights.")

grad_model = grad_model.to(DEVICE).eval()
target_layer = grad_model.conv_extra[0]


class GradCAMFixed:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None

        target_layer.register_forward_hook(self._save_activation)
        target_layer.register_full_backward_hook(self._save_gradient)

    def _save_activation(self, module, inp, out):
        self.activations = out.detach()

    def _save_gradient(self, module, grad_input, grad_output):
        self.gradients = grad_output[0].detach()

    def generate(self, input_tensor, class_idx=None):
        self.model.zero_grad()
        input_tensor = input_tensor.to(DEVICE)
        outputs = self.model(input_tensor)

        if class_idx is None:
            class_idx = int(outputs.argmax(dim=1).item())

        score = outputs[:, class_idx]
        score.backward(retain_graph=False)

        grads = self.gradients
        acts = self.activations
        weights = grads.mean(dim=(2, 3), keepdim=True)
        cam = (weights * acts).sum(dim=1).squeeze(0)
        cam = torch.relu(cam)

        cam_np = cam.cpu().numpy()
        cam_np -= cam_np.min()
        cam_np /= (cam_np.max() + 1e-9)
        return cam_np


gradcam = GradCAMFixed(grad_model, target_layer)

mean = np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1)
std = np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1)

def denorm_tensor(img_tensor):
    arr = img_tensor.cpu().numpy()
    arr = (arr * std) + mean
    arr = np.clip(arr, 0, 1)
    return np.transpose(arr, (1, 2, 0))


for cls in range(NUM_CLASSES):
    if len(correct_samples[cls]) == 0:
        print(f"No correctly classified sample for class {class_names[cls]}")
        continue

    path = correct_samples[cls][0]
    try:
        pil = Image.open(path).convert("RGB")
        tensor = eval_tfm(pil).unsqueeze(0).to(DEVICE)
        cam_map = gradcam.generate(tensor, class_idx=cls)
    except Exception as e:
        print(f"Grad-CAM error for {path}: {e}")
        continue

    cam_resized = cv2.resize(cam_map, (IMG_SIZE, IMG_SIZE))
    heatmap = cv2.applyColorMap(np.uint8(255 * cam_resized), cv2.COLORMAP_JET)[:, :, ::-1]

    original = denorm_tensor(tensor[0])
    orig_uint8 = np.uint8(original * 255)
    heat_uint8 = np.uint8(heatmap)
    overlay = cv2.addWeighted(orig_uint8, 0.6, heat_uint8, 0.4, 0)

    cls_name = class_names[cls].replace("/", "_").replace(" ", "_")
    cls_folder = os.path.join(gradcam_dir, cls_name)
    os.makedirs(cls_folder, exist_ok=True)
    save_path = os.path.join(cls_folder, f"gradcam_{cls_name}.png")

    cv2.imwrite(save_path, overlay[:, :, ::-1])
    print("Saved Grad-CAM for class:", cls_name)

print("\nGrad-CAM generation completed. Saved under:", gradcam_dir)
print("\nFinished")



Identifying correctly classified samples for Grad-CAM...
Done. Now generating Grad-CAM (one correctly classified image per class).
Saved Grad-CAM for class: Apple___Apple_scab
Saved Grad-CAM for class: Apple___Black_rot
Saved Grad-CAM for class: Apple___Cedar_apple_rust
Saved Grad-CAM for class: Apple___healthy
Saved Grad-CAM for class: Banana___cordana
Saved Grad-CAM for class: Banana___healthy
Saved Grad-CAM for class: Banana___pestalotiopsis
Saved Grad-CAM for class: Banana___sigatoka
Saved Grad-CAM for class: Bean___angular_leaf_spot
Saved Grad-CAM for class: Bean___bean_rust
Saved Grad-CAM for class: Bean___healthy
Saved Grad-CAM for class: Blueberry___healthy
Saved Grad-CAM for class: Corn___Cercospora_leaf_spot_Gray_leaf_spot
Saved Grad-CAM for class: Corn___Common_rust_
Saved Grad-CAM for class: Corn___Northern_Leaf_Blight
Saved Grad-CAM for class: Corn___healthy
Saved Grad-CAM for class: Grape___Black_rot
Saved Grad-CAM for class: Grape___Esca_(Black_Measles)
Saved Grad-CAM f

In [5]:
import os, shutil

# Folder where Grad-CAM images are saved
gradcam_dir = os.path.join(OUT_DIR, "gradcam")   # or "gradcam_pp" for Grad-CAM++

# Output zip path
zip_path = os.path.join(OUT_DIR, "gradcam_results.zip")

# Remove old zip if it exists
if os.path.exists(zip_path):
    os.remove(zip_path)

# Make zip
shutil.make_archive(
    base_name=zip_path.replace(".zip",""),
    format="zip",
    root_dir=gradcam_dir
)

print("✅ Grad-CAM zipped at:", zip_path)
print("Download it from Kaggle → Output tab / Files panel.")


✅ Grad-CAM zipped at: /kaggle/working/plantanet_relu_final/gradcam_results.zip
Download it from Kaggle → Output tab / Files panel.
