# PyTorch model to predict spot annotation

In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

import anndata as ad
import scanpy as sc
#import squidpy as sq

import torch
from torch import nn
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda
from torch.utils.data import DataLoader, ConcatDataset
from torchvision import transforms

from sklearn.model_selection import KFold
from sklearn.metrics import f1_score

import os

## Create anndata object

In [2]:
counts_df = pd.read_csv("filtered_ST_matrix_and_meta_data/filtered_matrix.tsv.gz", compression="gzip", delimiter="\t")
counts_df = counts_df.transpose()
counts_df.columns = counts_df.iloc[0]
counts_df.drop(counts_df.index[0], inplace = True)
adata = sc.AnnData(counts_df)
meta = pd.read_csv("filtered_ST_matrix_and_meta_data/meta_data.tsv", sep="\t")
adata.obs = meta
adata.obsm["spatial"] = adata.obs[["new_x", "new_y"]].copy().to_numpy()
adata

AnnData object with n_obs × n_vars = 3111 × 39739
    obs: 'nGene', 'nUMI', 'Sample', 'weeks', 'ChipBatch', 'ChipNr', 'Experiment_date', 'Experiment_procedure', 'Sequencing_date', 'Raw_reads', 'new_x', 'new_y', 'percent.mito', 'res.0.8'
    obsm: 'spatial'

In [3]:
adata.obs

Unnamed: 0,nGene,nUMI,Sample,weeks,ChipBatch,ChipNr,Experiment_date,Experiment_procedure,Sequencing_date,Raw_reads,new_x,new_y,percent.mito,res.0.8
1x17x20,666,1046,FH5_1000L3_CN20_C1,5,1000L3,CN20,151105,manual,151117,35432474,16.92,19.99,0.018164,0
1x17x21,891,1361,FH5_1000L3_CN20_C1,5,1000L3,CN20,151105,manual,151117,35432474,17.06,20.95,0.007348,2
1x17x22,1138,1878,FH5_1000L3_CN20_C1,5,1000L3,CN20,151105,manual,151117,35432474,17.02,22.02,0.007455,0
1x18x18,766,1170,FH5_1000L3_CN20_C1,5,1000L3,CN20,151105,manual,151117,35432474,17.99,18.00,0.021368,4
1x18x19,536,820,FH5_1000L3_CN20_C1,5,1000L3,CN20,151105,manual,151117,35432474,17.95,18.99,0.014634,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19x9x11,1133,2330,FH9_1000L3_CN31_E2,9,1000L3,CN31,151105,manual,160111,83186609,8.92,11.08,0.019313,3
19x9x12,865,1599,FH9_1000L3_CN31_E2,9,1000L3,CN31,151105,manual,160111,83186609,8.98,12.01,0.023139,0
19x9x13,1265,2897,FH9_1000L3_CN31_E2,9,1000L3,CN31,151105,manual,160111,83186609,8.88,13.01,0.018985,1
19x9x8,1343,2594,FH9_1000L3_CN31_E2,9,1000L3,CN31,151105,manual,160111,83186609,8.95,8.02,0.014649,3


## Create train and test set
Create sets based on random indices

Training data 80%

Test data 20%

CustomDataset contains two tensors: one tensor from the gene-spot matrix and one tensor containing the labels

In [4]:
class CustomDataset(Dataset):
    def __init__(self, labels, data, transform=None, target_transform=None):
        self.labels = labels
        self.data = data
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # returns the tensor with data and corresponding label in a tuple
        data_point = self.data[idx]
        label = self.labels.iloc[idx]
        if self.transform:
            data_point = self.transform(data_point).float()
        if self.target_transform:
            label = self.target_transform(label)
        return data_point, label

In [5]:
test_indices = np.random.choice(len(adata.X), size=int(len(adata.X)*0.2), replace=False) 
train_indices = np.setdiff1d(np.arange(0, len(adata.X)), test_indices)

training_matrix = adata.X[train_indices,:]
test_matrix = adata.X[test_indices,:]

training_data = CustomDataset(labels = adata.obs["res.0.8"][train_indices], data = training_matrix.astype(float), 
                              transform= torch.from_numpy, target_transform =None)

test_data = CustomDataset(labels = adata.obs["res.0.8"][test_indices], data = test_matrix.astype(float), 
                          transform= torch.from_numpy, target_transform =None)

  training_data = CustomDataset(labels = adata.obs["res.0.8"][train_indices], data = training_matrix.astype(float),
  test_data = CustomDataset(labels = adata.obs["res.0.8"][test_indices], data = test_matrix.astype(float),


## Build the Neural Network

In [6]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(in_features= 39739, out_features=20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [7]:
def train_loop(dataloader, model, loss_fn, optimizer, batch_size):
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * batch_size + len(X)
            #print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
    return loss


def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0
    all_predictions = []
    all_labels = []

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

            _, predicted = pred.max(1)

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    test_loss /= num_batches
    correct /= size
    
    acc = (100*correct)
    #print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f}")
    
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    return acc, test_loss, f1

## Model training with k-fold cross vallidation

In [8]:
def reset_weights(m):
    '''
    Resetting model weights to avoid
    weight leakage.
    '''
    for layer in m.children():
        if hasattr(layer, 'reset_parameters'):
            print(f'Reset trainable parameters of layer = {layer}')
            layer.reset_parameters()
            
def cross_val_train(dataset, learning_rate):
    print('--------------------------------')
    print("Learning rate: ", learning_rate)
    k_folds = 5
    num_epochs = 5
    batch_size = 64
    loss_function = nn.CrossEntropyLoss()
    best_model_state_dict = None

    # For fold results
    results = {}
    results_f1 = {}

    # Set fixed random number seed
    torch.manual_seed(42)

    # Define the K-fold Cross Validator
    kfold = KFold(n_splits=k_folds, shuffle=True)
    # K-fold Cross Validation model evaluation
    for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset)):

        # Print
        print(f'\nFOLD {fold}')
        print('--------------------------------')

        # Sample elements randomly from a given list of ids, no replacement.
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
        test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)

        # Define data loaders for training and testing data in this fold
        trainloader = torch.utils.data.DataLoader(
                          dataset, 
                          batch_size=64, sampler=train_subsampler)
        testloader = torch.utils.data.DataLoader(
                          dataset,
                          batch_size=64, sampler=test_subsampler)
        
        # Initialize the model with random parameters
        network = NeuralNetwork().to("cpu")
        network.apply(reset_weights)

        # Initialize optimizer
        optimizer = torch.optim.SGD(network.parameters(), lr=learning_rate)

        # Run the training loop for defined number of epochs
        for epoch in range(0, num_epochs):

            # Iterate over the DataLoader for training data
            train_loop(trainloader, network, loss_function, optimizer, batch_size)

            # Saving the model
            save_path = f'./model-fold-{fold}.pth'
            torch.save(network.state_dict(), save_path)

            # Iterate over the test data and generate predictions
            test_accuracy, test_loss, f1 = test_loop(testloader, network, loss_function)
            results[fold] = test_accuracy
            results_f1[fold] = f1
                
            print(f'Epoch {epoch+1}, Loss: {test_loss}, Accuracy: {test_accuracy}, F1: {f1}')
            
    # Print fold results
    print(f'\nK-FOLD CROSS VALIDATION RESULTS FOR {k_folds} FOLDS')
    print('--------------------------------')
    sum = 0.0
    for key, value in results.items():
        print(f'Fold {key}: {value} %')
        sum += value
    print(f'Average accuracy: {sum/len(results.items())} %')
    
    sum = 0.0
    for key, value in results_f1.items():
        print(f'Fold {key}: {value} %')
        sum += value
    print(f'Average F1: {sum/len(results_f1.items())} %\n')
    
    state_dict = network.state_dict()
    return f1, state_dict

## learning rate optimization

In [9]:
learning_rates = [1e-4, 1e-3, 1e-2]
best_performance = 0.0

# If current model has better performance, update best_model_state_dict
for lr in learning_rates:
    f1, state_dict = cross_val_train(training_data, lr)

    if f1 > best_performance:
        best_lr = lr
        best_performance = f1
        best_model_state_dict = state_dict
print("Best learning rate: ", best_lr)

--------------------------------
Learning rate:  0.0001

FOLD 0
--------------------------------
Reset trainable parameters of layer = Linear(in_features=39739, out_features=20, bias=True)
Reset trainable parameters of layer = Linear(in_features=20, out_features=20, bias=True)
Reset trainable parameters of layer = Linear(in_features=20, out_features=10, bias=True)
Epoch 1, Loss: 2.309708744287491, Accuracy: 1.0445962233828847, F1: 0.030632999797728155
Epoch 2, Loss: 2.2877223193645477, Accuracy: 2.7320208919244675, F1: 0.10556369701262576
Epoch 3, Loss: 2.2757404148578644, Accuracy: 2.611490558457212, F1: 0.11248163556184511
Epoch 4, Loss: 2.2645259499549866, Accuracy: 2.6516673362796306, F1: 0.11662157225588297
Epoch 5, Loss: 2.2544761896133423, Accuracy: 2.571313780634793, F1: 0.11505598529932079

FOLD 1
--------------------------------
Reset trainable parameters of layer = Linear(in_features=39739, out_features=20, bias=True)
Reset trainable parameters of layer = Linear(in_features=

Epoch 1, Loss: 1.997536614537239, Accuracy: 6.267577340297308, F1: 0.19324377971656745
Epoch 2, Loss: 1.56981460750103, Accuracy: 8.035355564483728, F1: 0.33833786794316456
Epoch 3, Loss: 1.7256812751293182, Accuracy: 7.8746484531940535, F1: 0.3123420283383784
Epoch 4, Loss: 1.4585669189691544, Accuracy: 7.955002008838891, F1: 0.3492464948935673
Epoch 5, Loss: 1.6736319810152054, Accuracy: 8.115709120128566, F1: 0.33492452408212664

FOLD 1
--------------------------------
Reset trainable parameters of layer = Linear(in_features=39739, out_features=20, bias=True)
Reset trainable parameters of layer = Linear(in_features=20, out_features=20, bias=True)
Reset trainable parameters of layer = Linear(in_features=20, out_features=10, bias=True)
Epoch 1, Loss: 2.9739395678043365, Accuracy: 5.142627561269586, F1: 0.14770797207418465
Epoch 2, Loss: 1.6975446492433548, Accuracy: 6.106870229007633, F1: 0.21806208403695138
Epoch 3, Loss: 1.7697659581899643, Accuracy: 6.106870229007633, F1: 0.2106786

## Final Model

In [10]:
best_model = NeuralNetwork()
best_model.load_state_dict(best_model_state_dict)

<All keys matched successfully>

## Evaluate the final model on the test set

In [11]:
test_dataloader = torch.utils.data.DataLoader(
                          test_data,
                          batch_size=64)
test_loss, test_accuracy, test_f1 = test_loop(test_dataloader, best_model, nn.CrossEntropyLoss())

print(f"Test Loss: {test_loss:.4f}\nTest Accuracy: {test_accuracy:.2f}%\nTest F1 Score: {test_f1:.4f}")

Test Loss: 37.6206
Test Accuracy: 1.65%
Test F1 Score: 0.3140
