In [1]:
import numpy as np
import pandas as pd
import torch.nn as nn
import torch
import torch.nn.functional as F
# from deepquantum import Circuit
# from deepquantum.utils import dag,measure_state,ptrace,multi_kron,encoding,measure,expecval_ZI
import torch.optim as optim
import os
import json
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

In [4]:
embedding_num_drug = 64      #字典序
embedding_num_target = 25    #字典序
embedding_dim_drug = 16      #2^6
embedding_dim_target = 16    #2^6
hyber_para = 16              #2^4
qubits_cirAorB = 4           #每一边的qubits数
dim_embed = hyber_para

In [2]:
from typing import Optional, Sequence
from torch.utils.data import Dataset, DataLoader, Sampler
import os

from torch.utils.data.dataloader import _collate_fn_t, _worker_init_fn_t
VOCAB_PROTEIN = {"A": 1, "C": 2, "B": 3, "E": 4, "D": 5, "G": 6,
                 "F": 7, "I": 8, "H": 9, "K": 10, "M": 11, "L": 12,
                 "O": 13, "N": 14, "Q": 15, "P": 16, "S": 17, "R": 18,
                 "U": 19, "T": 20, "W": 21,
                 "V": 22, "Y": 23, "X": 24,
                 "Z": 25}
VOCAB_LIGAND_ISO = {"#": 29, "%": 30, ")": 31, "(": 1, "+": 32, "-": 33, "/": 34, ".": 2,
                    "1": 35, "0": 3, "3": 36, "2": 4, "5": 37, "4": 5, "7": 38, "6": 6,
                    "9": 39, "8": 7, "=": 40, "A": 41, "@": 8, "C": 42, "B": 9, "E": 43,
                    "D": 10, "G": 44, "F": 11, "I": 45, "H": 12, "K": 46, "M": 47, "L": 13,
                    "O": 48, "N": 14, "P": 15, "S": 49, "R": 16, "U": 50, "T": 17, "W": 51,
                    "V": 18, "Y": 52, "[": 53, "Z": 19, "]": 54, "\\": 20, "a": 55, "c": 56,
                    "b": 21, "e": 57, "d": 22, "g": 58, "f": 23, "i": 59, "h": 24, "m": 60,
                    "l": 25, "o": 61, "n": 26, "s": 62, "r": 27, "u": 63, "t": 28, "y": 64}

def str2int(ligand, protein, label):
    ligand = [VOCAB_LIGAND_ISO[s] for s in ligand]
    protein = [VOCAB_PROTEIN[s] for s in protein]
        
    if len(ligand) < 128:
         ligand = np.pad(ligand, (0, 128 - len(ligand)))
    else:
        ligand = ligand[:128]
    if len(protein) < 512:
        protein = np.pad(protein, (0, 512 - len(protein)))
    else:
        protein = protein[:512]

    return torch.tensor(ligand, dtype=torch.long), torch.tensor(protein, dtype=torch.long), torch.tensor(label, dtype=torch.float)

def Gram(data, hyber_para = 16):
    data = data.T @ data
    data = data.view(hyber_para, hyber_para)
    return data


class MyDataset(Dataset):
    def __init__(self, root_dir, csv_name, embed_prot=len(VOCAB_PROTEIN), embed_lig=len(VOCAB_LIGAND_ISO), is_Gram = False):
        super(MyDataset).__init__()
        self.embed_lig = nn.Embedding(embed_lig, 16, padding_idx=0)
        self.embed_prot = nn.Embedding(embed_prot, 16, padding_idx=0)
        self.is_Gram = is_Gram
        self.path = os.path.join(root_dir, csv_name)
        self.data = pd.read_csv(self.path)
        self.embed_drug = nn.Embedding(embed_lig, embed_lig, padding_idx=0)
        self.embed_target = nn.Embedding(embed_prot, embed_prot, padding_idx=0)

    def __getitem__(self, idx):
        ligand , protein, label =  self.data.iloc[idx,]['smiles'], self.data.iloc[idx,]['sequence'], self.data.iloc[idx,]['label']
        ligand, protein, label = str2int(ligand, protein, label)
        if self.is_Gram:
            ligand = Gram(self.embed_lig(ligand))
            protein = Gram(self.embed_prot(protein))
        return ligand, protein, label
    
    def __len__(self):
        return len(self.data)
    
if __name__ == '__main__':
    root_dir = r'./data/'
    train_name = 'training_dataset.csv'
    train_dataset = MyDataset(root_dir, train_name)
    train_dataloader = DataLoader(train_dataset, batch_size=32,shuffle=True,drop_last=True)
    print(len(VOCAB_LIGAND_ISO))
    # for data in train_dataloader:
    #     ligand, _, _ = data
    #     print(ligand)

64


In [3]:
class Linear(nn.Module):
    def __init__(self):
        super().__init__()
        self.FC1 = nn.Linear(128,32)
        self.FC2 = nn.Linear(32,1)
    def forward(self,x):
        out = F.leaky_relu(self.FC1(x))
        out = F.leaky_relu(self.FC2(out))
        return out

In [5]:
class CNNLayerBased(nn.Module):
    def __init__(self, embedding_num_drug, embedding_num_target, embedding_dim_drug=dim_embed,
                  embedding_dim_target=dim_embed, conv1_out_dim = qubits_cirAorB):
        super().__init__()
        # self.data_pre = ClassicalPre()
        self.drugconv1d = nn.Conv1d(embedding_dim_drug, conv1_out_dim, kernel_size = 4, stride = 1, padding = 'same')
        self.targetconv1d = nn.Conv1d(embedding_dim_target, conv1_out_dim, kernel_size = 4, stride = 1, padding = 'same')
        self.linearlayer = Linear()
    def forward(self,drug_input, target_input):   #x是一条记录
        # drug_input, target_input = self.data_pre(x)
        drug_output = self.drugconv1d(drug_input)   #1 4 16

        target_output = self.targetconv1d(target_input)   #1 4 16

        linear_input = torch.cat([drug_output, target_output],dim=1).view(drug_output.shape[0],-1)
        # print('linear',linear_input.shape)
        linear_output = self.linearlayer(linear_input)
        affinity = linear_output
        return affinity

In [6]:
class Conv1d(nn.Module):
    """
    Three 1d convolutional layer with relu activation stacked on top of each other
    with a final global maxpooling layer
    """
    def __init__(self, vocab_size, channel, kernel_size, stride=1, padding=0):
        super(Conv1d, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim=128)
        self.conv1 = nn.Conv1d(128, channel, kernel_size, stride, padding)
        self.conv2 = nn.Conv1d(channel, channel*2, kernel_size, stride, padding)
        self.conv3 = nn.Conv1d(channel*2, channel*3, kernel_size, stride, padding)
        self.relu = nn.ReLU()
        self.globalmaxpool = nn.AdaptiveMaxPool1d(1)

    def forward(self, x):
        x = self.embedding(x)
        
        x = x.permute(0, 2, 1)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.globalmaxpool(x)
        x = x.squeeze(-1)
        return x

class DeepDTA(nn.Module):
    """DeepDTA model architecture, Y-shaped net that does 1d convolution on 
    both the ligand and the protein representation and then concatenates the
    result into a final predictor of binding affinity"""

    def __init__(self, pro_vocab_size, lig_vocab_size, channel, protein_kernel_size, ligand_kernel_size):
        super(DeepDTA, self).__init__()
        self.ligand_conv = Conv1d(lig_vocab_size, channel, ligand_kernel_size)
        self.protein_conv = Conv1d(pro_vocab_size, channel, protein_kernel_size)
        self.fc1 = nn.Linear(channel*6, 1024)
        self.fc2 = nn.Linear(1024, 1024)
        self.fc3 = nn.Linear(1024, 512)
        self.fc4 = nn.Linear(512, 1)
        self.dropout = nn.Dropout(0.1)
        self.relu = nn.ReLU()

    def forward(self, protein, ligand):
        x1 = self.ligand_conv(ligand)
        x2 = self.protein_conv(protein)
        x = torch.cat((x1, x2), dim=1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc4(x)
        return x
    
# if __name__ == '__main__':
#     root_dir = r'./data/'
#     train_name = 'training_dataset.csv'
#     train_dataset = MyDataset(root_dir, train_name)
#     train_dataloader = DataLoader(train_dataset, batch_size=32,shuffle=True,drop_last=True)
#     model = DeepDTA(512,128,32,12,4)
#     for data in train_dataloader:
#         ligand, protein, _ = data
#         pred = model(ligand, protein)
#         print(pred)


In [7]:
import math

def get_mse(records_real, records_predict):
    """
    均方误差 估计值与真值 偏差
    """
    if len(records_real) == len(records_predict):
        return sum([(x - y) ** 2 for x, y in zip(records_real, records_predict)]) / len(records_real)
    else:
        return None

def get_rmse(records_real, records_predict):
    """
    均方根误差：是均方误差的算术平方根
    """
    mse = get_mse(records_real, records_predict)
    if mse:
        return math.sqrt(mse)
    else:
        return None
    
def cal_pccs(x, y, n):
    """
    warning: data format must be narray
    :param x: Variable 1
    :param y: The variable 2
    :param n: The number of elements in x
    :return: pccs
    """
    sum_xy = np.sum(np.sum(x*y))
    sum_x = np.sum(np.sum(x))
    sum_y = np.sum(np.sum(y))
    sum_x2 = np.sum(np.sum(x*x))
    sum_y2 = np.sum(np.sum(y*y))
    pcc = (n*sum_xy-sum_x*sum_y)/np.sqrt((n*sum_x2-sum_x*sum_x)*(n*sum_y2-sum_y*sum_y))
    return pcc

In [8]:
from scipy.stats import pearsonr
# pccs = pearsonr(x, y)

def deepdta_val(model,dataloader):
    model.eval()
    
    pred_list = []
    label_list = []
    rmse_list = []
    for data in dataloader:
        ligand, protein, label =  data
        exp = torch.tensor(label, dtype=torch.float).unsqueeze(-1).detach().numpy()
        pred = model(ligand, protein).detach().numpy()
        rmse_list.append(get_rmse(exp, pred))
        pred_list.append(pred)
        label_list.append(exp)
    rmse = sum(rmse_list) / len(rmse_list)
    # rmse = get_rmse(label_list, pred_list)
    
    # print('pred_list:', pred_list[0].shape)
    pred = np.concatenate(pred_list).reshape(-1)
    label = np.concatenate(label_list).reshape(-1)
    # print('pred:',pred.shape)
    # print('label', label.shape)
    # print(0 in pred)
    # print(0 in label)
    pccs = np.corrcoef(pred, label)[0, 1]
    # pccs = cal_pccs(label_list, pred_list, len(label_list))
    # pccs = np.column_stack(label_list, pred_list)
    # pccs = sum(pccs_list) / len(pccs_list)
    
    return rmse, pccs

if __name__ == '__main__':
    root_dir = r'./data/'
    train_name = 'test_dataset.csv'
    train_dataset = MyDataset(root_dir, train_name)
    train_dataloader = DataLoader(train_dataset, batch_size=32,shuffle=True,drop_last=True)
    model = DeepDTA(512,128,32,12,4)
    rmse, pccs = deepdta_val(model=model, dataloader=train_dataloader)
    print('rmse', rmse)
    print('pccs', pccs)

  if sys.path[0] == "":


rmse 6.743253702088819
pccs 0.2817903143030842


In [None]:
# DeepDTA + MyDataset

from tqdm import tqdm

# 处理np计算过程中遇到0除以0的情况
np.seterr(divide='ignore',invalid='ignore')

root_dir = r'./data/'
train_dataset_name = 'training_dataset.csv'
test_dataset_name = 'test_dataset.csv'
val_dataset_name = 'validation_dataset.csv'
TRAIN_PATH = './data/DTA_result/train/6-14/epoch{} train_loss_min_{}_dict_dta.pth'
TEST_PATH = './data/DTA_result/test/6-14/epoch{}_dict_dta.pth'
BEST_RESULT_PATH = './data/DTA_result/best_result_dict_dta.pth'

train_dataset = MyDataset(root_dir=root_dir, csv_name=train_dataset_name)
train_dataloader = DataLoader(train_dataset, batch_size=32,shuffle=True,drop_last=True)

test_dataset = MyDataset(root_dir=root_dir, csv_name=test_dataset_name)
test_dataloader = DataLoader(test_dataset, batch_size=32,shuffle=True,drop_last=True)

val_dataset = MyDataset(root_dir=root_dir, csv_name=val_dataset_name)
val_dataloader = DataLoader(val_dataset, batch_size=32,shuffle=True,drop_last=True)

model = DeepDTA(512,128,32,12,4)
criterion=nn.MSELoss()
optimizer=optim.Adam(model.parameters(),lr=0.001)
epochs = 600

loss_list = []
train_loss_list = []
test_rmse_list = []
test_pr_list = []

train_min_loss = 1000
test_min_rmse = 1000
test_max_pr = -1

for epoch in range(epochs):
    for data in train_dataloader:
        ligand, protein, label = data
        pred = model(ligand, protein)
        optimizer.zero_grad()
        loss = criterion(pred, label)
        loss_list.append(loss.detach().numpy().tolist())
        loss.backward()
        optimizer.step()
    train_loss = sum(loss_list) / len(loss_list)
    if train_loss < train_min_loss:
        train_min_loss = train_loss
        torch.save(model.state_dict(), TRAIN_PATH.format(str(epoch),str(train_loss)[7:13]))

    # start validatin
    test_rmse, test_pr = deepdta_val(model, test_dataloader)
    train_loss_list.append(train_loss)
    test_rmse_list.append(test_rmse)
    test_pr_list.append(test_pr)
    
    if test_rmse < test_min_rmse or test_pr > test_max_pr:
        if test_rmse < test_min_rmse and test_pr > test_max_pr:
            test_min_rmse = test_rmse
            test_max_pr = test_pr
            torch.save(model.state_dict(), BEST_RESULT_PATH)
            print('best result-epochs:', epoch + 1, 'train-loss:', '%.4f' % train_loss)
            print('valid-rmse:', '% .4f'%test_rmse, 'valid-pr:','% .4f'%test_pr)
            continue
        elif test_rmse < test_min_rmse :
            test_min_rmse = test_rmse
        else:
            test_max_pr = test_pr
        torch.save(model.state_dict(), TEST_PATH.format(str(epoch)))
    

    print('epochs:', epoch + 1, 'train-loss:', '%.4f' % train_loss)
    print('valid-rmse:', '% .4f'%test_rmse, 'valid-pr:','% .4f'%test_pr)

model.load_state_dict(torch.load(BEST_RESULT_PATH)) 
val_rmse, val_pr = deepdta_val(model, val_dataloader)
print('valid-rmse:', '% .4f'%val_rmse, 'valid-pr:','% .4f'%val_pr)

save_path = "./data/DTA_result/"
if not os.path.exists(save_path):
    os.mkdir(save_path)
dict = {"train_loss": train_loss_list,"rmse":test_rmse_list, "pr":test_pr_list}
with open(save_path + "DTA_result.json", "w") as f:
    json.dump(dict, f)


In [None]:
# CNN + MyDataset

np.seterr(divide='ignore',invalid='ignore')

root_dir = r'./data/'
train_dataset_name = 'training_dataset.csv'
test_dataset_name = 'test_dataset.csv'
val_dataset_name = 'validation_dataset.csv'
TRAIN_PATH = './data/DTI_result/6-16/train/epoch{} train_loss_min_{}_dict_cnn.pth'
TEST_PATH = './data/DTI_result/6-16/test/epoch{}_dict_cnn.pth'
BEST_RESULT_PATH = './data/DTI_result/best_result_dict_cnn.pth'

train_dataset = MyDataset(root_dir=root_dir, csv_name=train_dataset_name, is_Gram=True)
train_dataloader = DataLoader(train_dataset, batch_size=32,shuffle=True,drop_last=True)

test_dataset = MyDataset(root_dir=root_dir, csv_name=test_dataset_name, is_Gram=True)
test_dataloader = DataLoader(test_dataset, batch_size=32,shuffle=True,drop_last=True)

val_dataset = MyDataset(root_dir=root_dir, csv_name=val_dataset_name, is_Gram=True)
val_dataloader = DataLoader(val_dataset, batch_size=32,shuffle=True,drop_last=True)

model = CNNLayerBased(64,25)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 500

loss_list = []
train_loss_list = []
test_rmse_list = []
test_pr_list = []

train_min_loss = 1000
test_min_rmse = 1000
test_max_pr = -1

for epoch in range(epochs):
    for data in train_dataloader:
        ligand, protein, label = data
        # print(ligand.shape)
        # ligand = np.squeeze(ligand)
        # protein = np.squeeze(protein)
        # print(ligand.shape)
        pred = model(ligand, protein)
        optimizer.zero_grad()
        loss = criterion(pred, label)
        loss_list.append(loss.detach().numpy().tolist())
        loss.backward()
        optimizer.step()
    train_loss = sum(loss_list) / len(loss_list)
    if train_loss < train_min_loss:
        train_min_loss = train_loss
        torch.save(model.state_dict(), TRAIN_PATH.format(str(epoch),str(train_loss)[7:13]))

    # start validatin
    test_rmse, test_pr = deepdta_val(model, test_dataloader)
    train_loss_list.append(train_loss)
    test_rmse_list.append(test_rmse)
    test_pr_list.append(test_pr)
    
    if test_rmse < test_min_rmse or test_pr > test_max_pr:
        if test_rmse < test_min_rmse and test_pr > test_max_pr:
            test_min_rmse = test_rmse
            test_max_pr = test_pr
            torch.save(model.state_dict(), BEST_RESULT_PATH)
            print('best result-epochs:', epoch + 1, 'train-loss:', '%.4f' % train_loss)
            print('valid-rmse:', '% .4f'%test_rmse, 'valid-pr:','% .4f'%test_pr)
            continue
        elif test_rmse < test_min_rmse :
            test_min_rmse = test_rmse
        else:
            test_max_pr = test_pr
        torch.save(model.state_dict(), TEST_PATH.format(str(epoch)))
    

    print('epochs:', epoch + 1, 'train-loss:', '%.4f' % train_loss)
    print('valid-rmse:', '% .4f'%test_rmse, 'valid-pr:','% .4f'%test_pr)

model.load_state_dict(torch.load(BEST_RESULT_PATH)) 
val_rmse, val_pr = deepdta_val(model, val_dataloader)
print('valid-rmse:', '% .4f'%val_rmse, 'valid-pr:','% .4f'%val_pr)

save_path = "./data/DTI_result/"
if not os.path.exists(save_path):
    os.mkdir(save_path)
dict = {"train_loss": train_loss_list,"rmse":test_rmse_list, "pr":test_pr_list}
with open(save_path + "DTI_result_epoch500.json", "w") as f:
    json.dump(dict, f)