In [1]:
!pip install apache_beam

Collecting apache_beam
  Downloading apache_beam-2.58.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache_beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache_beam)
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache_beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastavro<2,>=0.23.6 (from apache_beam)

Building Simple Pipeline

In [2]:
import apache_beam as beam

In [3]:
def is_perennial(plant):
  return plant["duration"] == "perennial"


In [4]:
with beam.Pipeline() as pipeline:
  perennials = (
      pipeline | "Gardening plants" >> beam.Create([
          {
              "icon": "🍓",
              "name": "Strawberry",
              "duration": "perennial"
          },
          {
              "icon": "🥕",
              "name": "Carrot",
              "duration": "biennial"
          },
          {
              "icon": "🍆",
              "name": "Eggplant",
              "duration": "perennial"
          },
          {
              "icon": "🍅",
              "name": "Tomato",
              "duration": "annual"
          },
          {
              "icon": "🥔",
              "name": "Potato",
              "duration": "perennial"
          },
          ])
      | "Filter perennials" >> beam.Filter(is_perennial)
      | beam.Map(print)
  )



{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


In [5]:
def is_grocerystore(store):
  return store[8] =="Grocery Store"


In [6]:
p2 = beam.Pipeline()

grocery = (p2
           | "Read from text" >> beam.io.ReadFromText("/content/grocery.txt", skip_header_lines=1)
           | "Split record" >> beam.Map(lambda record: record.split(","))
           | "Filter regular" >> beam.Filter(is_grocerystore)
           | "Write to text" >> beam.io.WriteToText("/content/regular_filter.txt")
           | "Print" >> beam.Map(print))
p2.run()

/content/regular_filter.txt-00000-of-00001


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc46cb3eb30>

In [7]:
!ls

grocery.txt  regular_filter.txt-00000-of-00001	sample_data  students.txt


In [8]:
!cat regular_filter.txt-00000-of-00001

PCollection in Apache Beam


●	PCollection: It is an abstraction represents a potentially distributed, multi-element data set. It represents a distributed data set that our beam pipeline operates on.
o	Immutability: Pcollections are immutable in nature. Applying a transformations on a pcollection results in creation of new pcollection.
o	Element type: The elements in pcollection may be of any type, but all must be of same type.
o	Operation type:  Pcollection does not support grained operations. We cannot apply transformations on specific elements in pcollection.
o	Timestamps: Each element in pcollection has an associated timestamp with it.
o	Unbounded pcollections: An unbounded PCollection represents a data set of unlimited size. Source assigns the timestamps.
o	Bounded pcollections: A bounded PCollection represents a data set of a known fixed size. Every element is set to same timestamp.
o	No Random access: Can’t access data using index or some specific element. No size restriction.
o	Ptransform: Ptransform represent a data processing operation, or a step in our pipeline. Ex., Map, Groupby, FlatMap, ParDo, filter, flatten, combine etc.

●	PCollection characteristics:
o	A PCollection is owned by the specific Pipeline object for which it is created; multiple pipelines cannot share a PCollection.
●	Resources:
o	https://beam.apache.org/documentation/programming-guide/#pcollections
o	https://beam.apache.org/releases/pydoc/2.36.0/apache_beam.io.textio.html?highlight=readfromtext#apache_beam.io.textio.ReadFromText


In [9]:
p1= beam.Pipeline()
p1 | beam.io.ReadFromText("/content/grocery.txt", skip_header_lines=1) | beam.Map(print)
p1.run()

FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138
DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228
FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27
FDX07,19.2,Regular,0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38
NCD19,8.93,Low Fat,0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052
FDP36,10.395,Regular,0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088
FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528
FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636
FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986
FDU28,19.2,Regular,0.09444959,Frozen Foods,187.8214,OUT017,2007,,Tier 2,Supermarket Type1,4710.535
FDY07,11.8,Low Fat,

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc46e1adc60>

In [10]:
p2= beam.Pipeline()
grocery = (p2
    | "Read from  text" >> beam.io.ReadFromText("/content/grocery.txt", skip_header_lines=1)
    # | beam.Map(lambda record: print(record))
    | "Split the record" >> beam.Map(lambda record: record.split(","))
    # | 'Filter regular' >> beam.Filter(lambda record: record[2]=='Regular')
    | 'Filter regular' >> beam.Filter(lambda record: record[2]=='Low Fat')
    | 'Write to text' >> beam.io.WriteToText('lowfat_filter2.txt')
    | beam.Map(print)
    )
p2.run()

lowfat_filter2.txt-00000-of-00001


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc46cb0a560>

In [11]:
!ls

grocery.txt			   regular_filter.txt-00000-of-00001  students.txt
lowfat_filter2.txt-00000-of-00001  sample_data


In [12]:
!cat regular_filter1.txt-00000-of-00001

cat: regular_filter1.txt-00000-of-00001: No such file or directory


In [13]:
#from external resources
p1 = beam.Pipeline()

grocery = (p1
           | "Read from Text" >> beam.io.ReadFromText("grocery.txt", skip_header_lines=1)
           | "split the record" >> beam.Map(lambda record: record.split(','))
           | 'Filter regular' >> beam.Filter(lambda record: record[2] == 'Regular')
           | 'Write to text'>> beam.io.WriteToText('regular_filter.txt'))  #| beam.Map(print))

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc46c970b80>

In [14]:
p3 = beam.Pipeline()
lines = (p3
         | beam.Create([
             "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ",
             "Phasellus maximus laoreet nunc ut euismod. Duis elit dolor, ",
             "Donec a ligula dapibus, dignissim est sit amet, cursus augue.",
             "Phasellus dapibus, odio non faucibus interdum",
             "Sed mattis arcu sed auctor sodales. Nulla facilisi."
             ])
        #  | beam.Map(print)
         | beam.io.WriteToText("testcreate.txt"))

In [15]:
p3.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc46c90bc40>

In [16]:
p4 = beam.Pipeline()
lines = (p4
         | beam.Create([
             "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ",
             "Phasellus maximus laoreet nunc ut euismod. Duis elit dolor, ",
             "Donec a ligula dapibus, dignissim est sit amet, cursus augue.",
             "Phasellus dapibus, odio non faucibus interdum",
             "Sed mattis arcu sed auctor sodales. Nulla facilisi."
             ])
        #  | beam.Map(print)
         | beam.io.WriteToText("testcreate.txt"))
p4.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc46c934040>

In [17]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create([
               "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ",
               "Phasellus maximus laoreet nunc ut euismod. Duis elit dolor, ",
               "Donec a ligula dapibus, dignissim est sit amet, cursus augue.",
               "Sed mattis arcu sed auctor sodales. Nulla facilisi."
           ])
           | beam.Map(print))
  pipeline.run()

Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Phasellus maximus laoreet nunc ut euismod. Duis elit dolor, 
Donec a ligula dapibus, dignissim est sit amet, cursus augue.
Sed mattis arcu sed auctor sodales. Nulla facilisi.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Phasellus maximus laoreet nunc ut euismod. Duis elit dolor, 
Donec a ligula dapibus, dignissim est sit amet, cursus augue.
Sed mattis arcu sed auctor sodales. Nulla facilisi.


Map, FlatMap and Filter Transforms in Apache Beam


Map:
●	Applies a simple 1-to-1 mapping function over each element in the collection.
FlatMap:
●	Applies a simple 1-to-many mapping function over each element in the collection. The many elements are flattened into the resulting collection.
Filter:
●	Given a predicate, filter out all elements that don’t satisfy that predicate. May also be used to filter based on an inequality with a given value based on the comparison ordering of the element.
Lambda:
●	A lambda function is a small anonymous function. A lambda function can take any number of arguments, but can only have one expression.
o	lambda arguments: expression
Resources:
●	https://beam.apache.org/documentation/transforms/python/elementwise/map/
●	https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/  
●	https://beam.apache.org/documentation/transforms/python/elementwise/filter/#example-2-filtering-with-a-lambda-function


In [18]:
!ls

grocery.txt			   regular_filter.txt-00000-of-00001  students.txt
lowfat_filter2.txt-00000-of-00001  sample_data			      testcreate.txt-00000-of-00001


Map:

● Applies a simple 1-to-1 mapping function over each element in the collection.

In [19]:
def strip_header_and_newline(text):
  return text.strip('# \n')

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(strip_header_and_newline)
      | beam.Map(type)
      # | beam.Filter(lambda record: "berry" in record)
      # | beam.Map(len)
      | beam.Map(print))


<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>



FlatMap


• Applies a simple 1-to-many mapping function over each element in the collection. The many elements are flattened into the resulting collection.

In [20]:
def split_words(text):
  return text.split(',')

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.FlatMap(split_words)
      | beam.Map(print))

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato




FlatMapTuple for key-value pairs

If your PCollection consists of (key, value) pairs, you can use FlatMapTuple to unpack them into different function arguments.


In [21]:

def format_plant(icon, plant):
  if icon:
    yield '{}{}'.format(icon, plant)

# def format_plant(record):
#   if record[0]:
#     yield '{}{}'.format(record[0], record[1])

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
          (None, 'Invalid'),
      ])
      | 'Format' >> beam.FlatMapTuple(format_plant)
      # | 'Format' >> beam.FlatMap(format_plant)
      | beam.Map(print))

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato



Filter

Given a predicate, filter out all elements that don’t satisfy that predicate. May also be used to filter based on an inequality with a given value based on the comparison ordering of the element.


In [22]:
def is_perennial(plant):
  return plant['duration'] == 'perennial'

def filter_icon(record):
  return record["icon"] == '🍓'

with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      # | 'Filter perennials' >> beam.Filter(is_perennial)
      | 'Filter strawberry icon' >> beam.Filter(filter_icon)
      | beam.Map(print))

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}


In [23]:

#Filtering with multiple arguments

def has_duration(plant, duration):
  return plant['duration'] == duration

with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      | 'Filter annual' >> beam.Filter(has_duration, 'annual')
      | beam.Map(print))

{'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}


In [24]:
with beam.Pipeline() as pipeline:
  students = (
      pipeline
      |"Read from text" >> beam.io.ReadFromText("students.txt", skip_header_lines= True)
      |"spliting the record" >> beam.Map(lambda record : record.split(','))
      |"filtering the data with PASS" >> beam.Filter(lambda record : record[5]=="PASS")
      |"Write to text" >> beam.io.WriteToText("result/pass_students")
  )


In [25]:
!ls

grocery.txt			   result	 testcreate.txt-00000-of-00001
lowfat_filter2.txt-00000-of-00001  sample_data
regular_filter.txt-00000-of-00001  students.txt


In [26]:
!{('head -n 10 result/pass_students-00000-of-00001')}

['3', 'chandler', 'us', '53', '68', 'PASS']
['4', 'khaula', 'hyd', '26', '99', 'PASS']
['5', 'neethu', 'uae', '27', '100', 'PASS']
['7', 'sai', 'mad', '21', '71', 'PASS']
['8', 'sabari', 'vel', '25', '75', 'PASS']
['10', 'swati', 'ind', '24', '91', 'PASS']


ParDo, Keys, kvswap, Values, ToString Transforms in Apache Beam

Normal way using Tranformation functions

In [27]:
#implementation using normal tranformations
with beam.Pipeline() as pipeline:
  students = (
      pipeline
      | "Read from text" >> beam.io.ReadFromText("/content/students.txt", skip_header_lines=True)
      | "Splitting the records" >> beam.Map(lambda record: record.split(","))
      | "Filtering the data with Fail" >> beam.Filter(lambda record: record[5]=="FAIL")
      | "Write to text">> beam.io.WriteToText("result/fail_students")
  )
  pipeline.run()



In [28]:
!{("head -n 10 /content/result/fail_students-00000-of-00001")}

['1', 'vignesh', 'chn', '27', '15', 'FAIL']
['2', 'joey', 'us', '51', '20', 'FAIL']
['6', 'sree', 'koc', '25', '27', 'FAIL']
['9', 'tinkle', 'ker', '27', '9', 'FAIL']


Using ParDo

In [29]:
class SplitRow(beam.DoFn):
  def process(self, element):
    return [element.split(",")]

class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):
    return [len(element)]

In [30]:
with beam.Pipeline() as pipeline:
  input_data = (pipeline
                | "Read from text" >> beam.io.ReadFromText("/content/students.txt", skip_header_lines=True)
                | "Splitting the record" >> beam.ParDo(SplitRow()))
  data_fail = (input_data
               | "Filter data with fail" >> beam.Filter(lambda record: record[5]=="FAIL"))
  word_lengths = (data_fail
                  | "Count of Record" >> beam.ParDo(ComputeWordLengthFn()))
  counted_data = (word_lengths
                  | "Write counted data to text" >> beam.io.WriteToText("result/data_fail"))
  output_data = (counted_data
                 | "Write to Text" >> beam.io.WriteToText("result/fail_data"))
  pipeline.run()




In [31]:
!{("head -n 10 /content/result/data_fail-00000-of-00001")}

6
6
6
6


Keys

Takes a collection of key-value pairs and returns the key of each element.

In [32]:
with beam.Pipeline() as pipeline:
  icons = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'Keys' >> beam.Keys()
      | beam.Map(print))


🍓
🥕
🍆
🍅
🥔


Values

Takes a collection of key-value pairs, and returns the value of each element.

In [33]:
with beam.Pipeline() as pipeline:
  icons = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'Keys' >> beam.Values()
      | beam.Map(print))


Strawberry
Carrot
Eggplant
Tomato
Potato


ToString

Transforms every element in an input collection to a string. Any non-string element can be converted to a string using standard Python functions and methods. Many I/O transforms, such as textio.WriteToText, expect their input elements to be strings.

    Key-value pairs to string
    Elements to string
    Iterables to string


In [34]:
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'To string' >> beam.ToString.Kvs()  #Element() #Iterables()
      | beam.Map(print))


🍓,Strawberry
🥕,Carrot
🍆,Eggplant
🍅,Tomato
🥔,Potato


In [35]:
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'To string' >> beam.ToString.Element() #Iterables()
      | beam.Map(print))


('🍓', 'Strawberry')
('🥕', 'Carrot')
('🍆', 'Eggplant')
('🍅', 'Tomato')
('🥔', 'Potato')


In [36]:
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'To string' >> beam.ToString.Iterables()
      | beam.Map(print))


🍓,Strawberry
🥕,Carrot
🍆,Eggplant
🍅,Tomato
🥔,Potato


Kvswap

Takes a collection of key-value pairs and returns a collection of key-value pairs which has each key and value swapped.

In [37]:
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'Key-Value swap' >> beam.KvSwap()
      | "Keys" >> beam.Keys()
      | beam.Map(print))

Strawberry
Carrot
Eggplant
Tomato
Potato


In [38]:
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'Key-Value swap' >> beam.KvSwap()
      | "Keys" >> beam.Values()
      | beam.Map(print))

🍓
🥕
🍆
🍅
🥔


GroupBy(), GroupByKey(), CoGroupByKey(), GroupIntoBatches() Transforms in Apache Beam


GroupBy:
●	Takes a collection of elements and produces a collection grouped, by properties of those elements.

●	Unlike GroupByKey, the key is dynamically created from the elements themselves.
GroupByKey:

●	GroupByKey is a Beam transform for processing collections of key/value pairs.

●	Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key.

●	It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm.

●	For example, if you have a collection that stores records of customer orders, you might want to group together all the orders from the same postal code (wherein the “key” of the key/value pair is the postal code field, and the “value” is the remainder of the record).


CoGroupByKey:

●	Aggregates all input elements by their key and allows downstream processing to consume all values associated with the key.

●	While GroupByKey performs this operation over a single input collection and thus a single type of input values, CoGroupByKey operates over multiple input collections.

●	CoGroupByKey expects a dictionary of named keyed PCollections, and produces elements joined by their keys. The values of each output element are dictionaries where the names correspond to the input dictionary, with lists of all the values found for that key.

●	As a result, the result for each key is a tuple of the values associated with that key in each input collection.
GroupIntoBatches:

●	Batches the input into desired batch size.


Resources:
●	https://beam.apache.org/documentation/transforms/python/aggregation/groupby/
●	https://beam.apache.org/documentation/programming-guide/#groupbykey
o	https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/



GroupBy:

  Takes a collection of elements and produces a collection grouped, by properties of those elements.
  Unlike GroupByKey, the key is dynamically created from the elements themselves.



In [39]:
#groups based on first character of record
with beam.Pipeline() as p:
  grouped = (
      p
      | beam.Create(['strawberry', 'raspberry', 'blueberry', 'blackberry', 'banana'])
      | beam.GroupBy(lambda s: s[0])
      | beam.Map(print))

('s', ['strawberry'])
('r', ['raspberry'])
('b', ['blueberry', 'blackberry', 'banana'])


Aggregation:

In [40]:
GROCERY_LIST = [
    beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
    beam.Row(recipe='pie', fruit='raspberry', quantity=1, unit_price=3.50),
    beam.Row(recipe='pie', fruit='blackberry', quantity=1, unit_price=4.00),
    beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
    beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
    beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
]


In [41]:
with beam.Pipeline() as p:
  grouped = (p
    | beam.Create(GROCERY_LIST)
    | beam.GroupBy('recipe')
    | beam.Map(print)
  )

('pie', [BeamSchema_2aa0b8e0_c78d_4515_b6c8_0114ecc95a50(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.5), BeamSchema_2aa0b8e0_c78d_4515_b6c8_0114ecc95a50(recipe='pie', fruit='raspberry', quantity=1, unit_price=3.5), BeamSchema_2aa0b8e0_c78d_4515_b6c8_0114ecc95a50(recipe='pie', fruit='blackberry', quantity=1, unit_price=4.0), BeamSchema_2aa0b8e0_c78d_4515_b6c8_0114ecc95a50(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.0)])
('muffin', [BeamSchema_2aa0b8e0_c78d_4515_b6c8_0114ecc95a50(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.0), BeamSchema_2aa0b8e0_c78d_4515_b6c8_0114ecc95a50(recipe='muffin', fruit='banana', quantity=3, unit_price=1.0)])


In [42]:
#sum aggregation function
with beam.Pipeline() as p:
  grouped = (
      p
      | beam.Create(GROCERY_LIST)
      | beam.GroupBy('fruit')
          .aggregate_field('quantity', sum, 'total_quantity')
          | beam.Map(print))


Result(fruit='strawberry', total_quantity=3)
Result(fruit='raspberry', total_quantity=1)
Result(fruit='blackberry', total_quantity=1)
Result(fruit='blueberry', total_quantity=3)
Result(fruit='banana', total_quantity=3)


In [43]:

with beam.Pipeline() as p:
  grouped = (
      p
      | beam.Create(GROCERY_LIST)
      | beam.GroupBy('recipe')
          .aggregate_field('quantity', sum, 'total_quantity')
      | beam.Map(print))

Result(recipe='pie', total_quantity=6)
Result(recipe='muffin', total_quantity=5)



GroupByKey:

  Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key.



In [44]:
records = [("vignesh", [27, "engineer"]),
("neethu", [27, "developer"]),
("farooqui", [26, "data analyst"]),
("sai", [29, "web developer"]),
("tinkle", [28, "fullstack developer"]),
("neethu", 'Employed'),
("sai", 'Unemployed'),
("tinkle", 'Employed'),
("farooqui",'Employed'),
("vignesh", 'Unemployed')]

In [45]:
with beam.Pipeline() as pipeline:
  produce_counts = (
      pipeline
      | 'Create produce counts' >> beam.Create(records)
      | 'Group counts per produce' >> beam.GroupByKey()
      | beam.Map(print))

('vignesh', [[27, 'engineer'], 'Unemployed'])
('neethu', [[27, 'developer'], 'Employed'])
('farooqui', [[26, 'data analyst'], 'Employed'])
('sai', [[29, 'web developer'], 'Unemployed'])
('tinkle', [[28, 'fullstack developer'], 'Employed'])



CoGroupByKey:

  Aggregates all input elements by their key and allows downstream processing to consume all values associated with the key.
  
  While GroupByKey performs this operation over a single input collection and thus a single type of input values.
  
  CoGroupByKey operates over multiple input collections. As a result, the result for each key is a tuple of the values associated with that key in each input collection.



In [46]:
with beam.Pipeline() as pipeline:
  student_pairs = pipeline | 'Create icons' >> beam.Create([
      ('vignesh', 'bangalore'),
      ('khaula', 'hyderabad'),
      ('neethu', 'malapur'),
      ('sai', 'chennai'),
  ])

  student_result = pipeline | 'Create durations' >> beam.Create([
      ('vignesh', [15,"FAIL"]),
      ('khaula', [99,"PASS"]),
      ('neethu', [100,"PASS"]),
      ('sai',[ 37,"FAIL"]),
  ])

  student_marks = (({
      'place': student_pairs, 'result': student_result
  })
            | 'Merge' >> beam.CoGroupByKey()
            | beam.Map(print))

('vignesh', {'place': ['bangalore'], 'result': [[15, 'FAIL']]})
('khaula', {'place': ['hyderabad'], 'result': [[99, 'PASS']]})
('neethu', {'place': ['malapur'], 'result': [[100, 'PASS']]})
('sai', {'place': ['chennai'], 'result': [[37, 'FAIL']]})


In [47]:
with beam.Pipeline() as pipeline:
  student_pairs = pipeline | "Student_pairs">>beam.Create([
      ('vignesh', 15),
      ('khaula', 99),
      ('neethu', 100),
      ('sai', 37),
  ])

  student_result = pipeline  | "Results" >> beam.Create([
      ('vignesh', "FAIL"),
      ('khaula',"PASS"),
      ('neethu',"PASS"),
      ('sai', "FAIL"),
  ])

  students = (({
      'Marks': student_pairs, 'Result': student_result
  })
  | 'Merge' >> beam.CoGroupByKey()
  | beam.Map(print))

('vignesh', {'Marks': [15], 'Result': ['FAIL']})
('khaula', {'Marks': [99], 'Result': ['PASS']})
('neethu', {'Marks': [100], 'Result': ['PASS']})
('sai', {'Marks': [37], 'Result': ['FAIL']})



GroupIntoBatches:

  Batches the input into desired batch size.



In [48]:
with beam.Pipeline() as pipeline:
  batches_with_keys = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('spring', '🍓'),
          ('spring', '🥕'),
          ('spring', '🍆'),
          ('spring', '🍅'),
          ('summer', '🥕'),
          ('summer', '🍅'),
          ('summer', '🌽'),
          ('fall', '🥕'),
          ('fall', '🍅'),
          ('winter', '🍆'),
      ])
      | 'Group into batches' >> beam.GroupIntoBatches(4)  #3, #2
      | beam.Map(print))

('spring', ['🍓', '🥕', '🍆', '🍅'])
('summer', ['🥕', '🍅', '🌽'])
('fall', ['🥕', '🍅'])
('winter', ['🍆'])


Flatten and Partition Transform in Apache Beam


Partition:
●	Partition is a Beam transform for PCollection objects that store the same data type. It splits a single PCollection into a fixed number of smaller collections.
●	Partition divides the elements of a PCollection according to a partitioning function that you provide.
●	The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection.
●	The number of partitions must be determined at graph construction time.
●	Partition accepts a function that receives the number of partitions, and returns the index of the desired partition for the element. The number of partitions passed must be a positive integer, and it must return an integer in the range 0 to num_partitions-1.

Flatten:
●	Flatten is a Beam transform for PCollection objects that store the same data type.
●	Merges multiple PCollection objects into a single logical PCollection.

Resources:
●	https://beam.apache.org/documentation/programming-guide/#flatten
●	https://beam.apache.org/documentation/programming-guide/#partition
o	https://beam.apache.org/documentation/transforms/python/elementwise/partition/



Flatten:

• Flatten is a Beam transform for PCollection objects that store the same data type. Flatten merges multiple PCollection objects into a single logical PCollection.

• Kind of Union operation


In [49]:
with beam.Pipeline() as pipeline:
  even_data = (pipeline
               |"Create even data" >> beam.Create({2,4,6,8,10}))
  odd_data = (pipeline
              |"Create odd data" >> beam.Create({1,3,5,7,9,11}))

  result = ((even_data, odd_data) | beam.Flatten()) | beam.Map(print)

1
3
5
7
9
11
2
4
6
8
10



Partition:

• Partition is a Beam transform for PCollection objects that store the same data type. It splits a single PCollection into a fixed number of smaller collections.

• Partition divides the elements of a PCollection according to a partitioning function that you provide.

• The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection.

• The number of partitions must be determined at graph construction time.

• Partition accepts a function that receives the number of partitions, and returns the index of the desired partition for the element. The number of partitions passed must be a positive integer, and it must return an integer in the range 0 to num_partitions-1.


In [50]:
import apache_beam as beam

p = beam.Pipeline()
number = {11,12,13,44,55,61,77,88,99}

def partition_fn(element,num_partition):
  return 0 if element%2 ==0 else 1


number_pc = p| beam.Create(number)| beam.Partition(partition_fn,2)

# number_pc[0]| 'Printing even numbers partition' >> beam.Map(print)
number_pc[1]| 'Printing odd numbers partition' >> beam.Map(print)

p.run()

99
11
13
77
55
61


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc46dae0160>

Latest, Max, Min Sample, Sum and Top Transform in Apache Beam


Latest:
●	Gets the element with the latest timestamp.
Max:
●	Gets the element with the maximum value within each aggregation.
Min:
●	Gets the element with the minimum value within each aggregation.
Mean:
●	Transforms for computing the arithmetic mean of the elements in a collection, or the mean of the values associated with each key in a collection of key-value pairs.
Sample:
●	Transforms for taking samples of the elements in a collection, or samples of the values associated with each key in a collection of key-value pairs.
Sum:
●	Sums all the elements within each aggregation.
Top:
●	Transforms for finding the largest (or smallest) set of elements in a collection, or the largest (or smallest) set of values associated with each key in a collection of key-value pairs.
Resources:
●	https://beam.apache.org/documentation/transforms/python/aggregation/latest/
●	https://beam.apache.org/documentation/transforms/python/aggregation/max/
●	https://beam.apache.org/documentation/transforms/python/aggregation/min/
●	https://beam.apache.org/documentation/transforms/python/aggregation/mean/
●	https://beam.apache.org/documentation/transforms/python/aggregation/sample/
●	https://beam.apache.org/documentation/transforms/python/aggregation/sum/
●	https://beam.apache.org/documentation/transforms/python/aggregation/top/


In [51]:
import time


Latest:

  Gets the element with the latest timestamp.
  we create a pipeline with a PCollection of produce with a timestamp for their harvest date. We use Latest to get the element with the latest timestamp from the PCollection.



In [52]:

def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
  return time.mktime(time.strptime(time_str, format))

with beam.Pipeline() as pipeline:
  latest_element = (
      pipeline
      | 'Create crops' >> beam.Create([
          {
              'item': '🥬', 'harvest': '2020-02-24 00:00:00'
          },
          {
              'item': '🍓', 'harvest': '2020-06-16 00:00:00'
          },
          {
              'item': '🥕', 'harvest': '2020-07-17 00:00:00'
          },
          {
              'item': '🍆', 'harvest': '2020-10-26 00:00:00'
          },
          {
              'item': '🍅', 'harvest': '2020-10-01 00:00:00'
          },
      ])
      | 'With timestamps' >> beam.Map(
          lambda crop: beam.window.TimestampedValue(
              crop['item'], to_unix_time(crop['harvest'])))
      | 'Get latest element' >> beam.combiners.Latest.Globally()
      | beam.Map(print))

🍆


In [53]:
def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
  return time.mktime(time.strptime(time_str, format))

with beam.Pipeline() as pipeline:
  latest_elements_per_key = (
      pipeline
      | 'Create crops' >> beam.Create([
          ('spring', {
              'item': '🥕', 'harvest': '2020-06-28 00:00:00'
          }),
          ('spring', {
              'item': '🍓', 'harvest': '2020-06-16 00:00:00'
          }),
          ('summer', {
              'item': '🥕', 'harvest': '2020-07-17 00:00:00'
          }),
          ('summer', {
              'item': '🍓', 'harvest': '2020-08-26 00:00:00'
          }),
          ('summer', {
              'item': '🍆', 'harvest': '2020-09-04 00:00:00'
          }),
          ('summer', {
              'item': '🥬', 'harvest': '2020-09-18 00:00:00'
          }),
          ('summer', {
              'item': '🍅', 'harvest': '2020-09-22 00:00:00'
          }),
          ('autumn', {
              'item': '🍅', 'harvest': '2020-10-01 00:00:00'
          }),
          ('autumn', {
              'item': '🥬', 'harvest': '2020-10-20 00:00:00'
          }),
          ('autumn', {
              'item': '🍆', 'harvest': '2020-10-26 00:00:00'
          }),
          ('winter', {
              'item': '🥬', 'harvest': '2020-02-24 00:00:00'
          }),
      ])
      | 'With timestamps' >> beam.Map(
          lambda pair: beam.window.TimestampedValue(
              (pair[0], pair[1]['item']), to_unix_time(pair[1]['harvest'])))
      | 'Get latest elements per key' >> beam.combiners.Latest.PerKey()
      | beam.Map(print))

('spring', '🥕')
('summer', '🍅')
('autumn', '🍆')
('winter', '🥬')



Max:

  Gets the element with the maximum value within each aggregation.
  we create a pipeline with a PCollection. Then, we get the element with the maximum value in different ways.



In [54]:
with beam.Pipeline() as pipeline:
  max_element = (
      pipeline
      | 'Create numbers' >> beam.Create([3, 4, 1, 2])
      # | 'Get max value' >> beam.CombineGlobally(lambda elements: max(elements or [None]))
      | 'Get max value' >> beam.CombineGlobally(max)
      | beam.Map(print))

4


In [55]:
#when PCollection is key-value pair
with beam.Pipeline() as pipeline:
  elements_with_max_value_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Get max value per key' >> beam.CombinePerKey(max)
      | beam.Map(print))


('🥕', 3)
('🍆', 1)
('🍅', 5)



Min:

  Gets the element with the minimum value within each aggregation.
  we create a pipeline with a PCollection. Then, we get the element with the minimum value in different ways.



In [56]:
with beam.Pipeline() as pipeline:
  min_element = (
      pipeline
      | 'Create numbers' >> beam.Create([3, 4, 1, 2])
      | 'Get min value' >>
      beam.CombineGlobally(lambda elements: min(elements or [-1]))
      | beam.Map(print))


1


In [57]:
with beam.Pipeline() as pipeline:
  elements_with_min_value_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Get min value per key' >> beam.CombinePerKey(min)
      | beam.Map(print))


('🥕', 2)
('🍆', 1)
('🍅', 3)



Mean:

  Transforms for computing the arithmetic mean of the elements in a collection, or the mean of the values associated with each key in a collection of key-value pairs.
  we create a pipeline with a PCollection. Then, we get the element with the average value in different ways.



In [58]:
#Global mean value
with beam.Pipeline() as pipeline:
  mean_element = (
      pipeline
      | 'Create numbers' >> beam.Create([3, 4, 1, 2])
      | 'Get mean value' >> beam.combiners.Mean.Globally()
      | beam.Map(print))


2.5


In [59]:
#mean value based on key-value pair
with beam.Pipeline() as pipeline:
  elements_with_mean_value_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Get mean value per key' >> beam.combiners.Mean.PerKey()
      | beam.Map(print))

('🥕', 2.5)
('🍆', 1.0)
('🍅', 4.0)



Sample:

  Transforms for taking samples of the elements in a collection, or samples of the values associated with each key in a collection of key-value pairs.
  we create a pipeline with a PCollection. Then, we get a random sample of elements in different ways.

Sample.FixedSizeGlobally() to get a fixed-size random sample of elements from the entire PCollection.


In [60]:
with beam.Pipeline() as pipeline:
  sample = (
      pipeline
      | 'Create produce' >> beam.Create([
          '🍓 Strawberry',
          '🥕 Carrot',
          '🍆 Eggplant',
          '🍅 Tomato',
          '🥔 Potato',
      ])
      | 'Sample N elements' >> beam.combiners.Sample.FixedSizeGlobally(2)
      | beam.Map(print))

['🍓 Strawberry', '🍆 Eggplant']


In [61]:
with beam.Pipeline() as pipeline:
  samples_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('spring', '🍓'),
          ('spring', '🥕'),
          ('spring', '🍆'),
          ('spring', '🍅'),
          ('summer', '🥕'),
          ('summer', '🍅'),
          ('summer', '🌽'),
          ('fall', '🥕'),
          ('fall', '🍅'),
          ('winter', '🍆'),
      ])
      | 'Samples per key' >> beam.combiners.Sample.FixedSizePerKey(3)
      | beam.Map(print))

('spring', ['🍆', '🍅', '🍓'])
('summer', ['🌽', '🥕', '🍅'])
('fall', ['🍅', '🥕'])
('winter', ['🍆'])



Sum:

  Sums all the elements within each aggregation.
  we create a pipeline with a PCollection. Then, we get the sum of all the element values in different ways.

Combine.PerKey() to get the sum of all the element values for each unique key in a PCollection of key-values.


In [62]:
with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create numbers' >> beam.Create([3, 4, 1, 2])
      | 'Sum values' >> beam.CombineGlobally(sum)
      | beam.Map(print))


10


In [63]:
with beam.Pipeline() as pipeline:
  totals_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Sum values per key' >> beam.CombinePerKey(sum)
      | beam.Map(print))


('🥕', 5)
('🍆', 1)
('🍅', 12)



Top:

  Transforms for finding the largest (or smallest) set of elements in a collection, or the largest (or smallest) set of values associated with each key in a collection of key-value pairs.
  
  we create a pipeline with a PCollection. Then, we get the largest or smallest elements in different ways.



In [64]:
with beam.Pipeline() as pipeline:
  largest_elements = (
      pipeline
      | 'Create numbers' >> beam.Create([3, 4, 1, 2])
      | 'Largest N values' >> beam.combiners.Top.Largest(2)
      | beam.Map(print))


[4, 3]


In [65]:
with beam.Pipeline() as pipeline:
  largest_elements = (
      pipeline
      | 'Create numbers' >> beam.Create([3, 4, 1, 2])
      | 'Largest N values' >> beam.combiners.Top.Smallest(2)
      | beam.Map(print))

[1, 2]


In [66]:
with beam.Pipeline() as pipeline:
  largest_elements_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Largest N values per key' >> beam.combiners.Top.LargestPerKey(2)
      | beam.Map(print))

('🥕', [3, 2])
('🍆', [1])
('🍅', [5, 4])


In [67]:
with beam.Pipeline() as pipeline:
  largest_elements_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Largest N values per key' >> beam.combiners.Top.SmallestPerKey(2)
      | beam.Map(print))

('🥕', [2, 3])
('🍆', [1])
('🍅', [3, 4])


Side Inputs:

•	A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection.

•	In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs.

•	Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded).

•	Side inputs must be small in size and not as big as pcollection because it has to be kept in memory of each worker

•	Such values might be determined by the input data, or depend on a different branch of your pipeline.

Additional outputs:  

•	While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections.

•	If you choose to have multiple outputs, your ParDo returns all of the output PCollections (including the main output) bundled together.

Resources:  
•	https://beam.apache.org/documentation/patterns/side-inputs/

•	https://beam.apache.org/documentation/programming-guide/#additional-outputs

•	https://beam.apache.org/documentation/programming-guide/#side-inputs  



Side Inputs:

• A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection.

• In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs.


In [70]:
p1 = beam.Pipeline()

input_list = list()
with open ('students_exclude.txt','r') as exclude_file:
  for stud_id in exclude_file:
    input_list.append(stud_id.rstrip())

print(input_list)

['1', '3', '7', '9']


In [75]:
class SplitRow(beam.DoFn):
  def process(self,element,input_list):
    customer = element.split(',')
    if customer[0] not in input_list:
      return [customer]

customers = (
    p1
    |beam.io.ReadFromText('Students_age.txt')
    |beam.ParDo(SplitRow(),input_list)  #can pass any number of side inputs in this ParDo function
    # |beam.Map(print)
    |beam.io.WriteToText('data/output')

)
p1.run()



['2', 'farooqui', 'hyd', '26']
['4', 'neethu', 'mla', '27', '']
['5', 'joey', 'ny', '57']
['6', 'ross', 'la', '60']
['8', 'lois', 'us', '50']
['10', 'sai', 'chn', '29']




data/output-00000-of-00001


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc465a31390>

In [76]:
!cat "/content/data/output-00000-of-00001"

['2', 'farooqui', 'hyd', '26']
['4', 'neethu', 'mla', '27', '']
['5', 'joey', 'ny', '57']
['6', 'ross', 'la', '60']
['8', 'lois', 'us', '50']
['10', 'sai', 'chn', '29']


In [77]:
!{('head -n 10 data/output-00000-of-00001')}

['2', 'farooqui', 'hyd', '26']
['4', 'neethu', 'mla', '27', '']
['5', 'joey', 'ny', '57']
['6', 'ross', 'la', '60']
['8', 'lois', 'us', '50']
['10', 'sai', 'chn', '29']


In [78]:
p1 = beam.Pipeline()

side_list = list()
with open ('students_exclude.txt','r') as exclude_file:
  for cust_id in exclude_file:
    side_list.append(cust_id.rstrip())

print(side_list)

class SplitRow(beam.DoFn):
  def process(self,element,side_list):
    customer = element.split(',')
    if customer[0] not in side_list:
      return [customer]

class ProcessCustomers(beam.DoFn):
  def process(self,element,country,start_char):
    if(element[2]==country):
      yield  element
    else:
      yield  beam.pvalue.TaggedOutput('Other_student',element)
    if(element[1].startswith('r')):
       yield  beam.pvalue.TaggedOutput('Names_r',element)



customers = (
    p1
    |beam.io.ReadFromText('Students_age.txt')
    |beam.ParDo(SplitRow(),side_list)
    |beam.ParDo(ProcessCustomers(),'chn','r').with_outputs('Names_r','Other_student',main='Chennai_Cust')
)

chennai_customers = customers.Chennai_Cust
other_cities_customers = customers.Other_student
customer_withname_r = customers.Names_r

chennai_customers | 'Write Chennai Students PCollection' >> beam.io.WriteToText("chennai")
other_cities_customers  | 'Write Students PCollection that lives in other cities' >> beam.io.WriteToText("students_other_cities")
customer_withname_r  | 'Write Students names with r PCollection' >> beam.io.WriteToText("customers_names_r")


p1.run()

['1', '3', '7', '9']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7cc465a191e0>

In [79]:
! cat chennai-00000-of-00001

['10', 'sai', 'chn', '29']


In [80]:

!cat students_other_cities-00000-of-00001

['2', 'farooqui', 'hyd', '26']
['4', 'neethu', 'mla', '27', '']
['5', 'joey', 'ny', '57']
['6', 'ross', 'la', '60']
['8', 'lois', 'us', '50']


In [81]:
!cat customers_names_r-00000-of-00001

['6', 'ross', 'la', '60']


Composite Transformation in Apache Beam:

Composite Transform:

•	Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms.

•	Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.
Creating a composite transform:

•	To create your own composite transform, create a subclass of the PTransform class and override the expand method to specify the actual processing logic.

•	The transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK libraries.

•	The following code sample shows how to declare a PTransform that accepts a PCollection of Strings for input, and outputs a PCollection of Integers:

•	The expand method is where you add the processing logic for the PTransform. Your override of expand must accept the appropriate type of input PCollection as a parameter, and specify the output PCollection as the return value.

•	You can include as many transforms as you want. These transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK libraries.

•	Your composite transform’s parameters and return value must match the initial input type and final return type for the entire transform, even if the transform’s intermediate data changes type multiple times.

Resources:

•	https://beam.apache.org/documentation/programming-guide/#composite-transforms

•	https://beam.apache.org/releases/pydoc/2.36.0/apache_beam.transforms.html


In [82]:
import apache_beam as beam

def SplitRow(element):
  return element.split(',')


def filter_on_count(element):
  name, count = element
  if count > 30:
    return element

def format_output(element):
  name, count = element
  return (name.encode('ascii'),str(count),'Experienced employee')


Using normal transformation like

    Map
    Filter
    CombinePerKey

For three different operations and it includes more code, memory space and time as well.


In [86]:
with beam.Pipeline() as p:
  input_data = (p
                      | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
                      | "Split rows" >> beam.Map(SplitRow)
                   )

  accounts_count = (
                      input_data
                      | 'Get all Accounts dept persons' >> beam.Filter(lambda record: record[3] == 'Accounts')
                      | 'Pair each accounts employee with 1 accounts' >> beam.Map(lambda record: ("Accounts, " +record[1], 1))
                      | 'Group and sum1 accounts' >> beam.CombinePerKey(sum)
                      | 'count filter accounts' >> beam.Filter(filter_on_count)
                      | 'Regular accounts employee' >> beam.Map(format_output)
                      | 'Write results for account' >> beam.io.WriteToText('data/Account_quick')
                 )

  hr_count = (
                      input_data
                      | 'Get all HR  dept persons' >> beam.Filter(lambda record: record[3] == 'HR')
                      | 'Pair each HR employee with 1 hr' >> beam.Map(lambda record: ("HR, " +record[1], 1))
                      | 'Group and sum1 hr' >> beam.CombinePerKey(sum)
                      | 'count filter hr' >> beam.Filter(filter_on_count)
                      | 'Regular accounts employee hr' >> beam.Map(format_output)
                      | 'Write results for HR ' >> beam.io.WriteToText('data/HR_quick')
                 )

  finance_count = (
                  input_data
                  | 'Get all finance dept persons' >> beam.Filter(lambda record: record[3] == 'Finance')
                  | 'Pair each finance employee with 1 finance' >> beam.Map(lambda record: ("Finance, " +record[1], 1))
                  | 'Group and sum1 finance' >> beam.CombinePerKey(sum)
                  | 'count filter finance' >> beam.Filter(filter_on_count)
                  | 'Regular accounts employee1 finance' >> beam.Map(format_output)
                  | 'Write results for finance' >> beam.io.WriteToText('data/Finance_quick')
                   )

In [87]:

!{('head -n 10 data/Account_quick-00000-of-00001')}

(b'Accounts, Marco', '31', 'Experienced employee')
(b'Accounts, Rebekah', '31', 'Experienced employee')
(b'Accounts, Itoe', '31', 'Experienced employee')
(b'Accounts, Edouard', '31', 'Experienced employee')
(b'Accounts, Kyle', '62', 'Experienced employee')
(b'Accounts, Kumiko', '31', 'Experienced employee')
(b'Accounts, Gaston', '31', 'Experienced employee')


In [88]:
!{('head -n 10 data/Finance_quick-00000-of-00001')}

(b'Finance, Kumiko', '31', 'Experienced employee')
(b'Finance, Wendy', '31', 'Experienced employee')
(b'Finance, Cristobal', '31', 'Experienced employee')
(b'Finance, Erika', '31', 'Experienced employee')
(b'Finance, Sebastien', '31', 'Experienced employee')
(b'Finance, Valerie', '31', 'Experienced employee')
(b'Finance, Dolly', '31', 'Experienced employee')
(b'Finance, Emily', '31', 'Experienced employee')
(b'Finance, Kaori', '31', 'Experienced employee')
(b'Finance, Hitomi', '31', 'Experienced employee')



Composite Transforms:

  Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms.



In [89]:


class MyTransform(beam.PTransform):

  def expand(self, input_coll):

    a = (
        input_coll
                       | 'Group and sum1' >> beam.CombinePerKey(sum)
                       | 'count filter accounts' >> beam.Filter(filter_on_count)
                       | 'Regular accounts employee' >> beam.Map(format_output)

    )
    return a


with beam.Pipeline() as p:
  input_data = (p
                      | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
                      | "Split rows" >> beam.Map(SplitRow)
                   )

  accounts_count = (
                      input_data
                      | 'Get all Accounts dept persons' >> beam.Filter(lambda record: record[3] == 'Accounts')
                      | 'Pair each accounts employee with 1' >> beam.Map(lambda record: ("Accounts, " +record[1], 1))
                      | 'composite accounts' >> MyTransform()
                      | 'Write results for account' >> beam.io.WriteToText('data/Account')
                 )

  finance_count = (
                  input_data
                  | 'Get all Finance dept persons' >> beam.Filter(lambda record: record[3] == 'Finance')
                  | 'Pair each Finance employee with 1' >> beam.Map(lambda record: ("Finance, " +record[1], 1))
                  | 'composite Finance' >> MyTransform()
                  | 'Write results for Finance' >> beam.io.WriteToText('data/Finance')
            )



In [90]:
!{('head -n 10 data/Account-00000-of-00001')}

(b'Accounts, Marco', '31', 'Experienced employee')
(b'Accounts, Rebekah', '31', 'Experienced employee')
(b'Accounts, Itoe', '31', 'Experienced employee')
(b'Accounts, Edouard', '31', 'Experienced employee')
(b'Accounts, Kyle', '62', 'Experienced employee')
(b'Accounts, Kumiko', '31', 'Experienced employee')
(b'Accounts, Gaston', '31', 'Experienced employee')


In [91]:
!{('head -n 10 data/Finance-00000-of-00001')}

(b'Finance, Kumiko', '31', 'Experienced employee')
(b'Finance, Wendy', '31', 'Experienced employee')
(b'Finance, Cristobal', '31', 'Experienced employee')
(b'Finance, Erika', '31', 'Experienced employee')
(b'Finance, Sebastien', '31', 'Experienced employee')
(b'Finance, Valerie', '31', 'Experienced employee')
(b'Finance, Dolly', '31', 'Experienced employee')
(b'Finance, Emily', '31', 'Experienced employee')
(b'Finance, Kaori', '31', 'Experienced employee')
(b'Finance, Hitomi', '31', 'Experienced employee')


Combine Core Transform in Apache Beam:

Combine:

●	Combine is a Beam transform for combining collections of elements or values in your data.

●	Combine has variants that work on entire PCollections, and some that combine the values for each key in PCollections of key/value pairs.

●	When you apply a Combine transform, you must provide the function that contains the logic for combining the elements or values.

●	The combining function should be commutative and associative.

●	The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.

●	complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type.
Advanced combinations using CombineFn:

o	A general combining operation consists of four operations. When you create a subclass of CombineFn, you must provide four operations by overriding the corresponding methods:

o	Create Accumulator - creates a new “local” accumulator

o	Add Input - adds an input element to an accumulator, returning the accumulator value.

o	Merge Accumulators - merges several accumulators into a single accumulator; this is how data in multiple accumulators is combined before the final calculation.

o	Extract Output - performs the final computation.
Three types of Aggregator function is supported by beam. They are.

CombineGlobally:  Combines all elements in a collection.

CombinePerKey: Combines all elements for each key in a collection.

CombineValues:  Combines an iterable of values in a keyed collection of elements.




Resources:

●	https://beam.apache.org/documentation/programming-guide/#combine

o	https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/

o	https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/

o	https://beam.apache.org/documentation/transforms/python/aggregation/combinevalues/



CombineGlobally:

  Combines all elements in a collection.

The more general way to combine elements, and the most flexible, is with a class that inherits from CombineFn.

  CombineFn.create_accumulator(): This creates an empty accumulator. For example, an empty accumulator for a sum would be 0, while an empty accumulator for a product (multiplication) would be 1.

  CombineFn.add_input(): Called once per element. Takes an accumulator and an input element, combines them and returns the updated accumulator.

  CombineFn.merge_accumulators(): Multiple accumulators could be processed in parallel, so this function helps merging them into a single accumulator.

  CombineFn.extract_output(): It allows to do additional calculations before extracting a result.



In [93]:


class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, sum_count, input):
    (sum, count) = sum_count
    return sum + input, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    (sum, count) = sum_count
    return sum / count if count else float('NaN')


with beam.Pipeline() as p:
  input_data = (p
                | "Create data" >> beam.Create([21,45,78,99,1,22,5])
                | "Combine Globally" >> beam.CombineGlobally(AverageFn())
                |"Write to Local">> beam.io.WriteToText('data/result'))



In [94]:
!{'head -n 10 data/result-00000-of-00001'}

38.714285714285715



CombinePerKey:

    Combines all elements for each key in a collection.



In [95]:
with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create plant counts' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Sum' >> beam.CombinePerKey(sum)
      | beam.Map(print))

('🥕', 5)
('🍆', 1)
('🍅', 12)


In [104]:
def saturated_sum(values):
  # print("values", values)
  # max_value = 8
  # return min(sum(values), max_value)
  # return max(values)
  # return min(values)
  return sum(values)

with beam.Pipeline() as pipeline:
  saturated_total = (
      pipeline
      | 'Create plant counts' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Saturated sum' >> beam.CombinePerKey(saturated_sum)
      | beam.Map(print))

('🥕', 5)
('🍆', 1)
('🍅', 12)



CombineValues:

  Combines an iterable of values in a keyed collection of elements.

  CombineValues accepts a function that takes an iterable of elements as an input, and combines them to return a single element.

  CombineValues expects a keyed PCollection of elements, where the value is an iterable of elements to be combined.



In [106]:
with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create produce counts' >> beam.Create([
          ('🥕', [3, 2]),
          ('🍆', [1]),
          ('🍅', [4, 5, 3]),
      ])
      | 'Sum' >> beam.CombineValues(sum)
      | beam.Map(print))

('🥕', 5)
('🍆', 1)
('🍅', 12)
