## Simple transformation.
<details><summary>Click for <b>Spark</b></summary>
<p>

<p>
    
```python
rdd1 = ( sc.parallelize(['one', 'two', 'three', 'four'])
           .map(str.title)
       )
rdd1.collect()
```
</p>

</details>



In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    lines = (
        p | beam.Create(['one', 'two', 'three', 'four'])
          | beam.Map(str.title)
          | beam.Map(print)
    )

# lines is a PCollection object
print('lines = ', lines)

# This is basic Python to do the same thing
x = ['one', 'two', 'three', 'four']
print(list(map(str.title, x)))

## Simple transformation using a lambda.
<details><summary>Click for <b>Spark</b></summary>
<p>

<p>
    
```python
rdd1 = ( sc.parallelize(['one', 'two', 'three', 'four'])
           .map(lambda x : x.title())
       )
rdd1.collect()
```
</p>

</details>
<details><summary>Click for <b>Java</b></summary>
<p>

<p>
    
```java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.io.TextIO;
import java.util.*;

public class Create1 {
    public static void main(String[] args) {

        String outputsPrefix = "outputs/part";
        Pipeline p = Pipeline.create();
        
        PCollection<String> lines = p.apply(Create.of("one", "two", "three", "four"));
        lines = lines.apply(MapElements.into(TypeDescriptors.strings()).via((String line) -> line.toUpperCase()));
        lines.apply(TextIO.write().to(outputsPrefix));

        p.run().waitUntilFinish();
    }
}

```
</p>

</details>



In [2]:
import apache_beam as beam

with beam.Pipeline() as p:
    lines = (
        p | beam.Create(['one', 'two', 'three', 'four'])
          | beam.Map(lambda x : x.title())
          | beam.Map(print)
    )




One
Two
Three
Four


## The pipe `|` is actually just an operator overload to call the apply method of the pipeline. You would never do this in Python, but it helps to understand what is going on under the hood.

In [3]:
import apache_beam as beam

with beam.Pipeline() as p:
        lines = ((p | 'Create' >> beam.Create(['one', 'two', 'three', 'four']))
             .apply(beam.Map(str.title)) 
             .apply(beam.Map(print))
        )
        



One
Two
Three
Four


## Read from CSV and use Map with lambda.
### Also it's a good idea to name the steps so it's easier to debug and monitor them later.
<br>
<details><summary>Click for <b>Java</b></summary>
<p> 
    
```java

package com.mypackage;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.io.TextIO;


public class Simple1 {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String regionsInputFileName = "regions.csv";
         String outputsPrefix = "outputs/part";

        PCollection<String> regions = p
            .apply("Read", TextIO.read().from(regionsInputFileName))
            .apply("Parse", MapElements.into(TypeDescriptors.strings()).via((String element) -> element.toUpperCase()));
        regions.apply("Write", TextIO.write().to(outputsPrefix));
        p.run().waitUntilFinish();
    }
}
                       
```
</p>
</details>


In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

regionsfilename = 'regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
          | 'Parse' >> beam.Map(lambda x : x.split(','))
          | 'Transform' >> beam.Map(lambda x : (int(x[0]), x[1].upper()))
          | 'Write' >> WriteToText(regionsfilename + '.out')
#          | 'Print' >> beam.Map(print)
    )
    #p.run() # implicit in Python when using with block

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

def splitregion(element):
    #1,North --> ('1','North')
    return tuple(element.split(','))

def splitregion2(element):
    #1,North --> (1,'NORTH')
    x = element.split(',')
    return (int(x[0]), x[1].upper())

regionsfilename = 'regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
            | 'Parse' >> beam.Map(splitregion)
#          | 'Parse' >> beam.Map(splitregion2)
#          | 'Parse' >> beam.Map(lambda x : tuple(x.split(',')))
#          | 'Transform' >> beam.Map(lambda x : (int(x[0]), x[1].upper()))
#          | 'Write' >> WriteToText(regionsfilename + '.out')
          | 'Print' >> beam.Map(print)
    )
    #p.run() # implicit in Pytho when using with block

## Read from CSV and use ParDo

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

class RegionParseTuple(beam.DoFn):
    def process(self,flatt):
        regionid, regionname = element.split(',')
        #return [(int(regionid), regionname)] # ParDo's need to return a list
        yield (int(regionid), regionname) # Can also use yield instead of returning a list
#        yield (int(regionid), regionname.upper()) # Include a transformation instead of doing it as a separate step

regionsfilename = 'regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
          | 'Parse' >> beam.ParDo(RegionParseTuple())
#          | 'Upper' >> beam.Map(lambda x : (x[0], x[1].upper()))
          | 'Write' >> WriteToText('regions.out')
    )


In [None]:
! cat regions.out*

## Template showing a full program that can read the command line args

In [None]:
"""A template to import the default package and parse the arguments"""

from __future__ import absolute_import

import argparse
import logging
import re, os

#from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class RegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield (int(regionid), regionname) # Can also use yield instead of returning a list

def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the wordcount pipeline."""
  projectid = os.environ.get('PROJECT')
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default=f'gs://{projectid}/regions.csv',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      default = f'gs://{projectid}/regions_output',      
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # python importregion.py --input gs://projectid/regions1.csv --output gs://projectid/regions1_out --runner 
  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  # The pipeline will be run on exiting the with block.
  with beam.Pipeline(options=pipeline_options) as p:
    lines = p | 'Read' >> ReadFromText(known_args.input)
    records = lines | 'Split' >> beam.ParDo(RegionParseTuple())
    uppercase = records | 'Uppercase' >> beam.Map(lambda x : (int(x[0]), x[1].upper()))
    uppercase | 'Write' >> WriteToText(known_args.output)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()



## Example of how to create a split ParDo with multiple outputs or splitting the data up to send it down different paths

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText

class OddEvenRegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        if int(regionid) % 2 == 0:
            yield pvalue.TaggedOutput('Even', (int(regionid), regionname, 'Even'))
        else:
            yield pvalue.TaggedOutput('Odd', (int(regionid), regionname, 'Odd'))

regionsfilename = 'regions.csv'
with beam.Pipeline() as p:
    lines = p | 'Read' >> ReadFromText(regionsfilename) 
    # lines would return a tuple of the two tagged outputs
    # unpack the two outputs to two separate variables to process differently
    evens, odds = lines | 'Parse' >> beam.ParDo(OddEvenRegionParseTuple()).with_outputs("Even", "Odd")
    
    print('Evens')
    (evens 
        | 'Upper' >> beam.Map(lambda x : (x[0], x[1].upper()))
        | 'Print Evens' >> beam.Map(print)
    )
    
    print('Odds')
    (
    odds 
        | 'Lower' >> beam.Map(lambda x : (x[0], x[1].lower()))
        | 'Print Odds' >> beam.Map(print)
    )
  

## Example of branching or taking the same data and sending it down multiple paths, such as to group it on two different keys with one read from the source

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText

class RegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        #return [(int(regionid), regionname)] # ParDo's need to return a list
        yield (int(regionid), regionname) # Can also use yield instead of returning a list

regionsfilename = 'regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
          | 'Parse' >> beam.ParDo(RegionParseTuple())
    )
    
    # Just use the same variable for the PCollection twice. As long as it's inside the with block for the pipeline
    # it is all one continuous pipeline process
    
    # Branch 1
    (regions 
         | 'Lowercase regions' >> beam.Map(lambda x : (x[0] * 100, x[1].lower()))
         | 'Write' >> WriteToText('regions2.out')
    )
    # Branch 2
    (regions 
         | 'Uppercase regions' >> beam.Map(lambda x : (x[0] * 10, x[1].upper()))
         | 'Print' >> beam.Map(print)
    )



## WithKeys is a convenience function to create the KV tuple that is commonly used to reshape data in preparation for PTransforms like GroupByKey and CoGroupByKey

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText

class TerritoryParseTuple(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield (int(territoryid), territoryname, int(regionid))

territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText(territoriesfilename)
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
                    | 'Territories With Keys' >> beam.util.WithKeys(lambda x : x[2])
#                    | 'With Keys Manually' >> beam.Map(lambda x : (x[2], x))
#                    | 'With Keys Manually removing the key from the second' >> beam.Map(lambda x : (x[2], (x[0], x[1])))
    )
    territories | 'Print KV' >> beam.Map(print)
#    territories | beam.util.Keys() | 'Print Keys' >> beam.Map(print)
#    territories | beam.util.Values() | 'Print Values' >> beam.Map(print)

# (1, 'North')
# (1730, 'Bedford', 1) --> (1, (1730, 'Bedford', 1))
# (1730, 'Bedford', 1) --> (1, (1730, 'Bedford'))


In [None]:
## GroupByKey will cluster the elements as a list under each unique key. The data must be in a KV tuple pair first.

import apache_beam as beam
from apache_beam.io import ReadFromText

class TerritoryParseTuple(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
#        yield (int(territoryid), territoryname, int(regionid))  # traditional tuple of 3 elements would require reshaping
        yield (int(regionid), (int(territoryid), territoryname)) # tuple of 2 elements with 2nd being another tuple 

territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
#                    | 'Territories With Keys' >> beam.util.WithKeys(lambda x : x[2])
                    | 'Group Territories' >> beam.GroupByKey() 
#                    | 'Print Territories' >> beam.Map(print)
                    | 'Write' >> WriteToText('territories_group.out')
                  )


In [None]:
#! ls territories_group.out*
! cat territories_group.out*

## Flatten is the equivalent of UNION ALL in SQL

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    lines1 = p | 'Create 1' >> beam.Create(['one', 'two', 'three', 'four'])
    lines2 = p | 'Create 2' >> beam.Create(['alpha', 'beta', 'gamma', 'delta'])

    merged = ((lines1, lines2) | 'Merge PCollections' >> beam.Flatten())
    merged | beam.Map(print)


## Combine
### SELECT key, sum(value) as total FROM source GROUP BY key

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    data = (
        p | 'Create' >> beam.Create([('a', 10), ('a', 20), ('b', 30), ('b', 40), ('c', 50), ('a', 60)])
          | 'Combine' >> beam.CombinePerKey(sum)
          | 'Print' >> beam.Map(print)
    )


## Custom Combine Function

In [None]:
mport apache_beam as beam

class CustomCombine(beam.CombineFn):
    def create_accumulator(self):
        # method defining how to create an empty accumulator
        return dict()

    def add_input(self, accumulator, input):
        # get the input and split it up for easier manipulation
        k, v = input
        # get the values from the accumulator for the input key or initialize it if it's the first time we see this key
        x, y, z = accumulator.get(k, (0, 0, 0))

        # take the max for the first element of the tuple and sum the second element and count for the third
        accumulator[k] = (v[0] if v[0] > x else x, y + v[1], z + 1)
        return accumulator

    def merge_accumulators(self, accumulators):
        # merge the accumulators from the various workers once they have finished accumulating locally
        merged = dict()
        for accum in accumulators:
          for k, v in accum.items():
            x, y, z = merged.get(k, (0, 0, 0))
            merged[k] = (v[0] if v[0] > x else x, y + v[1], z + v[2])
        return merged

    def extract_output(self, accumulator):
        # called when all the works accumulators have been merge to render the final output
        # return the max, the sum, the count and the average for the key
        return {k : (v[0], v[1], v[2], v[1]/v[2]) for k, v in accumulator.items()}
        return accumulator
    

with beam.Pipeline() as p:
    data = (
        p | 'Create' >> beam.Create([('a', (1, 10)), ('a', (2, 20)), 
                                     ('b', (3, 30)), ('c', (5, 50)), 
                                     ('b', (4, 40)), ('a', (6, 60))])
          | 'Combine' >> beam.CombineGlobally(CustomCombine())
          | 'Print' >> beam.Map(print)
    )


## Map vs FlatMap

In [None]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create(['Strawberry,Carrot,Eggplant','Tomato,Potato'])
#      | 'Split words' >> beam.Map(lambda x : x.split(','))
      | 'Split words' >> beam.FlatMap(lambda x : x.split(','))
      | beam.Map(print))

## Parsing the line as a tuple

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

class TerritoryParseTuple(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))

class StartsWithSFilter(beam.DoFn):
    def process(self, element):
        if element[1].startswith('S'):
            yield element
            
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseTuple())
#           | 'Filter 1' >> beam.Filter(lambda x : x[2] % 2 == 0)
#           | 'Filter 2' >> beam.Filter(lambda x : x[1].startswith('S'))
          | 'Filter 2' >> beam.ParDo(StartsWithSFilter())
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


## Parse as dict instead so you can use keys instead of positions to refer to elements

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid' : int(territoryid), 'territoryname' : territoryname, 'regionid'  : int(regionid)}

class StartsWithSFilter(beam.DoFn):
    def process(self, element):
        if element[1].startswith('S'):
            yield element
            
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseDict())
          | 'Filter 1' >> beam.Filter(lambda x : x['regionid'] % 2 == 0)
          | 'Filter 2' >> beam.Filter(lambda x : x['territoryname'].startswith('S'))
#          | 'Print' >> beam.Map(print)
          | 'Write' >> WriteToText('regions.out')
    )


## Parse as a model class based on typing.NamedTuple so you can use properties instead of keys

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname, int(regionid))

class StartsWithSFilter(beam.DoFn):
    def process(self, element):
        if element.territoryname.startswith('S'):
            yield element
            
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Filter 1' >> beam.Filter(lambda x : x.regionid % 2 == 0)
          | 'Filter 2' >> beam.Filter(lambda x : x.territoryname.startswith('S'))
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


## Side Inputs with a single value
### Side inputs are about passing extra parameters to a function where the parameters are calculated in the pipeline itself

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsSingleton, AsDict
from apache_beam.io import ReadFromText

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))

class LookupRegion(beam.DoFn):
    def process(self, element, uppercase = 0):
        lookuptable = {1:'North', 2:'South', 3:'East', 4:'West'}
        territoryid, territoryname, regionid = element
        region = lookuptable.get(regionid, 'No Region')
        if uppercase == 1:
            region = region.upper()
        yield(territoryid, territoryname, regionid, region)
        
with beam.Pipeline() as p:
    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
    )
    
    # This is contrived, but let's just calculate a value that we can pass as a single side input to LookupRegion
    minregion = (
        territories
          | 'Extract RegionID' >> beam.Map(lambda x : x[2])
          | 'MaxTerritories' >> beam.CombineGlobally(lambda elements: min(elements or [None]))
    )

    lookup = (
        territories
#          | beam.ParDo(LookupRegion(), uppercase = 1 ) # This is not a side input but just passing a parameter
#          | beam.ParDo(LookupRegion(), uppercase = minregion ) # fails because minregion is a PCollection not an integer
          | beam.ParDo(LookupRegion(), uppercase = beam.pvalue.AsSingleton(minregion) ) # When the parameter is calculated in the pipeline itself, that makes it a side input
          | 'Print Loopup' >> beam.Map(print)
    )

#    maxregion | 'Print Min' >> beam.Map(print)


## Side input that is a lookup list
### More realistic example where the entire lookup table is read in the pipeline then distributed to each worker as a side input

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsList
from apache_beam.io import ReadFromText

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid': int(regionid), 'regionname': regionname.title()}

class TerritoryParseTuple(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
        
                
class LookupRegion(beam.DoFn):
    def process(self, element, lookuptable = [{'regionid':1, 'regionname':'North'}, {'regionid':2, 'regionname':'South'}]):
        # {1:'North', 2:'South'}
        territoryid, territoryname, regionid = element
        # Becase the regions PCollection is a different shape, use the following comprehension to make it easier to do a lookup
        lookup = {e['regionid'] : e['regionname'] for e in lookuptable } # {1:'North', 2:'South'}
        yield(territoryid, territoryname, regionid, lookup.get(regionid, 'No Region'))

with beam.Pipeline() as p:
    regions = (
        p | 'Read Regions' >> ReadFromText('regions.csv')
          | 'Parse Regions' >> beam.ParDo(RegionParseDict())
#          | 'Print Regions' >> beam.Map(print)
    )

    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
#          | 'Print Territories' >> beam.Map(print)
    )
    
    lookup = (
        territories
        | beam.ParDo(LookupRegion(), lookuptable = beam.pvalue.AsList(regions))
        | 'Print Loopup' >> beam.Map(print)
    )
        


## Create a nested repeating output
### First create a dataset. Here is python code for the equivalent bq command of bq mk dataflow


In [None]:
# same as doing bq mk dataflow

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set dataset_id to the ID of the dataset to create.
PROJECT_ID = 'qwiklabs-gcp-04-4cf93802c378'
dataset_id = f"{PROJECT_ID}.dataflow" #.format(client.project)

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(dataset_id)

# TODO(developer): Specify the geographic location where the dataset should reside.
dataset.location = "US"

# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))



## Run the following in a bq query window to create the table

In [None]:
create table dataflow.region_territory
(regionid NUMERIC
,regionname STRING
,territories ARRAY<STRUCT<territoryid NUMERIC, territoryname STRING>>)

## Sometimes need to manually create a schema for a nested repeating because it cannot use a simple string. In this case we don't really need it but it's included here as a reference in case we do.

In [None]:
from apache_beam.io.gcp.internal.clients import bigquery as bq
region_territory_schema = bq.TableSchema()
regionid = bq.TableFieldSchema(name = 'regionid', type = 'string', mode = 'required')
region_territory_schema.fields.append(regionid)
regionname = bq.TableFieldSchema(name = 'regionname', type = 'string', mode='required')
region_territory_schema.fields.append(regionname)

# A nested field
territories = bq.TableFieldSchema(name = 'territories', type = 'record', mode = 'nullable')
territoryid = bq.TableFieldSchema(name = 'territoryid', type = 'string', mode = 'required')
territories.fields.append(territoryid)
territoryname = bq.TableFieldSchema(name = 'territoryname', type = 'string', mode = 'required')
territories.fields.append(territoryname)

region_territory_schema.fields.append(territories)

print(region_territory_schema)

## The code here is tricky: 
### First parse the two tables into tuples, (regionid, regionname) & (regionid, {'territoryid':territoryid, 'territoryname':territoryname})
### CoGroupByKey yields a shape like (regionid, {'regions':['regionname'], 'territories':[{}]) so we need to reshape it to dicts to write it to BQ


In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText

class RegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield (int(regionid), regionname) # Can also use yield instead of returning a list

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(regionid), {'territoryid': int(territoryid), 'territoryname':territoryname})

regionsfilename = 'regions.csv'
territoriesfilename = 'territories.csv'
PROJECT_ID = 'qwiklabs-gcp-04-4cf93802c378'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseTuple())
#                | 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
#                    | 'Print Territories' >> beam.Map(print)
                  )
    nested = ( 
        {'regions':regions, 'territories':territories} 
              | 'Nest territories into regions' >> beam.CoGroupByKey()
              | 'Reshape to dict' >> beam.Map(lambda x : {'regionid': x[0], 'regionname': x[1]['regions'][0], 
                                                        'territories': x[1]['territories']})
#              | 'Print' >> beam.Map(print)
    )
    nested | 'Write nested region_territory to BQ' >> beam.io.WriteToBigQuery('region_territory', dataset = 'dataflow'
                                                                             , project = PROJECT_ID
                                                                             , method = 'STREAMING_INSERTS'
                                                                             )
#    nested | 'Print' >> beam.Map(print)
             
#help(beam.io.WriteToBigQuery)    
#(1, {'regions': ['Eastern'], 'territories': [{'territoryid': 1730, 'territoryname': 'Bedford'}, {'territoryid': 1581, 'territoryname': 'Westboro'}, {'territoryid': 1833, 'territoryname': 'Georgetow'}, {'territoryid': 2116, 'territoryname': 'Bosto
#{'regionid': 1, 'regionname':'Eastern', 'territories' : [{'territoryid':1, 'territoryname':'name1'}, {}, {}]}

## Helper functions to make a generic transform to nest children

In [1]:
import apache_beam as beam

class NestJoin(beam.PTransform):
    '''
    This PTransform will take a dictionary to the left of the | which will be the collection of the two
    PCollections you want to join together. Both must be a dictionary. You will then pass in the name of each
    PCollection and the key to join them on.
    It will automatically reshape the two dicts into tuples of (key, dict) where it removes the key from each dict
    It then CoGroups them and reshapes the tuple into a dict ready for insertion to a BQ table
    '''
    def __init__(self, parent_pipeline_name, parent_key, child_pipeline_name, child_key):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_key = child_key

    def expand(self, pcols):
        def reshapeToKV(item, key):
            # pipeline object should be a dictionary
            item1 = item.copy()
            del item1[key]
            return (item[key], item1)

        def reshapeCoGroupToDict(item):
            ret = {self.parent_key : item[0]}
            ret.update(item[1][self.parent_pipeline_name][0])
            ret[self.child_pipeline_name] = item[1][self.child_pipeline_name]
            return ret

        return (
                {
                self.parent_pipeline_name : pcols[self.parent_pipeline_name] | f'Convert {self.parent_pipeline_name} to KV' 
                    >> beam.Map(reshapeToKV, self.parent_key)
                ,self.child_pipeline_name : pcols[self.child_pipeline_name] | f'Convert {self.child_pipeline_name} to KV'
                    >> beam.Map(reshapeToKV, self.child_key)
                } | f'CoGroupByKey {self.child_pipeline_name} into {self.parent_pipeline_name}'
                    >> beam.CoGroupByKey()
                  | f'Reshape to dictionary'
                    >> beam.Map(reshapeCoGroupToDict)
        )

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}

class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}
    
regionsfilename = 'regions.csv'
territoriesfilename = 'territories.csv'
PROJECT_ID = 'qwiklabs-gcp-04-4cf93802c378'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    nestjoin = {'regions':regions, 'territories':territories} | NestJoin('regions', 'regionid', 'territories', 'regionid')
    nestjoin | 'Print Nest Join' >> beam.Map(print)
#     nestjoin | 'Write nested region_territory to BQ' >> beam.io.WriteToBigQuery('region_territory', dataset = 'dataflow'
#                                                                              , project = PROJECT_ID
#                                                                              , method = 'STREAMING_INSERTS'
#                                                                              )



NameError: name 'ReadFromText' is not defined

## Simulate an Outer Join with CoGroup

In [None]:
import apache_beam as beam

class LeftJoin(beam.PTransform):
    '''
    This PTransform will take a dictionary to the left of the | which will be the collection of the two
    PCollections you want to join together. Both must be a dictionary. You will then pass in the name of each
    PCollection and the key to join them on.
    It will automatically reshape the two dicts into tuples of (key, dict) where it removes the key from each dict
    It then CoGroups them and reshapes the tuple into a dict ready for insertion to a BQ table
    '''
    def __init__(self, parent_pipeline_name, parent_key, child_pipeline_name, child_key):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_key = child_key

    def expand(self, pcols):
        def reshapeToKV(item, key):
            # pipeline object should be a dictionary
            item1 = item.copy()
            del item1[key]
            return (item[key], item1)

        def reshapeCoGroupToFlatDict(item):
            parent = {self.parent_key : item[0]}
            parent.update(item[1][self.parent_pipeline_name][0])
            ret = []
            for row1 in item[1][self.child_pipeline_name]:
                row = parent.copy()
                row.update(row1)
                ret.append(row)
            return ret

        return (
                {
                self.parent_pipeline_name : pcols[self.parent_pipeline_name] | f'Convert {self.parent_pipeline_name} to KV' 
                    >> beam.Map(reshapeToKV, self.parent_key)
                ,self.child_pipeline_name : pcols[self.child_pipeline_name] | f'Convert {self.child_pipeline_name} to KV'
                    >> beam.Map(reshapeToKV, self.child_key)
                } | f'CoGroupByKey {self.child_pipeline_name} into {self.parent_pipeline_name}'
                    >> beam.CoGroupByKey()
                  | f'Reshape to dictionary'
                    >> beam.Map(reshapeCoGroupToFlatDict)
        )

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}

class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}
    
regionsfilename = 'regions.csv'
territoriesfilename = 'territories.csv'
PROJECT_ID = 'qwiklabs-gcp-04-4cf93802c378'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    nestjoin = {'regions':regions, 'territories':territories} | LeftJoin('regions', 'regionid', 'territories', 'regionid')
    nestjoin | 'Print Nest Join' >> beam.Map(print)



## BeamSQL

In [None]:
! docker images
#! docker pull apache/beam_java11_sdk 
#! docker pull apache/beam_java8_sdk 

In [None]:
# %%writefile beamsql1.py
# This code is not running in the notebook
# This example just uses a basic Row object
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

import typing
import json

with beam.Pipeline() as p:
    parent = (
            p | 'Create Parent' >> beam.Create([(1, 'One'), (2, 'Two')])
              | 'Map Parent' >> beam.Map(lambda x : beam.Row(parent_id = x[0], parent_name = x[1]))
    )

    child = (
            p | 'Create Child' >> beam.Create([('Uno', 1), ('Due', 2), ('Eins', 1), ('Una', 1), ('Dos', 2)])
              | 'Map Child' >> beam.Map(lambda x : beam.Row(child_name = x[0], parent_id = x[1]))
    )
    
    result = ( {'parent': parent, 'child' : child} 
         | SqlTransform("""
             SELECT p.parent_id, p.parent_name, c.child_name 
             FROM parent as p 
             INNER JOIN child as c ON p.parent_id = c.parent_id
             """)
        | 'Map Join' >> beam.Map(lambda x : f'{x.parent_id} {x.parent_name} {x.child_name}')
        | 'Print Join' >> beam.Map(print)
        )
#     result | beam.Map(..)
#     result | beam.Map(..)
#     parent | 'print parent' >> beam.Map(print)
#     child  | 'print child' >> beam.Map(print)


In [None]:
# %%writefile beamsql2.py
# This code is not running in the notebook
# This example uses a simple class to handle the schemas
import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

import typing


class Parent(typing.NamedTuple):
    parent_id: int
    parent_name: str
beam.coders.registry.register_coder(Parent, beam.coders.RowCoder)

class Child(typing.NamedTuple):
    child_name: str
    parent_id: int
beam.coders.registry.register_coder(Child, beam.coders.RowCoder)
        

with beam.Pipeline() as p:
    parent = (
            p | 'Create Parent' >> beam.Create([(1, 'One'), (2, 'Two')])
              | 'Map Parent' >> beam.Map(lambda x : Parent(parent_id = x[0], parent_name = x[1])).with_output_types(Parent)
              | 'Map for Print' >> beam.Map(print)
    )

    child = (
            p | 'Create Child' >> beam.Create([('Uno', 1), ('Due', 2), ('Eins', 1), ('Una', 1), ('Dos', 2)])
              | 'Map Child' >> beam.Map(lambda x : Child(child_name = x[0], parent_id = x[1])).with_output_types(Child)
#               | 'SQL Child' >> SqlTransform("""SELECT 10 * parent_id as parent_id, upper(child_name) as child_name from PCOLLECTION""")
#               | 'Print Map' >> beam.Map(lambda x : f'{x.parent_id} = {x.child_name}')
              | 'SQL Child' >> SqlTransform("""SELECT parent_id, count(*) as cnt from PCOLLECTION GROUP BY parent_id""")
              | 'Map for Print' >> beam.Map(lambda x : f'{x.parent_id} = {x.cnt}')
              | 'Print SQL' >> beam.Map(print)
    )

In [None]:
%%writefile beamsql3.py
# This code is not running in the notebook
# This example is like example 2 but for a real file 
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

import typing
import json

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int

coders.registry.register_coder(Territory, coders.RowCoder)
        
@beam.typehints.with_output_types(Territory)
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname.title(), int(regionid))

        
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
#                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass()).with_output_types(Territory)
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                    | 'SQL Territories' >> SqlTransform("""SELECT regionid, count(*) as `cnt` FROM PCOLLECTION GROUP BY regionid""")
                    | 'Map Territories for Print' >> beam.Map(lambda x : f'{x.regionid} - {x.cnt}')
                    | 'Print SQL' >> beam.Map(print)
                    )
    
#https://www.youtube.com/watch?v=zx4p-UNSmrA

In [None]:
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                    | 'SQL Territories' >> SqlTransform("""SELECT regionid, count(*) as `cnt` FROM PCOLLECTION GROUP BY regionid""")
                    )

                    territories | 'Write to BQ' >> beam.WriteToBQ()
                    territories | 'Write TO File' >> beam.WriteToText()
#    p.run()


In [None]:
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                  )
                  
                (territories | 'SQL Territories' >> SqlTransform("""SELECT regionid, count(*) as `cnt` FROM PCOLLECTION GROUP BY regionid""")
                            | 'Write to BQ' >> beam.WriteToBQ()
                )
                (territories | 'SQL Territories2' >> SqlTransform("""SELECT territoryid, count(*) as `cnt` FROM PCOLLECTION GROUP BY regionid""")
                            | 'Write to BQ' >> beam.WriteToBQ()
                    territories | 'Write TO File' >> beam.WriteToText()
                )
#    p.run()


## DoFn Lifecycle

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsSingleton, AsDict
from apache_beam.io import ReadFromText

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
        
                
class LookupRegion(beam.DoFn):
    def setup(self):
        self.lookup = {1:'North', 2:'South', 3:'East', 4:'West'}
        print('setup')
        
    def start_bundle(self):
        print('start bundle')
        
    def process(self, element, uppercase = 0):
        #lookuptable = {1:'North', 2:'South', 3:'East', 4:'West'}
        territoryid, territoryname, regionid = element
        region = self.lookup.get(regionid, 'No Region')
        if uppercase == 1:
            region = region.upper()
        yield(territoryid, territoryname, regionid, region)
        
    def finish_bundle(self):
        print('finish bundle')

    def teardown(self):
        print('teardown')
        del self.lookup
    

with beam.Pipeline() as p:
    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
    )
    
    lookup = (
        territories
        | beam.ParDo(LookupRegion(), uppercase = 1 ) 
        | 'Print Loopup' >> beam.Map(print)
    )
        


In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsList
from apache_beam.io import ReadFromText, WriteToText

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid': int(regionid), 'regionname': regionname.title()}

class TerritoryParseTuple(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
        
class LookupRegion(beam.DoFn):
    def __init__(self):
        print('init')
        #self.lookup = {1:'north', 2:'south', 3:'east', 4:'west'}
        #self.init_semaphore = False

    def called_once(self, lookuptable):
        print('called_once')
        self.lookup = { e['regionid'] : e['regionname'] for e in lookuptable }
        self.init_semaphore = False

    def setup(self):
        print('setup')
        self.init_semaphore = True
        #self.lookup = {1:'NORTH', 2:'South', 3:'East', 4:'West'}

    def start_bundle(self):
        print('start bundle')

    def process(self, element, lookuptable = [{'regionid':1, 'regionname':'north'}, {'regionid':2, 'regionname':'south'}]):
        if self.init_semaphore:
            self.called_once(lookuptable)
        territoryid, territoryname, regionid = element
        yield(territoryid, territoryname, regionid, self.lookup.get(regionid, 'No Region'))

    def finish_bundle(self):
        print('finish bundle')

    def teardown(self):
        print('teardown')
        del self.lookup
        del self.init_semaphore
                


with beam.Pipeline() as p:

    regions = (
        p | 'Read Regions' >> ReadFromText('regions.csv')
          | 'Parse Regions' >> beam.ParDo(RegionParseDict())
    )

    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Spare Territories' >> beam.ParDo(TerritoryParseTuple())
    )
    
    lookup = (
        territories
        | beam.ParDo(LookupRegion(), lookuptable = beam.pvalue.AsList(regions))
        | 'Print Loopup' >> beam.Map(print)
    )
        


## Code below is not working

In [None]:
import json
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.transforms.sql import SqlTransform
#from collections import namedtuple
from typing import NamedTuple
from apache_beam import coders


RegionSchema = namedtuple("RegionSchema", ("regionid", "regionname"))
#coders.registry.register_coder(RegionSchema, coders.RowCoder)
class RegionSplitSchema(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield RegionSchema(int(regionid), regionname.title())

#TerritorySchema = namedtuple("TerritorySchema", ("territoryid", "territoryname", "regionid"))
TerritorySchema = NamedTuple("TerritorySchema", [("territoryid", int), ("territoryname", str), ("regionid", int)])
coders.registry.register_coder(TerritorySchema, coders.RowCoder)
class TerritorySplitSchema(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield TerritorySchema(int(territoryid), territoryname.title(), int(regionid))

        
# class TerritorySplitNamedTuple(beam.DoFn):
#     def process(self, element):
#         territoryid, territoryname, regionid = element.split(',')
#         yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}

regionsfilename = 'regions.csv'
territoriesfilename = 'territories.csv'
print('Start')
with beam.Pipeline() as p:
#     regions = (
#               p | 'Read Regions' >> ReadFromText(regionsfilename)
#                 | 'Split Regions' >> beam.ParDo(RegionSplitSchema()).with_output_types(RegionSchema)
#               )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Split Territories' >> beam.ParDo(TerritorySplitSchema()) #.with_output_types(TerritorySchema)
#                    | beam.Map(lambda x: TerritorySchema(x.territoryid, x.territoryname, x.regionid)).with_output_types(TerritorySchema)
#         Map(lambda x: PythonSchema(f_int32=x)).with_output_types(PythonSchema)
#                     | 'Apply Territories Schema' >> beam.Map(lambda x : beam.Row(territoryid = int(x.territoryid)
#                                                                                  , territoryname = str(x.territoryname)
#                                                                                  , regionid = int(x.regionid)))
#                    | 'Convert to Dictionary' >> beam.Map(lambda row : {"regionid" : row.regionid, "territoryid" : row.territoryid, "territoryname" : row.territoryname})
#                     | SqlTransform("""
#                         SELECT regionid, territoryname as name, territoryid 
#                         FROM PCOLLECTION
#                         """)
#                    | 'Split Territories' >> beam.ParDo(TerritorySplitSchema()).with_output_types(TerritorySchema)
#                     | SqlTransform("""
#                         SELECT regionid, count(*) as territories
#                         FROM PCOLLECTION
#                         GROUP BY regionID
#                         ORDER BY territories DESC
#                         """)
#                    | 'Convert to dictionary' >> beam.Map(lambda row : {"regionid": row.regionid, "territories": row.territories})
                    
#             })
                  )

#     regions | 'Print regions' >> beam.Map(print)
    territories | 'Print territories' >> beam.Map(print)

print('Done')


In [None]:
# with beam.Pipeline() as pipeline:
#     _ = (
#         pipeline
#         | beam.io.ReadFromPubSub(
#             topic='projects/pubsub-public-data/topics/taxirides-realtime',
#             timestamp_attribute="ts").with_output_types(bytes)
#         | "Parse JSON payload" >> beam.Map(json.loads)
#         # Use beam.Row to create a schema-aware PCollection
#         | "Create beam Row" >> beam.Map(
#             lambda x: beam.Row(
#                 ride_status=str(x['ride_status']),
#                 passenger_count=int(x['passenger_count'])))
#         # SqlTransform will computes result within an existing window
#         | "15s fixed windows" >> beam.WindowInto(beam.window.FixedWindows(15))
#         # Aggregate drop offs and pick ups that occur within each 15s window
#         | SqlTransform(
#             """
#              SELECT
#                ride_status,
#                COUNT(*) AS num_rides,
#                SUM(passenger_count) AS total_passengers
#              FROM PCOLLECTION
#              WHERE NOT ride_status = 'enroute'
#              GROUP BY ride_status""")
#         # SqlTransform yields python objects with attributes corresponding to
#         # the outputs of the query.
#         # Collect those attributes, as well as window information, into a dict
#         | "Assemble Dictionary" >> beam.Map(
#             lambda row,
#             window=beam.DoFn.WindowParam: {
#                 "ride_status": row.ride_status,
#                 "num_rides": row.num_rides,
#                 "total_passengers": row.total_passengers,
#                 "window_start": window.start.to_rfc3339(),
#                 "window_end": window.end.to_rfc3339()
#             })
#         | "Convert to JSON" >> beam.Map(json.dumps)
#         | "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8"))
#         | beam.Map(print)
#         #| beam.io.WriteToPubSub(topic=output_topic))
#     )


# if __name__ == '__main__':
#   logging.getLogger().setLevel(logging.INFO)
#   import argparse

#   parser = argparse.ArgumentParser()
#   parser.add_argument(
#       '--output_topic',
#       dest='output_topic',
#       required=True,
#       help=(
#           'Cloud PubSub topic to write to (e.g. '
#           'projects/my-project/topics/my-topic), must be created prior to '
#           'running the pipeline.'))
#   known_args, pipeline_args = parser.parse_known_args()

#   run(known_args.output_topic, pipeline_args)


In [None]:
lookuptable = [{'regionid':1, 'regionname':'North'}, {'regionid':2, 'regionname':'South'}]
lookup = {e['regionid'] : e['regionname'] for e in lookuptable }
print(lookup)

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsIter, AsSingleton, AsList, AsDict
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.io import ReadFromAvro, WriteToAvro
from collections import namedtuple
from apache_beam import coders
from apache_beam.typehints.decorators import with_output_types


class Region:
    def __init__(self, regionid, regionname):
        self.regionid = regionid
        self.regionname = regionname
        
    def __str__(self):
        return f'{self.regionid}|{self.regionname}'

#     def encode(self, o):
#         """Encode to bytes with a trace that coder was used."""
#         # Our encoding prepends an 'x:' prefix.
#         return b'x:%s' % o.encode('utf-8')

#     def decode(self, s):
#         # To decode, we strip off the prepended 'x:' prefix.
#         s = s.decode('utf-8')
#         #assert s[0:2] == 'x:'
#         params = s[0:2].split('|')
#         return Region(*params)

#     def is_deterministic(self):
#         # Since coded Player objects are used as keys below with
#         # beam.CombinePerKey(sum), we require that this coder is deterministic
#         # (i.e., two equivalent instances of the classes are encoded into the same
#         # byte string) in order to guarantee consistent results.
#         return True
    
class RegionSplitClass(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield Region(int(regionid), regionname.title())

# class RegionCoder(coders.Coder):
#   """A custom coder for the RegionSchema"""
#   def encode(self, o):
#     """Encode to bytes with a trace that coder was used."""
#     # Our encoding prepends an 'x:' prefix.
#     return b'x:%s' % o.encode('utf-8')

#   def decode(self, s):
#     # To decode, we strip off the prepended 'x:' prefix.
#     s = s.decode('utf-8')
#     #assert s[0:2] == 'x:'
#     params = s[0:2].split('|')
#     return Region(*params)

#   def is_deterministic(self):
#     # Since coded Player objects are used as keys below with
#     # beam.CombinePerKey(sum), we require that this coder is deterministic
#     # (i.e., two equivalent instances of the classes are encoded into the same
#     # byte string) in order to guarantee consistent results.
#     return True
# coders.registry.register_coder(Region, RegionCoder)

# @with_output_types(typing.Tuple[Region, int])
# def get_regions(descriptor):
#   name, points = descriptor.split(',')
#   return Player(name), int(points)


# RegionSchema = namedtuple("RegionSchema", ("regionid", "regionname"))
# class RegionSplitSchema(beam.DoFn):
#     def process(self, element):
#         regionid, regionname = element.split(',')
#         yield RegionSchema(int(regionid), regionname.title())

class RegionSplitDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid': int(regionid), 'regionname': regionname.title()}


class TerritorySplit(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
#        yield (int(regionid), (territoryid, territoryname.title())) 
        
                
def lookup_region(left, right):
    territoryid, territoryname, regionid = left
    yield territoryid, territoryname, regionid
#    yield (territoryid, territorynme, regionid, right.get(regionid, 'No Region'))


class LookupRegion(beam.DoFn):
    def process(self, element, lookuptable = [{'regionid':1, 'regionname':'North'}, {'regionid':2, 'regionname':'South'}]):
#        yield element
        territoryid, territoryname, regionid = element
        lookup = {e['regionid'] : e['regionname'] for e in lookuptable }
        yield(territoryid, territoryname, regionid, lookup.get(regionid, 'No Region'))
# #        yield (int(regionid), (territoryid, territoryname.title())) 


# def dummy(element):
#     return element
# #     regionid = element[0]
# #     territoryid, territoryname = element[1]
# #     return (territoryid, territoryname, regionid)

with beam.Pipeline() as p:
    regions = (
        p | 'Read Regions' >> ReadFromText('regions.csv')
          | 'Split Regions' >> beam.ParDo(RegionSplitDict())
          #| 'Split Regions' >> beam.ParDo(RegionSplitClass())
#          | 'Print Regions' >> beam.Map(print)
    )

#     regions = {1:"North", 2:"South", 3:"East", 4:"West"}
#     regions = p | 'Create Regions' >> beam.Create([(1, 'North'), (2, 'South')])

    
    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Split Territories' >> beam.ParDo(TerritorySplit())
#          | 'Print Territories' >> beam.Map(print)
    )
    
    join = (
        territories
          #| 'Lookup Region' >> beam.Map(dummy)
#          | 'Lookup Region' >> beam.Map(lookup_region, right = beam.pvalue.AsList(regions))
#        | beam.ParDo(LookupRegion())
        | beam.ParDo(LookupRegion(), lookuptable = beam.pvalue.AsList(regions))
        | beam.Map(print)
    )
        


In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}

class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}

class UnnestCoGrouped(beam.DoFn):
    def process(self, item, child_pipeline, parent_pipeline):
        k, v = item
        child_dict = v[child_pipeline]
        parent_dict = v[parent_pipeline]
        for child in child_dict:
            try:
                child.update(parent_dict[0])
                yield child
            except IndexError:
                yield child

class LeftJoin(beam.PTransform):
    def __init__(self, parent_pipeline_name, parent_key, child_pipeline_name, child_key):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_key = child_key

    def expand(self, pcols):
        def _format_as_common_key_tuple(child_dict, child_key):
            return (child_dict[child_key], child_dict)

        return ({
                pipeline_name: pcol1 | f'Convert to ({self.parent_key} = {self.child_key}, object) for {pipeline_name}' 
                >> beam.Map(_format_as_common_key_tuple, self.child_key)
                for (pipeline_name, pcol1) in pcols.items()}
                | f'CoGroupByKey {pcols.keys()}' >> beam.CoGroupByKey()
                | 'Unnest Cogrouped' >> beam.ParDo(UnnestCoGrouped(), self.child_pipeline_name, self.parent_pipeline_name)
        )
        
regionsfilename = 'regions.csv'
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    leftjoin = {'regions':regions, 'territories':territories} | LeftJoin('regions', 'regionid', 'territories', 'regionid')
    leftjoin | 'Print Left Join' >> beam.Map(print)




In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText

class NestJoin(beam.PTransform):
    '''
    This PTransform will take a dictionary to the left of the | which will be the collection of the two
    PCollections you want to join together. Both must be a dictionary. You will then pass in the name of each
    PCollection and the key to join them on.
    It will automatically reshape the two dicts into tuples of (key, dict) where it removes the key from each dict
    It then CoGroups them and reshapes the tuple into a dict ready for insertion to a BQ table
    '''
    def __init__(self, parent_name, parent_key, child_name, child_key):
        self.parent_name = parent_name
        self.parent_key = parent_key
        self.child_name = child_name
        self.child_key = child_key

    @staticmethod
    def excludeKeysFromDict(d, keyset):
        return {k:v for k,v in d.items() if k in set(d.keys()).difference(keyset)}
        
    @staticmethod
    def reshapeToKV(item, key):
        # pipeline object should be a dictionary
        return (item[key], NestJoin.excludeKeysFromDict(item, {key}))

    def reshapeCoGroupToDict(self, item):
        ret = {self.parent_key: item[0]
              , **item[1][self.parent_name][0]
              , self.child_name: item[1][self.child_name]}
        return ret

    def expand(self, pcols):
        return (
                {
                self.parent_name : pcols[self.parent_name] | f'Convert {self.parent_name} to KV' 
                    >> beam.Map(self.reshapeToKV, self.parent_key)
                ,self.child_name : pcols[self.child_name] | f'Convert {self.child_name} to KV'
                    >> beam.Map(self.reshapeToKV, self.child_key)
                } | f'CoGroupByKey {self.child_name} into {self.parent_name}' >> beam.CoGroupByKey()
                  | f'Reshape to dictionary' >> beam.Map(self.reshapeCoGroupToDict)
               )

class LeftJoin(NestJoin):
    '''
    Overloads the reshapeCoGroupToDict method to flatten out all the children to produce a traditional JOIN result
    '''
    def reshapeCoGroupToDict(self, item):
        ret = [{self.parent_key: item[0]
              , **item[1][self.parent_name][0]
              , **row}
              for row in item[1][self.child_name]]
        return ret

    
    
class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}

class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}
    
regionsfilename = 'regions.csv'
territoriesfilename = 'territories.csv'
PROJECT_ID = 'qwiklabs-gcp-04-4cf93802c378'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    nestjoin = {'regions':regions, 'territories':territories} | LeftJoin('regions', 'regionid', 'territories', 'regionid')
    nestjoin | 'Print Nest Join' >> beam.Map(print)
