In [1]:
## Standard libraries
import os
import numpy as np
import math
import json
from functools import partial

import random as rd

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import GPUtil
import torch
import torch.nn as nn
import sys
sys.path.append('../model')
from utils import amino_acid_to_number, tokenize

# Prefer the first CUDA GPU, but fall back to the CPU so the notebook still
# runs (slowly) on machines without a usable GPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

import sys
sys.path.append('../model')
from functions import get_A2N_list, tokenize, make_train_val_test_lists_rand, prepare_data
from models import ProtDataset

outpath = "../output/"

In [2]:
# os.makedirs(outpath + study_id + "_rep_" + str(0))

In [3]:
# Name of the prepared dataset: selects the input CSV and prefixes the output folder name.
data_name = "Faure2023_1_lenient"
# Percentage of the rows in the data file that go into the training split.
train_percent = 20

In [4]:
# Make a fresh folder "<study_id>_rep_<k>" for this run's analysis outputs,
# where k is one more than the highest replicate index already on disk.

study_id = "_".join([data_name, str(train_percent) + "%"])
rep_prefix = study_id + "_rep_"

# The output root may not exist yet on a fresh checkout.
os.makedirs(outpath, exist_ok=True)

# Only folders named exactly "<study_id>_rep_<integer>" count as earlier
# replicates; a substring match would also pick up unrelated folders and
# crash on any whose last "_"-separated field is not an integer.
existing_reps = [
    int(folder[len(rep_prefix):])
    for folder in os.listdir(outpath)
    if folder.startswith(rep_prefix)
    and folder[len(rep_prefix):].isdecimal()
    and os.path.isdir(os.path.join(outpath, folder))
]

rep = max(existing_reps) + 1 if existing_reps else 0

results_path = outpath + "_".join([study_id, "rep", str(rep)])
os.makedirs(results_path)

In [5]:
# Create R2s.csv with only its header row; later cells append one (Model, R2) row per fitted model.
R2s = pd.DataFrame(columns=['Model', 'R2'])
R2s.to_csv(os.path.join(results_path, 'R2s.csv'), index=False)

### Read in data

In [6]:
# Load the prepared table for the selected dataset; index_col=None keeps every CSV column as data.
in_path = "../Data/Data_prepared/" + data_name + ".csv"
datafile = pd.read_csv(in_path, index_col=None)

In [7]:
# Derive the modelling inputs from the raw table. Judging by the names and later use:
# phenotypes = regression targets, seqs = per-site integer codes, seqs1h = one-hot
# encoding — TODO confirm against functions.prepare_data.
phenotypes, seqs, seqs1h = prepare_data(datafile)

  seqs = seqs[:, sites_var]


In [8]:
# seqs1h is 3-D: the first axis is discarded here; the other two are taken as
# the sequence length (L) and the per-site alphabet size (AA_size).
_, L, AA_size = seqs1h.shape
print(f"sequence length = {L}; ", f"AA_size = {AA_size}")

sequence length = 34;  AA_size = 2


In [9]:
# Integer arithmetic keeps the training-set size exact. The float product
# .01 * train_percent * n can land just below a whole number and be truncated
# one too low by int() (e.g. int(.01 * 29 * 100) == 28).
num_train = int(train_percent * len(datafile) // 100)
num_test = 2000
# Random split of the rows into train / validation / test index lists.
train_list, val_list, test_list = make_train_val_test_lists_rand(datafile, num_train, num_test)
print(num_train)

25864


### Linear model

In [13]:
# Label used for this model's row in R2s.csv and in the printed test score.
model_name = "Linear"
from models import LinearModel

In [14]:
import torch.utils.data as data

# One-hot inputs (cast to float) and targets, both moved to the compute device.
X, y = seqs1h.float().to(device), phenotypes.to(device)

# Slice inputs and targets with the same index list for each split.
(X_train, y_train), (X_val, y_val), (X_test, y_test) = (
    (X[split], y[split]) for split in (train_list, val_list, test_list)
)

# Only the training split is batched; validation and test are scored whole.
train_dataset = ProtDataset(X_train, y_train)
train_loader = data.DataLoader(
    train_dataset, batch_size=1000, shuffle=True, drop_last=False
)

In [15]:
dropout_p = 0.0
# Put the model on the same device the input tensors were moved to, instead of
# hard-coding the default CUDA device with .cuda().
model = LinearModel(L, AA_size, dropout_p).to(device)

In [16]:
# from models import LinearModel

In [17]:
import torch.optim as optim
import torch
import torch.nn as nn


from scipy.stats import pearsonr
learning_rate = 0.01
epochs = 300

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(epochs):
    # The periodic evaluation below switches to eval mode, so re-enable
    # training mode once at the start of every epoch.
    model.train()
    total_loss = 0
    for batch_inputs, batch_targets in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_inputs)
        loss = criterion(outputs, batch_targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Every 10th epoch: report the mean batch loss and the validation score.
    if epoch % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader)}")
        model.eval()
        # No gradients are needed for scoring; skipping them avoids building
        # an autograd graph over the whole validation set.
        with torch.no_grad():
            pred = model(X_val.flatten(1)).flatten().cpu().numpy()
        true = y_val.flatten().detach().cpu().numpy()
        # "R2" here is the squared Pearson correlation of prediction vs. truth.
        print(pearsonr(pred, true)[0]**2)

Epoch 1/300, Loss: 0.6456961929798126
0.006871824042067844
Epoch 11/300, Loss: 0.26104320992123
0.006732416532868313
Epoch 21/300, Loss: 0.2609321136366237
0.10676915831027803
Epoch 31/300, Loss: 0.2589776692065326
0.3646081428152946
Epoch 41/300, Loss: 0.11678294295614416
0.5721909380724145
Epoch 51/300, Loss: 0.11415663361549377
0.5748013816128043
Epoch 61/300, Loss: 0.11293533444404602
0.5784241291034687
Epoch 71/300, Loss: 0.11225231398235667
0.582069313158436
Epoch 81/300, Loss: 0.11187286471778696
0.5835685444347483
Epoch 91/300, Loss: 0.11160500008951534
0.584386002243876
Epoch 101/300, Loss: 0.11124361103231256
0.5851785991392328
Epoch 111/300, Loss: 0.11113004860552875
0.5872695072721136
Epoch 121/300, Loss: 0.11100862513888966
0.585480210974256
Epoch 131/300, Loss: 0.11089750582521612
0.5866639571734393
Epoch 141/300, Loss: 0.111132873730226
0.5860542009164328
Epoch 151/300, Loss: 0.1109411818060008
0.5874863395103854
Epoch 161/300, Loss: 0.11129309914328835
0.587949751573138

In [18]:
# Score the trained model once on the held-out test split.
model.eval()
# Inference only: disable autograd so no graph is built over the test set.
with torch.no_grad():
    pred = model(X_test.flatten(1)).flatten().cpu().numpy()
true = y_test.flatten().detach().cpu().numpy()

# Reported "R2" is the squared Pearson correlation of prediction vs. truth.
r2_test = pearsonr(pred, true)[0]**2

print(f"{model_name} model achieved test R2 = {r2_test}")

Linear model achieved test R2 = 0.6111873340301814


In [19]:
import csv

# Append this model's (name, test R2) pair as one more row of the run's table.
r2_csv = os.path.join(results_path, "R2s.csv")
with open(r2_csv, mode='a', newline='') as file:
    writer = csv.writer(file)
    writer.writerow([model_name, r2_test])

### Transformer model

In [10]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from models import Transformer_torch_MHA

In [11]:
# Offset the code at position i by AA_size * i, so that (assuming codes lie in
# [0, AA_size)) every (position, residue) pair maps to its own integer token.
position_offsets = AA_size * torch.arange(L)
seqs_ex = seqs + position_offsets

# Token inputs and targets on the compute device.
X, y = seqs_ex.to(device), phenotypes.to(device)

# Same index lists as before, applied to the token representation.
(X_train, y_train), (X_val, y_val), (X_test, y_test) = (
    (X[split], y[split]) for split in (train_list, val_list, test_list)
)
train_dataset = ProtDataset(X_train, y_train)

In [12]:
import torch.utils.data as data
# Batch size matches the value recorded in the "Best Trial" note in the next cell.
batch_size = 544
# Shuffled mini-batches over the training split; the final partial batch is kept.
train_loader = data.DataLoader(train_dataset,
                               batch_size=batch_size,
                               shuffle=True,
                               drop_last=False)

In [13]:
# Best Trial:
#   Value: 0.7601
#   Params: 
#     hidden_dim_h: 23
#     dropout: 0.12805161023112027
#     batch_size: 544


In [14]:
# sequence_length = L
# input_dim = AA_size*L
# output_dim = 1
# num_layers = 2
# num_heads = 4
# hidden_dim = 23*num_heads
# dropout = 0.12805161023112027

# model = Transformer_torch_MHA(L, input_dim, hidden_dim, num_layers, num_heads, dropout).to(device)

In [15]:
# from scipy.stats import pearsonr
# learning_rate = 0.001
# epochs = 500

# criterion = nn.MSELoss()
# optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# for epoch in range(epochs):
#     model.train()
#     total_loss = 0
#     for batch_inputs, batch_targets in train_loader:
#         optimizer.zero_grad()
#         outputs = model(batch_inputs)
#         loss = criterion(outputs, batch_targets)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()
    
#     if epoch % 20 == 0:
#         print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader)}")
#         model.eval()
#         pred, true = model(X_test.flatten(1)).flatten().detach().cpu().numpy(), y_test.flatten().detach().cpu().numpy()
#         print(pearsonr(pred, true)[0]**2)

#### Hyperparameter Tuning

In [89]:
import optuna
from scipy.stats import pearsonr

# Fixed (non-tuned) settings; `objective` reads learning_rate, num_heads and input_dim as globals.
learning_rate = 0.001
num_heads = 4

# NOTE(review): sequence_length and output_dim are not referenced anywhere else in the visible notebook.
sequence_length = L
# Number of distinct position-shifted tokens (AA_size codes at each of L positions);
# presumably the model's vocabulary size — confirm against Transformer_torch_MHA.
input_dim = AA_size*L
output_dim = 1

def objective(trial):
    """Optuna objective: train one transformer and return its validation R2.

    Samples `hidden_dim_h`, `dropout`, `batch_size` and `n_epochs` from
    `trial`, trains a `Transformer_torch_MHA` with Adam and MSE loss on
    `train_dataset`, and every 10th epoch scores it on (`X_val`, `y_val`)
    using the squared Pearson correlation.

    Reads the notebook globals `num_layers`, `num_heads`, `L`, `input_dim`,
    `learning_rate`, `device`, `train_dataset`, `X_val` and `y_val`. As a side
    effect, updates the globals `criterion_best` / `model_best` whenever this
    trial beats the best score seen so far.

    Args:
        trial: the `optuna` trial used to sample hyperparameters.

    Returns:
        float: the last finite validation R2 recorded during training.

    Raises:
        optuna.TrialPruned: if no finite validation R2 was recorded (training
            failed or diverged before the first usable evaluation), so the
            study skips this trial instead of aborting.
    """
    global criterion_best, model_best

    hidden_dim_h = trial.suggest_int('hidden_dim_h', 10, 50)
    dropout = trial.suggest_float('dropout', 0.05, 0.35)
    batch_size = trial.suggest_int('batch_size', 100, 1200)
    n_epochs = trial.suggest_int('n_epochs', 30, 300)

    print(f"Build model with {num_layers} layers of attention")
    # The tuned value is the width per head; the model takes the total width.
    model = Transformer_torch_MHA(L, input_dim, hidden_dim_h*num_heads, num_layers, num_heads, dropout).to(device)

    train_loader = data.DataLoader(train_dataset,
                                   batch_size=batch_size,
                                   shuffle=True,
                                   drop_last=False)
    loss_fn = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    val_r2_history = []
    try:
        for epoch in range(n_epochs):
            model.train()
            total_loss = 0
            for batch_inputs, batch_targets in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_inputs)
                loss = loss_fn(outputs, batch_targets)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            if epoch % 10 == 0:
                print(f"Epoch {epoch+1}/{n_epochs}, Loss: {total_loss/len(train_loader)}")
                model.eval()
                # Scoring needs no gradients; skip building the autograd graph.
                with torch.no_grad():
                    pred = model(X_val.flatten(1)).flatten().cpu().numpy()
                true = y_val.flatten().detach().cpu().numpy()
                r2 = pearsonr(pred, true)[0]**2
                print(r2)
                # A NaN score means training diverged: stop early. (Comparing
                # the float to the string "nan" never matched.)
                if np.isnan(r2):
                    break
                val_r2_history.append(r2)
    except Exception as exc:
        # Best effort: keep whatever was scored before the failure (e.g. an
        # out-of-memory error), but let KeyboardInterrupt stop the study.
        print(f"training failed: {exc!r}")

    if not val_r2_history:
        # Nothing usable to report; indexing the empty history would abort
        # the whole study with an IndexError.
        raise optuna.TrialPruned()

    final_r2 = float(val_r2_history[-1])
    if final_r2 > criterion_best:
        print("Found better hyperparameter, update model")
        criterion_best = final_r2
        model_best = model

    return final_r2

In [None]:
import csv

# Tune, evaluate and save one transformer per attention depth.
n_trials = 100
for num_layers in [1, 2, 3]:

    model_name = "TF_" + str(num_layers)

    # Reset the running best for this depth so a model left over from a
    # previous depth can never be reported or saved under this model_name.
    criterion_best = 0.
    model_best = None
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)

    if model_best is None:
        # No trial scored above the initial best, so there is nothing to save.
        print(f"{model_name}: no trial produced a usable model, skipping")
        continue

    # Print the best hyperparameters
    best_trial = study.best_trial
    print("Best Trial:")
    print(f"  Criterion: {best_trial.value:.4f}")
    print("  Params: ")
    for key, value in best_trial.params.items():
        print(f"    {key}: {value}")

    best_hyper_parameters = dict(best_trial.params)

    # Report on the held-out test split. The validation split was used to
    # select the hyperparameters, so scoring it again would be optimistically
    # biased and not comparable with the Linear model's test R2 in R2s.csv.
    model_best.eval()
    with torch.no_grad():
        pred = model_best(X_test.flatten(1)).flatten().cpu().numpy()
    true = y_test.flatten().detach().cpu().numpy()

    r2_test = pearsonr(pred, true)[0]**2
    print(f"{model_name} achieved R2 = {r2_test}")

    # save test R2 score
    with open(os.path.join(results_path, "R2s.csv"), mode='a', newline='') as file:
        writer = csv.writer(file)
        writer.writerows([[model_name, r2_test]])

    # save predictions
    pd.DataFrame({"prediction": pred, "true": true}).to_csv(os.path.join(results_path, model_name + "_predictions.csv"), index=False)

    # save best model
    torch.save(model_best, os.path.join(results_path, model_name + "_BestModel"))

In [90]:
# num_layers = 1
# model_name = "TF_" + str(num_layers)

# criterion_best = 0.
# study = optuna.create_study(direction='maximize')
# study.optimize(objective, n_trials=10)

# # Print the best hyperparameters
# best_trial = study.best_trial
# print("Best Trial:")
# print(f"  Criterion: {best_trial.value:.4f}")
# print("  Params: ")
# for key, value in best_trial.params.items():
#     print(f"    {key}: {value}")

# best_hyper_parameters = {}
# for key, value in best_trial.params.items():
#     best_hyper_parameters[key] = value

# model_best.eval()
# pred, true = model_best(X_val.flatten(1)).flatten().detach().cpu().numpy(), y_val.flatten().detach().cpu().numpy()

# r2_test = pearsonr(pred, true)[0]**2
# print(f"{model_name} achieved R2 = {r2_test}")

# # save test R2 score
# import csv
# with open(os.path.join(results_path, "R2s.csv"), mode='a', newline='') as file:
#     writer = csv.writer(file)
#     writer.writerows([[model_name, r2_test]])

# # save predictions
# pd.DataFrame({"prediction": pred, "true": true}).to_csv(os.path.join(results_path, model_name + "_predictions.csv"), index=False)

# # save best model
# torch.save(model_best, os.path.join(results_path, model_name + "_BestModel"))