#Predicting Molecular Bioactivity

Fisher Moritzburke (fmoritzb) & 
Martin Hoffmann (maedhoff)

In [1]:
%matplotlib inline

import numpy as np

import sklearn as sk
from sklearn.model_selection import train_test_split # split into training/test
from sklearn import preprocessing  # for standardizing the data
from sklearn import metrics  # Useful for creating confusion matrices

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import tensorflow as tf
from tensorflow import keras
from keras import backend as K
Activation = keras.layers.Activation
to_categorical = keras.utils.to_categorical
Sequential = keras.Sequential
from keras.models import Model
from keras.layers import Input, Dense, Dropout
from keras.layers.merge import add

import math
import time

from google.colab import files
import io

import warnings  
warnings.filterwarnings('ignore')

Using TensorFlow backend.


In [0]:
datasets = [
    ('HIVINT', '18wUy7Ax3LcpbOWEWxnd6vZ2q7K6Bd9IA'), #15.3MB
    ('HIVPROT', '1kQZWm-Nxf5M-do_LwNabPEfMpdVKmLxZ'), #37.1MB
    #('TDI', '1QAM7yM4pnN8_cLsTkccMhZZxFr8f9xRQ'), #47.8MB
    ('OX1', '15bNN3_eyKlgGLOqN-veLgNLRqFAdU64P'), #49.5MB
    ('THROMBIN', '11RJO5lG-pd1a_GrtraXx811wfkJHG-16'), #53.6MB
]

#Helper functions

In [0]:
# takes a (int or float) and returns a boolean
def greater_than_zero(val):
  # returns True if the value is greater than 0
  assert isinstance(val, (int, float))
  return(True if val>0 else False)

# takes a pandas dataframe and an int, returns a pandas dataframe
def drop_irrelevant_columns(data, n):
  # drops columns from the dataframe with less than n non-zero values
  num_columns = len(data.columns)
  to_drop = []
  for col in data.columns[2:]:
    if list(map(greater_than_zero, data[col])).count(True) < n:
      to_drop.append(col)

  data = data.drop(columns=to_drop)
  
  remaining_percentage = round( ((len(data.columns)-2)/num_columns)*100, 2)
  print(f'{len(to_drop)} columns have been dropped. {len(data.columns)} columns ({remaining_percentage}%) remain.')
  
  return data

In [0]:
# takes a (int or float) and returns a (int or float)
def log(val):
  if not isinstance(val, (int, float)):
    return val
  if val == 0:
    return 0
  else:
    return math.log(val+1, 10)

In [0]:
# r_square accuracy metric function
def r_square(y_true, y_pred):
    SS_res =  K.sum(K.square(y_true - y_pred)) 
    SS_tot = K.sum(K.square(y_true - K.mean(y_true))) 
    return ( 1 - SS_res/(SS_tot + K.epsilon()) )

#Build and evaluate models


In [0]:
def run(dataset_name, dataset_file_id, min_molecules_per_column = -1,
           log_transform=True, 
           nn_epochs=150,
           autoencoder_epochs=50,
           nn_ae_epochs=150,
           resMLP_epochs=200,
           verbose=2):
  print(f"{dataset_name} data set...")
  
  histories = {}
  
  Dropout = keras.layers.Dropout
  Dense = keras.layers.Dense
  
  # 1) fetch data from google drive
  link = 'https://drive.google.com/uc?export=download&id={FILE_ID}'
  csv_url = link.format(FILE_ID = dataset_file_id)
  try:
    data = pd.read_csv(csv_url, sep=',')
  except Exception as e:
    print('There was an error parsing the csv file!')
    return
  
  # 2) remove irrelevant columns:
  if min_molecules_per_column > -1:
    data = drop_irrelevant_columns(data, min_molecules_per_column)
    
  # 3) apply a logarithmic transformation:
  if log_transform:
    data = data.applymap(log)
  
  # 4) split the dataset in training and test data:
  cols = data.columns.tolist()[2:]    # exclude MOLECULE and Act
  X = data[cols]
  y = data['Act']
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state=0)
  
  # make sure data is numpy array
  if isinstance(X_train, pd.DataFrame):
    X_train = X_train.values
    X_test = X_test.values
  
  # 5) build, train, & evaluate the models
  
  #============ Standard MLP ===============#
  
  model = Sequential()

  input_dim = X_train.shape[1]

  # Add layers
  model.add( Dense(2000, activation='relu', input_dim = input_dim))
  model.add(Dropout(0.25))
  model.add(Dense(1000, activation='relu'))
  model.add(Dropout(0.25))
  model.add(Dense(1000, activation='relu'))
  model.add(Dropout(0.25))
  model.add(Dense(1000, activation='relu'))
  model.add(Dropout(0.1))
  # Layer 5, output layer
  model.add(Dense(1, activation = 'linear'))

  # Compile the NN model, defining the optimizer to use, the loss function, and the metrics to use.
  model.compile(optimizer = 'RMSprop',
               loss = 'mse',
               metrics = [r_square])
  
  # fit the model to the training data:
  print('training the MLP...')
  history = model.fit(X_train, 
    y_train,
    validation_split = 0.2, 
    epochs = nn_epochs, 
    batch_size = 50,
    verbose = verbose,
  )
  
  # Evaluate the MLP's performance
  evaluation = {}
  
  train_loss, train_acc = model.evaluate(X_train, y_train) # evaluate
  test_loss, test_acc = model.evaluate(X_test, y_test) # evaluate
  
  evaluation['train_acc'] = train_acc
  evaluation['test_acc'] = test_acc
  evaluation['train_loss'] = train_loss
  evaluation['test_loss'] = test_loss
  
  print('MLP test set accuracy:', test_acc, '\n')
  
  histories['MLP'] = {'history': history.history,
                      'evaluation' : evaluation}
  
  
  #============ Encoder + MLP ===============#
  
  # create the autoencoder:
  input_dim = X_train.shape[1] # the input size
  encoding_dim = 50 # the size of the encoded representation
  compression_factor = float(input_dim) / encoding_dim

  autoencoder = Sequential() # create sequential model
  # Encoder Layers
  autoencoder.add(Dense(input_dim/4, input_shape=(input_dim,), activation='relu'))
  autoencoder.add(Dense(input_dim/10, activation='relu'))
  autoencoder.add(Dense(encoding_dim, activation='relu')) # encoded state
  # Decoding Layers
  autoencoder.add(Dense(input_dim/10, activation='relu'))
  autoencoder.add(Dense(input_dim/4, activation='relu'))
  autoencoder.add(Dense(input_dim, activation='relu'))

  autoencoder.compile(optimizer='RMSprop',
    loss = 'mse', metrics=[r_square])

  autoencoder_history = autoencoder.fit(X_train, X_train,
    epochs=autoencoder_epochs, #50
    batch_size=256,
    validation_data=(X_test, X_test),
    verbose = verbose,
  )
  
  
  # build a new model using the autoencoder as a basis:
  ae_model = Sequential()

  # Fist add the encoding layers trained previously
  ae_model.add(autoencoder.layers[0])
  ae_model.add(autoencoder.layers[1])
  ae_model.add(autoencoder.layers[2])

  # add an additional 2 FC layers
  ae_model.add(Dense(50, activation='relu'))
  ae_model.add(Dropout(0.25))
  ae_model.add(Dense(20, activation='relu'))
  ae_model.add(Dropout(0.1))
  # output layer
  ae_model.add(Dense(1, activation = 'linear'))
  
  ae_model.compile(optimizer = 'RMSprop',
             loss = 'mse',
             metrics = [r_square])

  print('Training the encoder + MLP...')
  
  history = ae_model.fit(X_train, 
    y_train,
    validation_split = 0.33, 
    epochs = nn_ae_epochs, #300 
    batch_size = 32,
    verbose = verbose,
  )
  
  # Evaluate the model's performance
  evaluation = {}
  
  train_loss, train_acc = ae_model.evaluate(X_train, y_train) # evaluate
  test_loss, test_acc = ae_model.evaluate(X_test, y_test) # evaluate
  
  evaluation['train_acc'] = train_acc
  evaluation['test_acc'] = test_acc
  evaluation['train_loss'] = train_loss
  evaluation['test_loss'] = test_loss

  print('encoder + MLP test set accuracy:', test_acc, '\n')
  
  histories['ae_MLP'] = {'history': history.history, 
                         'evaluation': evaluation}
  
  #============ residual MLP ===============# 
  from keras.layers import Input, Dense, Dropout
  
  # residualBlock layer. Returns the outputs and the new size of the tensor
  def residualBlock(inputs, size):
    outputs = Dense(size, activation='relu')(inputs) # Dense relu
    outputs = Dropout(.2)(outputs) # dropout
    outputs = add([outputs, inputs]) # add layers
    outputs = Dense(size//2, activation='relu')(outputs)
    return outputs, size//2
  
  inSize = X_train.shape[1] # size of input
  
  visible = Input(shape=(inSize,), name='input') # input layer
  x, size = residualBlock(visible, inSize)
  x, size = residualBlock(x, size)
  x, size = residualBlock(x, size)
  x, size = residualBlock(x, size)
  x, size = residualBlock(x, size)
  x, size = residualBlock(x, size)
  x, size = residualBlock(x, size)
  x, size = residualBlock(x, size)
  output = Dense(1, activation='linear')(x)
  
  resMLP = Model(inputs=visible, outputs=output)
  
  resMLP.compile(optimizer = 'RMSprop',
                loss = 'mse',
                metrics = [r_square])
  
  print('Training the resMLP...')
  
  history = resMLP.fit(X_train, 
                      y_train,
                      validation_split = 0.2, 
                      epochs = resMLP_epochs,
                      batch_size = 100,
                      verbose = verbose)
  
  # Evaluate the model's performance
  evaluation = {}
  train_loss, train_acc = resMLP.evaluate(X_train, y_train) # evaluate train
  test_loss, test_acc = resMLP.evaluate(X_test, y_test) # evaluate test
  # save evaluations
  evaluation['train_acc'] = train_acc
  evaluation['test_acc'] = test_acc
  evaluation['train_loss'] = train_loss
  evaluation['test_loss'] = test_loss
  
  print('residual MLP test set accuracy:', test_acc)
  
  histories['resMLP'] = {'history': history.history,
                         'evaluation': evaluation}

  print('\n')
  
  return histories

In [7]:
print('Building models')
results = {}

for dataset in datasets:
  name = dataset[0]

  results[name] = run(
    dataset_name             = name, 
    dataset_file_id          = dataset[1],
    min_molecules_per_column = 30,
    log_transform            = True,
    nn_epochs=150, 
    autoencoder_epochs=50,
    nn_ae_epochs=200,
    resMLP_epochs=200, 
    verbose=0
  )

In [0]:
# print evaluation for each dataset
for dataset in results.keys():
  print('Dataset:', dataset)
  print('  MLP - test accuracy', '=', results[dataset]['MLP']['evaluation']['test_acc'])
  print('  Encoder+MLP - test accuracy', '=', results[dataset]['ae_MLP']['evaluation']['test_acc'])
  print('  Residual MLP - test accuracy', '=', results[dataset]['resMLP']['evaluation']['test_acc'])
  print('\n')