Installing and Importing the beam

In [None]:
!pip install apache-beam
!pip install apache-beam[interactive]

Now upload data file to VM

In [1]:
from google.colab import files
upload =  files.upload()

Saving dept_data.txt to dept_data.txt


In [None]:
#check file is uploaded or not
!ls

dept_data.txt  sample_data


In [None]:
!head -n 10 dept_data.txt

149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Gaston,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019


Creating PCollection by reading Textfile

In [None]:
import apache_beam as beam
my_pipeline = beam.Pipeline()

PCol1 = (my_pipeline|beam.io.ReadFromText('dept_data.txt'))

Creating PCollection by Create

In [None]:
my_pipeline_create = beam.Pipeline()

(
          my_pipeline_create
         |beam.Create(['Hello World',
                       'This is my list 2',
                       'Prgram 2'])
         |beam.io.WriteToText('myoutput.txt')
)

<PCollection[[14]: WriteToText/Write/WriteImpl/FinalizeWrite.None] at 0x7fdc480a94d0>

Applying Transformation to the data

In [None]:
import apache_beam as beam

def splitrow(mydata):
    return mydata.split(',')

my_pipeline = beam.Pipeline()

Pcollection_input = (
                  my_pipeline
              | 'Read from file' >>        beam.io.ReadFromText('dept_data.txt')        #read from input file
              | 'Split the row'>>          beam.Map(splitrow)
              )

PCollection_Account =( 
                Pcollection_input
                | 'Filter for accounts' >>   beam.Filter(lambda x:x[3] == 'Accounts')                   #apply filter for accounts department
                | 'create key value pair'>>  beam.Map(lambda x: ('Accounts ' +x[1],1) )                 #using map create key value pair
                | 'Group by key and sum' >>  beam.CombinePerKey(sum)                                    #similar to group by
                | 'Write putput to file'>>   beam.io.WriteToText('Account_data')
              )

PCollection_HR =( 
                Pcollection_input
                | 'Filter for HR' >>   beam.Filter(lambda x:x[3] == 'HR')                               #apply filter for accounts department
                | 'create key value pair HR'>>  beam.Map(lambda x: ('HR ' +x[1],1) )                    #using map create key value pair
                | 'Group by key and sum HR' >>  beam.CombinePerKey(sum)                                 #similar to group by
                | 'Write putput to file HR'>>   beam.io.WriteToText('HR_data')
              )

PCollection_Flat =(
                  (PCollection_Account,PCollection_HR)
                  |beam.Flatten()                                                                     #flatten example. It merges one or more pcollection
                  |beam.io.WriteToText('Flatten_file')
                  |beam.Map(print)
)

my_pipeline.run()

!head -n 5 Account_data-00000-of-00001
!head -n 5 HR_data-00000-of-00001



Flatten_file-00000-of-00001
('Accounts Marco', 31)
('Accounts Rebekah', 31)
('Accounts Itoe', 31)
('Accounts Edouard', 31)
('Accounts Kyle', 62)
('HR Beryl', 62)
('HR Olga', 31)
('HR Leslie', 31)
('HR Mindy', 31)
('HR Vicky', 31)


In [None]:
!cat Flatten_file-00000-of-00001

('HR Beryl', 62)
('HR Olga', 31)
('HR Leslie', 31)
('HR Mindy', 31)
('HR Vicky', 31)
('HR Richard', 31)
('HR Kirk', 31)
('HR Kaori', 31)
('HR Oscar', 31)
('Accounts Marco', 31)
('Accounts Rebekah', 31)
('Accounts Itoe', 31)
('Accounts Edouard', 31)
('Accounts Kyle', 62)
('Accounts Kumiko', 31)
('Accounts Gaston', 31)
('Accounts Ayumi', 30)


**ParDo** Examples

In [None]:
import apache_beam as beam

my_pardo_pipeline = beam.Pipeline()

class splitrow(beam.DoFn):                #create class with inherit DoFn class. DoFn classs is for distributed data processing
  def process(self,element):              #process function is compulsory to be defined
    return [element.split(',')]

class filterHR(beam.DoFn):
  def process(self,element):
    if element[3] == 'HR':
      return [element]

class pairemployee(beam.DoFn):
  def process(self,element):
    return [(element[1],1)]

(
    my_pardo_pipeline
 |beam.io.ReadFromText('dept_data.txt')
 |beam.ParDo(splitrow())
 |beam.ParDo(filterHR())
 |beam.ParDo(pairemployee())
 |beam.CombinePerKey(sum)
 |beam.io.WriteToText('pardo_example')

)

my_pardo_pipeline.run()

!head -n10 pardo_example-00000-of-00001



('Beryl', 62)
('Olga', 31)
('Leslie', 31)
('Mindy', 31)
('Vicky', 31)
('Richard', 31)
('Kirk', 31)
('Kaori', 31)
('Oscar', 31)


In [None]:
!ls

Account_data-00000-of-00001  HR_data-00000-of-00001
dept_data.txt		     pardo_example-00000-of-00001
Flatten_file-00000-of-00001  sample_data


**Composite** Transform

In [None]:
import apache_beam as beam

def splitrow(mydata):
    return mydata.split(',')

class mycomposite_tranform(beam.PTransform):
      '''Ptransform is the base class same like DoFn and expand method has to be overwritten
        This class can be used as generic class across different transformation
      '''
      def __init__(self,param):
        self.param = param
      def expand(self, input_collection):
        'this method takes collection as input'
        output = (
            input_collection
            |'Group by key and sum' >>  beam.CombinePerKey(sum)
            | 'Write putput to file'>>   beam.io.WriteToText(self.param)
        ) 
        return output

my_pipeline = beam.Pipeline()

Pcollection_input = (
                  my_pipeline
              | 'Read from file' >>        beam.io.ReadFromText('dept_data.txt')        #read from input file
              | 'Split the row'>>          beam.Map(splitrow)
              )

PCollection_Account =( 
                Pcollection_input
                | 'Filter for accounts' >>   beam.Filter(lambda x:x[3] == 'Accounts')                   #apply filter for accounts department
                | 'create key value pair'>>  beam.Map(lambda x: ('Accounts ' +x[1],1) )                 #using map create key value pair
                #| 'Group by key and sum' >>  beam.CombinePerKey(sum)                                    #similar to group by
                | 'composite transform for account' >> mycomposite_tranform('Account_data')
               # | 'Write putput to file'>>   beam.io.WriteToText('Account_data')
              )

PCollection_HR =( 
                Pcollection_input
                | 'Filter for HR' >>   beam.Filter(lambda x:x[3] == 'HR')                               #apply filter for accounts department
                | 'create key value pair HR'>>  beam.Map(lambda x: ('HR ' +x[1],1) )                    #using map create key value pair
                #| 'Group by key and sum HR' >>  beam.CombinePerKey(sum)                                 #similar to group by
                | 'composite transform for HR' >> mycomposite_tranform('HR_data')
               # | 'Write putput to file HR'>>   beam.io.WriteToText('HR_data')
              )


my_pipeline.run()

!head -n 5 Account_data-00000-of-00001
!head -n 5 HR_data-00000-of-00001



('Accounts Marco', 31)
('Accounts Rebekah', 31)
('Accounts Itoe', 31)
('Accounts Edouard', 31)
('Accounts Kyle', 62)
('HR Beryl', 62)
('HR Olga', 31)
('HR Leslie', 31)
('HR Mindy', 31)
('HR Vicky', 31)


**CoGroupBY**
Aggregates all input elements by their key

CoGroupByKey expects a **dictionary** of named keyed PCollections, and produces elements joined by their keys. It acts as **relational** join

In [None]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  PCollection_Student_data = pipeline|'create student' >> beam.Create([('1','Employee1'),('2','Employee2')])
  PCollection_Dept_data = pipeline|'create depart' >> beam.Create([('1','DEPT1'),('2','DEPT2')])

  output=(
      ({'Student':PCollection_Student_data,'Department':PCollection_Dept_data})
      |'Merge' >> beam.CoGroupByKey()
      |beam.io.WriteToText('Student')
      )

!head -n5 Student*




('1', {'Student': ['Employee1'], 'Department': ['DEPT1']})
('2', {'Student': ['Employee2'], 'Department': ['DEPT2']})


**Side Input**

In [None]:
import apache_beam as beam

side_input = [1,2]

class exclude_from_list(beam.DoFn):
  def process(self,element, param):
    if element[0] not in param:
      return [element]

with beam.Pipeline() as pipeline:
  PCollection_Student_data = (
      pipeline    
    |'create student' >> beam.Create([(1,'Employee1'),(2,'Employee2'),(3,'Employee3'),(4,'Employee4'),(5,'Employee5')])
    |beam.ParDo(exclude_from_list(),side_input) #excluding the 1,2 student
    |beam.Map(print)
                            )



(3, 'Employee3')
(4, 'Employee4')
(5, 'Employee5')


In [None]:
!ls

sample_data  Student-00000-of-00001  Test-00000-of-00001


In [None]:
!rm *00000-of-00001

Additional Output/Generating **multiple** output and **tagging** output

In [31]:
import apache_beam as beam
pipeline = beam.Pipeline()

class seperatename(beam.DoFn):
  def process(self,element):
    element_split = element.split(',')
    if element_split[1].startswith('R'):
      return [beam.pvalue.TaggedOutput('Name_with_R', element)]   #tagging of output
    if element_split[1].startswith('B'):
      return [beam.pvalue.TaggedOutput('Name_with_B', element)]   #tagging of output


PCol_output=(
    pipeline
    |beam.io.ReadFromText('dept_data.txt')
    |beam.ParDo(seperatename()).with_outputs('Name_with_R','Name_with_B') #tags name
    )

Name_with_R_ouput = PCol_output.Name_with_R
Name_with_B_ouput = PCol_output.Name_with_B

Name_with_R_ouput|'write R file' >> beam.io.WriteToText('Name_with_R_ouput')
Name_with_B_ouput|'write B file' >> beam.io.WriteToText('Name_with_B_ouput')

pipeline.run()

!head -n 5 Name_with_R_ouput-00000-of-00001
!head -n 5 Name_with_B_ouput-00000-of-00001



212539MU,Rebekah,10,Accounts,1-01-2019
745411HT,Richard,20,HR,1-01-2019
212539MU,Rebekah,10,Accounts,2-01-2019
745411HT,Richard,20,HR,2-01-2019
212539MU,Rebekah,10,Accounts,3-01-2019
331593PS,Beryl,20,HR,1-01-2019
892691AR,Beryl,20,HR,1-01-2019
331593PS,Beryl,20,HR,2-01-2019
892691AR,Beryl,20,HR,2-01-2019
331593PS,Beryl,20,HR,3-01-2019


In [29]:
!ls

dept_data.txt			  Name_with_R_ouput-00000-of-00001
Name_with_B_ouput-00000-of-00001  sample_data
