**Install Apache Beam SDK**

In [None]:
%pip install -q apache-beam[interactive] --no-warn-conflicts

[K     |████████████████████████████████| 10.1 MB 23.7 MB/s 
[K     |████████████████████████████████| 63 kB 1.4 MB/s 
[K     |████████████████████████████████| 151 kB 49.3 MB/s 
[K     |████████████████████████████████| 245 kB 41.2 MB/s 
[K     |████████████████████████████████| 45 kB 2.8 MB/s 
[K     |████████████████████████████████| 508 kB 55.9 MB/s 
[K     |████████████████████████████████| 2.3 MB 44.5 MB/s 
[K     |████████████████████████████████| 112 kB 69.3 MB/s 
[K     |████████████████████████████████| 121 kB 56.4 MB/s 
[K     |████████████████████████████████| 792 kB 57.0 MB/s 
[K     |████████████████████████████████| 380 kB 49.4 MB/s 
[K     |████████████████████████████████| 1.1 MB 39.7 MB/s 
[?25h  Building wheel for dill (setup.py) ... [?25l[?25hdone
  Building wheel for timeloop (setup.py) ... [?25l[?25hdone


# **IO in Apache Beam**

### [**Working with Text Files**](https://)




In [None]:
import apache_beam as beam

# Read every line of the customers file and write it back out unchanged;
# WriteToText shards its output, so the result lands in text-output-*.csv.
p1 = beam.Pipeline()

lines = p1 | beam.io.ReadFromText('Customers_age.txt')
customers = lines | beam.io.WriteToText('text-output', file_name_suffix='.csv')

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f6ed8d8ad10>

In [None]:
! cat text-output-00000-of-00001.csv

1,John,NY,22
2,Jim,LA,25
3,Mary,NY,30
4,Albert,LA,20
5,Samza,NY,18
6,Maria,NY,15
7,Shreya,NY,30
8,Kavita,LA,20
9,Mona,NY,18
10,Nandita,NY,15


### [***Adding Map PTransform***](https://)

Map Transformation is applied on every element of the input dataset and produces exactly one element per input.

Let's add Map transformation 
 

1.   ***Get only customer name and country***
2.   ***Change name of customers to upper case***

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

def getUpper(customers):
  """Return (NAME, country) for one comma-separated customer line.

  The second field is upper-cased; the third field is returned unchanged.
  """
  fields = customers.split(",")
  name = fields[1]
  return name.upper(), fields[2]

# Use the getUpper helper defined in this cell rather than an inline lambda
# that repeats its logic and splits every line twice.
customers = (
    p1
    |beam.io.ReadFromText('Customers_age.txt')
    |beam.Map(getUpper)  # one element in, one (NAME, country) tuple out
    |beam.io.WriteToText('map-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eeb155d50>

**Let's check the output**

In [None]:
! cat map-output-00000-of-00001

('JOHN', 'NY')
('JIM', 'LA')
('MARY', 'NY')
('ALBERT', 'LA')
('SAMZA.NY', '18')
('MARIA', 'NY')
('SHREYA', 'NY')
('KAVITA', 'LA')
('MONA', 'NY')
('NANDITA', 'NY')


### [***Adding Filter PTransform***](https://)

The Filter transformation is applied to every element and keeps only those records that match the condition.

Let's add Filter transformation in above pipeline 
 

1.   ***Get only those customers whose country is NY***

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

def getUpper(customers):
  """Return (NAME, country) for one comma-separated customer line.

  The second field is upper-cased; the third field is returned unchanged.
  """
  fields = customers.split(",")
  name = fields[1]
  return name.upper(), fields[2]

def filter_country(customers):
  """Predicate for beam.Filter: True when item 1 of the record equals 'NY'."""
  country = customers[1]
  return country == 'NY'

# Upper-case the names, keep only the 'NY' records, then write them out.
name_country = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.Map(getUpper)
)
customers = (
    name_country
    | beam.Filter(filter_country)
    | beam.io.WriteToText('filter-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eea392690>

***Let's check the output for above pipeline***

In [None]:
! cat filter-output-00000-of-00001

('JOHN', 'NY')
('MARY', 'NY')
('MARIA', 'NY')
('SHREYA', 'NY')
('MONA', 'NY')
('NANDITA', 'NY')


## [***Working with Schema***](https://)

**Let's now add schema to customers**

In [None]:
import apache_beam as beam
import typing

class Customer(typing.NamedTuple):
  """Schema of a customer record; passed to with_output_types in the pipeline below."""
  # Customer identifier (the first CSV field, converted with int() in convert).
  id: int
  # Customer name (the second CSV field).
  name: str
  # Value of the third CSV field, e.g. 'NY' or 'LA'.
  country: str

p1 = beam.Pipeline()

def convert(customers):
  """Parse one comma-separated line into a beam.Row with id, name and country."""
  fields = customers.split(",")
  return beam.Row(
      id=int(fields[0]),
      name=fields[1],
      country=fields[2],
  )
   
def getUpper(customer):
  """Return a new row equal to `customer` but with the name upper-cased."""
  cust_id = customer.id
  upper_name = customer.name.upper()
  return beam.Row(id=cust_id, name=upper_name, country=customer.country)

def filter_country(customer):
  """Predicate for beam.Filter: True when the row's country attribute is 'NY'."""
  country = customer.country
  return country == 'NY'


# Parse each line into a typed row, upper-case the name, keep the 'NY' rows.
rows = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.Map(convert).with_output_types(Customer)
)
customers = (
    rows
    | beam.Map(getUpper).with_output_types(Customer)
    | beam.Filter(filter_country)
    | beam.io.WriteToText('schema-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eeaea4590>

***Let's check the output of above pipeline***

In [None]:
! cat schema-output-00000-of-00001

Row(id=1, name='JOHN', country='NY')
Row(id=3, name='MARY', country='NY')
Row(id=6, name='MARIA', country='NY')
Row(id=7, name='SHREYA', country='NY')
Row(id=9, name='MONA', country='NY')
Row(id=10, name='NANDITA', country='NY')


**Did you notice that the result above is printed as Row objects? Let's format the output as text.**

In [None]:
import apache_beam as beam
import typing

class Customer(typing.NamedTuple):
  """Schema of a customer record; passed to with_output_types in the pipeline below."""
  # Customer identifier (the first CSV field, converted with int() in convert).
  id: int
  # Customer name (the second CSV field).
  name: str
  # Value of the third CSV field, e.g. 'NY' or 'LA'.
  country: str

p1 = beam.Pipeline()

def convert(customers):
  """Parse one comma-separated line into a beam.Row with id, name and country."""
  fields = customers.split(",")
  return beam.Row(
      id=int(fields[0]),
      name=fields[1],
      country=fields[2],
  )
   
def getUpper(customer):
  """Return a new row equal to `customer` but with the name upper-cased."""
  cust_id = customer.id
  upper_name = customer.name.upper()
  return beam.Row(id=cust_id, name=upper_name, country=customer.country)

def format(customer):
  """Render a customer row as the sentence '<name> lives in <country>'."""
  return customer.name + " lives in " + customer.country


def filter_country(customer):
  """Predicate for beam.Filter: True when the row's country attribute is 'NY'."""
  country = customer.country
  return country == 'NY'


# Same as the previous schema pipeline, plus a final Map that turns each
# surviving row into a plain sentence before it is written.
ny_rows = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.Map(convert).with_output_types(Customer)
    | beam.Map(getUpper).with_output_types(Customer)
    | beam.Filter(filter_country)
)
customers = (
    ny_rows
    | beam.Map(format)
    | beam.io.WriteToText('schema-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eeb537450>

In [None]:
! cat schema-output-00000-of-00001

JOHN lives in NY
MARY lives in NY
MARIA lives in NY
SHREYA lives in NY
MONA lives in NY
NANDITA lives in NY


## **[PCollections from In memory](https://)**

In [None]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  # Build a PCollection straight from an in-memory list and print each element.
  country_names = ['India','United States','China','Russia']
  country = pipeline | beam.Create(country_names) | beam.Map(print)



India
United States
China
Russia


## **[Working with Parquet Files](https://)**

class apache_beam.io.parquetio.***ReadFromParquet***(file_pattern=None, min_bundle_size=0, validate=True, columns=None)

Parameters:	
* ***file_pattern (str)*** – the file glob to read
* ***min_bundle_size (int)*** – the minimum size in bytes, to be considered when splitting the input into bundles.
* ***validate (bool)*** – flag to verify that the files exist during the pipeline creation time.
* ***columns (List[str])*** – list of columns that will be read from files.

**This Apache Beam program reads a parquet file episodes.parquet**

*  Keep only the items whose title starts with 'The'
*  Output 'Title' in upper case followed by 'Doctor'
* Output is saved in Text format





***First, let's see what is inside episodes.parquet***

In [None]:
import pandas as pd
# Load the Parquet file into a DataFrame; as the last expression of the cell it is displayed.
pd.read_parquet('episodes.parquet', engine='pyarrow')

Unnamed: 0,title,air_date,doctor
0,The Eleventh Hour,3 April 2010,11
1,The Doctor's Wife,14 May 2011,11
2,Horror of Fang Rock,3 September 1977,4
3,The Mysterious Planet,6 September 1986,6
4,Rose,26 March 2005,9
5,Castrolava,4 January 1982,5


***Let's write the pipeline to***

* Let's only read Title and doctor columns
* Keep only the items whose title starts with 'The'
*  Output 'Title' in upper case followed by 'Doctor'
* Output is saved in Text format


In [None]:
import apache_beam as beam

def getTitle(episodes):
  """Predicate for beam.Filter: True when the record's title starts with 'The'."""
  title = episodes['title']
  return title.startswith('The')

def formatOutput(episodes):
  """Return the pair (upper-cased title, doctor) for one record."""
  upper_title = episodes['title'].upper()
  return upper_title, episodes['doctor']

p1 = beam.Pipeline()

# Only the two columns used downstream are requested from the Parquet file.
episodes = p1 | beam.io.ReadFromParquet(
    file_pattern='episodes.parquet', columns=['doctor','title'])
customers = (
    episodes
    | beam.Filter(getTitle)
    | beam.Map(formatOutput)
    | beam.io.WriteToText('parquet-output')
)
p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eeaddc590>

***Let's check the results of above pipeline***

In [None]:
! cat parquet-output-00000-of-00001

('THE ELEVENTH HOUR', 11)
("THE DOCTOR'S WIFE", 11)
('THE MYSTERIOUS PLANET', 6)


***Now, let's save the results in Parquet file format***

To write the output in Parquet format, we have to define pyarrow schema

Also, each output element must be a dictionary

class apache_beam.io.parquetio.***WriteToParquet***(
* file_path_prefix,
* schema, 
* row_group_buffer_size=67108864, 
* record_batch_size=1000, 
* codec='none', 
* use_deprecated_int96_timestamps=False, 
* file_name_suffix='', 
* num_shards=0, 
* shard_name_template=None, 
* mime_type='application/x-parquet')

In [None]:
import apache_beam as beam
import pyarrow

def getTitle(episodes):
  """Predicate for beam.Filter: True when the record's title starts with 'The'."""
  title = episodes['title']
  return title.startswith('The')

def formatOutput(episodes):
  """Build a dict record holding the upper-cased title and the doctor value."""
  record = {}
  record['title'] = episodes['title'].upper()
  record['doctor'] = episodes['doctor']
  return record

# Column layout of the Parquet output. Titles are text, so the column is
# declared as a string; a binary column is read back as bytes (b'...').
schema = pyarrow.schema(
    [('title', pyarrow.string()), ('doctor', pyarrow.int64())]
)

p1 = beam.Pipeline()

# Read the episodes, keep the matching titles as dict records, and write them
# to Parquet using the schema defined above.
episodes = p1 | beam.io.ReadFromParquet('episodes.parquet')
records = episodes | beam.Filter(getTitle) | beam.Map(formatOutput)
customers = records | 'Write' >> beam.io.WriteToParquet('myoutput',schema,codec='snappy')

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eead46c50>

***Let's check the results of above pipeline***

In [None]:
import pandas as pd
# Read the written shard back into a DataFrame to inspect what the pipeline stored.
pd.read_parquet('myoutput-00000-of-00001', engine='pyarrow')

Unnamed: 0,title,doctor
0,b'THE ELEVENTH HOUR',11
1,"b""THE DOCTOR'S WIFE""",11
2,b'THE MYSTERIOUS PLANET',6


# Core Transforms

## [***FlatMap***](https://)

FlatMap processes every input element and outputs zero or more elements for it.

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

# FlatMap flattens the list returned by split(","), so every piece of a line
# becomes its own output element.
customers = (
    p1
    |beam.io.ReadFromText('Peter_Piper.txt')
    |beam.FlatMap(lambda c: c.split(","))
    |beam.io.WriteToText('flatmap-output')
)

p1.run()





<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f0a5a364ed0>

**Running the above code using Map Transform**

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

# Map emits exactly one output per input, so each line becomes a single
# element holding the whole list returned by split(",").
customers = (
    p1
    |beam.io.ReadFromText('Peter_Piper.txt')
    |beam.Map(lambda c: c.split(","))
    |beam.io.WriteToText('map-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f0a5a056e90>

**Let's see the difference in output**

In [None]:
! cat /content/map-output-00000-of-00001

['peter piper picked a peck of pickled pepper']
['a peck of pickled pepper peter piper picked']
['if peter piper picked a peck of pickled pepper']
["where's the peck of pickled pepper peter piper picked"]


In [None]:
cat /content/flatmap-output-00000-of-00001

peter piper picked a peck of pickled pepper
a peck of pickled pepper peter piper picked
if peter piper picked a peck of pickled pepper
where's the peck of pickled pepper peter piper picked


## **[Partition Transform](https://)**

Separates elements in a collection into multiple output collections. The partitioning function contains the logic that determines how to separate the elements of the input collection into each resulting partition output collection.

In [None]:
import apache_beam as beam

p = beam.Pipeline()
# The integers 1..8 as a set; they are partitioned into even and odd below.
number = set(range(1, 9))

def partition_fn(element,num_partition):
  """Return 0 for even numbers and 1 for odd numbers.

  num_partition is part of the signature beam.Partition calls; it is not
  used by this function.
  """
  if element % 2 == 0:
    return 0
  return 1


# Partition returns one PCollection per partition index.
numbers = p | beam.Create(number)
number_pc = numbers | beam.Partition(partition_fn,2)

print("Printing First Partition")
first_partition = number_pc[0]
first_partition | 'Printing first partition' >> beam.Map(print)

p.run()



Printing First Partition
2
4
6
8


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f0a5a0ec690>

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

def partition_fn(element,num_partition):
  """Return 0 when the record's third field is 'NY', otherwise 1.

  num_partition is part of the signature beam.Partition calls; it is not
  used by this function.
  """
  if element[2] == 'NY':
    return 0
  return 1

# Split every line into fields, then partition the records with partition_fn.
split_rows = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.Map(lambda c: (c.split(",")))
)
customers = split_rows | beam.Partition(partition_fn,2)

# Partition 0 holds the records for which partition_fn returned 0.
newyork_partition = customers[0]
newyork_partition | 'Printing  partition with Newyork' >> beam.Map(print)

p1.run()



['1', 'John', 'NY', '22']
['3', 'Mary', 'NY', '30']
['5', 'Samza', 'NY', '18']
['6', 'Maria', 'NY', '15']
['7', 'Shreya', 'NY', '30']
['9', 'Mona', 'NY', '18']
['10', 'Nandita', 'NY', '15']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f0a59c4b7d0>

In [None]:
! cat /content/partition-output-00000-of-00001

<apache_beam.pvalue._InvalidUnpickledPCollection object at 0x7f0a59b9f610>
<apache_beam.pvalue._InvalidUnpickledPCollection object at 0x7f0a59b9f110>


## **[ParDo Transform](https://)**

*ParDo is a Beam transform for generic parallel processing.It considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output.*

ParDo is useful for a variety of common data processing operations, including:

* ***Filtering a data set***. 
You can use ParDo to consider each element in a PCollection and either output that element to a new collection or discard it.

* ***Formatting or type-converting each element in a data set.*** 
If your input PCollection contains elements that are of a different type or format than you want, you can use ParDo to perform a conversion on each element and output the result to a new PCollection.

* ***Extracting parts of each element in a data set.*** 
If you have a PCollection of records with multiple fields, for example, you can use a ParDo to parse out just the fields you want to consider into a new PCollection.

* ***Performing computations on each element in a data set.*** 
You can use ParDo to perform simple or complex computations on every element, or certain elements, of a PCollection and output the results as a new PCollection.


***In the below pipeline, we are using simple Map and Filter Transforms***

* To get all customers from New York whose age is greater than 20

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

def filter_country(customers):
  """Keep a customer whose country is 'NY' and whose age is greater than 20.

  Args:
    customers: one split record of strings laid out as
      [id, name, country, age].

  Returns:
    True when the record should be kept, False otherwise.

  Raises:
    ValueError: if the age field of an 'NY' record is not an integer string.
  """
  # Index 2 is the country field (index 1 is the name); the age is stored as
  # text and must be converted before the numeric comparison.
  return customers[2] == 'NY' and int(customers[3]) > 20

# Split each line into fields, filter with filter_country, write the result.
split_rows = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.Map(lambda c: (c.split(",")))
)
customers = (
    split_rows
    | beam.Filter(filter_country)
    | beam.io.WriteToText('filter-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa0bf466a90>

In [None]:
! cat /content/filter-output-00000-of-00001

['1', 'John', 'NY', '22']
['3', 'Mary', 'NY', '30']
['7', 'Shreya', 'NY', '30']


***Let's change the above Map and Filter Transforms into ParDo***

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

# The DoFn to perform on each element in the input PCollection.

class SplitRow(beam.DoFn):
  """DoFn that turns one comma-separated line into a single list of fields."""

  def process(self,element):
    fields = element.split(',')
    # Wrapped in a list: the return value is the collection of outputs.
    return [fields]

class FilterCustomer(beam.DoFn):
  """DoFn that emits a record only when it is from 'NY' and older than 20."""

  def process(self,element):
    # Guard clauses: returning None emits nothing for this element.
    if element[2] != 'NY':
      return None
    if int(element[3]) <= 20:
      return None
    return [element]


# The same split-then-filter flow as before, expressed with two ParDo steps.
split_rows = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.ParDo(SplitRow())
)
customers = (
    split_rows
    | beam.ParDo(FilterCustomer())
    | beam.io.WriteToText('pardo-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa0ba79fd90>

In [None]:
! cat /content/pardo-output-00000-of-00001

['1', 'John', 'NY', '22']
['3', 'Mary', 'NY', '30']
['7', 'Shreya', 'NY', '30']


### **DoFn LifeCycle**
You may use a DoFn just as a map function. For that, override the process method and you are good to go.

However, there are more methods that you can override to control the lifecycle of a DoFn: creation of the worker, start of a bundle, end of a bundle, deletion of the worker, and of course, processing of every element.

The setup and teardown methods are executed only once per worker.

The start_bundle and finish_bundle methods are executed once per data bundle.

In [None]:

import apache_beam as beam
from datetime import datetime


p1 = beam.Pipeline()

class sampleDoFn(beam.DoFn):
  """DoFn that prints a message from every lifecycle hook and upper-cases elements."""

  def setup(self):
    """Print the current timestamp when the worker starts."""
    started_at = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print("Worker started %s" % started_at)

  def start_bundle(self):
    """Announce the start of a bundle."""
    print("Bundle started")

  def process(self, element):
    """Print the element, then emit its upper-cased form."""
    print("Processing element: %s" % element)
    yield element.upper()

  def finish_bundle(self):
    """Announce the end of a bundle."""
    print("Bundle finished")

  def teardown(self):
    """Print the current timestamp when the worker finishes."""
    finished_at = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print("Worker finished %s" % finished_at)


# Run the lifecycle-printing DoFn over every line of the customers file.
lines = p1 | beam.io.ReadFromText('Customers_age.txt')
customers = (
    lines
    | beam.ParDo(sampleDoFn())
    | beam.io.WriteToText('pardo-output')
)

p1.run()



Worker started 14/02/2022 12:35:23
Bundle started
Processing element: 1,John,NY,22
Processing element: 2,Jim,LA,25
Processing element: 3,Mary,NY,30
Processing element: 4,Albert,LA,20
Processing element: 5,Samza,NY,18
Processing element: 6,Maria,NY,15
Processing element: 7,Shreya,NY,30
Processing element: 8,Kavita,LA,20
Processing element: 9,Mona,NY,18
Processing element: 10,Nandita,NY,15
Bundle finished
Worker finished 14/02/2022 12:35:24


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f6ed73b5ad0>

***Let's change the below code to ParDo***

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

def getUpper(customers):
  """Return (NAME, country) for one comma-separated customer line.

  The second field is upper-cased; the third field is returned unchanged.
  """
  fields = customers.split(",")
  name = fields[1]
  return name.upper(), fields[2]

def filter_country(customers):
  """Predicate for beam.Filter: True when item 1 of the record equals 'NY'."""
  country = customers[1]
  return country == 'NY'

# Map/Filter version of the flow that the next cell rewrites as a ParDo.
name_country = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.Map(getUpper)
)
customers = (
    name_country
    | beam.Filter(filter_country)
    | beam.io.WriteToText('filter-output')
)

p1.run()

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

# The DoFn to perform on each element in the input PCollection.

class FilterAndTransformCustomer(beam.DoFn):
  """DoFn that keeps 'NY' customers older than 20 and upper-cases their name."""

  def process(self,element):
    fields = element.split(",")
    if not (fields[2]=='NY' and int(fields[3])>20):
      # Returning None emits nothing for this element.
      return None
    fields[1] = fields[1].upper()
    return [fields]


# A single ParDo does the splitting, filtering and upper-casing in one step.
lines = p1 | beam.io.ReadFromText('Customers_age.txt')
customers = (
    lines
    | beam.ParDo(FilterAndTransformCustomer())
    | beam.io.WriteToText('pardo-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa0b9721850>

In [None]:
cat pardo-output-00000-of-00001

['1', 'JOHN', 'NY', '22']
['3', 'MARY', 'NY', '30']
['7', 'SHREYA', 'NY', '30']


### [**Side Inputs in ParDo**](https://)

*In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection.*

*Additional data can be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.*

A side input is nothing more and nothing less than a PCollection that can be used as an additional input to a ParDo transform. However, unlike a normal (processed) PCollection, the side input is a global and immutable view of the underlying PCollection, which means that it can't change after it has been computed. It can be used whenever we need to join additional datasets to the processed one or broadcast some common values (e.g. a dictionary) to the processing functions.

**Passing static values as side inputs**

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

# The DoFn to perform on each element in the input PCollection.

class FilterAndTransformCustomer(beam.DoFn):
  """DoFn that keeps customers of `country` older than `age`, name upper-cased.

  `country` and `age` are the extra arguments supplied through beam.ParDo.
  """

  def process(self,element,country,age):
    fields = element.split(",")
    if not (fields[2]==country and int(fields[3])>age):
      # Returning None emits nothing for this element.
      return None
    fields[1] = fields[1].upper()
    return [fields]


# "NY" and 20 are forwarded to process() as its country and age arguments.
lines = p1 | beam.io.ReadFromText('Customers_age.txt')
customers = (
    lines
    | beam.ParDo(FilterAndTransformCustomer(),"NY",20)
    | beam.io.WriteToText('pardo-output')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa0ba04f250>

In [None]:
! cat pardo-output-00000-of-00001

['1', 'JOHN', 'NY', '22']
['3', 'MARY', 'NY', '30']
['7', 'SHREYA', 'NY', '30']


***Generating Side inputs at runtime from another pipeline***

In the below code
* First, we will compute the mean of the ratings
* Second, we will pass that mean as a side input
* Finally, we will keep only those movies whose rating is greater than the mean

In [None]:
import apache_beam as beam


class SplitRow(beam.DoFn):
  """DoFn that turns one comma-separated line into a single list of fields."""

  def process(self,element):
    fields = element.split(',')
    # Wrapped in a list: the return value is the collection of outputs.
    return [fields]

class MoviesAboveAvgRating(beam.DoFn):
  """DoFn that emits a record when its third field exceeds `avg_rating`."""

  def process(self,element,avg_rating):
    rating = float(element[2])
    if rating > avg_rating:
      return [element]

p1 = beam.Pipeline()

raw_lines = p1 | 'Read ratings ' >> beam.io.ReadFromText('ratings.csv')
ratings = raw_lines | 'Split Rows' >> beam.ParDo(SplitRow())

# The mean is computed by another branch of the same pipeline and wrapped as
# a singleton side input for the ParDo below.
rating_values = ratings | 'Extract ratings' >> beam.Map(lambda rating: float(rating[2]))
mean_rating = beam.pvalue.AsSingleton(
    rating_values | 'Find mean ratings' >> beam.combiners.Mean.Globally())

above_average_ratings = (
    ratings
    | beam.ParDo(MoviesAboveAvgRating(),mean_rating)
    | beam.io.WriteToText('movies-above-avg')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa0b4b77cd0>

***Let's send both static and dynamic values as Side Inputs***

In the below code
* First, we will compute the mean of the ratings
* Second, we will pass that mean and a movie_id threshold as side inputs
* Finally, we will keep only those movies whose rating is greater than the mean

In [None]:
import apache_beam as beam


class SplitRow(beam.DoFn):
  """DoFn that turns one comma-separated line into a single list of fields."""

  def process(self,element):
    fields = element.split(',')
    # Wrapped in a list: the return value is the collection of outputs.
    return [fields]

class MoviesAboveAvgRating(beam.DoFn):
  """Keep records rated above the average whose id exceeds a threshold.

  Extra process() arguments supplied through beam.ParDo:
    avg_rating: value the record's third field must exceed.
    movieId: threshold the record's first field must exceed.
  """

  def process(self,element,avg_rating,movieId):
    # Compare against the movieId argument rather than a literal, so the value
    # handed to beam.ParDo actually controls the filter.
    if float(element[2]) > avg_rating and int(element[0]) > movieId:
      return [element]

p1 = beam.Pipeline()

raw_lines = p1 | 'Read ratings ' >> beam.io.ReadFromText('ratings.csv')
ratings = raw_lines | 'Split Rows' >> beam.ParDo(SplitRow())

# Dynamic side input: the mean computed by another branch of this pipeline.
rating_values = ratings | 'Extract ratings' >> beam.Map(lambda rating: float(rating[2]))
mean_rating = beam.pvalue.AsSingleton(
    rating_values | 'Find mean ratings' >> beam.combiners.Mean.Globally())

# The literal 100 is passed next to the dynamic mean as a static extra argument.
above_average_ratings = (
    ratings
    | beam.ParDo(MoviesAboveAvgRating(),mean_rating,100)
    | beam.io.WriteToText('movies-above-avg')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa0b42ac1d0>

### **[Side Outputs in ParDo](https://)**

In [None]:
import apache_beam as beam
p1 = beam.Pipeline()

# Collect the ids listed in customers_exclude.txt, one per line, with the
# trailing whitespace stripped.
side_list = []
with open ('customers_exclude.txt','r') as exclude_file:
  side_list.extend(line.rstrip() for line in exclude_file)

print(side_list)

class SplitRow(beam.DoFn):
  """DoFn that splits a line and drops customers whose id is in `side_list`."""

  def process(self,element,side_list):
    customer = element.split(',')
    if customer[0] in side_list:
      # Excluded id: returning None emits nothing for this element.
      return None
    return [customer]

class ProcessCustomers(beam.DoFn):
  """Route each customer to the main output or to tagged side outputs.

  Extra process() arguments supplied through beam.ParDo:
    country: customers whose third field equals this value go to the main
      output; all others go to the 'Other_Cust' tagged output.
    start_char: customers whose name starts with this prefix are additionally
      emitted to the 'Names_J' tagged output.
  """

  def process(self,element,country,start_char):
    # Use the arguments rather than literals so the values given to
    # beam.ParDo decide the routing.
    if element[2] == country:
      yield element
    else:
      yield beam.pvalue.TaggedOutput('Other_Cust',element)
    # Independent of the country routing: one element can land in two outputs.
    if element[1].startswith(start_char):
      yield beam.pvalue.TaggedOutput('Names_J',element)
  


# ProcessCustomers emits to a main output plus two tagged outputs;
# with_outputs exposes each of them as an attribute of the result.
route_customers = beam.ParDo(
    ProcessCustomers(),country='NY',start_char='J'
).with_outputs('Names_J','Other_Cust',main='NewYork_Cust')

kept_customers = (
    p1
    | beam.io.ReadFromText('Customers_age.txt')
    | beam.ParDo(SplitRow(),side_list)
)
customers = kept_customers | route_customers

newyork_customers = customers.NewYork_Cust
other_customers = customers.Other_Cust
customer_withname_J = customers.Names_J

# Write every output PCollection to its own file prefix.
for pcoll, label, prefix in (
    (newyork_customers, 'Write Newyork Customers PCollection', "newyork"),
    (other_customers, 'Write Customers PCollection that lives in other cities', "customers_other_cities"),
    (customer_withname_J, 'Write Customers names with J PCollection', "customers_names_j"),
):
  pcoll | label >> beam.io.WriteToText(prefix)


p1.run()



['3', '7', '10']




<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fca4195ed90>

In [None]:
! cat newyork-00000-of-00001

['1', 'John', 'NY', '22']
['5', 'Samza', 'NY', '18']
['6', 'Maria', 'NY', '15']
['9', 'Mona', 'NY', '18']


In [None]:
!cat customers_names_j-00000-of-00001

['1', 'John', 'NY', '22']
['2', 'Jim', 'LA', '25']


In [None]:
!cat customers_other_cities-00000-of-00001

['2', 'Jim', 'LA', '25']
['4', 'Albert', 'LA', '20']
['8', 'Kavita', 'LA', '20']


## **[Regex Transforms](https://)**



**Regex Find**

Regex.find keeps only the elements that match the regular expression, returning the matched group.

In [None]:
import apache_beam as beam

# One named group ('icon') followed by two more comma-separated word groups.
regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
garden_lines = [
    'Strawberry, perennial',
    '# 🥕, Carrot, biennial ignoring trailing words',
    '# 🍆, Eggplant, perennial - 🍌, Banana, perennial',
    '# 🍅, Tomato- 🍉, Watermelon, annual',
    '# 🥔, Potato, perennial',
]
with beam.Pipeline() as pipeline:
  plants_find_all = (
      pipeline
      | 'Garden plants' >> beam.Create(garden_lines)
      | 'Parse plants' >> beam.Regex.find(regex)
      | beam.Map(print))



🥕, Carrot, biennial
🍆, Eggplant, perennial
🍉, Watermelon, annual
🥔, Potato, perennial


**Regex.find_all**

Regex.find_all returns a list of all the matches of the regular expression, returning the matched group

In [None]:
import apache_beam as beam

# One named group ('icon') followed by two more comma-separated word groups.
regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
garden_lines = [
    'Strawberry, perennial',
    '# 🥕, Carrot, biennial ignoring trailing words',
    '# 🍆, Eggplant, perennial - 🍌, Banana, perennial',
    '# 🍅, Tomato- 🍉, Watermelon, annual',
    '# 🥔, Potato, perennial',
]
with beam.Pipeline() as pipeline:
  plants_find_all = (
      pipeline
      | 'Garden plants' >> beam.Create(garden_lines)
      | 'Parse plants' >> beam.Regex.find_all(regex)
      | beam.Map(print))



[]
['🥕, Carrot, biennial']
['🍆, Eggplant, perennial', '🍌, Banana, perennial']
['🍉, Watermelon, annual']
['🥔, Potato, perennial']


**Regex.replace_all**

Regex.replace_all returns the string with all the occurrences of the regular expression replaced by another string

In [None]:
import apache_beam as beam

# Every ':' separator, together with the whitespace around it, becomes ','.
garden_lines = [
    '🍓 : Strawberry : perennial',
    '🥕 : Carrot : biennial',
    '🍆\t:\tEggplant\t:\tperennial',
    '🍅 : Tomato : annual',
    '🥔 : Potato : perennial',
]
with beam.Pipeline() as pipeline:
  plants_replace_all = (
      pipeline
      | 'Garden plants' >> beam.Create(garden_lines)
      | 'To CSV' >> beam.Regex.replace_all(r'\s*:\s*', ',')
      | beam.Map(print))



🍓,Strawberry,perennial
🥕,Carrot,biennial
🍆,Eggplant,perennial
🍅,Tomato,annual
🥔,Potato,perennial


***Regex.replace_first***

Regex.replace_first returns the string with the first occurrence of the regular expression replaced by another string

In [None]:
import apache_beam as beam

# Only the first ',' separator (with surrounding whitespace) becomes ': '.
garden_lines = [
    '🍓, Strawberry, perennial',
    '🥕, Carrot, biennial',
    '🍆,\tEggplant, perennial',
    '🍅, Tomato, annual',
    '🥔, Potato, perennial',
]
with beam.Pipeline() as pipeline:
  plants_replace_first = (
      pipeline
      | 'Garden plants' >> beam.Create(garden_lines)
      | 'As dictionary' >> beam.Regex.replace_first(r'\s*,\s*', ': ')
      | beam.Map(print))



🍓: Strawberry, perennial
🥕: Carrot, biennial
🍆: Eggplant, perennial
🍅: Tomato, annual
🥔: Potato, perennial


***Regex.split***

Regex.split returns the list of strings that were delimited by the specified regular expression.

In [None]:
import apache_beam as beam

# Each line is split on ':' (with surrounding whitespace) into a list of parts.
garden_lines = [
    '🍓 : Strawberry , perennial',
    '🥕 : Carrot : biennial',
    '🍆\t:\tEggplant : perennial',
    '🍅 : Tomato : annual',
    '🥔 : Potato : perennial',
]
with beam.Pipeline() as pipeline:
  plants_split = (
      pipeline
      | 'Garden plants' >> beam.Create(garden_lines)
      | 'Parse plants' >> beam.Regex.split(r'\s*:\s*')
      | beam.Map(print))



['🍓', 'Strawberry , perennial']
['🥕', 'Carrot', 'biennial']
['🍆', 'Eggplant', 'perennial']
['🍅', 'Tomato', 'annual']
['🥔', 'Potato', 'perennial']


## **[Values Transform](https://)**

Takes a collection of key-value pairs, and returns the value of each element



In [None]:
import apache_beam as beam

# (icon, name) pairs; beam.Values keeps only the second item of each pair.
icon_name_pairs = [
    ('🍓', 'Strawberry'),
    ('🥕', 'Carrot'),
    ('🍆', 'Eggplant'),
    ('🍅', 'Tomato'),
    ('🥔', 'Potato'),
]
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create(icon_name_pairs)
      | 'Values' >> beam.Values()
      | beam.Map(print))



Strawberry
Carrot
Eggplant
Tomato
Potato


# **Aggregations**

## **[GroupBy](https://)**

**Grouping is a way to group data based on some column.**

In below example, we will group customers data based on country

In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

# Step labels and the variable name describe the customers data that is
# actually grouped here; the grouping key is the third field of each record.
customers_by_country = (p1
           |'Read customers' >> beam.io.ReadFromText('Customers_age.txt')
           |'Split Rows' >> beam.Map(lambda c: c.split(","))
           |'GroupBy Country' >> beam.GroupBy(lambda c: c[2])
           |'Write Grouped Data' >> beam.io.WriteToText('customer-grouped')
           )


p1.run()





<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f36e355a4d0>

In [None]:
! cat /content/customer-grouped-00000-of-00001

('NY', [['1', 'John', 'NY', '22'], ['3', 'Mary', 'NY', '30'], ['5', 'Samza', 'NY', '18'], ['6', 'Maria', 'NY', '15'], ['7', 'Shreya', 'NY', '30'], ['9', 'Mona', 'NY', '18'], ['10', 'Nandita', 'NY', '15']])
('LA', [['2', 'Jim', 'LA', '25'], ['4', 'Albert', 'LA', '20'], ['8', 'Kavita', 'LA', '20']])


## **[GroupByKey](https://)**

GroupByKey is made for key-value pairs, to group the data based on key

In [None]:
import apache_beam as beam

# The input is (month, name) pairs, so the labels and the variable name
# describe that data; GroupByKey collects all names that share a month.
with beam.Pipeline() as pipeline:
  names_by_month = (
      pipeline
      | 'Create month/name pairs' >> beam.Create([
          ('May', 'John'),
          ('April', 'Jim'),
          ('Feb', 'Samantha'),
          ('April', 'Jack'),
          ('Feb', 'Albert'),
          ('July', 'Wilson'),
          ('May', 'Ruby'),
          ('September', 'Tom'),
          ('October', 'Harry'),
          ('Feb', 'Mary'),
      ])
      | 'Group names per month' >> beam.GroupByKey()
      | beam.MapTuple(lambda k, vs: (k, sorted(vs)))  # sort and format
      | beam.Map(print))



('May', ['John', 'Ruby'])
('April', ['Jack', 'Jim'])
('Feb', ['Albert', 'Mary', 'Samantha'])
('July', ['Wilson'])
('September', ['Tom'])
('October', ['Harry'])


# **Combiners**

**Combiners are PTransforms that aggregate a PCollection: from multiple input elements they output either one element or one element per key.**



There are two types of Combiners

* **Global Combiners**: aggregate the input PCollection into one output element (per window if applied in windows)
* **Per Key Combiners**: aggregate the key-value PCollection into one output element per key (per window if applied in windows)

Some of the basic combiner functions are already built-in:

**Count** takes a PCollection and outputs the number of elements. <br>
**Top** outputs the n largest/smallest elements of a PCollection given a comparison. <br>
**Mean** outputs the arithmetic mean of a PCollection.


Combiners can aggregate using the whole PCollection or by key using methods:

.**Globally** applies the combiner to the whole PCollection. <br>
.**PerKey** applies the combiner to the values of each key in the PCollection.

## **[Built-in Combiners](https://)**

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam import Create, FlatMap, Map, ParDo, Filter, Flatten
from apache_beam import CombineGlobally, CombinePerKey
from apache_beam.transforms.combiners import Top, Mean, Count
from apache_beam import pvalue, window, WindowInto

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

# Pipeline on the InteractiveRunner so results can be displayed with ib.show.
p = beam.Pipeline(InteractiveRunner())

# Sample input: one dict per country with its population and continent.
# NOTE(review): population values look like millions of people - confirm the unit.
elements = [
    {"country": "China", "population": 1389, "continent": "Asia"},
    {"country": "India", "population": 1311, "continent": "Asia"},
    {"country": "Japan", "population": 126, "continent": "Asia"},        
    {"country": "USA", "population": 331, "continent": "America"},
    {"country": "Ireland", "population": 5, "continent": "Europe"},
    {"country": "Indonesia", "population": 273, "continent": "Asia"},
    {"country": "Brazil", "population": 212, "continent": "America"},
    {"country": "Egypt", "population": 102, "continent": "Africa"},
    {"country": "Spain", "population": 47, "continent": "Europe"},
    {"country": "Ghana", "population": 31, "continent": "Africa"},
    {"country": "Australia", "population": 25, "continent": "Oceania"},
]

# Key every record by continent so the per-key combiners work per continent.
records = p | "Create" >> beam.Create(elements)
create = records | "Map Keys" >> beam.Map(lambda x: (x['continent'], x['population']))

element_count_total = create | "Total Count" >> Count.Globally()
element_count_grouped = create | "Count Per Key" >> Count.PerKey()
top_grouped = create | "Top" >> Top.PerKey(n=1)
mean_grouped = create | "Mean" >> Mean.PerKey()

# Display each result in the same order as it was defined.
for result in (element_count_total, element_count_grouped, top_grouped, mean_grouped):
  ib.show(result)










## **[CombineGlobally](https://)**

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam import Create, FlatMap, Map, ParDo, Filter, Flatten
from apache_beam import CombineGlobally, CombinePerKey
from apache_beam.transforms.combiners import Top, Mean, Count
from apache_beam import pvalue, window, WindowInto

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())

elements = ["Love all,", "trust a few,", "do wrong to none."]

# A plain callable is enough here: it receives an iterable of strings and
# returns the single joined string.
phrases = p | "Create" >> Create(elements)
combine = phrases | "Join" >> CombineGlobally(lambda x: " ".join(x))

ib.show(combine)



## **[CombinePerKey](https://)**

In [None]:
import apache_beam as beam
p = beam.Pipeline(InteractiveRunner())

# (country, value) pairs; the values of each country are summed.
elements = [
            ("US", 2),
            ("India", 3),
            ("US", 4),
            ("India", 5),
            ("US", 3)
]

# The label names what the step does: a per-key sum, keyed by country.
combine_key = (p | "Create" >> Create(elements)
                 | "Sum Per Country" >> CombinePerKey(lambda x: sum(x)))

ib.show(combine_key)



## **[Custom Combiner](https://)**

In [None]:
import apache_beam as beam


p = beam.Pipeline(InteractiveRunner())

def average_fn(elements):
  """Return the arithmetic mean of an iterable of numbers.

  Args:
    elements: iterable of numeric values. It is materialised once, so
      single-pass iterables are supported.

  Returns:
    sum / count, or NaN when the iterable is empty (instead of raising
    ZeroDivisionError).

  NOTE(review): Beam's programming guide asks that a plain function passed to
  CombineGlobally be commutative and associative, because it may be applied
  to partial results. A mean is not associative, so prefer a CombineFn that
  carries (sum, count) - confirm the result on larger inputs.
  """
  values = list(elements)
  if not values:
    return float('nan')
  return sum(values) / len(values)


# Average the integers 0..99 with the plain-function combiner.
numbers = p | "Create" >> Create(range(100))
average = numbers | CombineGlobally(average_fn)


ib.show(average)




In [None]:
import apache_beam as beam


p = beam.Pipeline(InteractiveRunner())

class AverageFn(beam.CombineFn):
  """CombineFn that computes an arithmetic mean from a (sum, count) accumulator."""

  def create_accumulator(self):
    """Return the empty accumulator: a zero sum and a zero count."""
    return 0, 0

  def add_input(self,accumulator,input):
    """Fold one input value into the (sum, count) accumulator."""
    running_sum, running_count = accumulator
    return running_sum + input, running_count + 1

  def merge_accumulators(self,accumulators):
    """Add up the sums and counts of several partial accumulators.

    The accumulators are traversed exactly once, so a single-pass iterable
    is handled correctly.
    """
    total, count = 0, 0
    for partial_sum, partial_count in accumulators:
      total += partial_sum
      count += partial_count
    return total, count

  def extract_output(self,final_accumulator):
    """Return sum / count, or NaN when nothing was accumulated."""
    total, count = final_accumulator
    if count == 0:
      # Empty input: return NaN instead of raising ZeroDivisionError.
      return float('nan')
    return total / count


# Average the integers 0..99 with the CombineFn-based combiner.
numbers = p | "Create" >> Create(range(100))
average = numbers | CombineGlobally(AverageFn())


ib.show(average)

