In [1]:
!ls

basic_beam.ipynb
beam_create.ipynb
data
map_flat_filter.ipynb
venv


In [2]:
import apache_beam as beam

In [3]:
p = beam.Pipeline()
# Core template

attendance_count = (

    p
    |   beam.io.ReadFromText('data/dept_data.txt')
    |   beam.io.WriteToText('data/output_new')
)
p.run()

In [7]:
!head -n 5 data/output_new-00000-of-00001

149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019


In [8]:
def SplitRow(element):
    return element.split(',')

p = beam.Pipeline()
# Map transform
# 1 Input = 1 Output -> 1 to 1 mapping function over each element of the PCollection

attendance_count = (

    p
    |   beam.io.ReadFromText('data/dept_data.txt')
    |   beam.Map(SplitRow)
    |   beam.io.WriteToText('data/output_map')
)
p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x1d4fd061c40>

In [9]:
!head -n 5 data/output_map-00000-of-00001

['149633CM', 'Marco', '10', 'Accounts', '1-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019']


In [10]:
def SplitRow(element):
    return element.split(',') # return [element.split(',')] will give same result as Map

p = beam.Pipeline()
# FlatMap transform
# Multiple Element for Single Element (1 to Many) of the PCollection

attendance_count = (

    p
    |   beam.io.ReadFromText('data/dept_data.txt')
    |   beam.FlatMap(SplitRow)
    |   beam.io.WriteToText('data/output_flatmap')
)
p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x1d4fd0ce100>

In [12]:
!head -n 10 data/output_flatmap-00000-of-00001

149633CM
Marco
10
Accounts
1-01-2019
212539MU
Rebekah
10
Accounts
1-01-2019


In [13]:
def SplitRow(element):
    return element.split(',')

def filtering(record):
    return record[3] == 'Accounts'

p = beam.Pipeline()
# Filter transform
# Apply a filter to a PCollection

attendance_count = (

    p
    |   beam.io.ReadFromText('data/dept_data.txt')
    |   beam.Map(SplitRow)
    |   beam.Filter(filtering)
    |   beam.io.WriteToText('data/output_filter')
)
p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x1d4fd11fa00>

In [14]:
!head -n 10 data/output_filter-00000-of-00001

['149633CM', 'Marco', '10', 'Accounts', '1-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019']
['957149WC', 'Kyle', '10', 'Accounts', '1-01-2019']
['241316NX', 'Kumiko', '10', 'Accounts', '1-01-2019']
['796656IE', 'Gaston', '10', 'Accounts', '1-01-2019']
['149633CM', 'Marco', '10', 'Accounts', '2-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '2-01-2019']


## Full Pipeline

In [17]:
#p = beam.Pipeline()
with beam.Pipeline() as p:

    attendance_count = (

        p
        |'Read from file'       >>   beam.io.ReadFromText('data/dept_data.txt')
        |'Map transform'        >>   beam.Map(lambda record: record.split(','))
        |'Filter Accounts'      >>   beam.Filter(lambda record: record[3] == 'Accounts')
        |'Tag Passed Accounts'  >>   beam.Map(lambda record: (record[1],1))
        |'Sum Employees'        >>   beam.CombinePerKey(sum) # GroupByKey + Combiner + Reducer
        |'Output to a file'     >>   beam.io.WriteToText('data/output_pipeline')
    )
#p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x1d480ff8c10>

In [18]:
!head -n 10 data/output_pipeline-00000-of-00001

('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)
