<a href="https://colab.research.google.com/github/prateekjoshi2013/Angular-GettingStarted/blob/master/Apache_Beam_Primer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


## Installing Apache Beam

In [30]:
!pip install --quiet apache_beam

## Creating Folders and Uploading Files

In [None]:
!mkdir -p data

In [None]:
!ls

data  sample_data


In [None]:
from google.colab import files
uploaded=files.upload()

Saving dept_data.txt to dept_data.txt


In [7]:
!ls

data  dept_data.txt  sample_data


## Read Transforms: Reading Data

 ### Read From Text

  - **ReadFromText(...)** : Parses a text file as newline-delimited elements, i.e. it reads the file line by line and each line becomes a single element of the PCollection.

 #### Parameters

  - **file_pattern(str)**: **Mandatory.** The input file path(s); may contain globs, e.g. **my_input\*** reads all files whose names start with my_input.
  - **min_bundle_size(int)**: Minimum size of the bundles the source is split into for parallel processing. Default 0.
  - **compression_type(str)**: Compression type of the input files. By default it is inferred from the file extension.
  - **strip_trailing_newlines(boolean)**: Whether to strip the trailing newline character (`\n`) from each line. Default True.
  - **validate(boolean)**: Whether to verify that matching files exist when the pipeline is constructed. Default True.
  - **skip_header_lines(int)**: Number of header lines to skip at the start of each file. Default 0.




 ### Read From Avro

  - **ReadFromAvro(...)** : Parses Avro files; each record is emitted as a Python dict.

 #### Parameters

  - **file_pattern(str)**: **Mandatory.** The input file path(s); may contain globs, e.g. **my_input\*** reads all files whose names start with my_input.
  - **min_bundle_size(int)**: Minimum size of the bundles the source is split into for parallel processing.
  - **validate(boolean)**: Whether to verify that matching files exist when the pipeline is constructed. Default True.
  - **use_fastavro(boolean)**: When set to True, uses the fastavro library to read Avro files.


 ### Read From Parquet

  - **ReadFromParquet(...)** : Parses Parquet files; each row is emitted as a dict mapping column names to values.

 #### Parameters

  - **file_pattern(str)**: **Mandatory.** The input file path(s); may contain globs, e.g. **my_input\*** reads all files whose names start with my_input.
  - **min_bundle_size(int)**: Minimum size of the bundles the source is split into for parallel processing.
  - **validate(boolean)**: Whether to verify that matching files exist when the pipeline is constructed. Default True.
  - **columns(list[str])**: List of column names to read from the input files; other columns are skipped.



 ### Read From TensorFlow Records

  - **ReadFromTFRecord(...)** : Parses TensorFlow records (TFRecord files), which are stored in a binary format

 #### Parameters

  - **file_pattern(str)**: **Mandatory.** The input file path(s); may contain globs, e.g. **my_input\*** reads all files whose names start with my_input.
  - **validate(boolean)**: Whether to verify that matching files exist when the pipeline is constructed. Default True.
  - **compression_type(str)**: Compression type of the input files. By default it is inferred from the file extension.
  - **coder**: Coder used to decode each record. Default `coders.BytesCoder()`, so records are returned as raw bytes.

 ### Read From Pub/Sub (GCP)

  - **ReadFromPubSub(...)** : Reads messages from the Google Cloud Pub/Sub service. It is an unbounded source, so the pipeline must run in streaming mode.

 #### Parameters

  - **topic(str)**: Pub/Sub topic to read from, in the form `projects/<project>/topics/<topic>`.
  - **subscription(str)**: Pub/Sub subscription to read from, in the form `projects/<project>/subscriptions/<subscription>`. **Provide exactly one of topic or subscription, not both.**
  - **id_label(str)**: Name of a message attribute whose value uniquely identifies each message; it is used to deduplicate messages.
  - **with_attributes(boolean)**: If True, output elements are `PubsubMessage` objects (payload plus attributes); otherwise they are `bytes` (payload only). Default False.
  - **timestamp_attribute(str)**: Name of a message attribute whose value is used as the element timestamp, given either as milliseconds since the Unix epoch or as an RFC 3339 string. If not set, the message publish time is used.

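### Other Read Transforms

- Further connectors exist, e.g. Kinesis, JDBC and Redis.

- Some of them, such as Kinesis and JDBC in the Python SDK, are cross-language transforms implemented in Java, so running them from Python requires a Java runtime (JRE).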



## Create Transform : Generating Data

In [42]:
import apache_beam as beam

pipeline=beam.Pipeline()
lines=(
    pipeline
    | beam.Create([
        'Using create transform',
        'to generate in memory data',
        'This is a third line',
        'Thanks',
    ])
    | beam.io.WriteToText('data/output')
)
pipeline.run().wait_until_finish()

# inspect the output
!head -n 20 data/output*




Using create transform
to generate in memory data
This is a third line
Thanks


In [43]:
import apache_beam as beam

pipeline=beam.Pipeline()
lines=(
    pipeline
    | beam.Create([
        ("maths",52),
        ("english",52),
        ("science",52),
        ("computer",52),
        ("maths",52),
    ])
    | beam.io.WriteToText('data/output')
)
pipeline.run().wait_until_finish()

# inspect the output
!head -n 20 data/output*



('maths', 52)
('english', 52)
('science', 52)
('computer', 52)
('maths', 52)


In [44]:
import apache_beam as beam

pipeline=beam.Pipeline()
lines=(
    pipeline
    | beam.Create({
        'row1':[1,2,3,4,5],
        'row2':[1,2,3,4,5],
        'row3':[1,2,3,4,5],
    })
    | beam.io.WriteToText('data/output')
)
pipeline.run().wait_until_finish()

# inspect the output
!head -n 20 data/output*



('row1', [1, 2, 3, 4, 5])
('row2', [1, 2, 3, 4, 5])
('row3', [1, 2, 3, 4, 5])


# Write Transforms : Writing Files

## Write To Text

- **WriteToText(...)** : Writes each element of the PCollection as a single line in the output file

### Parameters

- **file_path_prefix(str)** : **Mandatory.** The path prefix of the output files; the shard name (e.g. `-00000-of-00001`) and the suffix are appended to it.
- **file_name_suffix(str)** : Suffix appended to each output file name, e.g. `.csv`.
- **num_shards(int)** : Number of output files (shards). If not set, the runner chooses; set it only when you need a specific number of output files, since fixing it can reduce parallelism.
- **append_trailing_newlines(boolean)** : Whether each line is terminated with a newline. Default True.
- **coder** : Coder used to encode each line.
- **header(str)** : Header line written at the start of each output file.




## Write To Avro

- **WriteToAvro(...)**: Writes each element of the PCollection to Avro files

### Parameters

- **file_path_prefix(str)** : **Mandatory.** The path prefix of the output files; the shard name (e.g. `-00000-of-00001`) and the suffix are appended to it.
- **file_name_suffix(str)** : Suffix appended to each output file name, e.g. `.avro`.
- **num_shards(int)** : Number of output files (shards). If not set, the runner chooses; set it only when you need a specific number of output files, since fixing it can reduce parallelism.
- **schema** : The Avro schema of the records to write, e.g. as returned by avro.schema.Parse; when fastavro is used, a plain dict describing the schema is accepted.
- **codec** : Codec used for block-level compression of the output file. Default 'deflate'.
- **compression_type(str)** : Specifies the compression type of the output file
- **use_fastavro(boolean)** : when set to true, uses the 'fastavro' library to write the Avro file.
- **mime_type** : MIME type to use for the produced output files.




## Write To Parquet

- **WriteToParquet(...)**: Writes each element of the PCollection to Parquet files


### Parameters

- **file_path_prefix(str)** : **Mandatory.** The path prefix of the output files; the shard name (e.g. `-00000-of-00001`) and the suffix are appended to it.
- **file_name_suffix(str)** : Suffix appended to each output file name, e.g. `.parquet`.
- **num_shards(int)** : Number of output files (shards). If not set, the runner chooses; set it only when you need a specific number of output files, since fixing it can reduce parallelism.
- **schema** : The schema to use for writing, given as a `pyarrow.Schema`.
- **codec** : Compression codec for the Parquet file; any codec name supported by pyarrow (e.g. 'snappy', 'gzip') is accepted. Default 'none'.
- **mime_type** : MIME type to use for the produced output files.
- **row_group_buffer_size** : Byte size of the row group buffer. Default 67108864 (64 MiB).
- **record_batch_size** : Number of records in each record batch. Default 1000.

## Write To PubSub

- **WriteToPubSub(...)** : Writes each element of the PCollection to the Google Cloud Pub/Sub service

### Parameters

- **topic(str)** : Pub/Sub topic to publish to, in the form `projects/<project>/topics/<topic>`.
- **with_attributes(boolean)** : If True, input elements must be `PubsubMessage` objects; otherwise they must be `bytes`. Default False.
- **id_label(str)** : If set, each published message gets an attribute with this name and a unique value, which a reader can use to deduplicate messages.
- **timestamp_attribute(str)** : If set, each published message gets an attribute with this name whose value is the message's publish time.

# Typical Apache Beam Code

In [46]:
import apache_beam as beam

# Create the pipeline
pipeline=beam.Pipeline()

attendance_count=(
    # Each PTransform produces a new PCollection
    pipeline
    # reader
    | beam.io.ReadFromText('dept_data.txt')
    # PTransform Map: "id,name,salary,dept,date" -> [id, name, salary, dept, date]
    | beam.Map(lambda record : record.split(','))
    # PTransform Filter: keep records where dept == 'Accounts'
    | beam.Filter(lambda record: record[3]=='Accounts')
    # PTransform Map: [id, name, salary, dept, date] -> (name, 1)
    | beam.Map(lambda record: (record[1],1))
    # PTransform CombinePerKey: sum the 1s for each name
    | beam.CombinePerKey(sum)
    # PTransform Map: (name, count) -> "('name', count)"
    | beam.Map(lambda employee_count: str(employee_count))
    # writer
    | beam.io.WriteToText('data/output')
)

pipeline.run().wait_until_finish()

# inspect the output
!head -n 20 data/output*



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


# Better Pipeline Code with a Context Manager and Step Labels

In [48]:
import apache_beam as beam



# Create the pipeline as a context manager: it runs, and waits for completion, when the with-block exits

with beam.Pipeline() as pipeline:
  attendance_count=(
      # Each PTransform produces new PCollection
      pipeline
      | "Reader" >> beam.io.ReadFromText('dept_data.txt')
      | "Split into records" >> beam.Map(lambda record : record.split(','))
      | "Filter by Department" >> beam.Filter(lambda record: record[3]=='Accounts')
      | "Create Record With ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records" >> beam.CombinePerKey(sum)
      | "Record to String" >> beam.Map(lambda employee_count: str(employee_count))
      | "Write to File" >> beam.io.WriteToText('data/output')
  )

# inspect the output
!head -n 20 data/output*



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


# Branching And Merging Pipelines

- We can branch a pipeline by applying several transforms to the same intermediate PCollection
- We can merge PCollections with **Flatten**

In [77]:
import apache_beam as beam



 # Creating Pipeline with context

with beam.Pipeline() as pipeline:

  records = (pipeline
      | "Reader" >> beam.io.ReadFromText('dept_data.txt')
      | "Split into records" >> beam.Map(lambda record : record.split(',')))

  accounts_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by Accounts Department" >> beam.Filter(lambda record: record[3]=='Accounts')
      | "Create Record With Accounts ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For Accounts" >> beam.CombinePerKey(sum)
      | "Tag With Accounts department" >> beam.Map(lambda record: (*record,'Accounts'))
      | "Accounts Record to String" >> beam.Map(lambda employee_count: str(employee_count))
  )
  hr_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by HR Department" >> beam.Filter(lambda record: record[3]=='HR')
      | "Create Record With HR ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For HR" >> beam.CombinePerKey(sum)
      | "Tag With HR department" >> beam.Map(lambda record: (*record,'HR'))
      | "HR Record to String" >> beam.Map(lambda employee_count: str(employee_count))
  )
  output=(
      (accounts_count,hr_count)
      | beam.Flatten()
      | "Write to File" >> beam.io.WriteToText('data/output')
  )


# inspect the output
!head -n 20 data/output*



('Marco', 31, 'Accounts')
('Rebekah', 31, 'Accounts')
('Itoe', 31, 'Accounts')
('Edouard', 31, 'Accounts')
('Kyle', 62, 'Accounts')
('Kumiko', 31, 'Accounts')
('Gaston', 31, 'Accounts')
('Ayumi', 30, 'Accounts')
('Beryl', 62, 'HR')
('Olga', 31, 'HR')
('Leslie', 31, 'HR')
('Mindy', 31, 'HR')
('Vicky', 31, 'HR')
('Richard', 31, 'HR')
('Kirk', 31, 'HR')
('Kaori', 31, 'HR')
('Oscar', 31, 'HR')


# Word Count Example

In [67]:
# upload data.txt, the text whose words will be counted

from google.colab import files
uploaded=files.upload()

In [68]:
# visualize input
!head -n 20 data.txt

	KING LEAR


	DRAMATIS PERSONAE


LEAR	king of Britain  (KING LEAR:)

KING OF FRANCE:

DUKE OF BURGUNDY	(BURGUNDY:)

DUKE OF CORNWALL	(CORNWALL:)

DUKE OF ALBANY	(ALBANY:)

EARL OF KENT	(KENT:)

EARL OF GLOUCESTER	(GLOUCESTER:)



In [119]:

import apache_beam as beam
import re

# Split on runs of non-letter characters, e.g. "(KING LEAR:)" -> ['', 'KING', 'LEAR', '']
non_letter_pattern = r'[^a-zA-Z]+'
# Keep only non-empty tokens (drops the '' produced at the edges of a split)
non_empty_word_pattern = r'^\S+$'
# Creating Pipeline with context
with beam.Pipeline() as pipeline:
  word_counts=(pipeline
              | beam.io.ReadFromText("data.txt")
              | beam.FlatMap(lambda row: re.split(non_letter_pattern, row))
              | beam.Filter(lambda word: bool(re.match(non_empty_word_pattern, word)))
              | beam.Map(lambda word: (word,1))
              | beam.CombinePerKey(sum)
              | beam.Map(lambda row: ', '.join([ str(r) for r in row]))
              | "Write to File" >> beam.io.WriteToText('data/output')
            )

!head -n 20 data/output*



KING, 243
LEAR, 236
DRAMATIS, 1
PERSONAE, 1
king, 65
of, 447
Britain, 2
OF, 15
FRANCE, 10
DUKE, 3
BURGUNDY, 8
CORNWALL, 63
ALBANY, 68
EARL, 2
KENT, 156
GLOUCESTER, 143
EDGAR, 126
son, 29
to, 440
Gloucester, 36


# ParDo Transform

- A ParDo transform applies a processing function to each element of the input PCollection and emits zero, one or multiple output elements for it

- Map, FlatMap and Filter are special cases of ParDo
- **The DoFn's process method returns (or yields) an iterable of output elements; Beam iterates over it and emits each element into the output PCollection**

## Functionalities

- **Filtering** : ParDo can inspect each element of a PCollection and decide whether to output or discard it.
- **Formatting or Type conversion** : ParDo can change the type or format of input elements.
- **Extracting individual parts** : ParDo can extract parts of an element, e.g. individual fields of a record.
- **Computations** : ParDo can perform any computation on the input elements and output a PCollection of the results.



In [137]:
import apache_beam as beam


# Acts as a map function
class SplitRow(beam.DoFn):

  def process(self,element):
    #return type -> list
    return [element.split(',')]

class PairEmployees(beam.DoFn):
  def process(self,element):
    return [(element[1],1)]

class Counting(beam.DoFn):

  def process(self,element):
    (key,values)=element
    return [(key,sum(values))]

with beam.Pipeline() as pipeline:
  imported=(pipeline
              | beam.io.ReadFromText("dept_data.txt")
              | beam.ParDo(SplitRow())
              | beam.ParDo(lambda row: [row] if row[3]=="Accounts" else None)
              | beam.ParDo(PairEmployees())
              | beam.GroupByKey()
              | beam.ParDo(Counting())
              | "Write to File" >> beam.io.WriteToText('data/output')
            )

!head -n 20 data/output*



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


# Combiner Transform

- **create_accumulator** : Creates a new, empty accumulator. The runner may create many of them (for example one per worker or bundle), each holding a partial result; for a mean this is a (sum, count) pair.

- **add_input** : Adds an input element to an accumulator and returns the updated accumulator, e.g. the new (sum, count).

- **merge_accumulators** : Merges several accumulators, e.g. those built on different machines, into one. For a mean, the partial sums and counts are added up.

- **extract_output** : Computes the final result from the merged accumulator, e.g. sum / count for a mean.

# Example

```python
import apache_beam as beam


class SumFn(beam.CombineFn):
    def create_accumulator(self):
        return 0  # Initialize accumulator to 0

    def add_input(self, accumulator, element):
        return accumulator + element  # Incrementally add input element to accumulator

    def merge_accumulators(self, accumulators):
        return sum(accumulators)  # Merge multiple accumulators into one

    def extract_output(self, accumulator):
        return accumulator  # Return the final accumulated sum

```



In [170]:
import apache_beam as beam


# Acts as a map function
class SplitRow(beam.DoFn):

  def process(self,element):
    #return type -> list
    return [element.split(',')]

class PairEmployees(beam.DoFn):
  def process(self,element):
    return [(element[1],1)]


## Modified Counting: the combiner below emits a single list of
## (name, [1, 1, ...]) pairs, so process() receives that whole list
class Counting(beam.DoFn):
  def process(self,element):
    result=[]
    for name,values in element:
      result.append((name,sum(values)))

    return result


## Custom Combiner: collects a list of 1s per employee name across the whole PCollection
class GroupCountsByName(beam.CombineFn):

  def create_accumulator(self):
    return {}

  def add_input(self, accumulator, element):
    (name,_)=element
    accumulator.setdefault(name,[]).append(1)
    return accumulator

  def merge_accumulators(self, accumulators):
    merged_accumulator = {}
    for accumulator in accumulators:
      for name,counts in accumulator.items():
        merged_accumulator.setdefault(name,[]).extend(counts)
    return merged_accumulator

  def extract_output(self, merged_accumulator):
    return [ (name,counts) for name,counts in merged_accumulator.items() ]


with beam.Pipeline() as pipeline:
  imported=(pipeline
              | beam.io.ReadFromText("dept_data.txt")
              | beam.ParDo(SplitRow())
              | beam.ParDo(lambda row: [row] if row[3]=="Accounts" else None)
              | beam.ParDo(PairEmployees())
              # CombineGlobally runs the CombineFn over the whole PCollection
              # and outputs exactly one element: the extract_output() list
              | beam.CombineGlobally(GroupCountsByName())
              | beam.ParDo(Counting())
              | beam.io.WriteToText('data/output')
            )

!head -n 20 data/output*



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)



# Composite Transform

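- A composite transform groups several transforms into one reusable transform
- We create one by subclassing PTransform, the base class of all transforms in Beam, and overriding its expand method, which applies the inner transforms to the input PCollection and returns the result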


In [174]:
import apache_beam as beam

class MyTransform(beam.PTransform):

  def expand(self, input):
    output = (
    input
      | "Filter on count > 30" >> beam.Filter(lambda record: record[1]>30)
      | "Tags With Employee" >> beam.Map(lambda record: (*record,'Employee'))
      | "Map Record to String" >> beam.Map(lambda employee_count: str(employee_count))
    )
    return output





 # Creating Pipeline with context

with beam.Pipeline() as pipeline:

  records = (pipeline
      | "Reader" >> beam.io.ReadFromText('dept_data.txt')
      | "Split into records" >> beam.Map(lambda record : record.split(',')))

  accounts_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by Accounts Department" >> beam.Filter(lambda record: record[3]=='Accounts')
      | "Create Record With Accounts ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For Accounts" >> beam.CombinePerKey(sum)
      | "Accounts Transform" >> MyTransform()
  )
  hr_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by HR Department" >> beam.Filter(lambda record: record[3]=='HR')
      | "Create Record With HR ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For HR" >> beam.CombinePerKey(sum)
      | "HR Transform" >> MyTransform()
  )
  output=(
      (accounts_count,hr_count)
      | beam.Flatten()
      | "Write to File" >> beam.io.WriteToText('data/output')
  )


# inspect the output
!head -n 20 data/output*



('Marco', 31, 'Employee')
('Rebekah', 31, 'Employee')
('Itoe', 31, 'Employee')
('Edouard', 31, 'Employee')
('Kyle', 62, 'Employee')
('Kumiko', 31, 'Employee')
('Gaston', 31, 'Employee')
('Beryl', 62, 'Employee')
('Olga', 31, 'Employee')
('Leslie', 31, 'Employee')
('Mindy', 31, 'Employee')
('Vicky', 31, 'Employee')
('Richard', 31, 'Employee')
('Kirk', 31, 'Employee')
('Kaori', 31, 'Employee')
('Oscar', 31, 'Employee')
