Imports and Initialization

In [11]:
import csv
import os
import shutil

import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm import tqdm  # For progress bar

Helper Functions

In [None]:
#Python script to generate csv

dataset_path = "C:/Users/azril/Downloads/Dataset_v1/Train"

op_csv = "C:/Users/azril/Downloads/Dataset_v1/csv/dataset_v1.csv"

with open(op_csv, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['S/N', 'Plant', 'Healthy/NotHealthy','Filepath'])

    serial_number = 1  # Initialize serial number

    for plant_folder in os.listdir(dataset_path):
        plant_path = os.path.join(dataset_path, plant_folder)

        if os.path.isdir(plant_path):
            for status in ['Healthy', 'NotHealthy']:
                status_path = os.path.join(plant_path, status)

                if os.path.isdir(status_path):
                    for image_file in os.listdir(status_path): 
                        image_path = os.path.join(status_path, image_file)
                        if os.path.isfile(image_path):
                            writer.writerow([serial_number, plant_folder, status, image_path])
                            serial_number += 1  # Increment serial number

print(f'csv file saved to {op_csv}')

Transformation

In [3]:
# Define transformations for training (Data Augmentation)
train_transforms = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images to a fixed size
    transforms.RandomHorizontalFlip(),  # Randomly flip the image horizontally
    transforms.RandomRotation(15),  # Rotate the image randomly by up to 15 degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),  # Random color jitter
    transforms.ToTensor(),  # Convert the image to a tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize image with ImageNet values
])

# Define transformations for validation/testing (No Augmentation, only resizing and normalization)
valid_transforms = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images to a fixed size
    transforms.ToTensor(),  # Convert image to tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize image with ImageNet values
])

# Print out the transformations to verify
print("Training Transforms:", train_transforms)
print("Validation Transforms:", valid_transforms)

Training Transforms: Compose(
    Resize(size=(128, 128), interpolation=bilinear, max_size=None, antialias=True)
    RandomHorizontalFlip(p=0.5)
    RandomRotation(degrees=[-15.0, 15.0], interpolation=nearest, expand=False, fill=0)
    ColorJitter(brightness=(0.8, 1.2), contrast=(0.8, 1.2), saturation=(0.8, 1.2), hue=(-0.2, 0.2))
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)
Validation Transforms: Compose(
    Resize(size=(128, 128), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)


Dataset Class

In [12]:
class PlantHealthCheckDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data = pd.read_csv(csv_file)  # Load the CSV
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = self.data.iloc[idx, 2]  # Image path from CSV
        label = self.data.iloc[idx, 1]  # Healthy/NotHealthy label
        img_path = os.path.join(self.root_dir, img_name)
        image = Image.open(img_path)  # Open the image
        
        if self.transform:
            image = self.transform(image)  # Apply transformations

        return image, label

Data Loaders

In [None]:
train_dataset = PlantHealthCheckDataset(
    csv_file='path_to_train_csv', 
    root_dir='path_to_train_images', 
    transform=train_transforms
)

valid_dataset = PlantHealthCheckDataset(
    csv_file='path_to_valid_csv', 
    root_dir='path_to_valid_images', 
    transform=valid_transforms
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

PlantHealthCheck CNN class

In [6]:
class PlantHealthCheckCNN(nn.Module):
    def __init__(self):
        super(PlantHealthCheckCNN, self).__init__()
        
        # Convolutional layers with ReLU activation and Max Pooling
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        
        # Fully connected layers
        self.fc1 = nn.Linear(128 * 16 * 16, 512)  # Adjust input size based on image size after convolutions
        self.fc2 = nn.Linear(512, 2)  # Output 2 classes: Healthy, NotHealthy
    
    def forward(self, x):
        # Apply convolution, activation (ReLU), and pooling for each layer
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)  # Max Pooling
        
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)  # Max Pooling
        
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2)  # Max Pooling
        
        # Flatten the output for the fully connected layers
        x = x.view(x.size(0), -1)  # Flatten (batch_size, 128 * 16 * 16)
        
        # Pass through fully connected layers
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # Output layer (2 classes: Healthy, NotHealthy)
        
        return x

Training

In [8]:
def train_model(model, train_loader, valid_loader, num_epochs=10, lr=0.001):
    # Define optimizer (Adam optimizer)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    # Define loss function (CrossEntropyLoss for classification)
    criterion = nn.CrossEntropyLoss()
    
    # Track training and validation losses
    train_loss_history = []
    valid_loss_history = []
    
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        
        train_loss = 0.0
        correct_train = 0
        total_train = 0
        
        # Training loop
        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch"):
            images, labels = images.to(device), labels.to(device)  # Move to GPU if available
            
            # Zero the gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(images)
            
            # Compute loss
            loss = criterion(outputs, labels)
            
            # Backward pass
            loss.backward()
            
            # Update weights
            optimizer.step()
            
            # Accumulate loss and accuracy
            train_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_train += (predicted == labels).sum().item()
            total_train += labels.size(0)
        
        # Average loss for this epoch
        train_loss /= total_train
        train_loss_history.append(train_loss)
        
        # Validation loop
        model.eval()  # Set model to evaluation mode
        valid_loss = 0.0
        correct_valid = 0
        total_valid = 0
        
        with torch.no_grad():  # No need to compute gradients for validation
            for images, labels in valid_loader:
                images, labels = images.to(device), labels.to(device)
                
                outputs = model(images)
                loss = criterion(outputs, labels)
                
                valid_loss += loss.item() * images.size(0)
                _, predicted = torch.max(outputs, 1)
                correct_valid += (predicted == labels).sum().item()
                total_valid += labels.size(0)
        
        # Average validation loss for this epoch
        valid_loss /= total_valid
        valid_loss_history.append(valid_loss)
        
        # Print epoch statistics
        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}")
        print(f"Train Accuracy: {100 * correct_train / total_train:.2f}%, Validation Accuracy: {100 * correct_valid / total_valid:.2f}%")
    
    return model, train_loss_history, valid_loss_history