In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pathlib
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from collections import Counter
from torchvision import transforms
from torch.utils.data import random_split, Dataset, DataLoader, WeightedRandomSampler

import random
import numpy as np
import torch

def seed_everything(seed=42):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random`` module, NumPy's legacy global generator and
    PyTorch (CPU generator plus all CUDA devices), then switches cuDNN to its
    deterministic mode with autotuning turned off.

    Args:
        seed: Integer seed handed unchanged to each generator.

    Returns:
        None.
    """
    # Same seed for each library, applied in a fixed order.
    for apply_seed in (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    ):
        apply_seed(seed)

    # Trade speed for repeatability: no algorithm autotuning, deterministic kernels only.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# One seed drives both the global RNGs and the dataset split below.
SEED = 63
seed_everything(SEED)

# NOTE(review): np.load without allow_pickle=True only succeeds for plain or
# structured arrays -- confirm how dataset.npy was written.
dataset = np.load('../LabelData/dataset.npy')

# Step 1: Extract all image arrays (second field of every record)
images = np.array([item[1] for item in dataset])  # shape: (n, 10, 10, 19)
print("images.shape:", images.shape)

# Step 2: Reshape to (n * 10 * 10, 19) so each row is one pixel across channels
pixels = images.reshape(-1, 19)
print("pixels.shape:", pixels.shape)
print()

# Class labels (first field of every record) and their frequencies
targets = np.array([item[0] for item in dataset])
print("targets.shape:", targets.shape)
unique_vals, counts = np.unique(targets, return_counts=True)
print("Unique values:", unique_vals)
print("Counts:", counts)
print()

# Example: 80% train, 10% val, 10% test
total_size = len(dataset)
train_size = int(0.8 * total_size)
val_size = int(0.1 * total_size)
test_size = total_size - train_size - val_size  # handles rounding

# A dedicated generator keeps the split independent of how much global
# randomness was consumed before this point.
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SEED)
)

print("Train size:", len(train_dataset))
print("Val size:", len(val_dataset))
print("Test size:", len(test_dataset))
print()

# Step 3: Compute per-channel mean and std from the TRAINING samples only,
# ignoring NaN pixels. Using every sample here would let validation/test
# statistics leak into the preprocessing applied during training.
train_pixels = images[train_dataset.indices].reshape(-1, images.shape[-1])
channel_means = np.nanmean(train_pixels, axis=0)
channel_stds = np.nanstd(train_pixels, axis=0)

# A constant (or all-NaN) channel yields a std that cannot be divided by;
# fall back to 1.0 so that channel is only mean-centred.
channel_stds = np.where(channel_stds > 0, channel_stds, 1.0)

print("Channel Means (ignoring NaNs):", channel_means)
print("Channel STDs (ignoring NaNs):", channel_stds)
print()

class MultiChannelDataset(Dataset):
    """Serve ``(label, image)`` records as ``(image_tensor, label)`` pairs.

    Each record in ``data`` must unpack into exactly two values: the label
    first, the image second. The image is converted to a float32 tensor and
    its axes are reordered with ``permute(2, 0, 1)``, i.e. channels-last
    input becomes channels-first output.

    Args:
        data: Indexable, sized collection of ``(label, image)`` records.
        transform: Optional callable applied to the image tensor after NaN
            replacement.
        target_transform: Optional callable applied to the label.
    """

    def __init__(self, data, transform=None, target_transform=None):
        self.data = data
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of records in the wrapped collection."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(image_tensor, label)`` pair stored at ``idx``."""
        label, raw_image = self.data[idx]

        # Build a float32 tensor and move the channel axis to the front:
        # (H, W, C) -> (C, H, W), e.g. (10, 10, 19) -> (19, 10, 10).
        tensor = torch.tensor(raw_image, dtype=torch.float32).permute(2, 0, 1)

        # Missing pixels become 0.0 *before* any transform runs.
        # NOTE(review): with a Normalize transform these zeros end up at
        # -mean/std rather than at the channel mean -- confirm this is intended.
        tensor = torch.nan_to_num(tensor, nan=0.0)

        if self.transform:
            tensor = self.transform(tensor)

        if self.target_transform:
            label = self.target_transform(label)

        return tensor, label

# Define relevant variables for the ML task. They are declared before the
# loaders so the batch size is set in exactly one place.
batch_size = 4
learning_rate = 0.001
num_epochs = 80

# Per-channel standardisation using the statistics computed earlier.
normalize = transforms.Normalize(mean=channel_means, std=channel_stds)

# Wrap each split so that samples come out as normalised (C, H, W) tensors.
train_ds = MultiChannelDataset(train_dataset, transform=normalize)
val_ds = MultiChannelDataset(val_dataset, transform=normalize)
test_ds = MultiChannelDataset(test_dataset, transform=normalize)

# Sanity check: show one sample's label and tensor shape.
for image, label in train_ds:
    print("Label:", label)
    print("Image shape:", image.shape)
    break
print()

# Training labels, in the same order as train_ds. Reading them straight from
# `targets` avoids pushing every image through the tensor/normalise pipeline
# just to discard it.
labels = targets[train_dataset.indices]

# Inverse-frequency weights: rarer classes are drawn more often.
class_counts = np.bincount(labels)
class_weights = 1. / class_counts
sample_weights = torch.as_tensor(class_weights[labels], dtype=torch.double)

# NOTE(review): the loss defined later in this notebook is also weighted by
# 1 / class_counts; combined with this sampler, minority classes are
# up-weighted twice -- confirm that is intended.
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),  # or more for oversampling
    replacement=True
)

train_loader = DataLoader(train_ds, batch_size=batch_size, sampler=sampler)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

print("Number of batches in train_loader:", len(train_loader))
print("Number of batches in val_loader:", len(val_loader))
print("Number of batches in test_loader:", len(test_loader))
print()

for batch_imgs, batch_labels in train_loader:
    print("Batch class distribution:", Counter(batch_labels.tolist()))
    break  # only show first batch
print()

# Number of output units: one per class index seen in the training split.
num_classes = len(class_counts)

# Device will determine whether to run the training on GPU or CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
print()

# Small CNN classifier for 19-channel inputs
class ConvNeuralNet(nn.Module):
    """Two conv/max-pool stages followed by a two-layer classifier head.

    With 3x3 unpadded convolutions and 2x2 pooling, a 10x10 input shrinks
    10 -> 8 -> 4 -> 2 -> 1, so the flattened feature vector has 64 entries,
    which is the input width of ``fc1``.

    ``conv_layer2`` and ``conv_layer4`` are constructed (and therefore appear
    among the module's parameters) but ``forward`` does not apply them.

    NOTE(review): no activation follows the convolutions; max-pooling is the
    only non-linearity before ``fc1`` -- confirm this is intended.

    Args:
        num_classes: Number of logits produced per sample.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Stage 1: 19 -> 32 channels, then 2x2 pooling.
        self.conv_layer1 = nn.Conv2d(19, 32, kernel_size=3)
        self.conv_layer2 = nn.Conv2d(32, 32, kernel_size=3)  # not used in forward
        self.max_pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 2: 32 -> 64 channels, then 2x2 pooling.
        self.conv_layer3 = nn.Conv2d(32, 64, kernel_size=3)
        self.conv_layer4 = nn.Conv2d(64, 64, kernel_size=3)  # not used in forward
        self.max_pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Classifier head: 64 -> 128 -> num_classes.
        self.fc1 = nn.Linear(64, 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Map a batch of images to a batch of unnormalised class scores."""
        features = self.max_pool1(self.conv_layer1(x))
        features = self.max_pool2(self.conv_layer3(features))

        # Collapse everything except the batch axis.
        flat = features.reshape(features.size(0), -1)

        hidden = self.relu1(self.fc1(flat))
        return self.fc2(hidden)

# Build the network on the same device the batches are sent to; without this
# the forward pass fails with a device mismatch whenever CUDA is available.
model = ConvNeuralNet(num_classes).to(device)

# Loss weighted by inverse class frequency.
# NOTE(review): train_loader already rebalances classes through its weighted
# sampler, so minority classes are up-weighted twice -- confirm this is intended.
class_weights_tensor = torch.tensor(1. / class_counts, dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)

# SGD with momentum and L2 weight decay.
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.005, momentum=0.9)

total_step = len(train_loader)

# Sanity check: shape and labels of one training batch. Dedicated loop names
# keep the notebook-level `images` / `labels` variables intact.
for batch_images, batch_labels in train_loader:
    print(batch_images.shape, batch_labels)
    break
print()

# Anomaly detection is a debugging aid that slows every backward pass; keep it
# off for a normal run and switch it to True only while chasing NaN/inf gradients.
torch.autograd.set_detect_anomaly(False)

# Train for the pre-defined number of epochs.
model.train()
for epoch in range(num_epochs):
    epoch_loss = 0.0
    for batch_images, batch_labels in train_loader:
        # Move tensors to the configured device; CrossEntropyLoss needs int64 targets.
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device=device, dtype=torch.long)

        # Forward pass
        outputs = model(batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Report the mean batch loss of the epoch rather than only the final
    # batch, which is too noisy to track progress with.
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, epoch_loss / max(total_step, 1)))

# Accuracy on each split, with the model in inference mode.
# NOTE(review): train_loader draws through the weighted sampler, so the
# "train" figure is measured on a class-rebalanced resample.
model.eval()
with torch.no_grad():
    dataset_names = ['train', 'val', 'test']
    loaders       = [train_loader, val_loader, test_loader]

    for name, loader in zip(dataset_names, loaders):
        correct, total = 0, 0
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            outputs = model(batch_images)
            _, preds = outputs.max(1)
            total   += batch_labels.size(0)
            correct += (preds == batch_labels).sum().item()

        # An empty split has no defined accuracy.
        acc = 100 * correct / total if total else float('nan')
        print(f'Accuracy on {name:5s}: {acc:5.2f}%')

from sklearn.metrics import confusion_matrix, roc_auc_score
from sklearn.preprocessing import label_binarize
import numpy as np

def evaluate_model(model, dataloader, num_classes, device, class_names=None):
    """Print per-class accuracy, per-class one-vs-rest ROC AUC and the confusion matrix.

    Args:
        model: Network returning one row of class scores per input sample.
        dataloader: Iterable yielding ``(images, labels)`` batches; labels are
            integer class indices.
        num_classes: Number of classes; class indices are ``0 .. num_classes - 1``.
        device: Device the image batches are moved to before the forward pass.
        class_names: Optional sequence of display names indexed by class id.
            When omitted, classes are shown as ``"Class <i>"``.

    Returns:
        None. All results are printed.
    """
    model.eval()
    y_true = []
    y_pred = []
    y_probs = []

    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)

            outputs = model(images)
            probs = torch.softmax(outputs, dim=1)
            predicted = probs.argmax(dim=1)

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
            y_probs.extend(probs.cpu().numpy())

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    y_probs = np.array(y_probs)

    if y_true.size == 0:
        # Nothing was yielded, so there is nothing to index or divide below.
        print("No samples to evaluate.")
        return

    class_ids = list(range(num_classes))

    def _display_name(class_id):
        # Human-readable name for a class index.
        return class_names[class_id] if class_names else f"Class {class_id}"

    print("🔍 Per-Class Accuracy:")
    # Pin the label set so row/column i always means class i, even when a
    # class is missing from both the targets and the predictions.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    support = cm.sum(axis=1)
    # Classes without any true sample have no defined accuracy; report NaN
    # instead of dividing by zero.
    per_class_acc = np.divide(
        cm.diagonal(), support,
        out=np.full(num_classes, np.nan), where=support > 0,
    )
    for i, acc in enumerate(per_class_acc):
        name = _display_name(i)
        print(f"  {name:<20}: Accuracy = {acc:.3f}")

    print("\n📈 Per-Class ROC AUC:")
    for i in class_ids:
        name = _display_name(i)
        # One-vs-rest indicator built directly: unlike label_binarize it keeps
        # one column per class in the two-class case as well.
        is_class = (y_true == i).astype(int)
        try:
            auc = roc_auc_score(is_class, y_probs[:, i])
        except ValueError as e:
            # Raised e.g. when class i has only positives or only negatives;
            # skip this class and keep reporting the others.
            print(f"  {name:<20}: ROC AUC computation failed:", e)
            continue
        print(f"  {name:<20}: AUC = {auc:.3f}")

    print(cm)

# Final report on the held-out test split.
evaluate_model(
    model=model,
    dataloader=test_loader,
    num_classes=num_classes,
    device=device,
)


FileNotFoundError: [Errno 2] No such file or directory: '../LabelData/dataset.npy'

In [2]:
# Count how many training samples fall under each class index.
train_label_counts = np.bincount([sample_label for _, sample_label in train_ds])
print("Train Class Counts:", train_label_counts)


Train Class Counts: [330 183 111  74  58]
