
# Apache Beam on Vertex AI Workbench — Clean Demo with `./artifacts`

This notebook demonstrates the required Beam features on **Vertex AI Workbench** using the local **DirectRunner**, with **all inputs/outputs kept under a single `./artifacts` folder** for tidy execution and easy cleanup.

**Features covered**
- Pipeline I/O (`ReadFromText`, `WriteToText`)
- `Map`, `Filter`
- `ParDo` (custom `DoFn` with tagged side outputs)
- `Partition`
- Windowing (`FixedWindows`) + `CombinePerKey` and `WindowParam`
- Composite transform (`PTransform`)

**Run order**: top → bottom. Each section writes its results to `./artifacts`.



## 0) Environment setup & conventions

- We install a compatible set known to work with Beam 2.69: `pyarrow==18.1.0`, `pandas<3`.
- We **ignore Jupyter argv** so Beam doesn't warn about `-f kernel-....json`.
- All file reads/writes use **`ARTIFACTS = Path.cwd() / "artifacts"`**.
- The install line includes the `interactive` extra for Beam's in-notebook visualizations; drop it if you don't need them.

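The empty-argv convention relies on standard `argparse` behavior. When `parse_known_args()` gets no list, it falls back to `sys.argv[1:]`. When it gets an explicit empty list, it parses nothing. Judging from the `-f kernel-....json` warning mentioned above, we assume `PipelineOptions` parses its flags in this argparse style. The sketch below only demonstrates the stdlib behavior on a simulated Jupyter argv. The parser and its `--input` flag are hypothetical stand-ins, not Beam's own options.

```python
import argparse
import sys

# Hypothetical stand-in parser; "--input" is illustrative, not a Beam option.
parser = argparse.ArgumentParser()
parser.add_argument("--input", default="artifacts/input.txt")

# Simulate the argv a Jupyter kernel receives (the kernel file path is made up).
saved_argv = sys.argv
sys.argv = ["ipykernel_launcher.py", "-f", "/tmp/kernel-demo.json"]
try:
    _, unknown_from_argv = parser.parse_known_args()     # no list -> reads sys.argv[1:]
    _, unknown_from_empty = parser.parse_known_args([])  # explicit [] -> nothing to parse
finally:
    sys.argv = saved_argv  # restore the real argv

print("implicit argv leftovers:", unknown_from_argv)
print("explicit [] leftovers:", unknown_from_empty)
```

The leftover `-f ...` arguments are the kind of input a parser can only discard with a warning. Passing `[]` avoids the situation entirely.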

In [1]:

# --- Environment pins (Vertex-friendly) ---
!pip install -q "apache-beam[gcp,interactive]==2.69.0" "pandas<3" "pyarrow==18.1.0"

import sys, platform
import apache_beam as beam
import pyarrow as pa
from pathlib import Path

# Create the artifacts dir (existing files are kept; see Cleanup to reset it)
ARTIFACTS = Path.cwd() / "artifacts"
ARTIFACTS.mkdir(exist_ok=True)

# Jupyter starts the kernel with its own argv; later cells pass an explicit
# empty list to PipelineOptions so Beam doesn't try to parse it.
print("Beam:", beam.__version__)
print("PyArrow:", pa.__version__)
print("Python:", sys.version.split()[0], "|", platform.platform())
print("Artifacts directory:", ARTIFACTS)


Beam: 2.69.0
PyArrow: 18.1.0
Python: 3.10.19 | Linux-5.10.0-36-cloud-amd64-x86_64-with-glibc2.31
Artifacts directory: /home/jupyter/ApacheBeam/artifacts



### Helper utilities

Small helpers to list files and show the contents of single-shard outputs.


In [2]:

from glob import glob

def list_artifacts(pattern="*"):
    base = str(ARTIFACTS / pattern)
    files = sorted(glob(base))
    print(f"[artifacts/{pattern}] -> {len(files)} files")
    for f in files:
        print(" -", Path(f).name)

def first_match_text(prefix):
    matches = (
        sorted(glob(str(ARTIFACTS / f"{prefix}-*-of-*.csv")))
        or sorted(glob(str(ARTIFACTS / f"{prefix}-*-of-*.txt")))
    )
    if not matches:
        print(f"[no files for prefix={prefix}]")
        return
    p = Path(matches[0])
    print(f"== {p.name} ==")
    print(p.read_text())
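
`first_match_text` globs for `prefix-*-of-*.csv` because `WriteToText` names shards with its default `-SSSSS-of-NNNNN` template. The template places a zero-padded shard index and shard count between the prefix and `file_name_suffix`. The plain-Python sketch below models that naming rule with a hypothetical `expected_shards` helper, which shows why the glob matches.

```python
from fnmatch import fnmatch

def expected_shards(prefix, num_shards, suffix=""):
    """Hypothetical helper: names produced by the '-SSSSS-of-NNNNN' shard template."""
    return [f"{prefix}-{i:05d}-of-{num_shards:05d}{suffix}" for i in range(num_shards)]

single = expected_shards("cleaned", 1, ".csv")
multi = expected_shards("cleaned", 3, ".csv")
print(single)     # ['cleaned-00000-of-00001.csv']
print(multi[-1])  # cleaned-00002-of-00003.csv

# Every shard matches the helper's glob; after sorting, the 00000 shard comes first.
assert all(fnmatch(name, "cleaned-*-of-*.csv") for name in single + multi)
```

With `num_shards > 1`, `first_match_text` would therefore print only the first shard.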



## 1) Pipeline I/O + `Map` + `Filter` (clean CSV)

We create a tiny input file in `./artifacts`, normalize rows with `Map`, drop blanks with `Filter`, and write a **single-shard** CSV to `./artifacts/cleaned-00000-of-00001.csv`.


In [3]:

from apache_beam.options.pipeline_options import PipelineOptions

# Create input file
(ARTIFACTS / "input.txt").write_text("\n".join([
    "Alpha,10",
    "Beta,7",
    "",
    "Gamma,  13  ",
    "Delta, 5"
]))

def parse_row(line: str):
    parts = [p.strip() for p in line.split(",")]
    if len(parts) != 2:
        return None
    name, num = parts[0], parts[1]
    try:
        return name, int(num)
    except ValueError:
        return None

# Tell Beam to ignore Jupyter argv
pipeline_args = []
opts = PipelineOptions(pipeline_args)

with beam.Pipeline(options=opts) as p:
    rows = (
        p
        | "ReadFromText" >> beam.io.ReadFromText(str(ARTIFACTS / "input.txt"))
        | "DropBlanks" >> beam.Filter(lambda line: line.strip() != "")
        | "Parse" >> beam.Map(parse_row)
        | "DropNone" >> beam.Filter(lambda kv: kv is not None)
    )
    _ = (
        rows
        | "ToCSV" >> beam.Map(lambda kv: f"{kv[0]},{kv[1]}")
        | "WriteCleaned" >> beam.io.WriteToText(
            str(ARTIFACTS / "cleaned"), file_name_suffix=".csv", num_shards=1
        )
    )

list_artifacts("cleaned*")
first_match_text("cleaned")




[artifacts/cleaned*] -> 1 files
 - cleaned-00000-of-00001.csv
== cleaned-00000-of-00001.csv ==
Alpha,10
Beta,7
Gamma,13
Delta,5
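
Because `parse_row` is a plain function, you can check the Filter → Map → Filter chain without running a pipeline. This sketch replays the same steps with the built-in `filter`/`map` on a few lines. It includes two hypothetical malformed rows (`Epsilon`, `Zeta,abc`) that the sample input does not contain.

```python
def parse_row(line: str):
    """Same logic as the notebook's parse_row: 'name,int' -> (name, int), else None."""
    parts = [p.strip() for p in line.split(",")]
    if len(parts) != 2:
        return None
    name, num = parts
    try:
        return name, int(num)
    except ValueError:
        return None

# Sample lines; "Epsilon" and "Zeta,abc" are made-up malformed rows.
lines = ["Alpha,10", "", "Gamma,  13  ", "Epsilon", "Zeta,abc"]

non_blank = filter(lambda line: line.strip() != "", lines)  # mirrors "DropBlanks"
parsed = map(parse_row, non_blank)                          # mirrors "Parse"
rows = [kv for kv in parsed if kv is not None]              # mirrors "DropNone"
print(rows)  # [('Alpha', 10), ('Gamma', 13)]
```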




## 2) `ParDo` with tagged side outputs (big vs. small)

We classify values ≥10 as **big** and others as **small** using a custom `DoFn` that emits **tagged outputs**. Results are written to `./artifacts/big-*.csv` and `./artifacts/small-*.csv`.


In [4]:

from apache_beam import DoFn, pvalue

class SizeClassifier(DoFn):
    def process(self, element):
        name, val = element
        if val >= 10:
            yield pvalue.TaggedOutput("big", (name, val))
        else:
            yield pvalue.TaggedOutput("small", (name, val))

with beam.Pipeline(options=opts) as p:
    rows = (
        p
        | "ReadCleaned" >> beam.io.ReadFromText(str(ARTIFACTS / "cleaned-*-of-*.csv"))
        | "ParseCleaned" >> beam.Map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
    )
    outputs = rows | "ClassifyBySize" >> beam.ParDo(SizeClassifier()).with_outputs("big", "small")
    big, small = outputs.big, outputs.small

    # Chain with `|` (as in Section 1): `"WriteBig" >> Map(...) >> WriteToText(...)`
    # does not feed the Map's output into the write.
    _ = (
        big
        | "BigToCSV" >> beam.Map(lambda kv: f"{kv[0]},{kv[1]}")
        | "WriteBig" >> beam.io.WriteToText(str(ARTIFACTS / "big"), file_name_suffix=".csv", num_shards=1)
    )
    _ = (
        small
        | "SmallToCSV" >> beam.Map(lambda kv: f"{kv[0]},{kv[1]}")
        | "WriteSmall" >> beam.io.WriteToText(str(ARTIFACTS / "small"), file_name_suffix=".csv", num_shards=1)
    )

list_artifacts("big*")
first_match_text("big")
list_artifacts("small*")
first_match_text("small")


[artifacts/big*] -> 1 files
 - big-00000-of-00001.csv
== big-00000-of-00001.csv ==
Alpha,10
Gamma,13

[artifacts/small*] -> 1 files
 - small-00000-of-00001.csv
== small-00000-of-00001.csv ==
Beta,7
Delta,5

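Conceptually, `.with_outputs("big", "small")` splits whatever `SizeClassifier` yields into one PCollection per tag. The plain-Python model below makes that fan-out explicit. Its `route_by_tag` helper is hypothetical; Beam does this routing for you.

```python
def classify(element):
    """Same decision as SizeClassifier.process, yielding (tag, element) pairs."""
    name, val = element
    yield ("big" if val >= 10 else "small"), (name, val)

def route_by_tag(elements, fn, tags):
    """Hypothetical model of with_outputs: one output list per declared tag."""
    outputs = {tag: [] for tag in tags}
    for element in elements:
        for tag, value in fn(element):
            outputs[tag].append(value)  # this model raises KeyError for an undeclared tag
    return outputs

rows = [("Alpha", 10), ("Beta", 7), ("Gamma", 13), ("Delta", 5)]
outputs = route_by_tag(rows, classify, ("big", "small"))
print(outputs["big"])    # [('Alpha', 10), ('Gamma', 13)]
print(outputs["small"])  # [('Beta', 7), ('Delta', 5)]
```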



## 3) `Partition` (split into 3 branches)

We split the rows into **3 partitions** by `value % 3` and write each partition as CSV to `./artifacts/mod0-*.csv`, `./artifacts/mod1-*.csv`, and `./artifacts/mod2-*.csv`.


In [5]:

def by_mod3(elem, n_partitions):
    # elem: (name, value) -> partition index in [0, n_partitions)
    return elem[1] % n_partitions

with beam.Pipeline(options=opts) as p:
    rows = (
        p
        | "ReadCleanedCSV" >> beam.io.ReadFromText(str(ARTIFACTS / "cleaned-*-of-*.csv"))
        | "Parse2" >> beam.Map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
    )

    p0, p1, p2 = rows | "PartitionByMod3" >> beam.Partition(by_mod3, 3)

    # Chain with `|` (as in Section 1): `"W0" >> Map(...) >> WriteToText(...)`
    # does not feed the Map's output into the write.
    for i, part in enumerate((p0, p1, p2)):
        _ = (
            part
            | f"ToCSV{i}" >> beam.Map(lambda kv: f"{kv[0]},{kv[1]}")
            | f"W{i}" >> beam.io.WriteToText(str(ARTIFACTS / f"mod{i}"), file_name_suffix=".csv", num_shards=1)
        )

for prefix in ("mod0", "mod1", "mod2"):
    list_artifacts(f"{prefix}*")
    first_match_text(prefix)


[artifacts/mod0*] -> 1 files
 - mod0-00000-of-00001.csv
== mod0-00000-of-00001.csv ==

[artifacts/mod1*] -> 1 files
 - mod1-00000-of-00001.csv
== mod1-00000-of-00001.csv ==
Alpha,10
Beta,7
Gamma,13

[artifacts/mod2*] -> 1 files
 - mod2-00000-of-00001.csv
== mod2-00000-of-00001.csv ==
Delta,5

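`beam.Partition` calls the partition function with each element and the partition count, and it expects an index in `[0, n)`. Replaying `by_mod3` on the four cleaned rows in plain Python (no Beam involved) shows why `mod0` came out empty. 10, 7 and 13 all leave remainder 1, and 5 leaves remainder 2.

```python
def by_mod3(elem, n_partitions):
    # elem: (name, value) -> partition index value % n_partitions
    return elem[1] % n_partitions

rows = [("Alpha", 10), ("Beta", 7), ("Gamma", 13), ("Delta", 5)]
n = 3
partitions = [[] for _ in range(n)]
for row in rows:
    idx = by_mod3(row, n)
    if not 0 <= idx < n:  # the contract a partition function must honor
        raise ValueError(f"partition index {idx} out of range for {row}")
    partitions[idx].append(row)

for i, part in enumerate(partitions):
    print(f"mod{i}:", part)
```

Python's `%` with a positive modulus never returns a negative number, so negative values would still map to a valid index.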



## 4) Windowing: `FixedWindows(10s)` + `CombinePerKey`

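We simulate timestamped events, apply **10-second fixed windows**, sum values per user per window, and write results to `./artifacts/windowed-*.txt`. Each row is formatted with its window bounds via `DoFn.WindowParam`. Fixed windows are aligned to the Unix epoch (boundaries at multiples of 10 seconds), not to `now`, so which events share a window can vary from run to run.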


In [6]:

import time
from apache_beam.transforms.window import FixedWindows, TimestampedValue

now = int(time.time())
events = [
    ("u1", 3, 0),
    ("u1", 4, 5),
    ("u2", 6, 12),
    ("u1", 7, 13),
    ("u2", 1, 19),
]

def to_timestamped(e):
    user, val, offset = e
    return TimestampedValue((user, val), now + offset)

class FormatWithWindow(DoFn):
    def process(self, element, window=DoFn.WindowParam):
        user, total = element
        yield f"user={user}, total={total}, window=[{int(window.start)}..{int(window.end)})"

with beam.Pipeline(options=opts) as p:
    formatted = (
        p
        | "CreateEvents" >> beam.Create(events)
        | "AttachTimestamps" >> beam.Map(to_timestamped)
        | "WindowInto10s" >> beam.WindowInto(FixedWindows(10))
        | "KeyByUser" >> beam.Map(lambda uv: (uv[0], uv[1]))  # (user, value)
        | "SumPerUserPerWindow" >> beam.CombinePerKey(sum)
        | "FormatRows" >> beam.ParDo(FormatWithWindow())
    )
    _ = formatted | "WriteWindowed" >> beam.io.WriteToText(str(ARTIFACTS / "windowed"), file_name_suffix=".txt", num_shards=1)

list_artifacts("windowed*")
first_match_text("windowed")


[artifacts/windowed*] -> 1 files
 - windowed-00000-of-00001.txt
== windowed-00000-of-00001.txt ==
user=u1, total=7, window=[1761702950..1761702960)
user=u1, total=7, window=[1761702960..1761702970)
user=u2, total=6, window=[1761702960..1761702970)
user=u2, total=1, window=[1761702970..1761702980)
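
The bounds above are multiples of 10 because fixed windows are aligned to the epoch. With a zero offset, a timestamp `t` lands in `[t - t % 10, t - t % 10 + 10)`. The plain-Python sketch below models that assignment plus the per-user sum. The helper names are hypothetical, and the `now` values are chosen for illustration. With `now = 1761702952`, one value consistent with the output above, it reproduces the four totals shown. With a `now` that sits exactly on a boundary, the same events regroup.

```python
from collections import defaultdict

def fixed_window(ts, size=10, offset=0):
    """Return [start, end) of the epoch-aligned fixed window containing ts."""
    start = ts - (ts - offset) % size
    return start, start + size

events = [("u1", 3, 0), ("u1", 4, 5), ("u2", 6, 12), ("u1", 7, 13), ("u2", 1, 19)]

def totals_per_user_window(now, size=10):
    """Plain-Python model of WindowInto(FixedWindows(size)) + CombinePerKey(sum)."""
    totals = defaultdict(int)
    for user, val, offset in events:
        totals[(user, fixed_window(now + offset, size))] += val
    return dict(totals)

unaligned = totals_per_user_window(1761702952)  # consistent with the output above
aligned = totals_per_user_window(1761702950)    # now sits exactly on a boundary
for key, total in sorted(unaligned.items()):
    print("unaligned", key, total)
for key, total in sorted(aligned.items()):
    print("aligned  ", key, total)
```

In the aligned case, u2's events at offsets 12 and 19 share one window (total 7) instead of being split 6 / 1.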




## 5) Composite transform (`PTransform`)

We bundle cleaning, parsing, and tagging into a reusable `PTransform`, then write to `./artifacts/composite-*.txt`.


In [7]:

from apache_beam import PTransform

class CleanParseAndTag(PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "DropBlanks2" >> beam.Filter(lambda line: line.strip() != "")
            | "Parse2" >> beam.Map(lambda line: (line.split(",")[0].strip(), int(line.split(",")[1].strip())))
            | "TagScore" >> beam.Map(lambda kv: {"name": kv[0], "score": kv[1], "is_big": kv[1] >= 10})
        )

with beam.Pipeline(options=opts) as p:
    result = (
        p
        | "ReadRawAgain" >> beam.io.ReadFromText(str(ARTIFACTS / "input.txt"))
        | "CompositeTransform" >> CleanParseAndTag()
    )
    _ = (
        result
        | "ToStr" >> beam.Map(lambda d: str(d))
        | "WriteCompositeOut" >> beam.io.WriteToText(str(ARTIFACTS / "composite"), file_name_suffix=".txt", num_shards=1)
    )

list_artifacts("composite*")
first_match_text("composite")


[artifacts/composite*] -> 1 files
 - composite-00000-of-00001.txt
== composite-00000-of-00001.txt ==
{'name': 'Alpha', 'score': 10, 'is_big': True}
{'name': 'Beta', 'score': 7, 'is_big': False}
{'name': 'Gamma', 'score': 13, 'is_big': True}
{'name': 'Delta', 'score': 5, 'is_big': False}
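
Unlike Section 1's `parse_row`, the composite's parse step assumes every non-blank line has a comma followed by an integer. The plain-Python sketch below runs its per-element logic (the `tag_score` name is hypothetical) on malformed lines to show what would happen. Inside the pipeline, the same exception would typically fail the run rather than drop the row. One way to make the composite tolerant is to reuse `parse_row` plus a `None` filter in `expand`.

```python
def tag_score(line):
    """Per-element logic of CleanParseAndTag's Parse2 + TagScore steps."""
    parts = line.split(",")
    name, score = parts[0].strip(), int(parts[1].strip())
    return {"name": name, "score": score, "is_big": score >= 10}

good = tag_score("Gamma,  13  ")
print(good)  # {'name': 'Gamma', 'score': 13, 'is_big': True}

errors = {}
for bad in ("Epsilon", "Zeta,abc"):  # hypothetical malformed rows
    try:
        tag_score(bad)
    except (IndexError, ValueError) as exc:
        errors[bad] = type(exc).__name__
print(errors)  # {'Epsilon': 'IndexError', 'Zeta,abc': 'ValueError'}
```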




## Cleanup

Remove the entire `./artifacts` directory to reset the workspace.


In [None]:

# WARNING: this deletes all generated files for this demo.
# Uncomment the lines below to clean up.
# import shutil
# shutil.rmtree(ARTIFACTS)
# print("Removed:", ARTIFACTS)


### ✅ Coverage Recap

- **Pipeline I/O**: `ReadFromText`, `WriteToText` (Sections 1–5)  
- **Map**: cleaning, parsing, formatting (all sections)  
- **Filter**: removing blanks/invalid rows (Sections 1 & 5)  
- **ParDo**: `SizeClassifier` (tagged outputs); `FormatWithWindow` using `WindowParam` (Sections 2 & 4)  
- **Partition**: `beam.Partition` by modulo (Section 3)  
- **Windowing**: `WindowInto(FixedWindows(10))` + `CombinePerKey` (Section 4)  
- **Composite Transform**: `CleanParseAndTag` (`PTransform`) (Section 5)
