In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from lifelines.utils import concordance_index
from sklearn.metrics import precision_recall_curve, auc, r2_score
from rdkit import Chem
from rdkit.Chem import AllChem, Descriptors



In [2]:
file_path = "davis.txt"
columns = ["drug_id", "protein_id", "SMILES_sequence", "amino_acid_sequence", "affinity_score"]
data = pd.read_csv(file_path, sep = ' ', names=columns)

grouping = data.groupby("protein_id")

train_data = pd.DataFrame(columns = data.columns)
test_data = pd.DataFrame(columns = data.columns)

for protein_id, group in grouping:
    train_group, test_group = train_test_split(group, test_size = 0.7, random_state = 36)
    train_data = pd.concat([train_data, train_group])
    test_data = pd.concat([test_data, test_group])

train_data.to_csv("Train_Data.csv", index = False )
test_data.to_csv("Test_Data.csv", index = False)

unique_proteins = data['protein_id'].unique()
train_proteins, test_proteins = train_test_split(unique_proteins, test_size=0.1, random_state=36)

train_data_new_proteins = data[data['protein_id'].isin(train_proteins)]
test_data_new_proteins = data[data['protein_id'].isin(test_proteins)]

train_data_new_proteins.to_csv("Train_Data_new_proteins.csv", index=False)
test_data_new_proteins.to_csv("Test_Data_new_proteins.csv", index=False)

# No new drugs in test
train_data_no_new_drugs, test_data_no_new_drugs = train_test_split(data, test_size=0.2, random_state=36)
train_drugs = train_data_no_new_drugs['drug_id'].unique()
test_data_no_new_drugs = test_data_no_new_drugs[test_data_no_new_drugs['drug_id'].isin(train_drugs)]

train_data_no_new_drugs.to_csv("Train_Data_no_new_drugs.csv", index=False)
test_data_no_new_drugs.to_csv("Test_Data_no_new_drugs.csv", index=False)

# New drugs in test
unique_drugs = data['drug_id'].unique()
train_drugs, test_drugs = train_test_split(unique_drugs, test_size=0.1, random_state=36)

train_data_new_drugs = data[data['drug_id'].isin(train_drugs)]
test_data_new_drugs = data[data['drug_id'].isin(test_drugs)]

train_data_new_drugs.to_csv("Train_Data_new_drugs.csv", index=False)
test_data_new_drugs.to_csv("Test_Data_new_drugs.csv", index=False)

train_data = pd.read_csv("Train_Data_new_proteins.csv")
test_data = pd.read_csv("Test_Data_new_proteins.csv")


  train_data = pd.concat([train_data, train_group])
  test_data = pd.concat([test_data, test_group])


In [3]:
amino_acid_dict = {
    'A': 1, 'R': 2, 'N': 3, 'D': 4, 'C': 5,
    'E': 6, 'Q': 7, 'G': 8, 'H': 9, 'I': 10,
    'L': 11, 'K': 12, 'M': 13, 'F': 14, 'P': 15,
    'S': 16, 'T': 17, 'W': 18, 'Y': 19, 'V': 20
}

def encode_aa_sequence(sequence, aa_dict, max_len = 1000):
   encoded_aa_sequence = [aa_dict[aa] for aa in sequence if aa in aa_dict]
   encoded_aa_sequence = encoded_aa_sequence[:max_len]
   padding = [0] * (max_len - len(encoded_aa_sequence))
   return encoded_aa_sequence + padding

train_data['encoded_aa_sequence'] = train_data['amino_acid_sequence'].apply(lambda x: encode_aa_sequence(x, amino_acid_dict))
test_data['encoded_aa_sequence'] = test_data['amino_acid_sequence'].apply(lambda x: encode_aa_sequence(x, amino_acid_dict))

all_smiles_chars = set("".join(data['SMILES_sequence']))
smiles_dict = {char: idx + 1 for idx, char in enumerate(all_smiles_chars)}  # Reserve 0 for padding

print("SMILES Dictionary:", smiles_dict)

all_smiles_chars = set("".join(data['SMILES_sequence']))
smiles_dict = {char: idx for idx, char in enumerate(sorted(all_smiles_chars))}

def encode_smiles_onehot(smiles, smiles_dict, max_len=150):
    encoding = [[0] * len(smiles_dict) for _ in range(max_len)]
    for i, char in enumerate(smiles[:max_len]):
        if char in smiles_dict:
            encoding[i][smiles_dict[char]] = 1
    return encoding


train_data['encoded_smiles_onehot'] = train_data['SMILES_sequence'].apply(lambda x: encode_smiles_onehot(x, smiles_dict))
test_data['encoded_smiles_onehot'] = test_data['SMILES_sequence'].apply(lambda x: encode_smiles_onehot(x, smiles_dict))


SMILES Dictionary: {'4': 1, 'S': 2, 'F': 3, 'C': 4, 'N': 5, 'O': 6, '.': 7, '1': 8, '7': 9, ')': 10, '5': 11, 'I': 12, '2': 13, 'r': 14, '=': 15, 'B': 16, 'l': 17, '8': 18, 'P': 19, '9': 20, '(': 21, '6': 22, '3': 23, '#': 24}


In [4]:
class AffinityDataset (Dataset):
  def __init__ (self, aa_en_seq, smiles_en_seq, scores):
    self.aa_en_seq = aa_en_seq
    self.smiles_en_seq = smiles_en_seq
    self.scores = scores

  def __len__ (self):
    return len(self.aa_en_seq)

  def __getitem__ (self, idx):
    aa_sequence = torch.tensor(self.aa_en_seq[idx], dtype = torch.float)
    smiles_sequence = torch.tensor(self.smiles_en_seq[idx], dtype = torch.float)
    score = torch.tensor(self.scores[idx], dtype = torch.float)

    return (aa_sequence, smiles_sequence), score

train_dataset = AffinityDataset(train_data['encoded_aa_sequence'].tolist(),
                                  train_data['encoded_smiles_onehot'].tolist(),
                                  train_data['affinity_score'].tolist())
test_dataset = AffinityDataset(test_data['encoded_aa_sequence'].tolist(),
                                 test_data['encoded_smiles_onehot'].tolist(),
                                 test_data['affinity_score'].tolist())

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [5]:
class CNNAffinityPredictor(nn.Module):
    def __init__(self, smiles_vocab_size, aa_vocab_size=21, smiles_seq_len=150, aa_seq_len=1000, embedding_dim=32, output_dim=1):
        super(CNNAffinityPredictor, self).__init__()

        # SMILES branch
        self.smiles_conv = nn.Sequential(
            nn.Conv1d(in_channels=smiles_vocab_size, out_channels=64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1)
        )

        # Protein branch
        self.aa_embedding = nn.Embedding(num_embeddings=aa_vocab_size, embedding_dim=embedding_dim, padding_idx=0)
        self.aa_conv = nn.Sequential(
            nn.Conv1d(in_channels=embedding_dim, out_channels=64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1)
        )

        # Regression head
        self.fc = nn.Sequential(
            nn.Linear(128 * 2, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, smiles_input, aa_input):
        # SMILES: [batch, seq_len, vocab] → [batch, vocab, seq_len]
        smiles_input = smiles_input.permute(0, 2, 1)
        smiles_feat = self.smiles_conv(smiles_input).squeeze(-1)

        # Proteins: [batch, seq_len] → embed → [batch, embed_dim, seq_len]
        aa_embed = self.aa_embedding(aa_input.long()).permute(0, 2, 1)
        aa_feat = self.aa_conv(aa_embed).squeeze(-1)

        combined = torch.cat((smiles_feat, aa_feat), dim=1)
        output = self.fc(combined)
        return output

smiles_input_dim = len(train_dataset[0][0][1][0]) * len(train_dataset[0][0][1])  # Update smiles_input_dim
aa_input_dim = len(train_dataset[0][0][0])  # Update aa_input_dim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNAffinityPredictor(smiles_vocab_size=len(smiles_dict)).to(device)


In [6]:

import torch.optim as optim

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 10


In [7]:

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for (aa_seq, smiles_seq), labels in train_loader:
        optimizer.zero_grad()
        outputs = model(smiles_seq, aa_seq).squeeze()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}")


Epoch 1/10, Loss: 0.8027
Epoch 2/10, Loss: 0.5758
Epoch 3/10, Loss: 0.5637
Epoch 4/10, Loss: 0.5368
Epoch 5/10, Loss: 0.5151
Epoch 6/10, Loss: 0.4934
Epoch 7/10, Loss: 0.4858
Epoch 8/10, Loss: 0.4786
Epoch 9/10, Loss: 0.4804
Epoch 10/10, Loss: 0.4793


In [8]:

from sklearn.metrics import mean_squared_error

def rm2_score(y_true, y_pred):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    r2 = r2_score(y_true, y_pred)
    slope = np.sum(y_true * y_pred) / np.sum(y_pred ** 2)
    y_pred_origin = slope * y_pred
    r0_squared = r2_score(y_true, y_pred_origin)
    return r2 * (1 - np.sqrt(np.abs(r2 - r0_squared)))


In [9]:

model.eval()
preds, trues = [], []

with torch.no_grad():
    for (aa_seq, smiles_seq), labels in test_loader:
        outputs = model(smiles_seq, aa_seq).squeeze()
        preds.extend(outputs.tolist())
        trues.extend(labels.tolist())

ci = concordance_index(trues, preds)
mse = mean_squared_error(trues, preds)
r2 = r2_score(trues, preds)
r = np.corrcoef(trues, preds)[0, 1]
rm2 = rm2_score(trues, preds)

print(f"CI: {ci:.3f}, MSE: {mse:.3f}, R²: {r2:.3f}, R: {r:.3f}, Rm²: {rm2:.3f}")


CI: 0.800, MSE: 0.531, R²: 0.389, R: 0.627, Rm²: 0.364


In [10]:
def calculate_ci(y_true, y_pred):
    return concordance_index(y_true, y_pred)

def calculate_auc(y_true, y_pred):
    threshold = np.median(y_true)
    y_true_binary = (y_true >= threshold).astype(int)
    y_pred_binary = (y_pred >= threshold).astype(int)

    precision, recall, _ = precision_recall_curve(y_true_binary, y_pred_binary)
    return auc(recall, precision)

def rm2score(r2_score):
   return r2_score * (1 - (abs(r2_score - 0.5) ** 0.5))

def calculate_pearson (y_true, y_pred):
    return np.corrcoef(y_true, y_pred)[0, 1]

In [13]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

def train_on_split(train_data_path, split_name):
    print(f"Training on split: {split_name}")

    train_data = pd.read_csv(train_data_path)
    train_data['encoded_aa_sequence'] = train_data['amino_acid_sequence'].apply(lambda x: encode_aa_sequence(x, amino_acid_dict))
    train_data['encoded_smiles_onehot'] = train_data['SMILES_sequence'].apply(lambda x: encode_smiles_onehot(x, smiles_dict))
    # Load training data
    train_dataset = AffinityDataset(train_data['encoded_aa_sequence'].tolist(),
                                      train_data['encoded_smiles_onehot'].tolist(),
                                      train_data['affinity_score'].tolist())
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    smiles_input_dim = len(train_dataset[0][0][1][0]) * len(train_dataset[0][0][1])
    aa_input_dim = len(train_dataset[0][0][0])

    # Define model, criterion, optimizer
    model = CNNAffinityPredictor(
    smiles_vocab_size=len(smiles_dict),
    aa_vocab_size=21,
    smiles_seq_len=150,
    aa_seq_len=1000,
    embedding_dim=32,
    output_dim=1
).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    for epoch in range(10):
        model.train()
        running_loss = 0.0
        for (aa_seq, smiles_seq), labels in train_loader:
            aa_seq, smiles_seq, labels = aa_seq.to(device), smiles_seq.to(device), labels.to(device)

            # Forward pass
            outputs = model(smiles_seq, aa_seq).squeeze()
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_loader):.4f}")

    return model, train_loader


# Function to evaluate the model
def evaluate_on_split(model, test_data_path, split_name):
    print(f"Evaluating on split: {split_name}")
    test_data = pd.read_csv(test_data_path)
    test_data['encoded_aa_sequence'] = test_data['amino_acid_sequence'].apply(lambda x: encode_aa_sequence(x, amino_acid_dict))
    test_data['encoded_smiles_onehot'] = test_data['SMILES_sequence'].apply(lambda x: encode_smiles_onehot(x, smiles_dict))
    # Load test data
    test_dataset = AffinityDataset(
        test_data['encoded_aa_sequence'].tolist(),
        test_data['encoded_smiles_onehot'].tolist(),
        test_data['affinity_score'].tolist()
    )
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Evaluate model
    evaluate_model(model, test_loader, nn.MSELoss(), device)


def evaluate_model(model, data_loader, criterion, device):
    model.eval()
    y_true, y_pred = [], []
    total_loss = 0

    with torch.no_grad():
        for (aa_seq, smiles_seq), labels in data_loader:
            aa_seq, smiles_seq, labels = aa_seq.to(device), smiles_seq.to(device), labels.to(device)
            outputs = model(smiles_seq, aa_seq).squeeze()
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(outputs.cpu().numpy())

    mse = total_loss / len(data_loader)
    ci = calculate_ci(np.array(y_true), np.array(y_pred))
    r2 = r2_score(y_true, y_pred)
    auc_pr = calculate_auc(np.array(y_true), np.array(y_pred))
    rm2 = rm2score(r2)
    pearson = calculate_pearson(np.array(y_true), np.array(y_pred))

    print(f"Metrics: MSE={mse:.4f}, CI={ci:.4f}, R2={r2:.4f}, AUC-PR={auc_pr:.4f}, RM²={rm2:.4f}, Pearson={pearson:.4f}")

# Train and evaluate for all splits
splits = {
    "No New Proteins in Test": ("Train_Data.csv", "Test_Data.csv"),
    "New Proteins in Test": ("Train_Data_new_proteins.csv", "Test_Data_new_proteins.csv"),
    "No New Drugs in Test": ("Train_Data_no_new_drugs.csv", "Test_Data_no_new_drugs.csv"),
    "New Drugs in Test": ("Train_Data_new_drugs.csv", "Test_Data_new_drugs.csv"),
}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Main loop for all splits
for split_name, (train_path, test_path) in splits.items():
    trained_model, train_loader = train_on_split(train_path, split_name)
    evaluate_on_split(trained_model, test_path, split_name)


Training on split: No New Proteins in Test
Epoch 1, Loss: 1.3436
Epoch 2, Loss: 0.6529
Epoch 3, Loss: 0.6478
Epoch 4, Loss: 0.6464
Epoch 5, Loss: 0.6438
Epoch 6, Loss: 0.6332
Epoch 7, Loss: 0.6403
Epoch 8, Loss: 0.6392
Epoch 9, Loss: 0.6258
Epoch 10, Loss: 0.6482
Evaluating on split: No New Proteins in Test
Metrics: MSE=0.6586, CI=0.6906, R2=0.1022, AUC-PR=1.0000, RM²=0.0377, Pearson=0.3469
Training on split: New Proteins in Test
Epoch 1, Loss: 0.7809
Epoch 2, Loss: 0.5735
Epoch 3, Loss: 0.5682
Epoch 4, Loss: 0.5303
Epoch 5, Loss: 0.5060
Epoch 6, Loss: 0.4914
Epoch 7, Loss: 0.4801
Epoch 8, Loss: 0.4763
Epoch 9, Loss: 0.4616
Epoch 10, Loss: 0.4561
Evaluating on split: New Proteins in Test
Metrics: MSE=0.5251, CI=0.8015, R2=0.3937, AUC-PR=1.0000, RM²=0.2654, Pearson=0.6339
Training on split: No New Drugs in Test
Epoch 1, Loss: 0.8352
Epoch 2, Loss: 0.5741
Epoch 3, Loss: 0.5705
Epoch 4, Loss: 0.5740
Epoch 5, Loss: 0.5611
Epoch 6, Loss: 0.5501
Epoch 7, Loss: 0.5253
Epoch 8, Loss: 0.4966
Ep