In [1]:
import os
import torch
from torch import nn
from poutyne import Model, CSVLogger
from poutyne.framework import ModelCheckpoint, EarlyStopping, plot_history
import numpy as np
import torchmetrics
from datetime import datetime
import sys
import pandas as pd
from custom_lib.data_prep import data_transformation_pipeline, data_loader
import matplotlib as plt
import torchvision.models as models
import time
from torch.optim.lr_scheduler import ReduceLROnPlateau


In [2]:
# These need to be passed in a parse args statement
# Tuneable Params
lr = 1e-3
data_dir = "data"
model_name = "b0"
save_logs = True
epochs = 1
rotate_angle=None
horizontal_flip_prob=None
brightess_contrast=None
gaussian_blur=None
normalize=True
seed = 39
batch_size = 32
results_folder_name = "results"
truncated_layers = 0
bootstrap_n = 5
pretrained = False


In [3]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")


Using mps device


In [4]:
import torch.nn as nn
import torchvision.models as models
from torchvision.models import EfficientNet_B0_Weights, EfficientNet_B1_Weights, EfficientNet_B2_Weights, EfficientNet_B3_Weights

# Define the model mapping as a constant (outside the function)
MODEL_MAPPING = {
    "b0": ("efficientnet_b0", EfficientNet_B0_Weights.IMAGENET1K_V1),
    "b1": ("efficientnet_b1", EfficientNet_B1_Weights.IMAGENET1K_V1),
    "b2": ("efficientnet_b2", EfficientNet_B2_Weights.IMAGENET1K_V1),
    "b3": ("efficientnet_b3", EfficientNet_B3_Weights.IMAGENET1K_V1),
}

def load_efficientnet(model_name, model_mapping, pretrained):
    """
    Load an EfficientNet model based on the provided model name and model mapping.

    Args:
        model_name (str): The name of the EfficientNet model (e.g., "b0", "b1", "b2", "b3").
        model_mapping (dict): A dictionary mapping model names to their corresponding classes and weights.

    Returns:
        torch.nn.Module: The loaded EfficientNet model.

    Raises:
        ValueError: If the model name is not supported.
    """
    # Check if the model name is valid
    if model_name not in model_mapping:
        raise ValueError(f"Unsupported model name: {model_name}. Supported models are: {list(model_mapping.keys())}")

    # Get the model class and weights from the mapping
    model_class_name, weights = model_mapping[model_name]
    model_class = getattr(models, model_class_name)

    if pretrained:
        # Load the model with pretrained weights
        effnet = model_class(weights=weights)
    else:
        torch.manual_seed(seed)
        effnet = model_class(weights=None)
    return effnet


try:
    effnet = load_efficientnet(model_name, MODEL_MAPPING, pretrained=pretrained)
    print(f"Successfully loaded EfficientNet {model_name}.")
except ValueError as e:
    print(e)

Successfully loaded EfficientNet b0.


In [5]:
if model_name == "b0":
    image_size = 224
if model_name == "b1":
    image_size = 240
if model_name == "b2":
    image_size = 260
if model_name == "b3":
    image_size = 300

In [6]:
train_transform = data_transformation_pipeline(image_size = image_size,
                                               rotate_angle=rotate_angle,
                                               horizontal_flip_prob=horizontal_flip_prob,
                                               gaussian_blur=gaussian_blur,
                                               normalize=normalize,
                                               is_train=True)
test_transform = data_transformation_pipeline(image_size = image_size,
                                               rotate_angle=rotate_angle,
                                               horizontal_flip_prob=horizontal_flip_prob,
                                               gaussian_blur=gaussian_blur,
                                               normalize=normalize,
                                               is_train=False)
val_transform = data_transformation_pipeline(image_size = image_size,
                                               rotate_angle=rotate_angle,
                                               horizontal_flip_prob=horizontal_flip_prob,
                                               gaussian_blur=gaussian_blur,
                                               normalize=normalize,
                                               is_train=False)

train_loader , val_loader, test_loader, num_classes = data_loader(data_dir, 
                                                     train_transform=train_transform,
                                                     test_transform=test_transform,
                                                     val_transform=val_transform,
                                                     seed=seed,
                                                     batch_size=batch_size
                                                     )


Train size: 6177, Validation size: 772, Test size: 773


In [7]:


class TruncatedEffNet(nn.Module):
    def __init__(self, effnet, num_classes, removed_layers, batch_size, image_size):
        super(TruncatedEffNet, self).__init__()

        # Truncate the EfficientNet backbone
        layers = 7 - removed_layers
        self.effnet_truncated = nn.Sequential(*list(effnet.features.children())[:layers])

        # Global average pooling
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)

        # Dynamically calculate the input size for the fully connected layer
        with torch.no_grad():  # Disable gradient tracking for this forward pass
            dummy_input = torch.randn(batch_size, 3, image_size, image_size)  # Example input (batch_size=1, channels=3, height=224, width=224)
            dummy_output = self.effnet_truncated(dummy_input)
            dummy_output = self.global_avg_pool(dummy_output)
            fc_input_size = dummy_output.view(dummy_output.size(0), -1).size(1)  # Flatten and get the size

        # Define the fully connected layer
        self.fc = nn.Linear(fc_input_size, num_classes)

    def forward(self, x):
        x = self.effnet_truncated(x)  # Extract features
        x = self.global_avg_pool(x)  # Pooling
        x = x.view(x.size(0), -1)  # Flatten
        x = self.fc(x)  # Classification
        return x

# Instantiate the model with the truncated backbone
model = TruncatedEffNet(effnet, num_classes, removed_layers=truncated_layers, batch_size=batch_size, image_size=image_size)

In [8]:
if save_logs:
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")

    # Create directory for saving all logs and model outputs 
    results_dir = os.path.join(f"{results_folder_name}/{model_name}_reduced_layers_{truncated_layers}_{timestamp}")
    os.makedirs(results_dir, exist_ok=True)
    print(f"Logs and output will be saved in: {results_dir}")


Logs and output will be saved in: results/b0_reduced_layers_0_2025-02-07_22-52


In [9]:


# # 6. Wrap the model with Poutyne
# poutyne_model = Model(
#     model,
#     optimizer=torch.optim.Adam(model.parameters(), lr=lr),
#     loss_function=nn.CrossEntropyLoss(),
#     batch_metrics=["accuracy"],
#     device=device
# )

In [10]:
# Move model to the device (CPU in this case)
model.to(device)

# Define loss function and optimizer
criterion = torch.nn.CrossEntropyLoss()

# Select Adam optimizer
optimizer = torch.optim.Adam(model.parameters(), lr = .001) # Learning rate used in Kaur et. al

# Learning Rate Scheduler
scheduler = ReduceLROnPlateau(optimizer, mode = 'max', factor = 0.3, patience = 2)

# Define accuracy function
def calculate_accuracy(outputs, labels):
    _, preds = torch.max(outputs, 1)
    corrects = (preds == labels).sum().item()
    return corrects

In [11]:
# Set initial values for epoch tracking lists
train_losses = [] # Training loss list
val_losses = [] # Validation loss list
train_accuracies = [] # Training accuarcy list
val_accuracies = [] # Validation accuracy list
min_valid_loss = np.inf # Initialize minimum validation loss as infinite
max_valid_accuracy = 0 # Initalize maximum validation accuracy as 0


start_time = time.time()
# Epoch training loop
for epoch in range(epochs):
    model.train()  # Set the model to training mode
    train_loss = 0.0 # Initialize training loss
    running_corrects = 0

    for batch_images, batch_labels in train_loader:
        batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)  # Move data to the GPU
        
        optimizer.zero_grad()  # Zero the gradients
        outputs = model(batch_images)  # Forward pass
        loss = criterion(outputs, batch_labels)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update model parameters
        
        train_loss += loss.item() * batch_images.size(0)  # Accumulate loss
        running_corrects += calculate_accuracy(outputs, batch_labels)  # Accumulate correct predictions
    
    print("     Epoch training complete, now starting validation")

    ### Validation
    val_loss = 0.0
    val_corrects = 0
    model.eval()
    with torch.no_grad():  # Disable gradient calculation
        for batch_images, batch_labels in val_loader:
            batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)  # Move data to the GPU
            
            outputs = model(batch_images)
            loss = criterion(outputs, batch_labels)
            val_loss += loss.item() * batch_images.size(0)  # Accumulate loss
            val_corrects += calculate_accuracy(outputs, batch_labels)  # Accumulate correct predictions

    train_loss /= len(train_loader.dataset)
    val_loss /= len(val_loader.dataset)
    train_acc = running_corrects / len(train_loader.dataset)
    val_acc = val_corrects / len(val_loader.dataset)

    print(f'Epoch {epoch + 1} \t\t Training Loss: {train_loss:.4f} \t\t Validation Loss: {val_loss:.4f} \t\t Training Accuracy: {train_acc:.4f} \t\t Validation Accuracy: {val_acc:.4f}')

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)

    if save_logs:

        if min_valid_loss > val_loss:
            print(f'Validation loss decreased({min_valid_loss:.6f}--->{val_loss:.6f}) \t Saving the model')
            min_valid_loss = val_loss
            torch.save(model.state_dict(), f"{results_dir}/best_model.pth")

        
    scheduler.step(val_loss)

if save_logs:
    torch.save(model.state_dict(),  f"{results_dir}/final_model.pth")
if save_logs:
    train_val_loss_acc = pd.DataFrame({
        "train_loss": [train_losses],
        "val_loss": [val_losses],
        "train_accuracy": [train_accuracies],
        "val_accuracy": [val_accuracies]
    })

    train_val_loss_acc.to_csv( f"{results_dir}/train_val_loss_acc.csv")

end_time = time.time()

run_time = end_time - start_time

     Epoch training complete, now starting validation
Epoch 1 		 Training Loss: 0.5938 		 Validation Loss: 0.4441 		 Training Accuracy: 0.7818 		 Validation Accuracy: 0.8679
Validation loss decreased(inf--->0.444110) 	 Saving the model


In [12]:
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, Subset
from poutyne import Model
from sklearn.metrics import f1_score, recall_score, precision_score, confusion_matrix

import torch
from sklearn.metrics import precision_score, recall_score, f1_score

def evaluate_model(model, test_loader, device='cuda'):
    """
    Evaluate a model on a test dataset and compute accuracy, precision, recall, and F1 score.

    Args:
        model (torch.nn.Module): The trained model to evaluate.
        test_loader (torch.utils.data.DataLoader): DataLoader for the test dataset.
        device (str): Device to use for evaluation ('cuda' or 'cpu').

    Returns:
        dict: A dictionary containing accuracy, precision, recall, and F1 score.
    """
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    predictions = []
    true_labels = []

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            # Move data to the specified device
            batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)
            
            # Forward pass
            outputs = model(batch_images)
            _, predicted = torch.max(outputs, 1)
            
            # Store predictions and true labels
            predictions.extend(predicted.cpu().numpy())
            true_labels.extend(batch_labels.cpu().numpy())
            
            # Update total and correct counts
            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()

    # Calculate metrics
    accuracy = 100 * correct / total
    precision = precision_score(true_labels, predictions, average='macro')
    recall = recall_score(true_labels, predictions, average='macro')
    f1 = f1_score(true_labels, predictions, average='macro')

    # Return results as a dictionary
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }

def bootstrap_evaluation(model, test_loader, save_logs, results_dir, device, n_bootstraps=1000, seed=42):
    """
    Perform bootstrap evaluation of a PyTorch model on a test dataset.

    Args:
        model: The trained PyTorch model to evaluate.
        test_loader: DataLoader for the test dataset.
        save_logs: Whether to save the metric distributions to CSV.
        results_dir: Directory to save the bootstrap distribution CSV.
        n_bootstraps: Number of bootstrap samples to generate.
        seed: Random seed for reproducibility.
        device: Device to use for evaluation ('cuda' or 'cpu').

    Returns:
        A pandas DataFrame with mean and confidence intervals for:
        - Accuracy
        - F1 Score
        - Sensitivity (Recall)
        - Specificity
    """
    rng = np.random.RandomState(seed)

    # Store bootstrapped metrics
    metrics = {
        "accuracy": [],
        "f1_score": [],
        "sensitivity": [],
        "specificity": [],
    }

    for _ in range(n_bootstraps):
        # Create a bootstrap sample
        sampled_indices = rng.choice(len(test_loader.dataset), len(test_loader.dataset), replace=True)
        sampled_subset = Subset(test_loader.dataset, sampled_indices)
        sampled_loader = DataLoader(sampled_subset, batch_size=test_loader.batch_size, shuffle=False)

        # Evaluate using PyTorch model
        model.to(device)
        results = evaluate_model(model, sampled_loader, device=device)

        # Compute specificity
        y_true, y_pred = [], []
        for inputs, labels in sampled_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

        cm = confusion_matrix(y_true, y_pred)
        specificity_values = []
        for i in range(cm.shape[0]):
            col_sum = cm[:, i].sum()
            if col_sum > 0:
                specificity_values.append(cm[i, i] / col_sum)
        specificity = np.mean(specificity_values) if specificity_values else 0.0

        # Store results
        metrics["accuracy"].append(results["accuracy"])
        metrics["f1_score"].append(results["f1"])
        metrics["sensitivity"].append(results["recall"])
        metrics["specificity"].append(specificity)

    if save_logs:
        # Save the full bootstrap distributions
        dist_df = pd.DataFrame(metrics)
        dist_df.to_csv(f"{results_dir}/bootstrap_distribution.csv", index=False)

    # Compute mean and confidence intervals
    def compute_ci(values):
        return np.mean(values), np.percentile(values, 2.5), np.percentile(values, 97.5)

    results_dict = {f"{metric}_{stat}": value
                    for metric, values in metrics.items()
                    for stat, value in zip(["mean", "low", "high"], compute_ci(values))}

    # Convert to DataFrame
    results_df = pd.DataFrame([results_dict])

    return results_df



In [14]:

# # Run bootstrapping evaluation with your Poutyne model
boot_strap_results = bootstrap_evaluation(model, test_loader, n_bootstraps = bootstrap_n, save_logs=True, results_dir=results_dir,
                                          device=device)


Validation Accuracy: 87.97%
Precision Score: 0.8845
Recall Score: 0.8744
F1 Score: 0.8749
Validation Accuracy: 88.75%
Precision Score: 0.8988
Recall Score: 0.8769
F1 Score: 0.8832
Validation Accuracy: 89.91%
Precision Score: 0.9078
Recall Score: 0.8918
F1 Score: 0.8958
Validation Accuracy: 89.26%
Precision Score: 0.9004
Recall Score: 0.8862
F1 Score: 0.8891
Validation Accuracy: 87.45%
Precision Score: 0.8767
Recall Score: 0.8627
F1 Score: 0.8666


In [15]:
boot_strap_results

Unnamed: 0,accuracy_mean,accuracy_low,accuracy_high,f1_score_mean,f1_score_low,f1_score_high,sensitivity_mean,sensitivity_low,sensitivity_high,specificity_mean,specificity_low,specificity_high
0,88.667529,87.503234,89.844761,0.881921,0.86747,0.895129,0.878424,0.863894,0.891242,0.893641,0.877444,0.907083


In [16]:
from thop import profile

# Create a dummy input tensor with the same shape as your model's input
dummy_input = torch.randn(batch_size, 3, image_size, image_size).to(device)  # Batch size = 1, Channels = 3, Height = image_size, Width = image_size

# Compute FLOPs and parameters
flops, params = profile(model, inputs=(dummy_input,))

gflops = flops / 1000000000

print(f"GFLOPs: {gflops}")
print(f"Parameters: {params}")

[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv2d'>.
[INFO] Register count_normalization() for <class 'torch.nn.modules.batchnorm.BatchNorm2d'>.
[INFO] Register count_adap_avgpool() for <class 'torch.nn.modules.pooling.AdaptiveAvgPool2d'>.
[INFO] Register zero_ops() for <class 'torch.nn.modules.container.Sequential'>.
[INFO] Register count_linear() for <class 'torch.nn.modules.linear.Linear'>.
GFLOPs: 11.628742656
Parameters: 2878928.0


In [17]:
# Save logs and plots
if save_logs:
    with open(f"{results_dir}/model_overview.txt", "w") as file:
        file.write(f"Model Structure:\n{model}\n")
        file.write(f"Using {device} device\n")

# Check if CSV exists
    if os.path.exists(f"{results_folder_name}/test_results.csv"):
        test_results_df = pd.read_csv(f"{results_folder_name}/test_results.csv")
    else:
        test_results_df = pd.DataFrame(columns=[
            "model_id", "model", "pretrained",  "truncated_layers", "epochs", "run_time", "lr", "image_size",
            "rotate_angle", "horizontal_flip_prob", "gaussian_blur", "normalize", "seed"
        ])

    # Create a DataFrame for the new model's metadata
    new_results_df = pd.DataFrame({
        "model_id": [f"{model_name}_reduced_layers_{truncated_layers}_{timestamp}"],
        "model": [model_name],
        "pretrained": [pretrained],
        "truncated_layers": [truncated_layers],
        "epochs": [epochs],  
        "run_time": [run_time / 60],  
        "lr": [lr],
        "image_size": [image_size],  
        "rotate_angle": [rotate_angle],  
        "horizontal_flip_prob": [horizontal_flip_prob],  
        "gaussian_blur": [gaussian_blur],  
        "normalize": [normalize],
        "seed": [seed],
        "gflops": [gflops],
        "params": [params]
    })

    # Combine test metadata with bootstrapped results (column-wise merge)
    new_results_df = pd.concat([new_results_df, boot_strap_results], axis=1)

    # Append to existing DataFrame
    test_results_df = pd.concat([test_results_df, new_results_df], ignore_index=True)

    # Save updated results
    test_results_df.to_csv(f"{results_folder_name}/test_results.csv", index=False)

    # # Plot training history
    # plot_history(
    #     history,
    #     metrics=['loss', 'acc'],
    #     labels=['Loss', 'Accuracy'],
    #     titles=f"{model_name} Training",
    #     save=True,  
    #     save_filename_template='{metric}_plot',  
    #     save_directory=results_dir,  
    #     save_extensions=('png',)  
    # )


NameError: name 'run_time' is not defined