In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from transformers import pipeline, T5Tokenizer, T5EncoderModel
import torch
import re
import numpy as np
import sys

## Negative Samples Creation w/ Embeddings and Cosine Similarity

Here we create embeddings with the prot_t5_xl model and then check whether a candidate's embedding lies within a certain "un-similarity range", which is a self-defined threshold. We chose this approach because our evaluations showed that it is (at least as far as we can tell) the "best" one.

In [None]:
# Notebook parameters. Each name is only given its default when it is not
# already defined in the current namespace, so a value set beforehand is kept.
if "precision" not in locals():
    precision = "gene" # gene or allele

# Input splits.
if "read_path_train" not in locals():
    read_path_train = f"../../data/splitted_data/{precision}/beta/train.tsv"

if "read_path_validation" not in locals():
    read_path_validation = f"../../data/splitted_data/{precision}/beta/validation.tsv"

if "read_path_test" not in locals():
    read_path_test = f"../../data/splitted_data/{precision}/beta/test.tsv"

# Directory for the cached embedding archives.
if "temp_path" not in locals():
    temp_path = "../../data/customDatasets/negative_samples/temp/"

# Directory and file names of the final datasets.
if "output_path" not in locals():
    output_path = f"../../data/customDatasets/negative_samples/{precision}/"

if "train_output_name" not in locals():
    train_output_name = "beta_train_concatenated_with_negative.tsv"

if "validation_output_name" not in locals():
    validation_output_name = "beta_validation_concatenated_with_negative.tsv"

if "test_output_name" not in locals():
    test_output_name = "beta_test_concatenated_with_negative.tsv"

# Read the three tab-separated splits, in train / validation / test order.
beta_train_df, beta_validation_df, beta_test_df = (
    pd.read_csv(split_path, sep="\t")
    for split_path in (read_path_train, read_path_validation, read_path_test)
)

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print("Using device: {}".format(device))

In [None]:

#@title Load encoder-part of ProtT5 in half-precision. { display-mode: "form" }
# Checkpoint: the encoder-only part of ProtT5-XL-U50.
# NOTE(review): from_pretrained is called without a dtype argument — confirm
# which precision the weights actually end up in when running on GPU.
transformer_link = "Rostlab/prot_t5_xl_half_uniref50-enc"
print("Loading: {}".format(transformer_link))
model = T5EncoderModel.from_pretrained(transformer_link)

running_on_cpu = device == torch.device("cpu")
if running_on_cpu:
    # Only cast to full precision when no GPU is available.
    print("Casting model to full precision for running on CPU ...")
    model.to(torch.float32)

# Move to the target device and switch to inference mode.
model = model.to(device).eval()
tokenizer = T5Tokenizer.from_pretrained(transformer_link, do_lower_case=False, legacy=True)


In [None]:
# Distinct epitope sequences of each split (embedded once per sequence later on).
epitopes_train, epitopes_validation, epitopes_test = (
    set(split_df["Epitope"].to_list())
    for split_df in (beta_train_df, beta_validation_df, beta_test_df)
)

In [None]:
# this will replace all rare/ambiguous amino acids by X and introduce white-space between all amino acids
processed_train_epitopes = [(sequence, " ".join(list(re.sub(r"[UZOB]", "X", sequence)))) for sequence in epitopes_train]
processed_validation_epitopes = [(sequence, " ".join(list(re.sub(r"[UZOB]", "X", sequence)))) for sequence in epitopes_validation]
processed_test_epitopes = [(sequence, " ".join(list(re.sub(r"[UZOB]", "X", sequence)))) for sequence in epitopes_test]
# processed_epitopes

In [None]:
def process_batch(processed_seqs):
    """Embed one batch of epitopes and return ``{original sequence: mean embedding}``.

    Args:
        processed_seqs: list of ``(original_sequence, spaced_sequence)`` tuples;
            the second element is what gets tokenized.

    Returns:
        dict mapping each original sequence to a NumPy vector obtained by
        averaging the encoder's last hidden states over the first
        ``len(original_sequence)`` positions.
    """
    spaced_sequences = [pair[1] for pair in processed_seqs]
    encoded = tokenizer.batch_encode_plus(spaced_sequences, add_special_tokens=True, padding="longest", return_tensors="pt")

    # Inference only: no gradients are needed.
    with torch.no_grad():
        encoder_output = model(
            input_ids=encoded['input_ids'].to(device),
            attention_mask=encoded['attention_mask'].to(device),
        )
    hidden_states = encoder_output.last_hidden_state

    # Keep only the first len(sequence) positions of each row before averaging.
    # Assumes one token per residue, so this excludes special tokens and
    # padding — TODO confirm against the tokenizer.
    return {
        original_seq: hidden_states[row, :len(original_seq)].mean(dim=0).cpu().numpy()
        for row, (original_seq, _) in enumerate(processed_seqs)
    }

In [None]:
# File names of the per-split embedding archives.
file_train_name, file_validation_name, file_test_name = (
    "negative_train_samples_beta_embeddings_dict.npz",
    "negative_validation_samples_beta_embeddings_dict.npz",
    "negative_test_samples_beta_embeddings_dict.npz",
)

In [None]:

batch_size = 128
sequence_train_to_embedding = {}
sequence_validation_to_embedding = {}
sequence_test_to_embedding = {}

# Embed every split in chunks of batch_size and collect the results in the
# split's dictionary, keyed by the original sequence.
for processed_epitopes, embedding_store in (
    (processed_train_epitopes, sequence_train_to_embedding),
    (processed_validation_epitopes, sequence_validation_to_embedding),
    (processed_test_epitopes, sequence_test_to_embedding),
):
    for start in range(0, len(processed_epitopes), batch_size):
        chunk = processed_epitopes[start:start + batch_size]
        embedding_store.update(process_batch(chunk))


In [None]:
# Persist each split's embeddings as one .npz archive (one array per sequence).
for archive_name, split_embeddings in (
    (file_train_name, sequence_train_to_embedding),
    (file_validation_name, sequence_validation_to_embedding),
    (file_test_name, sequence_test_to_embedding),
):
    np.savez(temp_path+archive_name, **split_embeddings)

In [None]:
def _load_embedding_dict(npz_path):
    """Read every array of an ``.npz`` archive into a plain in-memory dict.

    ``np.load`` returns a lazy ``NpzFile`` that keeps the archive open and does
    not cache arrays between key lookups. The archive is therefore read once
    and closed again, so later lookups are ordinary dict accesses.
    """
    with np.load(npz_path) as archive:
        return {sequence: archive[sequence] for sequence in archive.files}


epitope_train_to_embedding = _load_embedding_dict(temp_path+file_train_name)
epitope_validation_to_embedding = _load_embedding_dict(temp_path+file_validation_name)
epitope_test_to_embedding = _load_embedding_dict(temp_path+file_test_name)

In [None]:
# Largest row position of each split.
max_train_index, max_validation_index, max_test_index = (
    len(split_df) - 1
    for split_df in (beta_train_df, beta_validation_df, beta_test_df)
)

# Collectors for the negative samples found per split.
negative_train_epitopes_cosine, negative_validation_epitopes_cosine, negative_test_epitopes_cosine = [], [], []

In [None]:
def cosine_similarity(embedding1, embedding2):
    """Return the cosine of the angle between two vectors.

    Computed as the dot product divided by the product of the two Euclidean
    norms. No special handling is done for zero-length vectors.
    """
    dot_product = np.dot(embedding1, embedding2)
    norm_product = np.linalg.norm(embedding1) * np.linalg.norm(embedding2)
    return dot_product / norm_product

In [None]:
def is_valid_negative(cosine_similarity, current_epitope, random_epitope):
    """Decide whether ``random_epitope`` may serve as a negative for ``current_epitope``.

    Args:
        cosine_similarity: similarity value of the two epitopes' embeddings
            (note: this parameter shadows the function of the same name).
        current_epitope: epitope of the row a negative is searched for.
        random_epitope: candidate epitope.

    Returns:
        True when the two epitopes differ and the similarity lies in the
        closed interval [-1, 0.75]; False otherwise.
    """
    cosine_min = -1
    cosine_max = 0.75

    # An identical epitope is never a negative, whatever the similarity.
    if current_epitope == random_epitope:
        return False

    return bool(cosine_min <= cosine_similarity <= cosine_max)


In [None]:
# Upper bound for the number of random draws per row in the negative-sample
# search: one less than the interpreter's current recursion limit.
# NOTE(review): the search in this notebook is a loop, not a recursion —
# confirm that the recursion limit is still the intended bound.
sys_max_depth = sys.getrecursionlimit()
max_attempts_by_system = sys_max_depth - 1

In [None]:
# Fix NumPy's global random state so the random draws below are repeatable.
NEGATIVE_SAMPLING_SEED = 42
np.random.seed(NEGATIVE_SAMPLING_SEED)

In [None]:
def search_negative_epitope_embedding(df, index, current_epitope, epitope_to_embedding, max_attempts=max_attempts_by_system):
    """Draw a random row of ``df`` to act as a negative (epitope, MHC) pair for row ``index``.

    Rows are drawn uniformly at random with NumPy's global generator. A draw is
    accepted when it is a different row and ``is_valid_negative`` approves the
    cosine similarity between the two epitope embeddings.

    Args:
        df: frame with an ``"Epitope"`` and an ``"MHC"`` column.
        index: row position (0-based) of the row a negative is searched for.
        current_epitope: ignored; the epitope is always re-read from ``df`` at
            ``index``. Kept so existing callers keep working.
        epitope_to_embedding: mapping from epitope sequence to embedding vector.
        max_attempts: maximum number of random draws.

    Returns:
        tuple ``(epitope, mhc)`` taken from the chosen row. If the final attempt
        draws a different row, that row is returned even when it is not a valid
        negative. If the attempts run out without a return, an unchecked
        different row is returned.

    Raises:
        ValueError: if ``df`` has fewer than two rows, since no different row
            can be drawn.
    """
    row_count = len(df)
    if row_count < 2:
        # Without a second row the fallback loop below could never terminate.
        raise ValueError("df needs at least two rows to draw a negative sample from a different row")

    # `index` and the random draws are row positions, so look rows up by position.
    epitope_column = df["Epitope"]
    mhc_column = df["MHC"]

    current_epitope = epitope_column.iloc[index]
    current_embedding = epitope_to_embedding[current_epitope]

    for attempt in range(max_attempts):
        random_epitope_index = np.random.randint(0, row_count)
        if random_epitope_index == index:
            continue  # drew the row itself; this still counts as an attempt

        random_epitope = epitope_column.iloc[random_epitope_index]
        random_mhc = mhc_column.iloc[random_epitope_index]
        cosine = cosine_similarity(current_embedding, epitope_to_embedding[random_epitope])

        # Accept a valid negative, or whatever was drawn on the last attempt.
        if is_valid_negative(cosine, current_epitope, random_epitope) or attempt == max_attempts - 1:
            return (random_epitope, random_mhc)

    # Reached when the attempts are exhausted without a return (e.g. the last
    # draw hit `index` itself). Return any other row, in the same
    # (epitope, mhc) shape as above so callers can index the result uniformly.
    while True:
        random_epitope_index = np.random.randint(0, row_count)
        if random_epitope_index != index:
            return (epitope_column.iloc[random_epitope_index], mhc_column.iloc[random_epitope_index])


In [None]:
# For every row of every split, search a negative (epitope, MHC) pair within
# the same split and append it to that split's collector.
for split_df, split_embeddings, split_negatives in (
    (beta_train_df, epitope_train_to_embedding, negative_train_epitopes_cosine),
    (beta_validation_df, epitope_validation_to_embedding, negative_validation_epitopes_cosine),
    (beta_test_df, epitope_test_to_embedding, negative_test_epitopes_cosine),
):
    for i, epitope in enumerate(split_df["Epitope"]):
        split_negatives.append(
            search_negative_epitope_embedding(split_df, i, epitope, split_embeddings)
        )

In [None]:
# Fresh, independent lists for the negative epitopes and their MHCs per split.
# Note: the epitopes_* names are re-bound here from sets to lists.
epitopes_train, epitopes_validation, epitopes_test = [], [], []
mhc_train, mhc_validation, mhc_test = [], [], []

In [None]:
# Split every collected result into its epitope (position 0) and MHC (position 1).
for negative_pairs, epitope_sink, mhc_sink in (
    (negative_train_epitopes_cosine, epitopes_train, mhc_train),
    (negative_validation_epitopes_cosine, epitopes_validation, mhc_validation),
    (negative_test_epitopes_cosine, epitopes_test, mhc_test),
):
    epitope_sink.extend(pair[0] for pair in negative_pairs)
    mhc_sink.extend(pair[1] for pair in negative_pairs)

In [None]:
def _negative_pair_columns(epitopes, mhcs):
    """Column mapping for a table of negative epitopes and their MHCs."""
    return {"Negative Epitope": epitopes, "MHC": mhcs}


negative_train_epitopes_cosine_dict = _negative_pair_columns(epitopes_train, mhc_train)
negative_train_epitopes_cosine_df = pd.DataFrame(data=negative_train_epitopes_cosine_dict)

negative_validation_epitopes_cosine_dict = _negative_pair_columns(epitopes_validation, mhc_validation)
negative_validation_epitopes_cosine_df = pd.DataFrame(data=negative_validation_epitopes_cosine_dict)

negative_test_epitopes_cosine_dict = _negative_pair_columns(epitopes_test, mhc_test)
negative_test_epitopes_cosine_df = pd.DataFrame(data=negative_test_epitopes_cosine_dict)

In [None]:
def _with_negative_pairs(positive_df, negative_epitopes, negative_mhcs):
    """Copy ``positive_df`` with its epitope/MHC replaced by the negatives and ``Binding`` set to 0.

    The original ``MHC`` column is dropped first, so the new one ends up as the
    last column.
    """
    return positive_df.drop(columns=["MHC"]).assign(
        Epitope=negative_epitopes,
        MHC=negative_mhcs,
        Binding=0,
    )


beta_train_negative_epitope_df = _with_negative_pairs(beta_train_df, epitopes_train, mhc_train)
beta_validation_negative_epitope_df = _with_negative_pairs(beta_validation_df, epitopes_validation, mhc_validation)
beta_test_negative_epitope_df = _with_negative_pairs(beta_test_df, epitopes_test, mhc_test)
beta_test_negative_epitope_df


In [None]:
# Stack each split's original rows on top of its generated negative rows.
beta_train_with_negative_df = pd.concat([beta_train_df.copy(), beta_train_negative_epitope_df])
beta_validation_with_negative_df = pd.concat([beta_validation_df.copy(), beta_validation_negative_epitope_df])
beta_test_with_negative_df = pd.concat([beta_test_df.copy(), beta_test_negative_epitope_df])
beta_test_with_negative_df


In [None]:
def _deduplicate_and_renumber(frame):
    """Drop duplicate rows, renumber ``TCR_name`` from 1 and reset the index.

    Rows are compared on every column except ``TCR_name`` and ``Binding``; of
    each duplicate group only the first row is kept.
    """
    compared_columns = frame.columns.difference(["TCR_name", "Binding"])
    deduplicated = frame.drop_duplicates(subset=compared_columns, keep="first")
    return deduplicated.assign(
        TCR_name=range(1, len(deduplicated) + 1)
    ).reset_index(drop=True)


beta_train_with_negative_df = _deduplicate_and_renumber(beta_train_with_negative_df)
beta_validation_with_negative_df = _deduplicate_and_renumber(beta_validation_with_negative_df)
beta_test_with_negative_df = _deduplicate_and_renumber(beta_test_with_negative_df)
beta_test_with_negative_df


In [None]:
# Write each final split as a tab-separated file without the index column.
for final_df, output_name in (
    (beta_train_with_negative_df, train_output_name),
    (beta_validation_with_negative_df, validation_output_name),
    (beta_test_with_negative_df, test_output_name),
):
    final_df.to_csv(output_path+"/"+output_name, sep="\t", index=False)