In [1]:
!pip install timm opencv-python-headless scikit-learn "numpy<2" --quiet --force-reinstall

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.9/59.9 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.9/61.9 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.7/57.7 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m46.8 MB/s[0m eta [36m0:00:00[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.0/50.0 MB[0m [31m36.0 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.9/12.9 MB[0m [31m105.8 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m95.1 MB/s[0m eta [36m0:00:00[0m:

In [2]:
import warnings
warnings.filterwarnings('ignore')

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.transforms import functional as TF
import timm
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from PIL import Image
import os
import re
from pathlib import Path
import random
from tqdm.auto import tqdm
import pickle
import json
from collections import defaultdict
import gc
import sys

# Set seeds for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

# Clear GPU memory
torch.cuda.empty_cache()
gc.collect()

0

In [3]:
class Config:
    DATA_PATH = '/kaggle/input/crop-diseases-2/Dataset for Crop Pest and Disease Detection/CCMT Dataset-Augmented'
    MODEL_NAME = 'mobilenetv3_large_100'
    NUM_CLASSES = 22
    IMG_SIZE = 224
    BATCH_SIZE = 64
    LEARNING_RATE = 2e-4
    WEIGHT_DECAY = 1e-5
    EPOCHS = 10
    N_FOLDS = 5
    EARLY_STOPPING_PATIENCE = 5
    SCHEDULER_PATIENCE = 3
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    MIXUP_ALPHA = 0.3
    OUTPUT_DIR = '/kaggle/working'
    MODELS_DIR = f'{OUTPUT_DIR}/models'
    PLOTS_DIR = f'{OUTPUT_DIR}/plots'
    
    # Memory optimization settings
    GRADIENT_ACCUMULATION_STEPS = 2
    USE_MIXED_PRECISION = True
    MAX_GRAD_NORM = 1.0

os.makedirs(Config.MODELS_DIR, exist_ok=True)
os.makedirs(Config.PLOTS_DIR, exist_ok=True)

# Enable memory optimization
if Config.USE_MIXED_PRECISION:
    from torch.cuda.amp import autocast, GradScaler
    scaler = GradScaler()

print(f"Using device: {Config.DEVICE}")
print(f"Model: {Config.MODEL_NAME}")
print(f"Image size: {Config.IMG_SIZE}")
print(f"Batch size: {Config.BATCH_SIZE}")
print(f"Mixed precision: {Config.USE_MIXED_PRECISION}")

Using device: cuda
Model: mobilenetv3_large_100
Image size: 224
Batch size: 64
Mixed precision: True


In [4]:
def prepare_dataframes(base_path):
    crops = ['Cashew', 'Cassava', 'Maize', 'Tomato']
    all_train_files, all_test_files, class_to_idx = [], [], {}
    current_idx = 0
    
    print("Scanning directories and cleaning names...")
    if not os.path.exists(base_path):
        print(f"ERROR: Base path not found at {base_path}.")
        return pd.DataFrame(), pd.DataFrame(), {}, {}

    for crop in crops:
        for phase, file_list in [('train_set', all_train_files), ('test_set', all_test_files)]:
            crop_path = os.path.join(base_path, crop, phase)
            if not os.path.exists(crop_path):
                continue
            
            for disease_folder in os.listdir(crop_path):
                if not os.path.isdir(os.path.join(crop_path, disease_folder)):
                    continue
                
                clean_disease_name = re.sub(r'\d+$', '', disease_folder).strip()
                composite_class_name = f"{crop}_{clean_disease_name}"
                
                if composite_class_name not in class_to_idx:
                    if phase == 'train_set':
                        class_to_idx[composite_class_name] = current_idx
                        current_idx += 1
                    else:
                        # Ensure test classes are also in train classes
                        if composite_class_name in class_to_idx:
                             pass
                        else:
                            continue

                class_idx = class_to_idx.get(composite_class_name)
                if class_idx is None: continue
                
                disease_path = os.path.join(crop_path, disease_folder)
                
                for filename in os.listdir(disease_path):
                    if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
                        file_list.append({
                            'filepath': os.path.join(disease_path, filename),
                            'class_name': composite_class_name,
                            'class_idx': class_idx
                        })

    df_train = pd.DataFrame(all_train_files).sample(frac=1, random_state=42).reset_index(drop=True)
    df_test = pd.DataFrame(all_test_files).sample(frac=1, random_state=42).reset_index(drop=True)
    idx_to_class = {idx: cls for cls, idx in class_to_idx.items()}
    
    # Ensure test dataframe only contains classes present in the training set
    df_test = df_test[df_test['class_idx'].isin(df_train['class_idx'].unique())].reset_index(drop=True)

    Config.NUM_CLASSES = len(class_to_idx)
    print(f"\nFound {Config.NUM_CLASSES} unique classes after cleaning.")
    print(f"Total training images: {len(df_train)}")
    print(f"Total testing images: {len(df_test)}")
    
    return df_train, df_test, class_to_idx, idx_to_class

df_train, df_test, class_to_idx, idx_to_class = prepare_dataframes(Config.DATA_PATH)
if class_to_idx:
    with open(f'{Config.OUTPUT_DIR}/class_mappings.json', 'w') as f:
        json.dump({'class_to_idx': class_to_idx, 'idx_to_class': idx_to_class}, f)
    print("\nClass Distribution in Training Data:")
    print(df_train['class_name'].value_counts())
else:
    df_train = pd.DataFrame()


class LightweightAugmentation:
    def __init__(self, img_size=224):
        self.img_size = img_size
    
    def __call__(self, image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        if random.random() < 0.5:
            image = TF.hflip(image)
        if random.random() < 0.3:
            image = TF.vflip(image)
        if random.random() < 0.4:
            angle = random.uniform(-15, 15)
            image = TF.rotate(image, angle)
        image = TF.resize(image, int(self.img_size * 1.1))
        i, j, h, w = transforms.RandomCrop.get_params(image, (self.img_size, self.img_size))
        image = TF.crop(image, i, j, h, w)
        return image

def get_transforms(phase='train'):
    if phase == 'train':
        return transforms.Compose([
            LightweightAugmentation(Config.IMG_SIZE),
            transforms.ColorJitter(brightness=0.15, contrast=0.15, saturation=0.15, hue=0.05),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            transforms.RandomErasing(p=0.2, scale=(0.02, 0.08))
        ])
    else:
        return transforms.Compose([
            transforms.Lambda(lambda x: Image.fromarray(x) if isinstance(x, np.ndarray) else x),
            transforms.Resize((Config.IMG_SIZE, Config.IMG_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

Scanning directories and cleaning names...

Found 22 unique classes after cleaning.
Total training images: 80271
Total testing images: 24981

Class Distribution in Training Data:
class_name
Tomato_septoria leaf spot    9373
Cassava_bacterial blight     9195
Cashew_healthy               5877
Tomato_leaf blight           5200
Cashew_red rust              4751
Maize_streak virus           4043
Maize_leaf blight            4025
Maize_leaf beetle            3789
Cashew_leaf miner            3466
Cassava_brown spot           3250
Cassava_green mite           3246
Cashew_anthracnose           3102
Tomato_verticulium wilt      3100
Maize_leaf spot              3024
Maize_grasshoper             2575
Cassava_healthy              2271
Cassava_mosaic               2250
Tomato_leaf curl             2050
Tomato_healthy               2000
Cashew_gumosis               1714
Maize_fall armyworm          1140
Maize_healthy                 830
Name: count, dtype: int64


In [5]:
class CropDiseaseDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        label = torch.tensor(row['class_idx'], dtype=torch.long)
        image = cv2.imread(row['filepath'])
        if image is None:
            image = np.zeros((Config.IMG_SIZE, Config.IMG_SIZE, 3), dtype=np.uint8)
        else:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            image = self.transform(image)
        return image, label

In [6]:
class MobileNetV3CropModel(nn.Module):
    def __init__(self, model_name=Config.MODEL_NAME, num_classes=Config.NUM_CLASSES, pretrained=True):
        super().__init__()
        self.backbone = timm.create_model(model_name, pretrained=pretrained)
        num_features = self.backbone.num_features
        self.backbone.reset_classifier(0)
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.BatchNorm1d(num_features),
            nn.Dropout(0.2),
            nn.Linear(num_features, num_features // 4),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(num_features // 4),
            nn.Dropout(0.1),
            nn.Linear(num_features // 4, num_classes)
        )
        self._initialize_weights()
    
    def _initialize_weights(self):
        for m in self.classifier.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        features = self.backbone.forward_features(x)
        out = self.classifier(features)
        return out
        
class EarlyStopping:
    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience, self.verbose, self.delta = patience, verbose, delta
        self.counter, self.best_score, self.early_stop = 0, None, False
        self.val_loss_min = np.Inf
        self.best_weights = None

    def __call__(self, val_loss, model):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.verbose: print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience: self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0
            self.save_checkpoint(val_loss, model)

    def save_checkpoint(self, val_loss, model):
        if self.verbose: print(f'Val loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
        self.best_weights = {k: v.cpu() for k, v in model.state_dict().items()}
        self.val_loss_min = val_loss

    def load_best_weights(self, model):
        if self.best_weights:
            model.load_state_dict({k: v.to(Config.DEVICE) for k, v in self.best_weights.items()})
        return model

class LabelSmoothingLoss(nn.Module):
    def __init__(self, num_classes, smoothing=0.1):
        super().__init__()
        self.num_classes, self.smoothing = num_classes, smoothing
        self.confidence = 1.0 - smoothing
    
    def forward(self, pred, target):
        pred = pred.log_softmax(dim=-1)
        with torch.no_grad():
            true_dist = torch.zeros_like(pred)
            true_dist.fill_(self.smoothing / (self.num_classes - 1))
            true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
        return torch.mean(torch.sum(-true_dist * pred, dim=-1))

def mixup_data(x, y, alpha=0.3):
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1
    batch_size = x.size(0)
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

In [7]:
def train_one_epoch(model, loader, criterion, optimizer, device, scaler=None):
    model.train()
    running_loss = 0.0
    total_correct = 0
    total_samples = 0
    
    optimizer.zero_grad()
    
    for batch_idx, (data, target) in enumerate(tqdm(loader, desc='[Train]')):
        data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
        
        # Use mixed precision if available
        if Config.USE_MIXED_PRECISION and scaler is not None:
            with autocast():
                # Apply mixup occasionally
                if np.random.rand() < 0.3 and Config.MIXUP_ALPHA > 0:
                    mixed_data, y_a, y_b, lam = mixup_data(data, target, Config.MIXUP_ALPHA)
                    output = model(mixed_data)
                    loss = mixup_criterion(criterion, output, y_a, y_b, lam)
                else:
                    output = model(data)
                    loss = criterion(output, target)
                
                # Normalize loss by gradient accumulation steps
                loss = loss / Config.GRADIENT_ACCUMULATION_STEPS
            
            scaler.scale(loss).backward()
            
            # Gradient accumulation
            if (batch_idx + 1) % Config.GRADIENT_ACCUMULATION_STEPS == 0:
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), Config.MAX_GRAD_NORM)
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
        
        else:
            # Regular training without mixed precision
            if np.random.rand() < 0.3 and Config.MIXUP_ALPHA > 0:
                mixed_data, y_a, y_b, lam = mixup_data(data, target, Config.MIXUP_ALPHA)
                output = model(mixed_data)
                loss = mixup_criterion(criterion, output, y_a, y_b, lam)
            else:
                output = model(data)
                loss = criterion(output, target)
            
            loss = loss / Config.GRADIENT_ACCUMULATION_STEPS
            loss.backward()
            
            if (batch_idx + 1) % Config.GRADIENT_ACCUMULATION_STEPS == 0:
                torch.nn.utils.clip_grad_norm_(model.parameters(), Config.MAX_GRAD_NORM)
                optimizer.step()
                optimizer.zero_grad()
        
        running_loss += loss.item() * Config.GRADIENT_ACCUMULATION_STEPS * data.size(0)
        total_samples += target.size(0)
        
        # Calculate accuracy (only for non-mixup batches)
        if np.random.rand() >= 0.3 or Config.MIXUP_ALPHA <= 0:
            _, predicted = torch.max(output.data, 1)
            total_correct += (predicted == target).sum().item()
        
        # Clear cache periodically
        if batch_idx % 20 == 0:
            torch.cuda.empty_cache()
    
    epoch_loss = running_loss / total_samples
    epoch_acc = (total_correct / total_samples) * 100
    
    return epoch_loss, epoch_acc

def validate_one_epoch(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    total_correct = 0
    total_samples = 0
    
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(tqdm(loader, desc='[Val]')):
            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
            
            if Config.USE_MIXED_PRECISION:
                with autocast():
                    output = model(data)
                    loss = criterion(output, target)
            else:
                output = model(data)
                loss = criterion(output, target)
            
            running_loss += loss.item() * data.size(0)
            total_samples += target.size(0)
            
            _, predicted = torch.max(output.data, 1)
            total_correct += (predicted == target).sum().item()
            
            # Clear cache periodically
            if batch_idx % 20 == 0:
                torch.cuda.empty_cache()
    
    epoch_loss = running_loss / total_samples
    epoch_acc = (total_correct / total_samples) * 100
    
    return epoch_loss, epoch_acc

In [8]:
def train_kfold(df_train, n_folds=Config.N_FOLDS):
    if df_train.empty:
        print("Training df empty. Halting.")
        return [], None
    
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
    results = []
    oof_preds = np.zeros((len(df_train), Config.NUM_CLASSES))
    
    for fold, (train_idx, val_idx) in enumerate(skf.split(df_train, df_train['class_idx'])):
        print(f"\n{'='*20} FOLD {fold+1}/{n_folds} {'='*20}")
        
        # Clear memory before each fold
        torch.cuda.empty_cache()
        gc.collect()
        
        train_df = df_train.iloc[train_idx]
        val_df = df_train.iloc[val_idx]
        
        # Data loaders with memory optimization
        train_loader = DataLoader(
            CropDiseaseDataset(train_df, get_transforms('train')),
            batch_size=Config.BATCH_SIZE,
            shuffle=True,
            num_workers=4,
            pin_memory=True,
            persistent_workers=True
        )
        
        val_loader = DataLoader(
            CropDiseaseDataset(val_df, get_transforms('val')),
            batch_size=Config.BATCH_SIZE * 2,
            shuffle=False,
            num_workers=2,
            pin_memory=True,
            persistent_workers=True
        )
        
        # Initialize model
        model = MobileNetV3CropModel().to(Config.DEVICE)
        
        # Optimizer with different learning rates for backbone and classifier
        backbone_params = [p for name, p in model.named_parameters() if 'backbone' in name]
        classifier_params = [p for name, p in model.named_parameters() if 'classifier' in name]
        
        optimizer = torch.optim.AdamW([
            {'params': backbone_params, 'lr': Config.LEARNING_RATE * 0.1},  # Lower LR for pretrained backbone
            {'params': classifier_params, 'lr': Config.LEARNING_RATE}
        ], weight_decay=Config.WEIGHT_DECAY)
        
        # Learning rate scheduler
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=5, T_mult=2, eta_min=1e-6
        )
        
        # Loss function with label smoothing
        criterion = LabelSmoothingLoss(Config.NUM_CLASSES, smoothing=0.1).to(Config.DEVICE)
        early_stopping = EarlyStopping(patience=Config.EARLY_STOPPING_PATIENCE, verbose=True)
        
        # Mixed precision scaler
        fold_scaler = GradScaler() if Config.USE_MIXED_PRECISION else None
        
        history = defaultdict(list)
        
        for epoch in range(Config.EPOCHS):
            print(f"Epoch {epoch+1}/{Config.EPOCHS}")
            
            train_loss, train_acc = train_one_epoch(
                model, train_loader, criterion, optimizer, Config.DEVICE, fold_scaler
            )
            val_loss, val_acc = validate_one_epoch(model, val_loader, criterion, Config.DEVICE)
            
            print(f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.2f}% | Val Loss: {val_loss:.4f}, Acc: {val_acc:.2f}%")
            
            history['train_loss'].append(train_loss)
            history['val_loss'].append(val_loss)
            history['train_acc'].append(train_acc)
            history['val_acc'].append(val_acc)
            
            scheduler.step()
            early_stopping(val_loss, model)
            
            if early_stopping.early_stop:
                print("Early stopping triggered.")
                break
        
        # Load best weights and save model
        model = early_stopping.load_best_weights(model)
        torch.save(model.state_dict(), f'{Config.MODELS_DIR}/mobilenet_model_fold_{fold+1}.pth')
        
        # Generate OOF predictions with memory optimization
        model.eval()
        fold_preds = []
        with torch.no_grad():
            for batch_idx, (data, _) in enumerate(tqdm(val_loader, desc="OOF Preds")):
                data = data.to(Config.DEVICE, non_blocking=True)
                
                if Config.USE_MIXED_PRECISION:
                    with autocast():
                        output = model(data)
                else:
                    output = model(data)
                
                probabilities = F.softmax(output, dim=1)
                fold_preds.append(probabilities.cpu().numpy())
                
                # Clear cache
                if batch_idx % 10 == 0:
                    torch.cuda.empty_cache()
        
        oof_preds[val_idx] = np.concatenate(fold_preds)
        
        results.append({'history': history})
        print(f"Fold {fold+1} Best Val Acc: {np.max(history['val_acc']):.2f}%")
        
        # Clean up memory
        del model, train_loader, val_loader, optimizer, scheduler, criterion
        torch.cuda.empty_cache()
        gc.collect()
    
    cv_mean_acc = np.mean([np.max(fold_result['history']['val_acc']) for fold_result in results])
    print(f"\nCV Mean Best Acc: {cv_mean_acc:.2f}%")
    
    # Save OOF predictions
    np.save(f'{Config.OUTPUT_DIR}/oof_predictions_mobilenet.npy', oof_preds)
    
    return results, oof_preds

In [None]:
fold_results, oof_predictions = train_kfold(df_train)




model.safetensors:   0%|          | 0.00/22.1M [00:00<?, ?B/s]

Epoch 1/10


[Train]:   0%|          | 0/1004 [00:00<?, ?it/s]

In [None]:
def plot_training_history(fold_results):
    if not fold_results:
        return
    
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))
    
    for i, result in enumerate(fold_results):
        history = result['history']
        axes[0].plot(history['val_loss'], label=f'Fold {i+1} Val Loss')
        axes[1].plot(history['val_acc'], label=f'Fold {i+1} Val Acc')
    
    axes[0].set_title('Validation Loss - MobileNetV3')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].legend()
    axes[0].grid(True)
    
    axes[1].set_title('Validation Accuracy - MobileNetV3')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy (%)')
    axes[1].legend()
    axes[1].grid(True)
    
    plt.tight_layout()
    plt.savefig(f'{Config.PLOTS_DIR}/mobilenet_training_history.png', dpi=300, bbox_inches='tight')
    plt.show()

def analyze_oof_predictions(oof_preds, df, idx_to_class_map):
    if oof_preds is None or df.empty or not idx_to_class_map:
        return
    
    true_labels = df['class_idx'].values
    pred_labels = np.argmax(oof_preds, axis=1)
    
    target_names = [idx_to_class_map[i] for i in sorted(idx_to_class_map.keys())]
    
    oof_accuracy = accuracy_score(true_labels, pred_labels) * 100
    print(f"\nOOF Accuracy (MobileNetV3): {oof_accuracy:.2f}%")
    
    # Confusion Matrix
    cm = confusion_matrix(true_labels, pred_labels)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=target_names, yticklabels=target_names, cbar=False)
    plt.title('MobileNetV3 Out-of-Fold Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.xticks(rotation=45)
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.savefig(f'{Config.PLOTS_DIR}/mobilenet_oof_confusion_matrix.png', dpi=300, bbox_inches='tight')
    plt.show()

# Plot results
plot_training_history(fold_results)
analyze_oof_predictions(oof_predictions, df_train, idx_to_class)

In [None]:
class MobileNetEnsemble:
    def __init__(self, model_paths, device):
        self.models = []
        self.device = device
        
        for path in model_paths:
            if os.path.exists(path):
                model = self.load_model(path, device)
                self.models.append(model)
        
        print(f"Loaded {len(self.models)} MobileNetV3 models for ensemble.")
    
    def load_model(self, path, device):
        model = MobileNetV3CropModel(pretrained=False)
        model.load_state_dict(torch.load(path, map_location=device))
        model.to(device)
        model.eval()
        return model
    
    def predict(self, loader):
        if not self.models:
            return None
        
        all_predictions = []
        
        with torch.no_grad():
            for batch_idx, (data, _) in enumerate(tqdm(loader, desc="Ensemble Inference")):
                data = data.to(self.device, non_blocking=True)
                
                # Get predictions from all models
                batch_predictions = []
                for model in self.models:
                    if Config.USE_MIXED_PRECISION:
                        with autocast():
                            output = model(data)
                    else:
                        output = model(data)
                    
                    probabilities = F.softmax(output, dim=1)
                    batch_predictions.append(probabilities)
                
                # Average predictions
                ensemble_pred = torch.stack(batch_predictions).mean(dim=0)
                all_predictions.append(ensemble_pred.cpu().numpy())
                
                # Clear cache
                if batch_idx % 10 == 0:
                    torch.cuda.empty_cache()
        
        return np.concatenate(all_predictions)

In [None]:
model_paths = [f'{Config.MODELS_DIR}/mobilenet_model_fold_{i+1}.pth' for i in range(Config.N_FOLDS)]
ensemble_model = MobileNetEnsemble(model_paths, Config.DEVICE)

In [None]:
def evaluate_test_set(df_test, ensemble, idx_to_class_map):
    if df_test.empty or not hasattr(ensemble, 'models') or not ensemble.models or not idx_to_class_map:
        print("Skipping test evaluation - missing data or models.")
        return
    
    print(f"\n{'='*20} Evaluating MobileNetV3 on Test Set {'='*20}")
    
    test_loader = DataLoader(
        CropDiseaseDataset(df_test, get_transforms('val')),
        batch_size=Config.BATCH_SIZE * 2, # Use larger batch for inference
        shuffle=False,
        num_workers=2,
        pin_memory=True
    )
    
    test_predictions = ensemble.predict(test_loader)
    
    if test_predictions is not None:
        true_labels = df_test['class_idx'].values
        pred_labels = np.argmax(test_predictions, axis=1)
        
        target_names = [idx_to_class_map[i] for i in sorted(idx_to_class_map.keys())]
        
        test_accuracy = accuracy_score(true_labels, pred_labels) * 100
        print(f"\nFinal Test Set Accuracy (Ensemble): {test_accuracy:.2f}%")
        
        print("\nClassification Report (Test Set):")
        print(classification_report(true_labels, pred_labels, target_names=target_names, digits=3))
        
        # Confusion Matrix for Test Set
        cm = confusion_matrix(true_labels, pred_labels)
        plt.figure(figsize=(12, 10))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Greens',
                    xticklabels=target_names, yticklabels=target_names, cbar=False)
        plt.title('MobileNetV3 Test Set Confusion Matrix (Ensemble)')
        plt.xlabel('Predicted')
        plt.ylabel('Actual')
        plt.xticks(rotation=45)
        plt.yticks(rotation=0)
        plt.tight_layout()
        plt.savefig(f'{Config.PLOTS_DIR}/mobilenet_test_confusion_matrix.png', dpi=300, bbox_inches='tight')
        plt.show()

evaluate_test_set(df_test, ensemble_model, idx_to_class)

In [None]:
import joblib
# This cell creates a self-contained prediction pipeline class and saves it using joblib.
# This is the recommended way to prepare a model for deployment in a web app.

class PredictionPipeline:
    def __init__(self, model, transforms, idx_to_class):
        # For deployment, it's best practice to force the model to CPU
        self.device = torch.device('cpu')
        self.model = model
        # Ensure all models in the ensemble are on the CPU and in eval mode
        for m in self.model.models:
            m.to(self.device)
            m.eval()
        self.transforms = transforms
        self.idx_to_class = idx_to_class

    def predict(self, image_path):
        """
        Takes the path to an image, preprocesses it, and returns the predicted class and confidence.
        """
        # 1. Load and preprocess image
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Image not found at {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        # 2. Apply transforms
        transformed_image = self.transforms(image)
        # Add batch dimension and move to CPU
        tensor = transformed_image.unsqueeze(0).to(self.device)

        # 3. Make prediction
        with torch.no_grad():
            # Get raw logits from each model in the ensemble
            batch_predictions = [m(tensor) for m in self.model.models]
            # Average the logits and apply softmax for probabilities
            ensemble_pred = torch.stack(batch_predictions).mean(dim=0)
            probabilities = F.softmax(ensemble_pred, dim=1)
        
        # 4. Get top prediction and confidence
        confidence, pred_idx = torch.max(probabilities, 1)
        pred_idx = pred_idx.item()
        
        # 5. Map index to class name
        predicted_class = self.idx_to_class.get(pred_idx, "Unknown Class")
        
        return predicted_class, confidence.item() * 100

# --- Create and save the pipeline ---

# 1. Get the necessary components
inference_transforms = get_transforms('val')
# Ensure integer keys for the map
int_idx_to_class = {int(k): v for k, v in idx_to_class.items()}

# 2. IMPORTANT: Create a new ensemble instance on the CPU for deployment
# This prevents saving the model with GPU dependencies.
ensemble_model_cpu = MobileNetEnsemble(model_paths, device=torch.device('cpu'))

# 3. Instantiate the pipeline
deployment_pipeline = PredictionPipeline(
    model=ensemble_model_cpu,
    transforms=inference_transforms,
    idx_to_class=int_idx_to_class
)

# 4. Save the entire pipeline object with joblib
pipeline_path = f'{Config.OUTPUT_DIR}/mobilenet_deployment_pipeline.joblib'
joblib.dump(deployment_pipeline, pipeline_path)

print(f"\n✅ Deployment pipeline saved successfully to: {pipeline_path}")

In [None]:
# This cell shows how to load the .joblib file and use it for prediction.
# You would use this same logic in your web application's backend.

print("--- Testing the saved deployment pipeline ---")

# 1. Load the saved pipeline
try:
    loaded_pipeline = joblib.load(pipeline_path)
    print("Pipeline loaded successfully.")

    # 2. Get a random image from the test set to test the pipeline
    if not df_test.empty:
        sample_row = df_test.sample(1).iloc[0]
        sample_image_path = sample_row['filepath']
        true_label = sample_row['class_name']
        
        print(f"\nTesting with image: {sample_image_path}")
        print(f"True Label: {true_label}")

        # 3. Make a prediction
        predicted_class, confidence = loaded_pipeline.predict(sample_image_path)
        
        # Display image
        img = Image.open(sample_image_path)
        plt.imshow(img)
        plt.title(f"Predicted: {predicted_class} ({confidence:.2f}%)\nTrue: {true_label}")
        plt.axis('off')
        plt.show()

        print(f"\nPredicted Disease: {predicted_class}")
        print(f"Confidence: {confidence:.2f}%")
    else:
        print("Test dataframe is empty, cannot demonstrate prediction.")

except FileNotFoundError:
    print(f"Error: Could not find the pipeline file at {pipeline_path}")
except Exception as e:
    print(f"An error occurred: {e}")