In [1]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import Descriptors, QED, RDConfig, rdmolops, inchi
from sklearn.model_selection import train_test_split
import random
import os
import sys
sys.path.append(os.path.join(RDConfig.RDContribDir, 'SA_Score'))
import sascorer

In [2]:
# expert dataset - ESR1 actives 
expert_df = pd.read_csv('./esr1.csv')
expert_smiles = expert_df['SMILES'].tolist()

In [3]:
# training dataset - MOSES
training_df = pd.read_csv('./dataset_v1.csv')
training_smiles = training_df['SMILES'].tolist()

In [4]:
def canonicalize_smiles_without_stereo(smis):
    """Convert a list of SMILES strings to their canonical forms without stereochemistry."""
    cans = []
    for smi in smis:
        mol = Chem.MolFromSmiles(smi)
        if mol:
            Chem.rdmolops.RemoveStereochemistry(mol)
            cans.append(Chem.MolToSmiles(mol, isomericSmiles=False))
    return cans


In [5]:
# canonizing smiles
expert_smiles = canonicalize_smiles_without_stereo(expert_smiles)
training_smiles = canonicalize_smiles_without_stereo(training_smiles)

In [6]:
def get_inchikey(smiles):
    """Get molecule InChIKey from SMILES."""
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        return inchi.MolToInchiKey(mol)
    else:
        return None

In [7]:
def discard_isomers(smiles_list):
    """Discard stereoisomers from a list of SMILES."""
    inchikeys = [get_inchikey(smi) for smi in smiles_list]
    
    # Extract the first part of the InChIKey (before the first dash) to represent molecular connectivity
    inchikey_parts = [key.split("-")[0] if key else None for key in inchikeys]
    
    # Create a dictionary with the first part of the InChIKey as the key and the SMILES as the values
    inchikey_to_smiles = {}
    for ikey, smi in zip(inchikey_parts, smiles_list):
        if ikey:
            if ikey in inchikey_to_smiles:
                inchikey_to_smiles[ikey].append(smi)
            else:
                inchikey_to_smiles[ikey] = [smi]
                
    # From each group of stereoisomers (with the same molecular connectivity), pick one randomly
    retained_smiles = [random.choice(smiles) for smiles in inchikey_to_smiles.values()]
    
    return retained_smiles

In [8]:
# discarding stereoisomers
expert_smiles = discard_isomers(expert_smiles)
training_smiles = discard_isomers(training_smiles)

In [9]:
# make DataFrames to store chemical properties
expert_df = pd.DataFrame(expert_smiles, columns=['SMILES'])
training_df = pd.DataFrame(training_smiles, columns=['SMILES'])

In [10]:
# EXPORT - this is the expert dataset for imitation learning
expert_df.to_csv('./esr1_canonized.csv', index=False)

In [11]:
# copied from MOSES
def get_mol(smiles_or_mol):
    '''
    Loads SMILES/molecule into RDKit's object
    '''
    if isinstance(smiles_or_mol, str):
        if len(smiles_or_mol) == 0:
            return None
        mol = Chem.MolFromSmiles(smiles_or_mol)
        if mol is None:
            return None
        try:
            Chem.SanitizeMol(mol)
        except ValueError:
            return None
        return mol
    return smiles_or_mol

In [12]:
def get_descriptor(mol_or_smiles, descriptor):
    """Get the function for calculating a given chemical property."""
    mol = get_mol(mol_or_smiles)
    if mol is None:
        return None
    if descriptor == 'logP':
        return Descriptors.MolLogP(mol)
    elif descriptor == 'SA':
        return sascorer.calculateScore(mol)
    elif descriptor == 'QED':
        return QED.qed(mol)
    else:
        print('Something went wrong')

In [13]:
# calculating chemical properties of interest and adding them to the DataFrames
expert_df['logP'] = expert_df['SMILES'].apply(lambda x: get_descriptor(x, 'logP'))
expert_df['SA'] = expert_df['SMILES'].apply(lambda x: get_descriptor(x, 'SA'))
expert_df['QED'] = expert_df['SMILES'].apply(lambda x: get_descriptor(x, 'QED'))

training_df['logP'] = training_df['SMILES'].apply(lambda x: get_descriptor(x, 'logP'))
training_df['SA'] = training_df['SMILES'].apply(lambda x: get_descriptor(x, 'SA'))
training_df['QED'] = training_df['SMILES'].apply(lambda x: get_descriptor(x, 'QED'))

In [14]:
# dropping duplicates
common_rows = pd.merge(expert_df, training_df, on='SMILES')
training_df = training_df.merge(common_rows[['SMILES']], on='SMILES', how='left', indicator=True)
training_df = training_df[training_df['_merge'] == 'left_only']
training_df.drop(columns=['_merge'], inplace=True)

In [15]:
# optional intermittent save to have full datasets with properties
# expert_df.to_csv('./esr1_w-prop.csv', index=False)
# training_df.to_csv('./moses_w-prop.csv', index=False)

In [16]:
# randomly pick 60000 rows to get the non-biased training dataset
sampled_df = training_df.sample(n=60000, replace=False, random_state=42)

In [17]:
# split the sampled_df into train, val and test
moses_train, temp_df = train_test_split(sampled_df, train_size=50000, random_state=42)
moses_val, moses_test = train_test_split(temp_df, train_size=5000, random_state=42)

In [18]:
# EXPORT - get the three datasets which will be used for VAE training
moses_train.to_csv('./moses_train_50k.csv', index=False)
moses_val.to_csv('./moses_val_5k.csv', index=False)
moses_test.to_csv('./moses_test_5k.csv', index=False)

In [19]:
def sample_within_boundaries(df, target, n_samples, max_vals=None, min_vals=None):
    """
    Samples molecules from the training dataframe based on chemical properties boundaries 
    set by the target dataframe.
    
    Parameters:
        df (pd.DataFrame): Training molecules dataframe with SMILES and chemical properties.
        target (pd.DataFrame): Target molecules dataframe with SMILES and chemical properties.
        n_samples (int): Number of molecules to sample.
        max_vals (dict, optional): Additional upper boundaries for certain properties.
        min_vals (dict, optional): Additional lower boundaries for certain properties.

    Returns:
        pd.DataFrame: Sampled dataframe.
    """
    
    # extract property columns
    property_cols = [col for col in df.columns if col != 'SMILES']

    # determine boundaries from target dataframe
    target_mins = target[property_cols].min()
    target_maxs = target[property_cols].max()

    # apply additional constraints if provided
    if max_vals:
        for key, value in max_vals.items():
            if key in target_maxs:
                target_maxs[key] = min(target_maxs[key], value)

    if min_vals:
        for key, value in min_vals.items():
            if key in target_mins:
                target_mins[key] = max(target_mins[key], value)
    
    # filter the training dataframe based on the boundaries
    mask = (df[property_cols] >= target_mins) & (df[property_cols] <= target_maxs)
    filtered_df = df[mask.all(axis=1)]

    # sample from the filtered dataframe
    sampled_df = filtered_df.sample(n=min(n_samples, len(filtered_df)), random_state=42)

    return sampled_df

In [20]:
# sample from training_df based on properties of expert_df to obtain the biased dataset
biased_df = sample_within_boundaries(training_df, expert_df, 60000, min_vals={'QED': 0.8}, max_vals={'SA': 3})

In [21]:
# split the sampled biased_df into train, val and test
biased_train, temp_df = train_test_split(biased_df, train_size=50000, random_state=42)
biased_val, biased_test = train_test_split(temp_df, train_size=5000, random_state=42)

In [22]:
# EXPORT - get the three datasets which will be used for VAE training
biased_train.to_csv('./moses_biased_train_50k.csv', index=False)
biased_val.to_csv('./moses_biased_val_5k.csv', index=False)
biased_test.to_csv('./moses_biased_test_5k.csv', index=False)