<a href="https://colab.research.google.com/github/yogeshuser1/learnbeam/blob/master/Beam_Map.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!{'pip install -q apache_beam'}

In [0]:
!{'mkdir data'}

In [9]:
ls

[0m[01;34mdata[0m/  dept-data.txt  [01;34msample_data[0m/


In [0]:
cd content/

# **How the Map transform works**

In [10]:
import apache_beam as beam

def split_row(element):
  """Split one comma-separated text line into a list of its fields.

  Args:
    element: a single line of text, as read by ReadFromText in the pipeline below.

  Returns:
    The list of substrings between commas; fields are not whitespace-stripped.
  """
  fields = element.split(',')
  return fields

p = beam.Pipeline()

attendance_count = (
    p
    |beam.io.ReadFromText('data/dept-data.txt')
    |beam.Map(split_row)
    |beam.io.WriteToText('data/output/output_new',file_name_suffix='.txt')
)

p.run()

!{('head -5 data/dept-data.txt')}
!{('head -5 data/output/output_new-00000-of-00001.txt')}



149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
['149633CM', 'Marco', '10', 'Accounts', '1-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019']


# **How FlatMap works**

In [17]:
import apache_beam as beam

def split_row(element):
  """Split one comma-separated text line into a list of its fields.

  Args:
    element: a single line of text, as read by ReadFromText in the pipeline below.

  Returns:
    The list of substrings between commas; fields are not whitespace-stripped.
  """
  fields = element.split(',')
  return fields

p = beam.Pipeline()

attendance_count = (
    p
    |beam.io.ReadFromText('data/dept-data.txt')
    |beam.FlatMap(split_row)
    |beam.io.WriteToText('data/output/flat_map_output',file_name_suffix='.txt')
    
)

p.run()

!{('head -5 data/dept-data.txt')}
!{('head -5 data/output/flat_map_output-00000-of-00001.txt')}



149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
149633CM
Marco
10
Accounts
1-01-2019


# **How Filter works**

In [23]:
import apache_beam as beam

p = beam.Pipeline()

attendance_count = (
    p
    |beam.io.ReadFromText('data/dept-data.txt')
    |beam.Map(lambda record: record.split(','))
    |beam.Filter(lambda record: record[3] == 'Accounts')
    |beam.io.WriteToText('data/output/filter_output',file_name_suffix='.txt')
)

p.run()
!{('head -5 data/dept-data.txt')}
!{('head -5 data/output/filter_output-00000-of-00001.txt')}



149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
('Marco', 1)
('Rebekah', 1)
('Itoe', 1)
('Edouard', 1)
('Kyle', 1)


# **How CombinePerKey works**

In [24]:
import apache_beam as beam

p = beam.Pipeline()

attendance_count = (
    p
    |beam.io.ReadFromText('data/dept-data.txt')
    |beam.Map(lambda record: record.split(','))
    |beam.Filter(lambda record: record[3] == 'Accounts')
    |beam.Map(lambda record: (record[1],1))
    |beam.CombinePerKey(sum)
    |beam.io.WriteToText('data/output/combine_per_key_output',file_name_suffix='.txt')
)

p.run()
!{('head -5 data/dept-data.txt')}
!{('head -5 data/output/combine_per_key_output-00000-of-00001.txt')}

149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
