<a href="https://colab.research.google.com/github/samipn/apache-beam-demo/blob/main/apache_beam_colab_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Apache Beam in Colab — end‑to‑end demo
**What you'll learn**

- Pipeline IO: `ReadFromText`, `WriteToText`

- Elementwise transforms: `Map`, `Filter`, `ParDo`

- Splitting a collection by word length (done with `Filter` here; `beam.Partition` is the built-in alternative)

- Windowing with `FixedWindows` using `TestStream` (simulated streaming)

- Composite transforms with custom `PTransform`

- *(Bonus)* Beam ML `RunInference` with scikit‑learn



**How to use**

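Run the cells from top to bottom. The install and import cells must run first, and sections 2 and 3 reuse the `tokenize` and `is_long_enough` helpers defined in section 1. Once those have run, individual sections can be re-run safely.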


### Useful links
- Getting started (interactive): https://beam.apache.org/get-started/an-interactive-overview-of-beam/

- Beam Playground: https://beam.apache.org/get-started/try-beam-playground

- Primary Beam Colab (official): https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/interactive-overview/getting-started.ipynb

- Python transforms (elementwise): https://beam.apache.org/documentation/transforms/python/elementwise/overview/

- Windowing concepts: https://beam.apache.org/documentation/programming-guide/#windowing

- RunInference (sklearn): https://beam.apache.org/documentation/transforms/python/elementwise/runinference-sklearn/

- Beam ML: https://beam.apache.org/documentation/ml/overview/



In [1]:
# Install deps (Colab-friendly)
# If you run into version issues, restart the runtime and re-run this cell.
!pip -q install "apache-beam[gcp]" scikit-learn

In [2]:
# Imports & sanity checks
import os, re, json, random, string, textwrap, time
import numpy as np
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window as beam_window

print("Beam version:", beam.__version__)

Beam version: 2.68.0


In [3]:
# Create a tiny input dataset for IO demos
os.makedirs('/content/data', exist_ok=True)
with open('/content/data/input.txt', 'w') as f:
    f.write("""To be, or not to be: that is the Question.
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
And by opposing end them. To die—to sleep,
No more; and by a sleep to say we end
The heart-ache and the thousand natural shocks
That flesh is heir to: 'tis a consummation
Devoutly to be wish'd. To die, to sleep—
To sleep—perchance to dream: ay, there's the rub.
""")

!head -n 10 /content/data/input.txt

To be, or not to be: that is the Question.
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
And by opposing end them. To die—to sleep,
No more; and by a sleep to say we end
The heart-ache and the thousand natural shocks
That flesh is heir to: 'tis a consummation
Devoutly to be wish'd. To die, to sleep—
To sleep—perchance to dream: ay, there's the rub.


## 1) Batch pipeline: IO + Map/Filter + Combine

In [4]:
# Basic batch pipeline with IO, Map, Filter
# - Reads from text
# - Cleans and tokenizes
# - Filters short tokens
# - Counts words
# - Writes to text

def tokenize(line):
    # lowercase + keep letters only, split on non-letters
    return [w for w in re.split(r'[^A-Za-z]+', line.lower()) if w]

def is_long_enough(word, min_len=3):
    return len(word) >= min_len

output_dir = '/content/output/basic_wordcount'
!rm -rf "$output_dir"
os.makedirs(output_dir, exist_ok=True)

with beam.Pipeline(options=PipelineOptions()) as p:
    counts = (
        p
        | "ReadLines" >> beam.io.ReadFromText('/content/data/input.txt')
        | "Tokenize" >> beam.FlatMap(tokenize)       # FlatMap: one line in, zero or more words out
        | "FilterShort" >> beam.Filter(is_long_enough)
        | "PairWithOne" >> beam.Map(lambda w: (w, 1)) # Map
        | "Count" >> beam.CombinePerKey(sum)
        | "Format" >> beam.Map(lambda kv: f"{kv[0]}\t{kv[1]}")
    )
    counts | "WriteToText" >> beam.io.WriteToText(os.path.join(output_dir, 'part'))

print('Wrote:', output_dir)
!ls -l "$output_dir"
!echo 'Top 10 lines:'
!head -n 10 "$output_dir"/part-00000-of-00001





Wrote: /content/output/basic_wordcount
total 4
-rw-r--r-- 1 root root 337 Oct 27 00:38 part-00000-of-00001
Top 10 lines:
not	1
that	2
the	6
question	1
whether	1
tis	2
nobler	1
mind	1
suffer	1
slings	1
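
As a sanity check, the tokenize, filter, and count steps can be mirrored in plain Python without Beam. This is only a sketch: `local_word_count` is a hypothetical helper that is not part of the notebook, and `collections.Counter` stands in for `CombinePerKey(sum)`.

```python
import re
from collections import Counter

def tokenize(line):
    # Same rule as the pipeline: lowercase, then split on runs of non-letters.
    return [w for w in re.split(r'[^A-Za-z]+', line.lower()) if w]

def local_word_count(lines, min_len=3):
    # Hypothetical helper: Counter plays the role of CombinePerKey(sum).
    return Counter(
        w for line in lines for w in tokenize(line) if len(w) >= min_len
    )

sample = [
    "To be, or not to be: that is the Question.",
    "Devoutly to be wish'd.",
]
counts = local_word_count(sample)
print(sorted(counts.items()))
```

Note that the apostrophe in `wish'd` splits the word into `wish` and `d`, and the one-letter piece is then dropped by the length filter.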


## 2) ParDo (DoFn) + Partition

In [5]:
# ParDo (DoFn) and Partition demo

class TagLengthDoFn(beam.DoFn):
    def process(self, element):
        # element is a word; emit a dict with length metadata
        result = {
            'word': element,
            'length': len(element),
            'is_long': len(element) >= 6
        }
        yield result

# A beam.Partition function for this split is kept here for reference; the
# pipeline below uses three Filters instead. Beam calls the function as
# fn(element, num_partitions), in that order.
# def by_length(element, num_partitions):
#     # 0: short (<5), 1: medium (5-7), 2: long (>=8)
#     l = len(element['word'])
#     if l < 5:
#         return 0
#     elif l < 8:
#         return 1
#     else:
#         return 2

with beam.Pipeline(options=PipelineOptions()) as p:
    words = (
        p
        | beam.io.ReadFromText('/content/data/input.txt')
        | beam.FlatMap(tokenize)
        | beam.Filter(is_long_enough)
    )

    tagged = (words | "TagWithParDo" >> beam.ParDo(TagLengthDoFn()))
    # Distinct uses each element as a grouping key; dicts are unhashable, so convert to a tuple of items first
    distinct_tagged_tuple = tagged | "ToTuple" >> beam.Map(lambda d: tuple(d.items()))
    distinct_tagged = distinct_tagged_tuple | "Distinct" >> beam.Distinct()
    # Convert tuple back to dict after Distinct
    distinct_tagged_dict = distinct_tagged | "ToDict" >> beam.Map(dict)

    # Split by length with three Filters (used here in place of beam.Partition)
    short = distinct_tagged_dict | "FilterShort" >> beam.Filter(lambda d: d['length'] < 5)
    medium = distinct_tagged_dict | "FilterMedium" >> beam.Filter(lambda d: 5 <= d['length'] < 8)
    long = distinct_tagged_dict | "FilterLong" >> beam.Filter(lambda d: d['length'] >= 8)


    short_fmt  = short  | beam.Map(lambda d: f"SHORT    {d['word']}     {d['length']}")
    medium_fmt = medium | beam.Map(lambda d: f"MED      {d['word']}     {d['length']}")
    long_fmt   = long   | beam.Map(lambda d: f"LONG     {d['word']}     {d['length']}")

    all_parts = ((short_fmt, medium_fmt, long_fmt) | beam.Flatten())
    all_parts | beam.io.WriteToText('/content/output/partitions/part')

!echo 'Sample partitioned results:'
!head -n 15 /content/output/partitions/part-00000-of-00001



Sample partitioned results:
SHORT    not     3
SHORT    that     4
SHORT    the     3
SHORT    tis     3
SHORT    mind     4
SHORT    and     3
SHORT    take     4
SHORT    arms     4
SHORT    sea     3
SHORT    end     3
SHORT    them     4
SHORT    die     3
SHORT    more     4
SHORT    say     3
SHORT    ache     4
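
The same three-way split can also be written as a partition function for `beam.Partition`. The sketch below assumes the length thresholds used by the filters above. Beam calls the function as `fn(element, num_partitions)` and expects an integer in the range `[0, num_partitions)`. The function is exercised in plain Python here; the commented line shows how it would be wired into the pipeline and is not run as part of this sketch.

```python
def by_length(element, num_partitions):
    # Beam passes the element first, then the partition count.
    # 0: short (<5), 1: medium (5-7), 2: long (>=8)
    n = len(element['word'])
    if n < 5:
        return 0
    if n < 8:
        return 1
    return 2

# In the pipeline, this would take the place of the three Filter steps:
# short, medium, long = distinct_tagged_dict | beam.Partition(by_length, 3)

sample = [{'word': w, 'length': len(w)} for w in ['not', 'suffer', 'outrageous']]
buckets = [by_length(d, 3) for d in sample]
print(buckets)  # [0, 1, 2]
```

Compared with three filters, a partition function evaluates each element once and guarantees that every element lands in exactly one output.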


## 3) Composite transforms (custom PTransform)

In [6]:
# Composite transform (custom PTransform)
class CleanTokenize(beam.PTransform):
    def __init__(self, min_len=3, stopwords=None):
        super().__init__()
        self.min_len = min_len
        self.stopwords = set(stopwords or [])

    def expand(self, pcoll):
        return (
            pcoll
            | "CT:Tokenize" >> beam.FlatMap(tokenize)
            | "CT:FilterLen" >> beam.Filter(lambda w: len(w) >= self.min_len)
            | "CT:FilterStop" >> beam.Filter(lambda w: w not in self.stopwords)
        )

class CountWords(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "CW:PairOne" >> beam.Map(lambda w: (w, 1))
            | "CW:Count" >> beam.CombinePerKey(sum)
        )

STOPWORDS = {'the', 'and', 'to', 'of', 'a', 'in', 'is', 'be'}

with beam.Pipeline(options=PipelineOptions()) as p:
    results = (
        p
        | beam.io.ReadFromText('/content/data/input.txt')
        | CleanTokenize(min_len=3, stopwords=STOPWORDS)
        | CountWords()
        | beam.Map(lambda kv: f"{kv[0]}\t{kv[1]}")
        | beam.io.WriteToText('/content/output/composite/part')
    )

!echo 'Composite transform output (first 12 lines):'
!head -n 12 /content/output/composite/part-00000-of-00001



Composite transform output (first 12 lines):
not	1
that	2
question	1
whether	1
tis	2
nobler	1
mind	1
suffer	1
slings	1
arrows	1
outrageous	1
fortune	1
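
One detail worth noticing in this configuration: `CleanTokenize` applies the length filter before the stopword filter, so any stopword shorter than `min_len` is already gone by the time the stopword check runs. A small plain-Python check (with `MIN_LEN` and `effective` as names introduced here for illustration) shows which stopwords actually matter:

```python
STOPWORDS = {'the', 'and', 'to', 'of', 'a', 'in', 'is', 'be'}
MIN_LEN = 3  # same value passed to CleanTokenize above

# Stopwords long enough to pass the length filter; only these
# are removed by the stopword filter itself.
effective = sorted(w for w in STOPWORDS if len(w) >= MIN_LEN)
print(effective)  # ['and', 'the']
```

This is why `the` appears in the section 1 output but not in the composite output, while short words such as `to` are absent from both.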


## 4) Windowing with `FixedWindows` via `TestStream`

In [7]:
# Windowing with TestStream (simulated streaming)
# We'll generate (user, amount) events across timestamps and sum the amounts per user in each 10-second window.

import apache_beam as beam
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window as beam_window

def format_window(kv, w=beam.DoFn.WindowParam):
    (key, amount) = kv
    return f"window[{int(w.start)}s-{int(w.end)}s)      {key}   {amount}"

# Simulate a tiny stream
events = [
    TimestampedValue(('u1', 5),  1),
    TimestampedValue(('u2', 7),  2),
    TimestampedValue(('u1', 3),  8),
    TimestampedValue(('u2', 4), 12),  # timestamp 12 falls in the [10, 20) window
    TimestampedValue(('u3', 6), 15),
    TimestampedValue(('u1', 1), 18),
]

ts = (TestStream()
      .advance_watermark_to(0)
      .add_elements(events[:3])
      .advance_watermark_to(10)
      .add_elements(events[3:])
      .advance_watermark_to_infinity())

out_dir = '/content/output/windowing'
!rm -rf "$out_dir"
os.makedirs(out_dir, exist_ok=True)

with beam.Pipeline(options=PipelineOptions()) as p:
    wins = (
        p
        | "TestStream" >> ts
        | "KV" >> beam.Map(lambda kv: (kv[0], kv[1]))
        | "Window10s" >> beam.WindowInto(beam_window.FixedWindows(10))
        | "SumPerKey" >> beam.CombinePerKey(sum)
        | "Fmt" >> beam.Map(format_window)
    )
    wins | beam.io.WriteToText(os.path.join(out_dir, 'part'))

!echo 'Windowed results:'
!ls "$out_dir"
!cat "$out_dir"/part-*



Windowed results:
'part-[0.0, 10.0)-00000-of-00001'  'part-[10.0, 20.0)-00000-of-00001'
window[0s-10s)      u1   8
window[0s-10s)      u2   7
window[10s-20s)      u1   1
window[10s-20s)      u2   4
window[10s-20s)      u3   6
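
To see why the events land where they do, the assignment rule for fixed windows can be reproduced in plain Python: with no offset, an event at timestamp `t` belongs to the window that starts at `t - (t % size)`. The helper `window_start` and the name `raw_events` below are illustrative stand-ins, not Beam APIs.

```python
from collections import defaultdict

def window_start(timestamp, size):
    # Illustrative stand-in for fixed-window assignment with zero offset.
    return timestamp - (timestamp % size)

# (key, amount, event timestamp in seconds), copied from the events above.
raw_events = [('u1', 5, 1), ('u2', 7, 2), ('u1', 3, 8),
              ('u2', 4, 12), ('u3', 6, 15), ('u1', 1, 18)]

sums = defaultdict(int)
for key, amount, t in raw_events:
    start = window_start(t, 10)
    sums[(start, start + 10, key)] += amount

for (start, end, key), total in sorted(sums.items()):
    print(f"window[{start}s-{end}s)\t{key}\t{total}")
```

The per-window sums agree with the pipeline output: `u1` totals 8 in the first window (5 + 3) and 1 in the second.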


## 5) Bonus: Beam ML RunInference (scikit‑learn)

In [8]:
# (Bonus) Beam ML RunInference with scikit-learn
# Train a tiny classifier on Iris and run inference inside a Beam pipeline.
# If this section errors due to version mismatches, try restarting runtime and reinstalling Beam.

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import pickle
import os

from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2, random_state=42)

clf = LogisticRegression(max_iter=400).fit(X_train, y_train)

# Save the trained model with pickle, the handler's default model file type
model_path = '/content/iris_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(clf, f)

# Point the handler at the saved model via model_uri
handler = SklearnModelHandlerNumpy(model_uri=model_path)

out_dir = '/content/output/iris_inference'
!rm -rf "$out_dir"
os.makedirs(out_dir, exist_ok=True)

def fmt_pred(pred):
    # pred.inference is typically a numpy scalar or array; make it int
    try:
        label = int(pred.inference)
    except Exception:
        # fall back if it's an array
        label = int(np.array(pred.inference).item())
    return f"pred_label={label}"

with beam.Pipeline(options=PipelineOptions()) as p:
    _ = (
        p
        | beam.Create(X_test.tolist())
        | "Infer" >> RunInference(handler)
        | "Fmt" >> beam.Map(fmt_pred)
        | beam.io.WriteToText(out_dir + "/part")
    )

!echo 'First 10 predictions:'
!head -n 10 "$out_dir"/part-00000-of-00001



First 10 predictions:
pred_label=1
pred_label=0
pred_label=2
pred_label=1
pred_label=1
pred_label=0
pred_label=1
pred_label=2
pred_label=1
pred_label=1
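
One way to sanity-check the pipeline output is to call the classifier directly on the same test rows. Up to ordering, which the text sink does not guarantee, the labels should agree with the `pred_label` lines written by the pipeline. This sketch retrains the model with the same settings so that it is self-contained; `direct_labels` is a name introduced here for illustration.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42)
clf = LogisticRegression(max_iter=400).fit(X_train, y_train)

# Predict directly, bypassing Beam; one integer label per test row.
direct_labels = clf.predict(X_test).astype(int)
print("rows:", len(direct_labels))
print("first 10 labels:", direct_labels[:10].tolist())
print("accuracy:", float(np.mean(direct_labels == y_test)))
```

Comparing sorted label counts from both sources is a simple way to confirm agreement without relying on output order.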
