# Apache Beam
* Unified + Portable

* Open-source, portable and unified data processing framework used for both batch and streaming data ETL and analytics.

* It can run on several execution engines (Spark, Dataflow, Flink, ...).

* It is agnostic of the execution platform, the data source and the programming language.

# History
* ~2004: Apache Hadoop (Java)
* ~2009: Apache Spark (initially Scala; Java and Python APIs later)
* Apache Flink, Apache Storm
* FlumeJava (Google)
* ~2013: MillWheel (Google)
* Apache Samza, Apache Apex, Apache Heron, Cloud Dataflow (GCP)
* ~2016: Gearpump, Apache Nemo
* ~2016-17: Apache Beam (entered the Apache Incubator in 2016, top-level Apache project in 2017)

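* Supported languages: Python, Java, Go, etc.
* Based on the concept of a pipeline (Input data --> Read --> Transform --> Write output data).
* PCollection: the dataset.
* PTransform: the transformation logic applied to the data.
    - Types:
        1. Parallel processing (ParDo)
        2. GroupByKey
        3. Combine
        4. Flatten
        5. Partition
* PCollection1 --(PTransform1)--> PCollection2 --(PTransform2)--> PCollection3
* A PCollection can also feed several PTransforms (branching):
    - Pcol1 -(Ptr1)-> Pcol2
    - Pcol1 -(Ptr2)-> Pcol3
* Side input: additional data made available to a transformation besides the element being processed.
* Runner: the engine on which the pipeline logic runs (Spark, Dataflow, Flink, ...).
* Windowing: divides a PCollection into finite windows based on element timestamps.
* Trigger: decides when the results for a window are emitted.
* Schema: describes the structure (named, typed fields) of the elements in a PCollection.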

In [None]:
!pip install apache_beam

In [5]:
!pip install 'apache-beam[interactive]'  # quoted so zsh does not expand [...] as a glob pattern


In [2]:
import apache_beam as beam

# Create a pipeline

In [3]:
pipe = beam.Pipeline()

In [4]:
ip = (
    pipe
    |beam.io.ReadFromText('./online_retail_mini.csv', skip_header_lines=True)
    |beam.Map(lambda x: x.split(","))
    |beam.Filter(lambda x: x[1]=='21480')
    |beam.combiners.Count.Globally()
    |beam.Map(print)
)



In [28]:
pipe.run()

3


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x147900250>

In [29]:
# The same pipeline in one cell: the `with` block runs the pipeline on exit, so no explicit pipe.run() is needed.
with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./online_retail_mini.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Filter(lambda x: x[1]=='21410')
        |beam.combiners.Count.Globally()
        |beam.Map(print)
    )

2


# Functions

In [32]:
def filter_data(element):
    # beam.Filter keeps the element when this predicate returns True
    return element[1] == '21480'

In [33]:
with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./online_retail_mini.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Filter(filter_data)
        |beam.combiners.Count.Globally()
        |beam.Map(print)
    )

3


# Aggregations Per Key

In [5]:
with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./online_retail_mini.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Map(lambda x: (x[2], x))
        |beam.combiners.Count.PerKey()
        |beam.Map(print)
    )

('15CM CHRISTMAS GLASS BALL 20 LIGHTS', 3)
('PINK CHERRY LIGHTS', 3)
(' WHITE CHERRY LIGHTS', 4)
('"RECORD FRAME 7"" SINGLE SIZE "', 2)
('STRAWBERRY CERAMIC TRINKET BOX', 4)
('PINK DOUGHNUT TRINKET POT ', 3)
('SAVE THE PLANET MUG', 3)
('FANCY FONT HOME SWEET HOME DOORMAT', 3)
('CAT BOWL ', 2)
('"DOG BOWL ', 2)
('HEART MEASURING SPOONS LARGE', 1)
('LUNCHBOX WITH CUTLERY FAIRY CAKES ', 3)
('DOOR MAT BLACK FLOCK ', 1)
('LOVE BUILDING BLOCK WORD', 3)
('HOME BUILDING BLOCK WORD', 4)
('ASSORTED COLOUR BIRD ORNAMENT', 5)
(' PEACE WOODEN BLOCK LETTERS', 1)
('CHRISTMAS CRAFT WHITE FAIRY ', 4)
('HEART IVORY TRELLIS LARGE', 2)
('HEART FILIGREE DOVE LARGE', 2)
('FULL ENGLISH BREAKFAST PLATE', 2)
('PIZZA PLATE IN BOX', 3)
('BLACK DINER WALL CLOCK', 1)
('SET OF 3 BLACK FLYING DUCKS', 1)
('AREA PATROLLED METAL SIGN', 3)
('PLEASE ONE PERSON  METAL SIGN', 5)
('BATH BUILDING BLOCK WORD', 2)
('CLASSIC WHITE FRAME', 1)
('SMALL MARSHMALLOWS PINK BOWL', 1)
('BISCUITS SMALL BOWL LIGHT BLUE', 1)
('SCOTTIE DOG

# Aggregate with Multiple Keys

In [49]:
with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Map(lambda x: (x[3] + "," + x[1], x))
        |beam.combiners.Count.PerKey()
        |beam.Map(print)
    )

('UPI,Narendra', 2)
('Cash,Amit', 1)
('UPI,Smriti', 1)
('UPI,Nitish', 1)
('Cash,Deependra', 1)
('UPI,Jaishankar', 1)
('Cash,Sukanta', 1)


# Combine Per Key

In [54]:
def sum_val_filter(element):
    if sum(element) >= 300:
        return sum(element)
    else:
        return 0
    
with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Map(lambda x: (x[3] + "," + x[1], int(x[2])))
        |beam.CombinePerKey(sum_val_filter)
        |beam.Map(print)
    )

# ('UPI,Narendra', 100)
# ('Cash,Amit', 200)
# ('UPI,Smriti', 150)
# ('UPI,Nitish', 300)
# ('Cash,Deependra', 250)
# ('UPI,Jaishankar', 500)
# ('Cash,Sukanta', 400)
# ('UPI,Narendra', 700)

('UPI,Narendra', 800)
('Cash,Amit', 0)
('UPI,Smriti', 0)
('UPI,Nitish', 300)
('Cash,Deependra', 0)
('UPI,Jaishankar', 500)
('Cash,Sukanta', 400)


In [57]:
# Pass an extra argument (max_val) to the combining function
def sum_val_filter(element, max_val):
    if sum(element) >= max_val:
        return sum(element)
    else:
        return 0
    
with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Map(lambda x: (x[3] + "," + x[1], int(x[2])))
        |beam.CombinePerKey(sum_val_filter, 500)
        |beam.Map(print)
    )

('UPI,Narendra', 800)
('Cash,Amit', 0)
('UPI,Smriti', 0)
('UPI,Nitish', 0)
('Cash,Deependra', 0)
('UPI,Jaishankar', 500)
('Cash,Sukanta', 0)


In [60]:
with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Filter(lambda x: int(x[2])<=200)
        |beam.io.WriteToText("./Cust_out.csv")
    )

* ReadFromText
* Map
* Filter
* CombinePerKey
* combiners.Count.PerKey()
* combiners.Count.Globally()

# Composite Transformation

In [20]:
def Filter_trans_type(trans, input_elem):
    return input_elem[3] == trans 

def cap(elem):
    return elem[0], elem[1].title(), elem[2], elem[3]

def key_val(elem):
    return elem[3], elem[1] + "," + str(elem[2])

with beam.Pipeline() as pipe:
    input_collection = (
        pipe
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
    )

    UPI = (
        input_collection
        |beam.Filter(lambda record: Filter_trans_type('UPI', record))
        |"Capitalize the UPI customer names" >> beam.Map(cap)
        |"Generating Key value pairs for UPI" >> beam.Map(key_val)
        |"Write UPI results to file" >> beam.io.WriteToText("./UPI_Result")
    )

    CASH = (
        input_collection
        |beam.Filter(lambda record: Filter_trans_type('Cash', record))
        |"Capitalize the CASH customer name" >> beam.Map(cap)
        |"Generating Key value pairs for CASH" >> beam.Map(key_val)
        |"Write Cash results to file" >> beam.io.WriteToText("./Cash_Result")
    )



In [22]:
class MyTransform(beam.PTransform):
    def expand(self, input_col):
        a = (
            input_col
            |"capital" >> beam.Map(cap)
            |"key val" >> beam.Map(key_val)
        )
        return a

with beam.Pipeline() as pipe:
    input_collection = (
        pipe
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
    )

    UPI = (
        input_collection
        |beam.Filter(lambda record: Filter_trans_type('UPI', record))
        |"My Transformation for UPI" >> MyTransform()
        |"Write UPI results to file" >> beam.io.WriteToText("./new_UPI_Result")
    )

    CASH = (
        input_collection
        |beam.Filter(lambda record: Filter_trans_type('Cash', record))
        |"My Transformation for Cash" >> MyTransform()
        |"Write Cash results to file" >> beam.io.WriteToText("./new_Cash_Result")
    )

# ParDo

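ParDo applies a DoFn to every element independently of the others. By default a DoFn keeps no state between elements, and its `process` method can emit zero, one or many outputs for each input element.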

In [41]:
class SplitRow(beam.DoFn):
    def process(self, elem):
        customer = elem.split(",")
        # process() must return an iterable of outputs; yielding is the usual way
        # (equivalent here to: return [customer])
        yield customer

class KeyVal(beam.DoFn):
    def process(self, elem):
        return [(elem[3] + "," + elem[1], int(elem[2]))]
    
class Capitalize(beam.DoFn):
    def process(self, elem):
        return [(elem[0], elem[1].title(), elem[2], elem[3])]
    
class FilterCash(beam.DoFn):  # named to avoid shadowing Python's built-in filter()
    def process(self, elem):
        if elem[3] == 'Cash':
            return [elem]
        # returning None emits nothing for this element

In [44]:
with beam.Pipeline() as pipe:
    ip = (
        pipe
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.ParDo(SplitRow())
        |beam.ParDo(Capitalize())
        |beam.ParDo(FilterCash())
        |beam.ParDo(KeyVal())
        |beam.CombinePerKey(sum)
        |beam.Map(print)
    )

('Cash,Amit', 300)
('Cash,Deependra', 450)
('Cash,Sukanta', 400)
('Cash,Nitish', 600)


# Side Input
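Passing additional data to a transform besides the element being processed. In Beam, a true side input is another PCollection read through a view such as `beam.pvalue.AsSingleton` or `beam.pvalue.AsList`; the cell below uses the simpler form, a constant extra argument (`max_val`) passed to the combining function.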

In [51]:
def sum_val_filter(element, max_val):
    if sum(element) >= max_val:
        return sum(element)
    else:
        return 0
    

with beam.Pipeline() as pipe1:
    ip1 = (
        pipe1
        |beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
        |beam.Map(lambda x: x.split(","))
        |beam.Map(lambda x: (x[3] + "," + x[1], int(x[2])))
        |beam.CombinePerKey(sum_val_filter, 500)
        |beam.Map(print)
    )

('UPI,Narendra', 800)
('Cash,Amit', 0)
('UPI,Smriti', 0)
('UPI,nitish', 700)
('Cash,Deependra', 0)
('UPI,Jaishankar', 500)
('Cash,Sukanta', 0)
('UPI,Deependra', 1600)
('Cash,nitish', 600)
('UPI,Amit', 0)
('Cash,deependra', 0)


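# Side Output
A DoFn can emit to more than one output PCollection: plain `yield`s go to the main output, and elements wrapped in `beam.pvalue.TaggedOutput(tag, element)` go to the output with that tag. `.with_outputs(...)` names the outputs so that each one can be read back separately.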

In [52]:
class filters(beam.DoFn):
    def process(self, element):
        if element[3] == 'Cash':
            yield element
        if element[3] == 'UPI':
            yield beam.pvalue.TaggedOutput('upi', element)

with beam.Pipeline() as pipe:
    ip = pipe | beam.io.ReadFromText('./Cust.csv', skip_header_lines=True)
    out = ip | beam.ParDo(SplitRow()) | beam.ParDo(filters()).with_outputs('upi', main='cash')
    cash1 = out.cash 
    upi = out.upi
    upi | "Writing to UPI file" >> beam.io.WriteToText("upi")
    cash1 | "Writing to Cash file" >> beam.io.WriteToText("cash")

In [3]:
import apache_beam as beam 
inputs = [0,1,2,3]

with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | beam.Create(inputs)
        | beam.Map(print)
    )

print(f"Outputs : {outputs}")

0
1
2
3
Outputs : PCollection[[3]: Map(print).None]


# Map

In [5]:
with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | "Create a pipeline" >> beam.Create(inputs)
        | "Multiplying each element by 100" >> beam.Map(lambda x:x*100)
        | beam.Map(print)
    )


0
100
200
300


# FlatMap

In [11]:
inputs = [1,2,3,4]
with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | "Create a pipeline" >> beam.Create(inputs)
        | "Repeating every element that many times and flattening out" >> beam.FlatMap(lambda x: [x for i in range(x)])
        | beam.Map(print)
    )

1
2
2
3
3
3
4
4
4
4


# Filter: One to zero or one

In [12]:
with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | "Create a pipeline" >> beam.Create(inputs)
        | "Keep only even numbers" >> beam.Filter(lambda x:x%2==0)
        | beam.Map(print)
    )

2
4


# Combine

In [15]:
inputs = [1,2,3,4,5,6]
with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | "Create a pipeline" >> beam.Create(inputs)
        | "Sum all the values together" >> beam.CombineGlobally(sum)
        | beam.Map(print)
    )

21


In [13]:
from functools import reduce 
x = [0,1,2,3,4,5,6,7,8,9,10]

reduce(lambda x,y : x + y,x,0)


55

# GroupByKey

In [17]:
inputs = [
    ('WB', 'Paddy'),
    ('Assam', 'Tea'),
    ('WB', 'Jute'),
    ('Karnataka', "Coffee"),
    ('Hyderabad', 'Chilly'),
    ('Karnataka', 'Coconut')
]
with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | "Create a pipeline" >> beam.Create(inputs)
        | "Grouping produce by state" >> beam.GroupByKey()
        | beam.Map(print)
    )

('WB', ['Paddy', 'Jute'])
('Assam', ['Tea'])
('Karnataka', ['Coffee', 'Coconut'])
('Hyderabad', ['Chilly'])


In [7]:
with beam.Pipeline() as pipeline:
    total_elements = (
        pipeline
        | 'Create fruits' >> beam.Create(
            ['mango', 'apple', 'avocado', 'papaya', 'mango', 'pineapple', 'guava', 'banana', 'avocado', 'strawberry', 'banana']
        )
        | 'Remove duplicates' >> beam.Distinct()
        | 'Count unique elements' >> beam.combiners.Count.Globally()
        | beam.Map(print)
    )

8


# RECAP

## Map

In [4]:
import apache_beam as beam 

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            ' Hibiscus',
            'Rose ',
            'Lotus\n',
            'Tomato  ',
            'Blueberry.'
        ])
        | beam.Map(str.strip)
        | beam.Map(print)
    )

Hibiscus
Rose
Lotus
Tomato
Blueberry.


In [6]:
def do_str_strip(element):
    return element.strip('. \n')

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            ' Hibiscus',
            'Rose ',
            'Lotus\n',
            'Tomato  ',
            'Blueberry.'
        ])
        | beam.Map(do_str_strip)
        | beam.Map(print)
    )

Hibiscus
Rose
Lotus
Tomato
Blueberry


In [None]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            ' Hibiscus',
            'Rose ',
            'Lotus\n',
            'Tomato  ',
            'Blueberry.'
        ])
        | beam.Map(lambda x : x.strip())
        | beam.Map(print)
    )

In [4]:
def do_str_strip(element):
    return element.strip('. \n')

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            ' Hibiscus, Rose , Lotus, Tomato, Blueberry.'
        ])
        | beam.Map(do_str_strip)
        | beam.Map(lambda x: x.split(","))
        | beam.Map(print)
    )

['Hibiscus', ' Rose ', ' Lotus', ' Tomato', ' Blueberry']


In [5]:
def do_str_strip(element):
    return element.strip('. \n')

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            ' Hibiscus, Rose , Lotus', 
            'Tomato, Blueberry.'
        ])
        | beam.Map(do_str_strip)
        | beam.Map(lambda x: x.split(","))
        | beam.Map(print)
    )

['Hibiscus', ' Rose ', ' Lotus']
['Tomato', ' Blueberry']


In [2]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            (10, 'Apple'),
            (20, 'Mango')
        ])
        | beam.MapTuple(lambda x, y: f'{y} --> {x}')
        | beam.Map(print)
    )



Apple --> 10
Mango --> 20


## FlatMap

In [7]:
def do_str_strip(element):
    return element.strip('. \n')

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            ' Hibiscus, Rose , Lotus', 
            'Tomato, Blueberry.'
        ])
        
        | beam.FlatMap(lambda x: x.split(","))
        | beam.Map(do_str_strip)
        | beam.Map(print)
    )

Hibiscus
Rose
Lotus
Tomato
Blueberry


# Filter

In [10]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            1,2,3,4,5
        ])
        | beam.Filter(lambda x: x%2 == 0)
        | beam.Map(print)
    )

2
4


In [12]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            "One", "Two", "Three", "Four", "Five"
        ])
        | beam.Filter(lambda x: len(x) >= 4)
        | beam.Map(print)
    )

Three
Four
Five


In [38]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.io.ReadFromText('Cust.csv', skip_header_lines=True)
        | beam.Map(lambda x: x.split(","))
        # | beam.Filter(lambda x: x[3]=='Cash')
        | beam.Map(lambda x: (x[3], int(x[2])))
        | beam.CombinePerKey(sum)
        | beam.Map(print)
    )

('UPI', 3850)
('Cash', 1750)


# ParDo

In [33]:
class SplitWords(beam.DoFn):
    def __init__(self, delimiter=','):
        self.delimiter = delimiter 
    
    def process(self, element):
        yield element.split(self.delimiter)      # Recommended
        # return [element.split(self.delimiter)]   # Not recommended

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            "One, Two, Three, Four, Five"
        ])
        | beam.ParDo(SplitWords())
        | beam.Map(print)
    )

['One', ' Two', ' Three', ' Four', ' Five']


In [36]:
class SplitRow(beam.DoFn):
    def __init__(self, delimiter=','):
        self.delimiter = delimiter 
    
    def process(self, element):
        yield element.split(self.delimiter) 

class FilterUPI(beam.DoFn):
    def process(self, element):
        if (element[3]=='UPI'):
            yield element 

with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.io.ReadFromText('Cust.csv', skip_header_lines=True)
        | beam.ParDo(SplitRow())
        | beam.ParDo(FilterUPI())
        | beam.Map(print)
    )

['123', 'Narendra', '100', 'UPI']
['125', 'Smriti', '150', 'UPI']
['222', 'nitish', '300', 'UPI']
['444', 'Jaishankar', '500', 'UPI']
['123', 'Narendra', '700', 'UPI']
['222', 'nitish', '400', 'UPI']
['128', 'Deependra', '1600', 'UPI']
['124', 'Amit', '100', 'UPI']


# GroupByKey

In [45]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.Create([
            ('Carnivores', 'Tiger'),
            ('Herbivores', 'Rabbit' ),
            ('Omnivores', 'Comodo Dragon'),
            ('Carnivores', 'Lion' ),
            ('Herbivores', 'Kangaroo' ),
            ('Herbivores', 'Guineapig' ),
        ])
        | beam.GroupByKey()
        | beam.MapTuple(lambda x,y : f'({x},{len(y)}, {y})')
        # | beam.combiners.Count.PerKey()
        | beam.Map(print)
    )

(Carnivores,2, ['Tiger', 'Lion'])
(Herbivores,3, ['Rabbit', 'Kangaroo', 'Guineapig'])
(Omnivores,1, ['Comodo Dragon'])


# CoGroupByKey

In [53]:
jobs = [
    ('Subir', 'Software'),
    ('Kalyani', 'Freelancer'),
    ('Subir', 'Businessman'),
    ('Alex', 'Gamer')
]

passion = [
    ('Subir', 'Wanderer'),
    ('Kalyani', 'Artist'),
    ('Kalyani', 'Vlogger'),
    ('Bob', 'Photographer')
]

p = beam.Pipeline()
pc_jobs = p | "Jobs pcollection" >> beam.Create(jobs)
pc_passion = p | "Passion pcollection" >> beam.Create(passion)
(pc_jobs, pc_passion) | beam.CoGroupByKey() | beam.Map(print)

p.run()

('Subir', (['Software', 'Businessman'], ['Wanderer']))
('Kalyani', (['Freelancer'], ['Artist', 'Vlogger']))
('Bob', ([], ['Photographer']))
('Alex', (['Gamer'], []))


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x16c196190>

# Aggregations Per Key

## Count per key

In [57]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.io.ReadFromText('Cust.csv', skip_header_lines=True)
        | beam.Map(lambda x: x.split(","))
        | beam.Map(lambda x: (x[3], int(x[2])))
        | beam.combiners.Count.PerKey()
        | beam.Map(print)
    )

('UPI', 8)
('Cash', 6)


## CombinePerKey

In [59]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.io.ReadFromText('Cust.csv', skip_header_lines=True)
        | beam.Map(lambda x: x.split(","))
        | beam.Map(lambda x: (x[3], int(x[2])))
        | beam.CombinePerKey(sum)
        | beam.Map(print)
    )

('UPI', 3850)
('Cash', 1750)


# CombineGlobally

In [63]:
with beam.Pipeline() as pipeline:
    plants = (
        pipeline 
        | beam.io.ReadFromText('Cust.csv', skip_header_lines=True)
        | beam.Map(lambda x: x.split(","))
        | beam.Map(lambda x:  int(x[2]))
        | beam.CombineGlobally(sum)
        | beam.Map(print)
    )

5600


In [88]:
with beam.Pipeline() as pipeline:
    mcf = (
        pipeline
        | beam.Create([(1,2,3,4,5,6,7)])
        | beam.FlatMap(lambda x: x)
        | beam.Map(lambda x: x**2)
        # | beam.CombineGlobally(sum)
        | beam.Map(print)
    )

1
4
9
16
25
36
49


In [91]:
def list_join(element):
    res_list = []
    for l in element:
        # print(l)
        res_list = res_list + l 
    return res_list

with beam.Pipeline() as pipeline:
    mcf = (
        pipeline
        | beam.Create([
            [1,2,3,4,5,6,7],
            [8,9,10]
                       ])
        | beam.CombineGlobally(list_join)
        | beam.Map(print)
    )

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


# Timestamp by processing time
* beam.DoFn.TimestampParam binds the timestamp info as an `apache_beam.utils.timestamp.Timestamp` object.
* beam.DoFn.WindowParam binds the window info as an appropriate `apache_beam.transforms.window.*Window` object.

In [17]:
import apache_beam as beam 
import time 

class GetTimestamp(beam.DoFn):
    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
        yield f"{timestamp.to_utc_datetime()} - {plant['name']}"

with beam.Pipeline() as pipeline:
    plant_processing_times = (
        pipeline 
        | beam.Create(
            [
                {'name' : 'Strawberry'},
                {'name' : 'Blueberry'},
                {'name' : 'Raspberry'},
                {'name' : 'Cherry'},
                {'name' : 'Avocado'}
            ]
        )
        | beam.Map(lambda plant: beam.window.TimestampedValue(plant, time.time()))
        | beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )

2024-07-30 15:55:57.954463 - Strawberry
2024-07-30 15:55:57.954613 - Blueberry
2024-07-30 15:55:57.954645 - Raspberry
2024-07-30 15:55:57.954672 - Cherry
2024-07-30 15:55:57.954698 - Avocado


In [12]:
import apache_beam as beam 

class AnalyzeElement(beam.DoFn):
    def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        yield '\n'.join([
            '# timestamp',
            'type(timestamp) -> ' + repr(type(timestamp)),
            'timestamp.micros() -> ' + repr(timestamp.micros),
            'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
            'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
            '',
            '#Window',
            'type(window) -> ' + repr(type(window)),
            'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
            'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
            'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
        ])

with beam.Pipeline() as pipeline:
    dofn_params = (
        pipeline
        | beam.Create([':)'])
        | beam.Map(lambda elem: beam.window.TimestampedValue(elem, 1584675660))
        | beam.WindowInto(beam.window.FixedWindows(30))
        | beam.ParDo(AnalyzeElement())
        | beam.Map(print)
    )

# timestamp
type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
timestamp.micros() -> 1584675660000000
timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)

#Window
type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)


# Event Time Windowing

In [14]:
import apache_beam as beam 
from apache_beam.transforms.window import (
    TimestampedValue,
    Sessions,
    Duration
)

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        unix_timestamp = element["timestamp"]
        element = (element["userId"], element["click"])
        yield TimestampedValue(element, unix_timestamp)

with beam.Pipeline() as p:
    events = (
        p
        | beam.Create([
            {"userId" : "Andy", "click" : 2, "timestamp": 1603112520},
            {"userId" : "Sam", "click" : 3, "timestamp": 1603113240},
            {"userId" : "Andy", "click" : 4, "timestamp": 1603115820},
            {"userId" : "Andy", "click" : 5, "timestamp": 1603113600},
        ])
    )
    timestamped_events = events | beam.ParDo(AddTimestampDoFn())
    windowed_events = timestamped_events | beam.WindowInto(
        Sessions(gap_size=30 * 60),
        trigger=None,
        accumulation_mode=None,
        timestamp_combiner=None,
        allowed_lateness=Duration(seconds=1 * 24 * 60 * 60),
    )

    sum_clicks = windowed_events | beam.CombinePerKey(sum)
    sum_clicks | beam.Map(print)

('Andy', 7)
('Andy', 4)
('Sam', 3)


In [11]:
import time 

class GetTimestamp(beam.DoFn):
    def process(self, event, timestamp=beam.DoFn.TimestampParam):
        yield f"{timestamp.to_utc_datetime()} - {event}"

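def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:
    # time.mktime interprets the parsed time in the machine's local time zone,
    # which is why the UTC times printed below are shifted from the input strings.
    time_tuple = time.strptime(time_str, time_format)
    return int(time.mktime(time_tuple))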

@beam.ptransform_fn
@beam.typehints.with_input_types(beam.pvalue.PBegin)
@beam.typehints.with_output_types(beam.window.TimestampedValue)
def AstronomicalEvents(pipeline):
    return (
        pipeline 
        | beam.Create([
            ('2021-03-20 03:37:00', 'March Equinox 2021'),
            ('2021-04-26 22:31:00', 'Super Full Moon'),
            ('2021-05-11 13:59:00', 'Micro new Moon'),
            ('2021-05-26 06:13:00', 'Super Full Moon, total lunar eclipse'),
            ('2021-06-20 22:32:00', 'June Solstice 2021'),
            ('2021-08-22 07:01:00', 'September Equinox 2021'),
            ('2021-09-22 14:21:00', 'Super new moon'),
            ('2021-11-19 02:57:00', 'Micro full moon, partial lunar eclipse'),
            ('2021-12-04 01:43:00', 'Super new Moon'),
            ('2021-12-18 10:35:00', 'Micro full moon'),
            ('2021-12-21 09:59:00', 'December Solstice 2021')
        ])
        | beam.MapTuple(
            lambda timestamp, element: beam.window.TimestampedValue(element, to_unix_time(timestamp))
        )
    )

with beam.Pipeline() as pipeline:
    (
        pipeline 
        | AstronomicalEvents()
        | beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )

2021-03-20 07:37:00 - March Equinox 2021
2021-04-27 02:31:00 - Super Full Moon
2021-05-11 17:59:00 - Micro new Moon
2021-05-26 10:13:00 - Super Full Moon, total lunar eclipse
2021-06-21 02:32:00 - June Solstice 2021
2021-08-22 11:01:00 - September Equinox 2021
2021-09-22 18:21:00 - Super new moon
2021-11-19 07:57:00 - Micro full moon, partial lunar eclipse
2021-12-04 06:43:00 - Super new Moon
2021-12-18 15:35:00 - Micro full moon
2021-12-21 14:59:00 - December Solstice 2021
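The printed times are four or five hours later than the strings in `AstronomicalEvents` (depending on daylight saving time) because `time.mktime` reads the parsed time in the local time zone of the machine that ran the notebook, while `to_utc_datetime()` prints in UTC. If the strings are meant as UTC, `calendar.timegm` converts them without that shift. A sketch; `to_unix_time_utc` is a hypothetical name.

```python
import calendar
import time
from datetime import datetime, timezone


def to_unix_time_utc(time_str: str, time_format: str = '%Y-%m-%d %H:%M:%S') -> int:
    """Parse time_str as a UTC time and return seconds since the Unix epoch."""
    time_tuple = time.strptime(time_str, time_format)
    return calendar.timegm(time_tuple)  # timegm treats the tuple as UTC, unlike time.mktime


ts = to_unix_time_utc('2021-03-20 03:37:00')
# Round-trip check: formatting the timestamp in UTC gives back the original string
round_trip = datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
```

Using it inside `AstronomicalEvents` would change the printed times, and therefore the window assignments in the cells below.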


In [9]:
import apache_beam as beam 

def human_readable_window(window) -> str:
    if isinstance(window, beam.window.GlobalWindow):
        return str(window)
    return f'{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}'

class PrintElementInfo(beam.DoFn):
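    def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        print(f"[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}")
        # Pass the element on; without this, the downstream PrintWindowInfo receives nothing to count
        yield element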

@beam.ptransform_fn
def PrintWindowInfo(pcollection):
    class PrintCountsInfo(beam.DoFn):
        def process(self, num_elements, window=beam.DoFn.WindowParam):
            print(f">> Window[{human_readable_window(window)}] has {num_elements} elements.")
            yield num_elements

    return (
        pcollection
        | 'Count elements per window' >> beam.combiners.Count.Globally().without_defaults()
        | 'Print counts info' >> beam.ParDo(PrintCountsInfo())
    )

## Global Window

In [10]:
import apache_beam as beam 

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical Events" >> AstronomicalEvents()
        | 'Print element info' >> beam.ParDo(PrintElementInfo())
        | 'Print window info' >> PrintWindowInfo()
    )

[GlobalWindow] 2021-03-20 07:37:00 -- March Equinox 2021
[GlobalWindow] 2021-04-27 02:31:00 -- Super Full Moon
[GlobalWindow] 2021-05-11 17:59:00 -- Micro new Moon
[GlobalWindow] 2021-05-26 10:13:00 -- Super Full Moon, total lunar eclipse
[GlobalWindow] 2021-06-21 02:32:00 -- June Solstice 2021
[GlobalWindow] 2021-08-22 11:01:00 -- September Equinox 2021
[GlobalWindow] 2021-09-22 18:21:00 -- Super new moon
[GlobalWindow] 2021-11-19 07:57:00 -- Micro full moon, partial lunar eclipse
[GlobalWindow] 2021-12-04 06:43:00 -- Super new Moon
[GlobalWindow] 2021-12-18 15:35:00 -- Micro full moon
[GlobalWindow] 2021-12-21 14:59:00 -- December Solstice 2021


## Fixed time window

In [16]:
import apache_beam as beam 
from datetime import timedelta

window_size = timedelta(days=3*30).total_seconds()
print(f"window size : {window_size} seconds")

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical Events" >> AstronomicalEvents()
        | 'Fixed window' >> beam.WindowInto(beam.window.FixedWindows(window_size))
        | 'Print element info' >> beam.ParDo(PrintElementInfo())
        | 'Print window info' >> PrintWindowInfo()
    )

window size : 7776000.0 seconds
[2021-01-03 00:00:00 - 2021-04-03 00:00:00] 2021-03-20 07:37:00 -- March Equinox 2021
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-04-27 02:31:00 -- Super Full Moon
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-11 17:59:00 -- Micro new Moon
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-26 10:13:00 -- Super Full Moon, total lunar eclipse
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-06-21 02:32:00 -- June Solstice 2021
[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-08-22 11:01:00 -- September Equinox 2021
[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-09-22 18:21:00 -- Super new moon
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-11-19 07:57:00 -- Micro full moon, partial lunar eclipse
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-04 06:43:00 -- Super new Moon
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-18 15:35:00 -- Micro full moon
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-21 14:59:00 -- December Solstice
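`FixedWindows` aligns windows to the Unix epoch (plus an optional offset, 0 here), not to the first element, which is why the first window starts on 2021-01-03 rather than on 2021-01-01. A pure-Python sketch of the assignment rule `start = t - (t - offset) % size`, checked against the March Equinox row above; `fixed_window` and `utc` are hypothetical helpers, not Beam APIs.

```python
from datetime import datetime, timedelta, timezone


def fixed_window(timestamp: float, size: float, offset: float = 0):
    """Return (start, end) of the fixed window containing timestamp, in epoch seconds."""
    start = timestamp - (timestamp - offset) % size
    return start, start + size


def utc(ts: float) -> datetime:
    return datetime.fromtimestamp(ts, tz=timezone.utc)


size = timedelta(days=3 * 30).total_seconds()
event = datetime(2021, 3, 20, 7, 37, tzinfo=timezone.utc).timestamp()  # March Equinox as printed above
start, end = fixed_window(event, size)
print(utc(start), '-', utc(end))
```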

## Sliding time windows

In [17]:
import apache_beam as beam 
from datetime import timedelta

window_size = timedelta(days=3*30).total_seconds()
window_period = timedelta(days=30).total_seconds()
print(f"window size : {window_size} seconds")
print(f"window period : {window_period} seconds")

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical Events" >> AstronomicalEvents()
        | 'Sliding window' >> beam.WindowInto(beam.window.SlidingWindows(window_size, window_period))
        | 'Print element info' >> beam.ParDo(PrintElementInfo())
        | 'Print window info' >> PrintWindowInfo()
    )

window size : 7776000.0 seconds
window period : 2592000.0 seconds
[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-03-20 07:37:00 -- March Equinox 2021
[2021-02-02 00:00:00 - 2021-05-03 00:00:00] 2021-03-20 07:37:00 -- March Equinox 2021
[2021-01-03 00:00:00 - 2021-04-03 00:00:00] 2021-03-20 07:37:00 -- March Equinox 2021
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-04-27 02:31:00 -- Super Full Moon
[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-04-27 02:31:00 -- Super Full Moon
[2021-02-02 00:00:00 - 2021-05-03 00:00:00] 2021-04-27 02:31:00 -- Super Full Moon
[2021-05-03 00:00:00 - 2021-08-01 00:00:00] 2021-05-11 17:59:00 -- Micro new Moon
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-11 17:59:00 -- Micro new Moon
[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-05-11 17:59:00 -- Micro new Moon
[2021-05-03 00:00:00 - 2021-08-01 00:00:00] 2021-05-26 10:13:00 -- Super Full Moon, total lunar eclipse
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-26 10:13:00 -- Super Full
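With a 90-day size and a 30-day period, each element belongs to size / period = 3 overlapping windows, which is why every event above is printed three times. A pure-Python sketch of how those windows are enumerated, assuming the same epoch alignment as `FixedWindows` (offset 0); `sliding_windows` is a hypothetical helper, not a Beam API.

```python
from datetime import datetime, timedelta, timezone


def sliding_windows(timestamp: float, size: float, period: float, offset: float = 0):
    """Return (start, end) pairs of every sliding window that contains timestamp."""
    start = timestamp - (timestamp - offset) % period  # latest window start <= timestamp
    windows = []
    while start > timestamp - size:  # [start, start + size) still covers timestamp
        windows.append((start, start + size))
        start -= period
    return windows


size = timedelta(days=3 * 30).total_seconds()
period = timedelta(days=30).total_seconds()
event = datetime(2021, 3, 20, 7, 37, tzinfo=timezone.utc).timestamp()  # March Equinox as printed above
starts = [
    datetime.fromtimestamp(s, tz=timezone.utc).date().isoformat()
    for s, _ in sliding_windows(event, size, period)
]
```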

In [18]:
import apache_beam as beam 
from datetime import timedelta

window_size = timedelta(days=3*30).total_seconds()
window_period = timedelta(days=30).total_seconds()
print(f"window size : {window_size} seconds")
print(f"window period : {window_period} seconds")

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical Events" >> AstronomicalEvents()
        # Printed before WindowInto, so every element is still in the GlobalWindow here
        | 'Print element info' >> beam.ParDo(PrintElementInfo())
        | 'Sliding window' >> beam.WindowInto(beam.window.SlidingWindows(window_size, window_period))  
        | 'Print window info' >> PrintWindowInfo()
    )

window size : 7776000.0 seconds
window period : 2592000.0 seconds
[GlobalWindow] 2021-03-20 07:37:00 -- March Equinox 2021
[GlobalWindow] 2021-04-27 02:31:00 -- Super Full Moon
[GlobalWindow] 2021-05-11 17:59:00 -- Micro new Moon
[GlobalWindow] 2021-05-26 10:13:00 -- Super Full Moon, total lunar eclipse
[GlobalWindow] 2021-06-21 02:32:00 -- June Solstice 2021
[GlobalWindow] 2021-08-22 11:01:00 -- September Equinox 2021
[GlobalWindow] 2021-09-22 18:21:00 -- Super new moon
[GlobalWindow] 2021-11-19 07:57:00 -- Micro full moon, partial lunar eclipse
[GlobalWindow] 2021-12-04 06:43:00 -- Super new Moon
[GlobalWindow] 2021-12-18 15:35:00 -- Micro full moon
[GlobalWindow] 2021-12-21 14:59:00 -- December Solstice 2021


## Session Window

In [20]:
import apache_beam as beam 
from datetime import timedelta

gap_size = timedelta(days=30).total_seconds()
print(f"window size : {gap_size} seconds")

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical Events" >> AstronomicalEvents()
        | 'Session window' >> beam.WindowInto(beam.window.Sessions(gap_size))
        | 'Print element info' >> beam.ParDo(PrintElementInfo())
        | 'Print window info' >> PrintWindowInfo()
    )

window size : 2592000.0 seconds
[2021-03-20 07:37:00 - 2021-04-19 07:37:00] 2021-03-20 07:37:00 -- March Equinox 2021
[2021-04-27 02:31:00 - 2021-05-27 02:31:00] 2021-04-27 02:31:00 -- Super Full Moon
[2021-05-11 17:59:00 - 2021-06-10 17:59:00] 2021-05-11 17:59:00 -- Micro new Moon
[2021-05-26 10:13:00 - 2021-06-25 10:13:00] 2021-05-26 10:13:00 -- Super Full Moon, total lunar eclipse
[2021-06-21 02:32:00 - 2021-07-21 02:32:00] 2021-06-21 02:32:00 -- June Solstice 2021
[2021-08-22 11:01:00 - 2021-09-21 11:01:00] 2021-08-22 11:01:00 -- September Equinox 2021
[2021-09-22 18:21:00 - 2021-10-22 18:21:00] 2021-09-22 18:21:00 -- Super new moon
[2021-11-19 07:57:00 - 2021-12-19 07:57:00] 2021-11-19 07:57:00 -- Micro full moon, partial lunar eclipse
[2021-12-04 06:43:00 - 2022-01-03 06:43:00] 2021-12-04 06:43:00 -- Super new Moon
[2021-12-18 15:35:00 - 2022-01-17 15:35:00] 2021-12-18 15:35:00 -- Micro full moon
[2021-12-21 14:59:00 - 2022-01-20 14:59:00] 2021-12-21 14:59:00 -- December Solstice
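Each session window printed above is simply [t, t + gap) for a single element: `WindowInto(Sessions(...))` first gives every element its own window, and overlapping windows are merged only at the next grouping step (such as the `Count.Globally()` inside `PrintWindowInfo`, or `CombinePerKey` in the click example earlier). A pure-Python sketch of that merge rule, applied to Andy's clicks from the event-time example with the same 30-minute gap; `merge_sessions` is a hypothetical helper that mirrors the idea, not Beam's implementation.

```python
def merge_sessions(events, gap):
    """Group (timestamp, value) events into sessions; windows [t, t + gap) that overlap are merged."""
    sessions = []  # each entry: [start, end, values]
    for ts, value in sorted(events):
        if sessions and ts < sessions[-1][1]:  # overlaps the current session window
            sessions[-1][1] = max(sessions[-1][1], ts + gap)
            sessions[-1][2].append(value)
        else:
            sessions.append([ts, ts + gap, [value]])
    return sessions


# Andy's clicks from the Sessions example: (timestamp, clicks)
andy = [(1603112520, 2), (1603115820, 4), (1603113600, 5)]
click_totals = [sum(values) for _, _, values in merge_sessions(andy, gap=30 * 60)]
```

The two totals match the `('Andy', 7)` and `('Andy', 4)` lines printed by the Sessions example.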