# Example of the CSV file that is loaded to Cloud Storage before calling Dataflow

We create a CSV file that lists the building blocks and the positions that each block can occupy. Each line holds a SMILES string, a delimiter, and a comma-separated list of positions. Note: I am using a pipe (`|`) as the delimiter. If a SMILES string can contain a pipe, we need to pick another character.

C1CCCC1|1,2 <br>
C1C=CC=CC=1|1 <br>
C1CCCCC1|1,2 <br>
CC1C=CC=CC=1|2,3 <br>
CC1C(C)=CC=CC=1|2 <br>
CC1C=C(C)C=CC=1|1,2 <br>
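
A minimal sketch of how such a file could be produced and checked, assuming the building blocks are held in a Python list of (SMILES, positions) pairs. The helper name `to_csv_lines` and the in-memory layout are illustrative and not part of the original notebook. The helper rejects any SMILES string that contains the delimiter, which is the concern raised in the note above.

```python
DELIMITER = "|"  # assumption: same delimiter as the sample file above


def to_csv_lines(blocks, delimiter=DELIMITER):
    """Format (smiles, positions) pairs as delimiter-separated lines.

    Raises ValueError if a SMILES string contains the delimiter,
    because such a line could not be parsed unambiguously later.
    """
    lines = []
    for smiles, positions in blocks:
        if delimiter in smiles:
            raise ValueError(
                f"SMILES {smiles!r} contains the delimiter {delimiter!r}"
            )
        lines.append(smiles + delimiter + ",".join(str(p) for p in positions))
    return lines


# Three of the sample building blocks, as (SMILES, positions) pairs.
sample_blocks = [
    ("C1CCCC1", [1, 2]),
    ("C1C=CC=CC=1", [1]),
    ("CC1C=CC=CC=1", [2, 3]),
]

csv_lines = to_csv_lines(sample_blocks)
print("\n".join(csv_lines))
```

Failing early when the file is written is cheaper than discovering a mis-split line inside the pipeline.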

# Querying Cloud Storage for building blocks and creating all possible combinations

Make sure the Dataflow API is enabled.

In [1]:
!gcloud services enable dataflow



In [2]:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import pvalue
import google.auth

In [3]:
options = {
    'project': 'compound-enumeration',
    'temp_location' : 'gs://compound-enumeration-dataflow-temp'
}

In [4]:
options = pipeline_options.PipelineOptions(flags=[], **options)

In [5]:
p = beam.Pipeline(InteractiveRunner(), options)

Below, we need to set whether or not this reaction involves three reactants. We should already know this when we create the original CSV file. We can do the same thing for 4-way or 5-way reactions, if necessary. We'll use this flag to tell some of the cross joins whether to do work or just act as pass-throughs.

In [6]:
## this can be determined when you are creating the CSV file
## is this a three-reactant reaction?
run_third_step = True
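
If the flag is not recorded when the file is created, one option is to derive it from the file contents. The sketch below assumes lines in the pipe-delimited format shown earlier. The helper `needs_third_step` and the variable `derived_flag` are hypothetical names, not part of the original notebook.

```python
def needs_third_step(lines, delimiter="|"):
    """Return True if any building block lists position 3."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split once: everything after the first delimiter is the position list.
        _, raw_positions = line.split(delimiter, 1)
        positions = [p.strip() for p in raw_positions.split(",")]
        if "3" in positions:
            return True
    return False


sample_lines = ["C1CCCC1|1,2", "C1C=CC=CC=1|1", "CC1C=CC=CC=1|2,3"]
derived_flag = needs_third_step(sample_lines)
print(derived_flag)
```

This reads the file once outside the pipeline, so it suits small inputs. For large files it is simpler to record the flag when the CSV is generated, as described above.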

In [7]:
# this is the side input method:
# the lists of building blocks are passed as iterables (side inputs)
# and we create all the possible combinations

def cross_join(left, rights):
    for x in rights:
        yield (left, x)

def cross_join_3way(left, rights, run_step=False):
    if run_step:
        for x in rights:
            yield (left[0], left[1], x)
    else:
        yield (left[0], left[1])
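
Both functions are plain generators, so their behavior can be checked without Beam. The sketch below repeats the two definitions so it runs on its own and uses placeholder strings (`"A"`, `"B"`, ...) instead of SMILES. The list comprehensions stand in for what `beam.FlatMap` does to each element.

```python
def cross_join(left, rights):
    for x in rights:
        yield (left, x)


def cross_join_3way(left, rights, run_step=False):
    if run_step:
        for x in rights:
            yield (left[0], left[1], x)
    else:
        yield (left[0], left[1])


# Placeholder building blocks for each position.
r1 = ["A", "B"]
r2 = ["C", "D"]
r3 = ["E"]

# FlatMap calls the function once per element and flattens the results.
pairs = [pair for left in r1 for pair in cross_join(left, r2)]
triples = [t for pair in pairs for t in cross_join_3way(pair, r3, run_step=True)]
passthrough = [t for pair in pairs for t in cross_join_3way(pair, r3, run_step=False)]

print(pairs)
print(triples)
print(passthrough)
```

With `run_step=False` the pairs come through unchanged. With `run_step=True` and an empty `rights`, nothing is emitted at all, so the flag should only be set when position 3 actually has building blocks.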

In [8]:
class Split(beam.DoFn):
    from apache_beam import pvalue

    # In this DoFn we parse each line of the CSV file.
    # The first part of the line is the SMILES string.
    # The second part is turned into a list that gives the possible positions.
    # Then we tag each SMILES string for later use in the cross product.
    # Notice that these are independent IF statements, not IF-ELSE statements,
    # so a SMILES string can be tagged multiple times (as we want).

    # These tags will be used to tag the outputs of this DoFn.
    # For reactions with more reactants, just generalize this.
    OUTPUT_TAG_R1 = 'R1'
    OUTPUT_TAG_R2 = 'R2'
    OUTPUT_TAG_R3 = 'R3'

    def process(self, element):
        """
        Tags the input as it processes the original PCollection.
        """

        parsed_element = element.split('|')
        smiles = parsed_element[0]
        # strip whitespace so that entries such as '1, 2' still match
        positions = [pos.strip() for pos in parsed_element[1].split(',')]

        if '1' in positions:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_R1, smiles)
        if '2' in positions:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_R2, smiles)
        if '3' in positions:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_R3, smiles)
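
The tagging logic can also be checked without Beam. The sketch below is a plain-Python stand-in for `Split.process`, assuming the same pipe-delimited line format. The helper `tag_positions` is hypothetical: it builds the tag names (`R1`, `R2`, ...) from a configurable number of positions, which is one possible way to generalize the DoFn to reactions with more reactants.

```python
from collections import defaultdict


def tag_positions(line, n_positions=3, delimiter="|"):
    """Yield (tag, smiles) once for every position the block can occupy."""
    smiles, raw_positions = line.strip().split(delimiter, 1)
    positions = {p.strip() for p in raw_positions.split(",")}
    # Independent checks, not if/else: one block may receive several tags.
    for i in range(1, n_positions + 1):
        if str(i) in positions:
            yield (f"R{i}", smiles)


# Group the tagged SMILES strings the way the tagged outputs would be grouped.
groups = defaultdict(list)
for line in ["C1CCCC1|1,2", "C1C=CC=CC=1|1", "CC1C=CC=CC=1|2,3"]:
    for tag, smiles in tag_positions(line):
        groups[tag].append(smiles)

for tag in sorted(groups):
    print(tag, groups[tag])
```

`C1CCCC1` lands in both `R1` and `R2` because it lists positions 1 and 2, which is the multiple-tagging behavior described in the DoFn comments.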


In [9]:
# read the CSV file from cloud storage
# and tag the building blocks with their possible positions

csv = (p
       | 'Read from Text' >> beam.io.ReadFromText("gs://mcule-storage/sample.csv")
       | 'Parse Building Blocks' >> beam.ParDo(Split()).with_outputs(
                              Split.OUTPUT_TAG_R1,
                              Split.OUTPUT_TAG_R2,
                              Split.OUTPUT_TAG_R3)
      )

# label the pcollections according to how they've been tagged

R1 = csv[Split.OUTPUT_TAG_R1]
R2 = csv[Split.OUTPUT_TAG_R2]
R3 = csv[Split.OUTPUT_TAG_R3]

# assumption: the reaction will always contain at least two reactants (though we can adjust the code if needed)

R1_R2 = (
    R1
    | "ApplyCrossJoin" >> beam.FlatMap(cross_join, rights=beam.pvalue.AsIter(R2))
)

# create the reactants list
# notice the cross join here takes an input run_step which determines whether to apply the cross_join_3way function
# or just act as a passthrough

reactants = (
    R1_R2
    | "ApplyCrossJoin3way" >> beam.FlatMap(cross_join_3way, rights=beam.pvalue.AsIter(R3), run_step=run_third_step)
)


In [10]:
ib.show_graph(p)



In [11]:
ib.show(reactants)

Now `reactants` is a PCollection that can be passed along to the next step of the pipeline.
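
For small inputs, a local reference result is a quick way to sanity-check the output of `ib.show(reactants)`. The sketch below assumes the input file holds exactly the six sample lines shown at the top and that the third step is enabled. `blocks_at` is a hypothetical helper. Element order in a PCollection is not guaranteed, so the comparison should be made on sets.

```python
from itertools import product

# Assumption: the file read by the pipeline contains exactly these lines.
sample_lines = [
    "C1CCCC1|1,2",
    "C1C=CC=CC=1|1",
    "C1CCCCC1|1,2",
    "CC1C=CC=CC=1|2,3",
    "CC1C(C)=CC=CC=1|2",
    "CC1C=C(C)C=CC=1|1,2",
]


def blocks_at(position, lines, delimiter="|"):
    """Return the SMILES strings that may occupy the given position."""
    selected = []
    for line in lines:
        smiles, raw_positions = line.split(delimiter, 1)
        if str(position) in [p.strip() for p in raw_positions.split(",")]:
            selected.append(smiles)
    return selected


r1 = blocks_at(1, sample_lines)
r2 = blocks_at(2, sample_lines)
r3 = blocks_at(3, sample_lines)

# Full cross product, equivalent to chaining the two cross joins.
expected = set(product(r1, r2, r3))
print(len(r1), len(r2), len(r3), len(expected))
```

For this sample there are 4 candidates for position 1, 5 for position 2, and 1 for position 3, giving 20 combinations. Note that the cross product also pairs a building block with itself when it is allowed in more than one position.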