In [4]:
from IPython.display import clear_output
!pip install tqdm subword-nmt

clear_output()

In [None]:
import sys

!git clone https://github.com/kexinhuang12345/MolTrans.git
!mv MolTrans/ESPF ESPF && rm -rf MolTrans/dataset

sys.path.append("MolTrans")

In [5]:
import copy
from pathlib import Path

import pandas as pd
import numpy as np

import torch

from config import BIN_config_DBPE
from models import BIN_Interaction_Flat
from stream import BIN_Data_Encoder

from sklearn import metrics
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score, roc_curve, confusion_matrix, \
    precision_score, recall_score, auc

import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader

# Seed NumPy's and PyTorch's global generators with the same value so that
# re-running the notebook reproduces the same random draws.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x7f6fd9bc88d0>

In [None]:
dataset = "BIOSNAP"
dataset_path = Path("benchmarks") / dataset
train_indices, val_indices, test_indices = [np.fromfile(str(dataset_path / f"{dataset_type}_indices.bin"), dtype=int) for dataset_type in ["train", "val", "test"]]
data = pd.read_csv(dataset_path / "data.csv").reset_index(drop=True)
data["Target Sequence"] = data["Target Sequence"].str.upper()
cols = ["Target Sequence", "SMILES", "Label"]

training_set = BIN_Data_Encoder(train_indices - train_indices.min(), data.loc[train_indices, "Label"].values, data.loc[train_indices, cols].reset_index(drop=True))
training_generator = torch.utils.data.DataLoader(training_set, **params)

validation_set = BIN_Data_Encoder(val_indices - val_indices.min(), data.loc[val_indices, "Label"].values, data.loc[val_indices, cols].reset_index(drop=True))
validation_generator = torch.utils.data.DataLoader(validation_set, **params)

In [None]:
lr = 1e-3
batch_size = 16
workers = 0
epochs = 50
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

params = {'batch_size': batch_size,
          'shuffle': True,
          'num_workers': workers,
          'drop_last': True}


model = BIN_Interaction_Flat(**BIN_config_DBPE()).cuda()
opt = torch.optim.Adam(model.parameters(), lr=lr)
torch.backends.cudnn.benchmark = True

loss_history = []

In [None]:
def test(data_generator, model):
    """Evaluate `model` on every batch of `data_generator`, print and return metrics.

    Each batch is unpacked as `(d, p, d_mask, p_mask, label)`; the first four
    tensors are cast to long and passed to the model, whose raw scores are
    turned into probabilities with a sigmoid.

    Printed: the selected decision threshold, AUROC, AUPRC, and the confusion
    matrix, recall, precision, accuracy, sensitivity and specificity at that
    threshold.

    Args:
        data_generator: iterable of batches (e.g. a DataLoader).
        model: torch module; inputs are moved to the device of its parameters.

    Returns:
        Tuple `(auroc, auprc, f1, y_pred, loss)` where `f1` is computed at a
        fixed 0.5 threshold, `y_pred` is the flat list of predicted
        probabilities and `loss` is the mean per-batch BCE loss as a float.

    Raises:
        ValueError: if `data_generator` yields no batches.
    """
    # Follow the model's device instead of hard-coding CUDA.
    model_device = next(model.parameters()).device
    loss_fct = torch.nn.BCELoss()

    y_pred = []
    y_label = []
    loss_accumulate = 0.0
    count = 0

    model.eval()
    # Evaluation never needs gradients; do not depend on the caller to disable them.
    with torch.no_grad():
        for d, p, d_mask, p_mask, label in data_generator:
            score = model(d.long().to(model_device), p.long().to(model_device),
                          d_mask.long().to(model_device), p_mask.long().to(model_device))

            # Flatten so that a batch of one sample keeps a 1-D shape.
            probs = torch.sigmoid(score).reshape(-1)
            label = torch.as_tensor(np.asarray(label)).float().to(model_device)

            # .item() keeps the running total a plain Python float.
            loss_accumulate += loss_fct(probs, label).item()
            count += 1

            y_label.extend(label.cpu().numpy().flatten().tolist())
            y_pred.extend(probs.cpu().numpy().flatten().tolist())

    if count == 0:
        raise ValueError("data_generator yielded no batches; nothing to evaluate")

    loss = loss_accumulate / count
    y_pred_arr = np.asarray(y_pred)

    fpr, tpr, thresholds = roc_curve(y_label, y_pred)

    # Threshold selection heuristic: tpr / (tpr + fpr) stands in for precision
    # (it is not the true precision) and the F1-like score built from it is
    # maximised. The first ROC point is 0/0, hence errstate and nanargmax.
    with np.errstate(divide="ignore", invalid="ignore"):
        precision = tpr / (tpr + fpr)
        f1 = 2 * precision * tpr / (tpr + precision + 0.00001)

    # The first five ROC points are skipped, as before; fall back to using all
    # points when the curve is too short for that.
    skip = 5 if len(thresholds) > 5 else 0
    thred_optim = thresholds[skip:][np.nanargmax(f1[skip:])]

    print("optimal threshold: " + str(thred_optim))

    y_pred_s = (y_pred_arr >= thred_optim).astype(int)

    auc_k = metrics.auc(fpr, tpr)
    auprc = average_precision_score(y_label, y_pred)
    print("AUROC:" + str(auc_k))
    print("AUPRC: " + str(auprc))

    cm1 = confusion_matrix(y_label, y_pred_s)
    print('Confusion Matrix : \n', cm1)
    print('Recall : ', recall_score(y_label, y_pred_s))
    print('Precision : ', precision_score(y_label, y_pred_s))

    # Accuracy from the confusion matrix: correct predictions over all samples.
    total1 = cm1.sum()
    accuracy1 = (cm1[0, 0] + cm1[1, 1]) / total1
    print('Accuracy : ', accuracy1)

    # scikit-learn layout: rows are true classes, columns predicted classes,
    # so row 1 is the positive class (FN, TP) and row 0 the negative (TN, FP).
    sensitivity1 = cm1[1, 1] / (cm1[1, 0] + cm1[1, 1])
    print('Sensitivity : ', sensitivity1)

    specificity1 = cm1[0, 0] / (cm1[0, 0] + cm1[0, 1])
    print('Specificity : ', specificity1)

    # The returned F1 uses a fixed 0.5 threshold, not the threshold selected above.
    outputs = (y_pred_arr >= 0.5).astype(int)
    return roc_auc_score(y_label, y_pred), auprc, f1_score(y_label, outputs), y_pred, loss

In [None]:
# Train for `epochs` epochs; after each epoch evaluate on the validation set
# and keep a deep copy of the model with the best validation AUROC so far.
max_auc = 0
loss_fct = torch.nn.BCELoss()
# Batches are sent to whichever device holds the model's weights.
model_device = next(model.parameters()).device

for epo in range(epochs):
    model.train()
    for i, (d, p, d_mask, p_mask, label) in enumerate(training_generator):
        score = model(d.long().to(model_device), p.long().to(model_device),
                      d_mask.long().to(model_device), p_mask.long().to(model_device))

        label = torch.as_tensor(np.asarray(label)).float().to(model_device)

        # Sigmoid turns raw scores into probabilities for BCELoss; flatten so a
        # batch of one sample keeps a 1-D shape.
        n = torch.sigmoid(score).reshape(-1)

        loss = loss_fct(n, label)
        # Record a plain float: appending the tensor itself would keep every
        # iteration's autograd graph alive for the whole run.
        loss_history.append(loss.item())

        opt.zero_grad()
        loss.backward()
        opt.step()

        if i % 1000 == 0:
            print('Training at Epoch ' + str(epo + 1) + ' iteration ' + str(i) + ' with loss ' + str(
                loss.cpu().detach().numpy()))

    # Validate once per epoch. The results get `val_` names so they do not
    # overwrite the imported `sklearn.metrics.auc` or the training `loss`.
    with torch.no_grad():
        val_auc, val_auprc, val_f1, val_scores, val_loss = test(validation_generator, model)
    if val_auc > max_auc:
        model_max = copy.deepcopy(model)
        max_auc = val_auc
    print('Validation at Epoch ' + str(epo + 1) + ' , AUROC: ' + str(val_auc) + ' , AUPRC: ' + str(
        val_auprc) + ' , F1: ' + str(val_f1))