In [19]:
import re
import torch
import pickle
import pandas as pd
from tqdm.auto import tqdm

tqdm.pandas()

# 1. Pre-processing

### Create a combined dataframe
> This creates a dataframe containing the image IDs and labels for both the original images provided by the Bristol Myers Squibb pharmaceutical company and the augmentations generated for each original image.

In [20]:
# load the training labels CSV into a dataframe
train_df = pd.read_csv(filepath_or_buffer='../../data/bms-molecular-translation/train_labels.csv')

### InChI pre-processing
> First, this splits the first part of the InChI string (the chemical formula) into sequences of text and numbers. Second, it splits the remaining part of the InChI string (the other layers) into sequences of text and numbers.

In [21]:
def split_inchi_formula(formula: str) -> str:
    
    """
    Tokenize the chemical formula (the first InChI layer) by separating every
    element symbol from the count that follows it.
    
    :param formula: chemical formula, e.g. C13H20OS
    :type formula:  string
    :return:        space-separated symbols and counts, e.g. "C 13 H 20 O S"
    :rtype:         string
    """
    
    pieces = []
    
    # every chunk starts at an uppercase letter and runs up to the next one;
    # group 1 is the non-digit prefix (the symbol), group 2 is whatever follows it (the count)
    for symbol, count in re.findall(r"([A-Z][^A-Z\d]*)([^A-Z]*)", formula):
        pieces.append(symbol)
        # a symbol without a count contributes a single token
        if count != "":
            pieces.append(count)
    
    return ' '.join(pieces).rstrip(' ')

In [23]:
def split_inchi_layers(layers: str) -> str:
    
    """
    This function splits the layers (following the first layer of InChI)
    into space-separated tokens: a "/<letter>" token for every layer prefix,
    one token per run of digits, and one token per any other character.
    
    Every layer found in the input is processed, so the layer string should be
    passed once as a whole (or one layer at a time); passing overlapping pieces
    of the same InChI yields repeated layers. Joining the returned tokens without
    spaces gives back the input prefixed with "/".
    
    :param layers: layer string, e.g. c1-9(2)8-15-13-6-5-10(3)7-12(13)11(4)14/h5-7,9,11,14H,8H2,1-4H3
    :type layers:  string
    :return:       splitted layer info, e.g. "/c 1 - 9 ( 2 ) 8 ... /h 5 - 7 , 9 ..."; 
                   an empty string when the input contains no layer
    :rtype:        string
    """
    
    tokens = []
    
    # a layer starts at its lowercase prefix letter and runs up to the next lowercase letter
    for layer in re.findall(r"[a-z][^a-z]*", layers):
        # the character identifying the layer
        prefix = layer[0]
        # the layer content; the '/' separator is dropped because it is re-emitted with the prefix
        body = layer[1:].replace("/", "")
        
        tokens.append(f"/{prefix}")
        # digit runs stay together, every other character is a token of its own;
        # this also keeps characters preceding the first number, e.g. "(H" in "h(H2,2,3,4)"
        tokens.extend(re.findall(r"[0-9]+|[^0-9]", body))
    
    # return only after ALL layers were processed
    return ' '.join(tokens).rstrip(' ')

### Tokenize texts and predict captions
> This tokenizes each text by converting it to a sequence of token indices. The reverse conversion (sequence back to text) is also supported. Decoding predicted index sequences into image captions also takes place within the Tokenizer class.

In [25]:
class Tokenizer(object):
    
    """
    Vocabulary of space-separated tokens with conversions between texts and
    sequences of token indices, plus decoding of index sequences into captions.
    """
    
    def __init__(self):
        # string (token) to integer (index) mapping
        self.stoi = {}
        # integer (index) to string (token) mapping
        self.itos = {}
    
    def __len__(self) -> int:
        
        """
        This method returns the length of token:index map.
        
        :return: length of map
        :rtype: int
        """
        # return the length of the map
        return len(self.stoi)
    
    def fit_on_texts(self, texts: list) -> None:
        
        """
        This method creates a vocabulary of all tokens contained in provided texts,
        and rebuilds the mapping of token to index, and index to token.
        Tokens are obtained by splitting each text on single spaces.
        Any mapping from a previous fit is discarded.
        
        :param texts: list of texts
        :type texts:  list
        """
        
        # create a storage for all tokens
        vocab = set()
        
        # add tokens from each text to vocabulary
        for text in texts:
            vocab.update(text.split(' '))
            
        # sort the vocabulary so that indices are deterministic
        vocab = sorted(vocab)
        
        # add start, end and pad for sentence (always the last three indices)
        vocab.extend(['<sos>', '<eos>', '<pad>'])
        
        # build fresh maps instead of updating the old ones: stale tokens from an
        # earlier fit would otherwise keep indices that collide with the new ones
        self.stoi = {token: index for index, token in enumerate(vocab)}
        
        # reverse the previous mapping to create integer to string mapping
        self.itos = {index: token for token, index in self.stoi.items()}
        
    def text_to_sequence(self, text: str) -> list:
        
        """
        This method converts the given text to a list of the indices of its
        space-separated tokens, wrapped in the start and end of string symbols.
        
        :param text: input textual data
        :type  text: str
        :return:     list of token indices
        :rtype:      list
        :raises KeyError: if a token (or '<sos>' / '<eos>') is not in the vocabulary
        """
        
        # start of string symbol first
        sequence = [self.stoi['<sos>']]
        
        # then the index of each token in text
        for token in text.split(' '):
            sequence.append(self.stoi[token])
            
        # end of string symbol last
        sequence.append(self.stoi['<eos>'])
        
        return sequence
    
    def texts_to_sequences(self, texts: list) -> list:
        
        """
        This method converts each text in the provided list into a sequence of token indices.
        
        :param texts: a list of input texts
        :type  texts: list
        :return:      a list of sequences
        :rtype:       list
        """
        
        return [self.text_to_sequence(text) for text in texts]
    
    def sequence_to_text(self, sequence: list) -> str:
        
        """
        This method converts the sequence of token indices back into text.
        
        :param sequence: list of token indices
        :type  sequence: list
        :return:         text
        :rtype:          str 
        """
        # join the tokens with no space in between
        return ''.join(self.itos[i] for i in sequence)
    
    def sequences_to_texts(self, sequences: list) -> list:
        
        """
        This method converts each provided sequence into text and returns all texts inside a list.
        
        :param sequences: list of token index sequences
        :type  sequences: list
        :return:          list of texts
        :rtype:           list
        """
        
        return [self.sequence_to_text(sequence) for sequence in sequences]
    
    def predict_caption(self, sequence: list) -> str:
        
        """
        This method builds the caption by concatenating the token of each index in sequence.
        This keeps happening up until the end of sentence or padding is met.
        
        :param sequence: list of token indices
        :type  sequence: list
        :return:         image caption
        :rtype:          string
        """
        
        # tokens of the final caption
        tokens = []
        
        # for each index in a sequence of symbols
        for i in sequence:
            # if symbol is the end of sentence or padding, break
            if i == self.stoi['<eos>'] or i == self.stoi['<pad>']:
                break
            # otherwise, add the symbol to the final caption
            tokens.append(self.itos[i])
            
        # join once at the end instead of growing a string in the loop
        return ''.join(tokens)
    
    def predict_captions(self, sequences: list) -> list:
        
        """
        This method predicts the captions for each sequence in a list of sequences.
        
        :param sequences: list of sequences
        :type  sequences: list
        :return:          list of final image captions
        :rtype:           list
        """
        
        return [self.predict_caption(sequence) for sequence in sequences]

In [26]:
# take the chemical formula of each InChI string, i.e. its second field
# when the string is split on the forward slash ('/') delimiter
train_df['InChI_chemical_formula'] = train_df['InChI'].map(lambda inchi: inchi.split('/')[1])

### Pre-process
> This performs all preprocessing steps, mainly: (1) converting InChI string to space separated list of elements,
(2) tokenizing the InChI string by creating lists of elements, and (3) computing the actual lengths of each such list. The results are returned in `train_df`.

In [None]:
def _split_all_layers(inchi: str) -> str:
    
    """
    Tokenize every layer that follows the chemical formula of an InChI string.
    
    Each '/'-separated layer is passed to split_inchi_layers on its own, so every
    layer is tokenized exactly once and in its original order. This replaces the
    previous slicing at x.find('/h'), which yields the last character of the
    string when there is no '/h' layer (find returns -1) and repeats the '/h'
    layer when it is the first layer after the formula.
    
    :param inchi: full InChI string, e.g. InChI=1S/H2O/h1H2
    :type inchi:  string
    :return:      space-separated layer tokens ('' if there are no layers)
    :rtype:       string
    """
    
    # fields 0 and 1 are the InChI version and the chemical formula
    layer_texts = (split_inchi_layers(layer) for layer in inchi.split('/')[2:])
    # skip layers that produced nothing
    return ' '.join(text for text in layer_texts if text)


# build the token text: the chemical formula tokens followed by the tokens of all other layers
train_df['InChI_text'] = (
    train_df['InChI_chemical_formula'].apply(split_inchi_formula) 
    + ' ' 
    + train_df['InChI'].apply(_split_all_layers).values
# no trailing space (it would become an empty token) when an InChI has no further layers
).str.rstrip(' ')

In [29]:
# write the pre-processed dataframe to its own csv file
train_df.to_csv(path_or_buf='../../data/train_df.csv')

In [30]:
# instantiate the tokenizer and build its vocabulary from all InChI token texts
tokenizer = Tokenizer()
tokenizer.fit_on_texts(train_df['InChI_text'].values)

# persist the fitted tokenizer
torch.save(tokenizer, '../../data/tokenizer.pth')

# length of every InChI text in tokens, shown with a progress bar;
# 2 is subtracted because text_to_sequence adds the <sos> and <eos> symbols
lengths = [
    len(tokenizer.text_to_sequence(text)) - 2
    for text in tqdm(train_df['InChI_text'].values, total=len(train_df))
]

# store the lengths next to the texts
train_df['InChI_length'] = lengths

# persist the final dataframe as a pickle file
train_df.to_pickle('../../data/train.pkl')

print('Saved the train dataframe as a pickle file.')