<a href="https://colab.research.google.com/github/prateekjoshi2013/apache-beam-python-primer/blob/main/Apache_Beam_Primer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


## Installation Apache Beam

In [6]:
!{'pip install --quiet apache_beam'}

## Creating Folders and Uploading Files

In [None]:
!{'mkdir -p data'}

In [None]:
!ls

data  sample_data


In [None]:
from google.colab import files
uploaded=files.upload()

Saving dept_data.txt to dept_data.txt


In [None]:
!ls

data  dept_data.txt  sample_data


## Read Transforms: Reading Data

 ### Read From Text

  - **ReadFromText(...)** : Parses a text file as newline delimited elements i.e it reads the file line by line every line is a single elelment in pCollection

 #### Parameters

  - **file_pattern(str)**: **mandatory** param specifies the input filepath/s can contain globs for ex. **my_input\*** will read all the files prefixed with my_input
  - **min_bundle_size(int)**: minimum size bundles splitting source into chunks for parallel processing
  - **compression_type(str)**: we can specify compression type here if input files are compressed
  - **strip_trailing_newlines(boolean)**: whether source should remove the new line char (/n) from each line before reading it, By default true.
  - **validate(boolean)**: verify the presence of file during pipeline creation. By default true
  - **skip_header_lines(int)**: specifies the number of header lines to be skipped, By default 0




 ### Read From Avro

  - **ReadFromAvro(...)** : Parses a avro files

 #### Parameters

  - **file_pattern(str)**: **mandatory** param specifies the input filepath/s can contain globs for ex. **my_input\*** will read all the files prefixed with my_input
  - **min_bundle_size(int)**: minimum size bundles splitting source into chunks for parallel processing
  - **validate(boolean)**: verify the presence of file during pipeline creation. By default true
  - **use_fastavro(boolean)**:  when set to true, uses the 'fastavro' library to read avro files


 ### Read From Parquet

  - **ReadFromParquet(...)** : Parses parquet files

 #### Parameters

  - **file_pattern(str)**: **mandatory** param specifies the input filepath/s can contain globs for ex. **my_input\*** will read all the files prefixed with my_input
  - **min_bundle_size(int)**: minimum size bundles splitting source into chunks for parallel processing
  - **validate(boolean)**: verify the presence of file during pipeline creation. By default true
  - **columns(list[str])**: Specifies the list of columns that will be read from the input files



 ### Read From Tensor Flow records

  - **ReadFromTFRecord(...)** : Parses tensor flow records which are in binary format

 #### Parameters

  - **file_pattern(str)**: **mandatory** param specifies the input filepath/s can contain globs for ex. **my_input\*** will read all the files prefixed with my_input
  - **validate(boolean)**: verify the presence of file during pipeline creation. By default true
  - **compression_type(str)**: we can specify compression type here if input files are compressed
  - **coder(str)**: Specifies the coder name to decode the input record .By default 'bytesCoder'.

 ### Read From Pub Sub GCP

  - **ReadFromPubSub(...)** : Used to read messages from Google PubSub service

 #### Parameters

  - **topic(str)**: param specifies the topic name where the messages are getting published and beam has to read it
  - **subscription(str)**: Specifies the subscription from where beam has to read the messages.  **either topic or subscription should be provided and not both**

  - **id_label(str)**: Provide the attribute from incoming messages which should be considered as unique identifier
  - **with_attributes(boolean)**: if set to true then output elements will be of type objects otherwise of type bytes, False by default
  - **timestamp_attributes(int)**: Specifies the value to be used as element timestamp.Specified argument should be of milliseconds since unix epoch.

### Other Read Transforms :

#### Available

- Other readers like kinesis, redis might be dependent on jre

- like : kinesis, jdbc, redis etc



## Create Transform : Generating Data

In [None]:
import apache_beam as beam

pipeline=beam.Pipeline()
lines=(
    pipeline
    | beam.Create([
        'Using create transform',
        'to generate in memory data',
        'This is a third line',
        'Thanks',
    ])
    | beam.io.WriteToText('data/output')
)
pipeline.run()

# visualize input
!{('head -n 20 data/output*')}




Using create transform
to generate in memory data
This is a third line
Thanks


In [None]:
import apache_beam as beam

pipeline=beam.Pipeline()
lines=(
    pipeline
    | beam.Create([
        ("maths",52),
        ("english",52),
        ("science",52),
        ("computer",52),
        ("maths",52),
    ])
    | beam.io.WriteToText('data/output')
)
pipeline.run()

# visualize input
!{('head -n 20 data/output*')}



('maths', 52)
('english', 52)
('science', 52)
('computer', 52)
('maths', 52)


In [None]:
import apache_beam as beam

pipeline=beam.Pipeline()
lines=(
    pipeline
    | beam.Create({
        'row1':[1,2,3,4,5],
        'row2':[1,2,3,4,5],
        'row3':[1,2,3,4,5],
    })
    | beam.io.WriteToText('data/output')
)
pipeline.run()

# visualize input
!{('head -n 20 data/output*')}



('row1', [1, 2, 3, 4, 5])
('row2', [1, 2, 3, 4, 5])
('row3', [1, 2, 3, 4, 5])


# Write Transforms : Writng Files

## Write To Text

- **WriteToText(...)** : Writes each element of the PCollection as a single line in the output file

### Parameters

- **file_path_prefix(str)** : this is mandatory param specifies the file path to write the pCollection to.The files will be written with specified prefix
- **file_name_suffix(str)** : specifies the suffix of the output file name
- **num_shards(int)** : specifies no. of shards of files written as output.If not specified the framework decides an optimal number.Should not be specified until you explicitly want a particular number of output files
- **append_trailing_newlines(boolean)** : if the lines should be delimmited with newline,By default true
- **coder(str)** : Specifies the coder name to encode each line
- **header(str)** : Specifies the header line of output




## Write To Avro

- **WriteToAvro(...)**: Writes each element of the pCollection to Avro Files

### Parameters

- **file_path_prefix(str)** : this is mandatory param specifies the file path to write the pCollection to.The files will be written with specified prefix
- **file_name_suffix(str)** : specifies the suffix of the output file name
- **num_shards(int)** : specifies no. of shards of files written as output.If not specified the framework decides an optimal number.Should not be specified until you explicitly want a particular number of output files
- **schema** : the schema to use for writing as returned by avro.schema.Parse
- **codec** : the codec used for block level compression type of the Output file.
- **compression_type(str)** : Specifies the compression type of the output file
- **use_fastavro(boolean)** : when set to true, uses the 'fastavro' library to write the Avro file.
- **mime_type** : MIME type to use for the produced output files.




## Write To Parquet

- **WriteToParquet(...)**: Writes each element of the pCollection to Parquet Files


### Parameters

- **file_path_prefix(str)** : this is mandatory param specifies the file path to write the pCollection to.The files will be written with specified prefix
- **file_name_suffix(str)** : specifies the suffix of the output file name
- **num_shards(int)** : specifies no. of shards of files written as output.If not specified the framework decides an optimal number.Should not be specified until you explicitly want a particular number of output files
- **schema** : the schema to use for writing as returned by avro.schema.Parse
- **codec** : the codec used for block level compression type of the Output file.
- **mime_type** : MIME type to use for the produced output files.
- **row_group_buffer_size** : Specifies the byte size of the row group buffer. Default '67108864'
- **record_batch_size** : Specifies the number of records in each record batch. Default '1000'

## Write To PubSub

- **WriteToPubSub(...)** : Writes each element of the PCollection to the Google cloud PubSub service

### Parameters

- **topic(str)** : Topic name to which the message is published to.
- **with_attributes(boolean)** : If set to true then input elements will be of type 'Objects' , otherwise of type 'Bytes'. By default False
- **id_label** : If set will set an attribute for each Cloud Pub/Sub message with the given name and a unique value.
- **timestamp_attribute(int)** : if set will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value

# Typical Apache Beam Code

In [None]:
import apache_beam as beam

 # Creating and giving pipeline a name
pipeline=beam.Pipeline()

attendance_count=(
    # Each PTransform produces new PCollection
    pipeline
    # reader
    | beam.io.ReadFromText('dept_data.txt')
    # PTransform Map (["id,name,salary,dept,date"])->(id,name,salary,dept,date)
    | beam.Map(lambda record : record.split(','))
    # PTransform Filter ([id,name,salary,dept,date]) where dept=='Accounts'
    | beam.Filter(lambda record: record[3]=='Accounts')
    # PTransform Map ([id,name,salary,dept,date])->(ID,1)
    | beam.Map(lambda record: (record[1],1))
    # PTransform Group By ID
    | beam.CombinePerKey(sum)
    # PTransform Map (id,count_total) -> ("id,total_count")
    | beam.Map(lambda employee_count: str(employee_count))
    # writer
    | beam.io.WriteToText('data/output')
)

pipeline.run()

# visualize input
!{('head -n 20 data/output*')}



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


# Better Pipeline Code with Context and Annotation

In [None]:
import apache_beam as beam



 # Creating Pipeline with context

with beam.Pipeline() as pipeline:
  attendance_count=(
      # Each PTransform produces new PCollection
      pipeline
      | "Reader" >> beam.io.ReadFromText('dept_data.txt')
      | "Split into records" >> beam.Map(lambda record : record.split(','))
      | "Filter by Department" >> beam.Filter(lambda record: record[3]=='Accounts')
      | "Create Record With ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records" >> beam.CombinePerKey(sum)
      | "Record to String" >> beam.Map(lambda employee_count: str(employee_count))
      | "Write to File" >> beam.io.WriteToText('data/output')
  )

# visualize input
!{('head -n 20 data/output*')}



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


# Branching And Merging Pipelines

- We can split pipeline by using intermdeiate pCollection
- We can merge pCollection by using **flatten**

In [None]:
import apache_beam as beam



 # Creating Pipeline with context

with beam.Pipeline() as pipeline:

  records = (pipeline
      | "Reader" >> beam.io.ReadFromText('dept_data.txt')
      | "Split into records" >> beam.Map(lambda record : record.split(',')))

  accounts_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by Accounts Department" >> beam.Filter(lambda record: record[3]=='Accounts')
      | "Create Record With Accounts ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For Accounts" >> beam.CombinePerKey(sum)
      | "Tag With Accounts department" >> beam.Map(lambda record: (*record,'Accounts'))
      | "Accounts Record to String" >> beam.Map(lambda employee_count: str(employee_count))
  )
  hr_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by HR Department" >> beam.Filter(lambda record: record[3]=='HR')
      | "Create Record With HR ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For HR" >> beam.CombinePerKey(sum)
      | "Tag With HR department" >> beam.Map(lambda record: (*record,'HR'))
      | "HR Record to String" >> beam.Map(lambda employee_count: str(employee_count))
  )
  output=(
      (accounts_count,hr_count)
      | beam.Flatten()
      | "Write to File" >> beam.io.WriteToText('data/output')
  )


# visualize input
!{('head -n 20 data/output*')}



PCollection[[77]: Accounts Record to String.None]
('Marco', 31, 'Accounts')
('Rebekah', 31, 'Accounts')
('Itoe', 31, 'Accounts')
('Edouard', 31, 'Accounts')
('Kyle', 62, 'Accounts')
('Kumiko', 31, 'Accounts')
('Gaston', 31, 'Accounts')
('Ayumi', 30, 'Accounts')
('Beryl', 62, 'HR')
('Olga', 31, 'HR')
('Leslie', 31, 'HR')
('Mindy', 31, 'HR')
('Vicky', 31, 'HR')
('Richard', 31, 'HR')
('Kirk', 31, 'HR')
('Kaori', 31, 'HR')
('Oscar', 31, 'HR')


# Word Count Example

In [None]:
# import file

from google.colab import files
uploaded=files.upload()

In [None]:
# visualize input
!{('head -n 20 data.txt')}

	KING LEAR


	DRAMATIS PERSONAE


LEAR	king of Britain  (KING LEAR:)

KING OF FRANCE:

DUKE OF BURGUNDY	(BURGUNDY:)

DUKE OF CORNWALL	(CORNWALL:)

DUKE OF ALBANY	(ALBANY:)

EARL OF KENT	(KENT:)

EARL OF GLOUCESTER	(GLOUCESTER:)



In [None]:

import apache_beam as beam
import re

other_than_non_space_char_pattern = r'[^a-zA-Z]+'
non_space_char_word = r'^\S+$'
# Creating Pipeline with context
with beam.Pipeline() as pipeline:
  imported=(pipeline
              | beam.io.ReadFromText("data.txt")
              | beam.FlatMap(lambda row: re.split(other_than_non_space_char_pattern, row))
              | beam.Filter(lambda word: bool(re.match(non_space_char_word, word)))
              | beam.Map(lambda word: (word,1))
              | beam.CombinePerKey(sum)
              | beam.Map(lambda row: ', '.join([ str(r) for r in row]))
              | "Write to File" >> beam.io.WriteToText('data/output')
            )

!{('head -n 20 data/output*')}



KING, 243
LEAR, 236
DRAMATIS, 1
PERSONAE, 1
king, 65
of, 447
Britain, 2
OF, 15
FRANCE, 10
DUKE, 3
BURGUNDY, 8
CORNWALL, 63
ALBANY, 68
EARL, 2
KENT, 156
GLOUCESTER, 143
EDGAR, 126
son, 29
to, 440
Gloucester, 36


# ParDo Transform

- A ParDo transform takes each element of input Pcollection, performs processing function on it and emits 0,1 or multiple elements

- Map, FlatMap and Filter are special cases of ParDo function
- **Returns a generator of elements on which next is called by the next transform**

## Functionalities

- **Filtering** : ParDo can take each element of Pcollection and decide either to output or discard it.
- **Formatting or Type conversion** : ParDo can change the type or format of input elements
- **Extracting individual parts** : ParDo can be used to extract individual elements from a single element.
- **Computations** : ParDo can perform any processing function on the input elements and outputs a Pcollection



In [None]:
import apache_beam as beam


# Acts as a map function
class SplitRow(beam.DoFn):

  def process(self,element):
    #return type -> list
    return [element.split(',')]

class PairEmployees(beam.DoFn):
  def process(self,element):
    return [(element[1],1)]

class Counting(beam.DoFn):

  def process(self,element):
    (key,values)=element
    return [(key,sum(values))]

with beam.Pipeline() as pipeline:
  imported=(pipeline
              | beam.io.ReadFromText("dept_data.txt")
              | beam.ParDo(SplitRow())
              | beam.ParDo(lambda row: [row] if row[3]=="Accounts" else None)
              | beam.ParDo(PairEmployees())
              | beam.GroupByKey()
              | beam.ParDo(Counting())
              | "Write to File" >> beam.io.WriteToText('data/output')
            )

!{('head -n 20 data/output*')}



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


## Side Inputs

- Additional data provided to a DoFn object
- Can be provided to ParDo or its child Transforms Map and FlatMap
- sided inputs need to be determined at runtime of the pipeline
- side inputs should be smaller in size then the actual input


In [None]:
from math import inf
import apache_beam as beam

'''
  Do the attendance count with below conditions

  - Accounts dept employees excluding few in exclusions.txt
  - Also the employee name's length be between 3-10
'''


side_list=list()
with open('exclusions.txt','r') as exclusions_file:
  for line in exclusions_file:
    side_list.append(line.rstrip())


class FilterUsingLength(beam.DoFn):

  def process(self,element,exclusion_list,lower_bound,upper_bound=inf):
    id=element.split(',')[0]
    name=element.split(',')[1]
    element_list=element.split(',')
    if lower_bound <= len(name) <= upper_bound and id not in side_list:
      return [element_list]


with beam.Pipeline() as pipeline:
  (pipeline
    | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
    | "ParDo with side inputs" >> beam.ParDo(FilterUsingLength(),side_list,3,10)
    | "Filter on account Dept" >> beam.Filter(lambda record: record[3]=='Accounts')
    | "Map to count record" >> beam.Map(lambda record: ((record[0],record[1]),1))
    | "Combine By Id and sum the 1 count" >> beam.CombinePerKey(sum)
    | "Write results" >> beam.io.WriteToText('data/output')
  )


!{('head -n 20 data/output*')}



(('503996WI', 'Edouard'), 31)
(('957149WC', 'Kyle'), 31)
(('241316NX', 'Kumiko'), 31)
(('796656IE', 'Gaston'), 31)
(('718737IX', 'Ayumi'), 30)


## Branching using Additional Outputs






In [None]:
import apache_beam as beam

# DoFn implementation
class ProcessWords(beam.DoFn):
  def process(self,element,cutoff_length,marker):
    name=element.split(',')[1]
    if name.startswith(marker):
      return [name]
    elif len(name) <= cutoff_length:
      return [beam.pvalue.TaggedOutput('Short_Names',name)]
    else:
      return [beam.pvalue.TaggedOutput('Long_Names',name)]


with beam.Pipeline() as pipeline:
  results=(
      pipeline
      | beam.io.ReadFromText('dept_data.txt')
      | beam.ParDo(ProcessWords(), cutoff_length=4, marker='K').with_outputs('Short_Names','Long_Names',main='K')
  )
  short_collection= results.Short_Names
  short_collection | "write short names" >> beam.io.WriteToText('data/short-names')
  long_collection= results.Long_Names
  long_collection | "write long names" >> beam.io.WriteToText('data/long-names')
  startK_collection= results.K
  startK_collection | "write names starting with K" >> beam.io.WriteToText('data/names_starting')

print("----short names----")
!{('head -n 20 data/short-names*')}
print("----long names-----")
!{('head -n 20 data/long-names*')}
print("----names starting with k-----")
!{('head -n 20 data/names_starting*')}





----short names----
Itoe
Olga
Itoe
Olga
Itoe
Olga
Itoe
Olga
Itoe
Olga
Itoe
Olga
Itoe
Olga
Itoe
Olga
Itoe
Olga
Itoe
Olga
----long names-----
Marco
Rebekah
Edouard
Gaston
Beryl
Leslie
Mindy
Vicky
Richard
Beryl
Oscar
Wendy
Cristobal
Erika
Sebastien
Valerie
Dolly
Emily
Hitomi
Marco
----names starting with k-----
==> data/names_starting-00000-of-00001 <==
Kyle
Kyle
Kumiko
Kirk
Kaori
Kumiko
Kaori
Kyle
Kyle
Kumiko
Kirk
Kaori
Kumiko
Kaori
Kyle
Kyle
Kumiko
Kirk
Kaori
Kumiko

==> data/names_starting_a-00000-of-00001 <==


# Combiner Transform

- **create_accumlator** : Creates a new local accumlator in each machine, keeps a record of (sum,counts)

- **add_input** : adds am input element to accumlator, returning new (sum,count) value

- **merge_accumlators** : merges all machines accumlators into a single one. In our case all the (sum, counts) from various machines are gathered and summed up.

- **extract_output** : Performs the final computation on the merge_accumlator result. Called only once on the merge_accumlator result

# Example

```python
class SumFn(beam.CombineFn):
    def create_accumulator(self):
        return 0  # Initialize accumulator to 0

    def add_input(self, accumulator, element):
        return accumulator + element  # Incrementally add input element to accumulator

    def merge_accumulators(self, accumulators):
        return sum(accumulators)  # Merge multiple accumulators into one

    def extract_output(self, accumulator):
        return accumulator  # Return the final accumulated sum

```



In [None]:
import apache_beam as beam


# Acts as a map function
class SplitRow(beam.DoFn):

  def process(self,element):
    #return type -> list
    return [element.split(',')]

class PairEmployees(beam.DoFn):
  def process(self,element):
    return [(element[1],1)]


## Modified Counting
class Counting(beam.DoFn):
  def process(self,element):
    result=[]
    for id,values in element:
      result.append((id,sum(values)))

    return result


## Custom Combiner
class GroupByKey(beam.CombineFn):

  def create_accumulator(self):
    return {}

  def add_input(self, accumlator,element,*args, **kwargs):
    (id,_)=element
    counts=accumlator.get(id,[])
    counts.append(1)
    accumlator[id]=counts
    return accumlator

  def merge_accumulators(self, accumlators,*args, **kwargs):
    merged_accumlator = {}
    for accumlator in accumlators:
      for id,counts in accumlator.items():
        merged_counts=merged_accumlator.get(id,[])
        merged_counts.extend(counts)
        merged_accumlator[id]=merged_counts
    return merged_accumlator

  def extract_output(self, merged_counts,*args, **kwargs):
    return [ (id,values) for id,values in merged_counts.items() ]


with beam.Pipeline() as pipeline:
  imported=(pipeline
              | beam.io.ReadFromText("dept_data.txt")
              | beam.ParDo(SplitRow())
              | beam.ParDo(lambda row: [row] if row[3]=="Accounts" else None)
              | beam.ParDo(PairEmployees())
              | beam.CombineGlobally(GroupByKey())
              | beam.ParDo(Counting())
              | beam.io.WriteToText('data/output')
            )

!{('head -n 20 data/output*')}



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)



# Composite Transform

- This is to group a bunch of transforms into one transform
- We do this by extending PTransform class which is base class for all the transforms in Beam


In [None]:
import apache_beam as beam

class MyTransform(beam.PTransform):

  def expand(self, input):
    output = (
    input
      | "Filter on count > 30" >> beam.Filter(lambda record: record[1]>30)
      | "Tags With Employee" >> beam.Map(lambda record: (*record,'Employee'))
      | "Map Record to String" >> beam.Map(lambda employee_count: str(employee_count))
    )
    return output





 # Creating Pipeline with context

with beam.Pipeline() as pipeline:

  records = (pipeline
      | "Reader" >> beam.io.ReadFromText('dept_data.txt')
      | "Split into records" >> beam.Map(lambda record : record.split(',')))

  accounts_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by Accounts Department" >> beam.Filter(lambda record: record[3]=='Accounts')
      | "Create Record With Accounts ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For Accounts" >> beam.CombinePerKey(sum)
      | "Accounts Transform" >> MyTransform()
  )
  hr_count=(
      # Each PTransform produces new PCollection
      records
      | "Filter by HR Department" >> beam.Filter(lambda record: record[3]=='HR')
      | "Create Record With HR ID and identity" >> beam.Map(lambda record: (record[1],1))
      | "Group By Id and Sum Identity Records For HR" >> beam.CombinePerKey(sum)
      | "HR Tranform" >> MyTransform()
  )
  output=(
      (accounts_count,hr_count)
      | beam.Flatten()
      | "Write to File" >> beam.io.WriteToText('data/output')
  )


# visualize input
!{('head -n 20 data/output*')}



('Marco', 31, 'Employee')
('Rebekah', 31, 'Employee')
('Itoe', 31, 'Employee')
('Edouard', 31, 'Employee')
('Kyle', 62, 'Employee')
('Kumiko', 31, 'Employee')
('Gaston', 31, 'Employee')
('Beryl', 62, 'Employee')
('Olga', 31, 'Employee')
('Leslie', 31, 'Employee')
('Mindy', 31, 'Employee')
('Vicky', 31, 'Employee')
('Richard', 31, 'Employee')
('Kirk', 31, 'Employee')
('Kaori', 31, 'Employee')
('Oscar', 31, 'Employee')


# CoGroupByKey

- Relational join of two or more key/value Pcollections
- Accepts a dictionary of key/value Pcollections and outputs a single Pcollection containing 1 key/value Tuple for each key in the input Pcollections


In [None]:
from google.colab import files
uploaded=files.upload()

Saving dept_data.txt to dept_data.txt
Saving location.txt to location.txt


In [None]:
import apache_beam as beam


with beam.Pipeline() as pipeline:

  dep_rows= (
    pipeline
    | "reading file 1" >> beam.io.ReadFromText('dept_data.txt')
    | "Pair each employee with key" >> beam.Map(lambda row: (row.split(",")[0],row.split(",")[1:]))
  )

  loc_rows= (
    pipeline
    | "reading file 2" >> beam.io.ReadFromText('location.txt')
    | "Pair each location with key" >> beam.Map(lambda row: (row.split(",")[0],row.split(",")[1:]))
  )

  joined_rows=(
    {'dep_data': dep_rows, 'loc_data': loc_rows}
    | "joining employee records with locations" >> beam.CoGroupByKey()
    | "Write results" >> beam.io.WriteToText('data/output')
  )


# visualize input
!{('head -n 2 data/output*')}



('149633CM', {'dep_data': [['Marco', '10', 'Accounts', '1-01-2019'], ['Marco', '10', 'Accounts', '2-01-2019'], ['Marco', '10', 'Accounts', '3-01-2019'], ['Marco', '10', 'Accounts', '4-01-2019'], ['Marco', '10', 'Accounts', '5-01-2019'], ['Marco', '10', 'Accounts', '6-01-2019'], ['Marco', '10', 'Accounts', '7-01-2019'], ['Marco', '10', 'Accounts', '8-01-2019'], ['Marco', '10', 'Accounts', '9-01-2019'], ['Marco', '10', 'Accounts', '10-01-2019'], ['Marco', '10', 'Accounts', '11-01-2019'], ['Marco', '10', 'Accounts', '12-01-2019'], ['Marco', '10', 'Accounts', '13-01-2019'], ['Marco', '10', 'Accounts', '14-01-2019'], ['Marco', '10', 'Accounts', '15-01-2019'], ['Marco', '10', 'Accounts', '16-01-2019'], ['Marco', '10', 'Accounts', '17-01-2019'], ['Marco', '10', 'Accounts', '18-01-2019'], ['Marco', '10', 'Accounts', '19-01-2019'], ['Marco', '10', 'Accounts', '20-01-2019'], ['Marco', '10', 'Accounts', '21-01-2019'], ['Marco', '10', 'Accounts', '22-01-2019'], ['Marco', '10', 'Accounts', '23-01-2


# Union Transform : Flatten

- In Apache Beam, the Flatten transformation is used to merge multiple PCollections into a single PCollection.

- It is similar to the SQL UNION ALL operation, where all elements from the input collections are combined into one output collection without removing duplicates.

```python
  # medical_loan_defaulters, personal_loan_defaulters two Pcollections
  
  (
    (medical_loan_defaulters,personal_loan_defaulters)
    | "flatten map" >> beam.Flatten()
    | "Write loan defaulters" >> beam.io.WriteToText('data/output-loan-defaulters')
  )
```

# Bank Use Case


## Credit card skippers/defaulters:

- Assign 1 point to customer for short payment, where a short payment means when customer fails to clear atleast 70% of its monthly spends.

- Assign 1 point to customer where he has spent 100% of his max_limit but did not clear the full amount.

- If for any month customer is meeting both the above conditions,assign 1 additional point.

- Sum up all the points for a customer and output in file.

In [None]:
from google.colab import files
uploaded=files.upload()
!{('head -n 2 cards.txt')}

Saving cards.txt to cards (1).txt
#Customer_id, First_name,Last_name,Relationship_no.,Card_type,Max_credit_limit,Total_Spent,Cash_withdrawn,Cleared_amount,Last_date
CT28383,Miyako,Burns,R_7488,Issuers,500,490,38,101,30-01-2018


In [None]:
import apache_beam as beam

def map_card_defaulter(row):
  (
    customer_id,
    first_name,
    last_name,
    relationship_no,
    card_type,
    max_credit_limit,
    total_spent,
    cash_withdrawn,
    cleared_amount,
    last_date
  )=row.split(",")
  defaulter_points=0
  defaulter_points += (1 if float(cleared_amount) < 0.7*float(total_spent) else 0)
  defaulter_points += (1 if float(total_spent) >= float(max_credit_limit) and float(cleared_amount) < float(total_spent) else 0)
  defaulter_points += (1 if int(defaulter_points) > 2 else 0)
  return (
    (
      customer_id,
      first_name,
      last_name
    ),
    defaulter_points
  )





with beam.Pipeline() as pipeline:
  (pipeline
  | "reading file cards.txt" >> beam.io.ReadFromText('cards.txt',skip_header_lines=1)
  | "add defualter points" >> beam.Map(map_card_defaulter)
  | "Group by sum" >> beam.CombinePerKey(sum)
  # | "output" >> beam.Map(lambda row: print(row))
  | "output" >>  beam.Map(lambda row: " ".join([row[0][0],row[0][1],row[0][2],str(row[-1])]))
  | "Write results" >> beam.io.WriteToText('data/output')
  )

# visualize input
!{('head -n 2 data/output*')}



CT28383 Miyako Burns 3
CT74474 Nanaho Brennan 3


## Loan file key points:

- For Personal loan category, Bank does not accept short or late payments.
- If a person has not paid monthly installment then that month's entry won't be present in the file.
- For Medical loan, Bank does accepts late payments but it should be the full amount.
- It is assumed that there is every month's data/record for Medical Loan.
       
## Loan defaulters:

- Medical Loan defaulters : If customer has made a total of 3 or more late payments.

- Personal Loan defaulters : If customer has missed a total of 4 or more installments OR missed 2 consecutive installments.

In [None]:
from google.colab import files
uploaded=files.upload()
!{('head -n 2 loan.txt')}

Saving loan.txt to loan.txt
#customer_id, first_name,last_name,customer_category,loan_id, loan_category, due_date, due_amount, payment_date
CT88330,Humberto,Banks,Serviceman,LN_1559,Medical Loan,26-01-2018, 2000, 30-01-2018


In [None]:
import apache_beam as beam
from datetime import datetime

date_format = "%d-%m-%Y"

class CollectLoanRecordsPerPerson(beam.CombineFn):
    def create_accumulator(self):
      return [] # Initialize accumulator to 0

    def add_input(self, accumulator, element):
      accumulator.append(element)
      return accumulator  # Incrementally add input element to accumulator

    def merge_accumulators(self, accumulators):
      return accumulators


    def extract_output(self, accumulator):
      return accumulator  # Return the final accumulated sum



# DoFn implementation
class ProcessRecords(beam.DoFn):
  def process(self,element):
    (
        customer_id,
        first_name,
        last_name,
        customer_category,
        loan_id,
        loan_category,
        due_date,
        due_amount,
        payment_date
    )=element.split(',')
    due_date=datetime.strptime(due_date.strip(), date_format)
    payment_date=datetime.strptime(payment_date.strip(), date_format)
    record=((customer_id, first_name, last_name),(due_date,payment_date,1 if due_date<payment_date else 0))
    if loan_category=='Medical Loan':
      return [beam.pvalue.TaggedOutput('medical_loan',record)]
    else:
      return [record]

def find_personal_loan_defaulter(row):
  personal_detail,payments=row
  prev=None
  count=0
  total=0
  map={ i:1 for i in range(1,13)}
  for month,missed in payments:
    map[month]=missed
  if sum(map.values())>4:
    return row
  else:
    for curr_month in range(2,13):
      if map[curr_month] and map.get(curr_month-1,1):
        return row
    return None


with beam.Pipeline() as pipeline:
  results = (pipeline
              | "split string" >> beam.io.ReadFromText('loan.txt',skip_header_lines=1)
              | "map rows " >> beam.ParDo(ProcessRecords()).with_outputs('medical_loan',main='personal_loan')
            )
  medical_loans=results.medical_loan
  personal_loans=results.personal_loan
  medical_loan_defaulters=(
    medical_loans
    | "map to personal_details,defaulter_point" >> beam.Map(lambda row:(row[0],row[1][2]))
    | "collect medical loans per person" >> beam.CombinePerKey(sum)
    | "filter potential medical loans defaulters" >> beam.Filter(lambda row: row[-1]>=3)
    | "medical loans names" >> beam.Map(lambda row: " ".join(row[0]))
    | "Write medical loans defaulters" >> beam.io.WriteToText('data/output-medical-loan-defaulters')
  )

  personal_loan_defaulters=(
    personal_loans
    | "create monthly mapper" >> beam.Map(lambda row: (row[0],(row[1][0].month,row[1][-1])))
    | "collect personal loans per person" >> beam.CombinePerKey(CollectLoanRecordsPerPerson())
    | "collect personal loans per person all months" >> beam.Map(lambda row: (row[0],row[1][0]))
    | "filter defaulters" >> beam.Filter(find_personal_loan_defaulter)
    | "personal loans names" >> beam.Map(lambda row: " ".join(row[0]))
  )

  (
    (medical_loan_defaulters,personal_loan_defaulters)
    | "flatten map" >> beam.Flatten()
    | "Write loan defaulters" >> beam.io.WriteToText('data/output-loan-defaulters')
  )


# visualize input
!{('head -n 2 data/output-loan-defaulters*')}



CT68554 Ronald Chiki
CT74421 Fabian Browning


# Type Safety

- Type Safety is prevention of typed errors in a programming language where type error means when someone attempts to perform an operation on a value that does not support that operation

- We can provide TypeHints to provide type safety

## Types of typehints

- **Inline**: Provided during pipeline construction (on Transforms).
- **Outline**: Provided as properties of the DoFn using decorators.

## Type Hint type:

- Simple Typehint: Includes primitive types like **`int`**, **`str`**, **`user defined classes`**
- Parameterized Typehint: Includes nested types, basically for container Python objects for ex. **`List`**, **`Tuple`**, **`List[Tuple[int,str,str]]`**
- Special Typehint: Includes those special types which are introduced in PEP 484 for ex. **`Any`**, **`Union`**, **`Optional[T]`**


In [30]:
import apache_beam as beam
from typing import List


@beam.typehints.with_output_types(str) # outline output typing
@beam.typehints.with_input_types(int) # inline output typing
class IntToStr(beam.DoFn):
  def process(self,element):
    return str(element)

with beam.Pipeline() as pipeline:
  (
    pipeline
    | beam.Create([1,2,3,4,5])
    | beam.Filter(lambda x: x%2==0 ).with_input_types(int).with_output_types(int) # inline input and output typing
    | beam.ParDo(IntToStr())
    | beam.Map(lambda x: print(x)).with_input_types(str).with_output_types(None) # inline input and output typing
  )

2
4


# Data Encoding in Beam

- **Coders** encode and decode the elements of a Pcollection
- **CoderRegistry** maps the types to their default coders.
- Coders do not necessarily have one to one relationships with types for ex. int type can be coded/endcoded by both VarIntCoder and FloatCoder

|Python Type|Default Coder|
|----|----|
|int|VarIntCoder|
|float|FloatCoder|
|str|BytesCoder|
|bytes|StrUtf8Coder|
|tuple|TupleCoder|


In [5]:
from apache_beam import coders

print("default int registry: ",coders.registry.get_coder(int))
coders.registry.register_coder(int,coders.BigIntegerCoder)
print("newly set int registry: ",coders.registry.get_coder(int))
coders.registry.register_coder(int,coders.VarIntCoder)
print("reverted to default int registry: ",coders.registry.get_coder(int))

default int registry:  BigIntegerCoder
newly set int registry:  BigIntegerCoder
reverted to default int registry:  VarIntCoder


## Custom Coders for Custom DataTypes

- For custom Data Types we need to define Custom Coders
- For defining custom coders we need to subclass `apache_beam.coders.Coder` and override these methods

- **`encode(self, value)`**: This method defines how to serialize your custom data type into a byte representation.
- **`decode(self, encoded)`**: This method defines how to deserialize the byte representation back into your custom data type.
- **`is_deterministic(self)`**: This method indicates whether the coder always produces the same encoded output for the same input value. It should return True if the coder is deterministic, otherwise False.
  - A deterministic coder is usually required for group by operations as it relies on keys to group records and hence needs the keys to be consistent keys
- **`estimate_size(self, value)`**: This method returns the estimated size of the encoded object which helps the runners optimize the pipelines
- **`to_type_hint(self)`** (Optional): This method returns the type hint for the values that this coder encodes and decodes. This can help with type inference and checking within the pipeline.


```python
import apache_beam as beam
from apache_beam.coders import Coder

class StrUtf8Coder(Coder):
    """A coder for encoding and decoding strings as UTF-8."""

    def encode(self, value):
        if not isinstance(value, str):
            raise ValueError("StrUtf8Coder can only encode strings")
        return value.encode('utf-8')

    def decode(self, encoded):
        return encoded.decode('utf-8')

    def is_deterministic(self):
        return True

    def estimate_size(self, value):
        return len(self.encode(value))

    def to_type_hint(self):
        return str
```

In [38]:
from google.colab import files
uploaded=files.upload()
!{('head -n 2 dept_data.txt')}

Saving dept_data.txt to dept_data (1).txt
149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019


In [44]:
import apache_beam as beam
import typing

class Employee():
  def __init__(self,id,name):
    self.id=id
    self.name=name

  def __repr__(self):
    return f"Employee(id={self.id},name={self.name})"


class EmployeeCoder(beam.coders.Coder):

  def encode(self,employee):
    return (f"{employee.id}:{employee.name}").encode("utf-8")

  def decode(self,s):
    return Employee(*s.decode("utf-8").split(":"))

  def is_deterministic(self):
    return True

beam.coders.registry.register_coder(Employee,EmployeeCoder)

with beam.Pipeline() as pipeline:
  (
    pipeline
    | beam.io.ReadFromText('dept_data.txt')
    | beam.Map(lambda row: (Employee(row.split(",")[0],row.split(",")[1]),int(row.split(",")[2])))
    | beam.CombinePerKey(sum).with_input_types(typing.Tuple[Employee,int])
    | beam.Map(lambda row: print(row))
  )


(Employee(id=149633CM,name=Marco), 310)
(Employee(id=212539MU,name=Rebekah), 310)
(Employee(id=231555ZZ,name=Itoe), 310)
(Employee(id=503996WI,name=Edouard), 310)
(Employee(id=704275DC,name=Kyle), 310)
(Employee(id=957149WC,name=Kyle), 310)
(Employee(id=241316NX,name=Kumiko), 310)
(Employee(id=796656IE,name=Gaston), 310)
(Employee(id=331593PS,name=Beryl), 620)
(Employee(id=560447WH,name=Olga), 620)
(Employee(id=222997TJ,name=Leslie), 620)
(Employee(id=171752SY,name=Mindy), 620)
(Employee(id=153636AS,name=Vicky), 620)
(Employee(id=745411HT,name=Richard), 620)
(Employee(id=298464HN,name=Kirk), 620)
(Employee(id=783950BW,name=Kaori), 620)
(Employee(id=892691AR,name=Beryl), 620)
(Employee(id=245668UZ,name=Oscar), 620)
(Employee(id=231206QD,name=Kumiko), 930)
(Employee(id=357919KT,name=Wendy), 930)
(Employee(id=472418ZG,name=Cristobal), 930)
(Employee(id=442292OI,name=Erika), 930)
(Employee(id=503647MN,name=Sebastien), 930)
(Employee(id=245319LD,name=Valerie), 930)
(Employee(id=818776XR,nam