In [27]:
"""
Full optimization + hyperparameter tuning pipeline using Optuna for your CNN.
- Make sure configs.MULTIVIEW_TRAIN_DIR and configs.MULTIVIEW_TEST_DIR are set.
- Requires: torch, torchvision, optuna, tqdm, scikit-learn (imported as sklearn), matplotlib, seaborn, numpy
"""

import os
import sys
import time
import copy
import random
import warnings
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import DataLoader, Subset

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, balanced_accuracy_score, f1_score, precision_score
from sklearn.utils.class_weight import compute_class_weight
from sklearn.manifold import TSNE

import optuna
from optuna.integration import PyTorchLightningPruningCallback
from optuna.exceptions import TrialPruned

# Add your utils/configs path if needed
sys.path.append('../Utils')
import configs

# Suppress every warning category so library chatter does not flood the cell output.
warnings.filterwarnings("ignore")

# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

Device: cpu


In [28]:
def set_seed(seed=42):
    """Seed the global RNGs of `random`, NumPy and PyTorch with one value.

    When CUDA is available, every CUDA generator is seeded as well.

    Args:
        seed: integer seed applied to all generators.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


# Seed once at notebook start so the cells below begin from a known RNG state.
set_seed(42)


In [29]:
def count_trainable_params(model):
    """Return how many scalar parameters of `model` have `requires_grad` set."""
    n_trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            n_trainable += param.numel()
    return n_trainable

# MixUp helper
def mixup_data(x, y, alpha=1.0):
    """Blend every sample of a batch with a randomly chosen partner sample.

    Args:
        x: batch of inputs; samples are indexed along dim 0.
        y: batch of targets aligned with `x`.
        alpha: both parameters of the Beta distribution the mixing
            coefficient is drawn from. Mixing is skipped when `alpha <= 0`.

    Returns:
        A tuple `(mixed_x, y_a, y_b, lam)`. `y_a` is `y` itself, `y_b` holds
        the partners' targets and `lam` weights the original sample.
        With `alpha <= 0` the tuple is `(x, y, None, 1.0)`.
    """
    if alpha <= 0:
        return x, y, None, 1.0

    # Mixing coefficient comes from NumPy's global RNG, the pairing from PyTorch's.
    lam = np.random.beta(alpha, alpha)
    partner = torch.randperm(x.size(0)).to(x.device)

    blended = lam * x + (1 - lam) * x[partner, :]
    return blended, y, y[partner], lam

def mixup_criterion(criterion, preds, y_a, y_b, lam):
    """Compute the MixUp loss as a convex combination of two target losses.

    Args:
        criterion: callable `criterion(preds, targets)` returning a loss.
        preds: model outputs for the mixed batch.
        y_a: targets of the original samples.
        y_b: targets of the mixing partners, or `None` when no mixing was
            applied (this is what `mixup_data` returns for `alpha <= 0`).
        lam: weight of the `y_a` term; the `y_b` term gets `1 - lam`.

    Returns:
        `lam * criterion(preds, y_a) + (1 - lam) * criterion(preds, y_b)`,
        or just `criterion(preds, y_a)` when `y_b` is `None`.
    """
    # Without a partner there is nothing to mix; evaluating criterion(preds, None)
    # would raise, so fall back to the plain loss.
    if y_b is None:
        return criterion(preds, y_a)
    return lam * criterion(preds, y_a) + (1 - lam) * criterion(preds, y_b)


In [30]:
class TunableDeepCNN(nn.Module):
    """Three-stage CNN for single-channel images with a tunable width.

    Each stage is two 3x3 conv + BatchNorm + ReLU layers followed by 2x2
    max-pooling; channel counts are `base_filters`, `2 * base_filters` and
    `4 * base_filters`. Global average pooling feeds a two-layer classifier.

    Args:
        num_classes: size of the output layer.
        base_filters: channel count of the first stage.
        dropout_p: dropout probability applied before the output layer.
        use_dropout2d: when True, `Dropout2d(0.3)` is applied after the first
            and second stage; otherwise those positions are an identity.
    """

    def __init__(self, num_classes=7, base_filters=32, dropout_p=0.5, use_dropout2d=True):
        super().__init__()
        stage_widths = (base_filters, base_filters * 2, base_filters * 4)

        # Convolutional stages; the input has a single (grayscale) channel.
        self.block1 = self._double_conv(1, stage_widths[0])
        self.block2 = self._double_conv(stage_widths[0], stage_widths[1])
        self.block3 = self._double_conv(stage_widths[1], stage_widths[2])

        # Global average pooling collapses each feature map to one value.
        self.gap = nn.AdaptiveAvgPool2d((1, 1))

        # Classifier head; the hidden layer is never narrower than 128 units.
        hidden_units = max(128, stage_widths[2])
        self.fc1 = nn.Linear(stage_widths[2], hidden_units)
        self.fc2 = nn.Linear(hidden_units, num_classes)

        self.dropout = nn.Dropout(dropout_p)
        self.use_dropout2d = use_dropout2d
        if use_dropout2d:
            self.dropout2d = nn.Dropout2d(0.3)
        else:
            self.dropout2d = nn.Identity()

        self._init_weights()

    @staticmethod
    def _double_conv(in_channels, out_channels):
        """Build one stage: (conv3x3 -> BatchNorm -> ReLU) twice, then 2x2 max-pool."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
        )

    def _init_weights(self):
        """Kaiming-normal init for conv weights, Xavier-uniform for linear weights, zero biases."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                continue
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """Map a batch of single-channel images to `num_classes` logits per sample."""
        x = self.dropout2d(self.block1(x))
        x = self.dropout2d(self.block2(x))
        x = self.gap(self.block3(x))      # [B, C, 1, 1]
        x = x.view(x.size(0), -1)         # [B, C]
        x = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(x)


In [31]:
# Image pipelines. Both convert to one grayscale channel, resize to 224x224,
# convert to a tensor and normalise with mean 0.5 / std 0.5. The training
# pipeline additionally applies a random horizontal flip and a random rotation.
def _build_gray_transform(augmentations=()):
    """Compose the shared steps, inserting `augmentations` between resize and ToTensor."""
    return transforms.Compose([
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((224, 224)),
        *augmentations,
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])


base_transform_train = _build_gray_transform([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=12),
])

base_transform_test = _build_gray_transform()


In [32]:
# Load both datasets once, at notebook level, so later cells can reuse them.
# Each dataset is created with the transform passed here.
# NOTE(review): an earlier version of this comment said the transforms would be
# replaced via dataset.transform later; confirm whether any cell actually does that.
full_train_dataset = datasets.ImageFolder(configs.MULTIVIEW_TRAIN_DIR, transform=base_transform_train)
full_test_dataset = datasets.ImageFolder(configs.MULTIVIEW_TEST_DIR, transform=base_transform_test)

# The number of classes is taken from the training dataset's class list.
NUM_CLASSES = len(full_train_dataset.classes)
print("Classes:", full_train_dataset.classes)
print("NUM_CLASSES:", NUM_CLASSES)


Classes: ['Ash', 'Beech', 'Douglas Fir', 'Oak', 'Pine', 'Red Oak', 'Spruce']
NUM_CLASSES: 7


In [None]:
def train_one_epoch(model, dataloader, criterion, optimizer, device, use_mixup=False, mixup_alpha=0.4):
    """Run one optimisation pass over `dataloader`.

    Args:
        model: network to train; switched to train mode here.
        dataloader: iterable yielding `(inputs, labels)` batches.
        criterion: loss callable `criterion(outputs, targets)`.
        optimizer: optimizer stepping the model's parameters.
        device: device each batch is moved to.
        use_mixup: when True, batches go through `mixup_data` and the loss
            through `mixup_criterion`; accuracy is not tracked in that case.
        mixup_alpha: `alpha` forwarded to `mixup_data`.

    Returns:
        `(mean_loss, accuracy)`. `mean_loss` is the sample-weighted average of
        the batch losses. `accuracy` is a 0-dim float64 tensor, or `None`
        when `use_mixup` is True.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0, 0, 0

    for batch_x, batch_y in dataloader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()

        if use_mixup:
            batch_x, targets_a, targets_b, lam = mixup_data(batch_x, batch_y, alpha=mixup_alpha)
            loss = mixup_criterion(criterion, model(batch_x), targets_a, targets_b, lam)
        else:
            logits = model(batch_x)
            loss = criterion(logits, batch_y)
            # Hard predictions only make sense against unmixed labels.
            n_correct += torch.sum(logits.argmax(dim=1) == batch_y)

        loss.backward()
        optimizer.step()

        n_batch = batch_x.size(0)
        loss_sum += loss.item() * n_batch
        n_seen += n_batch

    mean_loss = loss_sum / n_seen
    if use_mixup:
        return mean_loss, None
    return mean_loss, n_correct.double() / n_seen

def validate(model, dataloader, criterion, device):
    """Evaluate `model` on `dataloader` without computing gradients.

    Args:
        model: network to evaluate; switched to eval mode here.
        dataloader: iterable yielding `(inputs, labels)` batches.
        criterion: loss callable `criterion(outputs, targets)`.
        device: device each batch is moved to.

    Returns:
        `(mean_loss, accuracy, labels, preds)`. `mean_loss` is the
        sample-weighted average of the batch losses, `accuracy` a 0-dim
        float64 tensor, and `labels` / `preds` are flat lists of the true and
        predicted class indices in iteration order.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0, 0, 0
    y_true, y_pred = [], []

    with torch.no_grad():
        for batch_x, batch_y in dataloader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            logits = model(batch_x)
            batch_pred = logits.argmax(dim=1)
            n_batch = batch_x.size(0)

            loss_sum += criterion(logits, batch_y).item() * n_batch
            n_correct += torch.sum(batch_pred == batch_y)
            n_seen += n_batch

            y_pred.extend(batch_pred.cpu().numpy())
            y_true.extend(batch_y.cpu().numpy())

    return loss_sum / n_seen, n_correct.double() / n_seen, y_true, y_pred


In [38]:
def objective(trial):
    """Optuna objective: train a `TunableDeepCNN` with sampled hyperparameters.

    The RNGs are re-seeded on every call, so every trial uses the same 80/20
    train/validation split of `full_train_dataset`. Training uses early
    stopping on validation accuracy, and each epoch's validation accuracy is
    reported to the trial so the study's pruner can stop unpromising trials.

    Args:
        trial: the `optuna.trial.Trial` to sample hyperparameters from.

    Returns:
        The best validation accuracy reached, as a Python float.

    Raises:
        TrialPruned: when the pruner decides to stop this trial.
    """
    set_seed(42)

    # Hyperparameters. `suggest_float` replaces the deprecated
    # `suggest_uniform` / `suggest_loguniform`; names and ranges are unchanged.
    base_filters = trial.suggest_categorical("base_filters", [16, 32, 48])
    dropout_p = trial.suggest_float("dropout_p", 0.2, 0.6)
    use_dropout2d = trial.suggest_categorical("use_dropout2d", [True, False])
    optimizer_name = trial.suggest_categorical("optimizer", ["adam", "sgd"])
    # SGD is searched over a range shifted one decade higher than Adam's.
    lr_low, lr_high = (1e-5, 1e-2) if optimizer_name == "adam" else (1e-4, 1e-1)
    lr = trial.suggest_float("lr", lr_low, lr_high, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-7, 1e-3, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 48])
    mixup = trial.suggest_categorical("mixup", [False, True])
    mixup_alpha = trial.suggest_float("mixup_alpha", 0.1, 0.6) if mixup else 0
    label_smoothing = trial.suggest_float("label_smoothing", 0.0, 0.2)

    max_epochs, patience = 30, 8

    # Train/val split (identical for every trial because of the fixed seed above)
    num_train = len(full_train_dataset)
    indices = list(range(num_train))
    random.shuffle(indices)
    split = int(0.2 * num_train)
    val_idx, train_idx = indices[:split], indices[split:]

    # Validation must not see the random flip/rotation of the training
    # transform, otherwise the metric used for pruning and model selection is
    # noisy. A shallow copy shares the file list with the training dataset but
    # carries the deterministic test-time transform.
    val_dataset = copy.copy(full_train_dataset)
    val_dataset.transform = base_transform_test

    train_subset = Subset(full_train_dataset, train_idx)
    val_subset = Subset(val_dataset, val_idx)

    # Class weights are computed from the training part of the split only.
    train_labels = [full_train_dataset.targets[i] for i in train_idx]
    class_weights = compute_class_weight('balanced', classes=np.unique(train_labels), y=train_labels)
    class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)

    train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)

    # Model & optimizer
    model = TunableDeepCNN(NUM_CLASSES, base_filters, dropout_p, use_dropout2d).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights_tensor, label_smoothing=label_smoothing)
    if optimizer_name == "adam":
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)

    # Only the best score is tracked. The trained weights are not persisted,
    # so the final model has to be trained separately with the best parameters.
    best_val_acc, patience_counter = 0.0, 0

    for epoch in range(max_epochs):
        train_one_epoch(model, train_loader, criterion, optimizer, device, mixup, mixup_alpha)
        _, val_acc, _, _ = validate(model, val_loader, criterion, device)
        # `validate` returns a 0-dim tensor; Optuna and the comparison below want a float.
        val_acc = float(val_acc)

        trial.report(val_acc, epoch)
        if trial.should_prune():
            raise TrialPruned()

        if val_acc > best_val_acc:
            best_val_acc, patience_counter = val_acc, 0
        else:
            patience_counter += 1
        if patience_counter >= patience:
            break

    return best_val_acc


In [39]:
def run_study(n_trials=20):
    """Run an Optuna study that maximises `objective` and print the winner.

    Args:
        n_trials: number of trials to run.

    Returns:
        The finished `optuna.study.Study`.
    """
    # A seeded TPE sampler keeps the sequence of suggested parameters repeatable.
    tpe_sampler = optuna.samplers.TPESampler(seed=42)
    study = optuna.create_study(sampler=tpe_sampler, direction="maximize")
    study.optimize(objective, n_trials=n_trials)

    winner = study.best_trial
    print("Best trial value:", winner.value)
    print("Best params:", winner.params)
    return study

study = run_study(10)  # try 10 first, then increase


[I 2025-09-09 21:47:21,460] A new study created in memory with name: no-name-db136655-f140-41a2-b8e3-dd4056c3afbe


: 

In [None]:
def evaluate_best(study, state_dict=None, retrain_epochs=30):
    """Evaluate a model built from the study's best hyperparameters on the test set.

    A freshly constructed network has random weights, so it must receive
    trained weights before its test metrics mean anything. They come from
    `state_dict` when one is supplied; otherwise the model is retrained on the
    whole of `full_train_dataset` with the best trial's optimizer, learning
    rate, weight decay, batch size, MixUp and label-smoothing settings.

    Args:
        study: finished Optuna study; `study.best_trial.params` is used.
        state_dict: optional trained weights matching the best architecture.
        retrain_epochs: number of epochs to train for when `state_dict` is
            not given. No validation split is held out, so there is no early
            stopping.

    Prints accuracy and a per-class report, and shows the confusion matrix.
    """
    best_params = study.best_trial.params
    print("Evaluating with best params:", best_params)

    # Seed before building the model so initial weights (and retraining) are repeatable.
    set_seed(42)
    model = TunableDeepCNN(NUM_CLASSES, best_params["base_filters"],
                           best_params["dropout_p"], best_params["use_dropout2d"]).to(device)
    batch_size = best_params.get("batch_size", 32)

    if state_dict is not None:
        model.load_state_dict(state_dict)
    else:
        # Retrain from scratch with the winning configuration.
        train_labels = np.asarray(full_train_dataset.targets)
        class_weights = compute_class_weight('balanced', classes=np.unique(train_labels), y=train_labels)
        class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights_tensor,
                                        label_smoothing=best_params.get("label_smoothing", 0.0))

        if best_params["optimizer"] == "adam":
            optimizer = optim.Adam(model.parameters(), lr=best_params["lr"],
                                   weight_decay=best_params["weight_decay"])
        else:
            optimizer = optim.SGD(model.parameters(), lr=best_params["lr"], momentum=0.9,
                                  weight_decay=best_params["weight_decay"])

        train_loader = DataLoader(full_train_dataset, batch_size=batch_size, shuffle=True)
        use_mixup = best_params.get("mixup", False)
        # "mixup_alpha" is only sampled when mixup is enabled.
        mixup_alpha = best_params.get("mixup_alpha", 0)

        for _ in tqdm(range(retrain_epochs), desc="Retraining"):
            train_one_epoch(model, train_loader, criterion, optimizer, device, use_mixup, mixup_alpha)

    test_loader = DataLoader(full_test_dataset, batch_size=batch_size, shuffle=False)
    all_preds, all_labels = [], []
    model.eval()
    with torch.no_grad():
        for inputs, labels in tqdm(test_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            preds = model(inputs).argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Report over the model's full label space so every class gets a row,
    # even one that happens to be absent from the test predictions.
    class_names = full_train_dataset.classes
    label_ids = list(range(NUM_CLASSES))

    print("Accuracy:", accuracy_score(all_labels, all_preds))
    print("Report:\n", classification_report(all_labels, all_preds, labels=label_ids,
                                             target_names=class_names, zero_division=0))

    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=class_names, yticklabels=class_names, ax=ax)
    ax.set(xlabel="Predicted", ylabel="True", title="Test confusion matrix")
    plt.show()

evaluate_best(study)
