# Apache Beam Data Engineering Exercise (Colab-ready)

Demonstrates core Beam transforms: Map, Filter, ParDo, composite transforms, Partition, windowing, and simple pipeline I/O. Optional: Beam ML RunInference stub.

In [1]:
# Ensure dependencies are available (Beam + sklearn)
%pip install -q -r requirements.txt


Note: you may need to restart the kernel to use updated packages.


In [2]:
# Install Beam (commented by default; uncomment in a fresh Colab runtime)
# !pip install apache-beam[interactive]

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import tempfile, os, json
from datetime import datetime
import pandas as pd

print("Beam version:", beam.__version__)

Beam version: 2.56.0


## Sample data
We'll work with small, in-memory records to keep things fast.

In [3]:
records = [
    {"user": "alice", "score": 5, "ts": 0},
    {"user": "bob", "score": 7, "ts": 10},
    {"user": "carol", "score": 2, "ts": 20},
    {"user": "dave", "score": 9, "ts": 80},
    {"user": "eve", "score": 4, "ts": 95},
]
records

[{'user': 'alice', 'score': 5, 'ts': 0},
 {'user': 'bob', 'score': 7, 'ts': 10},
 {'user': 'carol', 'score': 2, 'ts': 20},
 {'user': 'dave', 'score': 9, 'ts': 80},
 {'user': 'eve', 'score': 4, 'ts': 95}]

## Composite transform
Combines map + filter into a reusable `PTransform`.

In [4]:
class CleanAndFilterScores(beam.PTransform):
    def __init__(self, min_score: int = 5):
        super().__init__()
        self.min_score = min_score

    def expand(self, pcoll):
        return (
            pcoll
            | "NormalizeUser" >> beam.Map(lambda r: {**r, "user": r["user"].strip().lower()})
            | "FilterByScore" >> beam.Filter(lambda r: r["score"] >= self.min_score)
        )

## Main pipeline: Map, Filter, ParDo, Partition, I/O
- Apply composite transform
- ParDo to tag pass/fail
- Partition even/odd scores
- Write to text
- Collect results for inspection

In [5]:
tmp_dir = tempfile.mkdtemp()
output_path = os.path.join(tmp_dir, "beam_output")
print("Temp output dir:", tmp_dir)

class TagPassFail(beam.DoFn):
    def process(self, element):
        tag = "pass" if element["score"] >= 6 else "fail"
        element["tag"] = tag
        yield element

def split_even_odd(element, num_partitions):
    return 0 if element["score"] % 2 == 0 else 1

with beam.Pipeline(options=PipelineOptions(flags=[])) as p:
    pcoll = p | "CreateRecords" >> beam.Create(records)

    cleaned = pcoll | "CleanAndFilter" >> CleanAndFilterScores(min_score=3)
    tagged = cleaned | "TagPassFail" >> beam.ParDo(TagPassFail())

    partitions = tagged | "PartitionEvenOdd" >> beam.Partition(split_even_odd, 2)
    even_scores = partitions[0]
    odd_scores = partitions[1]

    # Simple write to text
    _ = tagged | "WriteAll" >> beam.io.WriteToText(output_path)

    # Gather side results for display
    tagged_res = tagged | "TaggedToList" >> beam.combiners.ToList().without_defaults()
    even_res = even_scores | "EvenToList" >> beam.combiners.ToList().without_defaults()
    odd_res = odd_scores | "OddToList" >> beam.combiners.ToList().without_defaults()

    results = {
        "tagged": tagged_res,
        "even": even_res,
        "odd": odd_res,
    }

from pprint import pprint
pprint(results)

print("Sample output files:")
for f in os.listdir(tmp_dir):
    print(" -", f)



Temp output dir: /var/folders/86/nr8r5c99659ggl44tw4xs62h0000gn/T/tmphwvw4tg0


{'even': <PCollection[[6]: EvenToList/[6]: EvenToList/UnKey.None] at 0x12c78da90>,
 'odd': <PCollection[[6]: OddToList/[6]: OddToList/UnKey.None] at 0x12c6822d0>,
 'tagged': <PCollection[[6]: TaggedToList/[6]: TaggedToList/UnKey.None] at 0x12c3eea50>}
Sample output files:
 - beam_output-00000-of-00001


## Windowing example (FixedWindows + aggregation)
We assign timestamps to elements, window into 60s buckets, and count records per window.

In [6]:
from apache_beam import window

timestamped = [beam.window.TimestampedValue(r, r["ts"]) for r in records]

with beam.Pipeline(options=PipelineOptions(flags=[])) as p:
    counts = (
        p
        | "CreateTimestamped" >> beam.Create(timestamped)
        | "WindowInto60s" >> beam.WindowInto(window.FixedWindows(60))
        | "MapToOnes" >> beam.Map(lambda r: ("window", 1))
        | "CountPerWindow" >> beam.CombinePerKey(sum)
    )

    windowed = counts | "WindowedToList" >> beam.combiners.ToList().without_defaults()

print("Windowed counts (per 60s window):", windowed)

Windowed counts (per 60s window): PCollection[[7]: WindowedToList/[7]: WindowedToList/UnKey.None]


## Inspect written output
Read a sample of the written text files.

In [7]:
from glob import glob
txt_files = sorted(glob(output_path + "*") )
print("Text outputs:", txt_files)

if txt_files:
    with open(txt_files[0], "r") as f:
        print("\nSample file contents:\n", f.read())

Text outputs: ['/var/folders/86/nr8r5c99659ggl44tw4xs62h0000gn/T/tmphwvw4tg0/beam_output-00000-of-00001']

Sample file contents:
 {'user': 'alice', 'score': 5, 'ts': 0, 'tag': 'fail'}
{'user': 'bob', 'score': 7, 'ts': 10, 'tag': 'pass'}
{'user': 'dave', 'score': 9, 'ts': 80, 'tag': 'pass'}
{'user': 'eve', 'score': 4, 'ts': 95, 'tag': 'fail'}



## Beam ML RunInference (sklearn)
Train a tiny LogisticRegression model, save with joblib, and run Beam RunInference on two example points.

In [8]:
from sklearn.linear_model import LogisticRegression
import numpy as np
import joblib

model = LogisticRegression().fit([[0, 0], [1, 1]], [0, 1])
# Save as joblib and instruct handler accordingly
with tempfile.NamedTemporaryFile(delete=False, suffix=".joblib") as f:
    joblib.dump(model, f.name)
    model_path = f.name

from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy, ModelFileType
from apache_beam.ml.inference.base import ModelHandler, RunInference

handler: ModelHandler = SklearnModelHandlerNumpy(
    model_uri=model_path,
    model_file_type=ModelFileType.JOBLIB,
)
examples = [np.array([0, 0]), np.array([1, 1]), np.array([2, 2])]
with beam.Pipeline(options=PipelineOptions(flags=[])) as p:
    _ = (
        p
        | beam.Create(examples)
        | RunInference(handler)
        | beam.Map(lambda res: float(np.atleast_1d(res.inference)[0]))
        | beam.Map(lambda pred: print(f"Prediction: {pred}") or pred)
    )


Prediction: 0.0
Prediction: 1.0
Prediction: 1.0


## Notes for your screencast
- Show Map/Filter/ParDo/Partition outputs.
- Explain how the composite transform bundles steps.
- Highlight windowed counts and where files are written.
- (Optional) RunInference demo if you enable the cell above.