In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Load packages

In [None]:
import os
import torch
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    classification_report, confusion_matrix
)
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision import models
import torch.nn as nn
import torch.optim as optim
import random
import copy
import time
from itertools import product


### Reproducibility & Device

In [None]:
def set_seed(seed=42):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's legacy global RNG and PyTorch (CPU and
    all CUDA devices). It also puts cuDNN into deterministic mode and disables
    its autotuner, because seeding alone does not make GPU convolutions
    repeatable from run to run.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # safe to call when CUDA is unavailable
    # Trade a little speed for run-to-run repeatability on GPU.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Seed all RNGs once, before any dataset, loader or model is created.
set_seed()

# Use the GPU when one is visible; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


### Define Folder Paths

In [None]:
# Benchmark configuration
# benchmark_dir: external benchmark image folder; save_root: root directory
# under which per-run result folders are created by the grid-search cell.
benchmark_dir = '/content/drive/MyDrive/Thesis/benchmark_test/test_v5'
save_root = '/content/drive/MyDrive/Thesis/results/'

# Train / validation / test image folders (one sub-folder per class).
train_dir = '/content/drive/MyDrive/Thesis/no_preprocessing/train'  # CHANGE this to your actual path
val_dir   = '/content/drive/MyDrive/Thesis/no_preprocessing/valid'
test_dir  = '/content/drive/MyDrive/Thesis/no_preprocessing/test'


In [None]:
def confidence_interval(data, confidence=0.95):
    """Normal-approximation confidence interval for the mean of ``data``.

    The half-width is ``z * s / sqrt(n)``, where ``s`` is the sample standard
    deviation (``ddof=1``) and ``z`` is the two-sided standard-normal quantile
    for the requested ``confidence`` (about 1.96 for 0.95).

    Args:
        data: 1-D sequence of numbers or booleans (e.g. per-sample
            correctness flags).
        confidence: Two-sided confidence level, strictly between 0 and 1.

    Returns:
        Tuple ``(mean, lower, upper)``. With fewer than two samples the
        standard error is undefined, so ``lower`` and ``upper`` are NaN; for
        empty input all three values are NaN.

    Raises:
        ValueError: If ``confidence`` is not strictly between 0 and 1.
    """
    from statistics import NormalDist  # stdlib; local to avoid touching the import cell

    if not 0.0 < confidence < 1.0:
        raise ValueError(f"confidence must be in (0, 1), got {confidence!r}")

    values = np.asarray(data, dtype=float)
    n = values.size
    nan = float("nan")
    if n == 0:
        return nan, nan, nan

    m = values.mean()
    if n < 2:
        # A single observation carries no spread information.
        return m, nan, nan

    se = values.std(ddof=1) / np.sqrt(n)
    # Two-sided interval: put (1 - confidence) / 2 in each tail.
    z = NormalDist().inv_cdf(0.5 + confidence / 2.0)
    h = se * z
    return m, m - h, m + h


### Transforms

In [None]:
# Training pipeline: resize to 256x256, apply random geometric and colour
# augmentation, take a random 224x224 crop covering 80-100% of the image,
# then convert to a tensor and normalise per channel.
train_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(30),
    transforms.RandomAffine(degrees=0, scale=(0.9, 1.1), shear=10),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Validation / test / benchmark pipeline: deterministic resize straight to
# 224x224 plus the same normalisation (no augmentation).
val_test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

### Create Datasets & Dataloaders with ImageFolder

In [None]:
batch_size = 32  # Adjust as needed

# One ImageFolder per split; only the training split is augmented.
train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
val_dataset   = datasets.ImageFolder(val_dir,   transform=val_test_transforms)
test_dataset  = datasets.ImageFolder(test_dir,  transform=val_test_transforms)

# Only the training loader shuffles; evaluation loaders keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,  num_workers=2)
val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False, num_workers=2)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False, num_workers=2)

# Class names as discovered by ImageFolder on the training split.
class_names = train_dataset.classes
# NOTE(review): this takes the *validation* classes; a later cell reassigns
# benchmark_class_names from the benchmark dataset itself.
benchmark_class_names = val_dataset.classes
num_classes = len(class_names)
print("Classes found:", class_names)
print(f"Num classes = {num_classes}")

Classes found: ['Clear Water', 'Common reed', 'Duckweed', 'Other', 'Water-starwort']
Num classes = 5


In [None]:
# Benchmark set: read from the configured `benchmark_dir` (rather than a
# repeated path literal) so the location is defined in exactly one place.
benchmark_dataset = datasets.ImageFolder(
    benchmark_dir,
    transform=val_test_transforms  # Important: same transforms as validation/test
)

# shuffle=False keeps batches aligned with benchmark_dataset.samples, which
# the evaluation cells rely on to recover file paths by running index.
benchmark_loader = DataLoader(benchmark_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
benchmark_class_names = benchmark_dataset.classes


### Define Models (ResNet, VGG, EfficientNet, DenseNet, GoogLeNet)

**ResNet50**

In [None]:
def create_resnet50(num_classes):
    """Build a pretrained ResNet-50 set up for partial fine-tuning.

    All weights are frozen first; ``layer3`` and ``layer4`` are then made
    trainable and the ``fc`` head is swapped for a fresh ``nn.Linear`` with
    ``num_classes`` outputs (new layers are trainable by default).
    """
    net = models.resnet50(pretrained=True)

    # Freeze the whole network to begin with.
    for weight in net.parameters():
        weight.requires_grad = False

    # Re-enable gradients for the two deepest residual stages.
    for stage in (net.layer3, net.layer4):
        for weight in stage.parameters():
            weight.requires_grad = True

    # New classification head sized for this task.
    in_features = net.fc.in_features
    net.fc = nn.Linear(in_features, num_classes)
    return net


**VGG16**

In [None]:
def create_vgg16(num_classes, dropout_rate=0.4):
    """Build a pretrained VGG16 set up for partial fine-tuning.

    Args:
        num_classes: Number of output classes for the new head.
        dropout_rate: Dropout probability placed before the new linear layer.

    Returns:
        The model with everything frozen except ``features[20:]`` and the
        replaced ``classifier[6]`` (Dropout + Linear).
    """
    model = models.vgg16(pretrained=True)

    # Freeze all parameters first
    for param in model.parameters():
        param.requires_grad = False

    # Replace the last classifier layer; honour the caller's dropout_rate
    # instead of a hard-coded 0.4.
    num_features = model.classifier[6].in_features
    model.classifier[6] = nn.Sequential(
        nn.Dropout(dropout_rate),
        nn.Linear(num_features, num_classes)
    )

    # Fine-tune the tail of the feature extractor and the new head.
    # NOTE(review): in torchvision's VGG16 layout index 20 appears to sit
    # inside block 4, so this slice likely covers conv4_3 as well as block 5
    # — confirm this is intended.
    for param in model.features[20:].parameters():
        param.requires_grad = True
    for param in model.classifier[6].parameters():
        param.requires_grad = True
    return model



**VGG19**


In [None]:
def create_vgg19(num_classes, dropout_rate=0.4):
    """Build a pretrained VGG19 set up for partial fine-tuning.

    Args:
        num_classes: Number of output classes for the new head.
        dropout_rate: Dropout probability placed before the new linear layer
            (default 0.4, the value previously hard-coded), mirroring
            ``create_vgg16``.

    Returns:
        The model with everything frozen except ``features[24:]`` and the
        replaced ``classifier[6]`` (Dropout + Linear).
    """
    model = models.vgg19(pretrained=True)
    for param in model.parameters():
        param.requires_grad = False

    # Replace the classifier head
    num_features = model.classifier[6].in_features
    model.classifier[6] = nn.Sequential(
        nn.Dropout(dropout_rate),
        nn.Linear(num_features, num_classes)
    )

    # Fine-tune the tail of the feature extractor and the new head.
    # NOTE(review): in torchvision's VGG19 layout index 24 appears to sit
    # inside block 4, so this slice likely covers conv4_4 as well as block 5
    # — confirm this is intended.
    for param in model.features[24:].parameters():
        param.requires_grad = True
    for param in model.classifier[6].parameters():
        param.requires_grad = True
    return model


**EfficientNet**

In [None]:
def create_efficientnet_b0(num_classes):
    """Build a pretrained EfficientNet-B0 set up for partial fine-tuning.

    Everything is frozen, then ``features[6:]`` is made trainable and the
    classifier is replaced by Dropout(0.2) + Linear(num_classes).
    """
    net = models.efficientnet_b0(pretrained=True)

    # Start fully frozen.
    for weight in net.parameters():
        weight.requires_grad = False

    # Re-enable gradients from feature index 6 to the end.
    for weight in net.features[6:].parameters():
        weight.requires_grad = True

    # Fresh head; index 1 of the stock classifier is its Linear layer.
    in_features = net.classifier[1].in_features
    head_layers = [
        nn.Dropout(p=0.2, inplace=True),
        nn.Linear(in_features, num_classes),
    ]
    net.classifier = nn.Sequential(*head_layers)
    return net

**DenseNet**

In [None]:
def create_densenet121(num_classes):
    """Build a pretrained DenseNet-121 set up for partial fine-tuning.

    Everything is frozen, the classifier becomes Dropout(0.3) +
    Linear(num_classes), and ``features.denseblock4`` is made trainable.
    """
    net = models.densenet121(pretrained=True)

    # Start fully frozen.
    for weight in net.parameters():
        weight.requires_grad = False

    # New head (trainable by default) with dropout for regularisation.
    in_features = net.classifier.in_features
    dropout = nn.Dropout(0.3)
    linear = nn.Linear(in_features, num_classes)
    net.classifier = nn.Sequential(dropout, linear)

    # Fine-tune only the last dense block of the backbone.
    last_block = net.features.denseblock4
    for weight in last_block.parameters():
        weight.requires_grad = True
    return net

**ResNet34**

In [None]:
def create_resnet34(num_classes):
    """Build a pretrained ResNet-34 set up for partial fine-tuning.

    All weights are frozen, ``fc`` is replaced with a new
    ``nn.Linear(num_classes)`` head, and ``layer3`` / ``layer4`` are made
    trainable.
    """
    net = models.resnet34(pretrained=True)

    # Start fully frozen.
    for weight in net.parameters():
        weight.requires_grad = False

    # New classification head (trainable by default).
    net.fc = nn.Linear(net.fc.in_features, num_classes)

    # Re-enable gradients for the two deepest residual stages.
    for stage in (net.layer3, net.layer4):
        for weight in stage.parameters():
            weight.requires_grad = True
    return net

**GoogLeNet**

In [None]:
def create_googlenet(num_classes):
    """Build a pretrained GoogLeNet set up for partial fine-tuning.

    The model is created with auxiliary logits enabled and both auxiliary
    heads are then set to ``None``. All remaining weights are frozen, ``fc``
    is replaced with ``nn.Linear(num_classes)``, and ``inception5b`` plus the
    new ``fc`` are made trainable.
    """
    net = models.googlenet(pretrained=True, aux_logits=True)
    # Drop the auxiliary classifiers.
    net.aux1, net.aux2 = None, None

    # Start fully frozen.
    for weight in net.parameters():
        weight.requires_grad = False

    # New classification head.
    net.fc = nn.Linear(net.fc.in_features, num_classes)

    # Fine-tune the final inception block together with the head.
    for trainable in (net.inception5b, net.fc):
        for weight in trainable.parameters():
            weight.requires_grad = True
    return net


### Training Function

In [None]:

def calculate_macro_f1(y_true, y_pred):
    """Return the macro-averaged F1 score of ``y_pred`` against ``y_true``."""
    macro_f1 = f1_score(y_true, y_pred, average="macro")
    return macro_f1

def train_model(model,
                train_loader,
                val_loader,
                criterion,
                optimizer,
                scheduler=None,
                model_name="model",
                num_epochs=10,
                patience=3,
                save_dir="/content/drive/MyDrive/Thesis/saved_models"):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Model selection and early stopping use different signals: the checkpoint
    is kept whenever validation macro F1 improves, while early stopping counts
    epochs in which validation *loss* did not improve.

    Args:
        model: Network to train; moved to the global ``device``.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Optional scheduler stepped once per epoch with the
            validation loss (i.e. a ``ReduceLROnPlateau``-style ``step(metric)``).
        model_name: Tag used in the checkpoint and curve file names.
        num_epochs: Maximum number of epochs.
        patience: Stop after this many epochs without a new best validation loss.
        save_dir: Directory for ``best_<model_name>.pth`` and
            ``training_curves_<model_name>.csv``; created if missing.

    Returns:
        ``(model, train_losses, val_losses, train_f1_scores, val_f1_scores)``
        where ``model`` carries the weights with the best validation macro F1.
    """
    # Create the output directory up front so a missing folder cannot make
    # torch.save fail after a full epoch of training.
    os.makedirs(save_dir, exist_ok=True)
    checkpoint_path = os.path.join(save_dir, f"best_{model_name}.pth")
    curves_path = os.path.join(save_dir, f"training_curves_{model_name}.csv")

    model = model.to(device)
    best_model_wts = copy.deepcopy(model.state_dict())
    best_macro_f1 = 0.0
    best_val_loss = float('inf')
    epochs_no_improve = 0

    train_losses = []
    val_losses = []
    train_f1_scores = []
    val_f1_scores = []

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print("-" * 10)

        for phase in ["train", "val"]:
            is_train = phase == "train"
            if is_train:
                model.train()
            else:
                model.eval()
            loader = train_loader if is_train else val_loader

            running_loss = 0.0
            all_preds = []
            all_labels = []

            for inputs, labels in loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Gradients only exist (and need clearing) while training.
                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    if isinstance(outputs, tuple):  # Handle GoogLeNet
                        outputs = outputs[0]

                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Weight the batch-mean loss by batch size so the epoch
                # average is per sample.
                running_loss += loss.item() * inputs.size(0)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

            epoch_loss = running_loss / len(loader.dataset)
            epoch_macro_f1 = calculate_macro_f1(all_labels, all_preds)

            print(f"{phase.capitalize()} Loss: {epoch_loss:.4f} F1: {epoch_macro_f1:.4f}")

            if is_train:
                train_losses.append(epoch_loss)
                train_f1_scores.append(epoch_macro_f1)
            else:
                val_losses.append(epoch_loss)
                val_f1_scores.append(epoch_macro_f1)

                # Model selection: keep the weights with the best val macro F1.
                if epoch_macro_f1 > best_macro_f1:
                    best_macro_f1 = epoch_macro_f1
                    best_model_wts = copy.deepcopy(model.state_dict())
                    torch.save(model.state_dict(), checkpoint_path)

                    df = pd.DataFrame({
                        'train_loss': train_losses,
                        'val_loss': val_losses,
                        'train_f1': train_f1_scores,
                        'val_f1': val_f1_scores
                    })
                    df.to_csv(curves_path, index=False)
                    print(f"Saved best model and curves for {model_name}")

                # Early stopping: driven by validation loss, not F1.
                if epoch_loss < best_val_loss:
                    best_val_loss = epoch_loss
                    epochs_no_improve = 0
                else:
                    epochs_no_improve += 1

                if scheduler is not None:
                    scheduler.step(epoch_loss)

        print()

        if epochs_no_improve >= patience:
            print("Early stopping triggered!")
            break

    print(f"Training complete. Best Macro F1: {best_macro_f1:.4f}")
    model.load_state_dict(best_model_wts)
    return model, train_losses, val_losses, train_f1_scores, val_f1_scores



In [None]:
# Inverse-frequency class weights: total_samples / per-class count.
# Counts come from os.listdir, so every directory entry in a class folder is
# counted — assumes the folders contain only images (TODO confirm).
class_counts = [len(os.listdir(os.path.join(train_dir, class_name))) for class_name in class_names]
total_samples = sum(class_counts)
class_weights = [total_samples/class_count for class_count in class_counts]
# Extra 20% weight on index 4, which is 'Water-starwort' in the class list
# printed above; the index is positional and breaks if the class set changes.
class_weights[4] *= 1.2  # or a fixed custom weight
# Convert to a float tensor on the training device for use in the loss.
class_weights = torch.FloatTensor(class_weights).to(device)

### Example: Training Multiple Models & Comparing

In [None]:
# Registry of architecture name -> freshly built model.
# Every factory is called right here, so all seven pretrained networks are
# instantiated when this cell runs.
# NOTE(review): the grid-search cell below calls create_googlenet directly
# and does not appear to read this dict — confirm it is still needed.
models_dict = {
    "resnet50": create_resnet50(num_classes),
    "vgg16": create_vgg16(num_classes),
    "effb0": create_efficientnet_b0(num_classes),
    "resnet34": create_resnet34(num_classes),
    "dense121": create_densenet121(num_classes),
    "vgg19": create_vgg19(num_classes),
    "googlenet": create_googlenet(num_classes),
}

In [None]:
class FocalLoss(nn.Module):
    """Multi-class focal loss with optional per-class weights.

    Computes ``mean(w[y] * (1 - p_y) ** gamma * -log(p_y))`` where ``p_y`` is
    the softmax probability of the true class.

    The modulating factor uses the *unweighted* probability ``p_y``. Deriving
    it from a class-weighted cross-entropy (``exp(-w * -log p)``) would give
    ``p_y ** w`` and distort the focusing term whenever weights are supplied.

    Args:
        gamma: Focusing exponent; 0 reduces to (weighted) cross-entropy
            averaged over samples.
        weight: Optional 1-D tensor of per-class weights, or ``None``.
    """

    def __init__(self, gamma=2, weight=None):
        super().__init__()
        self.gamma = gamma
        # Registered as a buffer so it follows the module across .to(device).
        self.register_buffer("weight", weight)

    def forward(self, input, target):
        """Return the mean focal loss for logits ``input`` and class indices ``target``."""
        # Unweighted per-sample cross-entropy gives the true-class probability.
        ce_plain = nn.functional.cross_entropy(input, target, reduction='none')
        pt = torch.exp(-ce_plain)

        if self.weight is None:
            ce_weighted = ce_plain
        else:
            ce_weighted = nn.functional.cross_entropy(
                input, target, weight=self.weight, reduction='none'
            )

        focal_loss = ((1 - pt) ** self.gamma) * ce_weighted
        return focal_loss.mean()


In [None]:
# Set up for training
summary_results = []  # one dict of benchmark metrics per completed run
num_epochs = 30
patience = 10  # early-stopping patience passed to train_model

# Model and hyperparameter grid
model_name = "googlenet"
hyperparams_grid = {
    'learning_rate': [3e-4, 1e-4],
    'weight_decay': [1e-4, 1e-5],
    'label_smoothing': [0.0, 0.1],
    'scheduler_patience': [2],
    'criterion': ['ce'],
}

# Cartesian product of the grid values, in the dict's insertion order.
hyperparam_combinations = list(product(*hyperparams_grid.values()))

for i, (lr, wd, ls, sched_pat, crit_type) in enumerate(hyperparam_combinations):
    run_name = f"{model_name}_lr{lr}_wd{wd}_ls{ls}_crit{crit_type}"
    print(f"\n>> Run {i+1}/{len(hyperparam_combinations)}: {run_name}")

    model_save_dir = os.path.join(save_root, model_name, run_name)
    os.makedirs(model_save_dir, exist_ok=True)

    # NOTE(review): the architecture is hard-coded here; changing model_name
    # alone only renames the output folders.
    model = create_googlenet(num_classes).to(device)

    # Optimise only the parameters left trainable by the factory function.
    optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=lr, weight_decay=wd)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=sched_pat, verbose=True)

    if crit_type == 'ce':
        criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=ls)
    else:
        raise NotImplementedError("Only 'ce' loss implemented so far")

    best_model, train_losses, val_losses, train_f1, val_f1 = train_model(
        model, train_loader, val_loader, criterion, optimizer,
        scheduler=scheduler, model_name=run_name, num_epochs=num_epochs, patience=patience
    )

    # Persist the best weights and the full training curves for this run.
    torch.save(best_model.state_dict(), os.path.join(model_save_dir, "best_model.pth"))
    df_curves = pd.DataFrame({
        'train_loss': train_losses,
        'val_loss': val_losses,
        'train_f1': train_f1,
        'val_f1': val_f1
    })
    df_curves.to_csv(os.path.join(model_save_dir, "training_curves.csv"), index=False)

    # Loss and macro-F1 curves, side by side.
    epochs_range = range(1, len(train_losses) + 1)
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, train_losses, label="Train Loss")
    plt.plot(epochs_range, val_losses, label="Val Loss")
    plt.title("Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, train_f1, label="Train F1")
    plt.plot(epochs_range, val_f1, label="Val F1")
    plt.title("Macro F1")
    plt.xlabel("Epoch")
    plt.ylabel("Macro F1")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(model_save_dir, "training_plot.png"))
    plt.close()

    # Evaluate on internal test set
    best_model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = best_model(inputs)
            if isinstance(outputs, tuple):
                outputs = outputs[0]
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    internal_acc = accuracy_score(all_labels, all_preds)
    internal_f1 = f1_score(all_labels, all_preds, average='macro')
    # "w" starts a fresh summary; the benchmark section is appended below.
    with open(os.path.join(model_save_dir, "results_summary.txt"), "w") as f:
        f.write(f"Internal Test Set:\n")
        f.write(f"Accuracy: {internal_acc:.4f}\n")
        f.write(f"Macro F1: {internal_f1:.4f}\n\n")

    # Evaluate on benchmark set
    all_preds, all_labels, all_confidences, wrong_patches = [], [], [], []
    total_inference_time, total_images = 0.0, 0
    with torch.no_grad():
        for inputs, labels in benchmark_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            batch_len = inputs.size(0)

            # File paths for this batch, recovered by running index; valid
            # only because benchmark_loader does not shuffle. Built outside
            # the timed region so it does not inflate the measurement.
            batch_paths = [
                benchmark_loader.dataset.samples[idx][0]
                for idx in range(total_images, total_images + batch_len)
            ]

            # CUDA kernels launch asynchronously: synchronise before starting
            # and before stopping the clock so the interval covers the whole
            # forward pass and nothing else.
            if device.type == "cuda":
                torch.cuda.synchronize()
            start_time = time.perf_counter()
            outputs = best_model(inputs)
            if isinstance(outputs, tuple):
                outputs = outputs[0]
            if device.type == "cuda":
                torch.cuda.synchronize()
            end_time = time.perf_counter()

            probs = torch.softmax(outputs, dim=1)
            confidences, preds = torch.max(probs, dim=1)

            labels_np = labels.cpu().numpy()
            preds_np = preds.cpu().numpy()
            confs_np = confidences.cpu().numpy()

            # Record every misclassified sample with its confidence and path.
            for true_label, pred_label, conf, path in zip(labels_np, preds_np, confs_np, batch_paths):
                if true_label != pred_label:
                    wrong_patches.append({
                        "true": benchmark_class_names[true_label],
                        "pred": benchmark_class_names[pred_label],
                        "confidence": conf,
                        "path": path
                    })
            all_preds.extend(preds_np)
            all_labels.extend(labels_np)
            all_confidences.extend(confs_np)
            total_inference_time += (end_time - start_time)
            total_images += batch_len

    conf_df = pd.DataFrame({
        "confidence": all_confidences,
        "true": [benchmark_class_names[t] for t in all_labels],
        "pred": [benchmark_class_names[p] for p in all_preds],
    })
    conf_df["correct"] = conf_df["true"] == conf_df["pred"]

    conf_df.to_csv(os.path.join(model_save_dir, "confidence_scores.csv"), index=False)
    df_wrong = pd.DataFrame(wrong_patches)
    df_wrong.to_csv(os.path.join(model_save_dir, "wrong_predictions.csv"), index=False)

    benchmark_acc = accuracy_score(all_labels, all_preds)
    macro_f1 = f1_score(all_labels, all_preds, average='macro')
    weighted_f1 = f1_score(all_labels, all_preds, average='weighted')
    precision = precision_score(all_labels, all_preds, average='macro')
    recall = recall_score(all_labels, all_preds, average='macro')
    # Accuracy CI from the per-sample correctness flags.
    acc_mean, acc_low, acc_high = confidence_interval([a == b for a, b in zip(all_labels, all_preds)])
    avg_inference_time = (total_inference_time / total_images) * 1000  # seconds -> ms per image

    with open(os.path.join(model_save_dir, "results_summary.txt"), "a") as f:
        f.write(f"Benchmark Set:\n")
        f.write(f"Accuracy: {benchmark_acc:.4f} (95% CI: {acc_low:.4f}–{acc_high:.4f})\n")
        f.write(f"Macro F1: {macro_f1:.4f}\n")
        f.write(f"Weighted F1: {weighted_f1:.4f}\n")
        f.write(f"Precision: {precision:.4f}\n")
        f.write(f"Recall: {recall:.4f}\n")
        f.write(f"Avg Inference Time per Image: {avg_inference_time:.2f} ms\n")

    cm_benchmark = confusion_matrix(all_labels, all_preds)
    df_cm = pd.DataFrame(cm_benchmark, index=benchmark_class_names, columns=benchmark_class_names)
    df_cm.to_csv(os.path.join(model_save_dir, "benchmark_confusion_matrix.csv"))
    plt.figure(figsize=(8, 6))
    sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues")
    plt.title(f"{run_name} - Benchmark Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    plt.savefig(os.path.join(model_save_dir, "benchmark_confusion_matrix.png"))
    plt.close()

    summary_results.append({
        "model": run_name,
        "accuracy": benchmark_acc,
        "acc_low": acc_low,
        "acc_high": acc_high,
        "macro_f1": macro_f1,
        "weighted_f1": weighted_f1,
        "precision": precision,
        "recall": recall,
        "inference_time_ms": avg_inference_time
    })

# Final results: one row per run, plus a comparison bar chart.
df_summary = pd.DataFrame(summary_results)
df_summary.to_csv(os.path.join(save_root, "benchmark_summary_results.csv"), index=False)
df_summary.set_index('model')[['macro_f1', 'weighted_f1', 'accuracy']].plot(kind='bar', figsize=(10, 6))
plt.title("Benchmark Performance Comparison")
plt.ylabel("Score")
plt.ylim(0, 1)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(os.path.join(save_root, "benchmark_model_comparison.png"))
plt.close()


In [None]:
# === Load the model architecture ===
def create_resnet50(num_classes):
    """Build a pretrained ResNet-50 with ``layer3``, ``layer4`` and ``fc`` trainable.

    This cell redefines ``create_resnet50`` and therefore shadows the earlier
    definition for the rest of the session. It previously built a ResNet-34
    under this name; it now builds a ResNet-50 so the name, the earlier
    definition and any checkpoint trained through it agree.
    """
    model = models.resnet50(pretrained=True)
    for param in model.parameters():
        param.requires_grad = False
    # New head for this task (trainable by default).
    model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
    for param in model.layer3.parameters():
        param.requires_grad = True
    for param in model.layer4.parameters():
        param.requires_grad = True
    return model

# === Paths ===
model_name = "resnet50"
run_name = "resnet50_lr5e-05_wd5e-05_ls0.1_critce"
model_path = f"/content/drive/MyDrive/Thesis/saved_models/best_{run_name}.pth"
model_save_dir = f"/content/drive/MyDrive/Thesis/results/{model_name}/{run_name}"
os.makedirs(model_save_dir, exist_ok=True)

# Load model
# NOTE(review): the checkpoint and output folder are named "resnet50" but the
# network is built with create_resnet34. load_state_dict is strict, so this
# only succeeds if the checkpoint really holds ResNet-34 weights — confirm
# which architecture produced it.
model = create_resnet34(num_classes)
# map_location lets a checkpoint saved on GPU load on a CPU-only runtime too.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

# === Evaluate on internal test set ===
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        if isinstance(outputs, tuple): outputs = outputs[0]
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

internal_acc = accuracy_score(all_labels, all_preds)
internal_f1 = f1_score(all_labels, all_preds, average='macro')

# "w" starts a fresh summary; the benchmark section is appended further down.
with open(os.path.join(model_save_dir, "results_summary.txt"), "w") as f:
    f.write(f"Internal Test Set:\n")
    f.write(f"Accuracy: {internal_acc:.4f}\n")
    f.write(f"Macro F1: {internal_f1:.4f}\n\n")

# === Benchmark Evaluation ===
all_preds = []
all_labels = []
all_confidences = []
wrong_patches = []
total_inference_time = 0.0
total_images = 0

with torch.no_grad():
    for inputs, labels in benchmark_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        batch_len = inputs.size(0)

        # CUDA kernels launch asynchronously: synchronise around the forward
        # pass so the measured interval reflects actual compute time.
        if device.type == "cuda":
            torch.cuda.synchronize()
        start_time = time.perf_counter()
        outputs = model(inputs)
        if device.type == "cuda":
            torch.cuda.synchronize()
        end_time = time.perf_counter()

        probs = torch.softmax(outputs, dim=1)
        confidences, preds = torch.max(probs, dim=1)

        labels_np = labels.cpu().numpy()
        preds_np = preds.cpu().numpy()
        confs_np = confidences.cpu().numpy()

        # Paths recovered by running index; valid because the loader does
        # not shuffle.
        batch_paths = [
            benchmark_loader.dataset.samples[idx][0]
            for idx in range(total_images, total_images + batch_len)
        ]
        for true_label, pred_label, conf, path in zip(labels_np, preds_np, confs_np, batch_paths):
            if true_label != pred_label:
                wrong_patches.append({
                    "true": benchmark_class_names[true_label],
                    "pred": benchmark_class_names[pred_label],
                    "confidence": conf,
                    "path": path
                })

        all_preds.extend(preds_np)
        all_labels.extend(labels_np)
        all_confidences.extend(confs_np)

        total_images += batch_len
        total_inference_time += (end_time - start_time)

# === Save confidence histograms ===
conf_df = pd.DataFrame({
    "confidence": all_confidences,
    "true": [benchmark_class_names[i] for i in all_labels],
    "pred": [benchmark_class_names[i] for i in all_preds]
})

plt.figure(figsize=(10, 6))
sns.histplot(data=conf_df, x="confidence", hue="true", bins=30, kde=True, palette="tab10")
plt.title("Distribution of Prediction Confidence by True Class")
plt.tight_layout()
plt.savefig(os.path.join(model_save_dir, "confidence_histogram_by_true_class.png"))
plt.close()

conf_df["correct"] = conf_df["true"] == conf_df["pred"]
plt.figure(figsize=(10, 6))
sns.histplot(data=conf_df, x="confidence", hue="correct", bins=30, kde=True, palette="Set2")
plt.title("Confidence: Correct vs Incorrect Predictions")
plt.tight_layout()
plt.savefig(os.path.join(model_save_dir, "confidence_correct_vs_wrong.png"))
plt.close()

# === Metrics ===
benchmark_acc = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average='macro')
weighted_f1 = f1_score(all_labels, all_preds, average='weighted')
precision = precision_score(all_labels, all_preds, average='macro')
recall = recall_score(all_labels, all_preds, average='macro')
# Accuracy CI from the per-sample correctness flags.
acc_mean, acc_low, acc_high = confidence_interval([a == b for a, b in zip(all_labels, all_preds)])
avg_inference_time = (total_inference_time / total_images) * 1000  # seconds -> ms per image

report = classification_report(all_labels, all_preds, target_names=benchmark_class_names, digits=4)
with open(os.path.join(model_save_dir, "classification_report_benchmark.txt"), "w") as f:
    f.write(report)

cm_benchmark = confusion_matrix(all_labels, all_preds)
df_cm = pd.DataFrame(cm_benchmark, index=benchmark_class_names, columns=benchmark_class_names)
df_cm.to_csv(os.path.join(model_save_dir, "benchmark_confusion_matrix.csv"))

plt.figure(figsize=(8, 6))
sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues")
plt.title("Benchmark Confusion Matrix")
plt.tight_layout()
plt.savefig(os.path.join(model_save_dir, "benchmark_confusion_matrix.png"))
plt.close()

# === Save Summary
with open(os.path.join(model_save_dir, "results_summary.txt"), "a") as f:
    f.write(f"Benchmark Set:\n")
    f.write(f"Accuracy: {benchmark_acc:.4f} (95% CI: {acc_low:.4f}–{acc_high:.4f})\n")
    f.write(f"Macro F1: {macro_f1:.4f}\n")
    f.write(f"Weighted F1: {weighted_f1:.4f}\n")
    f.write(f"Precision: {precision:.4f}\n")
    f.write(f"Recall: {recall:.4f}\n")
    f.write(f"Avg Inference Time per Image: {avg_inference_time:.2f} ms\n")

conf_df.to_csv(os.path.join(model_save_dir, "confidence_scores.csv"), index=False)
pd.DataFrame(wrong_patches).to_csv(os.path.join(model_save_dir, "wrong_predictions.csv"), index=False)




In [None]:
# === 0. MOUNT DRIVE & IMPORTS ===
from google.colab import drive
drive.mount('/content/drive')

import os
import time
import torch
import torch.nn as nn
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
from sklearn.metrics import (
    accuracy_score, f1_score, classification_report, confusion_matrix
)
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# === 1. DEFINE VGG16 ARCHITECTURE ===
def create_vgg16(num_classes, dropout_rate=0.4):
    """Build a pretrained VGG16 whose last classifier layer is Dropout + Linear.

    All weights are frozen first; ``features[20:]`` and the new
    ``classifier[6]`` are then made trainable.

    Args:
        num_classes: Number of output classes for the new head.
        dropout_rate: Dropout probability placed before the new linear layer.
    """
    net = models.vgg16(pretrained=True)

    # Start fully frozen.
    for weight in net.parameters():
        weight.requires_grad = False

    # Swap the final classifier layer for a Dropout + Linear head.
    in_features = net.classifier[6].in_features
    head = nn.Sequential(
        nn.Dropout(dropout_rate),
        nn.Linear(in_features, num_classes),
    )
    net.classifier[6] = head

    # Re-enable gradients for the tail of the feature extractor and the head.
    for trainable in (net.features[20:], net.classifier[6]):
        for weight in trainable.parameters():
            weight.requires_grad = True

    return net


# === 2. PATHS & PARAMETERS ===
base_dir      = "/content/drive/MyDrive/Thesis/best_vgg16"
model_path    = os.path.join(base_dir, "best_model.pth")
test_dir      = os.path.join(base_dir, "Test_patches_original_march")
results_dir   = os.path.join(base_dir, "evaluation_results_march")
os.makedirs(results_dir, exist_ok=True)

# === 3. DATASET & DATALOADER ===
# Inputs are resized to 224x224 and normalised per channel.
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

dataset = datasets.ImageFolder(test_dir, transform=transform)
# shuffle=False keeps batches aligned with dataset.samples (used for paths).
loader  = DataLoader(dataset, batch_size=32, shuffle=False, num_workers=4)

# Take class names from the dataset itself so label indices and names always
# agree; os.listdir() would also pick up stray non-directory entries.
# NOTE(review): num_classes must match the checkpoint's output size, i.e. the
# test folder must contain the same class folders the model was trained on.
class_names = dataset.classes
num_classes = len(class_names)
label_ids = list(range(num_classes))

# === 4. LOAD MODEL ===
model = create_vgg16(num_classes)
state = torch.load(model_path, map_location=device)
model.load_state_dict(state)
model.to(device).eval()

# === 5. EVALUATE ===
all_preds    = []
all_labels   = []
all_confidences = []
wrong_records  = []
total_time = 0.0
total_imgs = 0

with torch.no_grad():
    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        b = inputs.size(0)

        # CUDA kernels launch asynchronously: synchronise around the forward
        # pass so the measured interval reflects actual compute time.
        if device.type == "cuda":
            torch.cuda.synchronize()
        t0 = time.perf_counter()
        outputs = model(inputs)
        if device.type == "cuda":
            torch.cuda.synchronize()
        t1 = time.perf_counter()

        probs = torch.softmax(outputs, dim=1)
        confs, preds = torch.max(probs, dim=1)

        total_time += (t1 - t0)
        total_imgs += b

        batch_labels = labels.cpu().tolist()
        batch_preds = preds.cpu().tolist()
        batch_confs = confs.cpu().tolist()

        all_preds.extend(batch_preds)
        all_labels.extend(batch_labels)
        all_confidences.extend(batch_confs)

        # record wrong predictions together with their source file path
        paths = [dataset.samples[idx][0] for idx in range(total_imgs - b, total_imgs)]
        for true, pred, conf, p in zip(batch_labels, batch_preds, batch_confs, paths):
            if true != pred:
                wrong_records.append({
                    "true": class_names[true],
                    "pred": class_names[pred],
                    "confidence": float(conf),
                    "path": p
                })

# === 6. METRICS & ARTIFACTS ===
# Classification report. Passing labels explicitly keeps the report aligned
# with class_names even if a class never occurs in labels or predictions.
report = classification_report(
    all_labels, all_preds, labels=label_ids, target_names=class_names, digits=4
)
with open(os.path.join(results_dir, "classification_report.txt"), "w") as f:
    f.write(report)

# Confusion matrix (always num_classes x num_classes)
cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
df_cm.to_csv(os.path.join(results_dir, "confusion_matrix.csv"))

plt.figure(figsize=(8,6))
sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues")
plt.title("Benchmark Confusion Matrix")
plt.savefig(os.path.join(results_dir, "confusion_matrix.png"))
plt.close()

# Summary metrics
acc      = accuracy_score(all_labels, all_preds)
mac_f1   = f1_score(all_labels, all_preds, average='macro')
wei_f1   = f1_score(all_labels, all_preds, average='weighted')
avg_time = (total_time/total_imgs)*1000  # ms

with open(os.path.join(results_dir, "results_summary.txt"), "w") as f:
    f.write(f"Accuracy      : {acc:.4f}\n")
    f.write(f"Macro F1      : {mac_f1:.4f}\n")
    f.write(f"Weighted F1   : {wei_f1:.4f}\n")
    f.write(f"Avg inf time  : {avg_time:.2f} ms/image\n")

# Save confidences
pd.DataFrame({
    "true":       [class_names[i] for i in all_labels],
    "pred":       [class_names[i] for i in all_preds],
    "confidence": all_confidences
}).to_csv(os.path.join(results_dir, "confidence_scores.csv"), index=False)

# Save wrong predictions
pd.DataFrame(wrong_records).to_csv(
    os.path.join(results_dir, "wrong_predictions.csv"), index=False
)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|██████████| 528M/528M [00:03<00:00, 164MB/s]
  state = torch.load(model_path, map_location=device)


In [None]:
import pandas as pd
from sklearn.metrics import (
    accuracy_score, f1_score, classification_report, confusion_matrix
)
import seaborn as sns
import matplotlib.pyplot as plt
import os

# ─── 0. CONFIG ────────────────────────────────────────────────────────────────
base_dir = "/content/drive/MyDrive/Thesis/best_vgg16"
CONF_CSV    = os.path.join(base_dir, "evaluation_results_march/confidence_scores.csv")
FILTERED_CSV = os.path.join(base_dir, "confidence_scores_no_duckweed_commonreed.csv")
REPORT_TXT  = os.path.join(base_dir, "classification_report_no_D_C.txt")
CM_CSV      = os.path.join(base_dir, "confusion_matrix_no_D_C.csv")
CM_PNG      = os.path.join(base_dir, "confusion_matrix_no_D_C.png")

# ─── 1. LOAD & FILTER ─────────────────────────────────────────────────────────
df = pd.read_csv(CONF_CSV)

# drop all rows whose true label is Duckweed or Common reed
# (rows are filtered on the TRUE label only, so the "pred" column may still
# contain the removed class names)
to_remove = ["Duckweed", "Common reed"]
df_f = df[~df["true"].isin(to_remove)].reset_index(drop=True)

# save the filtered confidence scores
df_f.to_csv(FILTERED_CSV, index=False)
print(f"Filtered confidence scores saved to {FILTERED_CSV}")

# ─── 2. RECALCULATE METRICS ───────────────────────────────────────────────────
y_true = df_f["true"]
y_pred = df_f["pred"]

# Classes that remain after filtering; every per-class metric below is
# restricted to exactly this set.
classes = sorted(df_f["true"].unique())

acc      = accuracy_score(y_true, y_pred)
# labels=classes keeps the averaged F1 scores consistent with the report and
# confusion matrix below. Without it, a removed class that still shows up in
# y_pred would enter the macro average as an extra class with F1 = 0.
macro_f1 = f1_score(y_true, y_pred, labels=classes, average="macro")
weighted = f1_score(y_true, y_pred, labels=classes, average="weighted")

# classification report, excluding the removed classes
report = classification_report(y_true, y_pred,
                               labels=classes,
                               target_names=classes,
                               digits=4)

with open(REPORT_TXT, "w") as f:
    f.write(f"Accuracy    : {acc:.4f}\n")
    f.write(f"Macro F1    : {macro_f1:.4f}\n")
    f.write(f"Weighted F1 : {weighted:.4f}\n\n")
    f.write("Classification report by class:\n")
    f.write(report)

print(f"Classification report saved to {REPORT_TXT}")

# ─── 3. CONFUSION MATRIX ──────────────────────────────────────────────────────
# Restricted to the remaining classes: predictions of a removed class are not
# shown as a column, so row sums can be smaller than the class support.
cm = confusion_matrix(y_true, y_pred, labels=classes)
df_cm = pd.DataFrame(cm, index=classes, columns=classes)
df_cm.to_csv(CM_CSV)

plt.figure(figsize=(6,5))
sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix (no Duckweed, no Common reed)")
plt.ylabel("True")
plt.xlabel("Predicted")
plt.tight_layout()
plt.savefig(CM_PNG)
plt.close()

print(f"Confusion matrix saved to {CM_CSV} and {CM_PNG}")


Filtered confidence scores saved to /content/drive/MyDrive/Thesis/best_vgg16/confidence_scores_no_duckweed_commonreed.csv
Classification report saved to /content/drive/MyDrive/Thesis/best_vgg16/classification_report_no_D_C.txt
Confusion matrix saved to /content/drive/MyDrive/Thesis/best_vgg16/confusion_matrix_no_D_C.csv and /content/drive/MyDrive/Thesis/best_vgg16/confusion_matrix_no_D_C.png


## RF and SVM

In [None]:
!pip install scikit-image




In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score
from sklearn.model_selection import GridSearchCV
from skimage.feature import graycomatrix, graycoprops
from skimage import exposure
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import joblib
import os
import pandas as pd
import torch
from sklearn.model_selection import RandomizedSearchCV
import time

In [None]:
# Image folders of the train / validation / test splits used by the
# classical-ML (RF and SVM) experiments.
train_dir = "/content/drive/MyDrive/Thesis/no_preprocessing/train"
val_dir = "/content/drive/MyDrive/Thesis/no_preprocessing/valid"
test_dir = "/content/drive/MyDrive/Thesis/no_preprocessing/test"

# Separate benchmark image folder, evaluated in addition to the test split.
benchmark_dir = "/content/drive/MyDrive/Thesis/benchmark_test/test_v5"

# Output folder for the reports, confusion matrices and plots of these models.
save_dir = "/content/drive/MyDrive/Thesis/results/traditional_models"

In [None]:
from torchvision import transforms

# Deterministic preprocessing for the hand-crafted feature pipeline:
# only a fixed resize and tensor conversion, no augmentation or normalisation.
ml_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),  # converts to a float tensor with values in the 0–1 range
])


In [None]:
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

def _build_ml_split(root):
    """Create the ImageFolder dataset and its DataLoader for one image root.

    Returns:
        Tuple ``(dataset, loader)``. The loader is not shuffled, so batches
        follow the dataset order.
    """
    dataset = ImageFolder(root, transform=ml_transforms)
    loader = DataLoader(dataset, batch_size=32, shuffle=False, num_workers=2)
    return dataset, loader


train_dataset_ml, train_loader_ml         = _build_ml_split(train_dir)
val_dataset_ml, val_loader_ml             = _build_ml_split(val_dir)
test_dataset_ml, test_loader_ml           = _build_ml_split(test_dir)
benchmark_dataset_ml, benchmark_loader_ml = _build_ml_split(benchmark_dir)

# ImageFolder assigns label indices from each root's own sub-folder names.
# class_names below comes from the training folder only, so every other split
# must use the identical name -> index mapping; otherwise its integer labels
# would silently refer to different classes.
for _split_name, _split_dataset in (
    ("validation", val_dataset_ml),
    ("test", test_dataset_ml),
    ("benchmark", benchmark_dataset_ml),
):
    if _split_dataset.class_to_idx != train_dataset_ml.class_to_idx:
        raise ValueError(
            f"Class folders of the {_split_name} split do not match the training split: "
            f"{_split_dataset.classes} vs {train_dataset_ml.classes}"
        )

# Update your class_names to match this loader
class_names = train_dataset_ml.classes


In [None]:
def compute_extra_features(img_batch):
    """Build the hand-crafted colour / texture feature matrix for a batch of images.

    Args:
        img_batch: tensor exposing ``.numpy()``. Every image is read as
            ``img[0]``, ``img[1]``, ``img[2]`` for the red, green and blue
            planes, i.e. a channel-first (N, 3, H, W) layout is expected.
            Presumably float values in the 0–1 range — confirm against the
            transform used by the caller.

    Returns:
        ``np.ndarray`` with one row per image and 27 columns, in this order:
        mean(R, G, B), std(R, G, B), max(R, G, B), min(R, G, B), mean ExG,
        mean ExR, ExG - ExR, mean CIVE, mean VARI, mean NGRDI, mean R/G,
        mean G/B, GLCM contrast, homogeneity, energy, ASM, dissimilarity,
        correlation, and the entropy of the green-channel histogram.
    """
    eps = 1e-10  # added to ratio denominators and histogram bins to avoid exact zeros
    glcm_props = ('contrast', 'homogeneity', 'energy', 'ASM', 'dissimilarity', 'correlation')
    rows = []

    for image in img_batch.numpy():
        red, green, blue = image[0, :, :], image[1, :, :], image[2, :, :]
        planes = (red, green, blue)
        row = []

        # Basic per-channel statistics: all means first, then stds, maxima, minima.
        for reducer in (np.mean, np.std, np.max, np.min):
            row.extend(reducer(plane) for plane in planes)

        # Vegetation indices, each reduced to its mean over the image.
        exg_mean = np.mean(2 * green - red - blue)
        exr_mean = np.mean(1.4 * red - green)
        row.extend([exg_mean, exr_mean, exg_mean - exr_mean])

        row.append(np.mean(0.441 * red - 0.811 * green + 0.385 * blue + 18.78745))  # CIVE
        row.append(np.mean((green - red) / (green + red - blue + eps)))             # VARI
        row.append(np.mean((green - red) / (green + red + eps)))                    # NGRDI

        # Channel ratios. Note the first one is red over green, while the
        # second is green over blue.
        row.append(np.mean(red / (green + eps)))
        row.append(np.mean(green / (blue + eps)))

        # GLCM texture on the green channel, stretched to 8-bit grey levels
        # (single offset: distance 1, angle 0).
        green_u8 = exposure.rescale_intensity(green, out_range=(0, 255)).astype(np.uint8)
        glcm = graycomatrix(green_u8, distances=[1], angles=[0], levels=256, symmetric=True, normed=True)
        row.extend(graycoprops(glcm, prop)[0, 0] for prop in glcm_props)

        # Entropy of the green-channel histogram.
        # NOTE(review): density=True returns densities for bins of width
        # 255/256, so the values do not sum exactly to 1 — this is an
        # entropy-like score rather than a strict Shannon entropy. Left as is
        # so the feature values stay unchanged.
        density, _ = np.histogram(green_u8, bins=256, range=(0, 255), density=True)
        density = density + eps
        row.append(-np.sum(density * np.log2(density)))

        rows.append(row)

    # float64 matches what stacking the mixed float32 / float64 columns yields.
    return np.array(rows, dtype=np.float64)


def flatten_dataset_with_features(loader):
    """Turn a whole data loader into a feature matrix and a label vector.

    Args:
        loader: iterable yielding ``(inputs, labels)`` batches; ``inputs`` is
            passed to ``compute_extra_features`` and ``labels`` must expose
            ``.numpy()``.

    Returns:
        Tuple ``(X, y)``: the per-batch feature arrays stacked row-wise and
        the per-batch label arrays concatenated, both in loader order.
    """
    # Materialise (features, labels) per batch, keeping the loader's order.
    batches = [
        (compute_extra_features(batch_inputs), batch_labels.numpy())
        for batch_inputs, batch_labels in loader
    ]

    X = np.vstack([feats for feats, _ in batches])
    y = np.concatenate([targets for _, targets in batches])
    return X, y


In [None]:
# Extract the hand-crafted feature matrix (X_*) and label vector (y_*) of every
# split. Each call iterates its whole loader, so re-running this cell
# re-processes all images of all four splits.
X_train, y_train = flatten_dataset_with_features(train_loader_ml)
X_val, y_val     = flatten_dataset_with_features(val_loader_ml)
X_test, y_test   = flatten_dataset_with_features(test_loader_ml)
X_benchmark, y_benchmark = flatten_dataset_with_features(benchmark_loader_ml)



In [None]:
# Random Forest hyper-parameter search.
# The space below has 3*4*3*3*3*2*2 = 1296 combinations; RandomizedSearchCV
# evaluates a random sample of 1000 of them with 5-fold cross-validation.
rf_params = {
    'n_estimators': [500, 600, 700],
    'max_depth': [10, 30, 50, None],
    'min_samples_split': [2, 5, 7],
    'min_samples_leaf': [1,2,3],
    'max_features': ['sqrt', 'log2', None],
    'criterion': ['gini', 'entropy'],
    'bootstrap': [True, False]

}

rf = RandomForestClassifier(random_state=42)
# random_state fixes WHICH candidates are sampled, so a rerun evaluates the
# same 1000 settings (the forest itself is already seeded above).
rf_grid = RandomizedSearchCV(
    rf,
    rf_params,
    n_iter=1000,
    scoring='f1_macro',
    cv=5,
    verbose=3,
    n_jobs=-1,
    random_state=42,
)
start = time.time()
rf_grid.fit(X_train, y_train)
end = time.time()
print(f"Time taken: {(end - start) / 60:.2f} minutes")
print("Best RF Params:", rf_grid.best_params_)
print("Best RF F1 Score:", rf_grid.best_score_)

# Keep the refitted best model in memory and persist it together with its
# parameters, so later sessions do not have to repeat the search.
best_rf = rf_grid.best_estimator_
joblib.dump(rf_grid.best_estimator_, '/content/drive/MyDrive/Thesis/results/best_rf.pkl')

with open('/content/drive/MyDrive/Thesis/results/rf_best_params.txt', 'w') as f:
    f.write(str(rf_grid.best_params_))


Fitting 5 folds for each of 1000 candidates, totalling 5000 fits
Time taken: 133.34 minutes
Best RF Params: {'n_estimators': 700, 'min_samples_split': 7, 'min_samples_leaf': 1, 'max_features': 'log2', 'max_depth': 50, 'criterion': 'entropy', 'bootstrap': False}
Best RF F1 Score: 0.8144446714753389


In [None]:
# Sanity check: number of parameter settings the RF search actually evaluated.
print(len(rf_grid.cv_results_['params']))

1000


In [None]:
# SVM hyper-parameter search space (sampled by RandomizedSearchCV below).
# NOTE: gamma is ignored by the linear kernel and degree by every kernel other
# than poly, so some of the sampled combinations are effectively duplicates.
svm_params = {
    'svc__C': [0.01, 0.1, 1, 10, 100, 500],
    'svc__gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
    'svc__kernel': ['linear', 'rbf', 'poly'],
    'svc__degree': [2, 3]  # Used only if kernel='poly'
}


# === Initialize base model ===
# Scaling lives inside the pipeline, so it is re-fitted on every CV training fold.
svm = make_pipeline(StandardScaler(), SVC(probability=True, random_state=42))

# === Perform Randomized Search ===
svm_grid = RandomizedSearchCV(
    estimator=svm,
    param_distributions=svm_params,
    n_iter=200,
    scoring='f1_macro',
    cv=5,
    verbose=3,
    n_jobs=-1,
    random_state=42
)

# === Fit model and measure time ===
start = time.time()
svm_grid.fit(X_train, y_train)
end = time.time()

print(f"⏱️ Time taken: {(end - start) / 60:.2f} minutes")
print("✅ Best SVM Params:", svm_grid.best_params_)
print("📊 Best Macro F1 Score:", svm_grid.best_score_)

# === Save best model and parameters ===
# Written to the Drive results folder (same place as the Random Forest
# artifacts) instead of the working directory, which does not survive a
# Colab runtime reset.
svm_results_dir = '/content/drive/MyDrive/Thesis/results'
os.makedirs(svm_results_dir, exist_ok=True)

best_svm = svm_grid.best_estimator_
joblib.dump(best_svm, os.path.join(svm_results_dir, "best_svm_model.pkl"))

with open(os.path.join(svm_results_dir, "svm_best_params.txt"), "w") as f:
    f.write(str(svm_grid.best_params_))


Fitting 5 folds for each of 200 candidates, totalling 1000 fits
⏱️ Time taken: 5.65 minutes
✅ Best SVM Params: {'svc__kernel': 'linear', 'svc__gamma': 'auto', 'svc__degree': 2, 'svc__C': 1}
📊 Best Macro F1 Score: 0.8479582008153004


In [None]:
def evaluate_model(model, X, y, set_name, model_name):
    """Score a fitted classifier on one split and save its report, confusion matrix and plot.

    Args:
        model: fitted estimator exposing ``predict``.
        X: feature matrix passed to ``model.predict``.
        y: true labels as integer indices into the global ``class_names``.
        set_name: split label used in the file names and plot title (e.g. "Test").
        model_name: model label used in the file names and plot title (e.g. "RF").

    Returns:
        Tuple ``(accuracy, macro_f1)``. The macro F1 is averaged over the
        labels that occur in ``y`` or in the predictions.

    Side effects:
        Writes ``<model>_<set>_report.csv``, ``<model>_<set>_confusion_matrix.csv``
        and ``<model>_<set>_confusion_matrix.png`` into the global ``save_dir``
        (created if missing) and prints a one-line summary.
    """
    # One label id per entry of class_names. Passing these explicitly keeps
    # the report and the matrix aligned with class_names even when a class
    # does not occur in this split (otherwise classification_report raises
    # and the confusion matrix has the wrong shape for the DataFrame below).
    label_ids = list(range(len(class_names)))
    os.makedirs(save_dir, exist_ok=True)

    preds = model.predict(X)
    acc = accuracy_score(y, preds)
    f1 = f1_score(y, preds, average='macro')
    report = classification_report(
        y, preds, labels=label_ids, target_names=class_names, output_dict=True
    )
    cm = confusion_matrix(y, preds, labels=label_ids)

    # Save report
    report_df = pd.DataFrame(report).transpose()
    report_df.to_csv(os.path.join(save_dir, f"{model_name}_{set_name}_report.csv"))

    # Save confusion matrix
    cm_df = pd.DataFrame(cm, index=class_names, columns=class_names)
    cm_df.to_csv(os.path.join(save_dir, f"{model_name}_{set_name}_confusion_matrix.csv"))

    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm_df, annot=True, fmt="d", cmap="Blues")
    plt.title(f"{model_name} - Confusion Matrix ({set_name})")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, f"{model_name}_{set_name}_confusion_matrix.png"))
    plt.close()  # free the figure after saving

    print(f"[{model_name} on {set_name}] Accuracy: {acc:.4f} | Macro F1: {f1:.4f}")
    return acc, f1

def _load_saved_model(candidate_paths):
    """Return the first persisted model found among ``candidate_paths``.

    The files are joblib pickles; only load artifacts written by this
    notebook, since unpickling executes code stored in the file.

    Raises:
        FileNotFoundError: if none of the candidate files exists.
    """
    for path in candidate_paths:
        if os.path.exists(path):
            return joblib.load(path)
    raise FileNotFoundError(
        f"No saved model found in {list(candidate_paths)}; run the corresponding search cell first."
    )


# The hyper-parameter searches are long-running, so after a kernel restart the
# fitted estimators are usually not in memory. Fall back to the persisted
# copies instead of failing with a NameError.
try:
    best_rf
except NameError:
    best_rf = _load_saved_model(['/content/drive/MyDrive/Thesis/results/best_rf.pkl'])

try:
    best_svm
except NameError:
    # Candidate locations, checked in order: the Drive results folder, then
    # the notebook's working directory.
    best_svm = _load_saved_model([
        '/content/drive/MyDrive/Thesis/results/best_svm_model.pkl',
        'best_svm_model.pkl',
    ])

evaluate_model(best_rf, X_test, y_test, set_name="Test", model_name="RF")
evaluate_model(best_rf, X_benchmark, y_benchmark, set_name="Benchmark", model_name="RF")
evaluate_model(best_svm, X_test, y_test, set_name="Test", model_name="SVM")
evaluate_model(best_svm, X_benchmark, y_benchmark, set_name="Benchmark", model_name="SVM")

NameError: name 'best_rf' is not defined