In this Colab, we work through an example of processing a streaming data source whose events carry timestamps. The events represent user purchases on an e-commerce platform. We want to sum the purchases in each 1-hour window, but emit a window's result only after 2 purchases have been seen in that window.
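Fixed windows do not overlap, so every event timestamp belongs to exactly one window. The plain-Python sketch below models that arithmetic; it is not Beam's code. It assumes a 1-hour window size, a window offset of zero, and timestamps in seconds, and `fixed_window` is a hypothetical helper name. A timestamp `t` falls in the half-open window `[t - t % 3600, t - t % 3600 + 3600)`:

```python
# Plain-Python model of fixed (tumbling) window assignment, assuming a
# window size of 1 hour and an offset of 0. Timestamps are in seconds.
WINDOW_SIZE = 3600


def fixed_window(timestamp, size=WINDOW_SIZE):
    """Return the half-open window [start, end) that contains `timestamp`."""
    start = timestamp - timestamp % size
    return (start, start + size)


print(fixed_window(1.5 * 3600))  # (3600.0, 7200.0): a purchase at 1.5 h
print(fixed_window(2 * 3600))    # (7200, 10800): 2 h starts a new window
```

Because the end is exclusive, a purchase at exactly 2 h opens the next window rather than closing the previous one.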

In [1]:
# Install Apache Beam
!pip install apache-beam



In [2]:
# Import necessary libraries
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

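# Define a simple DoFn that prints each element and passes it downstream.
# It also appends the element to output_list, but Beam pickles DoFns before
# running them, so the append only updates the DoFn's own copy of the list,
# not the list created in the notebook.
class CollectFn(beam.DoFn):
    def __init__(self, output_list):
        self.output_list = output_list

    def process(self, element):
        self.output_list.append(element)
        print(element)
        yield element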

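# Define a composite transform that sums the purchase amounts in each window.
# Triggers fire per key and per window, so every purchase gets the same key:
# all purchases in a 1-hour window then land in one group, where AfterCount(2)
# can count them together. (Keying by the timestamp would put each purchase
# in a group of its own.)
class CalculateTotalPurchase(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'KeyByConstant' >> beam.Map(lambda x: ('total', x[1]))
            | 'GroupByWindow' >> beam.GroupByKey()
            | 'SumValues' >> beam.MapTuple(lambda _, amounts: sum(amounts))
        )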
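`AfterCount(2)` counts elements per key and per window, so the key chosen in `CalculateTotalPurchase` decides whether any group can ever reach two purchases. The plain-Python model below uses a hypothetical `group_and_sum` helper as a stand-in for `GroupByKey` followed by a sum, applied to the two purchases that fall in the `[1 h, 2 h)` window. The constant key `"total"` is an illustrative assumption.

```python
from collections import defaultdict


def group_and_sum(pairs):
    """Plain-Python stand-in for GroupByKey followed by a per-key sum.

    Returns {key: (number_of_elements, sum_of_amounts)}.
    """
    groups = defaultdict(list)
    for key, amount in pairs:
        groups[key].append(amount)
    return {key: (len(amounts), sum(amounts)) for key, amounts in groups.items()}


# The two purchases that fall in the [1 h, 2 h) window: (hours, amount).
window_purchases = [(1, 50), (1.5, 40)]

# Keyed by timestamp: each purchase is alone in its group (count 1 each).
print(group_and_sum(window_purchases))  # {1: (1, 50), 1.5: (1, 40)}

# Keyed by one shared constant: both purchases are grouped and summed.
print(group_and_sum(("total", amount) for _, amount in window_purchases))
# {'total': (2, 90)}
```

Only the second grouping ever holds two elements, which is what a count trigger of 2 needs in order to fire.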

For simplicity, we simulate the stream with an in-memory list of tuples, where each tuple holds the event timestamp (in hours) and the purchase amount:

In [3]:
# Define the data
data = [
    (1, 50),
    (1.5, 40),
    (2, 60),
    (3, 20),
    (3.5, 100),
    (4, 30)
]
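Working through the sample data by hand makes the intended result concrete. The sketch below is a simplified plain-Python model, not Beam's trigger implementation: it assumes a single key, processes events in list order, and treats `AfterCount(2)` as firing once per window. `simulate_count_trigger` is a hypothetical helper name, and the data is copied from the list above.

```python
from collections import defaultdict

# Copy of the sample data: (timestamp in hours, purchase amount).
purchases = [(1, 50), (1.5, 40), (2, 60), (3, 20), (3.5, 100), (4, 30)]


def simulate_count_trigger(events, window_size=3600, count=2):
    """Simplified model of 1-hour fixed windows with a non-repeating
    AfterCount(count) trigger on a single key.

    Returns (fired, pending): sums emitted when a window reached `count`
    purchases, and sums still buffered in windows that never did.
    """
    buffered = defaultdict(list)  # window start (s) -> amounts not yet emitted
    finished = set()              # windows whose trigger has already fired
    fired = {}
    for hours, amount in events:
        seconds = hours * 3600
        start = int(seconds // window_size) * window_size
        if start in finished:
            continue  # a finished, non-repeating trigger ignores later data
        buffered[start].append(amount)
        if len(buffered[start]) >= count:
            fired[start] = sum(buffered.pop(start))
            finished.add(start)
    return fired, {start: sum(amounts) for start, amounts in buffered.items()}


fired, pending = simulate_count_trigger(purchases)
print(fired)    # {3600: 90, 10800: 120}
print(pending)  # {7200: 60, 14400: 30}
```

Under these assumptions only the `[1 h, 2 h)` and `[3 h, 4 h)` windows reach two purchases. The amounts left in `pending` are the data that a non-repeating `AfterCount(2)` is not guaranteed to emit.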


In [4]:
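# Create the pipeline options. By default Beam rejects a non-repeating
# AfterCount(2) trigger as unsafe because it may lose data: a window that
# never sees two purchases never fires, and once the trigger has fired, later
# purchases in the same window can be dropped. The allow_unsafe_triggers
# option accepts that trade-off explicitly.
pipeline_options = PipelineOptions(allow_unsafe_triggers=True)

# Create the pipeline
p = beam.Pipeline(options=pipeline_options)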

# Build the pipeline
output_list = []
(
    p
    | 'ReadFromMemory' >> beam.Create(data)
    # Convert each event's timestamp from hours to seconds (Beam's unit)
    | 'AssignTimestamps' >> beam.Map(
        lambda x: window.TimestampedValue(x, x[0] * 3600))
    # 1-hour fixed windows; a pane fires once 2 elements have arrived
    | 'Windowing' >> beam.WindowInto(
        window.FixedWindows(3600),
        trigger=beam.trigger.AfterCount(2),
        accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
    | 'CalculateTotal' >> CalculateTotalPurchase()
    | 'CollectResults' >> beam.ParDo(CollectFn(output_list))
)

# Run the pipeline
p.run().wait_until_finish()

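# Display the list. Beam runs a pickled copy of CollectFn, so appends made
# inside the pipeline never reach this notebook's output_list, which is why
# the recorded output below is an empty list.
output_list
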
[]