In [None]:
import os
import re
import json

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch


from botorch import fit_gpytorch_mll
from gpytorch.likelihoods import FixedNoiseGaussianLikelihood
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch import settings

from gskgpr import GaussianStringKernelGP
from seq2ascii import Seq2Ascii
import pickle

from botorch.acquisition import qUpperConfidenceBound
from botorch.sampling import SobolQMCNormalSampler
from botorch.optim.optimize import optimize_acqf_discrete




In [None]:
def initialize_model(train_x, train_y, err_y, translator,num_outputs):
    model = GaussianStringKernelGP(train_x=train_x, train_y=train_y, 
                            likelihood=FixedNoiseGaussianLikelihood(noise=err_y), 
                            translator=translator, num_outputs=num_outputs)
    mll = ExactMarginalLogLikelihood(model.likelihood, model).to(device)
    return model, mll

def append_star(name):
    if len(name) == 45:
        return name + '*****'
    else:
        return name


In [None]:
## Setting names and rounds
Previous_Round = 23
Current_Round = 24

## Names for saving the models and training data
training_sample_pickle_file = f'BO_sample_{Current_Round}.pickle'
model_name = f'GPR_Model_{Current_Round}.pth'

## Read data and library    
ELP_all=pd.read_csv('./pickes/FE_all.csv')
computed=pd.read_csv('./pickes/FE-latest.csv')

dataset = ELP_all.merge(computed[['ELP', 'dG', 'dG_err']], on='ELP', how='left')
dataset['Sequence'] = dataset['Sequence'].apply(append_star)


### Fit a GPR model

In [None]:
## Get translator
translator = Seq2Ascii("./AA.blosum62.pckl")
fspace = dataset.Sequence.to_list()
translator.fit(fspace)

## save the training samples in this round
dataset_train=dataset.dropna(subset=['dG'], inplace=False)
BO_samples = dataset_train[['ELP', 'dG', 'dG_err']].values.tolist()
with open(f'./pickles/training_samples/{training_sample_pickle_file}', 'wb') as f:
    pickle.dump(BO_samples, f)

## Get the mean and std
train_mean=dataset_train['dG'].mean()
train_std=dataset_train['dG'].std()

## Normalization
dataset_train.loc[:, 'dG_err'] = dataset_train['dG_err'] / train_std
dataset_train.loc[:, 'dG'] = (dataset_train['dG'] - train_mean) / train_std
dataset_train.loc[:, 'dG_var'] = dataset_train['dG_err']**2

print('Fitting GPR')
## encode, train and save model
device='cpu'
encoded_x = translator.encode_to_int(dataset_train.loc[:, 'Sequence'].to_list()).to(device)
train_y = torch.tensor(dataset_train.dG.to_numpy()).float().to(device)
err_y = torch.tensor(dataset_train.dG_var.to_numpy()).float().to(device)

model, mll = initialize_model(encoded_x, train_y, err_y, translator, num_outputs=1)
fit_gpytorch_mll(mll)
print(f'Actual sigma1: {model.covar_module.sigma1.item()}')
print(f'Actual sigma2: {model.covar_module.sigma2.item()}')
torch.save(model.state_dict(), f'./pickles/models/{model_name}')

## Prediction
full_space = torch.as_tensor(list(translator.int2str.keys())).view(-1, 1).to(device)
with torch.no_grad():
    post = model.posterior(full_space)


### Run an acquisition function

In [None]:
print('Performing UCB for pure exploitation')
## UCB
choices = list(translator.int2str.keys())
for i in dataset_train.Sequence: # remove the ones that are already in the training set
    choices.remove(translator.str2int[i])
choices = torch.Tensor(choices).view(-1, 1).to(device)

best_value = train_y.max()

sampler = SobolQMCNormalSampler(sample_shape=torch.Size([len(choices)]), seed=0)
qUCB = qUpperConfidenceBound(model, 0, sampler=sampler)
torch.manual_seed(seed=0)  # to keep the restart conditions the same

new_point_mc, acqui_value = optimize_acqf_discrete(
    acq_function=qUCB,
    q=2,
    choices=choices,
    max_batch_size=len(choices),
    unique=True)
decoded_sequences=translator.decode(new_point_mc.squeeze())

ELP_name = []
for decoded_seq in decoded_sequences:
    filtered_row = dataset.loc[dataset['Sequence'] == decoded_seq]
    ELP_name.append(filtered_row['ELP'].item())
print('Selected Candidates are ')
print(ELP_name)

with torch.no_grad():
    post_candidates = model.posterior(new_point_mc.view(-1, 1))
    post_candidates_mean=(post_candidates.mean.detach().numpy()*train_std+train_mean).reshape(-1)
    post_candidates_std=(post_candidates.variance.sqrt().detach().numpy()*train_std).reshape(-1)
    print(post_candidates_mean, post_candidates_std)
