<a href="https://colab.research.google.com/github/twrobel/apache-beam-lab/blob/main/apache-beam-lab-advanced.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction
## Windows - What are they good for?

When we work on a _batch_ (e.g. a `.csv` file), we are dealing with a _bounded_ input: there is a finite number of records 
in our `PCollection`. Working with bounded input comes naturally to most people, so much so that we probably take a number 
of concepts for granted when doing so. For example, imagine you are processing a `.csv` file containing
the scores that players recorded in an online game. The `.csv` file is published daily (the number of records in the file
varies from one day to the next). Say you want to determine the average score for players aged 16-18. You could key the 
records by age group, perform a `GroupByKey` operation to collect the records where `16 <= age <= 18`, and calculate the 
`mean`. In this scenario, we usually assume that the `GroupByKey` operation will be performed on _all_ of the matching 
records in the `.csv` file.  

Now consider an alternative scenario where the data arrives as a stream via a Kafka topic. In this scenario our input is 
_unbounded_: there is an infinite (or potentially infinite) number of records in our `PCollection`. We can't simply 
declare a `GroupByKey` operation on the stream for records where `16 <= age <= 18`, because such records could keep 
arriving forever (apparently, a lot of teenagers are playing video games these days). Instead, we also need to declare a
_boundary_ on the data we want processed. Enter windows! Windows let us explicitly define boundaries on our input data. 

## Types of Windows

### Fixed Time Windows (aka Tumbling Windows)

Given a timestamped `PCollection` we declare a window to capture all of the elements whose timestamps lie within the 
specified time range. For example, we might declare a fixed window with a duration of 30 seconds on our stream data. 
Then, any elements with a timestamp in the range `[00:00:00, 00:00:30)` (i.e. up to but not including `00:00:30`) would 
get averaged as part of Window 0. Likewise, any elements with a timestamp in the range `[00:00:30, 00:01:00)` would get 
averaged as part of Window 1, and so on.  

<table class="image">
<caption align="bottom" style="text-align: center">https://beam.apache.org/documentation/programming-guide</caption>
<tr><td><img src="https://beam.apache.org/images/fixed-time-windows.png"></td></tr>
</table>
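The assignment of elements to fixed windows can be sketched in plain Python. This is a minimal sketch, assuming windows aligned to the Unix epoch (Beam's default offset of 0); `fixed_window` is a hypothetical helper, not a Beam API. In a pipeline you would apply `beam.WindowInto(beam.window.FixedWindows(30))` instead.

```python
def fixed_window(ts, size, offset=0):
    """Return the half-open window [start, end) that contains timestamp `ts` (seconds)."""
    # Windows are aligned so that one boundary falls on `offset` (0 = the Unix epoch).
    start = ts - (ts - offset) % size
    return (start, start + size)


# Seconds since 00:00:00, standing in for the timestamps in the text above.
for ts in [0, 12, 29.5, 30, 45, 60]:
    print(ts, fixed_window(ts, size=30))
```

Note that `30` lands in the second window `(30, 60)`: the end of each window is excluded, matching the `[00:00:00, 00:00:30)` notation above.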
 
### Sliding Time Windows

Sliding windows can overlap. For example, we could declare a sliding window _duration_ of 60 seconds, and declare that a 
new window should start every 30 seconds (called the _period_). In this case, because the duration is twice the period, 
each element belongs to two windows. This type of windowing can be used to create rolling averages.

<table class="image">
<caption align="bottom" style="text-align: center">https://beam.apache.org/documentation/programming-guide</caption>
<tr><td><img src="https://beam.apache.org/images/sliding-time-windows.png"></td></tr>
</table>
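To see why each element lands in two windows, here is a plain-Python sketch that lists every sliding window containing a timestamp. It assumes epoch-aligned windows (offset 0), and `sliding_windows` is a hypothetical helper; in Beam the equivalent declaration would be `beam.WindowInto(beam.window.SlidingWindows(60, 30))`.

```python
def sliding_windows(ts, size, period, offset=0):
    """Return every [start, end) window of length `size`, starting each `period` seconds, that contains `ts`."""
    # Start of the most recent window that begins at or before ts.
    start = ts - (ts - offset) % period
    windows = []
    # Walk backwards one period at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


# A 60 s duration with a new window every 30 s: the element at t=65 lands in 2 windows.
print(sliding_windows(65, size=60, period=30))  # [(30, 90), (60, 120)]
```

In general an element belongs to `size / period` windows when the duration is a multiple of the period, which is what makes rolling aggregates possible.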

### Session Windows

A session window groups a burst of consecutive events for a key, and closes once no new event arrives for a specified 
duration of time (i.e. a _gap_). For example, imagine we are collecting user input data (e.g. keyboard strokes, joystick 
movement, touch input) for the players in our online game. We might expect to see bursts of data for each player, followed 
by gaps with no activity (time for a soda, time for homework). When data for a player arrives after a period of inactivity 
at least as long as the gap duration, a new session window is created. Note that session windows are applied on a 
per-key basis.   

<table class="image">
<caption align="bottom" style="text-align: center">
    https://beam.apache.org/documentation/programming-guide
</caption>
<tr><td><img src="https://beam.apache.org/images/session-windows.png"></td></tr>
</table>
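Session windows can be sketched in plain Python by giving each event a proto-window `[ts, ts + gap)` and merging overlapping windows per key. This is a minimal sketch; `session_windows` and the player events are hypothetical, and it treats an event arriving exactly `gap` seconds after the previous one as starting a new session. In Beam you would apply `beam.WindowInto(beam.window.Sessions(60))` to a keyed `PCollection`.

```python
from collections import defaultdict


def session_windows(events, gap):
    """Group (key, ts) events into per-key sessions; returns {key: [(start, end, count), ...]}."""
    by_key = defaultdict(list)
    for key, ts in events:
        by_key[key].append(ts)

    sessions = {}
    for key, stamps in by_key.items():
        merged = []
        for ts in sorted(stamps):
            # Each event's proto-window is [ts, ts + gap); merge it into the open session if they overlap.
            if merged and ts < merged[-1][1]:
                start, end, count = merged[-1]
                merged[-1] = (start, max(end, ts + gap), count + 1)
            else:
                merged.append((ts, ts + gap, 1))
        sessions[key] = merged
    return sessions


# Hypothetical input events for two players: (player, seconds).
events = [("alice", 0), ("alice", 20), ("alice", 50), ("alice", 400), ("bob", 10)]
print(session_windows(events, gap=60))
# {'alice': [(0, 110, 3), (400, 460, 1)], 'bob': [(10, 70, 1)]}
```

Because sessions are computed separately for each key, `bob`'s event at t=10 does not join `alice`'s first session even though it falls inside it.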

### Global Windows

This is the default window if your pipeline doesn't explicitly create one of the aforementioned windows. When we 
considered our batch data example, we relied on a global window. Because our data source was a `.csv` file, the data was 
bounded, so we could safely perform aggregation operations (e.g. `GroupByKey`, `Combine`). Actually, you _can_ 
use a global window on streaming data, in either of two circumstances:

- You aren't performing any aggregation operations in your pipeline. For example, if your pipeline is performing simple 
transformations on individual `PCollection` elements as they arrive on the stream. 
- You provide a non-default `Trigger` for the global window. Triggers are the mechanism used by Beam to determine when 
to emit the results of a window. So for example, you might use a custom trigger that says to emit the results of your 
global window every time 50 elements arrive.
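The firing pattern of such a count-based trigger can be imitated in plain Python. This is a simulation only, assuming discarding accumulation (each result contains only the elements that arrived since the previous firing); `panes_every` is a hypothetical helper. In Beam this would roughly correspond to a `Repeatedly(AfterCount(50))` trigger from `apache_beam.transforms.trigger`, passed to `beam.WindowInto` together with `beam.window.GlobalWindows()`.

```python
from itertools import count, islice


def panes_every(stream, n):
    """Yield a pane (list of elements) each time n new elements have arrived on `stream`."""
    pane = []
    for element in stream:
        pane.append(element)
        if len(pane) == n:
            yield pane
            pane = []  # discard fired elements, so each pane holds only the new ones


# itertools.count() is an endless stream, yet results are still emitted every 50 elements.
first_three = list(islice(panes_every(count(), 50), 3))
print([(pane[0], pane[-1]) for pane in first_three])  # [(0, 49), (50, 99), (100, 149)]
```

The point of the sketch is that the window itself never closes; the trigger alone decides when partial results leave it.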



## Windowing with bounded PCollections
You can use windowing with fixed-size data sets in bounded PCollections. However, note that windowing considers only the implicit timestamps attached to each element of a PCollection, and data sources that create fixed data sets (such as TextIO) assign the same timestamp to every element. This means that all the elements are by default part of a single, global window.

To use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps to elements, use a ParDo transform with a DoFn that outputs each element with a new timestamp. In the Python SDK, the DoFn yields `beam.window.TimestampedValue(element, timestamp)`, as in the examples below; the Beam SDK for Java provides the `WithTimestamps` transform for this purpose.

##  Setup

In [3]:
# Install apache-beam with pip.
!pip install --quiet 'apache-beam[interactive]' 

In [4]:
# Import required libraries
import apache_beam as beam
from datetime import datetime
import pytz

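# Convenience helpers to keep the exercise code terse.
# dtime() builds a timezone-aware UTC datetime; every field defaults to 2020-03-04 00:00:00.
# The parameter is named `min` (shadowing the builtin inside dtime only) because the exercises call dtime(min=...).
def dtime(yy=2020, mm=3, day=4, hour=0, min=0, sec=0, ms=0, tzinfo=pytz.UTC):
    # datetime's 7th positional argument is microseconds, so convert milliseconds first
    return datetime(yy, mm, day, hour, min, sec, ms * 1000, tzinfo=tzinfo)

def tstamp(**kwargs):
    # Unix timestamp (seconds since the epoch) for the given dtime() fields
    return dtime(**kwargs).timestamp()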


## In the following exercises, you will learn how to
1. Use a ParDo transform
2. Add timestamps to elements in a PCollection
3. Use fixed windows
4. Use sliding windows
5. Use session windows
6. Use global windows

### Before you start, here are a few things to remember...
* Read the examples carefully before each exercise as they provide you with useful hints to complete the exercises. 

* In the exercises, you must write your code in spots marked by:
> #YOUR CODE HERE

* To see the output of your code, select the code cell you want to run and then click "Run" in the toolbar above or press "Ctrl + Enter".
* Running a markdown cell will display its rendered output. No harm done. Just move along.

### Example - ParDo Transform
#### A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection.

In the example below, we compute the length of each word in a PCollection of words using a ParDo transform and a DoFn, `ComputeWordLengthFn()`, that computes word length. 

```python
word_lengths = (
    pipeline
    | 'Create collection' >> beam.Create(words)
    | 'Compute word length' >> beam.ParDo(ComputeWordLengthFn())
)
```

#### The DoFn object that you pass to ParDo contains the processing logic that gets applied to the elements in the input collection. When you use Beam, often the most important pieces of code you’ll write are these DoFns - they’re what define your pipeline’s exact data processing tasks.

Inside your DoFn subclass, you must implement a `process` method that contains the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your `process` method should accept an argument, `element`, which is the input element, and return an iterable with its output values. You can do this by emitting individual elements with `yield` statements, or by using a `return` statement with an iterable, such as a list or a generator.

```python
def process(self, element):
    return [len(element)]
```
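The ParDo contract (zero, one, or many outputs per input, from either `yield` or a returned iterable) can be imitated in plain Python, without Beam. This is only a sketch of the semantics: `split_words`, `length_as_list` and `par_do` are hypothetical helpers, and the real ParDo additionally distributes the work across workers.

```python
def split_words(element):
    """Generator style: yields zero, one, or many outputs per input element."""
    for word in element.split():
        yield word


def length_as_list(element):
    """Return style: the same contract, expressed as a returned iterable."""
    return [len(element)]


def par_do(elements, fn):
    """Imitate ParDo: call fn on every element and flatten everything it emits."""
    return [output for element in elements for output in fn(element)]


print(par_do(["hello world", "", "beam"], split_words))  # ['hello', 'world', 'beam']
print(par_do(["Person", "TV"], length_as_list))  # [6, 2]
```

The empty string produces no output at all, and `"hello world"` produces two, which is exactly the "zero, one, or multiple elements" behaviour described above.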

In [None]:
class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        return [len(element)]

words = ['Person', 'Woman', 'Man', 'Camera', 'TV']

with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | 'Create values' >> beam.Create(words)
        | 'Compute word length' >> beam.ParDo(ComputeWordLengthFn())
    )
    outputs | beam.Map(print)

### Exercise 1 
### Use a ParDo transform to count vowels in each word

Implement a DoFn whose `process` method returns an iterable containing the number of vowels in each element. Use a ParDo transform to count the number of vowels in each word of the collection.

#### Expected Output:
```
2
2
1
3
0
```

In [None]:
class CountVowelsFn(beam.DoFn):
    def process(self, element):
        return #YOUR CODE HERE


words = ['Person', 'Woman', 'Man', 'Camera', 'TV']

with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
        | 'Create values' >> beam.Create(words)
        | 'Count vowels' >> beam.ParDo(CountVowelsFn())
    )
    outputs | beam.Map(print)

### Example - Add timestamp to animal sighting events
#### You can assign new timestamps to the elements of a PCollection by applying a ParDo transform that outputs new elements with timestamps that you set.  

In the example below, the inputs are a collection of animal sighting events with a timestamp field. Using that field, `AddTimestampFn` assigns a timestamp to each element of the PCollection. We then verify that the timestamps are attached by retrieving them with `GetTimestampFn`.

#### beam.window.TimestampedValue takes a value and a Unix timestamp in the form of seconds.
```python
class AddTimestampFn(beam.DoFn):
    def process(self, element, **kwargs):
        unix_timestamp = element.timestamp.timestamp()
        yield beam.window.TimestampedValue(element, unix_timestamp)
```
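Because `TimestampedValue` expects seconds since the Unix epoch, the conversion from `datetime` matters. A naive `datetime` is interpreted in the machine's local time zone by `.timestamp()`, which is presumably why the `dtime` helper attaches a UTC time zone. This standard-library sketch uses `datetime.timezone.utc` in place of `pytz.UTC`.

```python
from datetime import datetime, timezone

# An aware datetime pinned to UTC maps to the same Unix seconds on every machine.
aware = datetime(2020, 3, 4, tzinfo=timezone.utc)
print(aware.timestamp())  # 1583280000.0

# Converting back with an explicit time zone recovers the same instant.
print(datetime.fromtimestamp(aware.timestamp(), tz=timezone.utc))  # 2020-03-04 00:00:00+00:00

# A naive datetime is read as local time, so this difference depends on
# the machine's time zone (it is 0 only when the machine runs in UTC).
naive = datetime(2020, 3, 4)
print(naive.timestamp() - aware.timestamp())
```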

In [None]:
class AnimalSightingEvent:
    def __init__(self, id, animal, timestamp):
        self.id = id
        self.animal = animal
        self.timestamp = timestamp
        
class AddTimestampFn(beam.DoFn):
    def process(self, element, **kwargs):
        unix_timestamp = element.timestamp.timestamp()
        yield beam.window.TimestampedValue(element, unix_timestamp)
        
class GetTimestampFn(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        yield '{} - {} - {}'.format(timestamp.to_utc_datetime(), element.id, element.animal)
        
inputs = [
    AnimalSightingEvent('1', '🐹', dtime(yy=2020, mm=3, day=4)),
    AnimalSightingEvent('2', '🐼', dtime(yy=2020, mm=3, day=5)),
    AnimalSightingEvent('3', '🐰', dtime(yy=2020, mm=3, day=6)),
    AnimalSightingEvent('4', '🐹', dtime(yy=2020, mm=3, day=7)),
    AnimalSightingEvent('5', '🐰', dtime(yy=2020, mm=3, day=8)),
]

with beam.Pipeline() as pipeline:
    outputs = (
      pipeline
      | 'Create animal sightings' >> beam.Create(inputs)
      | 'Add timestamps' >> beam.ParDo(AddTimestampFn())
      | 'Get timestamps' >> beam.ParDo(GetTimestampFn())
    )
    outputs | beam.Map(print)

### Exercise 2 - Add timestamp to food sighting events
Create a pipeline of food sighting events. Use the timestamp field from the food sighting events to add a timestamp to elements of the PCollection. Finally, print out the timestamps for each element.

#### Expected Output
```
2020-03-04 00:00:00 - 1 - 🌽
2020-03-05 00:00:00 - 2 - 🌽
2020-03-06 00:00:00 - 3 - 🥕
2020-03-07 00:00:00 - 4 - 🥕
2020-03-08 00:00:00 - 5 - 🌽
```

In [5]:
class FoodSightingEvent:
    def __init__(self, id, food, timestamp):
        self.id = id
        self.food = food
        self.timestamp = timestamp
        
class AddTimestampFn(beam.DoFn):
    def process(self, element, **kwargs):
        unix_timestamp = element.timestamp.timestamp()
        yield beam.window.TimestampedValue(element, unix_timestamp)
        
class GetTimestampFn(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        yield '{} - {} - {}'.format(timestamp.to_utc_datetime(), element.id, element.food)

In [7]:
inputs = [
    FoodSightingEvent('1', '🌽', dtime(yy=2020, mm=3, day=4)),
    FoodSightingEvent('2', '🌽', dtime(yy=2020, mm=3, day=5)),
    FoodSightingEvent('3', '🥕', dtime(yy=2020, mm=3, day=6)),
    FoodSightingEvent('4', '🥕', dtime(yy=2020, mm=3, day=7)),
    FoodSightingEvent('5', '🌽', dtime(yy=2020, mm=3, day=8)),
]

with beam.Pipeline() as pipeline:
    outputs = (
      pipeline 
      | # YOUR CODE HERE 
      | # YOUR CODE HERE
      | # YOUR CODE HERE
    )
    outputs | beam.Map(print)



### Exercise 3 - Count events using fixed windows
Count the number of events for each food within fixed windows of 1-minute duration.

#### Expected Output
```
('🌽', 1)
('🥕', 2)
('🥕', 2)
('🥕', 3)
('🐰', 1)
('🐰', 1)
```
#### Hints
- You need to count the elements. One way to do this is:
`beam.combiners.Count.PerElement()`
- You need to create an appropriate fixed window boundary over your data:
`beam.WindowInto(beam.window.FixedWindows(<duration in secs>))`
- You need to map each element to an appropriate key (e.g. its food), so that matching elements in each window are counted together. You can use the Map fn: 
`beam.Map(lambda element: element.<some_attribute>)`
- You should perform these tasks in the correct order :-)
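If the expected output looks puzzling, the window arithmetic can be checked in plain Python (this is not the Beam solution). The sketch assumes, as in the cell below, that all events share the same date, so seconds since midnight are enough to find each event's 1-minute window; `to_seconds` is a hypothetical helper.

```python
from collections import Counter

# (food, (hour, minute, second)) copied from the exercise cell below.
events = [
    ("🌽", (1, 1, 5)), ("🥕", (1, 1, 15)), ("🥕", (1, 1, 16)), ("🐰", (1, 1, 59)),
    ("🥕", (1, 31, 18)), ("🥕", (1, 31, 36)),
    ("🥕", (1, 34, 1)), ("🥕", (1, 34, 18)), ("🥕", (1, 34, 51)),
    ("🐰", (2, 34, 51)),
]

size = 60  # 1-minute fixed windows, in seconds


def to_seconds(hms):
    hour, minute, second = hms
    return hour * 3600 + minute * 60 + second


# Key each event by (window start, food) and count: one result per food per window.
counts = Counter((to_seconds(hms) - to_seconds(hms) % size, food) for food, hms in events)
for (window_start, food), n in sorted(counts.items()):
    print((food, n))
```

The two `('🥕', 2)` results come from two different windows (01:01 and 01:31), which is why the same pair appears twice.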

In [8]:
with beam.Pipeline() as p:
  (p | beam.Create([
          # Window 0:
          FoodSightingEvent('1', '🌽', dtime(hour=1, min=1, sec=5)),
          FoodSightingEvent('2', '🥕', dtime(hour=1, min=1, sec=15)),
          FoodSightingEvent('3', '🥕', dtime(hour=1, min=1, sec=16)),
          FoodSightingEvent('4', '🐰', dtime(hour=1, min=1, sec=59)),
          # Window 1:
          FoodSightingEvent('5', '🥕', dtime(hour=1, min=31, sec=18)),
          FoodSightingEvent('6', '🥕', dtime(hour=1, min=31, sec=36)),
          # Window 2
          FoodSightingEvent('7', '🥕', dtime(hour=1, min=34, sec=1)),
          FoodSightingEvent('8', '🥕', dtime(hour=1, min=34, sec=18)),
          FoodSightingEvent('9', '🥕', dtime(hour=1, min=34, sec=51)),
          # Window 3
          FoodSightingEvent('10', '🐰', dtime(hour=2, min=34, sec=51)),
       ])
     | 'Add timestamps' >> beam.ParDo(AddTimestampFn())
     | # YOUR CODE HERE 
     | # YOUR CODE HERE 
     | # YOUR CODE HERE 
     | beam.Map(print)) 


### Exercise 4 - Count events using sliding windows

Now let's consider sliding windows. Every 30 seconds (the period), we want to calculate the number of elements that 
arrived over the last minute (the duration). We use a different input in this example to make the output easier to 
understand. 

#### Hints

The solution should look a lot like the previous one, except that you need to use a sliding window (duration and period in seconds):
`beam.window.SlidingWindows(<duration>, <period>)`

#### Expected Output
```
('🌽', 1)
('🌽', 1)
('🍏', 1)
('🍏', 1)
('🍕', 1)
('🍕', 1)
('🐰', 1)
('🐰', 1)
```
Because a new 1-minute window starts every 30 seconds (aligned to the half minute), the '🌽' at 01:01:05 is included in exactly 2 windows:

- Window 0 [01:00:30, 01:01:30) <=> '🌽' 01:01:05
- Window 1 [01:01:00, 01:02:00) <=> '🌽' 01:01:05
- Window 2 [01:01:30, 01:02:30) <=> no '🌽' (01:01:05 falls before this window starts)
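The window boundaries listed above can be checked with a short plain-Python sketch. It assumes epoch-aligned windows (Beam's default offset of 0) and uses hypothetical helpers `sliding_windows` and `hms`; the event time matches the '🌽' in the cell below.

```python
from datetime import datetime, timezone


def sliding_windows(ts, size, period):
    """Return the epoch-aligned [start, end) windows of length `size` (one every `period` s) containing `ts`."""
    start = ts - ts % period  # most recent window start at or before ts
    windows = []
    while start > ts - size:  # keep stepping back while the window still covers ts
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def hms(seconds):
    # Format Unix seconds as HH:MM:SS in UTC.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%H:%M:%S")


corn = datetime(2020, 3, 4, 1, 1, 5, tzinfo=timezone.utc).timestamp()  # '🌽' at 01:01:05
for start, end in sliding_windows(corn, size=60, period=30):
    print(f"[{hms(start)}, {hms(end)})")
# [01:00:30, 01:01:30)
# [01:01:00, 01:02:00)
```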

Try experimenting with different periods to see their effect on the output! 

In [18]:
with beam.Pipeline() as p:
  (p | beam.Create([
          FoodSightingEvent('1', '🌽', dtime(hour=1, min=1, sec=5)),
      
          FoodSightingEvent('2', '🍏', dtime(hour=1, min=31, sec=36)),
      
          FoodSightingEvent('3', '🍕', dtime(hour=1, min=34, sec=1)),
      
          FoodSightingEvent('4', '🐰', dtime(hour=2, min=34, sec=51)),
       ])

     | 'Add timestamps' >> beam.ParDo(AddTimestampFn())
     | # YOUR CODE HERE 
     | # YOUR CODE HERE 
     | # YOUR CODE HERE
     | beam.Map(print))



### Exercise 5 - Count events using session windows

In this example, let's count events that occur within each session window. Recall that a new session window is 
started whenever Beam sees a gap of at least the specified size (i.e. a period of time with no data elements for that 
key). Let's use a gap size of 5 minutes.   

#### Hints

You need to use a session window: `beam.window.Sessions(<gap_size in seconds>)`

#### Expected Output
```
('🌽', 4)
('🥕', 5)
('🐰', 1)
```

In [20]:
with beam.Pipeline() as p:
  (p | beam.Create([
          FoodSightingEvent('1', '🌽', dtime(hour=1, min=1, sec=5)),
          FoodSightingEvent('2', '🌽', dtime(hour=1, min=1, sec=15)),
          FoodSightingEvent('3', '🌽', dtime(hour=1, min=1, sec=16)),
          FoodSightingEvent('4', '🌽', dtime(hour=1, min=1, sec=59)),
      
          FoodSightingEvent('5', '🥕', dtime(hour=1, min=31, sec=18)),
          FoodSightingEvent('6', '🥕', dtime(hour=1, min=31, sec=36)),
      
          FoodSightingEvent('7', '🥕', dtime(hour=1, min=34, sec=1)),
          FoodSightingEvent('8', '🥕', dtime(hour=1, min=34, sec=18)),
          FoodSightingEvent('9', '🥕', dtime(hour=1, min=34, sec=51)),
      
          FoodSightingEvent('10', '🐰', dtime(hour=2, min=34, sec=51)),
       ])
  
     | 'Add timestamps' >> beam.ParDo(AddTimestampFn())
     | # YOUR CODE HERE 
     | # YOUR CODE HERE 
     | # YOUR CODE HERE 
     | beam.Map(print))



### Exercise 6 - Count events using global windows
In this example, let's count events that occur in a global window. 

#### Hints

You need to use a global window: `beam.window.GlobalWindows()`

#### Expected Output
```
('🌽', 1)
('🥕', 7)
('🐰', 2)
```

In [21]:
with beam.Pipeline() as p:
  (p | beam.Create([
          FoodSightingEvent('1', '🌽', dtime(hour=1, min=1, sec=5)),
          FoodSightingEvent('2', '🥕', dtime(hour=1, min=1, sec=15)),
          FoodSightingEvent('3', '🥕', dtime(hour=1, min=1, sec=16)),
          FoodSightingEvent('4', '🐰', dtime(hour=1, min=1, sec=59)),
      
          FoodSightingEvent('5', '🥕', dtime(hour=1, min=31, sec=18)),
          FoodSightingEvent('6', '🥕', dtime(hour=1, min=31, sec=36)),
      
          FoodSightingEvent('7', '🥕', dtime(hour=1, min=34, sec=1)),
          FoodSightingEvent('8', '🥕', dtime(hour=1, min=34, sec=18)),
          FoodSightingEvent('9', '🥕', dtime(hour=1, min=34, sec=51)),
      
          FoodSightingEvent('10', '🐰', dtime(hour=2, min=34, sec=51)),
       ])
   
   | 'Add timestamps' >> beam.ParDo(AddTimestampFn())
   | # YOUR CODE HERE 
   | # YOUR CODE HERE 
   | # YOUR CODE HERE 
   | beam.Map(print))



### References
Apache Beam Katas

https://beam.apache.org/documentation/programming-guide
