<a href="https://colab.research.google.com/github/shivaniaia/MLProjects/blob/main/MiniProjModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing Kaggle and Uploading the Dataset


In [None]:
#install kaggle
!pip install -q kaggle

In [None]:
from google.colab import files

# Bind the result to a name: a bare `files.upload()` as the last expression
# makes the notebook echo the uploaded bytes (i.e. the API key) into the saved
# cell output. Only the file names are reported.
uploaded = files.upload()
print(f"Uploaded: {sorted(uploaded)}")

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"shivania123","key":"<REDACTED - revoke this key and generate a new one>"}'}

In [None]:
!cp kaggle.json ~/.kaggle/

In [None]:
!pip install kagglehub
from google.colab import files
files.upload()
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json



Saving kaggle.json to kaggle (1).json


# Copying the Dataset Onto Google Colab Directory

In [None]:
import kagglehub
import os
import shutil
import time
import threading

# --- Download the dataset, then stage a writable copy under /content ---
try:
    path = kagglehub.dataset_download("nahiduzzaman13/mulberry-leaf-dataset", force_download=True)
    print("Download completed.")
    print("Path to dataset files:", path)

    # The images are expected inside a 'Mulberry Data' sub-folder of the download.
    source_folder_name = 'Mulberry Data'
    source_path = os.path.join(path, source_folder_name)
    destination_path = '/content/mulberry_data'

    print(f"Checking source path: {source_path}")
    if not os.path.exists(source_path):
        print("Error: The 'Mulberry Data' folder was not found at the source path. Please check the downloaded folder's structure.")
    else:
        print(f"Source path exists. Contents: {os.listdir(source_path)}")

        # Start from a clean destination: copytree refuses to write into an
        # existing directory.
        if os.path.exists(destination_path):
            shutil.rmtree(destination_path)
        shutil.copytree(source_path, destination_path)

        print(f"Successfully copied the final dataset to: {destination_path}")
        print("Final directory structure:")
        print(os.listdir(destination_path))

except Exception as err:
    # Best effort: report the problem and let the notebook continue.
    print(f"An error occurred: {err}")

Using Colab cache for faster access to the 'mulberry-leaf-dataset' dataset.
Download completed.
Path to dataset files: /kaggle/input/mulberry-leaf-dataset
Checking source path: /kaggle/input/mulberry-leaf-dataset/Mulberry Data
Source path exists. Contents: ['Disease Free leaves', 'Leaf Rust', 'Leaf spot']
Successfully copied the final dataset to: /content/mulberry_data
Final directory structure:
['Leaf Rust', 'Leaf spot', 'Disease Free leaves']


In [None]:
# Use the path from the previous output
downloaded_path = '/root/.cache/kagglehub/datasets/nahiduzzaman13/mulberry-leaf-dataset/versions/1'
!ls -F {downloaded_path}

In [None]:
# Sanity check: after the copy step, /content/mulberry_data should contain the
# dataset's class folders.
!ls -F /content/mulberry_data

# If the class folders are listed above, the data-loading cells can find the images.

'Disease Free leaves'/	'Leaf Rust'/  'Leaf spot'/


In [None]:
import os
import shutil

# Dataset location as mounted under /kaggle/input (treated as read-only input).
source_path = '/kaggle/input/mulberry-leaf-dataset/Mulberry Data'

# Writable working copy inside the Colab session.
destination_path = '/content/mulberry_data'

if not os.path.isdir(source_path):
    # Stop before touching the destination: deleting an existing copy when
    # there is nothing to replace it with would leave the session without data.
    print(f"An error occurred during copying: source folder not found: {source_path}")
else:
    # copytree refuses to write into an existing directory, so clear it first.
    if os.path.exists(destination_path):
        shutil.rmtree(destination_path)

    try:
        shutil.copytree(source_path, destination_path)
        print(f"Successfully copied the dataset to: {destination_path}")
        print("Final directory structure:")
        print(os.listdir(destination_path))

    except Exception as e:
        print(f"An error occurred during copying: {e}")

Keeping session alive...
Successfully copied the dataset to: /content/mulberry_data
Final directory structure:
['Leaf Rust', 'Leaf spot', 'Disease Free leaves']


# Data Augmentation


Augmentation and Splitting of Dataset


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision.datasets import ImageFolder
from sklearn.model_selection import train_test_split
from glob import glob
from collections import Counter
import numpy as np
import cv2
import os
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Input resolution fed to the network.
IMAGE_SIZE = 224

# Training pipeline: resize, random flips / small affine jitter / brightness-
# contrast changes, then normalization and conversion to a tensor.
train_transforms = A.Compose([
    A.Resize(IMAGE_SIZE, IMAGE_SIZE),
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.1, rotate_limit=15, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])

# Validation/test pipeline: no random augmentation, same resize + normalization.
val_test_transforms = A.Compose([
    A.Resize(IMAGE_SIZE, IMAGE_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])

#-----------------------------------------------------------------------------
#SPLITTING THE DATASET

# Root of the dataset copy; each sub-folder is one class.
data_dir = '/content/mulberry_data'

# Class names are the sub-folder names, sorted so the label index is stable.
all_class_names = sorted([d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))])

# glob returns matches in arbitrary order. The paths are sorted so that the
# seeded splits below select the same files on every run; otherwise a checkpoint
# trained in one session could be tested on images it was trained on in another.
all_image_paths = sorted(
    glob(os.path.join(data_dir, '*', '*.jpg')) +
    glob(os.path.join(data_dir, '*', '*.jpeg')) +
    glob(os.path.join(data_dir, '*', '*.JPG'))
)
if not all_image_paths:
    raise FileNotFoundError(f"No .jpg/.jpeg/.JPG images found under {data_dir}")

# The parent folder name of each image is its class label (used for stratification).
all_labels = [os.path.basename(os.path.dirname(p)) for p in all_image_paths]

# 70% train, 30% held out; stratified so class proportions are preserved.
train_paths, temp_paths, train_labels, temp_labels = train_test_split(
    all_image_paths, all_labels, train_size=0.7, stratify=all_labels, random_state=42
)

# The held-out 30% is halved into validation and test (15% / 15% overall).
val_paths, test_paths, val_labels, test_labels = train_test_split(
    temp_paths, temp_labels, test_size=0.5, stratify=temp_labels, random_state=42
)

# A temporary, in-memory dataset class to handle the splits
class CustomDataset(Dataset):
    """Dataset over explicit (image path, class-name label) pairs.

    Args:
        paths: Image file paths.
        labels: Class name of each image, aligned by position with ``paths``.
        class_names: All class names; a name's position in this sequence is the
            integer target returned for it.
        transform: Optional callable invoked as ``transform(image=array)`` that
            returns a mapping with an ``'image'`` entry.

    Raises:
        ValueError: If ``paths`` and ``labels`` differ in length.
        KeyError: If a label is not present in ``class_names``.
    """

    def __init__(self, paths, labels, class_names, transform=None):
        if len(paths) != len(labels):
            # zip() below would otherwise silently drop the unmatched tail.
            raise ValueError(
                f"paths and labels must have the same length, got {len(paths)} and {len(labels)}"
            )
        self.paths = paths
        self.labels = labels
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(class_names)}
        self.samples = [(p, self.class_to_idx[l]) for p, l in zip(paths, labels)]
        self.transform = transform
        self.classes = class_names

    def __len__(self):
        """Number of (path, target) samples."""
        return len(self.samples)

    def __getitem__(self, index):
        """Load image ``index`` as RGB, apply the transform, return ``(image, target)``.

        Raises:
            OSError: If the file is missing or cannot be decoded.
        """
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            # cv2.imread reports failure by returning None rather than raising.
            raise OSError(f"Could not read image: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            image = self.transform(image=image)['image']
        return image, target

# One dataset per split; only the training split uses the random augmentations.
train_dataset = CustomDataset(train_paths, train_labels, all_class_names, transform=train_transforms)
val_dataset = CustomDataset(val_paths, val_labels, all_class_names, transform=val_test_transforms)
test_dataset = CustomDataset(test_paths, test_labels, all_class_names, transform=val_test_transforms)

# Inverse-frequency weight per class, expanded to one weight per training
# sample, so that rarer classes are drawn more often by the sampler.
train_counts = Counter(train_labels)
total_samples = len(train_dataset)
class_weights = {cls: total_samples / count for cls, count in train_counts.items()}
sample_weights = torch.tensor(
    [class_weights[label] for label in train_labels],
    dtype=torch.double,
)

# Draw as many samples per epoch as there are training images.
sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights))

# The sampler decides the training order; validation/test keep a fixed order.
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print("DataLoaders created successfully.")
print(f"Number of classes: {len(train_dataset.classes)}")
print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")
print(f"Training set class distribution (before sampler): {train_counts}")

DataLoaders created successfully.
Number of classes: 3
Training samples: 763
Validation samples: 164
Test samples: 164
Training set class distribution (before sampler): Counter({'Leaf Rust': 342, 'Disease Free leaves': 308, 'Leaf spot': 113})


  original_init(self, **validated_kwargs)


# Importing the Pretrained MobileNetV3 Model

In [None]:
import torch
import torch.nn as nn
from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights

# Prefer the GPU when PyTorch can see one.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# MobileNetV3-Large initialised from torchvision's default pretrained weights.
weights = MobileNet_V3_Large_Weights.DEFAULT
model = mobilenet_v3_large(weights=weights)

# Freeze the feature extractor so only the classifier head receives gradients.
model.features.requires_grad_(False)

# Width of the features entering the original classifier head.
num_ftrs = model.classifier[0].in_features

# New head sized for the three dataset classes
# (Disease Free leaves, Leaf Rust, Leaf spot).
num_classes = 3
model.classifier = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.Hardswish(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(1024, num_classes),
)

# Place the model on the selected device.
model = model.to(device)

print("MobileNetV3-Large model loaded and modified. Ready for training.")

MobileNetV3-Large model loaded and modified. Ready for training.


# Training the Model

You do not need to re-run this step: the best trained model has already been saved to Google Drive, so you can skip ahead to "Accessing The Best Trained Model (Drive)".

In [None]:
import torch.optim as optim
from tqdm import tqdm

# Cross-entropy loss, Adam at lr=1e-3, and a step schedule that multiplies the
# learning rate by 0.1 every 7 scheduler steps.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=7, gamma=0.1)

def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler,
                num_epochs=10, save_path='best_model.pth'):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    Each epoch runs one pass over ``train_loader`` (with optimizer updates),
    steps ``scheduler`` once, then evaluates on ``val_loader`` without
    gradients. Whenever validation accuracy exceeds the best seen so far, the
    model's ``state_dict`` is written to ``save_path``.

    Args:
        model: Network to train; batches are moved to the device it lives on.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the model's parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        num_epochs: Number of epochs to run.
        save_path: File that receives the best ``state_dict``.

    Returns:
        None. Progress is printed; the best weights are saved to ``save_path``.
    """
    # Follow the model's own device rather than a notebook-level ``device``
    # variable defined in another cell.
    device = next(model.parameters()).device

    def _run_epoch(loader, desc, training):
        """Run one pass over ``loader``; return (mean loss, accuracy) over the samples seen."""
        model.train(training)
        loss_sum, correct, seen = 0.0, 0, 0

        with torch.set_grad_enabled(training):
            for inputs, labels in tqdm(loader, desc=desc):
                inputs, labels = inputs.to(device), labels.to(device)

                if training:
                    optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                if training:
                    loss.backward()
                    optimizer.step()

                batch_size = inputs.size(0)
                # loss is a batch mean; weight it by batch size to average per sample.
                loss_sum += loss.item() * batch_size
                correct += (outputs.argmax(dim=1) == labels).sum().item()
                seen += batch_size

        if seen == 0:
            # Empty loader: report zeros instead of dividing by zero.
            return 0.0, 0.0
        return loss_sum / seen, correct / seen

    best_acc = 0.0
    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        print("-" * 10)

        # Training phase
        epoch_loss, epoch_acc = _run_epoch(train_loader, "Training", training=True)
        scheduler.step()
        print(f"Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

        # Validation phase
        epoch_loss, epoch_acc = _run_epoch(val_loader, "Validation", training=False)
        print(f"Val Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

        # Keep only the weights with the best validation accuracy so far.
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            torch.save(model.state_dict(), save_path)
            print("Saved best model state.")

# Train for 15 epochs; train_model writes the best weights to 'best_model.pth'.
train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=15)

Epoch 1/15
----------


Training:   0%|          | 0/24 [00:06<?, ?it/s]


KeyboardInterrupt: 

# Saving the Trained Model to Google Drive

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the cells that read or write
# '/content/drive/...' paths depend on this mount.
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

In [None]:
import os
import shutil

# Refuse to continue when Drive is not mounted: os.makedirs below would
# otherwise create the folder on the session's local disk and the checkpoint
# would be lost when the session ends.
if not os.path.ismount('/content/drive'):
    raise RuntimeError(
        "Google Drive is not mounted at /content/drive; run the drive.mount cell first."
    )

# Project folder on Google Drive that holds the checkpoint.
destination_folder = '/content/drive/My Drive/MulberryLeafProject'
os.makedirs(destination_folder, exist_ok=True)

# Local checkpoint written during training and its location on Drive.
source_path = 'best_model.pth'
destination_path = os.path.join(destination_folder, 'best_model.pth')

# Copy instead of move: the test cell further down still loads the local
# 'best_model.pth', which a move would have removed.
shutil.copy2(source_path, destination_path)
print(f"Model successfully saved to {destination_path}")

# Accessing The Best Trained Model (Drive)

For running on CPU (not GPU)

In [None]:
import torch
import torch.nn as nn
from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights

# Checkpoint location on the mounted Google Drive.
model_path = '/content/drive/My Drive/MulberryLeafProject/best_model.pth'

# Rebuild the same architecture that produced the checkpoint, so that the
# saved parameter names and shapes line up.
model = mobilenet_v3_large(weights=None)
num_classes = 3
num_ftrs = model.classifier[0].in_features
model.classifier = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.Hardswish(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(1024, num_classes),
)

# map_location='cpu' lets a checkpoint saved from a CUDA run be loaded on a
# CPU-only runtime.
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.eval()  # inference mode

print("Model loaded successfully. You can now use it for predictions.")

Model loaded successfully. You can now use it for predictions.


For running on GPU (when a CUDA runtime is available)


In [None]:
import torch
import torch.nn as nn
from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights

# Checkpoint location on the mounted Google Drive.
model_path = '/content/drive/MyDrive/MulberryLeafProject/best_model.pth'

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rebuild the same architecture that produced the checkpoint.
model = mobilenet_v3_large(weights=None)
num_ftrs = model.classifier[0].in_features
num_classes = 3
model.classifier = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.Hardswish(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(1024, num_classes),
)

# map_location remaps tensors saved from a CUDA run onto the current device;
# without it, loading raises a RuntimeError on a runtime that has no GPU.
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device)
model.eval()  # inference mode

print("Model loaded successfully. You can now use it for predictions.")

RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

# Testing The Model

Use this cell right after training the model from scratch: it loads the local `best_model.pth` written by the training step.

In [None]:
import torch
import torch.nn as nn
from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights
from sklearn.metrics import classification_report
from tqdm import tqdm

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the bare architecture. The checkpoint loaded below overwrites every
# parameter and buffer, so downloading the pretrained weights first would be
# wasted work.
model = mobilenet_v3_large(weights=None)

# Same frozen feature extractor as in the training setup.
model.features.requires_grad_(False)

# Width of the features entering the original classifier head.
num_ftrs = model.classifier[0].in_features

# Same three-class head as used for training.
num_classes = 3
model.classifier = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.Hardswish(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(1024, num_classes),
)

# map_location keeps the load working when the checkpoint was written on a GPU
# runtime and this session has none.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model = model.to(device)

def evaluate_model(model, test_loader):
    """Print a per-class classification report for ``model`` on ``test_loader``.

    Args:
        model: Classifier returning one score per class for each input.
        test_loader: Iterable of ``(inputs, labels)`` batches whose ``dataset``
            exposes ``classes``, the class names indexed by integer label.

    Returns:
        None. The report is printed.
    """
    # Evaluate on whatever device the model already lives on.
    device = next(model.parameters()).device
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc="Testing"):
            outputs = model(inputs.to(device))
            preds = outputs.argmax(dim=1)

            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.tolist())

    # Class names for the report, indexed by integer label.
    class_names = test_loader.dataset.classes

    print("\n" + "="*50)
    print("Classification Report on the Test Set")
    print("="*50)
    # Passing `labels` explicitly keeps `target_names` aligned even when a
    # class is absent from both the true and the predicted labels.
    print(classification_report(
        all_labels,
        all_preds,
        labels=list(range(len(class_names))),
        target_names=class_names,
    ))

# Evaluate the freshly loaded checkpoint on the held-out test loader.
evaluate_model(model, test_loader)

FileNotFoundError: [Errno 2] No such file or directory: 'best_model.pth'

Using the best trained model saved to drive

In [None]:
import torch
import torch.nn as nn
from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights
from sklearn.metrics import classification_report
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Input resolution expected by the network.
IMAGE_SIZE = 224

# Evaluation preprocessing: identical to the validation pipeline used while
# training (resize, normalize, convert to tensor); no random augmentation.
val_test_transforms = A.Compose([
    A.Resize(height=IMAGE_SIZE, width=IMAGE_SIZE),
    A.Normalize(
        mean=(0.485, 0.456, 0.406),
        std=(0.229, 0.224, 0.225),
    ),
    ToTensorV2(),
])

# --- Rebuild the test dataset and loader with these transforms ---
# Requires `test_paths`, `test_labels` and `all_class_names` from the
# data-splitting cell; re-run that cell first if they are not defined.

# A temporary, in-memory dataset class to handle the splits
class CustomDataset(torch.utils.data.Dataset):
    """Dataset over explicit (image path, class-name label) pairs.

    Args:
        paths: Image file paths.
        labels: Class name of each image, aligned by position with ``paths``.
        class_names: All class names; a name's position in this sequence is the
            integer target returned for it.
        transform: Optional callable invoked as ``transform(image=array)`` that
            returns a mapping with an ``'image'`` entry.

    Raises:
        ValueError: If ``paths`` and ``labels`` differ in length.
        KeyError: If a label is not present in ``class_names``.
    """

    def __init__(self, paths, labels, class_names, transform=None):
        if len(paths) != len(labels):
            # zip() below would otherwise silently drop the unmatched tail.
            raise ValueError(
                f"paths and labels must have the same length, got {len(paths)} and {len(labels)}"
            )
        self.paths = paths
        self.labels = labels
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(class_names)}
        self.samples = [(p, self.class_to_idx[l]) for p, l in zip(paths, labels)]
        self.transform = transform
        self.classes = class_names

    def __len__(self):
        """Number of (path, target) samples."""
        return len(self.samples)

    def __getitem__(self, index):
        """Load image ``index`` as RGB, apply the transform, return ``(image, target)``.

        Raises:
            OSError: If the file is missing or cannot be decoded.
        """
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            # cv2.imread reports failure by returning None rather than raising.
            raise OSError(f"Could not read image: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            image = self.transform(image=image)['image']
        return image, target

# Test split with the deterministic evaluation transforms, read in fixed order.
test_dataset = CustomDataset(test_paths, test_labels, all_class_names, transform=val_test_transforms)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False, num_workers=2)

# --- Evaluate the Drive checkpoint on the data prepared above ---

# Prefer the GPU when PyTorch can see one.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rebuild the training architecture and restore the weights saved on Drive.
model_path = '/content/drive/My Drive/MulberryLeafProject/best_model.pth'
model = mobilenet_v3_large(weights=None)
num_classes = 3
num_ftrs = model.classifier[0].in_features
model.classifier = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.Hardswish(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(1024, num_classes),
)
# Load onto the CPU first (works with or without a GPU), then move the model.
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.to(device)
model.eval()

# Evaluation helper
def evaluate_model(model, test_loader):
    """Print a per-class classification report for ``model`` on ``test_loader``.

    Args:
        model: Classifier returning one score per class for each input.
        test_loader: Iterable of ``(inputs, labels)`` batches whose ``dataset``
            exposes ``classes``, the class names indexed by integer label.

    Returns:
        None. The report is printed.
    """
    # Evaluate on whatever device the model already lives on.
    device = next(model.parameters()).device
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc="Testing"):
            outputs = model(inputs.to(device))
            preds = outputs.argmax(dim=1)

            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.tolist())

    class_names = test_loader.dataset.classes

    print("\n" + "="*50)
    print("Classification Report on the Test Set")
    print("="*50)
    # Passing `labels` explicitly keeps `target_names` aligned even when a
    # class is absent from both the true and the predicted labels.
    print(classification_report(
        all_labels,
        all_preds,
        labels=list(range(len(class_names))),
        target_names=class_names,
    ))

# Report test-set metrics for the checkpoint restored from Google Drive.
evaluate_model(model, test_loader)

Testing: 100%|██████████| 6/6 [01:07<00:00, 11.27s/it]


Classification Report on the Test Set
                     precision    recall  f1-score   support

Disease Free leaves       0.99      1.00      0.99        66
          Leaf Rust       0.99      0.96      0.97        74
          Leaf spot       0.92      0.96      0.94        24

           accuracy                           0.98       164
          macro avg       0.96      0.97      0.97       164
       weighted avg       0.98      0.98      0.98       164






# Testing With a JPEG Image From the Web (e.g. Google Images)

In [None]:
!pip install requests Pillow



In [None]:
import torch
import torch.nn as nn
from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights
from PIL import Image
import requests
from io import BytesIO
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np

# Class names in label-index order (index i of the model output corresponds to
# all_class_names[i]).
all_class_names = ['Disease Free leaves', 'Leaf Rust', 'Leaf spot']

# Inference preprocessing: resize to the network input size, normalize,
# convert to a tensor.
IMAGE_SIZE = 224
val_test_transforms = A.Compose([
    A.Resize(height=IMAGE_SIZE, width=IMAGE_SIZE),
    A.Normalize(
        mean=(0.485, 0.456, 0.406),
        std=(0.229, 0.224, 0.225),
    ),
    ToTensorV2(),
])

# Function to load and predict a single image from a URL
def predict_from_url(model, image_url, timeout=10):
    """Download an image and return the class name the model predicts for it.

    Args:
        model: Classifier whose output index ``i`` corresponds to
            ``all_class_names[i]``.
        image_url: HTTP(S) URL of the image.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The predicted class name, or ``None`` when the image could not be
        downloaded or opened (the error is printed).
    """
    try:
        # A timeout prevents the cell from hanging on an unresponsive server;
        # raise_for_status turns HTTP errors (404, 403, ...) into a clear
        # message instead of handing an error page to the image decoder.
        response = requests.get(image_url, timeout=timeout)
        response.raise_for_status()
        img_bytes = BytesIO(response.content)
        pil_image = Image.open(img_bytes).convert("RGB")  # force 3-channel RGB
    except Exception as e:
        print(f"Error downloading or opening image: {e}")
        return None

    # Albumentations operates on NumPy arrays.
    image = np.array(pil_image)

    # Same preprocessing as the validation/test pipeline.
    transformed_image = val_test_transforms(image=image)["image"]

    # Add the batch dimension expected by the model.
    transformed_image = transformed_image.unsqueeze(0)

    # Run on the GPU when available, with model and input on the same device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    transformed_image = transformed_image.to(device)
    model.to(device)

    model.eval()
    with torch.no_grad():
        outputs = model(transformed_image)
        _, preds = torch.max(outputs, 1)

    # Map the predicted index back to its class name.
    predicted_class = all_class_names[preds.item()]

    return predicted_class

# Rebuild the training architecture and restore the checkpoint from Google Drive.
model_path = '/content/drive/My Drive/MulberryLeafProject/best_model.pth'
model = mobilenet_v3_large(weights=None)
num_classes = 3
num_ftrs = model.classifier[0].in_features
model.classifier = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.Hardswish(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(1024, num_classes),
)
# Load onto the CPU so this works with or without a GPU.
model.load_state_dict(torch.load(model_path, map_location='cpu'))

# --- Classify an image fetched from the internet ---
# Swap in any image URL to try a different picture.
image_url = "https://www.shutterstock.com/image-photo/mulberry-leaf-on-white-background-260nw-295198253.jpg"

predicted_class = predict_from_url(model, image_url)
# predict_from_url returns None when the image could not be fetched or opened.
if predicted_class:
    print(f"\nPrediction for the image from the internet: {predicted_class}")


Prediction for the image from the internet: Disease Free leaves
