In [None]:
#***************************************************************************
#                          CLASICAL
#                          
#***************************************************************************

import torch
import torch.nn as nn
from torch.nn import Sequential, Linear, ReLU


class C_Net(nn.Module):
    """Classical two-branch network: SMILES branch + protein branch + MLP head.

    Each branch embeds integer token indices, runs five dilated 1-D
    convolutions (dilation ``dilaSize * 1, 2, 4, 8, 16``), and max-pools over
    the sequence axis to a single ``embed_dim`` vector.  The two vectors are
    concatenated and fed to a three-layer perceptron.

    Args:
        embed_dim: Embedding size and channel count of every convolution.
        n_filters: Unused; kept so existing call sites keep working.
        output_dim: Unused; kept so existing call sites keep working.  The
            head is sized from ``embed_dim`` because the projection that
            would have produced ``output_dim`` features is disabled.
        dropout: Dropout probability used inside the prediction head.
        n_output: Number of regression outputs.
        dilaSize: Base dilation of the convolution stacks.
    """

    def __init__(self, embed_dim=128,  n_filters=16, output_dim=128, dropout=0.2,  n_output=1, dilaSize=1):
        super(C_Net, self).__init__()

        # Vocabulary sizes: 65 SMILES tokens, 26 protein tokens.
        self.embed_smile = nn.Embedding(65, embed_dim)
        self.embed_prot = nn.Embedding(26, embed_dim)

        # Creation order (smiles, then proteins) is kept so that parameter
        # initialisation under a fixed seed is unchanged.
        self.smiles = self._dilated_conv_stack(embed_dim, dilaSize)
        self.proteins = self._dilated_conv_stack(embed_dim, dilaSize)

        self.layer2 = 128
        self.layer3 = 64

        # Input width is 2 * embed_dim: one pooled vector per branch.  The
        # molecular-descriptor variant (6 extra inputs) is currently disabled.
        # No final activation; the original note says to append nn.Sigmoid()
        # only when the target is scaled to the 0-1 range.
        self.predict = nn.Sequential(
            nn.Linear(2 * embed_dim, self.layer2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(self.layer2, self.layer3),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(self.layer3, n_output),
        )

    @staticmethod
    def _dilated_conv_stack(channels, base_dilation):
        """Build five length-preserving dilated Conv1d+ReLU layers and a global max-pool."""
        layers = []
        for factor in (1, 2, 4, 8, 16):
            dilation = base_dilation * factor
            # kernel_size=3 with padding == dilation keeps the sequence length.
            layers.append(nn.Conv1d(in_channels=channels, out_channels=channels,
                                    kernel_size=3, padding=dilation, dilation=dilation))
            layers.append(nn.ReLU())
        layers.append(nn.AdaptiveMaxPool1d(1))
        return nn.Sequential(*layers)

    def forward(self, smi_in, seq_in,  smi_desc):
        """Predict from SMILES indices and protein indices.

        Args:
            smi_in: Integer tensor of SMILES token indices, (batch, length).
            seq_in: Integer tensor of protein token indices, (batch, length).
            smi_desc: Molecular descriptors; currently ignored.

        Returns:
            Tensor of shape (batch, n_output).
        """
        embedded_smi = self.embed_smile(smi_in)
        embedded_seq = self.embed_prot(seq_in)

        # Conv1d wants (batch, channels, length), hence the transpose.
        # squeeze(-1) drops only the pooled length axis; a bare squeeze()
        # would also drop the batch axis when the batch holds one sample.
        smi = self.smiles(embedded_smi.transpose(1, 2)).squeeze(-1)
        seq = self.proteins(embedded_seq.transpose(1, 2)).squeeze(-1)

        smi_seq = torch.cat((smi, seq), 1)

        return self.predict(smi_seq)

In [None]:
#******************************************************************
#                          HYBRID
#
#******************************************************************

import torch
import torch.nn as nn
from torch.nn import Sequential, Linear, ReLU
import math
import pennylane as qml
from functools import partial
import pennylane.numpy as np


# Number of wires of the quantum device and of every circuit below.
n_qubits  = 4
# Number of entangling layers per encoding block (first weight axis inside a block).
n_layers  = 3 
# Width of the feature vector handed to the quantum layer; HQ_Net concatenates
# two pooled branches of embed_dim (default 128) channels each.
n_features = 256
# Number of encode-then-entangle repetitions in the angle circuits.
n_angle_encoding = math.ceil(((n_features)/n_qubits)/2) # dense angle encoding: two features per qubit
#n_angle_encoding = math.ceil(((n_features)/n_qubits)) # plain angle encoding: one feature per qubit
# NOTE(review): batch_size and n_blocks are not referenced by the code visible in this cell.
batch_size = 256
n_blocks = 2

# Noise-free state-vector simulators (original note: default.qubit was used to run all experiments).
#dev = qml.device("default.qubit", wires=n_qubits )
#dev = qml.device("lightning.qubit", wires=n_qubits )  # not used

# Noise simulation (currently disabled).
# Describe noise
#noise_gate = qml.AmplitudeDamping
#noise_gate = qml.DepolarizingChannel
#noise_strength = 0.1

# Load devices
dev_ideal = qml.device("default.mixed", wires=n_qubits)  # mixed-state simulator; no noise channel is inserted here
#dev_noisy = qml.transforms.insert(noise_gate, noise_strength)(dev_ideal) # the noisy device with noise
#dev_noisy = qml.transforms.insert(dev_ideal, noise_gate, noise_strength, position="all")


def circuit_amplitude(inputs, weights):
    """Amplitude-encoded variational circuit measured on wire 0.

    Args:
        inputs: Features loaded with ``qml.AmplitudeEmbedding`` (normalised by
            the embedding).  NOTE(review): amplitude embedding on ``n_qubits``
            wires presumably needs ``2 ** n_qubits`` features per sample —
            confirm against the width of the preceding layer.
        weights: Iterable of per-layer parameters; ``weights[l][w]`` holds the
            three ``qml.Rot`` angles for wire ``w`` in layer ``l``.

    Returns:
        Expectation value of Pauli-Z on wire 0.
    """
    all_wires = range(n_qubits)

    qml.AmplitudeEmbedding(inputs, wires=all_wires, normalize=True)

    for layer_params in weights:
        # Trainable single-qubit rotations on every wire ...
        for wire in all_wires:
            qml.Rot(*layer_params[wire], wires=wire)

        # ... followed by a closed ring of CNOTs.
        for wire in all_wires:
            neighbour = (wire + 1) % n_qubits
            qml.CNOT(wires=[wire, neighbour])

    # Fold every other wire onto wire 0 before the measurement.
    for control in range(1, n_qubits):
        qml.CNOT(wires=[control, 0])

    return qml.expval(qml.PauliZ(0))



# Original note (translated): equivalent to the circuit above, except that the
# CNOT ranges are [1,2,3] whereas the one above uses [1,1,1].
# NOTE(review): the code below uses range 1 for every layer, so this note
# looks stale — confirm.
def circuit_angle_qml(inputs, weights):
    """Angle-encoded data re-uploading circuit measured on wire 0.

    For each of the ``n_angle_encoding`` blocks, the next ``n_qubits`` feature
    columns are scaled by pi and loaded with ``qml.AngleEmbedding``, followed
    by a ``qml.StronglyEntanglingLayers`` block.

    Args:
        inputs: 2-D batch of features, indexed as ``inputs[:, column]``.
        weights: ``weights[i]`` holds the StronglyEntanglingLayers parameters
            of block ``i``.

    Returns:
        Expectation value of Pauli-Z on wire 0.
    """
    for i in range(n_angle_encoding):
        first_col = i * n_qubits
        qml.AngleEmbedding(math.pi * inputs[:, first_col:first_col + n_qubits], wires=range(n_qubits))    #rotation='X'
        # One range entry per layer (nearest-neighbour entanglement).  Derived
        # from the weights instead of a hard-coded [1, 1, 1] so the circuit
        # keeps working when the number of layers changes.
        qml.StronglyEntanglingLayers(weights[i], wires=range(n_qubits), ranges=[1] * len(weights[i]))

    # Added 22/4/2024: CNOTs from every other wire onto the first qubit (wire 0).
    for control in range(1, n_qubits):
        qml.CNOT(wires=[control, 0])

    return qml.expval(qml.PauliZ(0))


# The data coming from the classical layers must be normalized.
#@qml.qnode(dev)
def circuit_dense_angle(inputs, weights):
    """Dense-angle-encoded data re-uploading circuit measured on wire 0.

    Every block encodes two features per qubit (an RX and an RY angle, both
    scaled by pi) and then applies a ``qml.StronglyEntanglingLayers`` block.
    Block ``j`` therefore consumes feature columns
    ``2 * n_qubits * j .. 2 * n_qubits * (j + 1) - 1``.

    The previous implementation advanced by only ``n_qubits`` columns per
    block, so consecutive blocks re-encoded half of each other's features and
    the second half of the feature vector was never encoded at all.
    NOTE(review): this changes the function the circuit computes, so weights
    trained with the old indexing are not comparable.

    Args:
        inputs: 2-D batch of features, indexed as ``inputs[:, column]``; needs
            at least ``2 * n_qubits * n_angle_encoding`` columns.
        weights: ``weights[j]`` holds the StronglyEntanglingLayers parameters
            of block ``j``.

    Returns:
        Expectation value of Pauli-Z on wire 0.
    """
    features_per_block = 2 * n_qubits

    for j in range(n_angle_encoding):
        offset = features_per_block * j
        for i in range(n_qubits):
            # Dense encoding: two consecutive features drive one qubit.
            qml.RX(math.pi * inputs[:, offset + 2 * i], wires=i)
            qml.RY(math.pi * inputs[:, offset + 2 * i + 1], wires=i)

        # One range entry per layer, derived from the weights rather than a
        # hard-coded [1, 1, 1].
        qml.StronglyEntanglingLayers(weights[j], wires=range(n_qubits), ranges=[1] * len(weights[j]))

    # Fold every other wire onto wire 0 before the measurement.
    for control in range(1, n_qubits):
        qml.CNOT(wires=[control, 0])

    return qml.expval(qml.PauliZ(0))



class HQ_Net(nn.Module):
    """Hybrid network: two classical conv branches feeding a quantum layer.

    Each branch embeds integer token indices, runs five dilated 1-D
    convolutions (the last one followed by a sigmoid so features lie in
    (0, 1)), and max-pools over the sequence axis.  The concatenated features
    are passed to a PennyLane ``TorchLayer`` wrapping ``circuit_dense_angle``,
    which replaces the classical prediction head.

    Args:
        embed_dim: Embedding size and channel count of every convolution.
            NOTE(review): the circuit is configured through the module-level
            ``n_features``; keep ``2 * embed_dim`` consistent with it.
        n_filters, output_dim, dropout, n_output, normalize: Unused; kept so
            existing call sites keep working.
        dilaSize: Base dilation of the convolution stacks.
    """

    def __init__(self, embed_dim=128,  n_filters=16, output_dim=128, dropout=0.2,  n_output=1, dilaSize=1, normalize=True):
        super(HQ_Net, self).__init__()

        # Vocabulary sizes: 65 SMILES tokens, 26 protein tokens.
        self.embed_smile = nn.Embedding(65, embed_dim)
        self.embed_prot = nn.Embedding(26, embed_dim)

        # Creation order (smiles, then proteins) is kept so that parameter
        # initialisation under a fixed seed is unchanged.
        self.smiles = self._dilated_conv_stack(embed_dim, dilaSize)
        self.proteins = self._dilated_conv_stack(embed_dim, dilaSize)

        # Weight layout for the angle circuits: one StronglyEntanglingLayers
        # block per encoding repetition.  For amplitude encoding the shape
        # would be (n_layers, n_qubits, 3) together with circuit_amplitude.
        weight_shapes = {"weights": (n_angle_encoding, n_layers, n_qubits, 3)}

        # Alternative circuit: circuit_angle_qml (plain angle encoding).
        qnode = qml.QNode(circuit_dense_angle, dev_ideal, interface='torch', diff_method='backprop')

        self.predict_q = qml.qnn.TorchLayer(qnode, weight_shapes)

    @staticmethod
    def _dilated_conv_stack(channels, base_dilation):
        """Build the dilated conv stack: ReLU between layers, sigmoid after the last, then global max-pool."""
        factors = (1, 2, 4, 8, 16)
        layers = []
        for position, factor in enumerate(factors):
            dilation = base_dilation * factor
            # kernel_size=3 with padding == dilation keeps the sequence length.
            layers.append(nn.Conv1d(in_channels=channels, out_channels=channels,
                                    kernel_size=3, padding=dilation, dilation=dilation))
            is_last = position == len(factors) - 1
            # Changed 7/4/2024: the last activation is a sigmoid so the
            # features returned to the quantum layer lie between 0 and 1.
            layers.append(nn.Sigmoid() if is_last else nn.ReLU())
        layers.append(nn.AdaptiveMaxPool1d(1))
        return nn.Sequential(*layers)

    def forward(self, smi_in, seq_in,  smi_desc):
        """Predict from SMILES indices and protein indices.

        Args:
            smi_in: Integer tensor of SMILES token indices, (batch, length).
            seq_in: Integer tensor of protein token indices, (batch, length).
            smi_desc: Molecular descriptors; currently ignored.

        Returns:
            Output of the quantum layer for the batch.
        """
        embedded_smi = self.embed_smile(smi_in)
        embedded_seq = self.embed_prot(seq_in)

        # Conv1d wants (batch, channels, length), hence the transpose.
        # squeeze(-1) drops only the pooled length axis; a bare squeeze()
        # would also drop the batch axis when the batch holds one sample.
        smi = self.smiles(embedded_smi.transpose(1, 2)).squeeze(-1)
        seq = self.proteins(embedded_seq.transpose(1, 2)).squeeze(-1)

        smi_seq = torch.cat((smi, seq), 1)

        return self.predict_q(smi_seq)

In [None]:
import metrics

def test(model: nn.Module, test_loader, loss_function, device, show, _p, record):
    """Evaluate ``model`` on ``test_loader``, dump predictions, return metrics.

    Args:
        model: Network to evaluate; switched to eval mode here.
        test_loader: Loader yielding ``(*inputs, target)`` batches.
        loss_function: Callable ``(prediction, target) -> scalar tensor``.  The
            accumulated value is divided by the dataset size, so it is a
            per-sample mean only if the loss sums over the batch.
        device: Device the batches are moved to.
        show: Whether to display the tqdm progress bar.
        _p: Phase name; used as prefix of the output file names.
        record: Extra tag inserted into the output file names.

    Returns:
        dict mapping metric name to value.

    Side effects:
        Writes ``<_p><record>targets.csv`` and ``<_p><record>outputs.csv``
        (two decimals per value) into ``../run/``.
    """
    import os  # local import: keeps this block independent of the file's import cell

    path = '../run/'
    #path = "C:/Experiments/HQ-DTA/run/"
    # np.savetxt does not create missing directories; without this the first
    # evaluation fails on a fresh checkout.
    os.makedirs(path, exist_ok=True)

    model.eval()
    test_loss = 0
    outputs = []
    targets = []
    with torch.no_grad():
        for *x, y in tqdm(test_loader, disable=not show, total=len(test_loader)):
            x = [tensor.to(device) for tensor in x]
            y = y.to(device)

            y_hat = model(*x)

            test_loss += loss_function(y_hat.view(-1), y.view(-1)).item()
            outputs.append(y_hat.cpu().numpy().reshape(-1))
            targets.append(y.cpu().numpy().reshape(-1))

    targets = np.concatenate(targets).reshape(-1)
    outputs = np.concatenate(outputs).reshape(-1)

    np.savetxt(path + _p + record + 'targets.csv', targets, fmt='%1.2f')
    np.savetxt(path + _p + record + 'outputs.csv', outputs, fmt='%1.2f')

    test_loss /= len(test_loader.dataset)

    evaluation = {
        'loss': test_loss,  # original note: same as the MSE
        'MSE':  metrics.MSE(targets, outputs),
        'RMSE': metrics.RMSE(targets, outputs),
        'R2':   metrics.R2(targets, outputs),
        #'R2 adjusted':   R2_adjusted(targets, outputs),  # y_train, X_train
        'MAE': metrics.MAE(targets, outputs),
        'Person': metrics.PERSON(targets, outputs),
        'p_value': metrics.P_VALUE(targets, outputs),
        'C_INDEX': metrics.C_INDEX(targets, outputs),
        'SD': metrics.SD(targets, outputs),
    }

    return evaluation

In [None]:
# The training code is based on the code of Kaili Wang (2021): https://github.com/KailiWang1/DeepDTAF
import sys
import time
from datetime import datetime
from pathlib import Path
#import numpy as np
import pennylane.numpy as np
import torch
from torch import _pin_memory, nn, optim
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm.auto import tqdm


from dataset import MyDataset_PDBBind2020, get_scalers_PDBBind2020, MyDataset_pdbbind2016, MyDataset_davis_kiba, get_scalers_davis_kiba


# ---------------------------------------------------------------------------
# Run configuration: seed, output locations, hyper-parameters, and logging of
# those settings to the console and to <path>/parameters.txt.
# ---------------------------------------------------------------------------
print(sys.argv)

SHOW_PROCESS_BAR = True


# The upper bound of randint is exclusive, so this range always yields 33927,
# i.e. the seed is effectively fixed.
seed = np.random.randint(33927, 33928)
path = Path(f'../export/{datetime.now().strftime("%m%d%H%M")}_{seed}')

output_name = datetime.now().strftime("%m%d%H%M")

cuda_name = "cuda:0"
device = torch.device(cuda_name if torch.cuda.is_available() else "cpu")

max_seq_len = 1000
max_smi_len = 160

batch_size = 256
n_epoch = 30
interrupt = None
# Initially 5.  Once this epoch is reached, the model parameters are saved
# whenever the validation loss improves.
save_best_epoch = 5
scale_target = True  # for the hybrid model the target must be scaled
scale_inputs = False
init_weights = False


# cuDNN settings: determinism is not enforced and the auto-tuner (benchmark
# mode) is enabled.  NOTE(review): this presumably trades exact repeatability
# for speed, unlike what the previous comment here claimed — confirm.
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True

torch.manual_seed(seed)
np.random.seed(seed)

print('path: ', path)

# The writer is created before parameters.txt is opened inside the same path.
writer = SummaryWriter(path)
# Kept open on purpose: later parts of the script append to it and close it.
f_param = open(path / 'parameters.txt', 'w')

print(f'device = {device}')
print(f'seed = {seed}')
print(f'write to {path}')
print(f'batch_size={batch_size}')
print(f'epoch = {n_epoch}')
print(f'Scale target = {scale_target}')
print(f'Scale inputs = {scale_inputs}')
print(f'init_weights = {init_weights}')


# The trailing newline on the last entry was missing, which glued the next
# line written to the file onto the init_weights line.
f_param.write(f'device = {device}\n'
              f'seed = {seed}\n'
              f'write to {path}\n'
              f'batch_size= {batch_size}\n'
              f'epoch = {n_epoch}\n'
              f'Scale target= {scale_target}\n'
              f'Scale inputs= {scale_inputs}\n'
              f'init_weights = {init_weights}\n')



#******************* NOT USED**************
def initialize_weights(m):
    """Xavier-initialise one module; meant to be passed to ``model.apply``.

    ``nn.Conv1d`` and ``nn.Linear`` modules get Xavier-uniform weights and
    zero biases.  Any other module is left untouched.  Layers created with
    ``bias=False`` are supported (their missing bias is simply skipped).

    Args:
        m: The module visited by ``nn.Module.apply``.
    """
    if isinstance(m, (nn.Conv1d, nn.Linear)):
        nn.init.xavier_uniform_(m.weight.data)
        #nn.init.uniform_(m.weight.data, a=0.0, b=0.01)
        # bias is None for bias-free layers; dereferencing it would raise.
        if m.bias is not None:
            nn.init.zeros_(m.bias.data)
#********************************************      

# ---------------------------------------------------------------------------
# Validate the configuration, pick model and dataset, and build the model.
# ---------------------------------------------------------------------------
# Explicit check instead of assert (asserts vanish under `python -O`); the
# previous message also stated the opposite of the condition.
if not 0 <= save_best_epoch < n_epoch:
    raise ValueError('save_best_epoch must satisfy 0 <= save_best_epoch < n_epoch')


# Index 1 selects the hybrid network; index 0 selects the classical one.
modeling = [C_Net, HQ_Net][1]

data_name = ['pdbbind2020', 'pdbbind2016', 'davis', 'kiba'][1]


f_param.write(f'data base={data_name}\n')

data_path = '../data/' + data_name
print(f'data path to {data_path}')

f_param.write(f'data_path={data_path}\n')

model = modeling().to(device)
# Optionally re-initialise Conv1d/Linear parameters.
if init_weights:
    model.apply(initialize_weights)

print(model)

f_param.write('model: \n')
f_param.write(str(model) + '\n')
# f_param stays open: it is written to again later and closed at the very end.


# ---------------------------------------------------------------------------
# Build one DataLoader per phase for the selected dataset.
# ---------------------------------------------------------------------------
scalers = list()
if data_name == 'pdbbind2020':
    scalers = get_scalers_PDBBind2020(data_path, 'training', data_name)
    dataset_cls = MyDataset_PDBBind2020
    phase_name_array = ['training', 'validation', 'test']
    max_smi_len = 160
elif data_name in ('davis', 'kiba'):
    # The membership test used to read 'kibas', so selecting the 'kiba'
    # dataset never reached this branch.
    scalers = get_scalers_davis_kiba(data_path, 'training', data_name)
    dataset_cls = MyDataset_davis_kiba
    phase_name_array = ['training', 'validation', 'test']
    max_smi_len = 85 if data_name == 'davis' else 100
elif data_name == 'pdbbind2016':
    # No scalers are computed here; `scalers` stays an empty list.
    print('Scaller in dataset') #scalers = get_scalers(data_path, 'training', data_name)
    dataset_cls = MyDataset_pdbbind2016
    phase_name_array = ['training', 'validation', 'test', 'test105', 'test71']
    max_smi_len = 160
else:
    # Fail immediately instead of printing and hitting a NameError later on.
    raise ValueError(f'No dataset: unknown data_name {data_name!r}')

# All dataset classes are constructed with the same arguments, so the loader
# construction is shared between the branches above.
data_loaders = {
    phase_name: DataLoader(
        dataset_cls(data_path, phase_name, data_name, max_seq_len, max_smi_len,
                    scale_target, scale_inputs, scalers),
        batch_size=batch_size,
        pin_memory=True,
        num_workers=4,
        shuffle=True)
    for phase_name in phase_name_array
}


print(f'max_seq_len={max_seq_len}\n'
      f'max_smi_len={max_smi_len}')

f_param.write(f'max_seq_len={max_seq_len}\n'
              f'max_smi_len={max_smi_len}\n')


                

# ---------------------------------------------------------------------------
# Optimisation set-up and training loop.
# ---------------------------------------------------------------------------
optimizer = optim.AdamW(model.parameters())
# The one-cycle schedule is stepped once per batch (see the loop below).
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.0001,
                                          epochs=n_epoch,
                                          steps_per_epoch=len(data_loaders['training']))

# Summed (not averaged) squared error; test() divides by the dataset size.
loss_function = nn.MSELoss(reduction='sum')


start = datetime.now()
print('start at ', start)


best_epoch = -1
# inf instead of a large finite sentinel: any finite validation loss counts
# as an improvement the first time.
best_val_loss = float('inf')
for epoch in range(1, n_epoch + 1):
    # test() leaves the model in eval mode at the end of every epoch, so
    # training mode is restored once per epoch rather than once per batch.
    model.train()

    tbar = tqdm(data_loaders['training'], disable=not SHOW_PROCESS_BAR, total=len(data_loaders['training']))

    for *x, y in tbar:
        x = [tensor.to(device) for tensor in x]
        y = y.to(device)

        optimizer.zero_grad()

        output = model(*x)

        loss = loss_function(output.view(-1), y.view(-1))

        loss.backward()

        optimizer.step()
        scheduler.step()

        # The loss is a batch sum; show the per-sample value.
        tbar.set_description(f' * Train Epoch {epoch} Loss={loss.item() / len(y):.3f}')

    for _p in ['training', 'validation']:
        performance = test(model, data_loaders[_p], loss_function, device, not SHOW_PROCESS_BAR, _p, record='_train' + str(epoch) + '_' + output_name)

        for i in performance:
            writer.add_scalar(f'{i} {_p}', performance[i], global_step=epoch)
        # Checkpoint only from save_best_epoch onwards, on validation improvement.
        if _p == 'validation' and epoch >= save_best_epoch and performance['loss'] < best_val_loss:
            best_val_loss = performance['loss']
            best_epoch = epoch
            torch.save(model.state_dict(), 'h_best_model.pt')

# ---------------------------------------------------------------------------
# Final evaluation of the best checkpoint on every phase, then clean-up.
# ---------------------------------------------------------------------------
print('best epoch:', best_epoch)
f_param.write(f'best epoch={best_epoch}\n')

print('Testing...')

if best_epoch != -1:
    model.load_state_dict(torch.load('h_best_model.pt'))
else:
    # best_epoch is still -1 when no checkpoint was written during this run.
    # Loading the file anyway would either fail or silently pick up a stale
    # checkpoint left behind by an earlier run.
    print('warning: no checkpoint was saved in this run; evaluating the final-epoch weights')

with open(path / 'result.txt', 'w') as f:

    for _p in phase_name_array:
        performance = test(model, data_loaders[_p], loss_function, device, not SHOW_PROCESS_BAR, _p, record='_test_' + output_name)
        f.write(f'{_p}:\n')
        print(f'{_p}:')
        # In the file all metrics of a phase share one line; the console gets
        # one metric per line.
        for k, v in performance.items():
            f.write(f' {k}: {v}')
            print(f' {k}: {v}')
        f.write('\n')
        print()

writer.close()
print('training finished')

end = datetime.now()
print('end at:', end)
print('time used:', str(end - start))

f_param.write(f'time used={str(end - start)}\n')
f_param.close()
