#### Script: Dataflow Basics

Description: Notebook demonstrating, one by one, how each Apache Beam transformation covered in the theory sessions works.

EDEM. Master Data Analytics<br>
Professor: Javi Briones

### Setup

In [None]:
# GCP Auth
# Sets up Google Cloud credentials for this session (needed by the Google Cloud
# pieces of the notebook, e.g. the Pub/Sub read in the Streaming section).
# Use exactly ONE of the two options below, depending on where the notebook runs.

# Local: interactive `gcloud` login that stores Application Default Credentials
!gcloud auth application-default login

# Google Colab: uncomment these two lines and comment out the gcloud command above
# from google.colab import auth
# auth.authenticate_user()

In [None]:
# Install requirements
!pip install "apache_beam[interactive]"

In [None]:
# Import Python Libraries
import logging
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

### Beam Basics

<img src="../00_DocAux/.images/Beam_Pipeline.png" width="1000"/>

##### 01 Understanding basic concepts: PCollection, PTransform & Pipeline Object

In [None]:
# Read the sample text file line by line and print every line.
with beam.Pipeline(InteractiveRunner()) as p:
    text_lines = p | "Read Text from a File" >> beam.io.ReadFromText('../00_DocAux/input_text.txt')
    text_lines | "Show content" >> beam.Map(print)

##### 02 Understanding Core Transformations: DoFn & Map

In [None]:
# Map
def edem_map(element, num):
    """Return ``element`` multiplied by ``num``; used as a ``beam.Map`` callable."""
    product = element * num
    return product

# DoFn
class edemDoFn(beam.DoFn):
    """DoFn that emits each input element multiplied by a factor fixed at construction."""

    def __init__(self, num):
        # Multiplication factor applied to every element in process().
        self.num_ = num

    def process(self, element):
        # Exactly one output element per input element.
        scaled = element * self.num_
        yield scaled

# Pipeline: the Map step multiplies by 2, then the DoFn multiplies by 4.
with beam.Pipeline(InteractiveRunner()) as p:
    numbers = p | "Create a PCollection" >> beam.Create([1, 2, 3, 4, 5])
    doubled = numbers | "Map" >> beam.Map(edem_map, num=2)
    scaled = doubled | "DoFn" >> beam.ParDo(edemDoFn(4))
    data = scaled | "Print" >> beam.Map(print)

In [None]:
# PTransform
class edem_PTransform(beam.PTransform):
    """Composite transform: doubles each element (Map), then multiplies it by 4 (ParDo).

    ``expand`` returns the resulting PCollection and prints nothing itself, so the
    caller decides what to do with the output (e.g. apply its own Print step).
    """

    # Map: element-wise function used by beam.Map. Declared static so that
    # ``expand`` can reach it as ``self.edem_map``.
    @staticmethod
    def edem_map(element):
        return element * 2

    # DoFn: multiplies each element by the ``num`` keyword argument given to ParDo.
    class edemDoFn(beam.DoFn):

        def process(self, element, num):
            yield element * num

    def expand(self, PColl):
        # Names defined in the class body are not visible as bare names inside
        # methods, so the nested helpers are accessed through ``self``. A bare
        # ``edemDoFn()`` would resolve to a module-level edemDoFn (or raise
        # NameError) instead of the nested class above.
        # ``return`` rather than ``yield``: expand must produce the PCollection
        # itself, not a generator wrapping it.
        return (PColl
            | "Map" >> beam.Map(self.edem_map)
            | "ParDo" >> beam.ParDo(self.edemDoFn(), num=4))

# Pipeline: apply the composite transform to a small PCollection and print the result.
with beam.Pipeline(InteractiveRunner()) as p:
    data = p | "Create a PCollection" >> beam.Create([1,2,3,4,5])

    transformed = data | edem_PTransform()
    transformed | "Print" >> beam.Map(print)

##### 03 DoFn Lifecycle

In [None]:
from datetime import datetime
class DoFnLifeCycle(beam.DoFn):
  """DoFn that prints a timestamped message from each of its lifecycle hooks.

  process() splits each input line on whitespace and emits every word upper-cased.
  """

  def now(self):
    """Store the current local time in ``self._now`` and return it."""
    current = datetime.now()
    self._now = current
    return current

  def _announce(self, template):
    # Every lifecycle hook prints a "<stage> at: <timestamp>" line.
    print(template % self.now())

  def __init__(self):
    # Runs when the DoFn object is created while building the pipeline.
    self._announce("Constructor started at: %s")

  def setup(self):
    self._announce("worker started at: %s")

  def start_bundle(self):
    self._announce("bundle started at: %s")

  def process(self, element):
    for token in element.split():
      print("Processing element: %s" % token)
      yield token.upper()

  def finish_bundle(self):
    self._announce("bundle finished at: %s")

  def teardown(self):
    self._announce("worker finished at: %s")

# Run the lifecycle DoFn over the sample text file to observe the hook order.
with beam.Pipeline(InteractiveRunner()) as p:
  text_lines = p | "Reading the input file" >> beam.io.ReadFromText('../00_DocAux/input_text.txt')
  input_data = text_lines | "DoFn Life Cycle" >> beam.ParDo(DoFnLifeCycle())

##### 04 Transformations

In [None]:
# GroupByKey: gathers all values sharing a key into a single (key, values) element.
with beam.Pipeline(InteractiveRunner()) as p:

    cities_by_country = [('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')]
    data = p | "PCollection" >> beam.Create(cities_by_country)

    grouped = data | "Combined" >> beam.GroupByKey()
    grouped | "Print" >> beam.Map(print)

In [None]:
# CoGroupByKey: joins two keyed PCollections on their shared keys.
with beam.Pipeline(InteractiveRunner()) as p:

    first_cities = [('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')]
    second_cities = [('Spain', 'Madrid'), ('Spain','Alicante'), ('France', 'Lyon')]

    p1 = p | "PCollection 01" >> beam.Create(first_cities)
    p2 = p | "PCollection 02" >> beam.Create(second_cities)

    keyed_inputs = (p1, p2)
    data = keyed_inputs | beam.CoGroupByKey()
    data | "Print" >> beam.Map(print)

In [None]:
# Combine: sums the values of each key with CombinePerKey.
with beam.Pipeline(InteractiveRunner()) as p:

    user_scores = [('User1', 1), ('User2', 5), ('User1', 7)]
    data = p | "PCollection" >> beam.Create(user_scores)

    totals = data | "Combined" >> beam.CombinePerKey(sum)
    totals | "Print" >> beam.Map(print)

In [None]:
# Flatten: merges several PCollections of the same element type into one.
with beam.Pipeline(InteractiveRunner()) as p:

    us_cities = ['New York', 'Los Angeles', 'Miami', 'Chicago']
    spanish_cities = ['Madrid', 'Barcelona', 'Valencia', 'Malaga']
    uk_cities = ['London','Manchester', 'Liverpool']

    p1 = p | "PCollection 01" >> beam.Create(us_cities)
    p2 = p | "Pcollection 02" >> beam.Create(spanish_cities)
    p3 = p | "Pcollection 03" >> beam.Create(uk_cities)

    merged = (p1, p2, p3) | beam.Flatten()
    merged | beam.Map(print)

In [None]:
# Partition
countries = ['Spain', 'USA', 'Switzerland']

def partition_fn(country, num_countries):
    """Return the partition index of a record: the position of its 'country' in ``countries``.

    ``num_countries`` is the partition count Beam passes to the function; it is unused here.
    Raises ValueError when the record's country is not listed in ``countries``.
    """
    target = country['country']
    return countries.index(target)

with beam.Pipeline(InteractiveRunner()) as p:

    # Partition yields one PCollection per index returned by partition_fn, in index
    # order; with countries = ['Spain', 'USA', 'Switzerland'] that is
    # Spain -> 0, USA -> 1, Switzerland -> 2.
    spain, usa, switzerland = (
        p
        | "PCollection" >> beam.Create([
            {'country': 'Spain', 'city': 'Valencia'},
            {'country': 'Spain', 'city': 'Barcelona'},
            {'country': 'USA', 'city': 'New York'},
            {'country': 'Switzerland', 'city': 'Zurich'},
            {'country': 'Switzerland', 'city': 'Geneva'},
        ])
        | "partition" >> beam.Partition(partition_fn, len(countries))
    )

    # Print the Spain partition (index 0), matching the step label.
    spain | "PCollection for Spain" >> beam.Map(print)
        

##### 05 Streaming

In [None]:
# PubSub
from apache_beam.options.pipeline_options import PipelineOptions

# Full Pub/Sub subscription path, e.g. 'projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>'.
# TODO: fill in before running this cell.
PUBSUB_SUBSCRIPTION = ''

# Fail fast with an actionable message instead of an opaque error from the Beam source.
if not PUBSUB_SUBSCRIPTION:
    raise ValueError(
        "Set PUBSUB_SUBSCRIPTION to 'projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>' "
        "before running this cell.")

# streaming=True: Pub/Sub is an unbounded source, so the pipeline is expected to keep
# running until the cell is interrupted.
with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:

    data = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=PUBSUB_SUBSCRIPTION))

    # With the default with_attributes=False, ReadFromPubSub emits raw payloads as bytes;
    # decode them so the printed output is readable text.
    (data
        | "Decode" >> beam.Map(lambda payload: payload.decode('utf-8'))
        | "Print" >> beam.Map(print))