A model for generating SMILES strings from images of molecules.

In [None]:
## setup cell
# import required libraries
import numpy as np
import pandas as pd
import csv
import math
import matplotlib.image as mpimg
from sklearn import model_selection
import keras as keras
import keras.layers as layers
import keras.regularizers as regularizers

# load the list of compounds and image conversion errors
compounds = pd.read_csv("compounds.csv")
with open("errors.csv", "r") as file:
    errors = list(csv.reader(file))
errors = list(map(int, errors[0]))

# filter out the compounds that threw errors
compounds = compounds.query("SubstanceID not in @ errors")

# generate a dictionary of unique characters in SMILES strings and its inverse
characters = sorted(list(set(compounds["SMILES"].sum())))
character_dictionary = dict((j, i) for i, j in enumerate(characters))
character_dictionary_inverse = dict((i, j) for i, j in enumerate(characters))

# augment the dictionary with start, stop, and "blank" codons
character_dictionary.update({"start": 68, "stop": 69, "blank": 70})
character_dictionary_inverse.update({68: "start", 69: "stop", 70: ""})

In [None]:
## to keep training time reasonable, we will truncate the dataset
# subset dataset to ~50,000 shortest SMILES strings
compounds["SMILES_length"] = compounds.SMILES.map(lambda x: len(x))
cutoff_length = compounds["SMILES_length"].quantile(50000 / len(compounds))
test = compounds.query("SMILES_length <= @length")

# find the longest SMILES string in the dataset; add space for start and end codons
max_length = compounds["SMILES"].map(lambda x: len(x)).max() + 2

In [None]:
## test, train, validation, split
# split 80/20 into train and test sets
train, test = model_selection.train_test_split(compounds, test_size = 0.2, random_state = 1935235)

# further sudivide training set into train and validation sets to give an 80/20/20 overall split
train, validation = model_selection.train_test_split(train, test_size = 0.25, random_state = 358235)

In [None]:
## a simple function to pad encoded strings with blanks
# declare a function pad
# string: a list generated by encoding a string with character_dictionary
# max_length: the number of items to pad the string out to
# returns: a list of integers padded with 70 to length max_length

def pad(string, max_length):
    while len(string) < max_length:
        string.append(70)
    return(string)

In [None]:
## data generator to supply images and targets to the neural network
# declare a function molecule_image_generator
# SubstanceIDs: a list of SubstanceIDs from which to draw batches
# image_batch_size: the number of images included in each "batch"
# batch_size: the number of rows to return in each batch
# returns: a data generator

def molecule_image_generator(SubstanceIDs, image_image_batch_size, batches_per_image):
    # calculate the maximum number of batches from one iteration of SubstanceIDs
    max_batches = math.floor(len(SubstanceIDs) / image_image_batch_size)

    # declare a counter to track current batch
    batch = 0
    
    # loop through batches indefinitely
    while True:                
        # declare an empty array to hold the image data
        # shape == (image_batch_size * max_length), xpixels, ypixels, channels
        X1 = np.empty([image_batch_size * (max_length), 300, 300, 1], dtype = "int8")
        
        # declare an empty array to hold the SMILES string input fragments 
        # shape == (image_batch_size * (max_length), max_length
        X2 = np.empty([image_batch_size * (max_length), max_length], dtype = "int8")
        
        # declare an empty array (vector) to hold the SMILES string target fragment
        # shape == image_batch_size * max_length
        Y = np.empty([image_batch_size * max_length], dtype = "int8")
        
        
        # determine which images to place in this batch
        # take image_batch_size observations if possible
        if batch < max_batches:
            batch_start = batch * image_batch_size
            batch_end = batch * image_batch_size + image_batch_size
            batch += 1
        # otherwise take as many observations as possible 
        else:
            batch_start = batch * image_batch_size
            batch_end = batch * image_batch_size + (len(SubstanceIDs) % image_batch_size)
            batch = 0
        
        
        # generate X1, X2, Y for a batch of images
        # loop through images by SubstanceID index
        for i in range(batch_start, batch_end):
            # encode the SMILES string for the current SubstanceID into a list of integers
            SMILES = compounds.query("SubstanceID == @SubstanceIDs[@i]")["SMILES"].to_string(header = False, index = False).strip()
            SMILES = [character_dictionary[character] for character in SMILES]
            
            # add start and stop codons, pad with blanks to max_length
            SMILES.insert(0, 68)
            SMILES.append(69)
            SMILES = pad(SMILES, max_length)
            
            # loop through each character in the SMILES string
            for j in range(0, len(SMILES)):
                # calcuate the current observation number
                obs = (i - batch_start) * len(SMILES) + j
                
                # import image into X1; augment with a single channel dimension
                X1[obs, :, :, 0] = mpimg.imread("./molecule_drawings/" + str(SubstanceIDs[i]) + ".png")
                
                # split SMILES at each character into an initiating string and a target character
                X2[obs, :] = pad(SMILES[:j], max_length)
                Y[obs] = SMILES[j]
                
        
        # one-hot encode Y
        Y = keras.utils.to_categorical(Y)

        # subset X1, X2, Y into batches_per_image arrays
        X1 = np.array_split(X1, batches_per_image)
        X2 = np.array_split(X2, batches_per_image)
        Y = np.array_split(Y, batches_per_image)
        
        # yield the generated data to the training function in batches of batch_size observations
        for i in range(len(Y)):
            yield([[X1[i], X2[i]], Y[i]])

Our network will take two inputs:  
1) The images, which are run through a convolutional branch.  
2) The growing SMILES strings, which are run through an LSTM branch.

The two inputs are merged into an FFNN trunk with softmax output.

In [None]:
## define the neural network architecture
# CNN layers for images
input_image = layers.Input(shape = (300, 300, 1))
conv_1 = layers.Conv2D(
    filters = 50, 
    kernel_size = (3, 3), 
    strides = (1, 1), 
    activation = "relu",
    kernel_regularizer = regularizers.l2()
)(input_image)
max_pool_1 = layers.MaxPool2D(pool_size = (3, 3))(conv_1)
conv_2 = layers.Conv2D(
    filters = 100, 
    kernel_size = (5, 5), 
    strides = (1, 1), 
    activation = "relu",
    kernel_regularizer = regularizers.l2()
)(max_pool_1)
max_pool_2 = layers.MaxPool2D(pool_size = (5, 5))(conv_2)
flatten = layers.Flatten()(max_pool_2)
dense_CNN = layers.Dense(
    units = 256,
    activation = "relu",
    kernel_regularizer = regularizers.l2()
)(flatten)
dropout_CNN = layers.Dropout(rate = 0.2)(dropout_CNN)

# RNN layers for SMILES strings
input_SMILES = layers.Input(shape = (max_length, ))
embedding = layers.Embedding(input_dim = max_length, output_dim = 200)(input_SMILES)
lstm = layers.LSTM(units = 256)(embedding)

# merge into FFNN layers
sum_inputs = layers.add([dense_CNN, lstm])
dense_output = layers.Dense(
    units = 1000, 
    activation = "relu",
    kernel_regularizer = regularizers.l2()
)(sum_inputs)
dropout_output = layers.Dropout(rate = 0.2)(dense_output)
output = layers.Dense(units = len(character_dictionary), activation = "softmax")(dropout_output)

model = keras.Model(inputs = [input_image, input_SMILES], outputs = output)
model.summary()

In [None]:
## train the model
# set the number of epochs
epochs = 1
# set the number of images to process at once
image_batch_size = 1
# set the number of batches per image
batches_per_image = 14

# compile the model
model.compile(loss = "categorical_crossentropy", optimizer = "adadelta")

# train the model
model.fit_generator(generator = molecule_image_generator(list(train.SubstanceID), image_batch_size, batches_per_image), steps_per_epoch = (len(train) / image_batch_size) * batches_per_image, nb_epoch = epochs)