In [1]:
import numpy as np
import scanpy as sc
import squidpy as sq
from tqdm.auto import tqdm
from torch.utils.data import Dataset, random_split, DataLoader
import torch.nn as nn
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import r2_score


	geopandas.options.use_pygeos = True

If you intended to use PyGEOS, set the option to False.
  _check_geopandas_using_shapely()


In [3]:
import os
os.getcwd()

'C:\\Users\\lg\\PycharmProjects\\spatial_atlas_ssl\\notebooks'

In [3]:
adata = sc.read("C:\\Users\\lg\\PycharmProjects\\spatial_atlas_ssl\\data\\img_119670929.h5ad")

In [4]:
# now we have the adata object of just a single image
sq.gr.spatial_neighbors(adata=adata, radius=1000, key_added="adjacency_matrix", coord_type="generic")

In [29]:
# function to get k lowest values from each row of a sparse matrix
def get_k_lowest_values(matrix, k):
    n_rows = matrix.shape[0]
    k_lowest_indices = np.empty((n_rows, k), dtype=int)
    for i in range(n_rows):
        start = matrix.indptr[i]
        end = matrix.indptr[i + 1]
        row_data = matrix.data[start:end]
        row_indices = matrix.indices[start:end]
        k_smallest_indices = np.argpartition(row_data, k)[:k]
        k_lowest_indices[i] = row_indices[k_smallest_indices]
    return k_lowest_indices

num_nearest = 10
closest_matrix = get_k_lowest_values(adata.obsp['adjacency_matrix_distances'], num_nearest)

In [30]:
# we construct dataset using closest 5 cells

X = []
y = []

for i, cell in tqdm(enumerate(adata.X), total=len(adata)):
    y.append(cell.toarray())
    five_closest_cells = np.array([adata.X[index].toarray() for index in closest_matrix[i]])
    X.append(five_closest_cells.flatten())

X = np.array(X)
y = np.concatenate(y)

  0%|          | 0/26230 [00:00<?, ?it/s]

In [31]:
#X = np.concatenate(X)
print(y.shape, X.shape)

(26230, 550) (26230, 5500)


In [32]:
# we use 80% of the data for training and 10% for validation and 10% for testing
# Create a custom dataset
class MyDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Create the dataset and split it into training, validation, and testing sets
X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
dataset = MyDataset(X, y)
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])

In [33]:
# Create data loaders
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
val_loader = DataLoader(val_set, batch_size=64, shuffle=False)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False)

In [34]:
# improve model structure with 2 hidden layer



class LinearModel(nn.Module):
    def __init__(self, input_dim, output_dim, num_nearest):
        super(LinearModel, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim*num_nearest,input_dim*(num_nearest-2)),
            nn.ReLU(),
            nn.Linear(input_dim*(num_nearest-2) , input_dim*(num_nearest-4)),
            nn.ReLU(),
            nn.Linear(input_dim*(num_nearest-4), output_dim)
        )

    def forward(self, x):
        return self.model(x)

In [36]:
# Set device for training
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Create an instance of the model and move it to the device
input_dim = output_dim = y.shape[1]
#output_dim = y.shape[1]

model = LinearModel(input_dim, output_dim,num_nearest).to(device)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Training loop
num_epochs = 10
train_losses = []
val_losses = []
train_r2_scores = []
val_r2_scores = []

best_val_loss = float('inf')
patience = 20  # Number of epochs to wait for improvement in validation loss

for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_loss = 0
    targets_list = []
    outputs_list = []

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute the loss
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        targets_list.append(targets.cpu().numpy())
        outputs_list.append(outputs.cpu().detach().numpy())

    # Compute and store the average training loss for this epoch
    avg_train_loss = total_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # Calculate R2 score for the training set
    train_targets_all = np.concatenate(targets_list)
    train_outputs_all = np.concatenate(outputs_list)
    train_r2 = r2_score(train_targets_all, train_outputs_all)
    train_r2_scores.append(train_r2)

    # Validation loop
    model.eval()  # Set the model to evaluation mode
    total_val_loss = 0
    val_targets_list = []
    val_outputs_list = []

    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            # Forward pass
            outputs = model(inputs)

            # Compute the validation loss
            val_loss = criterion(outputs, targets)
            total_val_loss += val_loss.item()

            val_targets_list.append(targets.cpu().numpy())
            val_outputs_list.append(outputs.cpu().detach().numpy())

        # Compute and store the average validation loss for this epoch
        avg_val_loss = total_val_loss / len(val_loader)
        val_losses.append(avg_val_loss)

        # Calculate R2 score for the validation set
        val_targets_all = np.concatenate(val_targets_list)
        val_outputs_all = np.concatenate(val_outputs_list)
        val_r2 = r2_score(val_targets_all, val_outputs_all)
        val_r2_scores.append(val_r2)

        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Train R2: {train_r2:.4f}, Val Loss: {avg_val_loss:.4f}, Val R2: {val_r2:.4f}")


    # early stopping with patience
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), "../models/best_model_1.pt")
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve == patience:
            print(f"Early stopping after epoch {epoch + 1}")
            break


Epoch 1/10, Train Loss: 0.5094, Train R2: 0.1001, Val Loss: 0.4899, Val R2: 0.1284
Epoch 2/10, Train Loss: 0.4723, Train R2: 0.1513, Val Loss: 0.4844, Val R2: 0.1363
Epoch 3/10, Train Loss: 0.4492, Train R2: 0.1792, Val Loss: 0.4908, Val R2: 0.1356
Epoch 4/10, Train Loss: 0.4198, Train R2: 0.2121, Val Loss: 0.5057, Val R2: 0.1178
Epoch 5/10, Train Loss: 0.3806, Train R2: 0.2535, Val Loss: 0.5161, Val R2: 0.1142
Epoch 6/10, Train Loss: 0.3416, Train R2: 0.2945, Val Loss: 0.5128, Val R2: 0.1158
Epoch 7/10, Train Loss: 0.3115, Train R2: 0.3263, Val Loss: 0.5165, Val R2: 0.1121
Epoch 8/10, Train Loss: 0.2890, Train R2: 0.3517, Val Loss: 0.5228, Val R2: 0.1035
Epoch 9/10, Train Loss: 0.2715, Train R2: 0.3738, Val Loss: 0.5232, Val R2: 0.1022
Epoch 10/10, Train Loss: 0.2561, Train R2: 0.3950, Val Loss: 0.5272, Val R2: 0.0945


In [37]:
# evaluate performance on test loader
model.load_state_dict(torch.load("../models/best_model_1.pt"))
model.eval()
test_targets_list = []
test_outputs_list = []

# loop over test loader and save r2 and loss
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        outputs = model(inputs)

        test_targets_list.append(targets.cpu().numpy())
        test_outputs_list.append(outputs.cpu().detach().numpy())

test_targets_all = np.concatenate(test_targets_list)
test_outputs_all = np.concatenate(test_outputs_list)
test_r2 = r2_score(test_targets_all, test_outputs_all)
test_loss = criterion(torch.from_numpy(test_outputs_all), torch.from_numpy(test_targets_all)).item()
print(f"Test Loss: {test_loss:.4f}, Test R2: {test_r2:.4f}")

Test Loss: 0.4862, Test R2: 0.1388


In [None]:
# improved model structure with 2 hidden layers

class LinearModel2(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(LinearModel, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(5*550, 3*550),
            nn.ReLU(),
            nn.Linear(3*550, 2*550),
            nn.ReLU(),
            nn.Linear(2*550, output_dim)
        )

    def forward(self, x):
        return self.model(x)

In [17]:
# create dataset using connectivity matrix of 5 closest cells and expression of 5 closest cells

X = []
y = []

for i, cell in tqdm(enumerate(adata.X), total=len(adata)):
    y.append(cell.toarray())
    five_closest_cells = np.array([adata.X[index].toarray() for index in closest_matrix[i]])
    X.append(five_closest_cells.flatten())

X = np.array(X)
y = np.concatenate(y)

  0%|          | 0/26230 [00:00<?, ?it/s]

In [18]:
# print shape
print(y.shape, X.shape)

(26230, 550) (26230, 2750)


In [None]:
# using connectivity matrix of 5 closest cells and expression of 5 closest cells with torch geometric

import torch
from torch_geometric.data import Data


In [None]:
# create dataset using connectivity matrix of 5 closest cells and expression of 5 closest cells

X = []
y = []

for i, cell in tqdm(enumerate(adata.X), total=len(adata)):
    y.append(cell.toarray())
    five_closest_cells = np.array([adata.X[index].toarray() for index in closest_matrix[i]])
    X.append(five_closest_cells.flatten())

X = np.array(X)
y = np.concatenate(y)

In [None]:
# print shape
print(y.shape, X.shape)

In [None]:
#get the connectivity matrix
connectivity_matrix = adata.obsp['adjacency_matrix']