## Libraries

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
import torchvision.datasets
from torch.utils.data import random_split
import torch.optim as optim

import pandas as pd 
import os 
import numpy as np 
import matplotlib.pyplot as plt

from skimage.transform import resize 
from skimage.io import imread  
from sklearn.model_selection import GridSearchCV 
from sklearn.model_selection import train_test_split 
from sklearn.metrics import accuracy_score 
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

import seaborn as sns # Beautify CM

from torchvision.models import ResNet50_Weights, VGG16_Weights, EfficientNet_B0_Weights

## Preprocessing

In [None]:
# Transformations for the images
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to 224x224
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize using ImageNet stats
])

# Load dataset
dataset_path = "C:/Users/rishi/Desktop/JHU/Critical Infrastructure Protection/Major Project/Data Sets/Sonar/TrainSetMotionBlur"
dataset = torchvision.datasets.ImageFolder(root=dataset_path, transform=transform)

# Train-test split
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Classes
class_names = dataset.classes
num_classes = len(class_names)


In [None]:
print(class_names, num_classes)

## Model Setup

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Running on: ", device)

In [None]:
# Set seeds for reproducibility
torch.manual_seed(42)  # For PyTorch
np.random.seed(42)     # For NumPy

In [None]:
def setup_model_with_dropout(model_type, num_classes, dropout_rate=0.0):
    if model_type == 'resnet':
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        model.fc = nn.Sequential(
            nn.Dropout(dropout_rate),  # Add dropout before final FC
            nn.Linear(model.fc.in_features, num_classes)
        )

    elif model_type == 'vgg':
        model = models.vgg16(weights=models.VGG16_Weights.DEFAULT)
        model.classifier[6] = nn.Sequential(
            nn.Dropout(dropout_rate),  # Add dropout before final FC
            nn.Linear(model.classifier[6].in_features, num_classes)
        )

    elif model_type == 'efficientnet':
        model = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
        model.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),  # Add dropout before final FC
            nn.Linear(model.classifier[1].in_features, num_classes)
        )

    model = model.to(device)
    return model


### Optimizer Setup

In [None]:
def setup_optimizer(model, optimizer_name, learning_rate, weight_decay):
    if optimizer_name == 'adam':
        return optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    elif optimizer_name == 'sgd':
        return optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=weight_decay)
    elif optimizer_name == 'rmsprop':
        return optim.RMSprop(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    else:
        raise ValueError(f"Unsupported optimizer: {optimizer_name}")


## Training and Evaluation Function

In [None]:
def train_and_evaluate(model, optimizer, criterion, train_loader, test_loader, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss / len(train_loader)}")

    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predictions = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predictions.cpu().numpy())

    accuracy = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred)
    report = classification_report(y_true, y_pred, target_names=class_names, zero_division = 1)

    print(f"Accuracy: {accuracy * 100:.2f}%")
    print("Classification Report:\n", report)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    return accuracy, report, cm

## Hyperparameter Search

In [None]:
def hyperparameter_search(model_types, param_grid, train_dataset, test_dataset, num_classes):
    best_model, best_metrics, best_params = None, {'accuracy': 0}, None

    for model_type in model_types:
        for lr in param_grid['learning_rate']:
            for batch_size in param_grid['batch_size']:
                for opt in param_grid['optimizer']:
                    for wd in param_grid['weight_decay']:
                        for dr in param_grid['dropout_rate']:
                            print(f"Training {model_type} with lr={lr}, batch_size={batch_size}, "
                                  f"optimizer={opt}, weight_decay={wd}, dropout_rate={dr}")

                            # Create DataLoaders
                            train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
                            test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

                            # Set up the model with the specified dropout rate
                            model = setup_model_with_dropout(model_type, num_classes, dr)

                            # Set up the optimizer
                            optimizer = setup_optimizer(model, opt, lr, wd)

                            # Define the loss function
                            criterion = nn.CrossEntropyLoss()

                            # Train and evaluate
                            accuracy, _, _ = train_and_evaluate(
                                model, optimizer, criterion, train_loader, test_loader
                            )

                            # Track the best model
                            if accuracy > best_metrics['accuracy']:
                                best_model = model
                                best_metrics['accuracy'] = accuracy
                                best_params = {
                                    'model': model_type,
                                    'learning_rate': lr,
                                    'batch_size': batch_size,
                                    'optimizer': opt,
                                    'weight_decay': wd,
                                    'dropout_rate': dr
                                }

    print(f"Best Model: {best_params['model']} | Accuracy: {best_metrics['accuracy']:.2f}% | Params: {best_params}")
    return best_model, best_metrics, best_params


## Defining Hyperparameter Grid and Running the Models

In [None]:
param_grid = {
    'learning_rate': [0.001, 0.0005, 0.0001],
    'batch_size': [16, 32, 64],
    'optimizer': ['adam', 'sgd', 'rmsprop'],
    'weight_decay': [0.0, 1e-4, 1e-3],
    'dropout_rate': [0.0, 0.3, 0.5]
}

model_types = ['resnet', 'vgg', 'efficientnet']

best_model, best_metrics, best_params = hyperparameter_search(
    model_types, param_grid, train_dataset, test_dataset, num_classes
)

# Save the best model
torch.save(best_model.state_dict(), f"best_model_{best_params['model']}.pth")
print("Best model saved successfully!")

In [None]:
# The above code block ran for 81764 seconds = 23 hours

In [None]:
torch.save(best_model, f"best_model_{best_params['model']}_complete.pth")
print("Best model saved successfully!")

#### When you save the entire model, you are saving:
##### -->The model architecture: The complete definition of the neural network, including its layers and structure.
##### -->The model parameters (weights and biases): The learned values from training.

#### The state dictionary only saves:
##### -->The model's parameters: This includes weights, biases, and any other trainable tensors.

# Initial Individual Models -- Ignore

## ResNet

In [None]:
resnet_model = models.resnet50(weights=ResNet50_Weights.DEFAULT)
resnet_model.fc = nn.Linear(resnet_model.fc.in_features, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet_model.parameters(), lr=0.001)

print("Training and Evaluating ResNet:")
resnet_accuracy, resnet_report, resnet_cm = train_and_evaluate(
    resnet_model, optimizer, criterion, train_loader, test_loader, num_epochs=10
)

## VGG

In [None]:
vgg_model = models.vgg16(weights=VGG16_Weights.DEFAULT)
vgg_model.classifier[6] = nn.Linear(vgg_model.classifier[6].in_features, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(vgg_model.parameters(), lr=0.001)

print("Training and Evaluating VGG:")
vgg_accuracy, vgg_report, vgg_cm = train_and_evaluate(
    vgg_model, optimizer, criterion, train_loader, test_loader, num_epochs=10
)

## EfficientNet

In [None]:
efficientnet_model = models.efficientnet_b0(weights=EfficientNet_B0_Weights.DEFAULT)
efficientnet_model.classifier[1] = nn.Linear(efficientnet_model.classifier[1].in_features, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(efficientnet_model.parameters(), lr=0.001)

print("Training and Evaluating EfficientNet:")
efficientnet_accuracy, efficientnet_report, efficientnet_cm = train_and_evaluate(
    efficientnet_model, optimizer, criterion, train_loader, test_loader, num_epochs=10
)

## Print Results for All Models

In [None]:
print("Summary of Results:")
print(f"ResNet Accuracy: {resnet_accuracy * 100:.2f}%")
print(f"VGG Accuracy: {vgg_accuracy * 100:.2f}%")
print(f"EfficientNet Accuracy: {efficientnet_accuracy * 100:.2f}%")