## Purpose
The purpose of this notebook is to train a custom CNN model to classify paralyzed vs non-paralyzed vocal cords from ultrasound images.<br>

0 - not paralyzed
1 - paralyzed

This cell contains the imports needed for the program.

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, models
from PIL import Image
import os
from tqdm import tqdm
import csv
from IPython.display import clear_output
import matplotlib.pyplot as plt

This cell creates a folder to save the models.

In [None]:
# Create the folder the training cells write checkpoints into.
# exist_ok=True makes the call idempotent, so the cell can be re-run safely and
# there is no gap between an existence check and the creation itself.
try:
    os.makedirs('models', exist_ok=True)
except OSError as err:
    # Best effort: report the problem (including the OS reason) and carry on.
    print(f'Error creating models directory: {err}')

Image filename meanings:
- Healthy - regular healthy images of vocal cords
- Healthy2 - images of healthy vocal cords that were split in half and blended back together; this hopefully helps alleviate a potential model issue with artifacts in the synthetic images.
- Leftpar - synthetic image, with the left side of the image stretched 50% vertically, right side is original image. 
- Rightpar - synthetic image, with the right side of the image stretched 50% vertically, left side is original image.

Defining a dataset class for our specific images.

In [None]:
class CustomDataset(Dataset):
    """Image dataset whose labels are derived from keywords in the file names.

    Every ``.png`` file in ``img_dir`` is inspected (case-insensitively):

    * name contains ``healthy`` (this also covers ``healthy2``) -> label 0
    * name contains ``leftpar`` or ``rightpar``                  -> label 1

    ``.png`` files without a known keyword are reported and skipped; files
    with any other extension are ignored silently.

    Args:
        img_dir: Directory containing the image files.
        transform: Optional callable applied to each loaded PIL image.

    Raises:
        FileNotFoundError: If ``img_dir`` does not exist.
    """

    # (keyword, label) pairs, checked in order against the lower-cased file name.
    # 'healthy' is a substring of 'healthy2', so one entry covers both kinds.
    _KEYWORD_LABELS = (
        ('healthy', 0),
        ('leftpar', 1),
        ('rightpar', 1),
    )

    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir  # Store image directory path
        self.transform = transform

        # (file_name, label) pairs for every usable image
        self.img_labels = []
        print(f"Looking in directory: {img_dir}")

        # Check if directory exists
        if not os.path.exists(img_dir):
            raise FileNotFoundError(f"Directory {img_dir} not found")

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> image mapping stable between runs and machines.
        for file_name in sorted(os.listdir(img_dir)):
            lowered = file_name.lower()

            # Only .png images are considered (extension matched case-insensitively,
            # like the label keywords)
            if not lowered.endswith('.png'):
                continue

            label = None
            for keyword, keyword_label in self._KEYWORD_LABELS:
                if keyword in lowered:
                    label = keyword_label
                    break

            if label is not None:
                self.img_labels.append((file_name, label))
            else:
                print(f"Skipping unknown file: {file_name}")

        # Debugging: print number of labels found
        print(f"Found {len(self.img_labels)} .png files in the directory")

    def __len__(self):
        """Return the number of labelled images."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return ``(image, label, file_name)`` for the entry at ``idx``.

        ``image`` is the PIL image, or the result of ``transform`` when one
        was supplied; ``label`` is 0 (healthy) or 1 (paralysis).
        """
        file_name, label = self.img_labels[idx]
        img_path = os.path.join(self.img_dir, file_name)
        image = Image.open(img_path)  # Load the .png file using PIL

        if self.transform:
            image = self.transform(image)

        return image, label, file_name

In [None]:
# Preprocessing applied to every image before it is fed to the network
transform = transforms.Compose([
    transforms.Resize(size=(256, 256)),           # fixed 256x256 spatial size
    transforms.Grayscale(1),                      # collapse to a single channel
    transforms.ToTensor(),                        # PIL image -> float tensor
    transforms.Normalize([0.5], [0.5]),           # (x - 0.5) / 0.5 on the one channel
])

Create the dataset.

In [None]:
# Build the labelled dataset from the training image folder,
# applying the preprocessing pipeline to every image.
img_dir = '/data/ai_club/team_13_2024-25/VIPR/Data/training_images'
dataset = CustomDataset(img_dir, transform=transform)

Split the data and create dataloaders.

In [None]:
# Split the dataset into train and validation sets (80%-20% split).
# The split is seeded so the same images end up in the validation set on every
# run; with an unseeded split, a model resumed or reloaded from a checkpoint
# could be validated on images it was trained on.
split_seed = 42
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Create DataLoader for train and validation sets
batch_size = 64  # You can adjust this batch size according to your needs
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  # reshuffled each epoch
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)     # fixed order for evaluation

In [None]:
# Sanity check: total number of labelled images (train + validation)
num_images = len(dataset)
print(num_images)

Setting up the model.

In [None]:
# Compact custom CNN that produces a single logit for binary classification
class VIPRnet(nn.Module):
    """Three convolutional stages followed by a two-layer classifier head.

    Expects single-channel 256x256 input: the first linear layer is sized for
    128 feature maps of 32x32, i.e. 256 halved by each of the three pools.
    The forward pass returns raw logits of shape ``[batch, 1]``.
    """

    def __init__(self):
        super().__init__()

        # Each stage: 3x3 same-padding convolution -> ReLU -> 2x2 max-pool
        channel_plan = [1, 32, 64, 128]  # grayscale input, then feature widths
        stages = []
        for in_ch, out_ch in zip(channel_plan[:-1], channel_plan[1:]):
            stages.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1))
            stages.append(nn.ReLU())
            stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.conv_layers = nn.Sequential(*stages)

        flat_features = 128 * 32 * 32  # channels * height * width after the conv stages
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_features, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 1),  # one output neuron: the paralysis logit
        )

    def forward(self, x):
        """Return the logit for each image; no sigmoid (pair with BCEWithLogitsLoss)."""
        features = self.conv_layers(x)
        return self.fc_layers(features)

# Model setup: use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = VIPRnet()
model = model.to(device)

# Loss & optimizer
criterion = nn.BCEWithLogitsLoss()  # operates on raw logits, for binary classification
optimizer = optim.Adam(model.parameters(), lr=1e-3)

print(model)  # Display model architecture

Train the model, with optional saving and loading of model checkpoints.

In [None]:
# Choose a checkpoint to load if desired:
load_model = False
saved_checkpoint_name = "VIPRnet-30-0.0776.pth"
checkpoint_path = os.path.join('models', saved_checkpoint_name)

# Load existing model if checkpoint exists
if load_model and os.path.exists(checkpoint_path):
    print(f"Loading model from {checkpoint_path}...")
    # map_location lets a checkpoint written on another device (e.g. a GPU)
    # be restored onto whichever device is in use now.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch'] + 1  # Continue from next epoch
    print(f"Resuming training from epoch {start_epoch}...")
    # Checkpoints only store the metrics of their own epoch, not the full
    # histories. Keep the histories of a live kernel, but create them when
    # resuming on a fresh kernel so the appends below do not raise NameError.
    history_names = ('epoch_data_train', 'epoch_data_val', 'train_accuracies', 'val_accuracies')
    if not all(name in globals() for name in history_names):
        epoch_data_train = []
        epoch_data_val = []
        train_accuracies = []
        val_accuracies = []
else:
    print('Starting training from scratch.')
    start_epoch = 0  # Start from scratch
    epoch_data_train = []   # [epoch, loss, accuracy] per training epoch
    epoch_data_val = []     # [epoch, loss, accuracy] per validation epoch
    train_accuracies = []
    val_accuracies = []


# Set desired number of epochs
num_epochs = 50

# Set to False to train without writing a checkpoint after every epoch
save = True


for epoch in range(start_epoch, num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")

    # TRAINING PHASE
    model.train()  # enable dropout for the training pass
    total_loss_train = 0
    total_correct_train = 0
    total_examples_train = 0
    batch_count = 0

    progress_bar = tqdm(train_loader, total=len(train_loader), desc=f"Epoch {epoch+1} - TRAIN", unit="batch")
    for inputs, labels, _ in progress_bar:
        inputs = inputs.to(device)
        labels = labels.to(device).float().unsqueeze(1)  # Ensure shape [batch_size, 1], matching the logits
        optimizer.zero_grad()

        outputs = model(inputs)

        loss_train = criterion(outputs, labels)
        loss_train.backward()
        optimizer.step()

        total_loss_train += loss_train.item()
        predictions_train = torch.round(torch.sigmoid(outputs))  # logit -> probability -> 0/1

        # Count correct predictions and total examples
        total_correct_train += (predictions_train == labels).sum().item()
        total_examples_train += labels.numel()
        batch_count += 1

    average_loss_train = total_loss_train / batch_count  # mean of the per-batch losses
    average_accuracy_train = total_correct_train / total_examples_train
    print(f"TRAINING | Loss: {average_loss_train:.4f}, Accuracy: {average_accuracy_train:.2%}")
    epoch_data_train.append([epoch+1, average_loss_train, average_accuracy_train])
    train_accuracies.append(average_accuracy_train)

    # VALIDATION PHASE
    model.eval()  # disable dropout
    total_loss_val = 0
    total_correct_val = 0
    total_examples_val = 0
    batch_count = 0

    progress_bar_val = tqdm(val_loader, total=len(val_loader), desc="VAL", unit="batch")
    with torch.no_grad():  # no gradients are needed for evaluation; avoids building the autograd graph
        for inputs, labels, _ in progress_bar_val:
            inputs = inputs.to(device)
            labels = labels.to(device).float().unsqueeze(1)  # same [batch_size, 1] shape as in training

            outputs = model(inputs)

            loss_val = criterion(outputs, labels)
            total_loss_val += loss_val.item()

            predictions_val = torch.round(torch.sigmoid(outputs))
            total_correct_val += (predictions_val == labels).sum().item()
            total_examples_val += labels.numel()
            batch_count += 1

    average_loss_val = total_loss_val / batch_count
    average_accuracy_val = total_correct_val / total_examples_val
    print(f"VALIDATION | Loss: {average_loss_val:.4f}, Accuracy: {average_accuracy_val:.2%}")
    epoch_data_val.append([epoch+1, average_loss_val, average_accuracy_val])
    val_accuracies.append(average_accuracy_val)

    # Optionally save model checkpoint here
    checkpoint_path = f'models/VIPRnet_V3--B{batch_size}--E{epoch+1}--L{average_loss_train:.4f}.pth'

    if save:
        # Save model checkpoint, including this epoch's train/val metrics
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': average_loss_train,
            'train_data': epoch_data_train[-1],
            'val_data': epoch_data_val[-1]
        }, checkpoint_path)
        print(f"Model saved to {checkpoint_path}")

Alternatively, train the model using DataParallel across multiple GPUs (minor speed increase)

In [None]:
# Check for multiple GPUs and set up DataParallel
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs for training...")
    # Re-running this cell must not wrap an already wrapped model again
    if not isinstance(model, nn.DataParallel):
        model = nn.DataParallel(model)  # Wrap model for multi-GPU training

model = model.to(device)

# Choose a checkpoint to load if desired:
load_model = False
saved_checkpoint_name = "VIPRnet-30-0.0776.pth"
checkpoint_path = os.path.join('models', saved_checkpoint_name)

# Load existing model if checkpoint exists
if load_model and os.path.exists(checkpoint_path):
    print(f"Loading model from {checkpoint_path}...")
    # map_location lets a checkpoint written on another device be restored
    # onto whichever device is in use now.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    # This cell saves the unwrapped network's weights (see the save step below),
    # so they must be restored into the underlying module, not the DataParallel
    # wrapper, whose parameter names carry a "module." prefix.
    network = model.module if isinstance(model, nn.DataParallel) else model
    network.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch'] + 1  # Continue from next epoch
    print(f"Resuming training from epoch {start_epoch}...")
    # Checkpoints only store the metrics of their own epoch, not the full
    # histories. Keep the histories of a live kernel, but create them when
    # resuming on a fresh kernel so the appends below do not raise NameError.
    history_names = ('epoch_data_train', 'epoch_data_val', 'train_accuracies', 'val_accuracies')
    if not all(name in globals() for name in history_names):
        epoch_data_train = []
        epoch_data_val = []
        train_accuracies = []
        val_accuracies = []
else:
    print('Starting training from scratch.')
    start_epoch = 0  # Start from scratch
    epoch_data_train = []   # [epoch, loss, accuracy] per training epoch
    epoch_data_val = []     # [epoch, loss, accuracy] per validation epoch
    train_accuracies = []
    val_accuracies = []

# Set desired number of epochs
num_epochs = 50

# Set to False to train without writing a checkpoint after every epoch
save = True

# Training loop
for epoch in range(start_epoch, num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")

    # TRAINING PHASE
    model.train()  # enable dropout for the training pass
    total_loss_train = 0
    total_correct_train = 0
    total_examples_train = 0
    batch_count = 0

    progress_bar = tqdm(train_loader, total=len(train_loader), desc=f"Epoch {epoch+1} - TRAIN", unit="batch")
    for inputs, labels, _ in progress_bar:
        inputs = inputs.to(device)
        labels = labels.to(device).float().unsqueeze(1)  # Ensure shape [batch_size, 1], matching the logits

        optimizer.zero_grad()
        outputs = model(inputs)  # Model runs in parallel across all GPUs

        loss_train = criterion(outputs, labels)
        loss_train.backward()
        optimizer.step()

        total_loss_train += loss_train.item()
        predictions_train = torch.round(torch.sigmoid(outputs))  # logit -> probability -> 0/1

        # Count correct predictions and total examples
        total_correct_train += (predictions_train == labels).sum().item()
        total_examples_train += labels.numel()
        batch_count += 1

    average_loss_train = total_loss_train / batch_count  # mean of the per-batch losses
    average_accuracy_train = total_correct_train / total_examples_train
    print(f"TRAINING | Loss: {average_loss_train:.4f}, Accuracy: {average_accuracy_train:.2%}")
    epoch_data_train.append([epoch+1, average_loss_train, average_accuracy_train])
    train_accuracies.append(average_accuracy_train)

    # VALIDATION PHASE
    model.eval()  # Switch to evaluation mode
    total_loss_val = 0
    total_correct_val = 0
    total_examples_val = 0
    batch_count = 0

    progress_bar_val = tqdm(val_loader, total=len(val_loader), desc="VAL", unit="batch")
    with torch.no_grad():  # No gradients for validation
        for inputs, labels, _ in progress_bar_val:
            inputs = inputs.to(device)
            labels = labels.to(device).float().unsqueeze(1)  # same [batch_size, 1] shape as in training

            outputs = model(inputs)

            loss_val = criterion(outputs, labels)
            total_loss_val += loss_val.item()

            predictions_val = torch.round(torch.sigmoid(outputs))
            total_correct_val += (predictions_val == labels).sum().item()
            total_examples_val += labels.numel()
            batch_count += 1

    average_loss_val = total_loss_val / batch_count
    average_accuracy_val = total_correct_val / total_examples_val
    print(f"VALIDATION | Loss: {average_loss_val:.4f}, Accuracy: {average_accuracy_val:.2%}")
    epoch_data_val.append([epoch+1, average_loss_val, average_accuracy_val])
    val_accuracies.append(average_accuracy_val)

    # Optionally save model checkpoint.
    # The file is named after the network actually being trained (VIPRnet),
    # matching the single-device training cell.
    checkpoint_path = f'models/VIPRnet_V3--B{batch_size}--E{epoch+1}--L{average_loss_train:.4f}.pth'

    if save:
        # Save model checkpoint - use .module when using DataParallel so the
        # stored keys have no "module." prefix
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.module.state_dict() if isinstance(model, nn.DataParallel) else model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': average_loss_train,
            'train_data': epoch_data_train[-1],
            'val_data': epoch_data_val[-1]
        }, checkpoint_path)
        print(f"Model saved to {checkpoint_path}")

In [None]:
print(train_accuracies)
print(val_accuracies)
print(num_epochs)
# x-axis for the accuracy plot: the epoch numbers that were actually recorded
# (first element of each [epoch, loss, accuracy] entry). Unlike counting up to
# the last epoch number, this stays aligned with the accuracy lists when the
# history does not start at epoch 1, and it is simply empty when nothing has
# been recorded yet.
num_epochs_list = [record[0] for record in epoch_data_train]

In [None]:
# Accuracy per epoch for the run held in memory; axes and title are labelled
# so the figure can be read on its own.
fig, ax = plt.subplots()
ax.plot(num_epochs_list, train_accuracies, label='train accuracies')
ax.plot(num_epochs_list, val_accuracies, label='val accuracies')
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.set_title('Training & Validation Accuracy')
ax.legend()
plt.show()

Test a model checkpoint on validation images from the validation folder

In [None]:
# Define the model checkpoint path
model_path = 'VIPRnet_V3--B64--E50--L0.0152.pth'
model_path = os.path.join('models', model_path)

# Load saved checkpoint
checkpoint = torch.load(model_path, map_location=device)  # Ensure it loads on the correct device
# If the DataParallel cell was run, `model` is a wrapper whose parameter names
# carry a "module." prefix; load into the underlying network in that case.
network = model.module if isinstance(model, nn.DataParallel) else model
network.load_state_dict(checkpoint['model_state_dict'])
model.eval()  # Switch to evaluation mode

# Define transformations
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize to 256x256
    transforms.Grayscale(num_output_channels=1),  # Ensure single grayscale channel
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize for grayscale images
])

# Human-readable names for the two classes (built once, not per image)
predictions = {0: 'Healthy', 1: 'Paralyzed'}
to_pil = transforms.ToPILImage()

# Load the images
image_folder = ""  # Replace with your image folder path
if not os.path.isdir(image_folder):
    raise FileNotFoundError(
        f"Image folder {image_folder!r} not found; set image_folder to the folder of images to classify"
    )

image_list = sorted(os.listdir(image_folder))  # sorted for a reproducible output order
for file_name in image_list:
    file_path = os.path.join(image_folder, file_name)
    if not os.path.isfile(file_path):
        continue  # skip sub-directories such as .ipynb_checkpoints

    image = Image.open(file_path)
    print('Filename:', file_name)

    # Apply the transformations
    image = transform(image)

    # Preview what the model sees. Normalize(mean=0.5, std=0.5) computed
    # (x - 0.5) / 0.5, so undo it to bring values back into [0, 1] before
    # converting to a PIL image.
    sample_image = to_pil((image * 0.5 + 0.5).clamp(0, 1))
    sample_image.show()

    image = image.unsqueeze(0)  # Add a batch dimension (since the model expects a batch of images)

    # Move the image to the same device as the model (GPU or CPU)
    image = image.to(device)

    # Make predictions
    with torch.no_grad():  # Disable gradient calculation for inference
        outputs = model(image)

        # Apply sigmoid to get probabilities, then round to get binary output (0 or 1)
        prediction = torch.round(torch.sigmoid(outputs))

    # Print the predicted label
    print(f"Predicted label: {predictions[int(prediction.item())]}\n")  # 0 or 1 based on your binary classification


Get training graphs from folders of checkpoints

In [None]:
def find_balanced_peak_index(list1, list2, weight1=0.5, weight2=0.5):
    """
    Locate the position at which the weighted sum of two series is largest.

    The score at position ``i`` is ``weight1 * list1[i] + weight2 * list2[i]``.
    When several positions share the largest score, the first one is returned.

    :param list1: First list of numerical values.
    :param list2: Second list of numerical values (same length as ``list1``).
    :param weight1: Weight applied to the values of ``list1``.
    :param weight2: Weight applied to the values of ``list2``.
    :return: The index of the maximum balanced value.
    :raises ValueError: If the lists differ in length (or are both empty).
    """
    if len(list1) != len(list2):
        raise ValueError("Both lists must be of the same length.")

    # Weighted score of every position, pairing the two series element-wise
    combined = [weight1 * first + weight2 * second for first, second in zip(list1, list2)]

    # list.index returns the first position holding the peak score
    peak = max(combined)
    return combined.index(peak)

In [None]:
# Define checkpoint folder
checkpoint_folder = 'VIPRnet_64'
checklist = os.listdir(checkpoint_folder)

# Extract data from saved checkpoints
checkpoint_files = []  # file name of each checkpoint actually loaded, parallel to the metric lists
epochs = []
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []
for checkpoint_path in checklist:
    if checkpoint_path == '.ipynb_checkpoints':
        continue  # skip checkpoints
    checkpoint = torch.load(os.path.join(checkpoint_folder, checkpoint_path), map_location=torch.device('cpu'))
    checkpoint_files.append(checkpoint_path)
    epochs.append(checkpoint['epoch'] + 1)  # stored epoch is 0-based
    # 'train_data' / 'val_data' hold [epoch, loss, accuracy]
    train_losses.append(checkpoint['train_data'][1])
    train_accuracies.append(checkpoint['train_data'][2])
    val_losses.append(checkpoint['val_data'][1])
    val_accuracies.append(checkpoint['val_data'][2])

# Convert lists to numpy arrays and sort
epochs = np.array(epochs)
train_losses = np.array(train_losses)
train_accuracies = np.array(train_accuracies)
val_losses = np.array(val_losses)
val_accuracies = np.array(val_accuracies)

# Sort all by epoch number (the folder listing is in arbitrary order)
sorted_indices = np.argsort(epochs)
epochs = epochs[sorted_indices]
train_losses = train_losses[sorted_indices]
train_accuracies = train_accuracies[sorted_indices]
val_losses = val_losses[sorted_indices]
val_accuracies = val_accuracies[sorted_indices]
# Keep the file names in the same epoch order so an index into the metric
# arrays also identifies the right checkpoint file
checkpoint_files = [checkpoint_files[i] for i in sorted_indices]

# Plot Training & Validation Loss
plt.figure(figsize=(10, 5))
plt.plot(epochs, train_losses, label='Training Loss', marker='o')
plt.plot(epochs, val_losses, label='Validation Loss', marker='s')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training & Validation Loss')
plt.legend()
plt.grid()
plt.savefig('VIPRnet_64_loss.png')
plt.show()


# Plot Training & Validation Accuracy
plt.figure(figsize=(10, 5))
plt.plot(epochs, train_accuracies, label='Training Accuracy', marker='o')
plt.plot(epochs, val_accuracies, label='Validation Accuracy', marker='s')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training & Validation Accuracy')
plt.legend()
plt.grid()
plt.savefig('VIPRnet_64_accuracy.png')
plt.show()


# Pick the epoch with the best equally weighted train/validation accuracy.
# The returned index refers to the epoch-sorted arrays, so the file name is
# looked up in the equally sorted `checkpoint_files`, not in the raw listing.
highest_accuracy_idx = find_balanced_peak_index(list(train_accuracies), list(val_accuracies))
best_model = checkpoint_files[highest_accuracy_idx]
print(f'Highest scoring model is from epoch {epochs[highest_accuracy_idx]}. Path: {best_model}')