<a href="https://colab.research.google.com/github/yashaswinidinesh/pycaret-assignment-yashaswinidinesh/blob/main/FA25_CMPE255_Beam_Exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Apache Beam Data Engineering Exercise: CMPE-255 (FA25)

**Section 49: Data Mining**  
**Assignment:** Apache Beam data engineering exercise

---

## 👋 About this notebook (written in first person)

In this notebook, I demonstrate the required Apache Beam features in **Google Colab**, using simple, self-contained data that I generate in the notebook itself. I explain each step as if I am walking you through my work.

**What I demonstrate:** `Map`, `Filter`, `ParDo (DoFn)`, `Partition`, **Windowing** (fixed windows on *event time*), **Pipeline I/O** (`ReadFromText`, `WriteToText`), and a **Composite Transform**. As an optional bonus, I also run scikit-learn inference inside a Beam pipeline (implemented with a custom `DoFn` instead of Beam ML's `RunInference`).




## 🗂 Table of Contents

1. [Install & Environment Setup](#install)
2. [Create Small Input Datasets (Pipeline I/O sources)](#data)
3. [Map, Filter, Partition + I/O](#map-filter-partition)
4. [ParDo (DoFn) & Composite Transform + Side Input](#pardo-composite)
5. [Windowing with Event Time (Fixed Windows)](#windowing)
6. [Partition Again (Hot vs OK)](#partition-hot-ok)
7. [(Bonus) Beam ML RunInference with scikit-learn](#beam-ml)
8. [Quick Peek at Outputs](#peek)



<a id="install"></a>

## 1) Install & Environment Setup



In [None]:
# Install a Beam version that supports Python 3.12
!pip -q install "apache-beam==2.61.0" "scikit-learn" "joblib"

import apache_beam as beam, sys
print("Beam:", beam.__version__, "| Python:", sys.version.split()[0])
print("If Colab prompts to 'Restart runtime', do it, then re-run from Section 2.")


Beam: 2.61.0 | Python: 3.12.12
If Colab prompts to 'Restart runtime', do it, then re-run from Section 2.



<a id="data"></a>

## 2) Create Small Input Datasets (Pipeline I/O sources)

I create two tiny datasets so the pipeline has something to read and write:

- **`numbers.txt`**: shuffled integers from **−10 to 20** (I use this for the `Map`, `Filter`, and `Partition` demos).  
- **`events.jsonl`**: JSON Lines with simulated IoT temperature events, each with a device id, a Unix timestamp, and a temperature in °F (I use this for `ParDo`, the composite transform, windowing, and the second partition demo).

I also create output folders where Beam will write results.
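
To make the record layout concrete, here is a small standard-library sketch of how one `events.jsonl` line is built and parsed back, and which indices receive a temperature spike. The helper name `spike_bump` and its fixed bump of 12.0 are illustrative assumptions. The notebook itself draws the bump from `random.uniform(10, 15)`.

```python
import json

def spike_bump(i, bump=12.0):
    """Return `bump` on every 15th index, else 0.0.

    Multiplying by a bool works because True == 1 and False == 0.
    """
    return (i % 15 == 0) * bump

# Indices in range(90) that receive a spike.
spike_indices = [i for i in range(90) if spike_bump(i) > 0]

# One JSON Lines record: a single JSON object serialized on one line.
record = {"device_id": "device-1", "ts": 1700000000, "temp_f": 78.0 + spike_bump(0)}
line = json.dumps(record)
parsed = json.loads(line)

print(spike_indices)
print(parsed)
```

Each line of the file is an independent JSON document, which is why the pipeline can later parse it with a plain `json.loads` per element.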


In [None]:

# 2) Create small input datasets and folders
import os, json, random, time, datetime
import numpy as np

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import window

DATA_DIR = '/content/data'
OUT_DIR = '/content/output'
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(OUT_DIR, exist_ok=True)

# A) numbers file for Map/Filter/Partition
numbers = list(range(-10, 21))
random.seed(42)  # seed before shuffling so the file contents are reproducible
random.shuffle(numbers)
with open(f'{DATA_DIR}/numbers.txt', 'w') as f:
    f.write('\n'.join(str(n) for n in numbers))

# B) JSONL sensor events for ParDo + Composite + Windowing
base_ts = int(time.time())  # now (epoch seconds)
devices = ['device-1', 'device-2', 'device-3']
random.seed(42)
with open(f'{DATA_DIR}/events.jsonl', 'w') as f:
    for i in range(90):
        record = {
            'device_id': random.choice(devices),
            'ts': base_ts + i,  # seconds since epoch
            # mostly ~78F with occasional spike
            'temp_f': round(random.gauss(78, 6) + (i % 15 == 0) * random.uniform(10,15), 2)
        }
        f.write(json.dumps(record) + '\n')

print("✅ Wrote datasets:", f"{DATA_DIR}/numbers.txt and {DATA_DIR}/events.jsonl")


✅ Wrote datasets: /content/data/numbers.txt and /content/data/events.jsonl



<a id="map-filter-partition"></a>

## 3) Map, Filter, Partition + I/O (ReadFromText / WriteToText)

Here I demonstrate several elementwise transforms and basic file I/O:

- I **read** `numbers.txt` with `ReadFromText`.
- I use `Map` to convert strings to integers.
- I use `Filter` to keep only **non-negative** numbers.
- I use `Partition` to split into **even** and **odd** branches.
- I **write** two outputs with `WriteToText`: `evens-*.txt` and `odds-*.txt`.

This shows the foundations of Beam's `PCollection` → transform pattern and file I/O.
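
As a rough mental model, the same steps can be sketched in plain Python on a tiny in-memory list. This is not how Beam executes them (a `PCollection` is unordered and may be processed in parallel), and the sample `lines` values and the name `even_odd_index` are assumptions made for illustration.

```python
def even_odd_index(x, n_partitions=2):
    """Partition function contract: return an int in [0, n_partitions)."""
    return 0 if x % 2 == 0 else 1

lines = ["3", "-4", "0", "7", "10", "-1"]  # stand-in for the lines of numbers.txt

ints = [int(s) for s in lines]             # Map: str -> int
nonneg = [x for x in ints if x >= 0]       # Filter: keep x >= 0

parts = [[], []]                           # Partition: one list per branch
for x in nonneg:
    parts[even_odd_index(x)].append(x)

evens = [f"EVEN:{x}" for x in parts[0]]    # Map: format each branch
odds = [f"ODD:{x}" for x in parts[1]]

print(evens, odds)
```

The key point is that the partition function only returns a branch index. Beam then routes each element to the output `PCollection` with that index.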


In [None]:

# 3) Map, Filter, Partition + Pipeline IO (ReadFromText/WriteToText)
from apache_beam.io import ReadFromText, WriteToText

pipeline_options = PipelineOptions(save_main_session=True)  # DirectRunner by default

def even_odd_partition_fn(x, n_partitions):
    # return 0 for even, 1 for odd
    return 0 if x % 2 == 0 else 1

with beam.Pipeline(options=pipeline_options) as p:
    numbers = p | 'ReadNumbers' >> ReadFromText(f'{DATA_DIR}/numbers.txt')
    ints = numbers | 'ToInt' >> beam.Map(lambda s: int(s))
    nonneg = ints | 'KeepNonNegative' >> beam.Filter(lambda x: x >= 0)
    parts = nonneg | 'PartitionEvenOdd' >> beam.Partition(even_odd_partition_fn, 2)

    evens_pc = parts[0] | 'FmtEvens' >> beam.Map(lambda x: f'EVEN:{x}')
    odds_pc  = parts[1] | 'FmtOdds' >> beam.Map(lambda x: f'ODD:{x}')

    evens_pc | 'WriteEvens' >> WriteToText(f'{OUT_DIR}/evens', file_name_suffix='.txt', num_shards=1)
    odds_pc  | 'WriteOdds'  >> WriteToText(f'{OUT_DIR}/odds',  file_name_suffix='.txt', num_shards=1)

print('✅ Done. Check /content/output for evens*.txt and odds*.txt')






✅ Done. Check /content/output for evens*.txt and odds*.txt


In [None]:
import apache_beam as beam, sys
print("Beam:", beam.__version__, "| Python:", sys.version.split()[0])

with beam.Pipeline() as p:
    (p | beam.Create([1,2,3])
       | beam.Map(lambda x: x*2)
       | beam.Map(print))  # should print 2, 4, 6


Beam: 2.61.0 | Python: 3.12.12
2
4
6



<a id="pardo-composite"></a>

## 4) ParDo (DoFn) & Composite Transform + Side Input

In this section I build a **composite transform** called `ParseEnrich` that chains multiple steps:

1. **Parse JSON** lines into Python dicts (`Map` + `json.loads`).
2. Convert °F → °C with a **`ParDo`** (`FToC` DoFn).
3. **Filter** unrealistic temperatures (keep within a configurable min/max °C).
4. Add a `status` label (`hot` if ≥ 30°C, otherwise `ok`).
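
The four steps can be checked on a single record with a plain-Python sketch. The function names `f_to_c` and `enrich` are hypothetical stand-ins for the composite's stages. The default bounds of −40°C and 80°C and the 30°C cutoff mirror the values used in the notebook's `ParseEnrich`.

```python
import json

def f_to_c(temp_f):
    """Convert Fahrenheit to Celsius, rounded to 2 decimals."""
    return round((temp_f - 32.0) * 5.0 / 9.0, 2)

def enrich(line, min_c=-40.0, max_c=80.0):
    """Parse one JSON line, convert, range-check, and label it.

    Returns None when the temperature falls outside [min_c, max_c],
    which plays the role of the Filter step dropping the element.
    """
    record = json.loads(line)
    temp_c = f_to_c(record["temp_f"])
    if not (min_c <= temp_c <= max_c):
        return None
    return {**record, "temp_c": temp_c, "status": "hot" if temp_c >= 30 else "ok"}

hot = enrich('{"device_id": "device-1", "ts": 0, "temp_f": 86.0}')
ok = enrich('{"device_id": "device-2", "ts": 1, "temp_f": 78.0}')
dropped = enrich('{"device_id": "device-3", "ts": 2, "temp_f": 500.0}')
print(hot, ok, dropped)
```

Note that 86°F is exactly 30°C, so it lands on the `hot` side of the boundary because the comparison is `>=`.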

I also compute the **90th percentile** temperature across all cleaned events, pass it to a `Map` as a **side input**, and flag each event with `high90` when its `temp_c` is at or above that threshold. Finally, I write the enriched events as JSONL.

> This composite transform neatly demonstrates *abstraction* in Beam: I package a small subgraph into a `PTransform` I can reuse.
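
The side-input idea (reduce the whole collection to one value, then hand that value to every element) can be sketched without Beam. The helper `percentile_linear` below is my own standard-library stand-in that uses linear interpolation between ranks, which I believe matches the default behaviour of `np.percentile`. Treat that equivalence as an assumption rather than a guarantee.

```python
import math

def percentile_linear(values, q):
    """q-th percentile with linear interpolation; 0.0 for an empty input."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = (len(ordered) - 1) * q / 100.0   # fractional index into the sorted list
    lo, hi = math.floor(rank), math.ceil(rank)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (rank - lo)

temps = [float(t) for t in range(1, 11)]    # 1.0 .. 10.0, illustrative values

# "Side input": a single value computed from the whole collection.
threshold = percentile_linear(temps, 90)

# "Main input": every element is compared against that same value.
flags = [t >= threshold for t in temps]
print(threshold, flags)
```

In the pipeline, `beam.pvalue.AsSingleton(p90)` plays the role of `threshold`: Beam computes it once and makes it available to each call of the mapping function.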


In [None]:
# 4) ParDo with DoFn, composite transform, and SAFE 90th-percentile side input
import json
import numpy as np
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText, WriteToText

# Avoid Jupyter's -f arg warnings by passing an empty flags list
pipeline_options = PipelineOptions(flags=[], save_main_session=True)

class FToC(beam.DoFn):
    def process(self, element):
        # element is a dict {'device_id','ts','temp_f'}
        temp_c = (element['temp_f'] - 32.0) * 5.0 / 9.0
        e = dict(element)
        e['temp_c'] = round(temp_c, 2)
        yield e

class AddEventTimestamp(beam.DoFn):
    def process(self, element):
        yield beam.window.TimestampedValue(element, element['ts'])

class ParseEnrich(beam.PTransform):
    """Composite transform: parse JSON, convert F->C, drop out-of-range, and label status."""
    def __init__(self, min_c=-40.0, max_c=80.0):
        super().__init__()
        self.min_c = min_c
        self.max_c = max_c

    def expand(self, pcoll):
        return (
            pcoll
            | 'ParseJSON' >> beam.Map(json.loads)
            | 'ToCelsius' >> beam.ParDo(FToC())
            | 'DropUnrealistic' >> beam.Filter(lambda e: self.min_c <= e['temp_c'] <= self.max_c)
            | 'AddStatus' >> beam.Map(lambda e: {**e, 'status': ('hot' if e['temp_c'] >= 30 else 'ok')})
        )

with beam.Pipeline(options=pipeline_options) as p:
    raw = p | 'ReadEvents' >> ReadFromText('/content/data/events.jsonl')
    clean = raw | 'ParseEnrich' >> ParseEnrich()

    # SAFE: compute p90 but fall back to 0.0 if the collection is empty
    temps = clean | 'ExtractTemps' >> beam.Map(lambda e: e['temp_c'])
    p90 = (temps
           | 'ToSingletonList' >> beam.combiners.ToList()
           | 'ComputeP90Safe' >> beam.Map(lambda vals: float(np.percentile(vals, 90)) if vals else 0.0))

    def flag_high(e, threshold):
        e = dict(e)
        e['high90'] = e['temp_c'] >= threshold
        return e

    flagged = clean | 'Flag90th' >> beam.Map(flag_high, threshold=beam.pvalue.AsSingleton(p90))

    (flagged
     | 'FmtClean' >> beam.Map(lambda e: json.dumps(e))
     | 'WriteClean' >> WriteToText('/content/output/clean_events', file_name_suffix='.jsonl', num_shards=1))

print('✅ Composite transform & ParDo done. See /content/output/clean_events-*.jsonl')


✅ Composite transform & ParDo done. See /content/output/clean_events-*.jsonl



<a id="windowing"></a>

## 5) Windowing with Event Time (Fixed Windows)

Here I show **event-time** windowing using `FixedWindows(10)` seconds:

- I attach **event timestamps** using the `ts` field with a `ParDo` (`AddEventTimestamp`).
- I key by `device_id` and compute **`Count.PerKey`** and **`Mean.PerKey`** within 10-second windows.
- I format the output to include the **window start** and **end** times so I can verify windowing behavior in the results.

This is where *event time* differs from *processing time*: I explicitly tell Beam to window on the `ts` recorded in each event, not on the moment the element happens to be processed.
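
To see which window an event lands in, here is a plain-Python sketch of fixed-window assignment followed by a per-key count and mean. It assumes a window offset of zero, so a window starts at the timestamp rounded down to a multiple of the size and covers the half-open interval `[start, end)`. The sample events and the name `fixed_window` are illustrative.

```python
from collections import defaultdict

def fixed_window(ts, size=10):
    """Return (start, end) of the fixed window containing ts, as [start, end)."""
    start = ts - (ts % size)
    return start, start + size

# (device_id, event timestamp in epoch seconds, temp_c)
events = [
    ("device-1", 1700000003, 25.0),
    ("device-1", 1700000009, 27.0),
    ("device-1", 1700000010, 31.0),  # exactly on a boundary: belongs to the next window
    ("device-2", 1700000004, 24.0),
]

# Group values by (key, window), which is what a windowed per-key combine does.
groups = defaultdict(list)
for device, ts, temp_c in events:
    groups[(device, fixed_window(ts))].append(temp_c)

summary = {key: (len(vals), sum(vals) / len(vals)) for key, vals in groups.items()}
for key, (count, mean) in sorted(summary.items()):
    print(key, count, mean)
```

Because grouping depends only on the timestamp carried by each event, replaying the same file later produces the same windows.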


In [None]:

# 5) Windowing: fixed windows + per-key aggregations, with window start/end in output
from apache_beam.io import ReadFromText, WriteToText
import apache_beam as beam
from apache_beam import window

class FormatWithWindow(beam.DoFn):
    def process(self, kv, window=beam.DoFn.WindowParam):
        key, value = kv
        wstart = window.start.to_utc_datetime().strftime('%Y-%m-%d %H:%M:%S')
        wend   = window.end.to_utc_datetime().strftime('%Y-%m-%d %H:%M:%S')
        yield f'{key},{value},{wstart},{wend}'

with beam.Pipeline(options=PipelineOptions(save_main_session=True)) as p:
    events = (p
              | 'ReadEventsWin' >> ReadFromText(f'{DATA_DIR}/events.jsonl')
              | 'ParseEnrichWin' >> ParseEnrich()
              | 'EventTime' >> beam.ParDo(AddEventTimestamp())
             )

    by_device = (events
                 | 'KeyByDevice' >> beam.Map(lambda e: (e['device_id'], e['temp_c']))
                 | 'Window10s' >> beam.WindowInto(window.FixedWindows(10))
                )

    counts = by_device | 'CountPerKey' >> beam.combiners.Count.PerKey()
    means  = by_device | 'MeanPerKey'  >> beam.combiners.Mean.PerKey()

    (counts
     | 'FmtCounts' >> beam.ParDo(FormatWithWindow())
     | 'WriteCounts' >> WriteToText(f'{OUT_DIR}/window_counts', file_name_suffix='.csv', num_shards=1))

    (means
     | 'FmtMeans' >> beam.ParDo(FormatWithWindow())
     | 'WriteMeans' >> WriteToText(f'{OUT_DIR}/window_means', file_name_suffix='.csv', num_shards=1))

print('✅ Windowing done. See /content/output/window_counts-*.csv and window_means-*.csv')




✅ Windowing done. See /content/output/window_counts-*.csv and window_means-*.csv



<a id="partition-hot-ok"></a>

## 6) Partition Again (Hot vs OK)

To make the grading crystal clear, I also partition the **enriched events** stream into two branches based on the `status` I added earlier (`hot` vs `ok`). This is a second use of `Partition`, but now on JSON event objects instead of plain integers.
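
A plain-Python sketch of this second partition, using made-up sample events and a hypothetical `hot_ok_index` helper. Each branch is serialized with `json.dumps` so that every output line is valid JSON. As far as I know, a dict handed directly to a text sink is written as its Python string form instead.

```python
import json

def hot_ok_index(event, n_partitions=2):
    """Return 0 for 'hot' events and 1 for everything else."""
    return 0 if event["status"] == "hot" else 1

events = [
    {"device_id": "device-1", "temp_c": 31.5, "status": "hot"},
    {"device_id": "device-2", "temp_c": 24.0, "status": "ok"},
    {"device_id": "device-3", "temp_c": 30.0, "status": "hot"},
]

branches = [[], []]
for event in events:
    # Serialize before "writing" so each line round-trips through json.loads.
    branches[hot_ok_index(event)].append(json.dumps(event))

hot_lines, ok_lines = branches
print(hot_lines)
print(ok_lines)
```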


In [None]:

# 6) Partition events into HOT / OK streams and write them out
from apache_beam.io import ReadFromText, WriteToText
import apache_beam as beam

def hot_ok_partition(e, n):
    return 0 if e['status'] == 'hot' else 1

with beam.Pipeline(options=PipelineOptions(save_main_session=True)) as p:
    clean = (p
             | 'ReadEventsForPartition' >> ReadFromText(f'{DATA_DIR}/events.jsonl')
             | 'ParseEnrichForPartition' >> ParseEnrich())

    parts = clean | 'PartitionHotOk' >> beam.Partition(hot_ok_partition, 2)
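    # Serialize dicts to JSON strings first so the .jsonl files contain valid JSON
    # (json is imported in the earlier cells).
    (parts[0]
     | 'FmtHot' >> beam.Map(json.dumps)
     | 'WriteHot' >> WriteToText(f'{OUT_DIR}/hot', file_name_suffix='.jsonl', num_shards=1))
    (parts[1]
     | 'FmtOk' >> beam.Map(json.dumps)
     | 'WriteOk' >> WriteToText(f'{OUT_DIR}/ok', file_name_suffix='.jsonl', num_shards=1))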

print('✅ Partitioned HOT/OK at /content/output/hot-*.jsonl and ok-*.jsonl')




✅ Partitioned HOT/OK at /content/output/hot-*.jsonl and ok-*.jsonl



<a id="beam-ml"></a>

## 7) (Bonus) Beam ML: RunInference with scikit-learn

As a bonus exercise, I show how I can run **inference** inside a Beam pipeline. I train a small **Logistic Regression** classifier on the Iris dataset (offline in Python) and save it with `joblib`. Beam ML offers **`RunInference`** with `SklearnModelHandlerNumpy` for this, but to keep the cell robust in Colab I use a custom `DoFn` that loads the model once in `setup()` and classifies a few sample vectors. The predictions are written to `iris_preds-*.jsonl`.

> This demonstrates how data preprocessing and model inference can be scaled across a pipeline.
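
The pattern that matters here is "load the model once, then predict per element". Below is a dependency-free sketch of that lifecycle. `ThresholdModel` and `PredictStep` are hypothetical stand-ins (not scikit-learn or Beam classes), and the sum-based rule is invented purely so the example runs on its own.

```python
class ThresholdModel:
    """Hypothetical stand-in for a trained classifier."""
    def predict(self, rows):
        # Toy rule for illustration only: class 1 when the features sum above 12.
        return [1 if sum(row) > 12 else 0 for row in rows]

class PredictStep:
    """Mimics a DoFn lifecycle: setup() runs once, process() runs per element."""
    load_count = 0  # counts how many times the "model" was loaded

    def __init__(self):
        self.model = None

    def setup(self):
        PredictStep.load_count += 1
        self.model = ThresholdModel()  # in the notebook this is joblib.load(...)

    def process(self, row):
        yield {"example": row, "pred": self.model.predict([row])[0]}

step = PredictStep()
step.setup()  # a runner would call this once per DoFn instance
samples = [[5.1, 3.5, 1.4, 0.2], [6.3, 3.3, 6.0, 2.5]]
results = [out for row in samples for out in step.process(row)]
print(results, PredictStep.load_count)
```

Loading in `setup()` rather than in `process()` avoids deserializing the model for every element, which is the main cost to watch for in this kind of step.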


In [None]:
# 7) ML inference in Beam via custom DoFn (robust in Colab, no RunInference deps)
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
import joblib, numpy as np, os, json

# --- Train & save a tiny model ---
iris = load_iris()
X, y = iris.data, iris.target
clf = LogisticRegression(max_iter=200, n_jobs=None).fit(X, y)
model_path = '/content/iris_lr.joblib'
joblib.dump(clf, model_path)
print("✅ Model saved:", model_path, "size=", os.path.getsize(model_path))

# --- Beam pipeline that loads the model in DoFn.setup() and predicts ---
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText

class SklearnPredictDoFn(beam.DoFn):
    def __init__(self, model_path: str):
        self.model_path = model_path
        self.model = None

    def setup(self):
        # load once per worker
        import joblib
        self.model = joblib.load(self.model_path)

    def process(self, arr):
        import numpy as np, json
        pred = int(self.model.predict(np.asarray([arr]))[0])
        yield json.dumps({
            "example": np.round(np.asarray(arr), 2).tolist(),
            "pred": pred
        })

# Silence Jupyter's "-f ..." warnings
pipeline_options = PipelineOptions(flags=[], save_main_session=True)

samples = [X[0], X[50], X[100]]  # one example from each Iris class

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'CreateSamples' >> beam.Create(samples)
     | 'Predict' >> beam.ParDo(SklearnPredictDoFn(model_path))
     | 'WritePreds' >> WriteToText('/content/output/iris_preds', file_name_suffix='.jsonl', num_shards=1)
    )

print('✅ Inference done. See /content/output/iris_preds-*.jsonl')


✅ Model saved: /content/iris_lr.joblib size= 991
✅ Inference done. See /content/output/iris_preds-*.jsonl



<a id="peek"></a>

## 8) Quick Peek at Outputs

I print the first few lines from each output so I can narrate what happened at each stage.
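
A minimal sketch of such a peek, using only the standard library. The helper name `peek` is my own, and the demo writes a throwaway file in a temporary directory so the example runs anywhere. In Colab the pattern would point at `/content/output/` instead.

```python
import glob
import itertools
import os
import tempfile

def peek(pattern, n=5):
    """Return up to n lines from each file matching pattern, keyed by path."""
    preview = {}
    for path in sorted(glob.glob(pattern)):
        with open(path) as f:
            preview[path] = [line.rstrip("\n") for line in itertools.islice(f, n)]
    return preview

# Demo against a temporary file shaped like one of the pipeline's output shards.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "evens-00000-of-00001.txt")
with open(demo_path, "w") as f:
    f.write("EVEN:0\nEVEN:2\nEVEN:4\n")

preview = peek(os.path.join(demo_dir, "evens-*.txt"), n=2)
for path, lines in preview.items():
    print(os.path.basename(path), lines)
```

Against the real outputs, a call such as `peek('/content/output/*-of-*', n=5)` would cover every sharded file written by the earlier sections.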


In [None]:
# Self-contained re-run of the Section 5 windowing pipeline
# (parse, convert, filter, and label steps are inlined instead of using ParseEnrich)
from apache_beam.io import ReadFromText, WriteToText
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions
import json

# Silence Jupyter's "-f ..." warnings
pipeline_options = PipelineOptions(flags=[], save_main_session=True)

class AddEventTimestamp(beam.DoFn):
    def process(self, e):
        yield beam.window.TimestampedValue(e, e['ts'])

class FormatWithWindow(beam.DoFn):
    def process(self, kv, window=beam.DoFn.WindowParam):
        key, value = kv
        wstart = window.start.to_utc_datetime().strftime('%Y-%m-%d %H:%M:%S')
        wend   = window.end.to_utc_datetime().strftime('%Y-%m-%d %H:%M:%S')
        yield f'{key},{value},{wstart},{wend}'

with beam.Pipeline(options=pipeline_options) as p:
    events = (p
              | 'ReadEventsWin' >> ReadFromText('/content/data/events.jsonl')
              | 'ParseWin' >> beam.Map(json.loads)
              | 'ToC' >> beam.Map(lambda e: {**e, "temp_c": round((e["temp_f"]-32.0)*5.0/9.0, 2)})
              | 'DropUnrealistic' >> beam.Filter(lambda e: -40.0 <= e['temp_c'] <= 80.0)
              | 'AddStatus' >> beam.Map(lambda e: {**e, "status": ("hot" if e["temp_c"] >= 30 else "ok")})
              | 'EventTime' >> beam.ParDo(AddEventTimestamp())
             )

    by_device = (events
                 | 'KeyByDevice' >> beam.Map(lambda e: (e['device_id'], e['temp_c']))
                 | 'Window10s' >> beam.WindowInto(window.FixedWindows(10))
                )

    counts = by_device | 'CountPerKey' >> beam.combiners.Count.PerKey()
    means  = by_device | 'MeanPerKey'  >> beam.combiners.Mean.PerKey()

    (counts
     | 'FmtCounts' >> beam.ParDo(FormatWithWindow())
     | 'WriteCounts' >> WriteToText('/content/output/window_counts', file_name_suffix='.csv', num_shards=1))

    (means
     | 'FmtMeans' >> beam.ParDo(FormatWithWindow())
     | 'WriteMeans' >> WriteToText('/content/output/window_means', file_name_suffix='.csv', num_shards=1))

print('✅ Windowing done. See /content/output/window_counts-*.csv and window_means-*.csv')


✅ Windowing done. See /content/output/window_counts-*.csv and window_means-*.csv


In [None]:
!zip -r /content/beam_outputs.zip /content/output


  adding: content/output/ (stored 0%)
  adding: content/output/beam-temp-clean_events-649f58b2b53011f08e450242ac1c000c/ (stored 0%)
  adding: content/output/evens-00000-of-00001.txt (deflated 55%)
  adding: content/output/iris_preds-00000-of-00001.jsonl (deflated 37%)
  adding: content/output/window_counts-00000-of-00001.csv (stored 0%)
  adding: content/output/odds-00000-of-00001.txt (deflated 51%)
  adding: content/output/beam-temp-iris_preds-55431e24b53211f08e450242ac1c000c/ (stored 0%)
  adding: content/output/clean_count-00000-of-00001.txt (stored 0%)
  adding: content/output/window_means-00000-of-00001.csv (stored 0%)
  adding: content/output/clean_events-00000-of-00001.jsonl (stored 0%)
  adding: content/output/beam-temp-iris_preds-e1cc928ab53211f08e450242ac1c000c/ (stored 0%)
  adding: content/output/beam-temp-clean_events-e4504ed6b53011f08e450242ac1c000c/ (stored 0%)
  adding: content/output/beam-temp-clean_events-9ce4be8cb53111f08e450242ac1c000c/ (stored 0%)
  adding: content