In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

!pip install biopython
import Bio
from Bio import SeqIO

In [6]:
def get_fastas(path):
  """
  Extracts the sequences and IDs of proteins from a FASTA file.

  Args:
    path (str): Filepath of the FASTA file
  
  Returns:
    df (DataFrame): A Pandas dataframe containing the protein IDs and sequences

  """

  ids = []
  seqs = []

  with open(path) as handle:
      for record in Bio.SeqIO.parse(handle, "fasta"):
          ids.append(record.id)
          seqs.append(str(record.seq))
          
  df = pd.DataFrame(columns=['id', 'seq'])
  df['id'] = ids
  df['seq'] = seqs
  
  return df

In [None]:
# Loads the positive and negative proteins in the dataset

pos_train = get_fastas("data/positive_train_1.txt")
neg_train = get_fastas("data/negative_train_1.txt")

pos_test = get_fastas("data/positive_train_2.txt")
neg_test = get_fastas("data/negative_train_2.txt")

pos_train['label'] = 1
neg_train['label'] = 0

pos_test['label'] = 1
neg_test['label'] = 0

full = pd.concat([pos_train, neg_train, pos_test, neg_test])

full = full.sample(frac=1)

full

In [None]:
!pip install transformers
from transformers import EsmTokenizer, TFEsmForSequenceClassification

In [None]:
# Uses the ESM tokenizer to encode the protein sequences

tokenizer = EsmTokenizer.from_pretrained("facebook/esm2_t6_8M_UR50D")

encodings = tokenizer(list(full['seq']), return_tensors='tf', truncation=True, padding=True)['input_ids']

encodings

In [3]:
def fit_model(train_encodings, train_labels, test_encodings, test_labels, learning_rate=1e-4, batch_size=16):
  """
  Fine-tunes a pretrained ESM sequence classsifier model on the training set and
  returns the model.

  Args:
    train_encodings (tf.tensor): Tokenized encodings of the training samples
    train_labels (np.array): Labels of the training samples
    test_encodings (tf.tensor): Tokenized encodings of the test samples
    test_labels (np.array): Labels of the test samples
    learning_rate (float): Learning rate for training (default=1e-4)
    batch_size (float): Batch size for training (default=16)

  Returns:
    model (tf.model): Trained model

  """

  # Imports a pre-trained ESM-2 model with classification head

  model = TFEsmForSequenceClassification.from_pretrained(
      "facebook/esm2_t6_8M_UR50D", 
      num_labels=1)
  

  # Stops training if test loss has not improved for 2 epochs

  callbacks = [tf.keras.callbacks.EarlyStopping(patience=2, 
                                                restore_best_weights=True)]


  # beta_2 setting was taken from the original ESM-2 publication

  optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, beta_2=0.98)

  loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)

  model.compile(optimizer=optimizer, loss=loss, metrics='accuracy')

  model.summary()

  with tf.device('/GPU:0'):
    model.fit(train_encodings, 
              train_labels, 
              epochs=15, 
              batch_size=batch_size,
              verbose=1, 
              validation_data=[test_encodings, test_labels], 
              callbacks=callbacks)
    
  return model

In [None]:
%%time

# hp_values is a list of hyperparameter settings to try

hp_values = [4, 8, 16, 32]
n_hp_values = len(hp_values)


# Splits the dataset into n_splits sets for cross-validation

n_splits = 5
splits = KFold(n_splits=n_splits)


# Initializes a NumPy array to hold the accuracy and AUC values for each
# hyperparameter setting

acc_results = np.zeros((n_splits, n_hp_values))
auc_results = np.zeros((n_splits, n_hp_values))


# Iterates through each split and tests each hyperparameter value on that split

for i, (train_index, test_index) in enumerate(splits.split(full['seq'])):

  # Splits the encodings and labels

  train_encodings = tf.gather(encodings, indices=train_index)
  test_encodings = tf.gather(encodings, indices=test_index)

  train_labels = np.array(full['label'].iloc[train_index])
  test_labels = np.array(full['label'].iloc[test_index])


  # Iterates through the hyperparameter values to be tested and fits a model
  # using each one

  for j, value in enumerate(hp_values):
    print('Split #: ', i + 1, ' Parameter val: ', value)


    # This configuration was used to modulate the hyperparameter value
    # batch_size; it can be changed to modulate learning_rate or any other model
    # hyperparameter by adding them to the fit_model args

    model = fit_model(train_encodings, 
                      train_labels, 
                      test_encodings, 
                      test_labels, 
                      batch_size=value)
    

    # Generates predictions for the test set using the trained model and tests
    # their accuracy and AUC, saving them to the "_results" arrays

    logits = model.predict(test_encodings, batch_size=8)

    probs = tf.keras.activations.sigmoid(logits.logits)

    acc_results[i][j] = accuracy_score(test_labels, np.round(probs))
    auc_results[i][j] = roc_auc_score(test_labels, probs)

In [None]:
# Saves and downloads the accuracy and AUC results as a single zip file. This
# configuration is designed for use with Google Colab

acc_results = pd.DataFrame(acc_results, columns=hp_values)
auc_results = pd.DataFrame(auc_results, columns=hp_values)

acc_results.to_csv('accuracy_results.csv')
auc_results.to_csv('auc_results.csv')

from zipfile import ZipFile
from google.colab import files

zip = ZipFile('zipped_results.zip', 'w')

zip.write('accuracy_results.csv')
zip.write('auc_results.csv')

zip.close()

files.download("zipped_results.zip")

In [None]:
# Imports the independent evaluation set

pos_evaluation = get_fastas("data/positive_test.txt")
neg_evaluation = get_fastas("data/negative_test.txt")

pos_evaluation['label'] = 1
neg_evaluation['label'] = 0

evaluation = pd.concat([pos_evaluation, neg_evaluation])

evaluation

In [11]:
# Generates the tokenized encodings for the evaluation set

eval_encodings = tokenizer(list(evaluation['seq']), return_tensors='tf', truncation=True, padding=True)['input_ids']

In [None]:
%%time

# Trains a model on the full training set and gets its performance for the 
# evaluation set

train_labels = np.array(full['label'])
eval_labels = np.array(evaluation['label'])

model = fit_model(encodings, train_labels, eval_encodings, eval_labels)

logits = model.predict(eval_encodings, batch_size=8)

probs = tf.keras.activations.sigmoid(logits.logits)

acc = accuracy_score(eval_labels, np.round(probs))
auc = roc_auc_score(eval_labels, probs)