<a href="https://colab.research.google.com/github/minskim0327/apache-beam-study/blob/main/Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Init

In [None]:
!{'pip install apache-beam'}

Collecting apache-beam
  Downloading apache_beam-2.52.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.7/14.7 MB[0m [31m33.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam)
  Downloading orjson-3.9.10-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (138 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m138.7/138.7 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m19.2 MB/s[0m eta [36m0:00

In [None]:
!{'mkdir -p data'}

In [None]:
# List the working directory to confirm data/ now exists.
!ls

data  sample_data


In [None]:
# Colab-only: opens a browser file picker. In this run it uploaded dept_data.txt
# into the current working directory (see the `!ls` output below).
from google.colab import files
uploaded = files.upload()

Saving dept_data.txt to dept_data.txt


In [6]:
# Confirm dept_data.txt landed next to the data/ directory.
!ls

data  dept_data.txt  sample_data


## Transformations in Beam

In [17]:
import apache_beam as beam

def SplitRow(element):
  """Split one comma-separated text line into its list of fields."""
  separator = ','
  return element.split(separator)

def filtering(record):
  """Filter predicate: True when field index 3 (the department) is 'Accounts'."""
  department = record[3]
  return department == 'Accounts'

# with beam.Pipeline() as p1:

# Create and give pipeline a name
p1 = beam.Pipeline()

attendance_count = (
    p1
     |'Read from file' >> beam.io.ReadFromText('dept_data.txt')  # Initial PCollection by reading data from a source
     |'Map transform' >> beam.Map(SplitRow) # Ptransforms according to requirements
     |beam.Filter(filtering)
     |beam.Map(lambda record: (record[1], 1))
     |beam.CombinePerKey(sum) ## CombinePerKey: GroupByKey + Combiner + Reducer
     |beam.io.WriteToText('data/output_new') # Write PCollection to a source, Run the pipeline

)
p1.run()

# visualize output
!{('head -n 20 data/output_new-00000-of-00001')}



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


### Branching Pipelines

In [18]:
import apache_beam as beam

def SplitRow(element):
    """Return the comma-delimited fields of a raw input line."""
    fields = element.split(',')
    return fields

p = beam.Pipeline()


input_collection = (
                      p
                      | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
                      | "Split rows" >> beam.Map(SplitRow)
                   )

accounts_count = (
                      input_collection
                      | 'Get all Accounts dept persons' >> beam.Filter(lambda record: record[3] == 'Accounts')
                      | 'Pair each accounts employee with 1' >> beam.Map(lambda record: ("Accounts, " +record[1], 1))
                      | 'Group and sum1' >> beam.CombinePerKey(sum)
                 )

hr_count = (
                input_collection
                | 'Get all HR dept persons' >> beam.Filter(lambda record: record[3] == 'HR')
                | 'Pair each hr employee with 1' >> beam.Map(lambda record: ("HR, " +record[1], 1))
                | 'Group and sum' >> beam.CombinePerKey(sum)
           )

output =(
         (accounts_count,hr_count)
    | beam.Flatten()
    | beam.io.WriteToText('data/both')
)



p.run()

# Sample the first 20 results, remember there are no ordering guarantees.
!{('head -n 20 data/both-00000-of-00001')}

('Accounts, Marco', 31)
('Accounts, Rebekah', 31)
('Accounts, Itoe', 31)
('Accounts, Edouard', 31)
('Accounts, Kyle', 62)
('Accounts, Kumiko', 31)
('Accounts, Gaston', 31)
('Accounts, Ayumi', 30)
('HR, Beryl', 62)
('HR, Olga', 31)
('HR, Leslie', 31)
('HR, Mindy', 31)
('HR, Vicky', 31)
('HR, Richard', 31)
('HR, Kirk', 31)
('HR, Kaori', 31)
('HR, Oscar', 31)


### Word Count Assignment

In [20]:
# Colab-only: opens a browser file picker. In this run it uploaded
# wordcount_data.txt, the input for the word-count pipeline below.
from google.colab import files
uploaded = files.upload()

Saving wordcount_data.txt to wordcount_data.txt


Required transforms: `ReadFromText`, `Map`, `FlatMap`, `CombinePerKey`, `WriteToText`.

In [28]:
import apache_beam as beam
import re

inputs_pattern = 'wordcount_data.txt'
outputs_prefix = 'outputs/part'

# Running locally in the DirectRunner.
pipeline = beam.Pipeline()

output = (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)

  )

pipeline.run()

# Sample the first 20 results, remember there are no ordering guarantees.
# run('head -n 20 {}-00000-of-*'.format(outputs_prefix))
!{('head -n 20 outputs/part-00000-of-00001')}



('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)


### ParDo Transform

A ParDo transform takes each element of the input PCollection, applies a processing function to it, and emits zero, one, or multiple output elements.

Functionalities:
- Filtering
- Formatting or Type Conversion
- Extracting individual parts
- Computations

In [31]:
import apache_beam as beam

class SplitRow(beam.DoFn):
  """DoFn that splits a comma-separated line into its list of fields."""

  def process(self, element):
    # return type -> list: exactly one output element, the field list.
    fields = element.split(',')
    return [fields]

class FilterAccountsEmployee(beam.DoFn):
  """DoFn that keeps only rows whose field index 3 (the department) is 'Accounts'."""

  def process(self, element):
    # return type -> list: a one-element list keeps the row, an empty list drops it.
    # Return [] explicitly rather than falling off the end and returning None.
    if element[3] == 'Accounts':
      return [element]
    return []

class PairEmployees(beam.DoFn):
  """DoFn emitting ('<dept>,<name>', 1) for each split attendance row."""

  def process(self, element):
    # return type -> list
    dept, name = element[3], element[1]
    return [(dept + "," + name, 1)]

class Counting(beam.DoFn):
  """DoFn that sums the grouped values of a (key, values) pair produced by GroupByKey."""

  def process(self, element):
    # return type -> list
    key, values = element
    total = sum(values)
    return [(key, total)]

# Create and give pipeline a name
p1 = beam.Pipeline()

attendance_count = (
    p1
     |'Read from file' >> beam.io.ReadFromText('dept_data.txt')
     |'Map transform' >> beam.ParDo(SplitRow())
     |beam.ParDo(FilterAccountsEmployee())
     |beam.ParDo(PairEmployees())
     |'Group' >> beam.GroupByKey()
     |'Sum using ParDo' >> beam.ParDo(Counting())
     |beam.io.WriteToText('data/output_new_final') # Write PCollection to a source, Run the pipeline

)
p1.run()

# visualize output
!{('head -n 20 data/output_new_final-00000-of-00001')}



('Accounts,Marco', 31)
('Accounts,Rebekah', 31)
('Accounts,Itoe', 31)
('Accounts,Edouard', 31)
('Accounts,Kyle', 62)
('Accounts,Kumiko', 31)
('Accounts,Gaston', 31)
('Accounts,Ayumi', 30)


### Advanced Combiner of Beam

A combiner is a mini-reducer: it partially reduces values locally (on each worker, before data is shuffled), so only small accumulators need to be merged afterwards.

List of Combiner methods:

- create_accumulator: creates a new, empty accumulator on each worker; here it keeps a `(sum, count)` pair.
- add_input: adds one input element to an accumulator and returns the new `(sum, count)` value.
- merge_accumulators: merges the accumulators from all workers into a single one.
- extract_output: performs the final computation on the merged accumulator (here, `sum / count`).

In [32]:
import apache_beam as beam

# Pipeline for the global-average example; its transforms are attached further
# down, after AverageFn is defined.
p = beam.Pipeline()

class AverageFn(beam.CombineFn):
  """CombineFn computing the arithmetic mean of its numeric inputs.

  The accumulator is a (sum, count) pair, so partial results built from
  different subsets of the input can be merged before the final division.
  """

  def create_accumulator(self):
    return (0.0, 0)   # initialize (sum, count)

  def add_input(self, sum_count, input):
    # Use `total` rather than `sum` so the built-in sum() is not shadowed.
    total, count = sum_count
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    # e.g. [(27, 3), (39, 3), (18, 2)] -> (84, 8).
    # A single pass seeded with the empty accumulator also handles an empty
    # iterable, where `zip(*accumulators)` could not be unpacked into two names.
    total, count = self.create_accumulator()
    for part_total, part_count in accumulators:
      total += part_total
      count += part_count
    return total, count

  def extract_output(self, sum_count):
    total, count = sum_count
    # NaN for an empty input instead of a ZeroDivisionError.
    return total / count if count else float('NaN')


small_sum = (
           p
            | beam.Create([15,5,7,7,9,23,13,5])
            | "Combine Globally" >> beam.CombineGlobally(AverageFn())
            | 'Write results' >> beam.io.WriteToText('data/combine')
          )
p.run()

# Sample the first 20 results, remember there are no ordering guarantees.
!{'head -n 20 data/combine-00000-of-00001'}

10.5


### Composite Transforms

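What if the same sequence of transformations is repeated? Can we group it? Yes: wrap it in a composite transform (a `PTransform` subclass with an `expand` method) and reuse it.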

In [37]:
import apache_beam as beam

class MyTransform(beam.PTransform):
  """Composite transform over (key, 1) pairs.

  Sums the pairs per key, keeps the pairs accepted by `filter_on_count`, and
  formats each survivor with `format_output`. The Accounts and HR branches both
  reuse it, so its step labels are department-neutral.
  """

  def expand(self, input_coll):
    # Inner labels only need to be unique within this composite: the same
    # composite is applied twice in one pipeline without a label clash.
    return (
        input_coll
        | 'Group and sum' >> beam.CombinePerKey(sum)
        | 'Count filter' >> beam.Filter(filter_on_count)
        | 'Format as regular employee' >> beam.Map(format_output)
    )

def SplitRow(element):
    """Break a comma-separated text line into its list of fields."""
    return element.split(sep=',')


def filter_on_count(element, threshold=30):
  """Filter predicate over a (name, count) pair.

  Args:
    element: a (name, count) tuple.
    threshold: counts strictly greater than this pass; defaults to 30.

  Returns:
    True if count > threshold, else False. This is an explicit bool rather than
    the element itself or None.
  """
  _, count = element
  return count > threshold

def format_output(element):
  """Turn a (name, count) pair into a (name, str(count), 'Regular employee') row."""
  who, n = element
  row = (who, str(n), 'Regular employee')
  return row

p = beam.Pipeline()

input_collection = (
                      p
                      | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
                      | "Split rows" >> beam.Map(SplitRow)
                   )

accounts_count = (
                      input_collection
                      | 'Get all Accounts dept persons' >> beam.Filter(lambda record: record[3] == 'Accounts')
                      | 'Pair each accounts employee with 1' >> beam.Map(lambda record: ("Accounts, " +record[1], 1))
                      | 'Composite accounts' >> MyTransform()
                      | 'Write results for account' >> beam.io.WriteToText('data/Account')
                 )

hr_count = (
                input_collection
                | 'Get all HR dept persons' >> beam.Filter(lambda record: record[3] == 'HR')
                | 'Pair each hr employee with 1' >> beam.Map(lambda record: ("HR, " +record[1], 1))
                | 'composite HR' >> MyTransform()
                | 'Write results for hr' >> beam.io.WriteToText('data/HR')
           )
p.run()

# Sample the first 20 results, remember there are no ordering guarantees.
!{('head -n 20 data/Account-00000-of-00001')}
!{('head -n 20 data/HR-00000-of-00001')}



('Accounts, Marco', '31', 'Regular employee')
('Accounts, Rebekah', '31', 'Regular employee')
('Accounts, Itoe', '31', 'Regular employee')
('Accounts, Edouard', '31', 'Regular employee')
('Accounts, Kyle', '62', 'Regular employee')
('Accounts, Kumiko', '31', 'Regular employee')
('Accounts, Gaston', '31', 'Regular employee')
('HR, Beryl', '62', 'Regular employee')
('HR, Olga', '31', 'Regular employee')
('HR, Leslie', '31', 'Regular employee')
('HR, Mindy', '31', 'Regular employee')
('HR, Vicky', '31', 'Regular employee')
('HR, Richard', '31', 'Regular employee')
('HR, Kirk', '31', 'Regular employee')
('HR, Kaori', '31', 'Regular employee')
('HR, Oscar', '31', 'Regular employee')


### CoGroupByKey for Joins

Relational join of two or more key/value PCollections

It accepts a dictionary of key/value PCollections and outputs a single PCollection containing one key/value tuple per key in the inputs. The value is a dictionary that maps each input's tag to the list of values for that key.

In [38]:
# Colab-only: opens a browser file picker. In this run it uploaded location.txt,
# the second input of the CoGroupByKey join below.
from google.colab import files
uploaded = files.upload()

Saving location.txt to location.txt


In [39]:
import apache_beam as beam

def retTuple(element):
  """Key a comma-separated line by its first field.

  '149633CM,Marco,10' -> ('149633CM', ['Marco', '10'])
  """
  key, *rest = element.split(',')
  return (key, rest)

p1 = beam.Pipeline()

# Apply a ParDo to the PCollection "words" to compute lengths for each word.
dep_rows = (
                p1
                | "Reading File 1" >> beam.io.ReadFromText('dept_data.txt')
                | 'Pair each employee with key' >> beam.Map(retTuple)          # {149633CM : [Marco,10,Accounts,1-01-2019]}

               )


loc_rows = (
                p1
                | "Reading File 2" >> beam.io.ReadFromText('location.txt')
                | 'Pair each loc with key' >> beam.Map(retTuple)                # {149633CM : [9876843261,New York]}
               )


results = ({'dep_data': dep_rows, 'loc_data': loc_rows}

           | beam.CoGroupByKey()
           | 'Write results' >> beam.io.WriteToText('data/result')
          )


p1.run()

!{('head -n 20 data/result-00000-of-00001')}

('149633CM', {'dep_data': [['Marco', '10', 'Accounts', '1-01-2019'], ['Marco', '10', 'Accounts', '2-01-2019'], ['Marco', '10', 'Accounts', '3-01-2019'], ['Marco', '10', 'Accounts', '4-01-2019'], ['Marco', '10', 'Accounts', '5-01-2019'], ['Marco', '10', 'Accounts', '6-01-2019'], ['Marco', '10', 'Accounts', '7-01-2019'], ['Marco', '10', 'Accounts', '8-01-2019'], ['Marco', '10', 'Accounts', '9-01-2019'], ['Marco', '10', 'Accounts', '10-01-2019'], ['Marco', '10', 'Accounts', '11-01-2019'], ['Marco', '10', 'Accounts', '12-01-2019'], ['Marco', '10', 'Accounts', '13-01-2019'], ['Marco', '10', 'Accounts', '14-01-2019'], ['Marco', '10', 'Accounts', '15-01-2019'], ['Marco', '10', 'Accounts', '16-01-2019'], ['Marco', '10', 'Accounts', '17-01-2019'], ['Marco', '10', 'Accounts', '18-01-2019'], ['Marco', '10', 'Accounts', '19-01-2019'], ['Marco', '10', 'Accounts', '20-01-2019'], ['Marco', '10', 'Accounts', '21-01-2019'], ['Marco', '10', 'Accounts', '22-01-2019'], ['Marco', '10', 'Accounts', '23-01-2