In [None]:
### Inclusions ###

import sys, os, pickle
from IPython.display import clear_output
from tqdm import tqdm
sys.path.append('/home/sj/ml/lib/python3.10/site-packages/') # *Change path as required*

from scipy.stats import percentileofscore
import csv

import numpy as np
import pandas as pd
import importlib
import random

%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
# Plots stuff
import matplotlib as mpl
from matplotlib import patches
from pandas.plotting import table

import sklearn
from sklearn import metrics
from sklearn.model_selection import train_test_split, KFold
from skbio import DNA, Protein

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn import L1Loss
import copy
from scipy.optimize import fsolve
import math

A,L=20,9

# import keras
# from keras.models import Sequential
# from keras.layers import Dense, Dropout
# from keras import regularizers

def overlap_seqs(list1,list2):
    overlap=[]
    for i in range(len(list1)):
        if list1[i] in list2:
            overlap.append(list1[i])
    return overlap

def flatten_list(listoflist):
    listoflist_fl = [];
    for l in range(len(listoflist)):
        for u in range(len(listoflist[l])):
            listoflist_fl.append(listoflist[l][u])
    return listoflist_fl

curr_int = np.int16
def convert_number(seqs): # convert to numbers already aligned seqs
    aa = ['A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V',  'W', 'Y','-']
    aadict = {aa[k]: k for k in range(len(aa))}

    msa_num = np.array(list(map(lambda x: [aadict[y] for y in x], seqs[0:])), dtype=curr_int, order="c") ### Here change ####

    return msa_num

def uniqueIndexes(l):
    seen = set()
    res = []
    for i, n in enumerate(l):
        if n not in seen:
            res.append(i)
            seen.add(n)
    return res

def convert_letter(seqs_n): # convert to numbers already aligned seqs
    aa = ['A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V',  'W', 'Y','-']
    aadictinv = {k: aa[k] for k in range(len(aa))}
    seqs=[]
    if type(seqs_n[0]) == curr_int:
        seqs.append(''.join([aadictinv[e] for e in seqs_n]))
    else:
        for t in range(len(seqs_n)):
            seqs.append(''.join([aadictinv[e] for e in seqs_n[t]]))
    return seqs

# add some functions for independent site models
def loglikelihood_indip_model(fields, logZ, seqs):
    return fields[np.arange(len(fields)), seqs].sum(axis=1) - logZ

def add_pseudocount(fields, n):
    return np.array([(f + 1/n) / np.sum((f + 1/n)) for f in fields])

def build_model(dims, dropout_prob=0.5):
    assert dims[0] == A * L
    assert dims[-1] == 1

    layers = [torch.nn.Flatten(), torch.nn.Linear(dims[0], dims[1])]
    for l in range(2, len(dims)):
        layers.append(torch.nn.LeakyReLU())
        layers.append(torch.nn.Linear(dims[l - 1], dims[l]))
        if l < len(dims) - 1:  # Add dropout except for the last layer
            layers.append(torch.nn.Dropout(p=dropout_prob))
    return torch.nn.Sequential(*layers)

def getAllModels(A=20, L=9, dropout_prob=0.5):
    return [
        build_model([A * L, 1], dropout_prob), # Perceptron (Parsimonious)
        build_model([A * L, 2, 1], dropout_prob),
        build_model([A * L, 4, 1], dropout_prob),
        build_model([A * L, 8, 1], dropout_prob),
        build_model([A * L, 16, 1], dropout_prob),
        build_model([A * L, 32, 1], dropout_prob),
        build_model([A * L, 64, 1], dropout_prob),
        build_model([A * L, 128, 1], dropout_prob),
        build_model([A * L, 16, 8, 1], dropout_prob),
        build_model([A * L, 32, 8, 1], dropout_prob),
        build_model([A * L, 64, 8, 1], dropout_prob),
        build_model([A * L, 128, 8, 1], dropout_prob),
        build_model([A * L, 32, 16, 1], dropout_prob),
        build_model([A * L, 64, 16, 1], dropout_prob),
        build_model([A * L, 128, 16, 1], dropout_prob),
        build_model([A * L, 32, 16, 8, 1], dropout_prob),
        build_model([A * L, 64, 16, 8, 1], dropout_prob),
        build_model([A * L, 64, 32, 16, 1], dropout_prob),
        build_model([A * L, 128, 32, 16, 1], dropout_prob),
        build_model([A * L, 128, 64, 16, 1], dropout_prob),
        build_model([A * L, 128, 64, 32, 1], dropout_prob),
        build_model([A * L, 128, 64, 32, 8, 1], dropout_prob),
        build_model([A * L, 128, 64, 32, 16, 1], dropout_prob)]

def getModelsBest(A=20, L=9, dropout_prob=0.5):
    # PRIME-Trained uses best = 6
    return [
        build_model([A * L, 1], dropout_prob), # Perceptron (Parsimonious)
        build_model([A * L, 64, 1], dropout_prob)]

def getNumInputFeatures(X_train, X_test):
    allLetters = ''
    for i in X_train:
        allLetters= allLetters + i
    for i in X_test:
        allLetters= allLetters + i
    A = len(np.unique([*allLetters]))
    return A

In [None]:
def getPeps(HLA, isSarsCov2 = False):

    ## Block that filters the data -  https://www.iedb.org/database_export_v3.php ##
    # import IEDB -  for T cell assays - and read relevant columns #
    filename_lab = 'tcell_full_v3.csv'
    iedb = pd.read_csv(filename_lab, sep=',', low_memory=False)
    head = iedb.columns

    index_species = 43 # column indicating in what species the antigens are tested
    index_antigen = 11 # column with antigens

    condition0 = iedb[head[146]] == 'Linear peptide'  # to be sure a peptide immunization study was used
    condition1 = iedb[head[145]] == 'Epitope' # to have the peptide as first immunogen, not the full protein or virus
    condition2 = iedb[head[144]] == 'I' # to select CD8 epitopes

    list_hosts = list(np.unique(iedb[head[index_species]].astype(str)))
    list_mus = [st for st in list_hosts if 'Mus' in st]
    #condition3 = iedb[head[index_species]].isin(list_mus)
    list_homo = [st for st in list_hosts if 'Homo' in st]
    #condition3 = iedb[head[index_species]].isin(list_homo + list_mus)
    condition3 = iedb[head[index_species]].isin(list_homo) # Human only

    # Exclude SARS-CoV-2 data for train data
    list_antigenNames = list(np.unique(iedb[head[23]].astype(str)))
    list_sarscov2 = [st for st in list_antigenNames if 'SARS-CoV2' in st or 'Severe acute respiratory syndrome coronavirus 2' in st]
    if isSarsCov2 == False:
        condition4 = ~iedb[head[23]].isin(list_sarscov2)
    else:
        condition4 = iedb[head[23]].isin(list_sarscov2)

    iedbT = iedb[condition0 & condition1 & condition2 & condition3 & condition4]

    listT = ['T cell CD4+','T cell CD4-'] ## exclude CD4 responders in case there was someone left
    conditionT = ~iedbT[head[95]].isin(listT) # note: conditionT no strictly necessary #

    conditionP0 = iedbT[head[94]] != 'Restimulation in vitro' # Here, if the evidence comes from in vitro, exclude a restimulation step
    conditionP1 = iedbT[head[122]].isin(['Positive', 'Positive-High'])
    conditionP10 = iedbT[head[122]].isin(['Positive', 'Positive-High','Positive-Intermediate' ,'Positive-Low'])
    conditionN1 = iedbT[head[122]] == 'Negative'

    iedbTN = iedbT[conditionP0 & conditionN1 & conditionT]
    iedbTP = iedbT[conditionP0 & conditionP1 & conditionT]
    iedbTP0 = iedbT[conditionP0 & conditionP10 & conditionT]

#     list_hla = list(np.unique(iedbTN[head[141]].values))
#     list_hlaFn=[]
#     for hh in list_hla:
#         print(hh)
#         ii = iedbTN[iedbTN[head[141]] == hh]
#         if len(np.unique((ii[head[index_antigen]].values))) > 50:
#             list_hlaFn.append(hh)

#     list_hla = list(np.unique(iedbTP[head[141]].values))
#     list_hlaFp=[]
#     for hh in list_hla:
#         print(hh)
#         ii = iedbTP[iedbTP[head[141]] == hh]
#         if len(np.unique((ii[head[index_antigen]].values))) > 50:
#             list_hlaFp.append(hh)

#     list_hlaF0 = overlap_seqs(list_hlaFn,list_hlaFp)
#     list_hlaF = [f for f in list_hlaF0 if '*' in f]

#     iedbTNa = iedbTN[iedbTN[head[141]].isin(list_hlaF)]
#     iedbTPa = iedbTP[iedbTP[head[141]].isin(list_hlaF)]

    hh = HLA
    if hh[6] != 0:
        hhM = hh[:5] + hh[7]
    else:
        hhM = hh[:5] + hh[6:7]

    iedbTNa = iedbTN[iedbTN[head[141]]==hh]
    iedbTPa = iedbTP[iedbTP[head[141]]==hh]
    iedbTP0a = iedbTP0[iedbTP0[head[141]].isin([hh,hhM])]
    iedbTN0a = iedbTN[iedbTN[head[141]].isin([hh,hhM])]
    pep_imm = list(np.unique((iedbTPa[head[index_antigen]].values)))
    pep_imm0 = list(np.unique((iedbTP0a[head[index_antigen]].values)))
    pep_immN = list(np.unique((iedbTNa[head[index_antigen]].values)))
    pep_immN0 = list(np.unique((iedbTN0a[head[index_antigen]].values)))

    pep_imm9 = [pep_imm[p] for p in range(len(pep_imm)) if len(pep_imm[p]) in range_len and 'X' not in pep_imm[p] and 'l' not in pep_imm[p] and pep_imm[p] not in pep_immN0]
    pep_imm9N = [pep_immN[p] for p in range(len(pep_immN)) if len(pep_immN[p]) in range_len and pep_immN[p] not in pep_imm0]
    pep_imm9 = [pep for pep in pep_imm9 if len(pep) == 9]
    pep_imm9N = [pep for pep in pep_imm9N if len(pep) == 9]
    return list(pep_imm9), list(pep_imm9N)

In [None]:
# Build Training Database

range_len = [8, 9, 10, 11]
list_hlas = ['HLA-A*01:01', 'HLA-A*02:01', 'HLA-A*03:01', 'HLA-A*11:01', 'HLA-A*24:02', 'HLA-B*07:02', 'HLA-B*08:01', 'HLA-B*15:01', 'HLA-B*35:01', 'HLA-B*40:01'] # 10 most frequent alleles for this problem
#list_hlas = ['HLA-B*08:01', 'HLA-B*15:01', 'HLA-B*35:01']

pep_dict_train = {}  # Dictionary to store peptides for each HLA allele

pepsP_train, pepsN_train = [], []

for HLA in list_hlas:
    hlaPeps = getPeps(HLA, False)
    hlaPepsP, hlaPepsN = hlaPeps[0], hlaPeps[1]

    print(HLA, ':')
    print('Num. of positives:', len(hlaPepsP))
    print('Num. of negatives:', len(hlaPepsN))

    pep_dict_train[HLA] = [hlaPepsP, hlaPepsN]  # Store positives for HLA allele in the dictionary
    pepsP_train.append(hlaPepsP)  # Store positives in the list
    pepsN_train.append(hlaPepsN)  # Store negatives in the list

    # Check no overlap left between positives and negatives #
    if len(overlap_seqs(hlaPepsP, hlaPepsN)) > 0:
        print('Overlap between positives & negatives exists.')
    print()

In [None]:
# Build Test Database

pep_dict_test = {}  # Dictionary to store peptides for each HLA allele in the test set

pepsP_test, pepsN_test = [], []

for HLA in list_hlas:
    hlaPeps = getPeps(HLA, True)
    hlaPepsP, hlaPepsN = hlaPeps[0], hlaPeps[1]

    print(HLA, ':')
    print('Num. of positives:', len(hlaPepsP))
    print('Num. of negatives:', len(hlaPepsN))

    pep_dict_test[HLA] = [hlaPepsP, hlaPepsN]  # Store positives for HLA allele in the dictionary
    pepsP_test.append(hlaPepsP)  # Store positives in the list
    pepsN_test.append(hlaPepsN)  # Store negatives in the list

    # Check no overlap left between positives and negatives #
    if len(overlap_seqs(hlaPepsP, hlaPepsN)) > 0:
        print('Overlap between positives & negatives exists.')
    print()

In [None]:
count = 0

for i in list_hlas:
    count += len(pep_dict_train[i][0])
    count += len(pep_dict_test[i][0])
count

3236

In [None]:
count = 0
for i in list_hlas:
    count += len(pep_dict_train[i][1])
print(count)

7094

In [None]:
count = 0
for i in list_hlas:
    count += len(pep_dict_train[i][0])
print(count)

count = 0
for i in list_hlas:
    count += len(pep_dict_train[i][1])
print(count)

count = 0
for i in list_hlas:
    count += len(pep_dict_test[i][0])
print(count)

count = 0
for i in list_hlas:
    count += len(pep_dict_test[i][1])
print(count)

2480
6914
756
180


In [None]:
# Save/Load IEDB Data

range_len = [8, 9, 10, 11]
list_hlas = ['HLA-A*01:01', 'HLA-A*02:01', 'HLA-A*03:01', 'HLA-A*11:01', 'HLA-A*24:02', 'HLA-B*07:02', 'HLA-B*08:01', 'HLA-B*15:01', 'HLA-B*35:01', 'HLA-B*40:01']

# # Save the variables, dictionaries, and arrays to a file
# with open('Data/iedb_data.pkl', 'wb') as file:  # *Change path as required*
#     data = {
#         'pep_dict_train': pep_dict_train,
#         'pepsP_train': pepsP_train,
#         'pepsN_train': pepsN_train,
#         'pep_dict_test': pep_dict_test,
#         'pepsP_test': pepsP_test,
#         'pepsN_test': pepsN_test
#     }
#     pickle.dump(data, file)

# Load the variables, dictionaries, and arrays from the file
with open('Data/iedb_data.pkl', 'rb') as file:  # *Change path as required*
    data = pickle.load(file)
    pep_dict_train = data['pep_dict_train']
    pepsP_train = data['pepsP_train']
    pepsN_train = data['pepsN_train']
    pep_dict_test = data['pep_dict_test']
    pepsP_test = data['pepsP_test']
    pepsN_test = data['pepsN_test']


In [None]:
def getSplitData(pepsP_train, pepsN_train, pepsP_test, pepsN_test, split=0.25):

    hlaP_train, hlaN_train, hlaP_test, hlaN_test = [], [], [], []

    for alleles in range(len(list_hlas)):
        hlaP_train.append([list_hlas[alleles]] * len(pepsP_train[alleles]))
        hlaN_train.append([list_hlas[alleles]] * len(pepsN_train[alleles]))
        hlaP_test.append([list_hlas[alleles]] * len(pepsP_test[alleles]))
        hlaN_test.append([list_hlas[alleles]] * len(pepsN_test[alleles]))

    # Concatenate positive and negative peptides for each allele
    pepsP_train_concat = np.concatenate(pepsP_train)
    pepsN_train_concat = np.concatenate(pepsN_train)
    pepsP_test_concat = np.concatenate(pepsP_test)
    pepsN_test_concat = np.concatenate(pepsN_test)

    # Concatenate HLA labels for each peptide
    hlaP_train_concat = np.concatenate(hlaP_train)
    hlaN_train_concat = np.concatenate(hlaN_train)
    hlaP_test_concat = np.concatenate(hlaP_test)
    hlaN_test_concat = np.concatenate(hlaN_test)

    Ptest_cutoff = int(split*len(pepsP_test_concat))
    Ntest_cutoff = int(split*len(pepsN_test_concat))

    # Create X_train by combining non-SARS-CoV-2 data with 25% of SARS-CoV-2 data from pepsP_test and pepsN_test
    X_train = np.concatenate([pepsP_train_concat, pepsN_train_concat, pepsP_test_concat[:Ptest_cutoff], pepsN_test_concat[:Ntest_cutoff]])
    X_test = np.concatenate([pepsP_test_concat[Ptest_cutoff:], pepsN_test_concat[Ntest_cutoff:]])

    # Create y_train by assigning 1 to immunogenic peptides and 0 to non-immunogenic peptides
    y_train = np.concatenate([np.ones(len(pepsP_train_concat)), np.zeros(len(pepsN_train_concat)), np.ones(Ptest_cutoff), np.zeros(Ntest_cutoff)])
    y_test = np.concatenate([np.ones(len(pepsP_test_concat[Ptest_cutoff:])), np.zeros(len(pepsN_test_concat[Ntest_cutoff:]))])

    # Create hla_train, hla_test
    hla_train = np.concatenate([hlaP_train_concat, hlaN_train_concat, hlaP_test_concat[:Ptest_cutoff], hlaN_test_concat[:Ntest_cutoff]])
    hla_test = np.concatenate([hlaP_test_concat[Ptest_cutoff:], hlaN_test_concat[Ntest_cutoff:]])

    # Shuffle the data
    random_indices_train = np.random.permutation(len(X_train))
    random_indices_test = np.random.permutation(len(X_test))

    X_train = X_train[random_indices_train]
    hla_train = hla_train[random_indices_train]
    y_train = y_train[random_indices_train]

    X_test = X_test[random_indices_test]
    hla_test = hla_test[random_indices_test]
    y_test = y_test[random_indices_test]

    return X_train, y_train, hla_train, X_test, y_test, hla_test

def getSplitDataHLA(HLA, split=0.25):
    # (For single allele classifier)
    p_train, n_train, p_test, n_test = pep_dict_train[HLA][0], pep_dict_train[HLA][1], pep_dict_test[HLA][0], pep_dict_test[HLA][1]

    Ptest_cutoff = int(split*len(p_test))
    Ntest_cutoff = int(split*len(n_test))

    # Create X_train by combining non-sars-cov-2 data with 25% of sars-cov-2 data from pepsP_test and pepsN_test
    X_train = np.concatenate([p_train, n_train, p_test[:Ptest_cutoff], n_test[:Ntest_cutoff]])
    X_test = np.concatenate([p_test[Ptest_cutoff:], n_test[Ntest_cutoff:]])

    # Create y_train by assigning 1 to immunogenic peptides and 0 to non-immunogenic peptides
    y_train = np.concatenate([np.ones(len(p_train)), np.zeros(len(n_train)), np.ones(Ptest_cutoff), np.zeros(Ntest_cutoff)])
    y_test = np.concatenate([np.ones(len(p_test[Ptest_cutoff:])), np.zeros(len(n_test[Ntest_cutoff:]))])

    # Shuffle the data
    random_indices_train = np.random.permutation(len(X_train))
    random_indices_test = np.random.permutation(len(X_test))
    X_train = X_train[random_indices_train]
    y_train = y_train[random_indices_train]
    X_test = X_test[random_indices_test]
    y_test = y_test[random_indices_test]

    return X_train, y_train, X_test, y_test

In [None]:
def getPrimeTrainData():
    # Load PRIME2.0 Train (prtr) Data
    with open('Data/prime_peps_seq.txt', 'r') as file:
        prtr_pep = [line.strip() for line in file]
    with open('Data/prime_peps_imm.txt', 'r') as file:
        prtr_imm = [int(line.strip()) for line in file]
    with open('Data/prime_peps_HLA.txt', 'r') as file:
        prtr_hla = ['HLA-' + line.strip()[0] + '*' + line.strip()[1:3] + ':' + line.strip()[3:5] for line in file]

    # Filter elements to remove peptides of length != 9
    prtr_pep, prtr_imm, prtr_hla = zip(*[(p, i, h) for p, i, h in zip(prtr_pep, prtr_imm, prtr_hla) if len(p) == 9])
    X_train, y_train, hla_train = np.array(prtr_pep), np.array(prtr_imm), np.array(prtr_hla)
    _, _, _, X_test, y_test, hla_test = getSplitData(pepsP_train, pepsN_train, pepsP_test, pepsN_test, split=0.05)

    return X_train, y_train, hla_train, X_test, y_test, hla_test

import numpy as np

def getPrimeTrainDataSplit(HLA):
    # Load PRIME2.0 Train (prtr) Data
    with open('Data/prime_peps_seq.txt', 'r') as file:
        prtr_pep = [line.strip() for line in file]
    with open('Data/prime_peps_imm.txt', 'r') as file:
        prtr_imm = [int(line.strip()) for line in file]
    with open('Data/prime_peps_HLA.txt', 'r') as file:
        prtr_hla = ['HLA-' + line.strip(s)[0] + '*' + line.strip()[1:3] + ':' + line.strip()[3:5] for line in file]

    # Filter elements to remove peptides of length != 9
    prtr_pep, prtr_imm, prtr_hla = zip(*[(p, i, h) for p, i, h in zip(prtr_pep, prtr_imm, prtr_hla) if len(p) == 9])
    # Convert to NumPy arrays
    X_train, y_train, hla_train = np.array(prtr_pep), np.array(prtr_imm), np.array(prtr_hla)
    # Filter X_train and y_train based on the HLA indices
    indices = np.where(hla_train == HLA)[0]
    X_train, y_train = X_train[indices], y_train[indices]
    _, _, X_test, y_test = getSplitDataHLA(HLA, split=0.05)
    return X_train, y_train, X_test, y_test


In [None]:
def getAgnosticData(trainSplit=0.8):
    """
    Combine train and test data from peptide dictionaries into a virus-agnostic dataset.
    Split the combined data into train and test sets based on the specified train split ratio.
    Return the train and test datasets along with corresponding HLA lists.

    Args:
        trainSplit (float): The ratio of data to be allocated for training (default: 0.8)

    Returns:
        tuple: X_train (list), y_train (list), hla_train (list), X_test (list), y_test (list), hla_test (list)
    """
    X_train, y_train, hla_train, X_test, y_test, hla_test = [], [], [], [], [], []

    for hla in list_hlas:
        # Combine all X and y values for the current HLA
        all_X = pep_dict_train[hla][0] + pep_dict_train[hla][1] + pep_dict_test[hla][0] + pep_dict_test[hla][1]
        all_y = len(pep_dict_train[hla][0]) * [1] + len(pep_dict_train[hla][1]) * [0] + len(pep_dict_test[hla][0]) * [1] + len(pep_dict_test[hla][1]) * [0]

        # Shuffle the combined X and y values
        combined = list(zip(all_X, all_y))
        random.shuffle(combined)
        all_X, all_y = zip(*combined)

        # Calculate the split index based on the train split ratio
        split_index = int(len(all_X) * trainSplit)

        # Append the train and test data for the current HLA to the respective lists
        X_train.extend(all_X[:split_index])
        y_train.extend(all_y[:split_index])
        hla_train.extend(len(all_X[:split_index]) * [hla])
        X_test.extend(all_X[split_index:])
        y_test.extend(all_y[split_index:])
        hla_test.extend(len(all_X[split_index:]) * [hla])

    # Shuffle the combined train and test data
    combined = list(zip(X_train, y_train, hla_train))
    random.shuffle(combined)
    X_train, y_train, hla_train = zip(*combined)

    combined = list(zip(X_test, y_test, hla_test))
    random.shuffle(combined)
    X_test, y_test, hla_test = zip(*combined)

    return np.array(X_train), np.array(y_train), np.array(hla_train), np.array(X_test), np.array(y_test), np.array(hla_test)

def getAgnosticDataHLA(hla, trainSplit=0.8):
    """
    Combine train and test data from the peptide dictionaries for a specific HLA allele into a virus-agnostic dataset.
    Split the combined data into train and test sets based on the specified train split ratio.
    Return the train and test datasets.

    Args:
        hla (str): The HLA allele for which to retrieve the data.
        trainSplit (float): The ratio of data to be allocated for training (default: 0.8)

    Returns:
        tuple: X_train (list), y_train (list), X_test (list), y_test (list)
    """
    # Combine all X and y values for the HLA
    all_X = pep_dict_train[hla][0] + pep_dict_train[hla][1] + pep_dict_test[hla][0] + pep_dict_test[hla][1]
    all_y = len(pep_dict_train[hla][0]) * [1] + len(pep_dict_train[hla][1]) * [0] + len(pep_dict_test[hla][0]) * [1] + len(pep_dict_test[hla][1]) * [0]

    # Shuffle the combined X and y values
    combined = list(zip(all_X, all_y))
    random.shuffle(combined)
    all_X, all_y = zip(*combined)

    # Calculate the split index based on the train split ratio
    split_index = int(len(all_X) * trainSplit)

    # Split the data into train and test sets
    X_train = np.array(all_X[:split_index])
    y_train = np.array(all_y[:split_index])
    X_test = np.array(all_X[split_index:])
    y_test = np.array(all_y[split_index:])

    return X_train, y_train, X_test, y_test

In [None]:
# Classifier Functions & Definitions

def classifier_auc(model_idx, X_train, y_train, X_test, y_test, epochs=50, weight_decay=0.6):
    # Unweighted
    # Define the hyperparameters and early stopping variables
    input_size = A  # Adjust according to the number of features in your input data
    learning_rate = 0.0005
    batch_size = 16
    random.seed = 42
    num_epochs = epochs
    patience = int(num_epochs**0.5)
    early_stopping_counter = 0
    best_loss = float('inf')

    # Create the model
    model = copy.deepcopy(classifiers[model_idx])

    # Define the loss function and optimizer
    optimizer = torch.optim.AdamW(model.parameters(), weight_decay=weight_decay)
    loss_function = torch.nn.BCEWithLogitsLoss()

    # Convert the data to PyTorch tensors
    X_train_tensor = torch.nn.functional.one_hot(torch.LongTensor(convert_number(X_train)), num_classes=A).type(torch.FloatTensor)
    y_train_tensor = torch.Tensor(y_train)
    X_test_tensor = torch.nn.functional.one_hot(torch.LongTensor(convert_number(X_test)), num_classes=A).type(torch.FloatTensor)
    y_test_tensor = torch.Tensor(y_test)
    train_loader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor), batch_size=batch_size)

    # Reset model parameters
    for layer in model.children():
        if hasattr(layer, 'reset_parameters'):
            layer.reset_parameters()

    # Train the model
    for epoch in range(num_epochs):
        for batch_idx, (batch, y) in enumerate(train_loader):
            optimizer.zero_grad()
            y_pred = model(batch)
            loss = loss_function(y_pred, y.unsqueeze(1))
            loss.backward()
            optimizer.step()
        # Calculate validation loss
        with torch.no_grad():
            y_pred_val = model(X_test_tensor)
            loss_val = loss_function(y_pred_val, y_test_tensor.unsqueeze(1))

        # Check for early stopping
        if loss_val < best_loss:
            best_loss = loss_val
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= patience:
                break

    # Calculate ROC curve and AUC
    fpr_train, tpr_train, thresholds_train = sklearn.metrics.roc_curve(
        np.concatenate((np.zeros(len(X_train_tensor[y_train_tensor == 0])) + 0,
                        np.zeros(len(X_train_tensor[y_train_tensor == 1])) + 1), axis=0),
        np.concatenate((model(X_train_tensor[y_train_tensor == 0]).detach().numpy(),
                        model(X_train_tensor[y_train_tensor == 1]).detach().numpy()), axis=0)
    )
    fpr_tests, tpr_tests, thresholds_tests = sklearn.metrics.roc_curve(
        np.concatenate((np.zeros(len(X_test_tensor[y_test_tensor == 0])) + 0,
                        np.zeros(len(X_test_tensor[y_test_tensor == 1])) + 1), axis=0),
        np.concatenate((model(X_test_tensor[y_test_tensor == 0]).detach().numpy(),
                        model(X_test_tensor[y_test_tensor == 1]).detach().numpy()), axis=0)
    )
    auc_train = sklearn.metrics.auc(fpr_train, tpr_train)
    auc_tests = sklearn.metrics.auc(fpr_tests, tpr_tests)
    return {'model_idx': model_idx, 'weight_decay': weight_decay, 'auc_train': auc_train, 'auc_tests': auc_tests}

def do_auc(model_idx, weight_decay=0.6, epochs=50, num_folds=5):
    fold_results = []
    kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)
    for train_index, val_index in kf.split(X_train):
        X_train_fold, X_val_fold = X_train[train_index], X_train[val_index]
        y_train_fold, y_val_fold = y_train[train_index], y_train[val_index]
        auc = classifier_auc(model_idx, X_train_fold, y_train_fold, X_val_fold, y_val_fold, weight_decay=weight_decay, epochs=epochs)
        fold_results.append(auc)
    avg_auc_train = np.mean([fold['auc_train'] for fold in fold_results])
    avg_auc_tests = np.mean([fold['auc_tests'] for fold in fold_results])
    std_auc_train = np.std([fold['auc_train'] for fold in fold_results])
    std_auc_tests = np.std([fold['auc_tests'] for fold in fold_results])
    return {'model_idx': model_idx, 'weight_decay': weight_decay, 'auc_train': avg_auc_train, 'auc_tests': avg_auc_tests, 'std_train': std_auc_train, 'std_tests': std_auc_tests}

In [2]:
# Compare all PRIME-Trained Classifier Architectures

all_arcs_results = []
X_train, y_train, hla_train, X_test, y_test, hla_test = getPrimeTrainData()
classifiers = getAllModels()
for model_idx in range(len(classifiers)):
    results = []
    num_iterations = 10
    for i in range(num_iterations):
        result = classifier_auc(model_idx, X_train, y_train, X_test, y_test, epochs=100, weight_decay=0.6)
        results.append(result)
        clear_output(wait=True)
        print('Completed architecture:', model_idx, '-', i, ' - (', round(result['auc_tests'],2),')')

    # Extract the AUC scores from the results list
    auc_train_values = [result['auc_train'] for result in results]
    auc_test_values = [result['auc_tests'] for result in results]

    # Calculate the mean and standard deviation of AUC scores
    auc_train_mean = np.mean(auc_train_values)
    auc_train_std = np.std(auc_train_values)
    auc_test_mean = np.mean(auc_test_values)
    auc_test_std = np.std(auc_test_values)

    all_arcs_results.append([auc_train_mean, auc_train_std, auc_test_mean, auc_test_std])

auc_tr, auc_te, std_tr, std_te = np.array(all_arcs_results)[:,0], np.array(all_arcs_results)[:,2], np.array(all_arcs_results)[:,1], np.array(all_arcs_results)[:,3]
plt.figure(figsize=(8,5))
plt.errorbar(range(len(classifiers)), auc_tr, yerr=std_tr, label='Train', color='red', marker='o')
plt.errorbar(range(len(classifiers)), auc_te, yerr=std_te, label='Test', color='blue', marker='o')
plt.title('PRIME-Trained Classifiers Performance')
plt.xlabel('Architectures')
plt.ylabel('AUC')
plt.tight_layout()
#plt.xticks(rotation=45, ha='right', rotation_mode='anchor')
plt.vlines(x=6, ymin=min(auc_te), ymax=max(auc_tr), label='Selected Deep Model', linestyle = '--', color = 'green')
plt.vlines(x=0, ymin=min(auc_te), ymax=max(auc_tr), label='Parsimonious Model', linestyle = '--', color = 'purple')
plt.legend(loc = 'center right', fontsize = 9)
plt.savefig('Images/PRIME-Trained Classifiers Performance', dpi=300)
plt.show();

In [3]:
# Computing Results for Pan-Allelic Classifier

pars_aucs, deep_aucs = [], []
classifiers = getAllModels()

model_pars = copy.deepcopy(classifiers[0])
model_pars.load_state_dict(torch.load('Models/prtr-pars.pth'))

# Convert the data to PyTorch tensors
X_train, y_train, hla_train, X_test, y_test, hla_test = getPrimeTrainData()
X_train_tensor = torch.nn.functional.one_hot(torch.LongTensor(convert_number(X_train)), num_classes=A).type(torch.FloatTensor)
y_train_tensor = torch.Tensor(y_train)
X_test_tensor = torch.nn.functional.one_hot(torch.LongTensor(convert_number(X_test)), num_classes=A).type(torch.FloatTensor)
y_test_tensor = torch.Tensor(y_test)

# Calculate ROC curve and AUC
fpr_train, tpr_train, thresholds_train = sklearn.metrics.roc_curve(
    np.concatenate((np.zeros(len(X_train_tensor[y_train_tensor == 0])) + 0,
                    np.zeros(len(X_train_tensor[y_train_tensor == 1])) + 1), axis=0),
    np.concatenate((model_pars(X_train_tensor[y_train_tensor == 0]).detach().numpy(),
                    model_pars(X_train_tensor[y_train_tensor == 1]).detach().numpy()), axis=0)
)
if len(X_test) > 0:
    fpr_tests, tpr_tests, thresholds_tests = sklearn.metrics.roc_curve(
        np.concatenate((np.zeros(len(X_test_tensor[y_test_tensor == 0])) + 0,
                        np.zeros(len(X_test_tensor[y_test_tensor == 1])) + 1), axis=0),
        np.concatenate((model_pars(X_test_tensor[y_test_tensor == 0]).detach().numpy(),
                        model_pars(X_test_tensor[y_test_tensor == 1]).detach().numpy()), axis=0)
    )
    auc_train = sklearn.metrics.auc(fpr_train, tpr_train)
    auc_tests = sklearn.metrics.auc(fpr_tests, tpr_tests)

print(round(auc_train, 3), round(auc_tests, 3))

In [4]:
# PRIME-Trained Deep vs Parsimonious Code on Test Data for Pan-Allelic Classifier

from sklearn.exceptions import UndefinedMetricWarning
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

pars_aucs, deep_aucs, VA_pars_aucs, VA_deep_aucs = [], [], [], []
hla_count0, hla_count1 = [0] * len(list_hlas), [0] * len(list_hlas)
repeats = 100
peats = 10
progress_bar = tqdm(total= 2 * repeats * 10, desc='Progress')
for hla in list_hlas:
    classifiers = getAllModels()
    model_pars = copy.deepcopy(classifiers[0])
    model_pars.load_state_dict(torch.load('Models/prtr-pars.pth'))
    tests_all = []
    for i in range(repeats):
        # Convert the data to PyTorch tensors
        for i in range(peats):
            X_train, y_train, X_test, y_test= getPrimeTrainDataSplit(hla)
            if np.sum(y_test == 0) > 0 and np.sum(y_test == 1) > 0:
                break
#             if i == (peats-10):
#                 print(hla)
        hla_count0[list_hlas.index(hla)] += len(y_test[y_test==0])
        hla_count1[list_hlas.index(hla)] += len(y_test[y_test==1])
        X_test_tensor = torch.nn.functional.one_hot(torch.LongTensor(convert_number(X_test)), num_classes=A).type(torch.FloatTensor)
        y_test_tensor = torch.Tensor(y_test)
        # Calculate ROC curve and AUC
        if len(X_test) > 0:
            fpr_tests, tpr_tests, thresholds_tests = sklearn.metrics.roc_curve(
                np.concatenate((np.zeros(len(X_test_tensor[y_test_tensor == 0])) + 0,
                                np.zeros(len(X_test_tensor[y_test_tensor == 1])) + 1), axis=0),
                np.concatenate((model_pars(X_test_tensor[y_test_tensor == 0]).detach().numpy(),
                                model_pars(X_test_tensor[y_test_tensor == 1]).detach().numpy()), axis=0)
            )
            auc_tests = sklearn.metrics.auc(fpr_tests, tpr_tests)
        else:
            auc_tests = 0
        if(np.isnan(auc_tests)):
            auc_tests = 0
        tests_all.append(auc_tests)
        progress_bar.update(1)
    pars_aucs.append([np.average(auc_tests), np.std(auc_tests)])

for hla in list_hlas:
    classifiers = getAllModels()
    model_deep = copy.deepcopy(classifiers[6])
    model_deep.load_state_dict(torch.load('Models/prtr-deep.pth'))
    tests_all = []
    for i in range(repeats):
        # Convert the data to PyTorch tensors
        for i in range(peats):
            X_train, y_train, X_test, y_test = getPrimeTrainDataSplit(hla)
            if np.sum(y_test == 0) > 0 and np.sum(y_test == 1) > 0:
                break
#             if i == (peats-10):
#                 print(hla)
        hla_count0[list_hlas.index(hla)] += len(y_test[y_test==0])
        hla_count1[list_hlas.index(hla)] += len(y_test[y_test==1])
        X_test_tensor = torch.nn.functional.one_hot(torch.LongTensor(convert_number(X_test)), num_classes=A).type(torch.FloatTensor)
        y_test_tensor = torch.Tensor(y_test)
        # Calculate ROC curve and AUC
        if len(X_test) > 0:
            fpr_tests, tpr_tests, thresholds_tests = sklearn.metrics.roc_curve(
                np.concatenate((np.zeros(len(X_test_tensor[y_test_tensor == 0])) + 0,
                                np.zeros(len(X_test_tensor[y_test_tensor == 1])) + 1), axis=0),
                np.concatenate((model_deep(X_test_tensor[y_test_tensor == 0]).detach().numpy(),
                                model_deep(X_test_tensor[y_test_tensor == 1]).detach().numpy()), axis=0)
            )
            auc_tests = sklearn.metrics.auc(fpr_tests, tpr_tests)
        else:
            auc_tests = 0
        if(np.isnan(auc_tests)):
            auc_tests = 0
        tests_all.append(auc_tests)
        progress_bar.update(1)
    deep_aucs.append([np.average(auc_tests), np.std(auc_tests)])

In [5]:
# PRIME-Trained Deep vs Parsimonious Code on Test Data for Pan-Allelic Classifier

# Calculate the positions for the bars in each group
bar_width = 0.2
x_positions = np.arange(len(list_hlas))
fig, ax1 = plt.subplots(figsize=(10, 6))

# Create a twin axes on the right side for the counts
ax2 = ax1.twinx()

# Plot the bar charts on the right y-axis
ax2.bar(x_positions - bar_width/2, hla_count0, width=bar_width, alpha=0.5, color='lime', label='Count (Neg. in Test)', zorder=2)
ax2.bar(x_positions + bar_width/2, hla_count1, width=bar_width, alpha=0.5, color='green', label='Count (Pos. in Test)', zorder=2)

# Set the range for the right y-axis (counts)
ax2.set_ylim(0, max(max(hla_count0), max(hla_count1)))

# Label the right y-axis
ax2.set_ylabel('HLA Test Data Count')

ax2.legend(loc='upper right', fontsize=9)

# Plot the line charts for the AUC values on the left y-axis
ax1.errorbar(x_positions, np.array(pars_aucs)[:, 0], yerr=np.array(pars_aucs)[:,1], label='Parsimonious Test', color='orange', marker='o', capsize=5, zorder=3)
ax1.errorbar(x_positions, np.array(deep_aucs)[:, 0], yerr=np.array(deep_aucs)[:,1], label='Deep Test', color='cyan', marker='o', capsize=5, zorder=3)

ax1.set_xticks(x_positions)
ax1.set_xticklabels(list_hlas, rotation=45, ha='right', rotation_mode='anchor')
ax1.set_xlabel('HLAs')
ax1.set_ylabel('PRIME-Trained Classifier AUC')
ax1.set_ylim(0, 1)
ax1.legend(loc='upper left', fontsize=9)
ax1.set_title('Pan-Allelic Classifier Performance by Allele (PRIME-Trained)')

plt.tight_layout(pad=2)
plt.savefig('Images/Pan-Allelic Classifier Performance by Allele (PRIME-Trained)', dpi=300)
plt.show();

In [6]:
# Running Single-Allele Classifier

# Grid search was ran to find best model and split
repeats = 10
auc_all = []
hla_aucs_parsimonious, hla_aucs_deep, hla_stds_parsimonious, hla_stds_deep = [], [], [], []

classifiers = getModelsBest()
progress_bar = tqdm(total = 2 * repeats * 10, desc='Progress')


for hla in list_hlas:
    for i in range(repeats):
#         print('Completing Parsimonious:', hla, ' - ', i)
        X_train, y_train, X_test, y_test = getPrimeTrainDataSplit(hla)
        auc = classifier_auc(0, X_train, y_train, X_test, y_test, epochs=10000)
        auc_all.append([auc['auc_train'], auc['auc_tests']])
        progress_bar.update(1)
    hla_aucs_parsimonious.append([np.average(np.array(auc_all)[:,0]), np.average(np.array(auc_all)[:,1])])
    hla_stds_parsimonious.append([np.std(np.array(auc_all)[:,0]), np.std(np.array(auc_all)[:,1])])

    auc_all = []
    for i in range(repeats):
#         print('Completing Deep:', hla, ' - ', i)
        X_train, y_train, X_test, y_test = getPrimeTrainDataSplit(hla)
        auc = classifier_auc(1, X_train, y_train, X_test, y_test, epochs=10000)
        auc_all.append([auc['auc_train'], auc['auc_tests']])
        progress_bar.update(1)
    hla_aucs_deep.append([np.average(np.array(auc_all)[:,0]), np.average(np.array(auc_all)[:,1])])
    hla_stds_deep.append([np.std(np.array(auc_all)[:,0]), np.std(np.array(auc_all)[:,1])])

#     print('Completed:', hla)

In [7]:
# Single-Allele Classifier Results

# Calculate hla_count0 and hla_count1
hla_count0, hla_count1 = [], []
for hla in list_hlas:
    _, tr, _, te = getPrimeTrainDataSplit(hla)
    hla_count0.append(len(tr[tr==0]) + len(te[te==0]))
    hla_count1.append(len(tr[tr==1]) + len(te[te==1]))

# Calculate the positions for the bars in each group
bar_width = 0.2
x_positions = np.arange(len(list_hlas))
fig, ax1 = plt.subplots(figsize=(10, 6))

# Create a twin axes on the right side for the counts
ax2 = ax1.twinx()

# Plot the bar charts on the right y-axis
ax2.bar(x_positions - bar_width/2, hla_count0, width=bar_width, alpha=0.5, color='lime', label='Neg. Count', zorder=2)
ax2.bar(x_positions + bar_width/2, hla_count1, width=bar_width, alpha=0.5, color='green', label='Pos. Count', zorder=2)
ax2.set_ylim(0, max(max(hla_count0), max(hla_count1)))
ax2.set_ylabel('HLA Count')
ax2.legend(loc='upper right', fontsize=9)

# Plot the line charts for the AUC values on the left y-axis
ax1.errorbar(x_positions, np.array(hla_aucs_parsimonious)[:, 0], yerr=np.array(hla_stds_parsimonious)[:, 0], label='Parsimonious Train', color='red', capsize=5, marker='o', zorder=3)
ax1.errorbar(x_positions, np.array(hla_aucs_parsimonious)[:, 1], yerr=np.array(hla_stds_parsimonious)[:, 1], label='Parsimonious Test', color='magenta', capsize=5, marker='o', zorder=3)
ax1.errorbar(x_positions, np.array(hla_aucs_deep)[:, 0], yerr=np.array(hla_stds_deep)[:, 0], label='Deep Train', color='blue', capsize=5, marker='o', zorder=3)
ax1.errorbar(x_positions, np.array(hla_aucs_deep)[:, 1], yerr=np.array(hla_stds_deep)[:, 1], label='Deep Test', color='purple', capsize=5, marker='o', zorder=3)

ax1.set_xticks(x_positions)
ax1.set_xticklabels(list_hlas, rotation=45, ha='right', rotation_mode='anchor')
ax1.set_xlabel('HLAs')
ax1.set_ylabel('PRIME-Trained Classifier AUC')
ax1.set_ylim(0, 1)
ax1.set_title('PRIME-Trained Single Allele Classifier Performance')
ax1.legend(loc='lower left', fontsize=9)

plt.tight_layout(pad=2)
plt.savefig('Images/PRIME-Trained Single Allele Classifier Performance', dpi=300)
plt.show();

In [8]:
# Single-Allele vs Pan-Allelic Classifier Results

# Calculate hla_count0 and hla_count1
hla_count0, hla_count1 = [], []
for hla in list_hlas:
    _, _, _, te = getPrimeTrainDataSplit(hla)
    hla_count0.append(len(te[te==0]))
    hla_count1.append(len(te[te==1]))

# Calculate the positions for the bars in each group
bar_width = 0.2
x_positions = np.arange(len(list_hlas))
fig, ax1 = plt.subplots(figsize=(10, 6))

# Create a twin axes on the right side for the counts
ax2 = ax1.twinx()

# Plot the bar charts on the right y-axis
ax2.bar(x_positions - bar_width/2, hla_count0, width=bar_width, alpha=0.5, color='lime', label='Count (Neg. in Test)', zorder=2)
ax2.bar(x_positions + bar_width/2, hla_count1, width=bar_width, alpha=0.5, color='green', label='Count (Pos. in Test)', zorder=2)

# Set the range for the right y-axis (counts)
ax2.set_ylim(0, max(max(hla_count0), max(hla_count1)))

# Label the right y-axis
ax2.set_ylabel('Counts')

# Plot the line plots for the AUC values on the left y-axis
ax1.errorbar(x_positions, np.array(pars_aucs)[:, 0], yerr=np.array(pars_aucs)[:, 1], label='Pars. (Pan-Allelic)', color='orange', marker='o', capsize=5, zorder=3)
ax1.errorbar(x_positions, np.array(deep_aucs)[:, 0], yerr=np.array(deep_aucs)[:, 1], label='Deep. (Pan-Allelic)', color='cyan', marker='o', capsize=5, zorder=3)
ax1.errorbar(x_positions, np.array(hla_aucs_parsimonious)[:, 1], yerr=np.array(hla_stds_parsimonious)[:, 1], label='Pars. (Sing. Allele)', color='magenta', capsize=5, marker='o', zorder=3)
ax1.errorbar(x_positions, np.array(hla_aucs_deep)[:, 1], yerr=np.array(hla_stds_deep)[:, 1], label='Deep. (Sing. Allele)', color='purple', capsize=5, marker='o', zorder=3)

ax1.set_xticks(x_positions)
ax1.set_xticklabels(list_hlas, rotation=45, ha='right', rotation_mode='anchor')
ax1.set_xlabel('HLAs')
ax1.set_ylabel('PRIME-Trained Classifier AUC')
ax1.set_title('PRIME-Trained Single vs Pan-Allelic Classifier Performances')
ax1.legend(fontsize=8)

plt.tight_layout(pad=2.0)
plt.savefig('Images/Comparing PRIME-Trained Classifier Performances', dpi=300)
plt.show()

In [9]:
# Detecting all immunogenic peptides through classifier

# Load the FASTA file
fasta_path = 'EPI_ISL_402124.fasta'
sequences = list(DNA.read(fasta_path))

# Concatenate all DNA sequences
concatenated_sequence = ''.join(str(sequence) for sequence in sequences)

# Convert DNA sequences to amino acid protein sequences
protein_sequence = str(DNA(concatenated_sequence).translate())

ref_peptides = []
sequence_length = len(protein_sequence)
peptide_length = 9

for i in range(sequence_length - peptide_length + 1):
    peptide = protein_sequence[i:i + peptide_length]
    if '*' not in peptide and '*' not in protein_sequence[i:i + peptide_length + 1]:
        ref_peptides.append(peptide)

classifiers = getModelsBest()
model = copy.deepcopy(classifiers[1])
model.load_state_dict(torch.load('Models/prtr-deep.pth')) # *Change path as required*

ref_peptides_tensor = torch.nn.functional.one_hot(torch.LongTensor(convert_number(ref_peptides)), num_classes=A).type(torch.FloatTensor)

ref_imm = model(ref_peptides_tensor).detach().numpy()

In [None]:
# Get most immunogenic peptides

selected_peptides = [peptide for peptide, score in zip(ref_peptides, ref_imm) if score > np.percentile(ref_imm, 95)]

len(selected_peptides)

263