In [1]:
## Implementing windows

In [2]:
import re
import datetime

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, AfterCount, Repeatedly
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

In [3]:
# Pub/Sub subscription path that the pipeline cell passes to ReadFromPubSub.
input_subscription = "projects/engineering-demo-0/subscriptions/input-sub"
# Pub/Sub topic path; presumably the destination for results, but it is not
# referenced by any cell visible in this notebook -- TODO confirm it is still needed.
output_topic = "projects/engineering-demo-0/topics/output"

In [4]:
def encode_byte_string(element: tuple):
    """Serialize ``element`` as a comma-separated UTF-8 byte string.

    Every item is converted with ``str()`` before joining. No quoting or
    escaping is applied, so items that contain commas are emitted verbatim.
    """
    fields = map(str, element)
    joined = ",".join(fields)
    return joined.encode("utf-8")

def assign_timestamp(element):
    """Wrap ``element`` in a ``TimestampedValue`` stamped with the current time.

    Args:
        element: The record to timestamp; it is passed through unchanged.

    Returns:
        A ``beam.window.TimestampedValue`` whose timestamp is the current
        wall-clock time in whole seconds since the Unix epoch.
    """
    # datetime.timestamp() already returns seconds since the epoch. The earlier
    # version divided that value by 1000, which stamped every element with a
    # moment in January 1970 instead of "now".
    now_seconds = int(datetime.datetime.now().timestamp())
    return beam.window.TimestampedValue(element, now_seconds)
    # Alternative: use the event time carried in the record itself
    # (presumably element[7] holds an epoch timestamp in seconds -- TODO confirm):
    # return beam.window.TimestampedValue(element, int(element[7]))

def calculate_profit(element):
    """Return a copy of ``element`` with the total profit appended as a string.

    Profit is ``(sell_price - buy_rate) * products_count``, where the three
    values are read from ``element[6]``, ``element[5]`` and ``element[4]``
    respectively and converted with ``int()``.

    Args:
        element: Sequence of fields; indices 4, 5 and 6 must be int-convertible.

    Returns:
        A new list holding all fields of ``element`` followed by ``str(profit)``.

    Raises:
        IndexError: If ``element`` has fewer than seven fields.
        ValueError: If a required field cannot be converted to ``int``.
    """
    products_count = int(element[4])
    buy_rate = int(element[5])
    sell_price = int(element[6])
    profit = (sell_price - buy_rate) * products_count
    # Build a new list instead of appending in place: Beam transforms must not
    # mutate their input element, and an in-place append would also add a
    # second profit column if the same element were processed again.
    return [*element, str(profit)]

class BuildRecordFn(beam.DoFn):
    """DoFn that appends the bounds of the element's window to the element."""

    def process(self, element, win=beam.DoFn.WindowParam):
        """Return a one-item list holding ``element`` plus its window bounds.

        ``win.start`` and ``win.end`` are each converted with
        ``to_utc_datetime()`` and concatenated onto ``element`` as a 2-tuple,
        so ``element`` itself has to be a tuple.
        """
        bounds = (win.start.to_utc_datetime(), win.end.to_utc_datetime())
        record = element + bounds
        return [record]

In [5]:
# Streaming mode is enabled for the Pub/Sub read below.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

# Known issue with the commented-out "Global window" step further down:
# when it is enabled, the "Add window ts" step fails with
#     ERROR:apache_beam.runners.common:date value out of range [while running '[5]: Add window ts']
# The original note attributes this to the global window needing explicit timestamps.
# NOTE(review): presumably the real cause is that the global window's start/end
# cannot be represented by to_utc_datetime() in BuildRecordFn -- confirm before
# re-enabling that step.

pubsub_data = (
    p
    | "Read from pub sub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
    # Strip carriage returns / line feeds from the decoded payload. The earlier
    # pattern r"/r/n" used forward slashes, so it only matched the literal text
    # "/r/n" and never removed the actual line terminators.
    | "Remove extra characters" >> beam.Map(lambda e: re.sub(r"[\r\n]+", "", e.decode("utf-8")))
    | "Split element" >> beam.Map(lambda e: e.split(","))
    | "Fitler by city" >> beam.Filter(lambda elements: elements[1] in ("Mumbai", "Bangalore"))
    # calculate_profit adds the profit as field index 8, which is read below.
    | "Create profit" >> beam.Map(calculate_profit)
    # | 'Apply custom timestamp' >> beam.Map(assign_timestamp)
    | "Form Key Value pair" >> beam.Map(lambda elements: (elements[1], int(elements[8])))
    # Exactly one windowing strategy should be active; the others are kept as
    # commented-out alternatives to experiment with.
    # | "Tumbling window" >> beam.WindowInto(window.FixedWindows(5))
    | "Tumbling window with triggers" >> beam.WindowInto(
        window.FixedWindows(10),
        trigger=Repeatedly(AfterProcessingTime(3)),
        accumulation_mode=AccumulationMode.ACCUMULATING,
        allowed_lateness=2,
    )
    # | "Sliding window" >> beam.WindowInto(window.SlidingWindows(6, 3))
    # | "Session window" >> beam.WindowInto(window.Sessions(5))
    # | "Global window" >> beam.WindowInto(window.GlobalWindows(), trigger=Repeatedly(AfterCount(5)), accumulation_mode=AccumulationMode.DISCARDING)
    | "Sum by key" >> beam.CombinePerKey(sum)
    | "Add window ts" >> beam.ParDo(BuildRecordFn())
)

# Limit how long Interactive Beam records from the source before rendering.
ib.options.recording_duration = "120s"
ib.show(pubsub_data)



<IPython.core.display.Javascript object>