<a href="https://colab.research.google.com/github/thursy/Apache-Beam-Notebook/blob/main/Beam_batch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing Apache Beam

Apache Beam is a unified programming model for building portable ETL pipelines for big data.
- Unified: batch and streaming data are handled in the same way.
  *Batch*: data that is complete and can be loaded as a whole in one go.
  *Streaming*: real-time data that is not complete yet; new data keeps arriving.

- Portable: once you have written your Apache Beam code, you can run it on any supported execution engine, such as Spark, Flink, or Samza.

In [None]:
!pip3 install apache_beam

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache_beam
  Downloading apache_beam-2.46.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.5/14.5 MB[0m [31m37.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting fasteners<1.0,>=0.3
  Downloading fasteners-0.18-py3-none-any.whl (18 kB)
Collecting dill<0.3.2,>=0.3.1.1
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 KB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting objsize<0.7.0,>=0.6.1
  Downloading objsize-0.6.1-py3-none-any.whl (9.3 kB)
Collecting pymongo<4.0.0,>=3.8.0
  Downloading pymongo-3.13.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (515 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m515.5/515.5 KB[0m [31m9.3 MB/s

# Batch Processing

## ReadFromText, WriteToText

ReadFromText(file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, validate, skip_header_lines)

- file_pattern (required)
hdfs, gfs, local filesystem, *

- min_bundle_size
Example: a 1 GB PCollection
1024 MB / 128 MB >> 8 bundles
1 core for each bundle >> parallelization

- compression_type (optional)
automatically determined by the apache beam, but you can explicitly specify the compression type

- strip_trailing_newlines
determines whether the trailing newline should be removed while reading the lines
default == True
False >> create newline in between

- validate
verifies that the file is present during pipeline creation
raises an error if the file to write to already exists after pipeline creation

- skip_header_lines = 1

ReadFromAvro(file_pattern, min_bundle_size, validate, use_fastavro)


WriteToText(file_path_prefix, file_name_suffix, append_trailing_newlines, num_shards, shard_name_template, coder, compression_type, header)

- append_trailing_newlines = false
[...][...]

- append_trailing_newlines = true
[...]
[...]

- num_shards >> the number of output files
chosen by Apache Beam by default

- shard_name_template
'SS-NN'

- compression_type=beam.io.filesystem.CompressionTypes.GZIP

- header='movieID,Name,rating'

WriteToAvro(file_path_prefix, schema, codec, file_name_suffix, num_shards, shard_name_template, mime_type, use_fastavro)

WriteToParquet(file_path_prefix, schema, row_group_buffer_size, record_batch_size, codec, use_deprecated_int96_timestamps, file_name_suffix, num_shards, shard_name_template, mime_type)

In [None]:
!cat /data/movies_rating.txt

movieId,name,rating
1,Titanic,3.5
2,Avengers,4
3,SpiderMan,4.5
4,Green Miles,4


In [None]:
# Comma-separated movie ratings read by the ReadFromText/WriteToText example.
file_path = '/data/movies_rating.txt'

In [None]:
import apache_beam as beam
p1 = beam.Pipeline()
# Parentheses (not braces) group the chained transforms; braces would wrap the
# pipeline result in a one-element Python set.
ratings = (
    p1
    | beam.io.ReadFromText(file_path, skip_header_lines=1)  # drop the first line of the file
    | beam.Map(lambda record: record.split(','))
    | beam.Filter(lambda record: float(record[2]) > 4)  # keep rows whose third field is above 4
    | beam.io.WriteToText('result')
)
p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e5741af0>

In [None]:
!cat result-00000-of-00001

['3', 'SpiderMan', '4.5']


## Map and Filter

In [None]:
!cat /data/Customers_age.txt

1,John,NY,22
2,Jim,LA,25
3,Mary,NY,30
4,Albert,LA,20
5,Samza.NY,18
6,Maria,NY,15
7,Shreya,NY,30
8,Kavita,LA,20
9,Mona,NY,18
10,Nandita,NY,15

In [None]:
# Comma-separated customer records read by the Map and Filter example.
file_path = '/data/Customers_age.txt'

In [None]:
import apache_beam as beam
p1 = beam.Pipeline()
# Parentheses (not braces) group the chained transforms; braces would wrap the
# pipeline result in a one-element Python set.
customers = (
    p1
    | beam.io.ReadFromText(file_path)
    | beam.Map(lambda record: record.split(','))
    # `and` short-circuits, so the int() conversion only runs on rows whose
    # third field is 'NY'.
    | beam.Filter(lambda record: record[2] == 'NY' and int(record[3]) > 20)
    | beam.io.WriteToText('result', file_name_suffix='customer')
)
p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e77e04c0>

In [None]:
!cat result-00000-of-00001customer

['1', 'John', 'NY', '22']
['3', 'Mary', 'NY', '30']
['7', 'Shreya', 'NY', '30']


## FlatMap

In [None]:
!cat /data/Peter_Piper.txt

peter piper picked a peck of pickled pepper
a peck of pickled pepper peter piper picked
if peter piper picked a peck of pickled pepper
where's the peck of pickled pepper peter piper picked


In [None]:
# Plain-text input for the FlatMap word-count example.
file_path = '/data/Peter_Piper.txt'

In [None]:
# Words to count. Entries must match the tokens exactly as they appear in the
# input text, which contains 'peck' (not 'pecked').
words = ['peter','piper', 'pickled', 'picked', 'peck', 'pepper']

In [None]:
def FindWord(element):
  """Return True when `element` is one of the entries in `words`, else False.

  Used as a beam.Filter predicate; an explicit boolean is returned for
  non-matches instead of an implicit None.
  """
  return element in words

In [None]:
p1 = beam.Pipeline()

# Parentheses (not braces) group the chained transforms; braces would wrap the
# pipeline result in a one-element Python set.
freq = (
    p1
    | beam.io.ReadFromText(file_path)
    | beam.FlatMap(lambda record: record.split(' '))  # one output element per word
    | beam.Filter(FindWord)
    | beam.Map(lambda record: (record, 1))
    | beam.CombinePerKey(sum)  # add up the 1s for each word
    | beam.io.WriteToText('result', file_name_suffix='freq')
)
p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e6e29cd0>

In [None]:
!cat result-00000-of-00001freq

('peter', 4)
('piper', 4)
('picked', 4)
('pickled', 4)
('pepper', 4)


## CoGroupByKey

In [None]:
p = beam.Pipeline()
# (movie id, title) pairs.
movie_name = [
    (1, 'SpiderMan'),
    (2, 'Avenger'),
    (3, 'Titanic'),
    (4, 'Green Miles'),
]
# (movie id, rating) pairs; an id may appear more than once, and id 4 has no rating.
movie_rating = [
    (1, 3.5),
    (2, 4),
    (1, 4.5),
    (3, 3.5),
    (2, 4.5)
]

name = p| 'Create Name Pcollection' >> beam.Create(movie_name)
ratings = p| "Create Rating Pcollection" >> beam.Create(movie_rating)

# Join the two PCollections on the movie id. The dict keys given here are the
# labels under which each input's values are grouped in the printed result.
joinedResult = ({'movie_name':name, 'movie_rating':ratings} | beam.CoGroupByKey()) | beam.Map(print)

p.run()

(1, {'movie_name': ['SpiderMan'], 'movie_rating': [3.5, 4.5]})
(2, {'movie_name': ['Avenger'], 'movie_rating': [4, 4.5]})
(3, {'movie_name': ['Titanic'], 'movie_rating': [3.5]})
(4, {'movie_name': ['Green Miles'], 'movie_rating': []})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e7704f70>

## Partition

In [None]:
import apache_beam as beam

p = beam.Pipeline()
# Sample integers that the Partition example splits into two groups.
number = {1,2,3,4,5,6,7,8}

def partition_fn(element, num_partition):
  """Choose the partition index for an integer: 0 for odd values, 1 for even.

  `num_partition` is accepted but not used by this function.
  """
  remainder = element % 2
  if remainder:
    return 0
  return 1

# Split the numbers into 2 partitions; partition_fn picks the index for each element.
number_pc = p|beam.Create(number)|beam.Partition(partition_fn,2)
# Partition 0 holds the elements for which partition_fn returned 0 (the odd numbers).
number_pc[0]|"Printing first partition" >> beam.Map(print)
# Uncomment to print partition 1 as well:
#number_pc[1]|"Printing second partition" >> beam.Map(print)
p.run()

1
3
5
7


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e7739550>

## Flatten
A union operation that combines multiple PCollections into a single logical PCollection.

In [None]:
import apache_beam as beam

p = beam.Pipeline()

even = {2,4,6,8}
# Odd numbers only: 7, not the even value 6, sits between 5 and 9.
odd = {1,3,5,7,9}
name = {'Jack','Rose','Jira'}

even_pc = p |"Create Pcollection for even number" >> beam.Create(even)
odd_pc = p | "Create Pcollection for odd number" >> beam.Create(odd)
name_pc = p | "Create Pcollection for name" >> beam.Create(name)

# Flatten merges the three PCollections into a single PCollection, which is
# then printed element by element.
result = ((even_pc, odd_pc, name_pc) | beam.Flatten()) | beam.Map(print)

p.run()

Jack
Jira
Rose
8
2
4
6
1
3
5
6
9


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e763f970>

## Composite PTransform
### Before

In [None]:
!cat /data/students_marks.txt

John,US,20,22,40
Jim,IN,40,24,36
Sam,US,23,26,40
Sandhya,IN,40,42,36
Naina,IN,34,48,44

In [None]:
# Comma-separated student records read by the composite-transform examples.
file_path = '/data/students_marks.txt'

In [None]:
import apache_beam as beam
# Fresh pipeline shared by the US and India branches built below.
p1 = beam.Pipeline()

def SplitRow(input_element):
  """Break one comma-separated text line into a list of its string fields."""
  fields = input_element.split(',')
  return fields

def FilterBasedonCountry(countryName, input_element):
  """Tell whether the value at index 1 of `input_element` equals `countryName`."""
  record_country = input_element[1]
  return record_country == countryName

def CalculateSum(elem):
  """Pair the value at index 0 with the integer sum of the values at indices 2, 3 and 4."""
  label = elem[0]
  total = 0
  for position in (2, 3, 4):
    total += int(elem[position])
  return label, total

def FormatText(elem):
  """Render a two-item record as '<item 0> has received <item 1> marks'."""
  label = elem[0]
  marks_text = str(elem[1])
  return label + ' has received ' + marks_text + ' marks'

# Shared first stage: read the file and split every line into its fields.
input_collection = (
    p1
    | beam.io.ReadFromText(file_path)
    | beam.Map(SplitRow)
)

# US branch: keep 'US' records, sum the marks, format and write them out.
us_pipeline = (
    input_collection
    | beam.Filter(lambda record: FilterBasedonCountry('US', record))
    | 'Calculate Sum for US' >> beam.Map(CalculateSum)
    | 'Apply Formatting for US' >> beam.Map(FormatText)
    | 'Writing result to US File' >> beam.io.WriteToText('US_Result')
)

# India branch: the same steps as the US branch, repeated for 'IN' records.
india_pipeline = (
    input_collection
    | beam.Filter(lambda record: FilterBasedonCountry('IN', record))
    | 'Calculate Sum for IN' >> beam.Map(CalculateSum)
    | 'Apply Formatting for IN' >> beam.Map(FormatText)
    | 'Writing result to India File' >> beam.io.WriteToText('IN_Result')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e72b6f40>

In [None]:
!cat US_Result-00000-of-00001

John has received 82 marks
Sam has received 89 marks


In [None]:
!cat IN_Result-00000-of-00001

Jim has received 100 marks
Sandhya has received 118 marks
Naina has received 126 marks


### After

In [None]:
import apache_beam as beam
# Fresh pipeline for the composite-transform version of the example.
p1 = beam.Pipeline()

def SplitRow(input_element):
  """Return the comma-separated fields of `input_element` as a list of strings."""
  parts = input_element.split(',')
  return parts

def FilterBasedonCountry(countryName, input_element):
  """Return True when index 1 of the record matches `countryName`, else False."""
  country_field = input_element[1]
  return country_field == countryName

def CalculateSum(elem):
  """Return (elem[0], total), where total adds the integer values at indices 2, 3 and 4."""
  first = int(elem[2])
  second = int(elem[3])
  third = int(elem[4])
  total = first + second + third
  return elem[0], total

def FormatText(elem):
  """Build the sentence '<elem[0]> has received <elem[1]> marks'."""
  prefix = elem[0] + ' has received '
  return prefix + str(elem[1]) + ' marks'

class MyPTransform(beam.PTransform):
  """Composite transform that applies CalculateSum and then FormatText."""

  def expand(self, input_col):
    """Chain the two Map steps onto `input_col` and return the resulting PCollection."""
    summed = input_col | "Calculate Sum" >> beam.Map(CalculateSum)
    return summed | "Apply Formatting" >> beam.Map(FormatText)

# Shared first stage: read the file and split every line into its fields.
input_collection = (
    p1
    | beam.io.ReadFromText(file_path)
    | beam.Map(SplitRow)
)

# US branch: the sum and format steps are now applied through MyPTransform.
us_pipeline = (
    input_collection
    | beam.Filter(lambda record: FilterBasedonCountry('US', record))
    | 'Calculate Ptransform for US' >> MyPTransform()
    | 'Writing result to US File' >> beam.io.WriteToText('US_Result')
)

# India branch: reuses the same composite transform under a different label.
india_pipeline = (
    input_collection
    | beam.Filter(lambda record: FilterBasedonCountry('IN', record))
    | 'Calculate Ptransform for India' >> MyPTransform()
    | 'Writing result to India File' >> beam.io.WriteToText('IN_Result')
)

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e6d0a9d0>

In [None]:
!cat US_Result-00000-of-00001

John has received 82 marks
Sam has received 89 marks


In [None]:
!cat IN_Result-00000-of-00001

Jim has received 100 marks
Sandhya has received 118 marks
Naina has received 126 marks


## ParDo

### ParDo as Map and Filter

In [None]:
!cat /data/Customers_age.txt

1,John,NY,22
2,Jim,LA,25
3,Mary,NY,30
4,Albert,LA,20
5,Samza.NY,18
6,Maria,NY,15
7,Shreya,NY,30
8,Kavita,LA,20
9,Mona,NY,18
10,Nandita,NY,15

In [None]:
# Comma-separated customer records read by the ParDo Map/Filter example.
file_path = '/data/Customers_age.txt'

In [None]:
import apache_beam as beam

# Fresh pipeline for the ParDo-based Map/Filter example.
p1 = beam.Pipeline()

class SplitRow(beam.DoFn):
  """DoFn that turns one comma-separated line into one list of fields."""

  def process(self, element):
    """Return a single-item list, so exactly one output is produced per input line."""
    fields = element.split(',')
    return [fields]

class FilterCustomer(beam.DoFn):
  """DoFn that passes a record on only when index 2 is 'NY' and index 3 is above 20."""

  def process(self, element):
    """Return [element] for a match and None (no output) otherwise."""
    # The city is checked first so int() is only applied to 'NY' records.
    if element[2] != 'NY':
      return None
    if int(element[3]) > 20:
      return [element]
    return None

# Parentheses (not braces) group the chained transforms; braces would wrap the
# pipeline result in a one-element Python set.
customers = (
    p1
    | beam.io.ReadFromText(file_path)
    | beam.ParDo(SplitRow())  # plays the role of Map
    | beam.ParDo(FilterCustomer())  # plays the role of Filter
    | beam.io.WriteToText('result')
)

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e616c280>

In [None]:
!cat result-00000-of-00001

['1', 'John', 'NY', '22']
['3', 'Mary', 'NY', '30']
['7', 'Shreya', 'NY', '30']


### ParDo as FlatMap

In [None]:
!cat /data/Peter_Piper.txt

peter piper picked a peck of pickled pepper
a peck of pickled pepper peter piper picked
if peter piper picked a peck of pickled pepper
where's the peck of pickled pepper peter piper picked


In [None]:
# Plain-text input for the ParDo-as-FlatMap word-count example.
file_path = '/data/Peter_Piper.txt'

In [None]:
import apache_beam as beam

# Words whose occurrences are counted; FindWord checks membership in this list.
words=['peter','piper','pickled','picked','peck','pepper']

def FindWord(element):
  """Return True when `element` is one of the entries in `words`, else False."""
  return element in words


class SplitRow(beam.DoFn):
  """DoFn that splits a line on single spaces."""

  def process(self,element):
    """Return the list of pieces directly, not wrapped in another list.

    Each piece therefore becomes its own output element, which is what makes
    this ParDo behave like FlatMap.
    """
    return element.split(' ')


class CalculateFrequency(beam.DoFn):
  """DoFn that collapses a (key, values) pair into (key, sum of values)."""

  def process(self,element):
    """Unpack the pair and return its total as a single-item list."""
    word, counts = element
    total = sum(counts)
    return [(word, total)]

   

p1 = beam.Pipeline()

# Word count built from ParDo and GroupByKey: split lines into words, keep the
# tracked words, pair each with 1, group by word and sum each group.
freq = (
    p1
    |'Read your input file' >> beam.io.ReadFromText(file_path)
    |'Split Records with Space' >> beam.ParDo(SplitRow())
    |'Filtering records' >> beam.Filter(FindWord)
    |'Create tupled records'>> beam.Map(lambda record: (record,1))
    |'Group By Key'>> beam.GroupByKey()
    |'Calculate Frequency of words' >> beam.ParDo(CalculateFrequency())
    |beam.io.WriteToText('result')
)
p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e770bfd0>

In [None]:
!cat result-00000-of-00001

('peter', 4)
('piper', 4)
('picked', 4)
('peck', 4)
('pickled', 4)
('pepper', 4)


## Side Inputs

In [None]:
!cat /data/Peter_Piper.txt

peter piper picked a peck of pickled pepper
a peck of pickled pepper peter piper picked
if peter piper picked a peck of pickled pepper
where's the peck of pickled pepper peter piper picked


In [None]:
# File of values to exclude, read one per line by the side-input examples.
fileexc_path = '/data/customers_exclude.txt'
# Comma-separated customer records used as the main input.
file_path = '/data/Customers_age.txt'

In [None]:
import apache_beam as beam
p1 = beam.Pipeline()

# Read the exclusion file into a list, one entry per line with trailing
# whitespace (including the newline) stripped.
with open(fileexc_path, 'r') as exclude_file:
  side_list = [line.rstrip() for line in exclude_file]

print(side_list)

class SplitRow(beam.DoFn):
  """DoFn that splits a line on commas and drops records listed in the side input."""

  def process(self,element,side_list):
    """Return [fields] unless the first field appears in `side_list`; then return None."""
    customer = element.split(',')
    if customer[0] in side_list:
      return None
    return [customer]

# side_list is handed to ParDo as an extra argument and arrives as the
# `side_list` parameter of SplitRow.process.
customers = (
    p1
    |beam.io.ReadFromText(file_path)
    |beam.ParDo(SplitRow(),side_list)
    |beam.io.WriteToText('result')
)
p1.run()

['3', '7', '10']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e6990a90>

In [None]:
!cat result-00000-of-00001

['1', 'John', 'NY', '22']
['2', 'Jim', 'LA', '25']
['4', 'Albert', 'LA', '20']
['5', 'Samza.NY', '18']
['6', 'Maria', 'NY', '15']
['8', 'Kavita', 'LA', '20']
['9', 'Mona', 'NY', '18']


## Side Outputs

In [None]:
import apache_beam as beam
p1 = beam.Pipeline()

# Collect the exclusion entries: every line of the file with its trailing
# whitespace removed.
with open(fileexc_path, 'r') as exclude_file:
  side_list = [entry.rstrip() for entry in exclude_file]

print(side_list)

class SplitRow(beam.DoFn):
  """DoFn that parses a comma-separated line and skips records found in the side input."""

  def process(self,element,side_list):
    """Return the split record in a one-item list, or None when its first field is excluded."""
    customer = element.split(',')
    excluded = customer[0] in side_list
    if excluded:
      return None
    return [customer]

class ProcessCustomers(beam.DoFn):
  """DoFn that routes each customer record to the main output or to tagged outputs."""

  def process(self,element,country,start_char):
    """Yield `element` to the outputs it belongs to.

    Args:
      element: split customer record; index 1 and index 2 are inspected.
      country: value compared with element[2]; matching records go to the
        main (untagged) output, all others to the 'Other_Cust' output.
      start_char: prefix tested against element[1]; matching records are
        additionally sent to the 'Names_J' output.
    """
    # Use the supplied arguments rather than hard-coded 'NY' / 'J', so the
    # values passed to beam.ParDo actually take effect.
    if element[2] == country:
      yield element
    else:
      yield beam.pvalue.TaggedOutput('Other_Cust', element)
    # Checked independently of the branch above: a record can land in both
    # its country output and the 'Names_J' output.
    if element[1].startswith(start_char):
      yield beam.pvalue.TaggedOutput('Names_J', element)
  


# with_outputs declares the two tagged outputs and names the untagged (main)
# output 'NewYork_Cust'; each one is then available as an attribute below.
customers = (
    p1
    |beam.io.ReadFromText(file_path)
    |beam.ParDo(SplitRow(),side_list)
    |beam.ParDo(ProcessCustomers(),country='NY',start_char='J').with_outputs('Names_J','Other_Cust',main='NewYork_Cust')
)

newyork_customers = customers.NewYork_Cust
other_customers = customers.Other_Cust
customer_withname_J = customers.Names_J

# Write every output PCollection to its own file prefix.
newyork_customers | 'Write Newyork Customers PCollection' >> beam.io.WriteToText("newyork")
other_customers  | 'Write Customers PCollection that lives in other cities' >> beam.io.WriteToText("customers_other_cities")
customer_withname_J  | 'Write Customers names with J PCollection' >> beam.io.WriteToText("customers_names_j")


p1.run()

['3', '7', '10']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f21e523c580>

In [None]:
!cat newyork-00000-of-00001

['1', 'John', 'NY', '22']
['6', 'Maria', 'NY', '15']
['9', 'Mona', 'NY', '18']


In [None]:
!cat customers_names_j-00000-of-00001

['1', 'John', 'NY', '22']
['2', 'Jim', 'LA', '25']


In [None]:
!cat customers_other_cities-00000-of-00001

['2', 'Jim', 'LA', '25']
['4', 'Albert', 'LA', '20']
['5', 'Samza.NY', '18']
['8', 'Kavita', 'LA', '20']
