In [None]:
import torch
import torch.nn as nn
from torch.nn import Sequential, Linear, ReLU


class C_Net(nn.Module):
    
    def __init__(self, emb_dim=128, output_dim=128, dropout=0.2,  n_output=1, dilaSize=1):
        
        
        super(C_Net, self).__init__()

        self.embed_smile = nn.Embedding(65, emb_dim)
        self.embed_prot = nn.Embedding(26, emb_dim)
        
        #smiles 
        self.smiles = nn.Sequential(
            nn.Conv1d(in_channels= emb_dim, out_channels= emb_dim, kernel_size=3,padding=dilaSize, dilation=dilaSize),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 2 ,dilation=dilaSize * 2),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 4 ,dilation=dilaSize * 4),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 8 ,dilation=dilaSize * 8),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 16,dilation=dilaSize * 16),
            nn.ReLU(),
           nn.AdaptiveMaxPool1d(1)                           
        )
            
        #proteins sequence
        self.proteins = nn.Sequential(
            nn.Conv1d(in_channels=emb_dim, out_channels=emb_dim, kernel_size=3,padding=dilaSize ,dilation=dilaSize),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim  , out_channels=emb_dim, kernel_size=3,padding=dilaSize *2 ,dilation=dilaSize *2),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 4 ,dilation=dilaSize * 4),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 8 ,dilation=dilaSize * 8),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 16,dilation=dilaSize * 16),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1)
          )
        
        
        self.layer2 = 16;
        self.layer3 =  8;       
        

        self.predict = nn.Sequential(
                                    nn.Linear(2 * output_dim, self.layer2 ), 
                                    nn.ReLU(),
                                    nn.Dropout(dropout),
                                    nn.Linear(self.layer2 , self.layer3),
                                    nn.ReLU(),
                                    nn.Dropout(dropout),
                                    nn.Linear(self.layer3 , n_output)#,
                                    )
  
    def forward(self, smi_in, seq_in,  smi_desc):       
        embedded_smi  = self.embed_smile(smi_in) 
        embedded_seq  = self.embed_prot(seq_in)
        
        smi = self.smiles(embedded_smi.transpose(1,2))
        seq = self.proteins(embedded_seq.transpose(1,2))
        
        smi = smi.squeeze()
        seq = seq.squeeze()
        smi_seq = torch.cat((smi, seq),1)          
        
        out = self.predict(smi_seq)
        
        return out 

In [None]:
import torch
import torch.nn as nn
from torch.nn import Sequential, Linear, ReLU
import math
import pennylane as qml
from functools import partial
import pennylane.numpy as np



qubits  = 8
layers  = 3 
features = 256
#angle_encoding = math.ceil(((features)/qubits)/2) #dense angle
angle_encoding = math.ceil(((features)/qubits)) #angle

dev = qml.device("default.qubit", wires=qubits )

@qml.qnode(dev)
def circuit_amplitude_qml(inputs, weights):

    qml.AmplitudeEmbedding(inputs, wires=range(qubits), normalize=True)
    qml.StronglyEntanglingLayers(weights, wires = range (qubits), ranges=[1,1,1])   
    for i in range(qubits-1):
         qml.CNOT(wires=[ (i + 1) % qubits, 0])

    return qml.expval(qml.PauliZ(0))


@qml.qnode(dev)
def circuit_angle_qml(inputs, weights):
    for i in range(angle_encoding):
        qml.AngleEmbedding(math.pi * inputs[:,i*qubits:i*qubits+qubits], wires=range(qubits))    #rotation='X'
        qml.StronglyEntanglingLayers(weights[i], wires = range (qubits), ranges=[1,1,1])   

    for i in range(qubits-1):
         qml.CNOT(wires=[ (i + 1) % qubits, 0])

    return qml.expval(qml.PauliZ(0))


@qml.qnode(dev)
def circuit_dense_angle(inputs, weights):                 #inputs  256 (batch size) x 256  output previous layer
    
    for j in range(angle_encoding):               
        for i in range(qubits):                               #eencoding
            qml.RX(math.pi *  inputs[:, i*2   + qubits *j] ,  wires=i)      #dense endocoding kai oxi aplo
            qml.RY(math.pi *  inputs[:, i*2+1 + qubits *j],   wires=i)

        qml.StronglyEntanglingLayers(weights[j], wires = range ( qubits ), ranges=[1,1,1])   

    for i in range(qubits-1):
         qml.CNOT(wires=[ (i + 1) % qubits, 0])
            
    return qml.expval(qml.PauliZ(0))


class HQ_Net(nn.Module):
    
    def __init__(self, emb_dim=128,  dropout=0.2,  n_output=1, dilaSize=1, normalize=True):
          
        super(HQ_Net, self).__init__()

        self.embed_smile = nn.Embedding(65, emb_dim)
        self.embed_prot = nn.Embedding(26, emb_dim)
        
        #smiles 
        self.smiles = nn.Sequential(
            nn.Conv1d(in_channels= emb_dim, out_channels= emb_dim, kernel_size=3,padding=dilaSize, dilation=dilaSize),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 2 ,dilation=dilaSize * 2),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 4 ,dilation=dilaSize * 4),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 8 ,dilation=dilaSize * 8),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 16,dilation=dilaSize * 16),
            #nn.ReLU(), 7/4/2024 return 0 - 1
            nn.Sigmoid(), 
            nn.AdaptiveMaxPool1d(1)                        
        )
            
        #proteins sequence
        self.proteins = nn.Sequential(
            nn.Conv1d(in_channels=emb_dim, out_channels=emb_dim, kernel_size=3,padding=dilaSize ,dilation=dilaSize),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim  , out_channels=emb_dim, kernel_size=3,padding=dilaSize *2 ,dilation=dilaSize *2),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 4 ,dilation=dilaSize * 4),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 8 ,dilation=dilaSize * 8),
            nn.ReLU(),
            nn.Conv1d(in_channels= emb_dim , out_channels= emb_dim, kernel_size=3,padding=dilaSize * 16,dilation=dilaSize * 16),
            #nn.ReLU(), 7/4/2024  7/4/2024 return 0 - 1
            nn.Sigmoid(),   
            nn.AdaptiveMaxPool1d(1),
          )
        
        #for amplitube
        #weight_shapes = {"weights": ( layers, qubits, 3)}

        # for angle and dense angle
        weight_shapes = {"weights": (angle_encoding, layers, qubits, 3)} # 3 is for rot

        qnode = qml.QNode(circuit_angle_qml, dev, interface='torch', diff_method='backprop') # circuit_amplitude_qml or  circuit_dense_angle
        
        self.predict_q = qml.qnn.TorchLayer(qnode, weight_shapes) 
        
        #nn.init.uniform_(self.predict_q.weight.data, a=0.0, b=0.01, generator=None) worse perfomance
  
    def forward(self, smi_in, seq_in,  smi_desc):#65,26

        embedded_smi  = self.embed_smile(smi_in) 
        embedded_seq  = self.embed_prot(seq_in)   
        
        smi = self.smiles(embedded_smi.transpose(1,2))
        seq = self.proteins(embedded_seq.transpose(1,2))

        smi = smi.squeeze()
        seq = seq.squeeze()

        smi_seq =torch.cat((smi, seq) , 1)  
        out = self.predict_q(smi_seq)            #quantum
        
        return out 

In [5]:
import metrics

def test(model: nn.Module, test_loader, loss_function, device, show, _p, record):
    path = '../run/'
    model.eval()
    test_loss = 0
    outputs = []
    targets = []
    with torch.no_grad():
        for idx, (*x, y) in tqdm(enumerate(test_loader), disable=not show, total=len(test_loader)):
            for i in range(len(x)):
                x[i] = x[i].to(device)
            y = y.to(device)

            y_hat = model(*x)

            test_loss += loss_function(y_hat.view(-1), y.view(-1)).item()
            outputs.append(y_hat.cpu().numpy().reshape(-1))
            targets.append(y.cpu().numpy().reshape(-1))

    targets = np.concatenate(targets).reshape(-1)
    outputs = np.concatenate(outputs).reshape(-1)

    np.savetxt(path + _p + record + 'targets.csv', targets, fmt='%1.2f' )#, fmt ='%d'
    np.savetxt(path + _p + record + 'outputs.csv', outputs, fmt='%1.2f')

    test_loss /= len(test_loader.dataset)

    evaluation = {
        'loss': test_loss, #einai idio me to MSE
        'MSE':  metrics.MSE(targets, outputs),
        'RMSE': metrics.RMSE(targets, outputs),
        'R2':   metrics.R2(targets, outputs),
        #'R2 adjusted':   R2_adjusted(targets, outputs),  # y_train, X_train
        'MAE': metrics.MAE(targets, outputs),
        'Person': metrics.PERSON(targets, outputs),
        'p_value': metrics.P_VALUE(targets, outputs),
        'C_INDEX': metrics.C_INDEX(targets, outputs),
        'SD': metrics.SD(targets, outputs),
    }

    return evaluation

In [None]:
# ************************* WORKING **************************************
##The training code was based on the code of Kaili Wang, 2021 https://github.com/KailiWang1/DeepDTAF#
import sys
import time
from datetime import datetime
from pathlib import Path
import pennylane.numpy as np
import torch
from torch import _pin_memory, nn, optim
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm.auto import tqdm


from dataset import MyDataset_PDBBind2020, get_scalers_PDBBind2020, MyDataset_pdbbind2016, MyDataset_davis_kiba, get_scalers_davis_kiba

print(sys.argv)


SHOW_PROCESS_BAR = True


seed = np.random.randint(33927, 33928) ##random 
path = Path(f'../export/{datetime.now().strftime("%m%d%H%M")}_{seed}') 

output_name= datetime.now().strftime("%m%d%H%M");

cuda_name = "cuda:0"
device = torch.device(cuda_name if torch.cuda.is_available() else "cpu")

max_prot_sequence_len = 1000  
max_smile_len = 160

batch_size = 256
epoch = 30  #10, 50 
interrupt = None
save_best_epoch = 5  # init 5  when `save_best_epoch` is reached and the loss starts to decrease, save best model parameters
scale_target = True # For Hybrid montel the target must be scaled 
scale_inputs = False  


# GPU uses cudnn as backend to ensure repeatable by setting the following (in turn, use advances function to speed up training)
torch.backends.cudnn.deterministic = False 
torch.backends.cudnn.benchmark =  True

torch.manual_seed(seed)
np.random.seed(seed)

print('path: ', path)

writer = SummaryWriter(path)
f_param = open(path / 'parameters.txt', 'w')

print(f'device = {device}')
print(f'seed = {seed}')
print(f'write to {path}')
print(f'batch_size={batch_size}')
print(f'epoch = {epoch}')
print(f'Scale target = {scale_target}')
print(f'Scale inputs = {scale_inputs}')


f_param.write(f'device = {device}\n'
          f'seed = {seed}\n'
          f'write to {path}\n'
          f'batch_size= {batch_size}\n'
          f'epoch = {epoch}\n'
          f'Scale target= {scale_target}\n'  
          f'Scale inputs= {scale_inputs}\n')


assert 0<=save_best_epoch<epoch, 'Save_best_epoch parameter must be greater than epoch '


modeling = [C_Net, HQ_Net][1]

print('modeling', modeling )


print(f'qubits = {qubits}')
print(f'layers = {layers}')
print(f'blocks = {angle_encoding}')
print(f'features = {features}')

f_param.write(f'qubits = {qubits}\n'
        f'layers = {layers}\n'
        f'blocks = {angle_encoding}\n'
        f'features = {features}\n')     
     

data_name = ['pdbbind2020', 'pdbbind2016', 'davis','kiba'][1]

f_param.write(f'data base={data_name}\n')

data_path = '../data/' + data_name
print(f'data path to {data_path}')

f_param.write(f'data_path={data_path}\n')

model = modeling().to(device)


print(model)

f_param.write('model: \n')
f_param.write(str(model)+'\n')
#f_param.close()

scalers=list()  
if data_name == 'pdbbind2020':
    scalers = get_scalers_PDBBind2020(data_path, 'training', data_name)
    phase_name_array = ['training', 'validation', 'test']
    max_smile_len = 160

    data_loaders = {phase_name:
                    DataLoader(MyDataset_PDBBind2020(data_path, phase_name, data_name, max_prot_sequence_len, max_smile_len, scale_target,
                                           scale_inputs, scalers),
                                         batch_size=batch_size,
                                         pin_memory=True,
                                         num_workers=4,
                                         shuffle= True)
                   for phase_name in phase_name_array}
    
elif data_name in ['davis', 'kibas']:
    scalers = get_scalers_davis_kiba(data_path, 'training', data_name)
    phase_name_array = ['training', 'validation', 'test']
    if data_name == 'davis':
        max_smile_len = 85
    else:
        max_smile_len = 100

    data_loaders = {phase_name:
            DataLoader(MyDataset_davis_kiba(data_path, phase_name, data_name, max_prot_sequence_len, max_smile_len, scale_target,
                                    scale_inputs, scalers),
                                    batch_size=batch_size,
                                    pin_memory=True,
                                    num_workers=4,
                                    shuffle= True)
                   for phase_name in phase_name_array}
elif data_name == 'pdbbind2016': 
    print('Scaller in dataset') #scalers = get_scalers(data_path, 'training', data_name)
    phase_name_array = ['training', 'validation', 'test', 'test105', 'test71']
    max_smile_len = 160

    data_loaders = {phase_name:
                    DataLoader(MyDataset_pdbbind2016(data_path, phase_name, data_name, max_prot_sequence_len, max_smile_len, scale_target,
                                           scale_inputs, scalers),
                                         batch_size=batch_size,
                                         pin_memory=True,
                                         num_workers=4,
                                         shuffle= True)
                   for phase_name in phase_name_array}
else:
    print('No dataset')
    
 
print(f'max_prot_sequence_len={max_prot_sequence_len}\n'
      f'max_smile_len={max_smile_len}')

f_param.write(f'max_prot_sequence_len={max_prot_sequence_len}\n'
      f'max_smile_len={max_smile_len}\n')

optimizer = optim.AdamW(model.parameters()  )
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.0001,
                                          epochs=epoch,
                                          steps_per_epoch=len(data_loaders['training']))

loss_function = nn.MSELoss(reduction='sum')
    
start = datetime.now()
print('start at ', start)

best_epoch = -1
best_val_loss = 100000000
for epoch in range(1, epoch + 1):
    tbar = tqdm(enumerate(data_loaders['training']), disable= not SHOW_PROCESS_BAR, total=len(data_loaders['training']))
       
    for idx, (*x, y) in tbar:
        model.train()

        for i in range(len(x)):
            x[i] = x[i].to(device)

        y = y.to(device)

        optimizer.zero_grad()

        output = model(*x)
       
        loss = loss_function(output.view(-1), y.view(-1))  
        
        loss.backward() 
            
        optimizer.step()
        scheduler.step() 

        tbar.set_description(f' * Train Epoch {epoch} Loss={loss.item() / len(y):.3f}')

 
    for _p in ['training', 'validation']:
        performance = test(model, data_loaders[_p], loss_function, device, not SHOW_PROCESS_BAR, _p, record = '_train' + str(epoch) +'_' + output_name) 

        for i in performance:
            writer.add_scalar(f'{i} {_p}', performance[i], global_step=epoch)
        if _p=='validation' and epoch>=save_best_epoch and performance['loss']<best_val_loss:
            best_val_loss = performance['loss']
            best_epoch = epoch
            torch.save(model.state_dict(), 'n_best_model.pt')

print('best epoch:', best_epoch)
f_param.write(f'best epoch={best_epoch}\n')

print('Testing...')

model.load_state_dict(torch.load('n_best_model.pt'))
with open(path / 'result.txt', 'w') as f:

    for _p in phase_name_array:
        performance = test(model, data_loaders[_p], loss_function, device,  not SHOW_PROCESS_BAR, _p, record='_test_'+ output_name)
        f.write(f'{_p}:\n')
        print(f'{_p}:')
        for k, v in performance.items():
            f.write(f' {k}: {v}')
            print(f' {k}: {v}')
        f.write('\n')
        print()

writer.close()
print('training finished')

end = datetime.now()
print('end at:', end)
print('time used:', str(end - start))

f_param.write(f'time used={str(end - start)}\n')
f_param.close()


In [None]:
import matplotlib.pyplot as plt
import pennylane.numpy as np
import math


qubits = 4
layers = 5
angle_rotations=3
features = 8
angle_encoding = math.ceil(((features)/8/2))

weights_init = 0.01 * np.random.randn(angle_encoding, layers, qubits, angle_rotations, requires_grad=True)
inputs =  torch.randn(256, 256)


for i in range(angle_encoding):
       print(i*qubits, ' - ' , i*qubits+qubits)
       tmp = inputs[:,i*qubits:i*qubits+qubits]


fig, ax = qml.draw_mpl(circuit_dense_angle)(
       inputs,weights_init)
plt.show()

print(qml.draw(circuit_dense_angle, expansion_strategy="device")(inputs,weights_init))
