<a href="https://colab.research.google.com/github/rashida048/Apache-Beam/blob/main/ParDo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --quiet apache_beam

In [None]:
mkdir -p data

In [None]:
ls

[0m[01;34mdata[0m/  [01;34msample_data[0m/


In [None]:
import apache_beam as beam

In [None]:
#ParDo as Map
class SplitRow(beam.DoFn):
  """DoFn that turns one comma-separated line into one list of fields."""

  def process(self, element):
    """Split `element` on commas and emit the field list as a single output.

    The field list is wrapped in an outer list so that ParDo emits exactly
    one element (the whole row) per input line, i.e. Map-like behaviour.
    """
    fields = element.split(',')
    return [fields]

p1 = beam.Pipeline()
attendance_count = (p1
                    |beam.io.ReadFromText('data/dept_data.txt')
                    |beam.ParDo(SplitRow())
                    #|beam.Filter(filtering)
                    #|beam.Map(lambda record: (record[1], 1))
                    #|beam.CombinePerKey(sum)
                    |beam.io.WriteToText('data/output_new1'))

p1.run()
!head -n 5 data/output_new1-00000-of-00001

['149633CM', 'Marco', '10', 'Accounts', '1-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019']


In [None]:
#ParDo as FlatMap
class SplitRow(beam.DoFn):
  """DoFn that emits every comma-separated field of a line separately."""

  def process(self, element):
    """Split `element` on commas and emit each field as its own output.

    The list is returned unwrapped, so ParDo emits one element per field,
    i.e. FlatMap-like behaviour.
    """
    fields = element.split(',')
    return fields

p2 = beam.Pipeline()
attendance_count = (p2
                    |beam.io.ReadFromText('data/dept_data.txt')
                    |beam.ParDo(SplitRow())
                    #|beam.Filter(filtering)
                    #|beam.Map(lambda record: (record[1], 1))
                    #|beam.CombinePerKey(sum)
                    |beam.io.WriteToText('data/output_new2'))

p2.run()
!head -n 10 data/output_new2-00000-of-00001



149633CM
Marco
10
Accounts
1-01-2019
212539MU
Rebekah
10
Accounts
1-01-2019


In [None]:
def SplitRow(element):
  """Split one comma-separated line into a list of its fields."""
  fields = element.split(',')
  return fields

def filtering(record):
  """Return True when the record's fourth field is exactly "Accounts".

  Args:
    record: Sequence of fields for one row (as produced by SplitRow).

  Returns:
    True if the record has at least four fields and the fourth one equals
    "Accounts"; False otherwise.
  """
  # Rows with fewer than four fields (a blank input line splits to [''])
  # are rejected instead of raising IndexError and failing the pipeline.
  return len(record) > 3 and record[3] == "Accounts"

p6 = beam.Pipeline()
attendance_count = (p6
                    |beam.io.ReadFromText('data/dept_data.txt')
                    |beam.Map(SplitRow)
                    #|beam.Map(lambda record: record.split(',))
                    |beam.Filter(filtering)
                    #|beam.Map(lambda record: record[3] == 'Account')
                    |beam.Map(lambda record: (record[1], 1))
                    |beam.io.WriteToText('data/output_new6'))

p6.run()
!head -n 5 data/output_new6-00000-of-00001

('Marco', 1)
('Rebekah', 1)
('Itoe', 1)
('Edouard', 1)
('Kyle', 1)


In [None]:
#ParDo in place of Map, Filter and a per-key sum
class SplitRow(beam.DoFn):
  """DoFn that turns one comma-separated line into one list of fields."""

  def process(self, element):
    """Split `element` on commas and emit the field list as a single output."""
    fields = element.split(',')
    # Wrapped in a list so that the whole row is emitted as one element.
    return [fields]

class FilterAccountsEmployee(beam.DoFn):
  """DoFn that keeps only rows whose fourth field is 'Accounts'."""

  def process(self, element):
    """Emit `element` if it is an Accounts row, otherwise emit nothing.

    Args:
      element: Sequence of fields for one row.

    Returns:
      A one-item list containing `element` when the row has at least four
      fields and the fourth equals 'Accounts'; an empty list otherwise.
    """
    # The length guard rejects short rows (a blank line splits to [''])
    # instead of raising IndexError and failing the pipeline.
    if len(element) > 3 and element[3] == 'Accounts':
      return [element]
    # Explicit empty output rather than an implicit None on this path.
    return []

class PairEmployees(beam.DoFn):
  """DoFn that maps a row to a ('<field 3>, <field 1>', 1) pair."""

  def process(self, element):
    """Emit one (key, 1) tuple whose key joins fields 3 and 1 with ', '."""
    key = ', '.join((element[3], element[1]))
    return [(key, 1)]

class Counting(beam.DoFn):
  """DoFn that sums the grouped values of a (key, values) pair."""

  def process(self, element):
    """Emit one (key, total) tuple, where total is the sum of the values."""
    key, values = element
    total = sum(values)
    return [(key, total)]


p4 = beam.Pipeline()
attendance_count = (p4
                    |beam.io.ReadFromText('data/dept_data.txt')
                    |beam.ParDo(SplitRow())
                    |beam.ParDo(FilterAccountsEmployee())
                    |beam.ParDo(PairEmployees())
                    | 'Group' >> beam.GroupByKey()
                    | 'Sum Using ParDo' >> beam.ParDo(Counting())
                    #|beam.Map(lambda record: (record[1], 1))
                    #|beam.CombinePerKey(sum)
                    |beam.io.WriteToText('data/output_new4'))

p4.run()
!head -n 10 data/output_new4-00000-of-00001

('Accounts, Marco', 31)
('Accounts, Rebekah', 31)
('Accounts, Itoe', 31)
('Accounts, Edouard', 31)
('Accounts, Kyle', 62)
('Accounts, Kumiko', 31)
('Accounts, Gaston', 31)
('Accounts, Ayumi', 30)


**Combiner**

In [None]:
class AverageFn(beam.CombineFn):
  """CombineFn that computes the arithmetic mean of its numeric inputs.

  The accumulator is a `(running_sum, count)` tuple.
  """

  def create_accumulator(self):
    """Return the empty accumulator: a sum of 0.0 over 0 elements."""
    return (0.0, 0)

  def add_input(self, sum_count, input):
    """Fold one input value into an accumulator.

    Args:
      sum_count: Current `(running_sum, count)` accumulator.
      input: The value to add. The name shadows the builtin but is kept so
        the method signature stays unchanged.

    Returns:
      A new `(running_sum + input, count + 1)` tuple.
    """
    total, count = sum_count
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    """Combine several `(running_sum, count)` accumulators into one.

    Example: [(27, 3), (39, 3), (18, 2)] --> (84, 8).

    An empty iterable yields the empty accumulator `(0.0, 0)` instead of
    failing while unpacking.
    """
    total, count = 0.0, 0
    for partial_total, partial_count in accumulators:
      total += partial_total
      count += partial_count
    return total, count

  def extract_output(self, sum_count):
    """Return the mean for the accumulator, or NaN when nothing was added."""
    total, count = sum_count
    # Avoid ZeroDivisionError for an accumulator that saw no input.
    return total / count if count else float('NaN')


# Average a small in-memory list of numbers with AverageFn and write the
# single result under the 'data/combine' prefix.
# The `with` block runs the pipeline when it exits and waits for it to
# finish, so the output shard is complete before a later cell reads it.
with beam.Pipeline() as p5:
  small_sum = (p5
               | beam.Create([15, 5, 7, 7, 9, 23, 13, 5])
               | beam.CombineGlobally(AverageFn())
               | 'Write results' >> beam.io.WriteToText('data/combine'))

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f483ca91030>

In [None]:
# Show up to the first 10 lines of the 'data/combine' output shard.
!head -n 10 data/combine-00000-of-00001

10.5
