# Apache Beam 

* With Beam we can process both streaming and batch data
* We can choose our runner, such as Spark or Dataflow
* Beam processes data in parallel

## Showing the results

To show the elements we can use `beam.LogElements()` or `beam.Map(print)`.

In [5]:
#!pip install apache-beam
#!pip install apache-beam[interactive]

In [81]:
import apache_beam as beam

with beam.Pipeline() as p:
    # Build a one-element PCollection, then log each element.
    greeting = p | beam.Create(['Hello Beam'])
    greeting | beam.LogElements()
  


Hello Beam


## Branching

In [7]:
import matplotlib.pyplot as plt

In [8]:
import apache_beam as beam

with beam.Pipeline() as p:
    # One source PCollection feeds two independent branches.
    hello_beam = p | beam.Create(['Hello Beam'])

    hello = (hello_beam
        | "Split Hello" >> beam.Map(lambda x: x.split()[0])
        | "Print Hello" >> beam.Map(print))

    # Do NOT name this branch `beam`: that would rebind the `apache_beam`
    # module alias to a PCollection and break every later `beam.*` call.
    beam_word = (hello_beam
        | "Split Beam" >> beam.Map(lambda x: x.split()[1])
        | "Print Beam" >> beam.Map(print))
    # The branches are independent, so "Hello" and "Beam" may print in
    # either order.


Beam
Hello


## Combiners

### We can combine values in Apache Beam in several ways

* Simple (global) aggregation
* Per key
* Per element

In this example we use the `Count` combiner in all three ways.

In [35]:
import apache_beam as beam

my_array_of_fruits = [
    ("laranja", 1),
    ("maça", 1),
    ("laranja", 1),
    ("maça", 2),
    ("banana", 4),
]

# 1) Global aggregation: count every element of the PCollection.
print("How many itens has in my array")

with beam.Pipeline() as p:
    numbers = p | beam.Create(my_array_of_fruits)

    count = (numbers
             | beam.combiners.Count.Globally()
             | "count" >> beam.LogElements())

# 2) Per key: count how many (key, value) pairs share each key.
print("\nCalculating repetead keys")

with beam.Pipeline() as p:
    numbers = p | beam.Create(my_array_of_fruits)

    # Count.PerKey is a nested transform class: use it directly, like
    # Count.Globally above, instead of instantiating Count first.
    count_by_key = (numbers
                    | beam.combiners.Count.PerKey()
                    | beam.LogElements())

# 3) Per element: count occurrences of each distinct (key, value) tuple.
print("\nCalculating repetead elements")

with beam.Pipeline() as p:
    numbers = p | beam.Create(my_array_of_fruits)

    count_by_elements = (numbers
                         | beam.combiners.Count.PerElement()
                         | beam.LogElements())

How many itens has in my array
5

Calculating repetead keys
('laranja', 2)
('maça', 2)
('banana', 1)

Calculating repetead elements
(('laranja', 1), 2)
(('maça', 1), 1)
(('maça', 2), 1)
(('banana', 4), 1)


We can also use `beam.CombinePerKey()` with a function of our choice, e.g. `beam.CombinePerKey(sum)`.

### Here are some built-in combiners in Beam

* .Top
* .Mean
* .ToSet
* .ToDict
* .Sample

In [59]:
# Top also provides per-key variants (e.g. Top.LargestPerKey, Top.SmallestPerKey).
with beam.Pipeline() as p:

    numbers = p | beam.Create(range(1, 11))

    smallest = (numbers
        | beam.combiners.Top.Smallest(3)
        | "smallest" >> beam.LogElements())

    largest = (numbers
        | beam.combiners.Top.Largest(3)
        | "largest" >> beam.LogElements())

    # Mean.Globally is a nested transform class, used like Top.Smallest /
    # Count.Globally -- there is no need to instantiate Mean first.
    average = (numbers
        | beam.combiners.Mean.Globally()
        | "average" >> beam.LogElements())

    # ToSet collects the whole PCollection into a single Python set.
    transform_in_set = (numbers
        | beam.combiners.ToSet()
        | "transform in set" >> beam.LogElements())

{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
5.5
[1, 2, 3]
[10, 9, 8]


### For more complex combining we can write our own logic
* Subclass `beam.CombineFn`

* Pass a simple function (e.g. to `beam.CombineGlobally`)

In [74]:
class OddEvenCounter(beam.CombineFn):
    """CombineFn that tallies how many input numbers are odd vs. even.

    Accumulators and the final output are dicts of the form
    ``{'odd_count': <int>, 'even_count': <int>}``.
    """

    def create_accumulator(self):
        # Fresh, empty tally.
        return {'odd_count': 0, 'even_count': 0}

    def add_input(self, accumulator, element):
        # A non-zero remainder means odd; a zero remainder means even.
        slot = 'odd_count' if element % 2 else 'even_count'
        accumulator[slot] += 1
        return accumulator

    def merge_accumulators(self, accumulators):
        # Sum the partial tallies into one new dict.
        merged = {'odd_count': 0, 'even_count': 0}
        for partial in accumulators:
            for slot in ('odd_count', 'even_count'):
                merged[slot] += partial[slot]
        return merged

    def extract_output(self, accumulator):
        # The tally dict itself is the result.
        return accumulator

    
with beam.Pipeline() as p:
    values = p | beam.Create([10, 3, 5, 70, 90])
    # Reduce the whole PCollection to a single odd/even tally dict.
    tally = values | beam.CombineGlobally(OddEvenCounter())
    tally | beam.LogElements()

{'odd_count': 2, 'even_count': 3}


In [80]:
def multiply(numbers):
    """Return the product of every value in ``numbers`` (1 when it is empty).

    Intended for ``beam.CombineGlobally(multiply)``, which calls it with an
    iterable of values to combine. Beam may also call it on partial results;
    this is safe because multiplication is associative and commutative.
    """
    import math  # local import keeps this notebook cell self-contained

    return math.prod(numbers)
    
with beam.Pipeline() as p:
    values = p | beam.Create([10, 3, 5, 70, 90])
    # CombineGlobally also accepts a plain function over an iterable of values.
    product = values | beam.CombineGlobally(multiply)
    product | beam.LogElements()

945000


# Mapping and Filtering

.Map — applies a function to each element and emits exactly one output per input.

In [103]:
with beam.Pipeline() as p:
    states = p | beam.Create(['Bahia BA', 'São Paulo SP', 'Rio De Janeiro RJ'])
    # Map is one-to-one: each string becomes ONE list of its words.
    words_per_state = states | beam.Map(lambda x: x.split())
    words_per_state | beam.LogElements()

['Bahia', 'BA']
['São', 'Paulo', 'SP']
['Rio', 'De', 'Janeiro', 'RJ']


.FlatMap — applies a function that returns an iterable and emits each of its items as a separate element.

In [104]:
# FlatMap: the function returns an iterable, and every item in it becomes
# a separate output element (here: one element per word).
with beam.Pipeline() as p:
    states = p | beam.Create(['Bahia BA', 'São Paulo SP', 'Rio De Janeiro RJ'])
    words = states | beam.FlatMap(lambda x: x.split())
    words | beam.LogElements()

Bahia
BA
São
Paulo
SP
Rio
De
Janeiro
RJ


.ParDo

In [148]:
# Keep only states from Brazil's Nordeste region, optionally lower-cased.
class Nordeste(beam.DoFn):
    """Emit elements whose trailing state code (UF) belongs to the Nordeste.

    Elements are strings such as ``'Bahia BA'``: a state name followed by
    its two-letter code.
    """

    # Codes of the nine Nordeste states; built once rather than per element.
    SIGLAS_NORDESTE = frozenset(
        ['AL', 'BA', 'CE', 'MA', 'PB', 'PE', 'PI', 'RN', 'SE'])

    def process(self, element, lower_case=False):
        """Yield the element if its state code is a Nordeste code.

        Args:
            element: A ``'<state name> <UF>'`` string.
            lower_case: Extra ParDo argument; when truthy the element is
                emitted lower-cased. Defaults to False.
        """
        # Use the LAST word: split()[1] would read 'Paulo' from 'São Paulo SP'
        # and 'Grande' from 'Rio Grande do Norte RN', dropping a Nordeste state.
        if element.split()[-1] in self.SIGLAS_NORDESTE:
            yield element.lower() if lower_case else element


with beam.Pipeline() as p:
    states = p | beam.Create(['Bahia BA', 'São Paulo SP', 'Rio De Janeiro RJ'])
    # Keyword arguments given after the DoFn are forwarded to Nordeste.process().
    nordeste_states = states | beam.ParDo(Nordeste(), lower_case=False)
    nordeste_states | beam.LogElements()

Bahia BA


.Filter

In [109]:
siglas_nordeste = ['AL', 'BA', 'CE', 'MA', 'PB', 'PE', 'PI', 'RN', 'SE']

# Keep only states from Brazil's Nordeste region. The state code is the LAST
# word of each element, so multi-word names such as 'Rio Grande do Norte RN'
# are classified correctly (split()[1] would read 'Grande').
with beam.Pipeline() as p:
    (p | beam.Create(['Bahia BA', 'São Paulo SP', 'Rio De Janeiro RJ'])
     | beam.Filter(lambda x: x.split()[-1] in siglas_nordeste)
     | beam.LogElements())

Bahia BA


.WithKeys

In [111]:
# Select the keys of the elements
with beam.Pipeline() as p:
    (p | beam.Create(['Bahia BA', 'São Paulo SP', 'Rio De Janeiro RJ'])
     | beam.WithKeys(lambda x: x.split()[-1])
     | beam.LogElements())

('BA', 'Bahia BA')
('SP', 'São Paulo SP')
('RJ', 'Rio De Janeiro RJ')


## Splitting and Partitioning

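Splitting with tagged outputs: a single `ParDo` can send elements to several named outputs, each of which becomes its own PCollection.

Ideal for parallel operations.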

In [115]:
import apache_beam as beam
from apache_beam import pvalue

# Split states into Brazil's Nordeste region (main output) and all others
# (tagged 'Sudeste').
class Nordeste(beam.DoFn):
    """Route each ``'<state name> <UF>'`` string by its trailing state code.

    Nordeste states go to the main output. Every other state is emitted on
    the ``'Sudeste'`` tagged output. With this demo's input that is only
    São Paulo and Rio de Janeiro, but any non-Nordeste state would land there.
    """

    # Codes of the nine Nordeste states; built once rather than per element.
    SIGLAS_NORDESTE = frozenset(
        ['AL', 'BA', 'CE', 'MA', 'PB', 'PE', 'PI', 'RN', 'SE'])

    def process(self, element):
        # The state code is the LAST word; split()[1] would read 'Paulo' from
        # 'São Paulo SP' and misroute 'Rio Grande do Norte RN'.
        if element.split()[-1] in self.SIGLAS_NORDESTE:
            yield element
        else:
            yield pvalue.TaggedOutput('Sudeste', element)

# Split the states into the two outputs produced by the Nordeste DoFn.
with beam.Pipeline() as p:
    states = p | beam.Create(['Bahia BA', 'São Paulo SP', 'Rio De Janeiro RJ'])
    # with_outputs exposes the untagged output under main='Nordeste' and the
    # TaggedOutput('Sudeste', ...) elements under 'Sudeste'.
    results = states | beam.ParDo(Nordeste()).with_outputs('Sudeste', main='Nordeste')

    nordeste_states = results['Nordeste']
    sudeste_states = results['Sudeste']
    nordeste_states | 'Nordeste' >> beam.LogElements(prefix='From Nordeste > ')
    sudeste_states | 'Sudeste' >> beam.LogElements(prefix='From Sudeste > ')

From Nordeste > Bahia BA
From Sudeste > São Paulo SP
From Sudeste > Rio De Janeiro RJ


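.Partition — splits a PCollection into a fixed number of partitions. A partition function chooses each element's partition.

(ideal for non-parallel operations)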

In [137]:
import apache_beam as beam
from apache_beam import pvalue

# Route states from Brazil's Nordeste region to partition 0, all others to 1.
def partitionFn(element, num_partitions):
    """Return the partition index of ``element`` for ``beam.Partition``.

    Args:
        element: A ``'<state name> <UF>'`` string, e.g. ``'Bahia BA'``.
        num_partitions: Supplied by ``beam.Partition`` as part of the
            partition-function signature. It is unused here; this function
            always returns 0 or 1, so it needs at least 2 partitions.

    Returns:
        0 for Nordeste states, 1 for every other state.
    """
    siglas_nordeste = {'AL', 'BA', 'CE', 'MA', 'PB', 'PE', 'PI', 'RN', 'SE'}
    # Take the LAST word: split()[1] would read 'Paulo' from 'São Paulo SP'
    # and 'Grande' from 'Rio Grande do Norte RN'.
    return 0 if element.split()[-1] in siglas_nordeste else 1

# Route each state to partition 0 (Nordeste) or 1 (every other state).
with beam.Pipeline() as p:
    states = p | beam.Create(['Bahia BA', 'São Paulo SP', 'Rio De Janeiro RJ'])
    results = states | beam.Partition(partitionFn, 2)

    nordeste_part, other_part = results[0], results[1]
    nordeste_part | 'Nordeste' >> beam.LogElements(prefix='From Nordeste > ')
    other_part | 'Sudeste' >> beam.LogElements(prefix='From Sudeste > ')

From Nordeste > Bahia BA
From Sudeste > São Paulo SP
From Sudeste > Rio De Janeiro RJ


.Flatten — merges several PCollections into a single PCollection.

In [134]:
with beam.Pipeline() as p:
    pip1 = p | "pip1" >> beam.Create([1, 2, 3])
    pip2 = p | "pip2" >> beam.Create([4, 5, 6])

    # Flatten merges both inputs into one PCollection holding all six numbers.
    merged = (pip1, pip2) | beam.Flatten()
    pip3 = merged | beam.LogElements()

1
2
3
4
5
6


# Modularizing our Pipeline
We can subclass `beam.PTransform` to package steps that repeat across our pipelines into a reusable composite transform.

In [130]:
from typing import Optional
import apache_beam as beam
from apache_beam import pvalue


class Stats(beam.PTransform):
    """Composite transform that applies a pairwise arithmetic operation.

    Each input element is expected to be a two-item sequence ``[a, b]``.
    The selected operation combines ``a`` and ``b``, and the single result
    is emitted.

    Args:
        method: ``"Sum"`` (a + b), ``"Mult"`` (a * b) or ``"Divide"`` (a / b).
            Any other value leaves the input PCollection unchanged.
    """

    def __init__(self, method):
        # Let PTransform initialize its own state (label, etc.) first.
        super().__init__()
        self.method = method

    class Sum(beam.DoFn):
        def process(self, element):
            yield element[0] + element[1]

    class Multiply(beam.DoFn):
        def process(self, element):
            yield element[0] * element[1]

    class Divide(beam.DoFn):
        def process(self, element):
            # True division; raises ZeroDivisionError when element[1] == 0.
            yield element[0] / element[1]

    class others_functions(beam.DoFn):
        # Placeholder for future operations; not wired into expand().
        pass

    def expand(self, pcoll):
        # Dispatch table replaces the if/else chain. It also makes the
        # previously unreachable Sum and Divide DoFns usable.
        dofns = {
            "Sum": Stats.Sum,
            "Mult": Stats.Multiply,
            "Divide": Stats.Divide,
        }
        dofn_cls = dofns.get(self.method)
        if dofn_cls is None:
            # Unknown method: pass the input through unchanged.
            return pcoll
        return pcoll | beam.ParDo(dofn_cls())

# Multiply the two numbers of every pair with the reusable Stats transform.
with beam.Pipeline() as p:
    pairs = p | beam.Create([[7,6],[10,10],[15,5],[9,7]])
    products = pairs | Stats('Mult')
    results = products | beam.LogElements()

42
100
75
63
