In [1]:
############################################################################
#This code utilizes reaction information from MetaCyc to track carbon fates
#.. in genome-scale metabolic networks
#
#In this example, the carbons from isotopically labeled glucose are
#.. transferred to downstream metabolties, and dictionaries are built to
#.. compile the reactants, products, and enzymes which catalyzed the reaction
#
#This is an example of two in silico isotope labeling reactions
#This work has been extended to build metabolic pathways from experimental
#.. data, and is currently being prepared for publication
#
############################################################################

In [2]:
import numpy, pandas, os, sys, re, itertools, csv, gc

In [3]:
from itertools import chain

In [4]:
from collections import defaultdict

In [5]:
import dill as pickle

In [6]:
#Import the reaction links of RXNs identifiers to ECs
MetaCycReactionLinks=pandas.read_csv('reaction-linksAdd.txt',sep='\t',skiprows=1)
MetaCycReactionLinks=MetaCycReactionLinks.drop(0)

In [7]:
#Trim reaction links
MetaCycReactionLinks=MetaCycReactionLinks[MetaCycReactionLinks.columns[0:2]]

In [8]:
MetaCycReactionLinks.columns=['MetaCyc','EC']

In [9]:
#Create a dictionary of MetaCyc RXN IDs with EC numbers as values
MetaCycReactionLinksDict=dict(zip(MetaCycReactionLinks.MetaCyc,MetaCycReactionLinks.EC))

In [10]:
#Read in the Smiles Matrix of reactions to compounds - use fly reactions
#SmilesMap=pandas.read_csv('atom-mappings-smiles.dat',sep='\t',header=None)
#Fly reactions
SmilesMap=pandas.read_csv('atom-mappings-smiles-flyonlyAdd.dat',sep='\t',header=None,skiprows=1,usecols=[1,2])
SmilesMap.columns=['Reaction','Compounds']

In [11]:
#Read in list of Smiles->'English' conversions
PubChemCpds=pandas.read_csv('PubChemCpdMatchAdd.csv',sep=',')

In [12]:
#Compiled expression matches - may shave some time off later given the
#.. number of string matches I perform
SmilesStringRe1=re.compile(r':\d?\d]*')
SmilesStringRe2=re.compile(r'\[|\]')
SmilesStringRe3=re.compile(r'^\'|\'$')
MetaCycSplitter=re.compile(r'(>>|\.)')
CarbFinder=re.compile(r'C\*?')
LabelFinder=re.compile(r'C\*:\d+')
SpaceRemover=re.compile(r'^ ')

In [13]:
#Dictionary to pull out 'English name' from the Smiles format
PubChemCpdsDict=dict(zip(PubChemCpds.Smiles,PubChemCpds.Name))

In [14]:
#Function to get a smiles string from the MetaCyc-specific Smiles annotation
#Using compiled expressions
def SmilesFromMetaCycCpd(OneCpdString):
    
    #Get rid of the bracketing
    OneCpdString=SmilesStringRe1.sub(repl='',string=OneCpdString)
    
    #Extra bracketing
    OneCpdString=SmilesStringRe2.sub(repl='',string=OneCpdString)
    
    #Remove extra apostrophes if they exist
    if OneCpdString.startswith('\'') and OneCpdString.endswith('\''):
        OneCpdString=SmilesStringRe3.sub(repl='',string=OneCpdString)
        
    return(OneCpdString)

In [15]:
#Define glucose by SMILES format
GLUCOSESMILES='C(C1(C(C(C(C(O1)O)O)O)O))O'

In [16]:
#13C6 Glucose and initial labeling list - this string can be changed to
#.. any tracer of interest, e.g. 13C3 Lactate
GLUCOSESMILES13C6='C*(C*1(C*(C*(C*(C*(O1)O)O)O)O))O'
LabListInitial=[GLUCOSESMILES13C6]

In [17]:
#Dictionary of MetaCyc Rxn to MetaCyc Compound list split up
SmilesMapDict=dict(zip(SmilesMap.Reaction,SmilesMap.Compounds))

In [18]:
#Dictionary of MetaCyc Rxn to Compound items in SMILES format
SmilesMapSmilesSplit=SmilesMap.Compounds.str.split('[.]|[>>]')
Reducer=lambda x:[SmilesFromMetaCycCpd(y) for y in x]
SmilesMapSmilesSplit=SmilesMapSmilesSplit.apply(Reducer)
SmilesMapSmilesCpdDict=dict(zip(SmilesMap.Reaction,SmilesMapSmilesSplit))

In [19]:
#Master compound list built from SmilesMap
MasterCpdList=list(chain.from_iterable(SmilesMapSmilesSplit))
MasterCpdList=list(set(MasterCpdList))

In [20]:
#Master dictionary of the reactions which contain any one of the 12k compounds - useful for GenerateNewReaction function
CpdToRxnDict={}
for cpd in range(len(MasterCpdList)):
    xlist=[k for (k,v) in SmilesMapSmilesCpdDict.items() if MasterCpdList[cpd] in v] #56s for cpd list of 12k
    CpdToRxnDict.setdefault(MasterCpdList[cpd],[])
    CpdToRxnDict[MasterCpdList[cpd]].extend(xlist)

In [21]:
#Get the index in a reaction that is already split by its .'s and >>'s of the >> (i.e. what splits the reactants and products)
def GetTheReactantsLength(OneSplitSMILESReaction):
    for i in range(0,len(OneSplitSMILESReaction)):
        if len(OneSplitSMILESReaction[i])==0:
            return i

In [22]:
#Dictionary of the point of equilibration sign based on str.split('[.]|[>>]'), with keys as the MetaCyc RXN IDs
ReactantLengthDict=dict(zip(SmilesMap.Reaction,SmilesMapSmilesSplit.apply(GetTheReactantsLength)))

In [23]:
#Insert a labeled compound into a MetaCyc reaction by finding its isotope-stripped version
def InsertLabeledMetabIntoString(BigString,LabMetab,StripMetab):
    
    #Find the carbons
    Carbs=CarbFinder.findall(LabMetab)
    
    #Iterative list of the carbon locations with a label
    match=[j for j in range(len(Carbs)) if Carbs[j].__contains__('C*')]

    #Split the MetaCyc reaction compound string into reactants and products
    TempRxn=MetaCycSplitter.split(str(BigString))
    
    #Convert each item to Smiles format
    TempRxn1=[SmilesFromMetaCycCpd(x) for x in TempRxn]
    
    for metab in range(len(TempRxn)):
        
        #Find the carbon locations
        CIndicies=[m.start() for m in re.finditer('C',TempRxn[metab])]
        
        #Listify
        ListMetab=list(TempRxn[metab])
        
        #If any of the items from the MetaCyc reactants/products list match the metabolite which contains the isotope..
        if TempRxn1[metab]==StripMetab:
            
            for location in range(len(match)):
                
                #Insert the isotope at the right location
                ListMetab[CIndicies[match[location]]]='C*'
                
                #Rejoin the list - necessary?
                ConvertedMetab=''.join(ListMetab)
                
                #Replace old metabolite with newly labeled form
                TempRxn[metab]=ConvertedMetab
            
        #Convert back to MetaCyc format
        BigString=''.join(item for item in TempRxn)
        
    return(BigString)

In [24]:
#Take the list of labeled compounds, and find all the Metacyc reactions that contain the compounds, build a matrix
def BuildReactionMatrixFromCpdList(ListOfLabeledCpds):
    
    MasterRxnList=[]
    MasterCompoundList=[]
    
    #Strip all labeled metabolites of isotopes for matching purposes
    StrippedList=[x.replace('C*','C') for x in ListOfLabeledCpds]
    
    for labcpd,nolabcpd in zip(ListOfLabeledCpds,StrippedList):
        
        try:
            
            #Try using the dictionary to generate a list of reactions for each given metabolite
            RxnList=CpdToRxnDict[nolabcpd]
            
            #Get the Metacyc compounds from each reaction
            RxnCompounds=[SmilesMapDict[x] for x in RxnList]
            
            #Insert the labeled compound into the Metacyc compounds list
            RxnCompounds=[InsertLabeledMetabIntoString(x,labcpd,nolabcpd) for x in RxnCompounds]
            
        except:
            RxnList=[]
            RxnCompounds=[]
        
        #Build out the lists
        MasterRxnList.extend(RxnList)
        MasterCompoundList.extend(RxnCompounds)

    #Turn into Series' for later apply functions and indexing
    PrepMatrix=pandas.Series([MasterRxnList,MasterCompoundList])
    
    #Build DataFrame.. is this a memory concern?
    PrepMatrix=BuildReactionMatrixExport(PrepMatrix)
    
    return(PrepMatrix)

In [25]:
#Function to build the reaction matrix export - done, with reactions filled in already
def BuildReactionMatrixExport(PreLabeledReactionMatrix):
    
        #Build new data frame with same rows as ReactionMatrix
        DataFrameExport=pandas.DataFrame(index=numpy.arange(len(PreLabeledReactionMatrix[0])),columns=['Reaction','Compounds','Reactants','Products'])
        
        #Fill the reaction column from the lists of reactions and compounds with isotope addition from 'BuildReactionMatrix' function
        DataFrameExport['Reaction']=PreLabeledReactionMatrix[0]
        DataFrameExport['Compounds']=PreLabeledReactionMatrix[1]
        
        del PreLabeledReactionMatrix
        
        return(DataFrameExport)

In [26]:
#Take list of carbon numbers, find them in the metabolite of interest, and replace Cs with C*s (i.e. with isotope)
def MiniInsertLabel(CarbonRegexList,Metabolite):
    for num in CarbonRegexList:
        Metabolite=re.sub(pattern='\\b'+'C:'+num+'\\b',repl='C*:'+num,string=Metabolite)
    return(Metabolite)

In [27]:
#Take labeled reactants, find carbon numbers, map to products and label those carbons
def GenericLabelingReactionMatrixApply(UnlabeledReactionMatrix):
    
    #Split into metabolite items
    UnlabeledReactionMatrix=MetaCycSplitter.split(UnlabeledReactionMatrix)
    
    #Get all reactants with label
    ReactantList=[item for item in UnlabeledReactionMatrix if '*' in item]
    
    #find carbom numbers to get labeled
    FindThese=LabelFinder.findall(str(ReactantList))
    
    #Return only the numbers that are assigned to the carbons
    FindThese=[x.replace('C*:','') for x in FindThese]
    
    #Get cpds not currently labeled
    HoldList=[item for item in UnlabeledReactionMatrix if not '*' in item]
    
    #Return the smiles format of reactants labeled cpds
    ReactantList=[SmilesFromMetaCycCpd(x) for x in ReactantList]
    
    #Label any metabolite that contains a carbon number from FindThese
    HoldList=[MiniInsertLabel(FindThese,x) for x in HoldList]
    
    #Pull out only metabolites which obtained a labeled (i.e. the Products)
    ProductList=[item for item in HoldList if '*' in item]
    
    #Return Smiles
    ProductList=[SmilesFromMetaCycCpd(x) for x in ProductList]
    
    #Build tuple of reactants and products
    UnlabeledReactionMatrix=(ReactantList,ProductList)
    
    del ReactantList,ProductList,HoldList

    return(UnlabeledReactionMatrix)

In [28]:
#Unpack the tuple of reactants and products, fill in the appropriate columns of the dataframe
def UnpackAndFill(ReactedMatrix):
    Reactants,Products=zip(*ReactedMatrix.Compounds)
    ReactedMatrix.Reactants=Reactants
    ReactedMatrix.Products=Products
    del Reactants,Products
    ReactedMatrix.drop('Compounds',axis=1,inplace=True)
    return(ReactedMatrix)

In [29]:
#New function, take products from Labeled reaction matrix, make new labeled list
def NewLabeledCpdList(ReactedMatrix):
    
    #Build list of cpds from the products column - note NewCpds is a replacement of Products for the paralleled form
    NewList=list(chain.from_iterable(ReactedMatrix.NewCpds))

    #No CO2 - Jan 31, 2017 - optional
    NewList=[x for x in NewList if x!='C(=O)=O']
    
    #Keep unique elements of list
    NewList=list(set(NewList))
    
    return(NewList)

In [30]:
#Use MetaCycReactionLinksDict for reaction conversion from MetaCyc's RXN to EC
def ReactionConvert(RunReactionMatrix):
    if str(MetaCycReactionLinksDict[RunReactionMatrix])!='nan':
        RunReactionMatrix=MetaCycReactionLinksDict[RunReactionMatrix]
    return(RunReactionMatrix)

In [31]:
#Name converter from Smiles to 'English'
def ReactantConvertOne(x):
    try:
        x=PubChemCpdsDict[x]
    except:
        x=''
    return(x)

In [32]:
#Convert a compound string so that it can be matched to an 'English' name

#Compiled patterns
ChargeRemover=re.compile(r'\+|\-')
ExtraPostasRemover=re.compile('^\'|\'$')

def LabelStrippersOne(x):
    
    #Remove labels
    x=x.replace('*','')
    
    #Remove charges
    x=ChargeRemover.sub('',string=str(x))
    
    #Remove any rextra apostrophes
    x=ExtraPostasRemover.sub('',string=str(x))
    
    return(x)

In [33]:
#Convert a labeled metabolite string into 'English isotopomer name (e.g. Glucose M+6)
def CombinedConvertMetabs(OneLabeledMatrixCpd):
    
    #Get number of label counts
    OneLabeledMatrixCpdCarbs=OneLabeledMatrixCpd.count('*')
    
    #Strip labels and charges
    OneLabeledMatrixCpdsEnglish=LabelStrippersOne(OneLabeledMatrixCpd)
    
    #Convert to English using metabolite dictionary
    OneLabeledMatrixCpdsEnglish=ReactantConvertOne(OneLabeledMatrixCpdsEnglish)
    
    #Format string to use both english name and label number
    OneLabeledMatrixCpd=str('{0} M+{1}').format(OneLabeledMatrixCpdsEnglish,OneLabeledMatrixCpdCarbs)
    
    return(OneLabeledMatrixCpd)

In [34]:
#Function to call the conversion of reactants and products to english
def UpdatedMetabConvert(LabeledMatrixCpds):
    LabeledMatrixCpds=[CombinedConvertMetabs(x) for x in LabeledMatrixCpds]
    return(LabeledMatrixCpds)

In [35]:
#Trim reaction matrix to drop nonmatching metabolites (i.e. they can't be named)
def TrimExportReactionMatrix(ExportReactionMatrix):

    #Drop the nans
    ExportReactionMatrix=ExportReactionMatrix.dropna(subset=['Reactants'])
    ExportReactionMatrix=ExportReactionMatrix.dropna(subset=['Products'])
    
    #drop row indicies that have a blank Labeled Reactants or Products cell
    ExportReactionMatrix=ExportReactionMatrix.drop(ExportReactionMatrix[ExportReactionMatrix['Reactants'].map(len)==0].index,axis=0)
    ExportReactionMatrix=ExportReactionMatrix.drop(ExportReactionMatrix[ExportReactionMatrix['Products'].map(len)==0].index,axis=0)

    return(ExportReactionMatrix)

In [36]:
#If metabolite didn't get an english name match, drop it
def DropMissingMetabs(ConvertedMatrixColumnCpd):

    ConvertedMatrixColumnCpd=[item for item in ConvertedMatrixColumnCpd if not re.search('^ M+[0-9]*',item)]

    return(ConvertedMatrixColumnCpd)

In [37]:
#convert to english, build the dictionary for round 1
def ConvertMatrixToEnglish(LabeledReactionMatrix):
    
    #Convert RXN to EC
    LabeledReactionMatrix.Reaction=LabeledReactionMatrix.Reaction.apply(ReactionConvert)
    
    #Convert reactants and products to english M+x isotopomer names
    LabeledReactionMatrix.Reactants=LabeledReactionMatrix.Reactants.apply(UpdatedMetabConvert)    
    LabeledReactionMatrix.Products=LabeledReactionMatrix.Products.apply(UpdatedMetabConvert)

    #Remove unnamed metabolites
    LabeledReactionMatrix.Reactants=LabeledReactionMatrix.Reactants.apply(DropMissingMetabs) #622ms, 25% faster than before
    LabeledReactionMatrix.Products=LabeledReactionMatrix.Products.apply(DropMissingMetabs) #622ms, 25% faster than before

    #Drop empty fields
    LabeledReactionMatrix=TrimExportReactionMatrix(LabeledReactionMatrix)
    LabeledReactionMatrix=LabeledReactionMatrix.reset_index(drop=True)
    
    return(LabeledReactionMatrix)

In [38]:
#Build dictiontary/hashable results from the labeling matrix
def MakingDict(LabeledRxnMatrix):
    
    Dictionary=defaultdict(lambda: defaultdict(list))

    for rxns in range(len(LabeledRxnMatrix)):
        
        #Remove any extra spaces
        Reactants=[SpaceRemover.sub('',x) for x in LabeledRxnMatrix.Reactants[rxns]]

        Products=[SpaceRemover.sub('',x) for x in LabeledRxnMatrix.Products[rxns]]
        
        #Add values to the Product keys, both the reactants as well as the reactants' as keys to the enzymes
        if len(Products)==1:
            if LabeledRxnMatrix.Reaction[rxns] in Dictionary[str(Products).strip('[]|\'')][Reactants[0]]:
                pass
            else:
                Dictionary[str(Products).strip('[]|\'')][Reactants[0]].append(LabeledRxnMatrix.Reaction[rxns])
           
        if len(Products)>1:
            for cpd in range(len(Products)):
                if LabeledRxnMatrix.Reaction[rxns] in Dictionary[str(Products[cpd]).strip('[]|\'')][Reactants[0]]:
                    pass
                else:
                    Dictionary[str(Products[cpd]).strip('[]|\'')][Reactants[0]].append(LabeledRxnMatrix.Reaction[rxns])

    
    return(Dictionary)

In [39]:
#Run rounds 1 as normal before parallel.. to get it going

In [40]:
%%time
#Nonparallel version - round 1
NextRoundReaction=BuildReactionMatrixFromCpdList(LabListInitial)
NextRoundReaction.Compounds=NextRoundReaction.Compounds.apply(GenericLabelingReactionMatrixApply)
NextRoundReaction=UnpackAndFill(NextRoundReaction)
NextRoundReaction['NewCpds']=NextRoundReaction.Products
NextRoundCpdList=NewLabeledCpdList(NextRoundReaction)
CpdFile='LabeledCpds_FromRound_1.csv' #Change
with open(CpdFile,'w') as output:
    writer=csv.writer(output,lineterminator='\n')
    for val in NextRoundCpdList:
        writer.writerow([val])
NextRoundReaction=ConvertMatrixToEnglish(NextRoundReaction)
DictionaryRound=MakingDict(NextRoundReaction)
output=open('Dictionary_FromRound_1.pkl','wb', -1) #Change
pickle.dump(DictionaryRound,output,protocol=4)
print('Round 1 Done') #Change

Round 1 Done
CPU times: user 44 ms, sys: 0 ns, total: 44 ms
Wall time: 45 ms


In [41]:
#Results of Reaction Round #1
DictionaryRound

defaultdict(<function __main__.MakingDict.<locals>.<lambda>>,
            {'1-O-phosphonohexopyranose M+6': defaultdict(list,
                         {'Hexopyranose M+6': ['EC-3.1.3.10', 'EC-2.4.1.64']}),
             '6-o-phosphonohexopyranose M+6': defaultdict(list,
                         {'Hexopyranose M+6': ['EC-2.7.1.147', 'EC-2.7.1.1']}),
             'D-(+)-Trehalose M+6': defaultdict(list,
                         {'Hexopyranose M+6': ['EC-2.4.1.64']}),
             'cellobiose M+6': defaultdict(list,
                         {'Hexopyranose M+6': ['EC-3.2.1.20', 'EC-2.4.1.25']}),
             'glucono-delta-lactone M+6': defaultdict(list,
                         {'Hexopyranose M+6': ['EC-1.1.1.47', 'EC-1.1.5.2']}),
             'maltotriose M+12': defaultdict(list,
                         {'Hexopyranose M+6': ['EC-3.2.1.20']}),
             'melibiose M+6': defaultdict(list,
                         {'Hexopyranose M+6': ['EC-3.2.1.10']})})

In [42]:
###

In [43]:
%%time
#Nonparallel version - round 2
NextRoundReaction=BuildReactionMatrixFromCpdList(NextRoundCpdList)
NextRoundReaction.Compounds=NextRoundReaction.Compounds.apply(GenericLabelingReactionMatrixApply)
NextRoundReaction=UnpackAndFill(NextRoundReaction)
NextRoundReaction['NewCpds']=NextRoundReaction.Products
NextRoundCpdList=NewLabeledCpdList(NextRoundReaction)
CpdFile='LabeledCpds_FromRound_2.csv' #Change
with open(CpdFile,'w') as output:
    writer=csv.writer(output,lineterminator='\n')
    for val in NextRoundCpdList:
        writer.writerow([val])
NextRoundReaction=ConvertMatrixToEnglish(NextRoundReaction)
DictionaryRound=MakingDict(NextRoundReaction)
output=open('Dictionary_FromRound_2.pkl','wb', -1) #Change
pickle.dump(DictionaryRound,output,protocol=4)
print('Round 2 Done') #Change

Round 2 Done
CPU times: user 96 ms, sys: 8 ms, total: 104 ms
Wall time: 104 ms


In [44]:
#Results from Round 2 reaction
DictionaryRound

defaultdict(<function __main__.MakingDict.<locals>.<lambda>>,
            {'1,6-di-o-phosphonohexopyranose M+6': defaultdict(list,
                         {'1-O-phosphonohexopyranose M+6': ['EC-2.7.1.10'],
                          '6-o-phosphonohexopyranose M+6': ['EC-2.7.1.11']}),
             '1-O-phosphonohexopyranose M+6': defaultdict(list,
                         {'6-o-phosphonohexopyranose M+6': ['EC-5.4.2.6',
                           'EC-5.4.2.5'],
                          'D-(+)-Trehalose M+6': ['EC-2.4.1.231',
                           'EC-2.4.1.64']}),
             '2906-23-2 M+6': defaultdict(list,
                         {'1-O-phosphonohexopyranose M+6': ['EC-2.7.7.33']}),
             '6-o-phosphonohexopyranose M+6': defaultdict(list,
                         {'1-O-phosphonohexopyranose M+6': ['EC-5.4.2.6',
                           'EC-5.4.2.5'],
                          '6-o-phosphonohexopyranose M+6': ['EC-5.1.3.15']}),
             '6-o-phosphonohexopyranosyl

In [43]:
##Parallel functions
#Later reaction rounds are slow given the number of possibilities

In [45]:
from pathos.helpers import mp

In [46]:
#Build and run the reaction, also append the new labeled cpd list
def PipePandaProcess(CpdList):
    
    #build reaction matrix
    ReactionMatrix=BuildReactionMatrixFromCpdList(CpdList)
    
    #Run labeling reaction
    ReactionMatrix.Compounds=ReactionMatrix.Compounds.apply(GenericLabelingReactionMatrixApply)
    
    #Prep step
    ReactionMatrix=UnpackAndFill(ReactionMatrix)
    
    #Add new column for labeled list export later
    ReactionMatrix['NewCpds']=ReactionMatrix.Products
    
    #Get list of cpds - check if you can directly export the cpd list instead of add to DF
    #NewList=pandas.Series(NewLabeledCpdList(ReactionMatrix))
    
    #Convert matrix to English
    ReactionMatrix=ConvertMatrixToEnglish(ReactionMatrix)
        
    return(ReactionMatrix)

In [47]:
#Successful write out in parallel function
def ParallelRxnBuildWriteOut(LabeledList,BuildFunction,RoundNum):
    
    #Split into 12x8=96 chunks - using 16 cores
    SplitList=numpy.array_split(LabeledList,16*32)
    
    #Fill new list of labeled cpds
    NewCpdList=[]
    
    #Build multiprocessing pool object with 16 processors
    pooler=mp.Pool(16)

    #Set up the new file
    with open('TempPandaDF.csv','w') as fp:
        
        #For each result in the Build function using an item from SplitList
        for result in pooler.imap(BuildFunction,SplitList):
            
            #Building new cpd list
            NewCpds=NewLabeledCpdList(result)
            NewCpdList.extend(NewCpds)
            
            #Each result is a Pandas Object, so write it to csv
            result.to_csv(fp,index=False,header=False)

    
    pooler.close()
    pooler.join()

    #Unique items
    NewCpdList=list(set(NewCpdList))
    
    #Write File
    CpdFile='LabeledCpds_FromRound_{0}.csv'.format(RoundNum) #Change
    with open(CpdFile,'w') as output:
        writer=csv.writer(output,lineterminator='\n')
        for val in NewCpdList:
            writer.writerow([val])
    
    #Return CpdList for next round
    return(NewCpdList)


In [48]:
#Successful write out in parallel function
def ParallelRxnBuildWriteOutRd2(LabeledList,BuildFunction,RoundNum):
    
    #No chunks for round 2
    SplitList=numpy.array_split(LabeledList,16)
    
    #Fill new list of labeled cpds
    NewCpdList=[]
    
    #Build multiprocessing pool object with num of cpus
    pooler=mp.Pool(16)

    #Set up the new file
    with open('TempPandaDF.csv','w') as fp:
        
        #For each result in the Build function using an item from SplitList
        for result in pooler.imap(BuildFunction,SplitList):
            
            #Building new cpd list
            NewCpds=NewLabeledCpdList(result)
            NewCpdList.extend(NewCpds)
            
            #Each result is a Pandas Object, so write it to csv
            result.to_csv(fp,index=False,header=False)

    
    pooler.close()
    pooler.join()

    #Unique items
    NewCpdList=list(set(NewCpdList))
    
    #Write File
    CpdFile='LabeledCpds_FromRound_{0}.csv'.format(RoundNum) #Change
    with open(CpdFile,'w') as output:
        writer=csv.writer(output,lineterminator='\n')
        for val in NewCpdList:
            writer.writerow([val])
    
    #Return CpdList for next round
    return(NewCpdList)


In [49]:
def ConvertBackForDict(PandaDF):
    #Convert back to original pandas formatting
    PandaDF.Reactants=PandaDF.Reactants.apply(ConvertBackForDictSub)
    PandaDF.Products=PandaDF.Products.apply(ConvertBackForDictSub)
    return(PandaDF)

In [50]:
def ConvertBackForDictSub(PandaDFColumn):
    PandaDFColumn=re.sub('\'|\[|\]','',PandaDFColumn)
    PandaDFColumn=re.split(', ',PandaDFColumn)
    return(PandaDFColumn)

In [51]:
def AddingDict(ExistingDictionary,LabeledRxnMatrix):
    
    LabeledRxnMatrix=LabeledRxnMatrix.reset_index(drop=True)
    
    for rxns in range(len(LabeledRxnMatrix)):
        
        #Remove any extra spaces
        Reactants=[SpaceRemover.sub('',x) for x in LabeledRxnMatrix.Reactants[rxns]]

        Products=[SpaceRemover.sub('',x) for x in LabeledRxnMatrix.Products[rxns]]
        
        #Add values to the Product keys, both the reactants as well as the reactants' as keys to the enzymes
        if len(Products)==1:
            if LabeledRxnMatrix.Reaction[rxns] in ExistingDictionary[str(Products).strip('[]|\'')][Reactants[0]]:
                pass
            else:
                ExistingDictionary[str(Products).strip('[]|\'')][Reactants[0]].append(LabeledRxnMatrix.Reaction[rxns])
           
        if len(Products)>1:
            for cpd in range(len(Products)):
                if LabeledRxnMatrix.Reaction[rxns] in ExistingDictionary[str(Products[cpd]).strip('[]|\'')][Reactants[0]]:
                    pass
                else:
                    ExistingDictionary[str(Products[cpd]).strip('[]|\'')][Reactants[0]].append(LabeledRxnMatrix.Reaction[rxns])

    return(ExistingDictionary)

In [52]:
def BuildDictionaryFromCsv(RoundNum):
    #start with blank Dict
    RoundDictionary=defaultdict(lambda: defaultdict(list))
    
    #Set up multiprocessing
    #pooler=mp.Pool(mp.cpu_count())
    
    #Read in the Pandas DF by 10000 row chunks - when I build the loops, the TempPandaDF will be
    #a dataframe written to disk as a temporary hold for all the reactions run in a given round
    
    for chunk in pandas.read_csv('TempPandaDF.csv',sep=',',index_col=False,skip_blank_lines=True,names=['Reaction','Reactants','Products','NewCpds'],chunksize=10000):
        #Convert the chunk into a format to be built into dictinoary (i.e. original pandas format)
        chunk=ConvertBackForDict(chunk)
        #Use the chunk to add into the exisitng dictionary
        RoundDictionary=AddingDict(RoundDictionary,chunk)
    
    #Write out dictionary
    output=open('Dictionary_FromRound_{0}.pkl'.format(RoundNum),'wb',-1)
    pickle.dump(RoundDictionary,output,protocol=4)
    print('Round {0} Done'.format(RoundNum))
    
    #Remove the TempPandaDF so it doesn't conflict with the next round
    os.remove('TempPandaDF.csv')
    

In [53]:
##Run parallel reactions

In [54]:
%%time
#Starting from Round 2, use parallel version - note you need a smaller chunk
#returning the next round's labeled list, the reaction results are stored as csv to disk
NextRoundCpdList=ParallelRxnBuildWriteOutRd2(NextRoundCpdList,PipePandaProcess,3)
#Build the dictioanry for a given round, using the TempPandaDF.csv file on disk
BuildDictionaryFromCsv(3)

Round 3 Done
CPU times: user 164 ms, sys: 72 ms, total: 236 ms
Wall time: 601 ms


In [55]:
#Results of Reaction Round 3
#Note this can be run iteratively to keep probing deeper into metabolism
DictionaryRound

defaultdict(<function __main__.MakingDict.<locals>.<lambda>>,
            {'1,6-di-o-phosphonohexopyranose M+6': defaultdict(list,
                         {'1-O-phosphonohexopyranose M+6': ['EC-2.7.1.10'],
                          '6-o-phosphonohexopyranose M+6': ['EC-2.7.1.11']}),
             '1-O-phosphonohexopyranose M+6': defaultdict(list,
                         {'6-o-phosphonohexopyranose M+6': ['EC-5.4.2.6',
                           'EC-5.4.2.5'],
                          'D-(+)-Trehalose M+6': ['EC-2.4.1.231',
                           'EC-2.4.1.64']}),
             '2906-23-2 M+6': defaultdict(list,
                         {'1-O-phosphonohexopyranose M+6': ['EC-2.7.7.33']}),
             '6-o-phosphonohexopyranose M+6': defaultdict(list,
                         {'1-O-phosphonohexopyranose M+6': ['EC-5.4.2.6',
                           'EC-5.4.2.5'],
                          '6-o-phosphonohexopyranose M+6': ['EC-5.1.3.15']}),
             '6-o-phosphonohexopyranosyl