In [2]:
from rdkit import Chem
import pandas as pd
from joblib import Parallel,delayed
from rdkit.Chem import AllChem
from rdkit import rdBase
# Turn off RDKit's 'rdApp.error' log channel for this kernel.
# NOTE(review): presumably done so that the many SMILES/template failures that
# the code below deliberately swallows do not flood the notebook output - confirm.
rdBase.DisableLog('rdApp.error')

In [6]:
class RunReaction():
    """Grow a reaction network by repeatedly applying reaction templates.

    The reaction SMARTS templates are read once from a CSV file.
    ``generate_product`` then applies every template to every newly found
    molecule, generation after generation, and checkpoints the network to a
    CSV file.
    """

    def __init__(self,path,header = 'Reaction Smarts'):
        """Load the reaction templates.

        Args:
            path: Path to the CSV file holding the templates.
            header: Name of the column of that file containing the templates.
        """
        reaction=pd.read_csv(path)
        self.reactlist=reaction[header].tolist()

    def check_iso(self,products,all_prod):
        """Canonicalise products and replace already-known molecules by an index.

        Args:
            products: Iterable of product groups (one group per applied
                template). Falsy groups (``[]``, ``()``, ``False``, ``None``)
                are skipped.
            all_prod: List of every canonical SMILES found so far. It is
                extended IN PLACE with the molecules first seen in this call.

        Returns:
            ``(new_prod, all_prod)``. ``new_prod`` holds one list per
            non-empty group, in input order. A molecule that was already in
            ``all_prod`` before it was encountered is represented by its index
            in ``all_prod`` as a string (so that templates are not applied to
            it again); a new molecule is represented by its canonical SMILES.
            A SMILES occurring several times within one call is kept only at
            its first occurrence. Entries that cannot be canonicalised are
            dropped.
        """
        new_prod=[]
        seen_in_call=set()  # canonical SMILES already emitted during this call
        # Position lookup equivalent to all_prod.index(); setdefault keeps the
        # first position should the caller's list contain duplicates.
        position={}
        for pos,smi in enumerate(all_prod):
            position.setdefault(smi,pos)

        for group in products:
            if not group:
                continue
            can_prod=[]
            for prod in group:
                try:
                    smiles=Chem.CanonSmiles(prod)
                except Exception:
                    # Unparsable entry: skip it, but let KeyboardInterrupt through.
                    continue
                if smiles in seen_in_call:
                    continue
                seen_in_call.add(smiles)
                if smiles in position:
                    # Known from an earlier generation/group: store its index.
                    can_prod.append(str(position[smiles]))
                else:
                    position[smiles]=len(all_prod)
                    all_prod.append(smiles)
                    can_prod.append(smiles)
            # Entries are already unique here, so no set() round-trip is needed
            # and the order stays deterministic.
            if can_prod:
                new_prod.append(can_prod)

        return new_prod,all_prod

    def runrxn(self,rxnsmarts,reactants):
        """Apply one reaction template to the given reactant molecule(s).

        Args:
            rxnsmarts: Reaction SMARTS of the template to apply.
            reactants: Tuple of RDKit molecules to run the template on.

        Returns:
            ``(products, flag)``. ``products`` is a list of product SMILES
            (possibly empty), or ``False`` when the template cannot be
            parsed/initialised, a reactant is missing, or running the reaction
            raises. ``flag`` is 1 when fewer reactants than reactant templates
            were supplied and the remaining slots were filled with the
            template's own reactant patterns before a successful run,
            otherwise 0.
        """
        added=0
        # An empty tuple or a molecule that failed to parse cannot be reacted.
        if not reactants or any(mol is None for mol in reactants):
            return False,added
        try:
            rxn=AllChem.ReactionFromSmarts(rxnsmarts)
            AllChem.SanitizeRxn(rxn)
            rxn.Initialize()
        except Exception:
            return False,added
        if not rxn.IsInitialized():
            return False,added

        if len(reactants) < rxn.GetNumReactantTemplates():
            # Fewer molecules than the template needs.
            if not rxn.IsMoleculeReactant(reactants[0]):
                return [],added
            # Put our molecule in every slot it matches; the other slots are
            # filled with the template's own reactant patterns.
            newreact=tuple(
                reactants[0] if reactants[0].HasSubstructMatch(t) else t
                for t in rxn.GetReactants()
            )
            try:
                ps=rxn.RunReactants(newreact)
            except Exception:
                return False,added
            added=1
        else:
            try:
                ps=rxn.RunReactants(reactants)
            except Exception:
                return False,added

        products=[Chem.MolToSmiles(mol) for outcome in ps for mol in outcome]
        return products,added

    def generate_product(self, reactants,path,gens = 4):
        """Build the reaction network and checkpoint it to ``path`` as CSV.

        Args:
            reactants: List of lists of SMILES of the initial reactants.
            path: Destination CSV file. It is rewritten after every expanded
                molecule, so an interrupted run keeps what was found so far.
            gens: Exclusive upper bound of the generation counter, which
                starts at 1. The default of 4 expands generations 1, 2 and 3.

        Side effects:
            Sets ``self.all_product`` (all canonical SMILES in the network)
            and, on completion, ``self.network`` (column key -> product groups).
        """
        self.all_product=[] # List all molecules in the reaction network
        # Kept local while running; the original indexed the builtin ``dict``
        # type here, which raises TypeError on a fresh kernel.
        network={}
        f_products,self.all_product=self.check_iso(reactants,self.all_product)
        for i in range(1,gens): # Generation counter
            if not f_products:
                break  # nothing new was produced, later generations are empty
            products=[]
            for ctr,rxns in enumerate(f_products,start=1): # Product groups of the previous generation
                for ind,prod in enumerate(rxns,start=1): # Products of one particular reaction
                    if prod.isdigit():
                        # Index string: molecule was expanded already.
                        continue
                    mol=Chem.MolFromSmiles(prod)
                    results=Parallel(n_jobs=-1, verbose=1)(delayed(self.runrxn)(r,(mol,)) for r in self.reactlist)
                    # Keep only the product part; safe for an empty template list.
                    prodlist=[outcome for outcome,_ in results]

                    new_prod,self.all_product=self.check_iso(prodlist,self.all_product)
                    products.extend(new_prod)
                    # Gen + generation number + position of the parent group + position of the parent in its group.
                    # NOTE(review): the numbers are concatenated without a separator, so e.g.
                    # (ctr=1, ind=12) and (ctr=11, ind=2) give the same key and the later entry
                    # overwrites the earlier one. Format kept unchanged for output compatibility.
                    string='Gen'+str(i)+str(ctr)+str(ind)
                    network[string]=new_prod
                    df=pd.DataFrame({key: pd.Series(value,dtype=object) for key, value in network.items()})
                    df.to_csv(path)
            f_products=products
        self.network=network
        


In [11]:
# Example implementation

# Input template library and output location of the generated network.
# Raw strings keep the Windows backslashes literal.
template_csv = r'C:\Users\ks\Desktop\ReactionNetwork\FinalTemplate_MapRN.csv'
network_csv = r'C:\Users\ks\Desktop\ReactionNetwork\Test_RN.csv'

# One starting group containing a single seed molecule.
reactants =[['CCC(=O)CCC=C']]

obj = RunReaction(template_csv)
obj.generate_product(reactants,network_csv)


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  26 tasks      | elapsed:   12.8s


KeyboardInterrupt: 