In [None]:
# !git clone https://github.com/williamjohn972/PlantVillage_Leaf_Disease_Classification.git
# %cd PlantVillage_Leaf_Disease_Classification
# !pip install optuna

# from google.colab import files
# files.upload()   # Upload your kaggle.json here

# !mkdir -p ~/.kaggle
# !mv kaggle.json ~/.kaggle/
# !chmod 600 ~/.kaggle/kaggle.json

# !kaggle datasets download -d emmarex/plantdisease --unzip

In [1]:
import sys
import os

# Set the project root directory
project_root = os.path.abspath("..")   # one level above /notebooks
sys.path.append(project_root)

**Imports**

In [2]:
import numpy as np
import random

import optuna
import torch
import torch.nn as nn 
import torch.optim as optim 

from src.train import Trainer
from src.train_utils import build_model, build_trainer, build_optim, calc_class_weights
from src.dataset import get_datasets, create_data_loaders 

  from .autonotebook import tqdm as notebook_tqdm


**Setting Seed**

In [3]:
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [4]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

train_set, val_set, test_set, classes = get_datasets("../data/PlantVillage",val_split=0.1,test_split=0.1)
num_classes = len(classes)

class_weights = calc_class_weights(train_set, DEVICE) # These weights are to be passed into the loss function later 


In [5]:
print(f"{'-'*10} Class Weights {'-'*10}")
for target, weight in enumerate(class_weights):

    print(f"Class {target}: {weight:.3f}")

---------- Class Weights ----------
Class 0: 0.802
Class 1: 0.541
Class 2: 0.799
Class 3: 0.799
Class 4: 5.240
Class 5: 0.376
Class 6: 0.799
Class 7: 0.419
Class 8: 0.839
Class 9: 0.451
Class 10: 0.477
Class 11: 0.569
Class 12: 0.249
Class 13: 2.138
Class 14: 0.502


# **HyperParam Tuning**

In [10]:
class OptunaPruner:

    def __init__(self, trial: optuna.Trial):
        self.trial = trial

    def __call__(self, trainer: Trainer):

        # Report F1 score (Maximizing metric)
        val_f1_score = trainer.history['val_f1s'][-1]
        self.trial.report(val_f1_score, trainer.cur_epoch)

        # Check for Pruning
        if self.trial.should_prune():
            print(f"Trial {self.trial.number} pruned. Exiting training loop at epoch {trainer.cur_epoch}...")
            raise optuna.TrialPruned()

In [None]:
EPOCHS = 25

def objective(trial: optuna.Trial):

    # Model HyperParams  
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
    batch_size = trial.suggest_int("batch_size", 32, 256, step=16)
    dropout = trial.suggest_float("dropout", 0.0, 0.7, step=0.1)

    
    # Other Params
    early_stopper_patience = 5
    delta = 1e-4
    lr_patience = early_stopper_patience // 2
    lr_factor = 0.1
    min_lr = 1e-6

    
    # Dataloaders
    train_loader, val_loader, _  = create_data_loaders(train_dataset=train_set,
                                                       val_dataset=val_set,
                                                       test_dataset=None,
                                                       batch_size=batch_size)
    
    # Model
    model = build_model(num_classes=num_classes, dropout=dropout, freeze_base=True,
                        device=DEVICE)
    
    # Loss, Optimizer
    loss_fn = nn.CrossEntropyLoss(weight=class_weights) 
    optim = build_optim(model=model, lr=lr, weight_decay=weight_decay)

                                  
    # Trainer
    trainer = build_trainer(model, loss_fn, optim,
                            early_stopper_patience, delta, False, None,
                            lr_factor, lr_patience, min_lr,
                            DEVICE)

    # Optuna Pruner 
    optuna_pruner = OptunaPruner(trial)
    
    # Train Model
    trainer.train_val_model(epochs=EPOCHS,
                            train_loader=train_loader,
                            val_loader=val_loader, 
                            callback= optuna_pruner)
    
    val_f1 = max(trainer.history['val_f1s'])

    return val_f1

In [None]:
pruner = optuna.pruners.MedianPruner(
    n_startup_trials=5,
    n_warmup_steps=5,
    interval_steps=1,
)

sampler = optuna.samplers.TPESampler(seed=SEED)

study = optuna.create_study(
    study_name="plant_village_study",
    direction="maximize",    # because we want to maximise the f1 score 
    storage="sqlite:///../data/HyperParams/plant_village_study.db",
    load_if_exists=True,
    sampler=sampler,
    pruner=pruner)  

study.optimize(objective, n_trials=20)

print(f"Best trial: {study.best_trial.number}")
print(f"Best val f1-macro: {study.best_value}")
print(f"Best hyperparameters: {study.best_params}")

Best trial: 14
Best val f1-macro: 0.9048782310909461
Best hyperparameters: {'lr': 0.0009753707380279384, 'weight_decay': 2.5208757425689212e-05, 'batch_size': 160, 'dropout': 0.0}


### **Visualize Tuning Results**

In [7]:
optuna.visualization.plot_optimization_history(study)

In [None]:
optuna.visualization.plot_parallel_coordinate(study)

In [13]:
optuna.visualization.plot_slice(study)

In [14]:
optuna.visualization.plot_param_importances(study)

# **ReTrain the Model Using Best HyperParam Combinations**

In [11]:
study = optuna.load_study(study_name="plant_village_study", storage="sqlite:///../data/HyperParams/plant_village_study.db")

### **Training Phase 1 - Frozen BackBone**

### **Training Phase 2 - Unfrozen**

In [None]:
# # Load Best Model
# best_model_checkpoint = 
# new_best_model_checkpoint = 

# model = build_model(num_classes=num_classes, dropout=, freeze_base=False)
# model.load_state_dict(torch.load(best_model_checkpoint['model_state_dict']))


# # Lower lr
# lr = 


# # Data Loaders
# train_loader, val_loader, _  = create_data_loaders(train_dataset=train_set,
#                                                     val_dataset=val_set,
#                                                     test_dataset=None,
#                                                     batch_size=batch_size)


# # Loss, Optimizer
# loss_fn = nn.CrossEntropyLoss()
# optim = build_optim(model=model, lr=lr, weight_decay=weight_decay)

                               
# # Trainer
# trainer = build_trainer(model, loss_fn, optim,
#                         early_stopper_patience, delta, checkpoint_path,
#                         lr_factor, lr_patience, min_lr,
#                         DEVICE)

# # Train Model
# trainer.train_val_model(epochs=EPOCHS,
#                         train_loader=train_loader,
#                         val_loader=val_loader)


In [46]:
# Plot Metric Curves 

In [None]:
# _, _, test_loader  = create_data_loaders(train_dataset=None, val_dataset=None,
#                                          test_dataset=test_set)

In [None]:
# def are_dataloaders_identical(dl1, dl2):
#     # Check if lengths are the same
#     if len(dl1) != len(dl2):
#         return False
    
#     # Iterate through both dataloaders simultaneously
#     for batch1, batch2 in zip(dl1, dl2):
#         # Assuming batches are tensors or tuples of tensors
#         if isinstance(batch1, torch.Tensor):
#             if not torch.equal(batch1, batch2):
#                 return False
#         elif isinstance(batch1, (list, tuple)):
#             for t1, t2 in zip(batch1, batch2):
#                 if not torch.equal(t1, t2):
#                     return False
#         else:
#             # Handle other data types in batches as needed
#             if batch1 != batch2:
#                 return False
#     return True