## Pre-processing 

### Pre-processing 0: Set up

In [23]:
#Packages
import os  #directory related
import cv2 #work with images
import json #work with JSON strings
import torch #ML
import random 
import warnings
import numpy as np #work with arrays
import pandas as pd #work with csv files
from operator import itemgetter #Return a callable object that fetches item from its operand 
from torch.utils.data import Dataset #use pre-loaded datasets as well as your own data when implementing the model
from torch.utils.data.sampler import SubsetRandomSampler, BatchSampler, SequentialSampler #determine how batches should be formed.
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import rdkit #to work with ChEMBL data
from rdkit import Chem 
from rdkit.Chem import Draw

In [4]:
#Directory: change to your directory if you want to replicate
os.chdir('/content/drive/MyDrive/GANHID')
project_file_path = '/content/drive/MyDrive/GANHID'
training_files_path = "{}/training_files".format(project_file_path)
result_files_path = "{}/result_files".format(project_file_path)
trained_models_path = "{}/trained_models".format(project_file_path)

### Pre-procesing 1: From Smiles to Images [D]


In [5]:
def smiles_to_images(smiles_list, output_path, img_size = (400, 400), bond_width=1.5, atom_label_font_size=55):
    '''
    standerdize the drawing of atoms, bonds, and labelling based on the standerdized SMILES representations 

    Parameters
    ----------
    smiles_list: list of SMILES strings
    output_path: defined earlier
    image_size: standardized 
    bond_width: standardized
    atom_label_font_size: standerdized

    '''

    Draw.SetDefaultStyle(atom_label_font_size=atom_label_font_size, bondLineWidth=bond_width) # set the default atom and bond properties
    # Iterate over the list of SMILES strings
    for i, smiles in enumerate(smiles_list):
        # Create the molecule object from the SMILES string
        mol = Chem.MolFromSmiles(smiles)
        # Draw the molecule as an image
        img = Draw.MolToImage(mol)
        # Resize the image
        img = img.resize(img_size)
        # Save the image to a file
        img.save(f"{output_path}/compound_{i}.png")



### Pre-processing 2: More Standerdizing and Clearning


In [6]:
def create_preprocessed_bioact_file(chembl_filtered_bioact_fl, chembl_version): 

    '''
    reads the raw bioactivity data, standardizes the units, and drops duplicate values 

    Parameters
    ----------
    chembl_filtered_bioact_fl: a string contains the tsv file name of the raw bioactivity data obtained from ChEMBL. 
    chembl_version: a string contains the version of the ChEMBL data that is being used

    '''
    raw_dataset_df = pd.read_csv("{}/{}".format(training_files_path, chembl_filtered_bioact_fl), sep="\t", index_col=False)

    # standardize units
    raw_dataset_df.loc[raw_dataset_df['standard_units'] == 'nM', 'standard_value'] = raw_dataset_df['standard_value'] / pow(10,3)
    raw_dataset_df.loc[raw_dataset_df['standard_units'] == 'M', 'standard_value'] = raw_dataset_df['standard_value'] / pow(10,6)
    raw_dataset_df['standard_units'] = 'uM'

    #group dataframe by target_compound_pair and standard_value, apply median and rename standard_value to median_std_val
    raw_dataset_df['target_compound_pair'] = raw_dataset_df['Target_CHEMBL_ID']+','+raw_dataset_df['Compound_CHEMBL_ID']
    df_median = raw_dataset_df.groupby(['target_compound_pair']).median().rename(columns={'standard_value': 'median_std_val'}).reset_index()

    #merge the median dataframe with original dataframe
    raw_dataset_df = pd.merge(raw_dataset_df, df_median[['target_compound_pair','median_std_val']],on='target_compound_pair')

    # drop duplicate values with median values
    raw_dataset_df = raw_dataset_df.loc[raw_dataset_df.duplicated(subset=['target_compound_pair'], keep=False)].sort

    # drop duplicate values with median values
    raw_dataset_df = raw_dataset_df.loc[raw_dataset_df.duplicated(subset=['target_compound_pair'], keep=False)].sort_values(by='median_std_val', ascending=True)
    raw_dataset_df = raw_dataset_df.drop_duplicates(subset='target_compound_pair', keep='first')
    raw_dataset_df = raw_dataset_df.drop(['standard_value'], axis=1)
    raw_dataset_df = raw_dataset_df.rename(columns={'median_std_val': 'standard_value'})
    
    # write data to file
    raw_dataset_df.to_csv("{}/{}_preprocessed_filtered_bioactivity_dataset.tsv".format(training_files_path, chembl_version), sep='\t', index=False)
    



### Preprocessing 3: Filtering ChEMBL

In [7]:
def get_uniprot_chembl_sp_id_mapping(chembl_uni_prot_mapping_fl):

    '''
    filters the dataframe to include only single protein targets 

    Parameters
    ----------
    chembl_uni_prot_mapping_fl: file location of a tab-separated file containing the mapping between UniProt IDs 
    and ChEMBL IDs for single protein targets. 

    '''
    mapping_df = pd.read_csv("{}/{}".format(training_files_path, chembl_uni_prot_mapping_fl), sep='\t', header=0)

    # Filter dataframe to include only single protein targets
    mapping_df = mapping_df[mapping_df['target_type'] == 'SINGLE PROTEIN']

    # Create dictionary where keys are uniprot ids and values are lists of chembl ids
    mapping_dict = mapping_df.groupby('uniprot_id')['chembl_id'].apply(list).to_dict()
    return mapping_dict


### Preprocessing 4: Building the Training set 

In [8]:
def create_act_inact_files_for_all_targets(fl, chembl_version, act_threshold, inact_threshold):

    '''
    reads a file containing compound activity data for multiple targets and filters the data 
    based on activity threshold and creates a pivot table separating the active and inactive compounds

    Parameters
    ----------
    fl: the name of the input file containing the compound activity data
    chembl_version: the version of the ChEMBL database used to obtain the data
    act_threshold: a threshold value for determining active compounds
    inact_threshold: a threshold value for determining inactive compounds
    training_files_path: the path of the directory where the output file will be saved

    '''
    
    pre_filt_chembl_df = pd.read_csv("{}/{}".format(training_files_path, fl), sep="\t" ,index_col=False) 
    
    # Separate the dataframe into two based on the threshold values
    act_rows_df = pre_filt_chembl_df[pre_filt_chembl_df["standard_value"] <= act_threshold]
    inact_rows_df = pre_filt_chembl_df[pre_filt_chembl_df["standard_value"] > inact_threshold]
    
    # Use a groupby statement and the aggregate function to group the dataframe by the target and compound IDs
    # and then store the resulting active and inactive compounds in separate columns
    grouped_df = act_rows_df.append(inact_rows_df).groupby(['Target_CHEMBL_ID', 'Compound_CHEMBL_ID']).agg({'standard_value':'first'}).reset_index()
    
    # Create a pivot table that separates the active and inactive compounds and stores them in separate columns
    pivot_df = grouped_df.pivot_table(index='Target_CHEMBL_ID', columns='standard_value', values='Compound_CHEMBL_ID', aggfunc='first')
    pivot_df.columns = [col if col<=act_threshold else 'inactive' for col in pivot_df.columns]
    pivot_df.rename(columns={col: 'active' for col in pivot_df.columns if col <= act_threshold}, inplace=True)
    
    # Write the pivot table to a file
    pivot_df.to_csv("{}/{}_preprocessed_filtered_act_inact_comps_{}_{}.tsv".format(training_files_path, chembl_version, act_threshold, inact_threshold), sep='\t')


### Preprocessing 5: Balancing the training sets (undersampling)


In [10]:
def under_sample_act_inact_files(active_data, inactive_data):
    '''
    balance the number of samples between the two classes by under-sampling the inactive data


    Parameters
    ----------
    active_data: list of active samples
    inactive_data: list of inactive samples

    Returns
    ----------
    balanced_data: a list of balanced active and inactive data.
    '''
    
    # Determine the number of samples to keep from the inactive class
    n_samples_to_keep = len(active_data)
    
    # Randomly select a subset of the inactive class samples
    inactive_data = random.sample(inactive_data, n_samples_to_keep)
    
    # Combine the active and under-sampled inactive data
    balanced_data = active_data + inactive_data
    
    return balanced_data

# Use the function
#act_data = create_act_inact_files_for_all_targets(all_data, act_threshold)
#inact_data = create_act_inact_files_for_all_targets(all_data, inact_threshold)
#balanced_data = under_sample_act_inact_files(act_data, inact_data)


### Preprocessing 6: Splitting the data

In [14]:
def create_final_randomized_training_val_test_sets(neg_act_inact_fl, smiles_inchi_fl, training_files_path):
    '''
    create final randomized training, validation, and test sets for a given target

    Parameters
    ----------

    neg_act_inact_fl: a file containing a list of active and inactive compounds for each target
    smiles_inchi_fl: a file containing the SMILES and InChI representations of compounds
    training_files_path: path to the directory where the training files will be stored

    Returns
    ----------

    tar_train_val_test_dict: a dictionary with keys: training, validation, test, and the values are 
    lists of lists where the inner lists contain the compound id and label

    '''

    chemblid_smiles_dict = get_chemblid_smiles_inchi_dict(smiles_inchi_fl) 
    act_inact_dict = get_act_inact_list_for_all_targets(neg_act_inact_fl)

    for tar in act_inact_dict:
        os.makedirs(os.path.join(training_files_path, "target_training_datasets", tar, "imgs"), exist_ok=True)
        act_list, inact_list = act_inact_dict[tar]
        
        # Trim the data to balance the number of actives and inactives
        if len(inact_list) >= len(act_list):
            inact_list = inact_list[:len(act_list)]
        else:
            act_list = act_list[:int(len(inact_list) * 1.5)]

        # Shuffle the data to randomize
        combined_list = act_list + inact_list
        random.shuffle(combined_list)

        # Define the split sizes 
        train_size = int(len(combined_list) * 0.6)
        val_size = int(len(combined_list) * 0.2)

        # Create the split
        training_comp_id_list = combined_list[:train_size]
        val_comp_id_list = combined_list[train_size:train_size+val_size]
        test_comp_id_list = combined_list[train_size+val_size:]
        
        #Creating the lists
        tar_train_val_test_dict = {"training": [], "validation": [], "test": []}
        for split, comp_id_list in zip(["training", "validation", "test"], [training_comp_id_list, val_comp_id_list, test_comp_id_list]):
            for comp_id in comp_id_list:
                try:
                    save_comp_imgs_from_smiles(tar, comp_id, chemblid_smiles_dict[comp_id][0])
                    tar_train_val_test_dict[split].append([comp_id, 1])
                except:
                    pass
    return tar_train_val_test_dict


## The Model

In [15]:
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()

        self.conv1 = nn.Conv2d(3, 32, 2)
        self.bn1 = nn.BatchNorm2d(32) 

        self.conv2 = nn.Conv2d(32, 64, 2)
        self.bn2 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(64, 128, 2)
        self.bn3 = nn.BatchNorm2d(128)

        self.conv4 = nn.Conv2d(128, 64, 2)
        self.bn4 = nn.BatchNorm2d(64)

        self.conv5 = nn.Conv2d(64, 32, 2)
        self.bn5 = nn.BatchNorm2d(32)

        self.pool = nn.MaxPool2d(2, 2)

        self.fc1 = nn.Linear(32*5*5, fully_layer_1)
        self.fc2 = nn.Linear(fully_layer_1, fully_layer_2)

        # add last layer to produce output sample
        self.fc3 = nn.Linear(fully_layer_2, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.pool(F.relu(self.bn4(self.conv4(x))))
        x = self.pool(F.relu(self.bn5(self.conv5(x))))

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        self.conv1 = nn.Conv2d(3, 32, 2)
        self.bn1 = nn.BatchNorm2d(32) 

        self.conv2 = nn.Conv2d(32, 64, 2)
        self.bn2 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(64, 128, 2)
        self.bn3 = nn.BatchNorm2d(128)

        self.conv4 = nn.Conv2d(128, 64, 2)
        self.bn4 = nn.BatchNorm2d(64)

        self.conv5 = nn.Conv2d(64, 32, 2)
        self.bn5 = nn.BatchNorm2d(32)

        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(32*5*5, fully_layer_1)
        self.fc2 = nn.Linear(fully_layer_1, fully_layer_2)
        
        # add last layer to produce output probability
        self.fc3 = nn.Linear(fully_layer_2, 1)
    def forward(self, x):
      x = self.pool(F.leaky_relu(self.bn1(self.conv1(x)), negative_slope=0.2))
      x = self.pool(F.leaky_relu(self.bn2(self.conv2(x)), negative_slope=0.2))
      x = self.pool(F.leaky_relu(self.bn3(self.conv3(x)), negative_slope=0.2))
      x = self.pool(F.leaky_relu(self.bn4(self.conv4(x)), negative_slope=0.2))
      x = self.pool(F.leaky_relu(self.bn5(self.conv5(x)), negative_slope=0.2))
      x = F.leaky_relu(self.fc1(x), negative_slope=0.2)
      x = F.leaky_relu(self.fc2(x), negative_slope=0.2)
      x = self.fc3(x)
      
      return x



## Training

### Training 0: Set up

In [24]:
def save_best_model_predictions(experiment_name, epoch, validation_scores_dict, test_scores_dict, model, project_file_path, target_id, str_arguments, all_test_comp_ids, test_labels, test_predictions):
    '''
    save the best model and its predictions

    Parameters
    ----------
    experiment_name: a string representing the name of the experiment.
    epoch: the current epoch number.
    validation_scores_dict: a dictionary containing the scores of the model on the validation set.
    test_scores_dict: a dictionary containing the scores of the model on the test set.
    model: the model that needs to be saved.
    project_file_path: the path to the project files.
    target_id: the identifier of the target.
    str_arguments: a string representing the arguments used to train the model.
    all_test_comp_ids: a list of the identifiers of the compounds in the test set.
    test_labels: a list of the labels of the compounds in the test set.
    test_predictions: a list of the predictions of the model for the compounds in the test set. 

    '''
    model_path = os.path.join(trained_models_path, experiment_name)
    if not os.path.exists(model_path):
        os.makedirs(model_path)

    # Save model state dictionary
    torch.save(model.state_dict(), os.path.join(model_path, f"{target_id}_best_val-{str_arguments}-state_dict.pth"))
    
    # Build test predictions string
    str_test_predictions = []
    for comp_id, label, pred in zip(all_test_comp_ids, test_labels, test_predictions):
        str_test_predictions.append(f"{comp_id}\t{label}\t{pred}")
    str_test_predictions = "\n".join(str_test_predictions)

    best_test_performance_dict = test_scores_dict
    best_test_predictions = str_test_predictions
    
    # Return data
    return validation_scores_dict, best_test_performance_dict, best_test_predictions, str_test_predictions


### Training 1


In [25]:
def calculate_val_test_loss(model, criterion, data_loader, device):

  '''
  calculates the total loss, total count, all compound ids, all labels and all predictions for a given model, criterion and data_loader

  Parameters
  ----------
   model: A PyTorch model that will be used to predict the labels of the data.
   criterion: A PyTorch criterion that will be used to calculate the loss on the predicted labels.
   data_loader: A PyTorch data loader that will be used to iterate through the data.
   device: A string specifying the device to be used (e.g. "cpu" or "cuda").

  Returns
  -------
  total_loss: A float representing the sum of the loss values for the entire data set.
  total_count: An integer representing the total number of samples in the data set.
  all_comp_ids: A list of compound ids for the entire data set.
  all_labels: A list of labels for the entire data set.
  all_predictions: A list of predicted labels (outputs of the model) for the entire data set. 
  '''
    total_loss = 0.0
    all_comp_ids = []
    all_labels = []
    all_predictions = []

    for data in data_loader:
        img_arrs, labels, comp_ids = [x.to(device) for x in data]
        y_pred = model(img_arrs)
        loss = criterion(y_pred.squeeze(), labels)
        total_loss += float(loss.item())
        all_comp_ids.extend(comp_ids)
        _, preds = torch.max(y_pred, 1)
        all_labels.extend(labels)
        all_predictions.extend(preds)
    
    total_count = len(all_comp_ids)
    return total_loss, total_count, all_comp_ids, all_labels, all_predictions


### Training 2

In [26]:
def train_gan(generator, discriminator, data_loader, optimizer_G, optimizer_D, criterion, num_epochs):
    '''
      The function iterates through the data_loader, using each batch of data to train the discriminator and generator. 
      First, the discriminator is trained on real data, and then trained on fake data generated by the generator. 
      Then, the generator is trained to generate data that can fool the discriminator.
      At each step, the gradients of the generator and discriminator are reset, their parameters are updated, and loss is 
      calculated and printed out to monitor the training progress.


      Parameters
      ----------

      generator: a PyTorch model representing the generator network
      discriminator: a PyTorch model representing the discriminator network
      data_loader: a PyTorch dataloader that loads the training data
      optimizer_G: a PyTorch optimizer for the generator network
      optimizer_D: a PyTorch optimizer for the discriminator network
      criterion: a PyTorch loss function to calculate the loss
      num_epochs: an integer representing the number of times to iterate over the entire dataset during training

   '''
    for epoch in range(num_epochs):
        for i, data in enumerate(data_loader):
            # 1. Train the discriminator on real data
            optimizer_D.zero_grad()
            real_data = data[0].to(device)
            real_label = torch.ones(real_data.size(0), 1).to(device)
            real_output = discriminator(real_data)
            real_loss = criterion(real_output, real_label)
            real_loss.backward()

            # 2. Train the discriminator on fake data
            noise = torch.randn(real_data.size(0), 100, 1, 1).to(device)
            fake_data = generator(noise)
            fake_label = torch.zeros(real_data.size(0), 1).to(device)
            fake_output = discriminator(fake_data.detach())
            fake_loss = criterion(fake_output, fake_label)
            fake_loss.backward()

            # 3. Update the discriminator's parameters
            optimizer_D.step()

            # 4. Train the generator
            optimizer_G.zero_grad()
            noise = torch.randn(real_data.size(0), 100, 1, 1).to(device)
            fake_data = generator(noise)
            fake_output = discriminator(fake_data)
            generator_loss = criterion(fake_output, real_label)
            generator_loss.backward()

            # 5. Update the generator's parameters
            optimizer_G.step()

            # Print Losses
            print("Epoch: [%d/%d], Step: [%d/%d], D_loss: %.4f, G_loss: %.4f" % (epoch+1, num_epochs, i+1, len(data_loader), real_loss.item() + fake_loss.item(), generator_loss.item()))


## Evaluation

In [28]:
def evaluate_gan(generator, discriminator, data_loader, z_dim):
    '''
    evaluate the performance of the GAN model.

    Parameters
    ----------
    generator: the generator model
    discriminator: the discriminator model
    data_loader: a data loader for the real data
    z_dim: the dimension of the random noise input for the generator

    Returns
    -------
    evaluation_matrix: a 2x2 matrix, where the first row represents the real data and the second row represents the fake data
    where threshold of 0.5 is used to classify the results as real or fake
    '''

    # Initialize the evaluation matrix
    evaluation_matrix = np.zeros((2, 2))
    
    # Iterate over the data
    for real_data in data_loader:
        # Generate fake data
        z = torch.randn(len(real_data), z_dim)
        fake_data = generator(z).detach()
        
        # Compute the discriminator's output for the real and fake data
        real_output = discriminator(real_data)
        fake_output = discriminator(fake_data)
        
        # Update the evaluation matrix
        evaluation_matrix[0, 0] += (real_output >= 0.5).sum().item() # True Positive
        evaluation_matrix[0, 1] += (real_output < 0.5).sum().item() # False Negative
        evaluation_matrix[1, 0] += (fake_output < 0.5).sum().item() # False Positive
        evaluation_matrix[1, 1] += (fake_output >= 0.5).sum().item() # True Negative
    return evaluation_matrix

#Use the function
#data_loader = create_data_loader(data)
#evaluation_matrix = evaluate_gan(generator, discriminator, data_loader, z_dim)
