In [None]:
# Extract the competition image archive and the zipped training CSV into /kaggle/working/.
!unzip /kaggle/input/leaf-classification/images.zip -d /kaggle/working/
!unzip /kaggle/input/leaf-classification/train.csv.zip -d /kaggle/working/

# Import used libraries

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from torchvision import transforms
from PIL import Image
import os
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn.functional as F
import itertools
from tqdm import tqdm

# Visualizing a subset of the leaf images

In [None]:
def visualize_images(df, images_dir, num_images=5):
    """
    Display a random sample of leaf images in a single row.

    Each image is read from ``<images_dir>/<id>.jpg``. The id comes from the
    ``id`` column when the frame has one; otherwise the frame's index is used
    (which keeps frames that are already indexed by image id working).

    Args:
        df (pd.DataFrame): Table with one row per leaf image.
        images_dir (str): Directory containing the ``<id>.jpg`` files.
        num_images (int): Maximum number of images to show. Values larger than
            ``len(df)`` are clamped instead of raising.
    """
    sample_size = min(num_images, len(df))
    if sample_size <= 0:
        # Nothing to draw; avoid creating an empty figure / invalid subplot grid.
        return

    sampled_rows = df.sample(sample_size)
    # A frame loaded with plain pd.read_csv has a positional RangeIndex, which is
    # NOT the image id, so prefer the explicit 'id' column when it is present.
    image_ids = sampled_rows['id'] if 'id' in sampled_rows.columns else sampled_rows.index

    plt.figure(figsize=(15, 8))
    for position, img_id in enumerate(image_ids, start=1):
        img_path = os.path.join(images_dir, f'{img_id}.jpg')
        # The context manager releases the file handle once the pixels are drawn.
        with Image.open(img_path) as img:
            plt.subplot(1, sample_size, position)
            plt.imshow(img, cmap='gray')
        plt.title(f'ID: {img_id}')
        plt.axis('off')
    plt.suptitle('Sample Leaf Images')
    plt.tight_layout(rect=[0, 0, 1, 0.95])  # Leave room so the suptitle does not overlap
    plt.show()

# Load the extracted training table and preview a few randomly chosen leaf images.
train_df = pd.read_csv('/kaggle/working/train.csv')
visualize_images(train_df, '/kaggle/working/images')

# Load and prepare data for training

In [None]:
def load_and_preprocess_data(csv_path, images_dir):
    """
    Read the training CSV and turn it into model-ready tabular inputs.

    The shape, texture and margin columns (in that group order) are standardized
    and returned as a DataFrame indexed by the ``id`` column; the ``species``
    column is label-encoded. Missing-value and duplicate-row counts are printed
    as a quick sanity check.

    Note: the scaler is fitted on every row of the CSV, i.e. before any
    train/validation/test split the caller performs afterwards.

    Args:
        csv_path (str): Path to the training CSV.
        images_dir (str): Accepted for interface symmetry; not used here.

    Returns:
        tuple: (standardized feature DataFrame, encoded label array, class names)
    """
    table = pd.read_csv(csv_path)

    # Collect descriptor columns group by group: shape first, then texture, then margin.
    descriptor_columns = [
        column
        for keyword in ('shape', 'texture', 'margin')
        for column in table.columns
        if keyword in column
    ]
    descriptors = table[descriptor_columns]
    species = table['species']

    # Quick data-quality report
    missing_count = descriptors.isnull().sum().sum()
    print("\nMissing values:")
    print(missing_count)

    duplicate_count = table.duplicated().sum()
    print("\nDuplicate rows:")
    print(duplicate_count)

    # Zero-mean / unit-variance scaling, keeping column names and the image id as index
    standardized_values = StandardScaler().fit_transform(descriptors)
    features_scaled = pd.DataFrame(
        standardized_values,
        columns=descriptors.columns,
        index=table['id'],
    )

    # Map species names to integer class ids
    encoder = LabelEncoder()
    labels_encoded = encoder.fit_transform(species)
    class_names = encoder.classes_

    print("Length of classes: ", len(class_names))

    return features_scaled, labels_encoded, class_names

# PyTorch Custom Dataset for Leaf Classification: Combining Image and Tabular Data

In [None]:
class LeafDataset(Dataset):
    """
    Dataset pairing each leaf image with its row of tabular descriptors.

    Attributes:
        features (pd.DataFrame): Numerical descriptors, indexed by image id
        images_dir (str): Folder holding the ``<id>.jpg`` image files
        labels (array-like, optional): Class label per row, positionally aligned with ``features``
        transform (callable, optional): Applied to the loaded PIL image when truthy
        image_ids (array): Image ids taken from the index of ``features``
    """
    def __init__(self, features, images_dir, labels=None, transform=None):
        self.images_dir = images_dir
        self.transform = transform
        self.labels = labels
        self.features = features
        # The feature frame's index doubles as the image file stem.
        self.image_ids = features.index.values

    def __len__(self):
        # One sample per feature row.
        return self.features.shape[0]

    def __getitem__(self, idx):
        """
        Fetch one sample by position.

        Args:
            idx (int): Positional index of the sample

        Returns:
            tuple: (image, numerical_features, label) when labels were supplied,
                  otherwise (image, numerical_features)
        """
        sample_id = self.image_ids[idx]
        picture = Image.open(
            os.path.join(self.images_dir, f'{sample_id}.jpg')
        ).convert('RGB')
        if self.transform:
            picture = self.transform(picture)

        tabular = torch.FloatTensor(self.features.iloc[idx].values)

        # Unlabelled mode: return inputs only.
        if self.labels is None:
            return picture, tabular

        # Scalar (0-dim) long tensor holding the class id.
        target = torch.LongTensor([self.labels[idx]])[0]
        return picture, tabular, target

# Model Architecture

In [None]:
class LeafCNN(nn.Module):
    """
    Two-branch classifier that fuses a leaf image with its tabular descriptors.

    Branches:
    1. A convolutional stack that reduces the image to a 256-dim vector
    2. An MLP that maps the numerical features to a 256-dim vector
    3. A fusion head that classifies the concatenated 512-dim vector

    Args:
        num_numerical_features (int): Width of the tabular input
        num_classes (int): Number of output logits
        dropout_rate (float): Dropout probability used in the MLP and the head
    """
    def __init__(self, num_numerical_features, num_classes, dropout_rate=0.5):
        super().__init__()

        # Image branch: four conv -> batch-norm -> ReLU -> 2x2 max-pool stages
        # (3 -> 32 -> 64 -> 128 -> 256 channels), then global average pooling.
        channel_plan = [3, 32, 64, 128, 256]
        conv_stack = []
        for in_channels, out_channels in zip(channel_plan[:-1], channel_plan[1:]):
            conv_stack += [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.MaxPool2d(2),
            ]
        conv_stack.append(nn.AdaptiveAvgPool2d((1, 1)))
        self.conv_layers = nn.Sequential(*conv_stack)

        # Tabular branch: num_numerical_features -> 512 -> 256
        self.numerical_layers = nn.Sequential(
            nn.Linear(num_numerical_features, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
        )

        # Fusion head: (256 image + 256 tabular) -> 512 -> num_classes
        fused_width = 256 + 256
        self.combined_layers = nn.Sequential(
            nn.Linear(fused_width, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(512, num_classes),
        )

    def forward(self, image, numerical_features):
        """
        Compute class logits for a batch.

        Args:
            image (torch.Tensor): Batch of image tensors fed to the conv stack
            numerical_features (torch.Tensor): Batch of tabular feature vectors

        Returns:
            torch.Tensor: Logits produced by the fusion head
        """
        # Collapse the pooled (N, 256, 1, 1) feature map to (N, 256).
        image_embedding = torch.flatten(self.conv_layers(image), start_dim=1)
        tabular_embedding = self.numerical_layers(numerical_features)

        fused = torch.cat([image_embedding, tabular_embedding], dim=1)
        return self.combined_layers(fused)

# Training function

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over `loader` (training when an optimizer is given, evaluation otherwise).

    Returns:
        tuple: (loss averaged over batches, accuracy as a fraction in [0, 1])
    """
    is_training = optimizer is not None
    model.train(is_training)

    loss_sum = 0.0
    correct = 0
    seen = 0
    # Gradients are only tracked while training.
    with torch.set_grad_enabled(is_training):
        for images, numerical_features, labels in loader:
            images = images.to(device)
            numerical_features = numerical_features.to(device)
            labels = labels.to(device)

            if is_training:
                optimizer.zero_grad()
            outputs = model(images, numerical_features)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            _, predicted = torch.max(outputs, 1)
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()

    return loss_sum / len(loader), correct / seen


def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device,
                early_stopping_patience=10, checkpoint_path='best_model.pth'):
    """
    Train `model` with early stopping and restore the weights of the best epoch.

    After every epoch the validation loss is passed to `scheduler.step`, and
    whenever it improves the weights are snapshotted in memory and also written
    to `checkpoint_path`. When training ends the in-memory snapshot is loaded
    back into `model`. If the validation loss never improves (e.g. it is NaN,
    or `num_epochs` is 0) nothing is restored, so a stale checkpoint file left
    by an earlier run can never be loaded by mistake.

    Args:
        model (nn.Module): Model called as ``model(images, numerical_features)``
        train_loader: Iterable of (images, numerical_features, labels) batches
        val_loader: Iterable of validation batches with the same layout
        criterion: Loss callable ``criterion(outputs, labels)``
        optimizer: Optimizer updating the parameters of `model`
        scheduler: Object exposing ``step(val_loss)`` (e.g. ReduceLROnPlateau)
        num_epochs (int): Maximum number of epochs
        device (torch.device): Device the batches are moved to
        early_stopping_patience (int): Stop after this many consecutive epochs
            without a validation-loss improvement
        checkpoint_path (str): File that receives the best state dict

    Returns:
        tuple: (train_losses, val_losses, train_accuracies, val_accuracies),
        one entry per completed epoch; accuracies are fractions in [0, 1].
    """
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    best_val_loss = float('inf')
    best_epoch = None
    best_state = None
    epochs_without_improvement = 0

    for epoch in range(num_epochs):
        train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # Evaluate on the validation set
        val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f} - Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

        # The plateau scheduler decides on LR reductions from the validation loss.
        scheduler.step(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_epoch = epoch
            epochs_without_improvement = 0
            # Clone the tensors: state_dict() returns references that later
            # optimizer steps would otherwise overwrite.
            best_state = {name: tensor.detach().clone()
                          for name, tensor in model.state_dict().items()}
            torch.save(model.state_dict(), checkpoint_path)
        else:
            epochs_without_improvement += 1

        if epochs_without_improvement >= early_stopping_patience:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break

    if best_state is None:
        # A NaN loss compares False against everything, so no epoch qualified.
        print("Validation loss never improved; keeping the model's current weights.")
    else:
        print(f"Best validation loss: {best_val_loss:.4f}, found at epoch {best_epoch+1}")
        model.load_state_dict(best_state)
    return train_losses, val_losses, train_accuracies, val_accuracies

# Evaluating the model on testing data

In [None]:
def evaluate_model(model, test_loader, criterion, device, classes):
    """
    Evaluate the model's performance on a test set.

    Per-class metrics and the confusion matrix are computed over the full label
    range ``0 .. len(classes) - 1``, so their rows line up with `classes` even
    when some class never occurs in the evaluated data or in the predictions.

    Args:
        model (nn.Module): Trained model, called as ``model(images, numerical_features)``
        test_loader (DataLoader): Yields (images, numerical_features, labels) batches
        criterion (nn.Module): Loss function
        device (torch.device): Device to run evaluation on
        classes (sequence): Class names, positioned by encoded label id

    Returns:
        dict: ``test_loss`` (mean batch loss), ``accuracy`` / ``precision`` /
        ``recall`` / ``f1_score`` (weighted, in percent), ``confusion_matrix``
        (shape ``(len(classes), len(classes))``) and ``per_class_metrics``
        (dicts mapping class name to a fraction in [0, 1]).
    """
    model.eval()
    running_loss = 0.0
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for images, numerical_features, labels in test_loader:
            images = images.to(device)
            numerical_features = numerical_features.to(device)
            labels = labels.to(device)

            outputs = model(images, numerical_features)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = outputs.max(1)

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Every encoded label id, in the same order as `classes`.
    class_ids = np.arange(len(classes))

    # Aggregate metrics (support-weighted averages)
    accuracy = np.mean(np.array(all_predictions) == np.array(all_labels))
    precision = precision_score(all_labels, all_predictions, average='weighted')
    recall = recall_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    # Without `labels=` scikit-learn only reports classes that actually occur,
    # which would shift the arrays relative to `classes`.
    conf_matrix = confusion_matrix(all_labels, all_predictions, labels=class_ids)

    # Per-class metrics; classes without samples/predictions score 0 instead of warning.
    per_class_precision = precision_score(all_labels, all_predictions, labels=class_ids,
                                          average=None, zero_division=0)
    per_class_recall = recall_score(all_labels, all_predictions, labels=class_ids,
                                    average=None, zero_division=0)
    per_class_f1 = f1_score(all_labels, all_predictions, labels=class_ids,
                            average=None, zero_division=0)

    metrics = {
        'test_loss': running_loss / len(test_loader),
        'accuracy': accuracy * 100,
        'precision': precision * 100,
        'recall': recall * 100,
        'f1_score': f1 * 100,
        'confusion_matrix': conf_matrix,
        'per_class_metrics': {
            'precision': dict(zip(classes, per_class_precision)),
            'recall': dict(zip(classes, per_class_recall)),
            'f1': dict(zip(classes, per_class_f1))
        }
    }

    return metrics

# Plot the training and validation loss and accuracy curves recorded during training

In [None]:
def plot_training_curves(train_losses, val_losses, train_accuracies, val_accuracies):
    """
    Plot per-epoch loss and accuracy curves side by side.

    The values are plotted exactly as given. The accuracy axis is labelled
    without a percent sign because `train_model` records accuracies as
    fractions in [0, 1], not percentages.

    Args:
        train_losses (list): Training loss per epoch
        val_losses (list): Validation loss per epoch
        train_accuracies (list): Training accuracy per epoch
        val_accuracies (list): Validation accuracy per epoch
    """
    fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: losses
    loss_ax.plot(train_losses, label='Train Loss')
    loss_ax.plot(val_losses, label='Validation Loss')
    loss_ax.set_title('Training and Validation Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # Right panel: accuracies
    accuracy_ax.plot(train_accuracies, label='Train Accuracy')
    accuracy_ax.plot(val_accuracies, label='Validation Accuracy')
    accuracy_ax.set_title('Training and Validation Accuracy')
    accuracy_ax.set_xlabel('Epoch')
    accuracy_ax.set_ylabel('Accuracy')
    accuracy_ax.legend()

    fig.tight_layout()
    plt.show()

# Confusion matrix

In [None]:
def plot_confusion_matrix(conf_matrix, classes, top_k=10):
    """
    Draw a heatmap of the confusion matrix restricted to the most frequent classes.

    Classes are ranked by their row sums (number of true samples) and the
    `top_k` largest are kept, ordered from smallest to largest.

    Args:
        conf_matrix (np.ndarray): Square confusion matrix (rows = true, columns = predicted)
        classes (sequence): Class names indexed by matrix position
        top_k (int): Number of classes to display
    """
    true_counts = conf_matrix.sum(axis=1)
    selected = np.argsort(true_counts)[-top_k:]

    # Same rows and columns, picked in one indexing step.
    sub_matrix = conf_matrix[np.ix_(selected, selected)]
    tick_labels = [classes[position] for position in selected]

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        sub_matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=tick_labels,
        yticklabels=tick_labels,
    )
    plt.title(f'Confusion Matrix (Top {top_k} Classes)')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.xticks(rotation=45)
    plt.yticks(rotation=45)
    plt.tight_layout()
    plt.show()

# Grid search for the best hyperparameters, training each configuration for 2 epochs

In [None]:
# Set random seeds for reproducibility: seed the global PyTorch and NumPy
# random generators (the train_test_split calls below pass their own random_state).
torch.manual_seed(42)
np.random.seed(42)


def grid_search(num_epochs=2):
    """
    Run an exhaustive hyperparameter search and evaluate the winner on the test split.

    Every combination in the grid is trained for `num_epochs` epochs and ranked
    by the validation accuracy of its last epoch. The best configuration is
    evaluated on the held-out test split, its confusion matrix is plotted, and
    the weights, parameters, metrics and all search results are saved to
    'best_leaf_classifier_checkpoint.pth'.

    NOTE(review): `train_model` reloads the lowest-validation-loss weights before
    returning, while the ranking uses the last epoch's accuracy; the two can
    refer to different epochs - confirm this is the intended selection rule.

    Args:
        num_epochs (int): Epochs per configuration (default 2, a short budget).

    Raises:
        ValueError: If the grid contains an optimizer name that is not supported.
    """
    # Define hyperparameter grid
    param_grid = {
        'batch_size': [32, 64, 128],
        'learning_rate': [0.01, 0.001],
        'dropout_rate': [0.5, 0.7],
        'weight_decay': [1e-5, 1e-7],
        'optimizer_name': ['adam', 'sgd', 'rmsprop'],
        'scheduler_patience': [5, 10]
    }

    # Image transformations
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])

    # Load and preprocess data
    features_scaled, labels_encoded, classes = load_and_preprocess_data('/kaggle/working/train.csv', '/kaggle/working/images')

    # Split data into train, validation, and test sets
    X_temp, X_test, y_temp, y_test = train_test_split(
        features_scaled, labels_encoded,
        test_size=0.2, random_state=42, stratify=labels_encoded
    )

    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp,
        test_size=0.2, random_state=42, stratify=y_temp
    )

    # Create datasets
    train_dataset = LeafDataset(X_train, '/kaggle/working/images', y_train, transform=transform)
    val_dataset = LeafDataset(X_val, '/kaggle/working/images', y_val, transform=transform)
    test_dataset = LeafDataset(X_test, '/kaggle/working/images', y_test, transform=transform)

    # Initialize device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Generate all combinations of hyperparameters
    param_combinations = [dict(zip(param_grid.keys(), v))
                          for v in itertools.product(*param_grid.values())]

    # Store results. Start below any reachable accuracy so that a winner is
    # always selected, even if every configuration scores 0.0.
    results = []
    best_val_accuracy = float('-inf')
    best_params = None
    best_model_state = None

    # The loss is stateless, so one instance serves every configuration.
    criterion = nn.CrossEntropyLoss()

    # Grid search
    for params in tqdm(param_combinations, desc="Grid Search Progress"):
        # Create dataloaders with current batch size
        train_loader = DataLoader(train_dataset, batch_size=params['batch_size'], shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=params['batch_size'])

        # Initialize model
        model = LeafCNN(
            num_numerical_features=X_train.shape[1],
            num_classes=len(classes),
            dropout_rate=params['dropout_rate']
        ).to(device)

        # Initialize the optimizer that the grid entry actually names
        optimizer_name = params['optimizer_name']
        if optimizer_name == 'adam':
            optimizer = optim.Adam(model.parameters(),
                                   lr=params['learning_rate'],
                                   weight_decay=params['weight_decay'])
        elif optimizer_name == 'sgd':
            optimizer = optim.SGD(model.parameters(),
                                  lr=params['learning_rate'],
                                  momentum=0.9,
                                  weight_decay=params['weight_decay'])
        elif optimizer_name == 'rmsprop':
            optimizer = optim.RMSprop(model.parameters(),
                                      lr=params['learning_rate'],
                                      weight_decay=params['weight_decay'])
        elif optimizer_name == 'adamw':
            optimizer = optim.AdamW(model.parameters(),
                                    lr=params['learning_rate'],
                                    weight_decay=params['weight_decay'])
        else:
            raise ValueError(f"Unsupported optimizer_name: {optimizer_name!r}")

        # `verbose` is deprecated in recent PyTorch releases and is therefore not
        # passed; with the default 2 epochs and a patience of 5 or more the
        # scheduler cannot reduce the LR anyway, so no messages are lost.
        scheduler = ReduceLROnPlateau(optimizer,
                                      mode='min',
                                      factor=0.1,
                                      patience=params['scheduler_patience'])

        # Train model
        train_losses, val_losses, train_accuracies, val_accuracies = train_model(
            model, train_loader, val_loader, criterion, optimizer, scheduler,
            num_epochs=num_epochs, device=device
        )

        # Validation metrics of the last epoch (accuracy is a fraction in [0, 1])
        final_val_accuracy = val_accuracies[-1]
        final_val_loss = val_losses[-1]

        # Store results
        results.append({
            'params': params,
            'final_val_accuracy': final_val_accuracy,
            'final_val_loss': final_val_loss
        })

        # Update best model if necessary
        if final_val_accuracy > best_val_accuracy:
            best_val_accuracy = final_val_accuracy
            best_params = params
            # Snapshot on the CPU so the stored weights are independent of this
            # model instance and the checkpoint loads on any device.
            best_model_state = {name: tensor.detach().cpu().clone()
                                for name, tensor in model.state_dict().items()}

        # Print current results (fraction scaled to percent)
        print(f"\nParameters: {params}")
        print(f"Validation Accuracy: {final_val_accuracy * 100:.2f}%")
        print(f"Validation Loss: {final_val_loss:.4f}")

    # Sort results by validation accuracy
    results.sort(key=lambda x: x['final_val_accuracy'], reverse=True)

    # Print top 5 configurations
    print("\nTop 5 Configurations:")
    for i, result in enumerate(results[:5]):
        print(f"\n{i+1}. Validation Accuracy: {result['final_val_accuracy'] * 100:.2f}%")
        print(f"Parameters: {result['params']}")

    # Load best model and evaluate on test set
    best_model = LeafCNN(
        num_numerical_features=X_train.shape[1],
        num_classes=len(classes),
        dropout_rate=best_params['dropout_rate']
    ).to(device)
    best_model.load_state_dict(best_model_state)

    test_loader = DataLoader(test_dataset, batch_size=best_params['batch_size'])
    test_metrics = evaluate_model(best_model, test_loader, criterion, device, classes)

    # Print test metrics for best model (evaluate_model already reports percentages)
    print("\nBest Model Test Metrics:")
    print(f"Loss: {test_metrics['test_loss']:.4f}")
    print(f"Accuracy: {test_metrics['accuracy']:.2f}%")
    print(f"Precision: {test_metrics['precision']:.2f}%")
    print(f"Recall: {test_metrics['recall']:.2f}%")
    print(f"F1 Score: {test_metrics['f1_score']:.2f}%")

    # Plot confusion matrix
    plot_confusion_matrix(test_metrics['confusion_matrix'], classes)

    # Save best model
    torch.save({
        'model_state_dict': best_model_state,
        'best_params': best_params,
        'classes': classes,
        'test_metrics': test_metrics,
        'grid_search_results': results
    }, 'best_leaf_classifier_checkpoint.pth')

# Launch the full grid search when this code runs as the main module.
if __name__ == '__main__':
    grid_search()

In [None]:
def best_model():
    """
    Train the hybrid model with one fixed configuration and report test results.

    Steps: load and split the data, train with SGD + ReduceLROnPlateau under
    early stopping, plot the training curves, evaluate on the held-out test
    split, plot the confusion matrix and save a checkpoint to 'best_model.pth'.
    """
    # Fixed training configuration
    batch_size = 32
    learning_rate = 0.01
    num_epochs = 30
    dropout_rate = 0.5
    weight_decay = 1e-7
    early_stopping_patience = 10  # epochs without improvement before stopping

    images_dir = '/kaggle/working/images'

    # Resize every image to 224x224 and convert it to a tensor
    image_pipeline = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])

    features_scaled, labels_encoded, classes = load_and_preprocess_data('/kaggle/working/train.csv', images_dir)

    # 80/20 split into (train + val) / test, then 80/20 again into train / val
    X_rest, X_test, y_rest, y_test = train_test_split(
        features_scaled, labels_encoded,
        test_size=0.2, random_state=42, stratify=labels_encoded
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest,
        test_size=0.2, random_state=42, stratify=y_rest
    )

    def make_loader(split_features, split_labels, shuffle=False):
        # One dataset + loader per split; only the training split is shuffled.
        dataset = LeafDataset(split_features, images_dir, split_labels, transform=image_pipeline)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    train_loader = make_loader(X_train, y_train, shuffle=True)
    val_loader = make_loader(X_val, y_val)
    test_loader = make_loader(X_test, y_test)

    # Model and training components
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    network = LeafCNN(
        num_numerical_features=X_train.shape[1],
        num_classes=len(classes),
        dropout_rate=dropout_rate
    )
    model = network.to(device)

    criterion = nn.CrossEntropyLoss()

    # SGD with momentum is the configuration in use (an Adam variant was tried earlier).
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=weight_decay)
    # NOTE(review): the scheduler patience equals the early-stopping patience, so
    # early stopping will usually trigger before the LR is ever reduced - confirm
    # this is intended. `verbose` is also deprecated in recent PyTorch releases.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)

    # Train
    history = train_model(
        model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device, early_stopping_patience
    )
    train_losses, val_losses, train_accuracies, val_accuracies = history

    plot_training_curves(train_losses, val_losses, train_accuracies, val_accuracies)

    # Evaluate on the held-out test split
    test_metrics = evaluate_model(model, test_loader, criterion, device, classes)

    print("\nTest Set Metrics:")
    print(f"Loss: {test_metrics['test_loss']:.4f}")
    percent_rows = (
        ('Accuracy', 'accuracy'),
        ('Precision', 'precision'),
        ('Recall', 'recall'),
        ('F1 Score', 'f1_score'),
    )
    for title, key in percent_rows:
        print(f"{title}: {test_metrics[key]:.2f}%")

    plot_confusion_matrix(test_metrics['confusion_matrix'], classes)

    # Persist the weights together with optimizer/scheduler state and the metrics
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'classes': classes,
        'test_metrics': test_metrics
    }
    torch.save(checkpoint, 'best_model.pth')


# Train, evaluate and checkpoint the final model using the fixed hyperparameters in best_model().
best_model()