# ML for biological data analysis - practical examples

## 1. Image classification using convolutional neural networks
We will use images of micropatterned organoids with healthy (WT) and diseased background (Huntington's disease, HD), from [Metzger et al. 2022](https://www.sciencedirect.com/science/article/pii/S2667237522001795?via%3Dihub). We will use a much reduced dataset (about 100 images per condition) and downsampled images to increase computational speed. Usually, datasets for deep learning are much larger. This first part is mostly an adaptation of [PyTorch's Transfer Learning for Computer Vision Tutorial](https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html).


In [None]:
import pandas as pd
import torch 
from glob import glob
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
from tempfile import TemporaryDirectory

from sklearn.metrics import confusion_matrix
import seaborn as sns
import umap.umap_ as umap

Prepare training ('train') and validation ('val') sets, and prepare data augmentation for training. Here, we will not do data augementation for validation. Are there cases where it could make sense to use data augmentation also for validation?

In [None]:
# Data augmentation and normalization for training
# Just normalization for validation
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

data_dir = 'data/'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
                                             shuffle=True, num_workers=4)
              for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Display some example images for the two conditions, WT and HD.

In [None]:
def imshow(inp, title=None):
    """Display image for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


# Get a batch of training data
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x] for x in classes])

Definde the training function.

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    # Create a temporary directory to save training checkpoints
    with TemporaryDirectory() as tempdir:
        best_model_params_path = os.path.join(tempdir, 'best_model_params.pt')

        torch.save(model.state_dict(), best_model_params_path)
        best_acc = 0.0

        for epoch in range(num_epochs):
            print(f'Epoch {epoch}/{num_epochs - 1}')
            print('-' * 10)

            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                if phase == 'train':
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]

                print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_model_params_path)

            print()

        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
        print(f'Best val Acc: {best_acc:4f}')

        # load best model weights
        model.load_state_dict(torch.load(best_model_params_path, weights_only=True))
    return model

In [None]:
def visualize_model(model, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images//2, 2, images_so_far)
                ax.axis('off')

                # Set title to show both predicted and true labels
                ax.set_title(f'predicted: {class_names[preds[j]]}\ntrue: {class_names[labels[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)


In [None]:
model_ft = models.resnet18(weights='IMAGENET1K_V1')
num_ftrs = model_ft.fc.in_features

model_ft.fc = nn.Linear(num_ftrs, 2) # Here the size of each output sample is set to 2, because we have two conditions, WT and HD

model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)


We can now run the model. How many epochs should we run it for?

In [None]:
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=4) 

Visualize the predictions of the model

In [None]:
visualize_model(model_ft)

To evaluate the model performance, we can plot a confusion matrix. 

In [None]:
def evaluate_model_and_collect_predictions(model, dataloaders):
    model.eval()  # Set the model to evaluation mode
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloaders['val']:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            # Append predictions and true labels
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return np.array(all_preds), np.array(all_labels)

In [None]:
def plot_confusion_matrix(y_true, y_pred, class_names):
    # Generate confusion matrix
    cm = confusion_matrix(y_true, y_pred)

    # Plot the confusion matrix
    plt.figure(figsize=(4, 3))
    sns.heatmap(cm, annot=True, fmt='d', cmap='crest_r', xticklabels=class_names, yticklabels=class_names, linewidth=.5)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.title('Confusion Matrix')


In [None]:
# Assuming you have your model, dataloaders, and class_names defined

# Collect predictions and true labels
preds, true_labels = evaluate_model_and_collect_predictions(model_ft, dataloaders)

In [None]:
# Plot the confusion matrix
plot_confusion_matrix(true_labels, preds, class_names)

What other evaluation metrics can you think of? Are you satisfied with the results? What could be improved?

## 2. Unsupervised analysis using PCA, UMAP and autoencoders
We will now use unsupervised learning on the same images of micropatterned organoids. Why would we do this? What can we learn from this? What can we learn from this that we couldn't learn using the classification approach?


Both PCA and UMAP operate on vectors (i.e. 1-dimensional data), not 2-dimensional data such as the images we have here. We therefore need to flatten the images to one long 1-D vector. What do we lose in this process? 

In [None]:
def flatten_images(dataloader):
    all_flattened_images = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloaders['val']:
            # Move images to CPU and flatten them
            flattened = inputs.view(inputs.size(0), -1).cpu().numpy()  # Flatten each image
            all_flattened_images.append(flattened)
            all_labels.append(labels.cpu().numpy())

    # Concatenate all batches
    all_flattened_images = np.concatenate(all_flattened_images, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)

    return all_flattened_images, all_labels


In [None]:
from sklearn.decomposition import PCA
import umap
import matplotlib.pyplot as plt

def apply_pca_umap(flattened_images, labels, class_names):
    # Apply PCA
    pca = PCA(n_components=2)
    pca_result = pca.fit_transform(flattened_images)

    # Apply UMAP
    # reducer = umap.UMAP(random_state=42)

    umap_result = umap.UMAP(n_components=2).fit_transform(flattened_images)

    # Plot PCA result
    plt.figure(figsize=(8, 4))

    plt.subplot(1, 2, 1)
    for class_idx, class_name in enumerate(class_names):
        plt.scatter(pca_result[labels == class_idx, 0], pca_result[labels == class_idx, 1], 
                    label=class_name, alpha=0.7)
    plt.title('PCA')
    plt.legend()
    sns.despine()
    
    # Plot UMAP result
    plt.subplot(1, 2, 2)
    for class_idx, class_name in enumerate(class_names):
        plt.scatter(umap_result[labels == class_idx, 0], umap_result[labels == class_idx, 1], 
                    label=class_name, alpha=0.7)
    plt.title('UMAP')
    plt.legend()
    sns.despine()


In [None]:
# Step 1: Flatten the images
flattened_images, image_labels = flatten_images(dataloaders)

In [None]:
# Step 2: Apply PCA and UMAP and visualize the result
apply_pca_umap(flattened_images, image_labels, class_names)

In the final step, we will use a convolutional autoencoder to analyze the images. Why would we do this? Do you expect this to work better or worse than PCA/UMAP? Carefully inspect the code for the autoencoder and check that you understand the basic structure. Why does the forward function return two outputs (encoded, decoded)?

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class ConvAutoencoder(nn.Module):
    def __init__(self):
        super(ConvAutoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, 3, stride=2, padding=1),  # Input: (3, 64, 64), Output: (16, 32, 32)
            nn.ReLU(),
            nn.Conv2d(16, 32, 3, stride=2, padding=1), # Output: (32, 16, 16)
            nn.ReLU(),
            nn.Conv2d(32, 64, 7)                      # Output: (64, 10, 10)
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, 7),            # Output: (32, 16, 16)
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, 3, stride=2, padding=1, output_padding=1), # Output: (16, 32, 32)
            nn.ReLU(),
            nn.ConvTranspose2d(16, 3, 3, stride=2, padding=1, output_padding=1),  # Output: (3, 64, 64)
            nn.Sigmoid()  # Use Sigmoid if the images are normalized to [0, 1]
        )

    def forward(self, x):
        encoded = self.encoder(x)  # Forward pass through the encoder
        decoded = self.decoder(encoded)  # Forward pass through the decoder
        return encoded, decoded  # Return both encoded (latent) and decoded (reconstructed) outputs


In [None]:
# Loss function and optimizer
autoencoder = ConvAutoencoder().to(device)
criterion = nn.MSELoss()  # Mean Squared Error for reconstruction
optimizer = optim.Adam(autoencoder.parameters(), lr=1e-3)

# Training function that returns the loss history
def train_autoencoder(model, dataloader, num_epochs=20):
    model.train()
    train_losses = []  # List to store the loss after each epoch
    
    for epoch in range(num_epochs):
        running_loss = 0.0
        for data in dataloader:
            inputs, _ = data
            inputs = inputs.to(device)
            
            # Zero the gradients
            optimizer.zero_grad()
            
            # Forward pass
            encoded, decoded = model(inputs)
            
            # Compute the loss
            loss = criterion(decoded, inputs)
            
            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        # Compute the average loss for this epoch
        avg_loss = running_loss / len(dataloader)
        train_losses.append(avg_loss)  # Append the average loss to the list
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss}')
    
    # Return the loss history
    return train_losses

In [None]:
# Train the model
train_losses = train_autoencoder(autoencoder, dataloaders['train'], num_epochs=5)

Let's plot the loss function (i.e. how well the network learns over the epchs). What do you observe? Are you satisfied with the results? How could they be improved?

In [None]:
# Plot function
def plot_loss(train_losses):
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')    
    plt.legend()

# Plot the loss
plot_loss(train_losses)

To cluster the results, we will get the latent space of the autoencoder for each images of the validation set. What is the latent space and why is it useful?

In [None]:
def get_latent_space(model, dataloader):
    model.eval()  # Set the model to evaluation mode
    latent_vectors = []
    all_labels = []

    with torch.no_grad():
        for data in dataloader:
            inputs, labels = data
            inputs = inputs.to(device)
            encoded, _ = model(inputs)
            
            # Flatten the latent space for clustering (if needed)
            latent_vectors.append(encoded.view(encoded.size(0), -1).cpu().numpy())
            all_labels.append(labels.cpu().numpy())
    
    return np.concatenate(latent_vectors, axis=0), np.concatenate(all_labels, axis=0)


In [None]:
from sklearn.decomposition import PCA
import umap.umap_ as umap
import matplotlib.pyplot as plt

# PCA on Latent Space
pca = PCA(n_components=2)
pca_result = pca.fit_transform(latent_vectors)

# UMAP on Latent Space
umap_result = umap.UMAP(n_components=2).fit_transform(latent_vectors)

# Plot PCA and UMAP results
plt.figure(figsize=(8, 4))

# PCA plot
plt.subplot(1, 2, 1)
plt.scatter(pca_result[:, 0], pca_result[:, 1], c=true_labels, cmap='viridis', s=10)
plt.title('PCA on Latent Space')
sns.despine()

# UMAP plot
plt.subplot(1, 2, 2)
plt.scatter(umap_result[:, 0], umap_result[:, 1], c=true_labels, cmap='viridis', s=10)
plt.title('UMAP on Latent Space')
sns.despine()



How do the autoencoder results compare to PCA and UMAP done on the flattened images? What are your conclusions about the different approaches? Which one would you focus on going forward? How could this be improved further?

How are the results from the second part (PCA, UMAP, autoencoder) different to the first (classifcation using a convolutional neural network)? Which approach would be useful for which scenario? Discuss!