### Imports

In [None]:
from torchvision.datasets import ImageFolder
from torchvision import datasets
from torch.utils.data import DataLoader, random_split
from torch import Generator, Tensor, mean

### 4.2 Data preparation

In [None]:
import torch
from torch.utils.data import Dataset
from PIL import Image

class ImageFolderWithSize(Dataset):
    """Image-folder dataset that also reports each image's original size.

    The directory is indexed with ``datasets.ImageFolder``; every item is an
    ``(image, label, original_size)`` triple, where ``original_size`` is the
    PIL ``(width, height)`` read before any transform is applied.
    """

    def __init__(self, root, transform=None):
        """Index the images under ``root``.

        Args:
            root: Directory handed to ``datasets.ImageFolder``.
            transform: Optional callable applied to the PIL image in
                ``__getitem__``.
        """
        folder = datasets.ImageFolder(root=root)
        self.samples = folder.samples
        # Kept so callers can map label indices back to class-folder names.
        self.classes = folder.classes
        self.class_to_idx = folder.class_to_idx
        self.transform = transform

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label, original_size)`` for sample ``idx``."""
        path, label = self.samples[idx]
        # Image.open is lazy: decode inside the context manager so the file
        # handle is released deterministically instead of waiting for GC.
        with Image.open(path) as img:
            img.load()
            original_size = img.size  # (width, height)
        if self.transform is not None:
            img = self.transform(img)
        return img, label, original_size


In [None]:
def compute_class_weights(dataset, num_classes=3,
                          class_names=("Normal", "Benign", "Malignant")):
    """Compute inverse-frequency class weights for an imbalanced dataset.

    Each weight is ``mean_count / class_count``, so a class with the average
    number of samples gets weight 1.0 and rarer classes get larger weights.

    Args:
        dataset: Object exposing ``samples``, an iterable of
            ``(path, label)`` pairs with integer labels.
        num_classes: Number of classes; labels index into a list of this
            length.
        class_names: Names used only for the printed summary. Ignored (a
            generic ``Class i`` is printed instead) when its length differs
            from ``num_classes``.

    Returns:
        A ``Tensor`` holding one weight per class.

    Raises:
        ValueError: If ``num_classes`` is not positive.
    """
    if num_classes <= 0:
        raise ValueError("num_classes must be positive")

    counts = [0.0] * num_classes
    for _, label in dataset.samples:
        counts[label] += 1

    mean_count = sum(counts) / num_classes
    # A class with no samples never appears as a target, so give it weight 0
    # instead of dividing by zero.
    weights = Tensor([mean_count / c if c else 0.0 for c in counts])

    if len(class_names) == num_classes:
        names = list(class_names)
    else:
        names = [f"Class {i}" for i in range(num_classes)]
    summary = ", ".join(f"{name}: {count}" for name, count in zip(names, counts))
    print(f"Class counts: {summary}")
    print(f"Class weights: {weights.tolist()}")
    return weights


def init_dataset(transform, seed=69420, compute_weights=False,
                 compute_size_freq=False, root='data/'):
    """Build the image dataset and split it 70/15/15 into train/val/test.

    Args:
        transform: Transform applied to every image by the dataset.
        seed: Seed for the split generator, so the split is reproducible.
        compute_weights: When true, class weights are computed over the
            whole (unsplit) dataset.
        compute_size_freq: When true, count how often each original
            ``(width, height)`` occurs and print the result.
        root: Dataset directory; defaults to the previously hard-coded
            ``'data/'``.

    Returns:
        ``(splits, weights, size_freq)`` where ``splits`` is the
        ``random_split`` result for ``[train, val, test]``; ``weights`` and
        ``size_freq`` are ``None`` unless requested.
    """
    dataset = ImageFolderWithSize(root, transform=transform)
    total_size = len(dataset)
    train_size = int(0.7 * total_size)
    val_size = int(0.15 * total_size)
    # The remainder goes to the test split so the sizes always sum to total.
    test_size = total_size - train_size - val_size

    weights = compute_class_weights(dataset) if compute_weights else None

    size_freq = None
    if compute_size_freq:
        from PIL import Image

        size_freq = {}
        # Read only the image size; going through the dataset would decode
        # and transform every image just to discard the pixels.
        for path, _ in dataset.samples:
            with Image.open(path) as img:
                orig_size = img.size
            size_freq[orig_size] = size_freq.get(orig_size, 0) + 1
        print(f"Image size frequency count: {size_freq}")

    # For reproducibility
    generator = Generator().manual_seed(seed)
    splits = random_split(
        dataset, [train_size, val_size, test_size], generator=generator
    )
    return splits, weights, size_freq

# The default batch_size was chosen to suit a GTX 1650 Ti.
def load_dataset(dataset, batch_size=16, pin=True):
    """Wrap the ``(train, val, test)`` splits in DataLoaders.

    Only the training loader shuffles; validation and test keep their order.
    ``pin`` is forwarded as ``pin_memory`` to all three loaders.

    Returns:
        A ``(train_loader, val_loader, test_loader)`` tuple.
    """
    train_split, val_split, test_split = dataset
    shared = {"batch_size": batch_size, "pin_memory": pin}
    train_loader = DataLoader(train_split, shuffle=True, **shared)
    val_loader = DataLoader(val_split, shuffle=False, **shared)
    test_loader = DataLoader(test_split, shuffle=False, **shared)
    return train_loader, val_loader, test_loader


In [None]:
from torchvision.transforms import Compose, Resize, Grayscale, Normalize, ToTensor
# Pipeline for the from-scratch CNN: 256x256 single-channel tensors.
# Class weights and the image-size histogram are computed here only.
cnn_dataset, weights, freqs = init_dataset(Compose([
    Resize((256, 256)),
    Grayscale(num_output_channels=1),
    ToTensor()
]), compute_weights=True, compute_size_freq=True)

# Rebind to the (train, val, test) DataLoader tuple.
cnn_dataset = load_dataset(cnn_dataset)

# Pipeline for VGG16: 224x224, grey replicated to 3 channels, then normalised
# with the ImageNet mean/std. Grayscale runs on the PIL image (before
# ToTensor), as in the CNN pipeline, so it works for any source image mode;
# on tensors it only handles 3-channel input.
# init_dataset uses its default seed for both pipelines.
vgg_dataset, _, _ = init_dataset(Compose([
    Resize((224, 224)),
    Grayscale(num_output_channels=3),
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
]))
vgg_dataset = load_dataset(vgg_dataset)

In [None]:
import matplotlib.pyplot as plt
import torch
import numpy as np

def show_sample(loader, n=3, force_channels=None, title=None):
    """
    Display up to 'n' images from the first batch of a dataloader.

    loader:
        Iterable whose batches unpack into (images, labels, sizes); only the
        images are used. Each image is indexed channel-first.

    n:
        Maximum number of images to show (capped at the batch size).

    force_channels:
        None  -> infer from tensor (use 1 channel = grayscale, 3 = RGB)
        1     -> convert to grayscale before display
        3     -> convert to RGB before display

    title:
        Optional figure-level title

    Raises ValueError when force_channels is not None, 1 or 3, or when the
    inferred channel count is neither 1 nor 3.
    """
    # Validate before touching the loader or creating a figure.
    if force_channels not in (None, 1, 3):
        raise ValueError("force_channels must be None, 1, or 3")

    images, _labels, _sizes = next(iter(loader))

    n = min(n, images.shape[0])
    # squeeze=False always yields a 2-D axes array, so n == 1 needs no
    # special case.
    fig, axes = plt.subplots(1, n, figsize=(4 * n, 4), squeeze=False)

    if title is not None:
        fig.suptitle(title, fontsize=14)

    for img, ax in zip(images, axes[0]):
        arr = img.detach().cpu().numpy()
        source_channels = arr.shape[0]
        target = force_channels if force_channels is not None else source_channels

        if target == 1:
            # RGB -> grayscale by channel mean; otherwise take the first plane.
            plane = arr.mean(axis=0) if source_channels == 3 else arr[0]
            ax.imshow(plane, cmap="gray")
        elif target == 3:
            # Grayscale -> RGB by replicating the single channel.
            if source_channels == 1:
                arr = np.repeat(arr, 3, axis=0)
            rgb = arr.transpose(1, 2, 0)
            # Normalised images fall outside [0, 1]; imshow would clip them
            # anyway, so clip explicitly to get the same picture without the
            # warning.
            if np.issubdtype(rgb.dtype, np.floating):
                rgb = np.clip(rgb, 0.0, 1.0)
            ax.imshow(rgb)
        else:
            raise ValueError(
                f"Unsupported channel count {target}; expected 1 or 3"
            )

        ax.axis("off")

    plt.tight_layout()
    plt.show()


In [None]:
show_sample(cnn_dataset[0])
show_sample(vgg_dataset[0]) # will warn because of normalization

### 4.3 Architecture selection and model training

In [None]:
import torch.nn as nn

class SimpleLCC(nn.Module):
    """Small CNN: four conv/ReLU/max-pool stages and a two-layer classifier.

    Takes single-channel images and returns 3 logits per image. Each stage
    halves the spatial size (256 -> 128 -> 64 -> 32 -> 16 for a 256x256
    input) before an adaptive average pool fixes the feature map at 8x8.
    """

    def __init__(self):
        super().__init__()
        widths = (1, 32, 64, 128, 256)

        stages = []
        for in_ch, out_ch in zip(widths[:-1], widths[1:]):
            stages.extend((
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ))
        # Added to reduce overfitting
        stages.append(nn.AdaptiveAvgPool2d((8, 8)))
        self.features = nn.Sequential(*stages)

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(widths[-1] * 8 * 8, 64),
            nn.ReLU(),
            # Added to reduce overfitting
            nn.Dropout(p=0.5),
            nn.Linear(64, 3),
        )

    def forward(self, x):
        """Return class logits for a batch of single-channel images."""
        return self.classifier(self.features(x))

In [None]:
from torchvision.models import vgg16, VGG16_Weights

class VGGLCC(nn.Module):
    """ImageNet-pretrained VGG16 with frozen conv features and a 3-way head."""

    def __init__(self):
        super().__init__()
        backbone = vgg16(weights=VGG16_Weights.IMAGENET1K_V1)

        # Freeze the convolutional feature extractor only; the classifier
        # layers keep requires_grad=True.
        backbone.features.requires_grad_(False)

        # Swap the final classifier layer for a fresh 3-label output layer
        # (new parameters default to requires_grad=True).
        old_head = backbone.classifier[-1]
        backbone.classifier[-1] = nn.Linear(old_head.in_features, 3)

        self.model = backbone

    def forward(self, x):
        """Return the wrapped VGG16's logits for ``x``."""
        return self.model(x)

In [None]:
# Instantiate both models, then print each architecture with its total
# parameter count (frozen parameters included).
simple = SimpleLCC()
vgg = VGGLCC()
for network in (simple, vgg):
    print(network)
    print("Parameter count:", sum(p.numel() for p in network.parameters()))



In [None]:
from torch.utils.tensorboard import SummaryWriter
    
def train(device, model, optimizer, loss_fn, epochs, train_dataloader,
          validation_dataloader, patience=2, min_delta=0.05, writer=None):
    """Train ``model`` with per-epoch validation, logging and early stopping.

    Every epoch runs one optimisation pass over ``train_dataloader`` followed
    by one gradient-free pass over ``validation_dataloader``. Loss and
    accuracy are printed and written to TensorBoard every 16 batches and once
    per epoch.

    Args:
        device: Device the model and batches are moved to.
        model: Module to train; it is moved to ``device`` in place.
        optimizer: Optimizer stepping the parameters to be trained.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar
            tensor.
        epochs: Maximum number of epochs.
        train_dataloader: Sized iterable of ``(inputs, labels, extra)``
            batches; the third item is ignored.
        validation_dataloader: Same batch layout, used for validation.
        patience: Stop after this many consecutive epochs without a
            sufficient validation-loss improvement.
        min_delta: Minimum drop in average validation loss that counts as an
            improvement (default 0.05, the value that used to be hard-coded).
        writer: Optional TensorBoard-style writer (``add_scalar``, ``flush``,
            ``close``). When omitted a ``SummaryWriter()`` is created and
            closed here; a caller-supplied writer is flushed but left open.

    Raises:
        ValueError: If either dataloader is empty.
    """
    owns_writer = writer is None
    if owns_writer:
        writer = SummaryWriter()  # TensorBoard writer

    best_validation_loss = float('inf')
    epochs_no_improve = 0

    model.to(device)

    # try/finally so the event file is flushed and closed even when training
    # is interrupted or raises.
    try:
        for epoch in range(epochs):
            # Train Step
            model.train()
            _, _, train_ok = _run_epoch(
                device, model, loss_fn, train_dataloader, writer, epoch,
                "Training", optimizer=optimizer,
            )
            if not train_ok:
                # Leave the model in eval mode, as every other exit path does.
                model.eval()
                break

            # Validation Step
            model.eval()
            avg_validation_loss, _, validation_ok = _run_epoch(
                device, model, loss_fn, validation_dataloader, writer, epoch,
                "Validation",
            )
            if not validation_ok:
                break

            # Early stopping logic
            if avg_validation_loss + min_delta < best_validation_loss:
                best_validation_loss = avg_validation_loss
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                if epochs_no_improve >= patience:
                    print(f"Early stopping triggered after {patience} epochs with no improvement.")
                    break
    finally:
        writer.flush()
        if owns_writer:
            writer.close()


def _run_epoch(device, model, loss_fn, dataloader, writer, epoch, phase,
               optimizer=None):
    """Run one pass over ``dataloader``, optimising when ``optimizer`` is set.

    ``phase`` ("Training" or "Validation") only selects the names used in
    printed messages and scalar tags. Returns ``(avg_loss, accuracy, ok)``;
    ``ok`` is False when a NaN loss aborted the pass.
    """
    import math

    num_batches = len(dataloader)
    if num_batches == 0:
        raise ValueError(f"{phase} dataloader is empty")

    training = optimizer is not None
    running_loss = 0.0
    correct = 0
    total = 0

    # Gradients are tracked only when there is an optimizer to use them.
    with torch.set_grad_enabled(training):
        for batch_idx, batch in enumerate(dataloader):
            # Inference
            inputs, labels, _image_dims = batch
            inputs, labels = inputs.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss_value = loss.item()

            # Bail out before backward/step so a NaN loss cannot corrupt the
            # weights; report where the NaN first shows up.
            if math.isnan(loss_value):
                if torch.isnan(inputs).any():
                    print("Input contains NaN!")
                elif torch.isnan(outputs).any():
                    print("Model output contains NaN!")
                else:
                    print("Loss is NaN!")
                return float('nan'), (correct / total if total else 0.0), False

            # Optimization
            if training:
                loss.backward()
                optimizer.step()

            running_loss += loss_value

            # Calculate accuracy
            predicted = outputs.detach().argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Log loss and running accuracy every 16 batches to TensorBoard
            if batch_idx % 16 == 0:
                step = epoch * num_batches + batch_idx
                running_accuracy = correct / total
                writer.add_scalar(f'{phase} Loss/batch', loss_value, step)
                writer.add_scalar(f'{phase} Accuracy/batch', running_accuracy, step)
                print(f"Epoch: {epoch}, Batch: {batch_idx}, {phase} Loss: {loss_value:.4f}, {phase} Accuracy: {running_accuracy:.4f}")
                writer.flush()  # Flush data

    # Average of the per-batch losses, and accuracy over all samples seen.
    avg_loss = running_loss / num_batches
    accuracy = correct / total
    writer.add_scalar(f'{phase} Loss/epoch', avg_loss, epoch)
    writer.add_scalar(f'{phase} Accuracy/epoch', accuracy, epoch)
    print(f"Epoch: {epoch}, Average {phase} Loss: {avg_loss:.4f}, {phase} Accuracy: {accuracy:.4f}")
    writer.flush()  # Flush data after epoch
    return avg_loss, accuracy, True

In [None]:
import torch
from torch.optim import Adam, SGD 
# Prefer CUDA, but fall back to the CPU so the notebook still runs on
# machines without a GPU.
gpu = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Upper bound on epochs; train() may stop earlier via early stopping.
epochs = 50

In [None]:
# Train the from-scratch CNN on the 256x256 grayscale loaders, with the
# class weights applied in the cross-entropy loss.
optimizer = Adam(simple.parameters(), lr=8e-4)
loss = nn.CrossEntropyLoss(weight=weights.to(gpu))
train(
    device=gpu,
    model=simple,
    optimizer=optimizer,
    loss_fn=loss,
    epochs=epochs,
    train_dataloader=cnn_dataset[0],
    validation_dataloader=cnn_dataset[1],
    patience=4,
)

In [None]:
# Fine-tune VGG16: only the replaced output layer is handed to the optimizer.
optimizer = Adam(vgg.model.classifier[-1].parameters(), lr=3e-4)
loss = nn.CrossEntropyLoss(weight=weights.to(gpu))
train(
    device=gpu,
    model=vgg,
    optimizer=optimizer,
    loss_fn=loss,
    epochs=epochs,
    train_dataloader=vgg_dataset[0],
    validation_dataloader=vgg_dataset[1],
    patience=5,
)

In [None]:
import matplotlib.pyplot as plt
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
import os
import glob

log_dir = 'runs/'

# Entries under the log directory; None marks a missing log directory so the
# two error cases stay distinguishable.
run_dirs = glob.glob(os.path.join(log_dir, '*')) if os.path.exists(log_dir) else None

if run_dirs is None:
    print(f"Error: Log directory '{log_dir}' not found.")
elif not run_dirs:
    print(f"Error: No run directories found in '{log_dir}'.")
else:
    # The most recently modified entry is taken to be the latest run.
    latest_run_dir = max(run_dirs, key=os.path.getmtime)
    print(f"Using latest run directory: {latest_run_dir}")

    try:
        # Load the TensorBoard events of that run.
        event_acc = EventAccumulator(latest_run_dir)
        event_acc.Reload()

        print("Available scalar keys:", event_acc.Tags()['scalars'])

        # Per-epoch loss series; a missing tag raises KeyError (handled below).
        training_loss_epochs = event_acc.Scalars('Training Loss/epoch')
        validation_loss_epochs = event_acc.Scalars('Validation Loss/epoch')

        epochs_train = [event.step for event in training_loss_epochs]
        loss_train = [event.value for event in training_loss_epochs]
        epochs_validation = [event.step for event in validation_loss_epochs]
        loss_validation = [event.value for event in validation_loss_epochs]

        # Draw both curves on one figure.
        plt.figure(figsize=(10, 6))
        for curve_label, steps, values in (
            ('Training Loss', epochs_train, loss_train),
            ('Validation Loss', epochs_validation, loss_validation),
        ):
            plt.plot(steps, values, label=curve_label)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training vs. Validation Loss per Epoch')
        plt.legend()
        plt.grid(True)
        plt.show()

    except KeyError as e:
        print(f"KeyError: {e}. Make sure the scalar keys exist in the TensorBoard logs.")
    except Exception as e:
        print(f"An error occurred: {e}")

### 4.4 Model evaluation

In [None]:
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix
)
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

def test_model(device, model, test_loader):
    all_preds = []
    all_labels = []
    
    for batch_id, batch in enumerate(test_loader):
        inputs, labels_cpu, _ = batch
        inputs, labels = inputs.to(device), labels_cpu.to(device)
        outputs = model(inputs)
        preds = torch.argmax(outputs, dim=1)
        # Accumulate
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels_cpu.numpy())
    return np.array(all_preds), np.array(all_labels)

def evaluate_results(y_true, y_pred,
                     class_names=("Normal", "Benign", "Malignant")):
    """Print accuracy and a per-class report, then plot the confusion matrix.

    Args:
        y_true: Ground-truth integer labels.
        y_pred: Predicted integer labels, aligned with ``y_true``.
        class_names: Display name for label ``i`` at position ``i``.
    """
    class_names = list(class_names)
    # Pin the label set: otherwise sklearn infers it from the data, and a
    # class missing from both arrays makes the report raise and the heatmap
    # ticks misalign.
    label_ids = list(range(len(class_names)))

    acc = accuracy_score(y_true, y_pred)
    print(f"Test Accuracy: {acc:.4f}")

    print(classification_report(
        y_true,
        y_pred,
        labels=label_ids,
        target_names=class_names,
        zero_division=0,
    ))

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    sns.heatmap(
        cm, annot=True, fmt="d", cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names
    )
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix (Test data)")
    plt.show()

In [None]:
# SimpleLCC: predict on the held-out test loader, then report metrics.
y_pred, y_true = test_model(device=gpu, model=simple, test_loader=cnn_dataset[2])
evaluate_results(y_true=y_true, y_pred=y_pred)

In [None]:
# VGG16: predict on the held-out test loader, then report metrics.
y_pred, y_true = test_model(device=gpu, model=vgg, test_loader=vgg_dataset[2])
evaluate_results(y_true=y_true, y_pred=y_pred)