Step 1: Transformer Block \
The TransformerBlock consists of:

Multihead Attention: To capture relationships between tokens in the sequence.

Layer Normalization: To stabilize training.

Feedforward Network: To process the attention output.

Dropout: For regularization.

In [None]:
import torch
import torch.nn as nn

import numpy as np

from torch.utils.data import Dataset, DataLoader, random_split


class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, n_heads, dropout=0.1):
        """
        Args:
            embed_dim: Dimensionality of the input embeddings.
            n_heads: Number of attention heads.
            dropout: Dropout rate for regularization.
        """
        super(TransformerBlock, self).__init__()
        
        # Multihead Attention
        self.multihead_attn = nn.MultiheadAttention(embed_dim, n_heads, dropout=dropout)
        
        # Layer Normalization
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        
        # Feedforward Network
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, 4 * embed_dim),  # Expand dimension
            nn.ReLU(),
            nn.Linear(4 * embed_dim, embed_dim)   # Compress back to original dimension
        )
        
        # Dropout
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Args:
            x: Input tensor of shape (seq_len, batch_size, embed_dim).
        
        Returns:
            Output tensor of shape (seq_len, batch_size, embed_dim).
        """
        # Multihead Attention
        attn_output, _ = self.multihead_attn(x, x, x)  # Self-attention
        x = x + self.dropout(attn_output)              # Residual connection
        x = self.norm1(x)                              # Layer normalization
        
        # Feedforward Network
        ffn_output = self.ffn(x)
        x = x + self.dropout(ffn_output)               # Residual connection
        x = self.norm2(x)                              # Layer normalization
        
        return x

Step 4: Testing the Updated Dataset\
Let's test the updated dataset to ensure the combined embeddings are in the correct shape:

In [5]:

# Load the .npz files
tcr_data = np.load('/home/ubuntu/data/embeddings/beta/gene/TCRPeg_tcr_embeddings.npz')
epitope_data = np.load('/home/ubuntu/data/embeddings/beta/gene/TCRPeg_Epitope_embeddings.npz')

# Print the keys in each file
print("Keys in TCR embeddings file:", list(tcr_data.keys()))
print("Keys in Epitope embeddings file:", list(epitope_data.keys()))

Keys in TCR embeddings file: ['embeddings', 'labels']
Keys in Epitope embeddings file: ['embeddings']


CHECK!!  
\
Step 1: Verify the Size of the Dataset

In [6]:
# Load the TCR and epitope embeddings
tcr_data = np.load('/home/ubuntu/data/embeddings/beta/gene/TCRPeg_tcr_embeddings.npz')
epitope_data = np.load('/home/ubuntu/data/embeddings/beta/gene/TCRPeg_Epitope_embeddings.npz')

tcr_embeddings = tcr_data['embeddings']  # Use the correct key
epitope_embeddings = epitope_data['embeddings']  # Use the correct key

print(f"Number of TCR embeddings: {len(tcr_embeddings)}")
print(f"Number of epitope embeddings: {len(epitope_embeddings)}")

Number of TCR embeddings: 570500
Number of epitope embeddings: 570500


Step 2: Ensure Labels Match the Dataset Size\
If you have labels, ensure the labels array has the same length as the number of TCR/epitope embeddings. If you don't have labels, we can create dummy labels.

In [7]:
# If you have labels, load them here
path_to_labels = '/home/ubuntu/data/embeddings/beta/gene/TCRPeg_tcr_embeddings.npz'
labels_data = np.load(path_to_labels)  # Example: Load labels from a file
labels = labels_data['labels']

# If you don't have labels, create dummy labels
# labels = np.zeros(len(tcr_embeddings))  # Dummy labels (all zeros)

print(f"Number of labels: {len(labels)}")

Number of labels: 570500


Split the dataset into training, validation and test\
This is only a dummy splitting, since for real testing we'll use 
a separate test dataset.

In [None]:
# Split the dataset into training and validation sets
train_size = int(0.8 * len(dataset))  # 80% training
val_size = len(dataset) - train_size   # 20% validation
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])


Step 3: Update the Dataset Class \
Update the TCR_Epitope_Dataset class to ensure it uses the correct indices and handles the dataset size properly.

In [10]:
class TCR_Epitope_Dataset(Dataset):
    def __init__(self, tcr_embeddings, epitope_embeddings, labels=None):
        """
        Args:
            tcr_embeddings: Array of TCR embeddings.
            epitope_embeddings: Array of epitope embeddings.
            labels: Optional array of labels (1 for binding, 0 for non-binding).
        """
        self.tcr_embeddings = tcr_embeddings
        self.epitope_embeddings = epitope_embeddings
        
        # Ensure the number of TCR and epitope embeddings match
        assert len(self.tcr_embeddings) == len(self.epitope_embeddings), \
            "Number of TCR and epitope embeddings must match!"
        
        # If labels are provided, use them; otherwise, create dummy labels
        self.labels = labels if labels is not None else np.zeros(len(self.tcr_embeddings))
        
    def __len__(self):
        return len(self.tcr_embeddings)
    
    def __getitem__(self, idx):
        """
        Returns:
            combined_embedding: Combined TCR and epitope embeddings of shape (2, 1024).
            label: Binding label (1 or 0).
        """
        tcr_embedding = self.tcr_embeddings[idx]
        epitope_embedding = self.epitope_embeddings[idx]
        label = self.labels[idx]
        
        # Convert to PyTorch tensors
        tcr_embedding = torch.tensor(tcr_embedding, dtype=torch.float32)
        epitope_embedding = torch.tensor(epitope_embedding, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.float32)
        
        # Combine TCR and epitope embeddings along the sequence dimension
        combined_embedding = torch.stack([tcr_embedding, epitope_embedding], dim=0)  # Shape: (2, 1024)
        
        return combined_embedding, label

Step 4: Create the Dataset and DataLoader \
Now, create the dataset and DataLoader with the correct data:

In [11]:
# Create the dataset
dataset = TCR_Epitope_Dataset(tcr_embeddings, epitope_embeddings, labels)

# Inspect the dataset
print(f"Number of samples: {len(dataset)}")
combined_embedding, label = dataset[0]  # Get the first sample
print(f"Combined embedding shape: {combined_embedding.shape}")  # Should be (2, 1024)
print(f"Label: {label}")

# Create a DataLoader
batch_size = 32
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Iterate through the DataLoader
for batch in dataloader:
    combined_embeddings, labels = batch
    print(f"Combined embeddings batch shape: {combined_embeddings.shape}")  # Should be (batch_size, 2, 1024)
    print(f"Labels batch shape: {labels.shape}")  # Should be (batch_size,)
    break  # Stop after the first batch for inspection

Number of samples: 570500
Combined embedding shape: torch.Size([2, 1024])
Label: 1.0
Combined embeddings batch shape: torch.Size([32, 2, 1024])
Labels batch shape: torch.Size([32])


Step 5: Integrate the Transformer Block\
We'll create a model class that:

Takes the combined TCR and epitope embeddings as input.

Passes them through the TransformerBlock.

Flattens the output and passes it through a classifier.

In [12]:


class TCR_Epitope_Model(nn.Module):
    def __init__(self, embed_dim, n_heads, dropout=0.1, classifier_hidden_dim=64):
        """
        Args:
            embed_dim: Dimensionality of the input embeddings (1024 in your case).
            n_heads: Number of attention heads in the transformer.
            dropout: Dropout rate for regularization.
            classifier_hidden_dim: Hidden dimension of the classifier.
        """
        super(TCR_Epitope_Model, self).__init__()
        
        # Transformer Block
        self.transformer_block = TransformerBlock(embed_dim, n_heads, dropout)
        
        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(2 * embed_dim, classifier_hidden_dim),  # Input: flattened transformer output
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(classifier_hidden_dim, 1)  # Output: binary prediction
        )
    
    def forward(self, x):
        """
        Args:
            x: Input tensor of shape (batch_size, 2, embed_dim).
        
        Returns:
            logits: Output tensor of shape (batch_size, 1).
        """
        # Permute input to (seq_len, batch_size, embed_dim) for the transformer
        x = x.permute(1, 0, 2)  # Shape: (2, batch_size, embed_dim)
        
        # Pass through the transformer block
        x = self.transformer_block(x)  # Shape: (2, batch_size, embed_dim)
        
        # Permute back to (batch_size, seq_len, embed_dim)
        x = x.permute(1, 0, 2)  # Shape: (batch_size, 2, embed_dim)
        
        # Flatten the output
        x = x.reshape(x.size(0), -1)  # Shape: (batch_size, 2 * embed_dim)
        
        # Pass through the classifier
        logits = self.classifier(x)  # Shape: (batch_size, 1)
        
        return logits

Step 6: Test the Model \
Let's test the model with a batch of data to ensure everything works:

In [None]:
# # Initialize the model
# embed_dim = 1024  # Dimensionality of TCR and epitope embeddings
# n_heads = 4       # Number of attention heads
# model = TCR_Epitope_Model(embed_dim, n_heads)

# # Get a batch of data from the DataLoader
# for batch in dataloader:
#     combined_embeddings, labels = batch
#     break  # Stop after the first batch

# # Pass the batch through the model
# logits = model(combined_embeddings)
# print(f"Logits shape: {logits.shape}")  # Should be (batch_size, 1)

Logits shape: torch.Size([32, 1])


we forgot to use GPU. Then...

In [22]:
import torch

# Check if GPU is available
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"GPU is available: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device("cpu")
    print("GPU is not available. Using CPU.")

# Move the model to the GPU (if available)
model = model.to(device)

# Move Data to GPU

for batch in dataloader:
    combined_embeddings, labels = batch
    
    # Move data to the GPU (if available)
    combined_embeddings = combined_embeddings.to(device)
    labels = labels.to(device)
    
    # Rest of the training loop...

# Verify GPU Usage

# Check model device
print(f"Model is on: {next(model.parameters()).device}")

# Check data device
print(f"Data is on: {combined_embeddings.device}")

GPU is available: Tesla T4
Model is on: cuda:0
Data is on: cuda:0


Step 7: Add a Training Loop\
Now, let's add a simple training loop to train the model. We'll use binary cross-entropy loss and the Adam optimizer.

In [23]:
import torch.optim as optim

# Initialize the model, loss function, and optimizer
model = TCR_Epitope_Model(embed_dim, n_heads).to(device)  # Move model to GPU
criterion = nn.BCEWithLogitsLoss()  # Binary cross-entropy loss with logits
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Training loop
num_epochs = 5
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    epoch_loss = 0.0
    
    for batch in dataloader:
        combined_embeddings, labels = batch
        
        # Move data to the GPU (if available)
        combined_embeddings = combined_embeddings.to(device)
        labels = labels.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        logits = model(combined_embeddings).squeeze()  # Shape: (batch_size,)
        
        # Compute loss
        loss = criterion(logits, labels)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        # Accumulate loss
        epoch_loss += loss.item()
    
    # Print epoch loss
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss / len(dataloader)}")

Epoch 1/5, Loss: 0.30530472857386276
Epoch 2/5, Loss: 0.1719683983865046
Epoch 3/5, Loss: 0.157329112976129
Epoch 4/5, Loss: 0.14955465956043631
Epoch 5/5, Loss: 0.14537664496026595


Step 1: Prepare a Validation Set\

In [None]:


# Split the dataset into training and validation sets
train_size = int(0.8 * len(dataset))  # 80% training
val_size = len(dataset) - train_size   # 20% validation
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Create DataLoaders for training and validation
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)


Step 2: Evaluation Function\
We'll write a function to evaluate the model on the validation set. This function will:

Switch the model to evaluation mode.

Disable gradient computation.

Compute predictions and metrics like accuracy, ROC-AUC, and precision-recall.

In [None]:
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score

def evaluate(model, dataloader, device):
    model.eval()  # Set the model to evaluation mode
    all_labels = []
    all_predictions = []
    
    with torch.no_grad():  # Disable gradient computation
        for batch in dataloader:
            combined_embeddings, labels = batch
            
            # Move data to the GPU (if available)
            combined_embeddings = combined_embeddings.to(device)
            labels = labels.to(device)
            
            # Forward pass
            logits = model(combined_embeddings).squeeze()
            predictions = torch.sigmoid(logits)  # Convert logits to probabilities
            
            # Store predictions and labels
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predictions.cpu().numpy())
    
    # Convert lists to numpy arrays
    all_labels = np.array(all_labels)
    all_predictions = np.array(all_predictions)
    
    # Compute metrics
    accuracy = accuracy_score(all_labels, all_predictions > 0.5)  # Threshold at 0.5
    roc_auc = roc_auc_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions > 0.5, zero_division=0)
    recall = recall_score(all_labels, all_predictions > 0.5, zero_division=0)
    
    return accuracy, roc_auc, precision, recall

Step 3: Evaluate the Model

In [None]:
num_epochs = 5
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    epoch_loss = 0.0
    
    # Training loop
    for batch in train_dataloader:
        combined_embeddings, labels = batch
        
        # Move data to the GPU (if available)
        combined_embeddings = combined_embeddings.to(device)
        labels = labels.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        logits = model(combined_embeddings).squeeze()
        
        # Compute loss
        loss = criterion(logits, labels)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        # Accumulate loss
        epoch_loss += loss.item()
    
    # Print training loss
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss / len(train_dataloader)}")
    
    # Evaluate on the validation set
    accuracy, roc_auc, precision, recall = evaluate(model, val_dataloader, device)
    print(f"Validation Metrics - Accuracy: {accuracy:.4f}, ROC-AUC: {roc_auc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

Step 4: Save the Model

In [None]:
# Save the model
torch.save(model.state_dict(), "tcr_epitope_model.pth")

# To load the model later:

# Load the model
model = TCR_Epitope_Model(embed_dim, n_heads).to(device)
model.load_state_dict(torch.load("tcr_epitope_model.pth"))

Let's proceed with hyperparameter tuning, test set evaluation, and visualizations for your Multihead Attention Model. We'll break this down into clear steps. 

Step 1: Hyperparameter Tuning \
Hyperparameter tuning involves finding the best set of hyperparameters (e.g., learning rate, number of attention heads, dropout rate) for your model. We'll use a simple grid search approach.

Key Hyperparameters to Tune:

Learning Rate: Controls the step size during optimization.

Number of Attention Heads: Determines how many parallel attention mechanisms to use.

Dropout Rate: Regularization to prevent overfitting.

Hidden Dimension of the Classifier: Size of the hidden layer in the classifier.

In [None]:
from itertools import product

# Define hyperparameter grid
learning_rates = [1e-3, 1e-4]
n_heads_list = [2, 4, 8]
dropout_rates = [0.1, 0.2]
classifier_hidden_dims = [64, 128]

# Iterate over all combinations
best_roc_auc = 0
best_hyperparams = {}

for lr, n_heads, dropout, hidden_dim in product(learning_rates, n_heads_list, dropout_rates, classifier_hidden_dims):
    print(f"Testing: lr={lr}, n_heads={n_heads}, dropout={dropout}, hidden_dim={hidden_dim}")
    
    # Initialize model with current hyperparameters
    model = TCR_Epitope_Model(embed_dim=1024, n_heads=n_heads, dropout=dropout, classifier_hidden_dim=hidden_dim).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCEWithLogitsLoss()
    
    # Train the model
    for epoch in range(5):  # Short training for hyperparameter tuning
        model.train()
        for batch in train_dataloader:
            combined_embeddings, labels = batch
            combined_embeddings, labels = combined_embeddings.to(device), labels.to(device)
            
            optimizer.zero_grad()
            logits = model(combined_embeddings).squeeze()
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
    
    # Evaluate on the validation set
    accuracy, roc_auc, precision, recall = evaluate(model, val_dataloader, device)
    print(f"Validation Metrics - Accuracy: {accuracy:.4f}, ROC-AUC: {roc_auc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
    
    # Track the best hyperparameters
    if roc_auc > best_roc_auc:
        best_roc_auc = roc_auc
        best_hyperparams = {
            "learning_rate": lr,
            "n_heads": n_heads,
            "dropout": dropout,
            "classifier_hidden_dim": hidden_dim
        }

print(f"Best Hyperparameters: {best_hyperparams}, Best ROC-AUC: {best_roc_auc:.4f}")

Step 2: Test Set Evaluation \
Once you've identified the best hyperparameters, evaluate the model on the test set.

Load the Test Set:

In [None]:
# Load test embeddings (assuming Scenario B: shared embeddings)
test_tcr_embeddings = all_tcr_embeddings[test_tcr_indices]
test_epitope_embeddings = all_epitope_embeddings[test_epitope_indices]

# Create test dataset and DataLoader
test_dataset = TCR_Epitope_Dataset(test_tcr_embeddings, test_epitope_embeddings, test_labels)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

Evaluate on the Test Set:

In [None]:
# Initialize the model with the best hyperparameters
best_model = TCR_Epitope_Model(
    embed_dim=1024,
    n_heads=best_hyperparams["n_heads"],
    dropout=best_hyperparams["dropout"],
    classifier_hidden_dim=best_hyperparams["classifier_hidden_dim"]
).to(device)

# Load the trained weights (if you saved the model)
best_model.load_state_dict(torch.load("best_model.pth"))

# Evaluate on the test set
test_accuracy, test_roc_auc, test_precision, test_recall = evaluate(best_model, test_dataloader, device)
print(f"Test Metrics - Accuracy: {test_accuracy:.4f}, ROC-AUC: {test_roc_auc:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}")

Step 3: Visualizations \
Visualizations help you understand the model's performance. We'll create:

ROC Curve: To visualize the trade-off between true positive rate (TPR) and false positive rate (FPR).

Precision-Recall Curve: To visualize the trade-off between precision and recall.

Confusion Matrix: To show the distribution of predictions.

ROC Curve:


In [None]:
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

# Get predictions and labels
all_labels = []
all_predictions = []

best_model.eval()
with torch.no_grad():
    for batch in test_dataloader:
        combined_embeddings, labels = batch
        combined_embeddings, labels = combined_embeddings.to(device), labels.to(device)
        
        logits = best_model(combined_embeddings).squeeze()
        predictions = torch.sigmoid(logits)
        
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predictions.cpu().numpy())

# Compute ROC curve
fpr, tpr, thresholds = roc_curve(all_labels, all_predictions)
roc_auc = auc(fpr, tpr)

# Plot ROC curve
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

In [None]:
from sklearn.metrics import precision_recall_curve

# Compute precision-recall curve
precision, recall, _ = precision_recall_curve(all_labels, all_predictions)

# Plot precision-recall curve
plt.figure()
plt.plot(recall, precision, color='blue', lw=2, label='Precision-Recall curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc="lower left")
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Compute confusion matrix
binary_predictions = (np.array(all_predictions) > 0.5).astype(int)
cm = confusion_matrix(all_labels, binary_predictions)

# Plot confusion matrix
plt.figure()
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Not Binding', 'Binding'], yticklabels=['Not Binding', 'Binding'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()