In [1]:
import os
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm
from sentence_transformers import SentenceTransformer, models, LoggingHandler, InputExample, losses, evaluation
from sentence_transformers.losses import SiameseDistanceMetric
import numpy as np
import random
seed = 42
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

VOC_NAMES = ["Alpha", "Beta", "Delta", "Gamma", "Omicron"]
LOSS_NAME = "ContrastiveLoss"
POOLING_MODE = "max"

In [2]:
def get_mutated_omicron(rbd=False):
    N = 11 # number of mutations
    if rbd:
        N = 4
    # change N random characters in each omicron sequence and save as a new dataset
    omicron_sequences = pd.read_csv("data/unique_Omicron_2k.csv")["sequence"].tolist()
    import copy
    omicron_sequences_mutated = copy.deepcopy(omicron_sequences)
    aa_list = ["A","B","C","D","E","F","G","H","I","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"]
    #print("number of AAs:", len(aa_list))
    random.seed(42)
    omicron_sequences_mutated = [list(seq) for seq in omicron_sequences_mutated]
    for seq in omicron_sequences_mutated:
        # select random indices
        if rbd:
            idxs = random.sample(range(317, 541), N)
        else:
            idxs = random.sample(range(len(seq)), N)
        #print(idxs)
        for i in range(N):
            idx = idxs[i]
            # change the character at idx to a random character. make sure it is not the same as the original character
            original_aa = seq[idx]
            while seq[idx] == original_aa:
                seq[idx] = random.choice(aa_list)
            # print(f"changed {original_aa} to {seq[idx]}")
    omicron_sequences_mutated = ["".join(seq) for seq in omicron_sequences_mutated]
    #pd.DataFrame({"sequence": omicron_sequences_mutated}).to_csv("data/unique_Omicron_2k_mutated.csv", index=False)
    return omicron_sequences_mutated


# Construct Contrastive Dataset

In [19]:
omicron_sequences = pd.read_csv("data/unique_Omicron_2k.csv")["sequence"].tolist()
eris_sequences = pd.read_csv("data/unique_Eris_2081.csv")["sequence"].tolist()
new_sequences = pd.read_csv("data/unique_Omicron_New_548.csv")["sequence"].tolist()
new_sequences = random.sample(new_sequences, 500)
mutant_sequences = get_mutated_omicron(rbd=False)
mutant_rbd_sequences = get_mutated_omicron(rbd=True)

NEG_SET = "eris_rbd" # mutant or eris or new

if NEG_SET == "eris":
        neg_sequences = random.sample(eris_sequences, 500)
elif NEG_SET == "mutant":
        neg_sequences = random.sample(mutant_sequences, 500)
elif NEG_SET == "mutant_rbd":
        neg_sequences = random.sample(mutant_rbd_sequences, 500)



# sample 1000 random sequences from the omicron dataset
omicron_samples = random.sample(omicron_sequences, 1000)


test_examples = []
for i, seq in enumerate(new_sequences):
        test_examples.append(InputExample(texts=[omicron_samples[i], seq], label = 1))

for i, seq in enumerate(neg_sequences):
        test_examples.append(InputExample(texts=[omicron_samples[i+500], seq], label = 0))

# shuffle the test examples
random.shuffle(test_examples)
print("len(test_examples): ", len(test_examples))

len(test_examples):  1000


In [20]:
import os

# Create output directory if needed
output_dir = f"./{LOSS_NAME}_output_{POOLING_MODE}_{NEG_SET}_omicron"
evaluator_dir = f"{output_dir}/eval"

if not os.path.exists(output_dir):
    os.makedirs(output_dir)
if not os.path.exists(evaluator_dir):
    os.makedirs(evaluator_dir)


In [24]:
print("Comparing omicron_new and", NEG_SET)
test_evaluator = evaluation.BinaryClassificationPredictor(sentences1=[test_example.texts[0] for test_example in test_examples],
                                                     sentences2=[test_example.texts[1] for test_example in test_examples],
                                                     threshold=None,
                                                     batch_size=10000,
                                                     show_progress_bar=False,
                                                     name="test")

# get list of labels from test_examples
trues = [test_example.label for test_example in test_examples]

# load best model
best_model = SentenceTransformer(f"./{LOSS_NAME}_output_{POOLING_MODE}_best_model")
best_model.to("cuda")

# print test duration

import time
start_time = time.time()
preds = best_model.evaluate(test_evaluator)
duration = time.time() - start_time
print(f"Test duration: {duration:.4f} seconds")

print(len(preds), "test examples")

# export predictions
preds = [pred for pred in preds]
trues = [true for true in trues]

df = pd.DataFrame({"pred": preds, "true": trues})
df.to_csv(os.path.join(output_dir, "eval", f"test_preds_{NEG_SET}_omicron.csv"), index=False)

# print accuracy
correct_count = sum([1 for pred, true in zip(preds, trues) if pred == true])
print(f"Accuracy: {correct_count / len(trues):.4f}")

print(f"{(duration * 1000 / len(test_examples)):.4f} ms per example")


Comparing omicron_new and eris_rbd
Mean Euclidean distance: 1.7661213874816895
Using threshold for Euclidean distance: 1.7661213874816895
Test duration: 0.8574 seconds
1000 test examples
Accuracy: 0.4380
0.8574 ms per example
