In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# (The template's example snippet for listing all files under the input directory is not included in this cell.)


# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
from glob import glob
from PIL import Image
from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms


# Root folder of the dataset; the notebook expects one sub-folder per modality
# listed in MODALITIES, each containing one sub-folder per class.
DATA_DIR = "/kaggle/input/plantvillage-dataset"  # Change if path differs

# Modality sub-folder names; also used as keys of the per-modality transform dicts.
MODALITIES = ["color", "grayscale", "segmented"]
# Images are resized to IMAGE_SIZE x IMAGE_SIZE pixels by get_transforms().
IMAGE_SIZE = 224  # Standard for pretrained models


class MultiModalityDataset(Dataset):
    """Dataset over (image path, label, modality) triples.

    Each image is loaded from disk on access, converted to RGB and passed
    through the transform registered for its modality.
    """

    def __init__(self, samples, modality_transforms):
        """
        samples: list of (img_path, label_id, modality_name)
        modality_transforms: dict {modality_name: transform}
        """
        self.samples = samples
        self.transforms = modality_transforms

    def __len__(self):
        """Return the number of samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample `idx`.

        Returns a dict with keys "image" (the transformed image), "label"
        (0-dim torch.long tensor) and "modality" (the modality name).
        Raises KeyError if the sample's modality has no registered transform.
        """
        img_path, label, modality = self.samples[idx]

        # Image.open is lazy and keeps the underlying file open. The context
        # manager closes it as soon as convert() has produced its in-memory
        # copy, so DataLoader workers do not accumulate open file handles.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")  # Convert all to RGB
        img = self.transforms[modality](img)

        return {
            "image": img,
            "label": torch.tensor(label, dtype=torch.long),
            "modality": modality
        }





In [None]:
# Step 1: build the class name -> integer ID mapping from the class
# sub-folders of the "color" modality (sorted so the IDs are stable).
class_names = sorted(next(os.walk(os.path.join(DATA_DIR, "color")))[1])
class_to_idx = {cls: i for i, cls in enumerate(class_names)}

# Extensions accepted as images. They are matched case-insensitively because
# glob("*.jpg") is case-sensitive on Linux and would silently skip "*.JPG".
IMAGE_EXTENSIONS = (".jpg", ".jpeg")

# Step 2: gather all samples as (path, label ID, modality) triples.
samples = []
for modality in MODALITIES:
    for cls in class_names:
        folder = os.path.join(DATA_DIR, modality, cls)
        if not os.path.isdir(folder):
            # A class may be absent for a modality; skip it as glob() did.
            continue
        # Sorted listing keeps the sample order independent of the filesystem.
        for fname in sorted(os.listdir(folder)):
            if fname.startswith("."):
                continue  # glob("*") never matched hidden files either
            if fname.lower().endswith(IMAGE_EXTENSIONS):
                img_path = os.path.join(folder, fname)
                samples.append((img_path, class_to_idx[cls], modality))

print(f"Total samples found: {len(samples)}")




In [None]:
# Step 3: Train/Val/Test split, stratified by class label.
# A fixed seed makes the split identical on every "Restart & Run All"; without
# it, val/test membership changes between runs and results are not comparable.
SPLIT_SEED = 42

# NOTE(review): samples are split individually. If the modality folders hold
# renderings of the same underlying photos, one photo can land in train in one
# modality and in test in another -- confirm whether a grouped split is needed.
train_val, test = train_test_split(
    samples,
    test_size=0.15,
    shuffle=True,
    stratify=[s[1] for s in samples],
    random_state=SPLIT_SEED,
)
train, val = train_test_split(
    train_val,
    test_size=0.18,
    shuffle=True,
    stratify=[s[1] for s in train_val],
    random_state=SPLIT_SEED,
)
# Final: ~70% train / 15% val / 15% test (0.18 of the remaining 85% is ~15%)

print(f"Train: {len(train)}, Val: {len(val)}, Test: {len(test)}")


def get_transforms(train=True):
    """Build the per-modality preprocessing pipelines.

    Returns a dict with keys "color", "grayscale" and "segmented", each mapped
    to a transforms.Compose that resizes to IMAGE_SIZE x IMAGE_SIZE, converts
    to a tensor and normalizes. When `train` is truthy, random augmentations
    are inserted between the resize and the tensor conversion:
    flip + rotation + color jitter for "color", flip + rotation for
    "grayscale", and flip only for "segmented".
    """
    # (mean, std) handed to Normalize for each modality.
    imagenet_stats = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    centered_stats = ([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
    stats_by_modality = {
        "color": imagenet_stats,
        "grayscale": centered_stats,
        "segmented": centered_stats,
    }

    def augmentations(modality):
        # Evaluation pipelines are deterministic: no augmentation at all.
        if not train:
            return []
        steps = [transforms.RandomHorizontalFlip()]
        if modality != "segmented":
            steps.append(transforms.RandomRotation(10))
        if modality == "color":
            steps.append(transforms.ColorJitter(0.2,0.2))
        return steps

    pipelines = {}
    for modality, (mean, std) in stats_by_modality.items():
        pipelines[modality] = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            *augmentations(modality),
            transforms.ToTensor(),
            transforms.Normalize(mean=list(mean), std=list(std)),
        ])
    return pipelines




In [None]:
# Step 5: build one dataset per split; only the training split is augmented.
train_dataset, val_dataset, test_dataset = (
    MultiModalityDataset(split, get_transforms(train=augment))
    for split, augment in ((train, True), (val, False), (test, False))
)


# Step 6: DataLoaders; only the training loader shuffles.
BATCH_SIZE = 32

train_loader, val_loader, test_loader = (
    DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=reshuffle, num_workers=2)
    for dataset, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)


print("‚úÖ DataLoaders are ready!")

In [None]:
def make_subset(samples, ratio, seed=42):
    """Return a label-stratified subset of `samples`.

    samples: list of (img_path, label_id, modality_name) tuples; element [1]
        is used as the stratification label.
    ratio: forwarded to train_test_split as `train_size`. A float of exactly
        1.0 means "everything" and returns a shallow copy of `samples` without
        sampling, since train_test_split rejects a train fraction of 1.0.
    seed: random_state for the split, so the subset is reproducible.
    """
    if isinstance(ratio, float) and ratio == 1.0:
        return list(samples)

    subset, _ = train_test_split(
        samples,
        train_size=ratio,
        stratify=[s[1] for s in samples],
        random_state=seed
    )
    return subset


In [None]:
# Stratified slices of the training split:
#   tiny   (5%)  - quick smoke tests
#   medium (30%) - hyperparameter tuning
train_tiny = make_subset(train, 0.05)
train_medium = make_subset(train, 0.3)

# Both subsets use the augmenting (train=True) transforms.
train_tiny_dataset, train_medium_dataset = (
    MultiModalityDataset(part, get_transforms(train=True))
    for part in (train_tiny, train_medium)
)

train_tiny_loader, train_medium_loader = (
    DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    for dataset in (train_tiny_dataset, train_medium_dataset)
)


In [None]:
import torch
import torch.nn as nn
from torchvision.models import vit_b_16, ViT_B_16_Weights
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
import copy
import os

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)


# Step 1: load ViT-B/16 with its ImageNet-1k (V1) pretrained weights.
weights = ViT_B_16_Weights.IMAGENET1K_V1
model = vit_b_16(weights=weights)


# Step 2: freeze every pretrained parameter. The head installed below is
# created afterwards, so its parameters keep requires_grad=True.
for param in model.parameters():
    param.requires_grad_(False)


# Step 3: replace the classification head with a small two-layer MLP.
num_classes = len(class_names)  # from previous cell
_head_layers = [
    nn.Linear(model.heads.head.in_features, 512),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(512, num_classes),
]
model.heads = nn.Sequential(*_head_layers)

model = model.to(device)


# Step 4: loss and optimizer; only the head's parameters go to the optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.heads.parameters(), lr=1e-4)
# NOTE(review): StepLR multiplies the learning rate by 0.1 every 3 epochs, so
# it becomes vanishingly small long before a 1000-epoch budget is used up --
# confirm this schedule is intended.
scheduler = StepLR(optimizer, step_size=3, gamma=0.1)


# Step 5 (training loop follows): checkpoint file for the best weights.
SAVE_PATH = "best_vit_model.pth"  # Saved in working directory


def _run_epoch(loader, training):
    """Run one full pass over `loader` with the notebook-level model.

    When `training` is true the model is put in train mode and the optimizer
    is stepped on every batch; otherwise the model is put in eval mode and no
    gradients are tracked. Returns (mean loss per sample, accuracy).
    Raises ValueError if the loader yields no samples.
    """
    model.train(training)  # train(False) is the same as eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.set_grad_enabled(training):
        for batch in loader:
            images = batch["image"].to(device)
            labels = batch["label"].to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            # criterion returns the batch mean; re-weight by batch size so the
            # epoch average is per sample even when the last batch is smaller.
            loss_sum += loss.item() * images.size(0)
            _, predicted = outputs.max(1)
            n_correct += predicted.eq(labels).sum().item()
            n_seen += labels.size(0)

    if n_seen == 0:
        raise ValueError("DataLoader produced no samples; cannot compute epoch metrics.")
    return loss_sum / n_seen, n_correct / n_seen


def train_model(num_epochs=1000, patience=100, train_dl=None, val_dl=None):
    """Train the notebook-level `model` and restore its best weights.

    num_epochs: maximum number of epochs to run.
    patience: stop early after this many consecutive epochs without a new
        best validation loss; None disables early stopping.
    train_dl / val_dl: optional DataLoaders to use instead of the
        notebook-level `train_loader` / `val_loader` (for example a smaller
        subset loader for a quick run).

    Each time validation loss improves, the weights are kept in memory and
    written to SAVE_PATH. Returns `model` with the best weights loaded.
    """
    # Fall back to the notebook-level loaders when none are supplied.
    train_dl = train_loader if train_dl is None else train_dl
    val_dl = val_loader if val_dl is None else val_dl

    best_val_loss = float("inf")
    best_model_wts = copy.deepcopy(model.state_dict())
    no_improve_epochs = 0

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(train_dl, training=True)
        val_loss, val_acc = _run_epoch(val_dl, training=False)

        # The learning-rate schedule advances once per epoch.
        scheduler.step()

        # Keep (and checkpoint) the weights with the lowest validation loss.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(best_model_wts, SAVE_PATH)
            no_improve_epochs = 0
            improved = "‚úÖ (improved & saved)"
        else:
            no_improve_epochs += 1
            improved = ""

        print(f"Epoch [{epoch+1}/{num_epochs}] "
              f"| Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} "
              f"| Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f} "
              + improved)

        # Optional early stopping.
        if patience is not None and no_improve_epochs >= patience:
            print(f"‚èπ Early stopping at epoch {epoch+1} ‚Äî no improvement for {patience} epochs.")
            break

    print("üèÅ Training finished!")

    # Load best weights before returning
    model.load_state_dict(best_model_wts)
    return model


# Run training; train_model returns the module-level model after loading the
# weights that achieved the lowest validation loss.
model = train_model(num_epochs=1000, patience=100)
print("‚úÖ Best model restored & ready!")
