In [None]:
!pip install --upgrade google-cloud-pubsub
!pip install --upgrade 'apache-beam[gcp]'
!pip install --upgrade google-apitools


In [None]:
# GCP project ID used to build the Pub/Sub subscription and topic paths below; it is
# read from the GOOGLE_CLOUD_PROJECT environment variable, which must be set before this cell runs.
PROJECT_ID = %env GOOGLE_CLOUD_PROJECT

In [None]:
import apache_beam as beam
import sys
import time

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterCount

# Replace the kernel's argv with a bare program name — presumably so that
# PipelineOptions() below does not pick up Jupyter's own command-line flags.
sys.argv = ["11_tumbling_window.ipynb"]

# Codec shared by the pipeline for decoding input and encoding output messages.
encoding = "utf-8"

# Pub/Sub source (subscription) and sink (topic), scoped to this GCP project.
input_subscription = f"projects/{PROJECT_ID}/subscriptions/ratings-sub"
rating_count = f"projects/{PROJECT_ID}/topics/ratings_count_window"

# The Pub/Sub source is unbounded, so the pipeline is configured as a streaming job.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

p = beam.Pipeline(options=options)


def format_output(r):
    """Render a per-key count as a human-readable line.

    Args:
        r: A 2-item ``(movie_id, count)`` pair, as produced by ``Count.PerKey``.

    Returns:
        A string of the form ``"<count> ratings for movieId <movie_id>"``.
    """
    movie_id, n_ratings = r
    return f"{n_ratings} ratings for movieId {movie_id}"


# Streaming job: count ratings per movieId in 20-second fixed windows and publish one
# human-readable line per fired (window, movieId) pane to the `rating_count` topic.
#
# NOTE(review): each Pub/Sub message is assumed to be a comma-separated row with at
# least three fields, where field 1 is the movieId and field 2 a numeric rating
# (presumably userId,movieId,rating,...) — TODO confirm. A malformed row raises
# IndexError/ValueError inside the "Form KV pair" step.
#
# To experiment with a processing-time trigger instead, import AfterProcessingTime
# from apache_beam.transforms.trigger and pass trigger=AfterProcessingTime(5).
pubsub_pipeline = (
    p
    | "Read from PubSub topic" >> beam.io.ReadFromPubSub(subscription=input_subscription)
    | "Split the records by comma" >> beam.Map(lambda row: row.decode(encoding).split(","))
    # Each element's timestamp is replaced with the wall-clock time at which it is processed.
    | "With Custom timestamp" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))
    # The rating is parsed to float, but Count.PerKey below only uses the key.
    | "Form KV pair" >> beam.Map(lambda r: (r[1], float(r[2])))
    # NOTE(review): AfterCount(5) is not wrapped in Repeatedly, so it presumably fires at
    # most once per window; use Repeatedly(AfterCount(5)) (from apache_beam.transforms.trigger)
    # if every batch of 5 elements should emit a pane.
    | "Window with Triggers" >> beam.WindowInto(
        beam.window.FixedWindows(20),
        trigger=AfterCount(5),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    | "Count the ratings" >> beam.transforms.combiners.Count.PerKey()
    | "Format output" >> beam.Map(format_output)
    # WriteToPubSub publishes bytes; encode with the same codec used to decode the input
    # (replaces the deprecated WriteStringsToPubSub).
    | "Encode output" >> beam.Map(lambda line: line.encode(encoding))
    | "Write to PubSub" >> beam.io.WriteToPubSub(rating_count)
)

# Submit the pipeline; `result` is kept as a handle to the running job.
result = p.run()
# Block this cell until the job terminates — for a streaming pipeline this presumably
# means until it is cancelled or the kernel is interrupted.
result.wait_until_finish()
