In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm
import matplotlib.pyplot as plt

In [None]:
!pip install opendatasets

In [None]:
import opendatasets as od
od.download("https://www.kaggle.com/datasets/emmarex/plantdisease")

In [None]:
class PlantDiseaseDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # Load the image and apply transformations
        img = Image.open(img_path).convert("RGB")
        if self.transform:
            img = self.transform(img)

        return img, torch.tensor(label, dtype=torch.long)


def load_images(directory_root):
    image_list, label_list = [], []
    print("[INFO] Loading images...")

    for disease_folder in os.listdir(directory_root):
        disease_folder_path = os.path.join(directory_root, disease_folder)
        if not os.path.isdir(disease_folder_path):
            continue

        for img_name in os.listdir(disease_folder_path):
            if img_name.startswith("."):
                continue
            img_path = os.path.join(disease_folder_path, img_name)
            if img_path.lower().endswith(('.jpg', '.jpeg', '.png')):
                image_list.append(img_path)
                label_list.append(disease_folder)

    print("[INFO] Image loading completed")
    print(f"Total images: {len(image_list)}")
    return image_list, label_list

# Load images and labels
directory_root = "/content/plantdisease/PlantVillage" # Placeholder for your dataset path
image_paths, labels = load_images(directory_root)

# Encode labels as integers
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

# Train, validation, and test splits
train_paths, temp_paths, train_labels, temp_labels = train_test_split(
    image_paths, labels_encoded, test_size=0.3, random_state=42, stratify=labels_encoded
)
valid_paths, test_paths, valid_labels, test_labels = train_test_split(
    temp_paths, temp_labels, test_size=0.5, random_state=42, stratify=temp_labels
)

print(f"Training samples: {len(train_paths)}")
print(f"Validation samples: {len(valid_paths)}")
print(f"Test samples: {len(test_paths)}")

In [None]:
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np

def visualize_class_distribution(labels, class_names):
    class_counts = Counter(labels)
    sorted_counts = sorted(class_counts.items(), key=lambda x: x[0])
    counts = [count for _, count in sorted_counts]
    class_labels = [class_names[idx] for idx, _ in sorted_counts]
    percentages = [count/len(labels)*100 for count in counts]

    plt.figure(figsize=(10,8))
    ax = plt.subplot(111, projection='polar')
    angles = np.linspace(0, 2*np.pi, len(counts), endpoint=False)
    colors = plt.cm.YlGn(np.linspace(0.4, 0.9, len(counts)))
    bars = ax.bar(angles, counts, width=2*np.pi/len(counts)*0.75, alpha=0.8, color=colors)

    for angle, count, percentage in zip(angles, counts, percentages):
        label_radius = count + (max(counts) * 0.08)
        label = f'{count:,}\n({percentage:.1f}%)'
        ax.text(angle, label_radius, label, ha='center', va='center', fontsize=10,
                rotation=np.degrees(angle) if -90 <= np.degrees(angle) <= 90 else np.degrees(angle) + 180)

    ax.set_title('Plant Disease Distribution\nTotal Samples: {:,}'.format(sum(counts)), y=1.05, fontsize=20, pad=20)
    ax.set_xticks(angles)
    ax.set_xticklabels(class_labels, fontsize=10)
    ax.grid(True, alpha=0.3, linestyle='--')
    ax.set_ylim(0, max(counts) * 1.2)
    plt.tight_layout()
    plt.show()

plt.style.use('classic')
plt.rcParams['figure.facecolor'] = 'white'
visualize_class_distribution(train_labels, label_encoder.classes_)

In [None]:
# Data Transformations
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),           # Data augmentation for training
    transforms.RandomRotation(30),              # Random rotation for variability
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Standard normalization
])

valid_test_transform = transforms.Compose([
    transforms.Resize((256, 256)),              # Consistent resizing for validation/test
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Same normalization as training
])

# Create datasets with appropriate transformations
train_dataset = PlantDiseaseDataset(train_paths, train_labels, transform=train_transform)
valid_dataset = PlantDiseaseDataset(valid_paths, valid_labels, transform=valid_test_transform)
test_dataset = PlantDiseaseDataset(test_paths, test_labels, transform=valid_test_transform)

# Create dataloaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  # Shuffle for training
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False) # No shuffle for validation/test
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)   # No shuffle for test

In [None]:
def visualize_samples(dataset, num_samples=5):
    plt.figure(figsize=(22, 5))
    for i in range(num_samples):
        img, label = dataset[i]
        img = img.permute(1, 2, 0).numpy()
        img = (img * 0.5) + 0.5  # De-normalize
        plt.subplot(1, num_samples, i + 1)
        plt.imshow(img)
        plt.title(label_encoder.inverse_transform([label])[0].replace('_',' '))  # Convert label back to class name
        plt.axis('off')
    plt.show()

visualize_samples(train_dataset)

In [None]:
for inputs, labels in test_loader:
    print(f"Batch inputs shape: {inputs.shape}")  # Should be [batch_size, 3, 128, 128]
    print(f"Batch labels shape: {labels.shape}")  # Should be [batch_size]
    print(f"First 5 samples labels: {labels[:5]}")   # Print first 5 labels
    break

In [None]:
class CustomCNN(nn.Module):
    def __init__(self, num_clasess):
        super(CustomCNN, self).__init__()
        # Convolution Block 1
        self.conv_block1=nn.Sequential(
            nn.Conv2d(3,32, kernel_size=3, padding="same"),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)  # Output: 32x128x128 (assuming input is 3x256x256)
        )
        # Convolution Block 2
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding='same'),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2) # Output: 64x64x64
        )

        # Convolution Block 3
        self.conv_block3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding="same"),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2) # Output: 128x32x32
        )
        # Convolution Block 4
        self.conv_block4 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding="same"),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2) # Output: 256x16x16
        )
        # Global Average Pooling
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1,1)) # Output: 256x1x1
        # Fully Connected Layers
        self.fc_block = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 128), # Adjusted input size after GAP
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128,num_clasess)
        )

    def forward(self,x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        x = self.global_avg_pool(x)
        x = self.fc_block(x)
        return x



In [None]:
# Initialize model, loss, optimizer
num_clasess = len(label_encoder.classes_)
print(f"Number of Clasess: {num_clasess}")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device Used: {device}")

model = CustomCNN(num_clasess).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr = 0.002)

In [None]:
from torchsummary import summary
summary(model, input_size=(3,256,256))

In [None]:
class EarlyStopping:
    def __init__(self, patience =3, min_delta=0, save_path="best_model.pth"):
        self.patience = patience
        self.min_delta = min_delta
        self.save_path = save_path
        self.best_loss = float('inf')
        self.counter = 0

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            torch.save(model.state_dict(), self.save_path)  # Save the best model
            print(f"[INFO] Model checkpoint saved to {self.save_path}")
        else:
            self.counter += 1
            if self.counter >= self.patience:
                print("[INFO] Early stopping triggered.")
                return True
        return False

def evaluate_model(model, data_loader, criterion):
    model.eval() # Set model to evaluation mode
    val_loss = 0.0
    correct, total = 0, 0

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_loss /= len(data_loader)
    accuracy = correct / total *100
    return val_loss, accuracy

def train_model(model, train_loader, valid_loader, criterion, optimizer, epochs, early_stopping=None):
    train_losses,train_accuarcies, valid_losses, valid_accuracies = [], [], [], []

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0
        progress_bar = tqdm(enumerate(train_loader), desc=f"Epoch {epoch+1}/{epochs}", total=len(train_loader))

        for batch_idx, (inputs, labels) in progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            running_corrects+= torch.sum(preds == labels.data)
            progress_bar.set_postfix({'Train Loss': loss.item()})

        # Record training loss
        train_loss = running_loss / len(train_loader)
        train_losses.append(train_loss)
        train_accuracy = running_corrects.double() / len(train_loader.dataset) * 100
        train_accuarcies.append(train_accuracy.item())

        # Validation step
        val_loss, val_accuracy = evaluate_model(model, valid_loader, criterion)
        valid_losses.append(val_loss)
        valid_accuracies.append(val_accuracy)

        # Print epoch summary
        print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}, Val Accuracy = {val_accuracy:.2f}%")

        # Early stopping
        if early_stopping and early_stopping(val_loss, model):
            print("[INFO] Early stopping triggered.")
            break

    return train_losses,train_accuarcies, valid_losses, valid_accuracies



**TRAINING of The CNN model**

In [None]:
n_epochs = 50

early_stopping = EarlyStopping(patience=4, min_delta=0.01, save_path="best_model.pth")
train_losses, valid_losses, valid_accuracies = train_model(
    model, train_loader, valid_loader, criterion, optimizer, epochs=n_epochs, early_stopping=early_stopping
)

In [None]:
def plot_learning_curve(train_losses,train_accuarcies, valid_losses, valid_accuracies):
    plt.figure(figsize=(12, 6))

    # Loss curve
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label="Train Loss")
    plt.plot(valid_losses, label="Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Loss Curve")
    plt.legend()

    # Accuracy curve
    plt.subplot(1, 2, 2)
    plt.plot(valid_accuracies, label="Validation Accuracy")
    plt.plot(train_accuarcies, label="Train Accuracy")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy (%)")
    plt.title("Accuracy Curve")
    plt.legend()

    plt.show()

plot_learning_curve(train_losses, valid_losses, valid_accuracies)

In [None]:
# Load the best model
model.load_state_dict(torch.load("best_model.pth"))
print("[INFO] Best model loaded for final evaluation.")

final_val_loss, final_val_accuracy = evaluate_model(model, test_loader, criterion)
print(f"Final Evaluation -> test Loss: {final_val_loss:.4f}, test Accuracy: {final_val_accuracy:.2f}%")

# EFFICIENTNET MODEL


In [None]:
from torchvision import models
from torchsummary import summary

enet_model = models.efficientnet_b0(weights=True)
enet_model = enet_model.to(device) # Move the model to the device (GPU)
summary(enet_model, input_size=(3,256,256))

In [None]:
# Freezing the layers
for param in enet_model.parameters():
    param.requires_grad = False

In [None]:
num_ftrs = enet_model.classifier[1].in_features
enet_model.classifier[1] = nn.Linear(num_ftrs, num_clasess)
enet_model = enet_model.to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(enet_model.classifier[1].parameters(), lr=1e-4)

In [None]:
efficientnet_train_losses, efficientnet_val_losses = [], []
efficientnet_train_accuracies, efficientnet_val_accuracies = [], []

In [None]:
n_epochs = 50

early_stopping = EarlyStopping(patience=4, min_delta=0.01, save_path="best_enet_model.pth")
efficientnet_train_losses,efficientnet_train_accuracies, efficientnet_val_losses, efficientnet_val_accuracies = train_model(
    enet_model, train_loader, valid_loader, criterion, optimizer, epochs=n_epochs, early_stopping=early_stopping
)

In [None]:
plot_learning_curve(efficientnet_train_losses,efficientnet_train_accuracies, efficientnet_val_losses, efficientnet_val_accuracies)

In [None]:
from google.colab import files
files.download("best_enet_model.pth")

Testing on the test dataset

In [None]:
# Load the best model
enet_model.load_state_dict(torch.load("best_enet_model.pth"))
print("[INFO] Best model loaded for final evaluation.")

final_test_loss, final_test_accuracy = evaluate_model(enet_model, test_loader, criterion)
print(f"Final Evaluation -> test Loss: {final_test_loss:.4f}, test Accuracy: {final_test_accuracy:.2f}%")

# DIET MODEL

In [None]:
! pip install timm

In [None]:
import timm

num_classes = 15
batch_size = 32
epochs = 50
lr = 1e-4


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device Used: {device}")

In [None]:
# Data Transformations
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),           # Data augmentation for training
    transforms.RandomRotation(30),              # Random rotation for variability
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Standard normalization
])

valid_test_transform = transforms.Compose([
    transforms.Resize((224, 224)),              # Consistent resizing for validation/test
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Same normalization as training
])

# Create datasets with appropriate transformations
train_dataset = PlantDiseaseDataset(train_paths, train_labels, transform=train_transform)
valid_dataset = PlantDiseaseDataset(valid_paths, valid_labels, transform=valid_test_transform)
test_dataset = PlantDiseaseDataset(test_paths, test_labels, transform=valid_test_transform)

# Create dataloaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  # Shuffle for training
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False) # No shuffle for validation/test
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)   # No shuffle for test

In [None]:
deit_model = timm.create_model(
    "deit_small_patch16_224",
    pretrained=True,
    num_classes=num_classes
)

In [None]:
for param in deit_model.parameters():
    param.requires_grad = False

deit_model.head = nn.Linear(deit_model.head.in_features, num_classes)

optimizer = torch.optim.AdamW(deit_model.head.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
deit_model = deit_model.to(device)

In [None]:
early_stopping = EarlyStopping(patience=4, min_delta=0.01, save_path="best_deit_model.pth")
deit_train_losses,deit_train_accuracies, deit_val_losses, deit_val_accuracies = train_model(
    deit_model, train_loader, valid_loader, criterion, optimizer, epochs=epochs, early_stopping=early_stopping
)

In [None]:
plot_learning_curve(deit_train_losses,deit_train_accuracies, deit_val_losses, deit_val_accuracies)