<a href="https://colab.research.google.com/github/kat-le/cmpe255-apache-beam/blob/main/apache_beam_data_engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Setup

In [None]:
!pip install --quiet apache-beam

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import window as beam_window

print("Beam version:", beam.__version__)
from dataclasses import dataclass
from datetime import datetime
import csv, io

Beam version: 2.68.0


1) Create a local CSV file of sample sales data

In [None]:
sales_path = "/content/sales.csv"

rows = [
    # (order_id, ts (ISO 8601, no UTC offset), store, product, category,
    #  qty, unit_price, payment_method, city)
    ("o-0001", "2025-01-01T00:00:00", "SFO", "Coffee Beans", "Grocery", 1, 18.00, "card", "San Francisco"),
    ("o-0002", "2025-01-01T00:00:03", "SFO", "Mug", "Houseware", 2, 12.50, "cash", "San Francisco"),
    ("o-0003", "2025-01-01T00:00:07", "LA", "T-shirt", "Apparel", 1, 25.00, "card", "Los Angeles"),
    ("o-0004", "2025-01-01T00:00:12", "LA", "Notebook", "Stationery", 4, 4.00, "card", "Los Angeles"),
    ("o-0005", "2025-01-01T00:00:14", "NYC", "Headphones", "Electronics", 1, 79.00, "card", "New York"),
    ("o-0006", "2025-01-01T00:00:18", "SFO", "Chocolate", "Grocery", 3, 3.50, "cash", "San Francisco"),
    ("o-0007", "2025-01-01T00:00:21", "NYC", "Sweater", "Apparel", 1, 55.00, "card", "New York"),
    ("o-0008", "2025-01-01T00:00:27", "LA", "Water Bottle", "Houseware", 2, 15.00, "card", "Los Angeles"),
    ("o-0009", "2025-01-01T00:00:32", "SFO", "Backpack", "Apparel", 1, 49.00, "card", "San Francisco"),
    ("o-0010", "2025-01-01T00:00:38", "NYC", "Laptop Stand", "Electronics", 1, 32.00, "cash", "New York"),
]

# Write UTF-8 explicitly: beam.io.ReadFromText decodes lines as UTF-8 by
# default, whereas open() without encoding= uses the platform's locale
# encoding. newline="" lets csv.writer control line endings itself.
with open(sales_path, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["order_id", "ts", "store", "product", "category", "qty", "unit_price", "payment_method", "city"])
    w.writerows(rows)

print("Wrote", sales_path)


Wrote /content/sales.csv


2) Parse CSV lines into dicts with ParDo, then apply Map and Filter

In [None]:
class ParseSale(beam.DoFn):
    """Parse one raw line of the sales CSV into an order dict.

    Yields nothing for the header row (a line starting with ``order_id,``)
    or for blank lines. Otherwise yields exactly one dict keyed by the CSV
    column names, with ``qty`` converted to ``int`` and ``unit_price`` to
    ``float``; all other fields stay strings. Malformed rows (too few
    fields, non-numeric qty/price) raise rather than being silently dropped.
    """

    def process(self, line: str):
        # Skip the header row; ReadFromText passes it through like any line.
        if line.startswith("order_id,"):
            return
        # A blank or whitespace-only line (e.g. a trailing empty line) would
        # parse to a too-short row and raise IndexError below.
        if not line.strip():
            return
        # csv.reader (rather than str.split) honours quoted fields that
        # contain commas.
        r = next(csv.reader([line]))
        yield {
            "order_id": r[0],
            "ts": r[1],  # ISO-8601 string; converted to event time elsewhere
            "store": r[2],
            "product": r[3],
            "category": r[4],
            "qty": int(r[5]),
            "unit_price": float(r[6]),
            "payment_method": r[7],
            "city": r[8],
        }

# Parse every CSV line, derive revenue, then print all sales and the
# high-value subset (revenue strictly above $50).
with beam.Pipeline(options=PipelineOptions()) as p:
    raw_lines = p | "ReadCSV" >> beam.io.ReadFromText(sales_path)
    sales = raw_lines | "ParseRows(ParDo)" >> beam.ParDo(ParseSale())

    # Map: attach revenue = qty * unit_price to every sale.
    sales_with_revenue = sales | "AddRevenue(Map)" >> beam.Map(
        lambda sale: {**sale, "revenue": sale["qty"] * sale["unit_price"]}
    )

    # Filter: keep only orders whose revenue exceeds $50.
    high_value = sales_with_revenue | "HighValue(Filter)" >> beam.Filter(
        lambda sale: sale["revenue"] > 50.0
    )

    # Print-only sinks; their output PCollections are discarded.
    _ = sales_with_revenue | "PrintAll" >> beam.Map(lambda sale: print("SALE:", sale))
    _ = high_value | "PrintHigh" >> beam.Map(lambda sale: print("HIGH:", sale))






SALE: {'order_id': 'o-0001', 'ts': '2025-01-01T00:00:00', 'store': 'SFO', 'product': 'Coffee Beans', 'category': 'Grocery', 'qty': 1, 'unit_price': 18.0, 'payment_method': 'card', 'city': 'San Francisco', 'revenue': 18.0}
SALE: {'order_id': 'o-0002', 'ts': '2025-01-01T00:00:03', 'store': 'SFO', 'product': 'Mug', 'category': 'Houseware', 'qty': 2, 'unit_price': 12.5, 'payment_method': 'cash', 'city': 'San Francisco', 'revenue': 25.0}
SALE: {'order_id': 'o-0003', 'ts': '2025-01-01T00:00:07', 'store': 'LA', 'product': 'T-shirt', 'category': 'Apparel', 'qty': 1, 'unit_price': 25.0, 'payment_method': 'card', 'city': 'Los Angeles', 'revenue': 25.0}
SALE: {'order_id': 'o-0004', 'ts': '2025-01-01T00:00:12', 'store': 'LA', 'product': 'Notebook', 'category': 'Stationery', 'qty': 4, 'unit_price': 4.0, 'payment_method': 'card', 'city': 'Los Angeles', 'revenue': 16.0}
HIGH: {'order_id': 'o-0005', 'ts': '2025-01-01T00:00:14', 'store': 'NYC', 'product': 'Headphones', 'category': 'Electronics', 'qty':

3) Composite transform: clean + aggregate revenue per category

In [None]:
class CleanParseAndRevenue(beam.PTransform):
    """Composite transform: raw CSV lines -> order dicts with ``revenue``.

    Runs ``ParseSale`` over the input lines, then adds a ``revenue`` key
    equal to ``qty * unit_price`` to each parsed order.
    """

    def expand(self, pcoll):
        def add_revenue(sale):
            return {**sale, "revenue": sale["qty"] * sale["unit_price"]}

        return (
            pcoll
            | "CPR_Parse" >> beam.ParDo(ParseSale())
            | "CPR_AddRevenue" >> beam.Map(add_revenue)
        )

class RevenuePerCategory(beam.PTransform):
    """Sum revenue per product category.

    Input: dicts carrying ``category`` and ``revenue`` keys.
    Output: ``(category, total_revenue)`` pairs, one per category.
    """

    def expand(self, sales_with_rev):
        keyed = sales_with_rev | "ToKV(category,revenue)" >> beam.Map(
            lambda sale: (sale["category"], sale["revenue"])
        )
        return keyed | "SumPerCategory" >> beam.CombinePerKey(sum)

# Chain the two composite transforms and print one (category, revenue)
# tuple per category.
with beam.Pipeline() as p:
    csv_lines = p | "ReadCSV2" >> beam.io.ReadFromText(sales_path)
    enriched = csv_lines | "Clean+Revenue" >> CleanParseAndRevenue()
    cat_revenue = enriched | "RevenuePerCategory" >> RevenuePerCategory()
    _ = cat_revenue | "PrintCategoryRevenue" >> beam.Map(print)


('Grocery', 28.5)
('Houseware', 55.0)
('Apparel', 129.0)
('Stationery', 16.0)
('Electronics', 111.0)


4) Partition: bucket orders by revenue size (Low/Med/High)

In [None]:
def revenue_bucket(sale, n_partitions):
    """Return the partition index for *sale* based on its revenue.

    Revenue is computed as ``qty * unit_price``. Buckets:
    0 (Low) when revenue < 20, 1 (Medium) when revenue < 75, else 2 (High).
    ``n_partitions`` is the partition count passed in by ``beam.Partition``;
    it is not consulted here.
    """
    revenue = sale["qty"] * sale["unit_price"]
    upper_bounds = (20, 75)  # exclusive upper bounds of Low and Medium
    for index, bound in enumerate(upper_bounds):
        if revenue < bound:
            return index
    return len(upper_bounds)  # High

def _order_id_printer(tag):
    """Build a callable that prints *tag* followed by a sale's order_id."""
    return lambda sale: print(tag, sale["order_id"])


# Split parsed orders into three PCollections by revenue bucket and print
# the order IDs that land in each one.
with beam.Pipeline() as p:
    parsed = (
        p
        | "ReadCSV3" >> beam.io.ReadFromText(sales_path)
        | "ParseForPartition" >> beam.ParDo(ParseSale())
    )
    low, med, high = parsed | "PartitionByRevenue" >> beam.Partition(revenue_bucket, 3)

    _ = low | "PrintLow" >> beam.Map(_order_id_printer("LOW:"))
    _ = med | "PrintMed" >> beam.Map(_order_id_printer("MED:"))
    _ = high | "PrintHigh2" >> beam.Map(_order_id_printer("HIGH:"))


LOW: o-0001
MED: o-0002
MED: o-0003
LOW: o-0004
HIGH: o-0005
LOW: o-0006
MED: o-0007
MED: o-0008
MED: o-0009
MED: o-0010


5) Windowing (Fixed windows over event time) + per-store revenue

In [None]:
class ShowWithWindow(beam.DoFn):
    """Format a ``(key, value)`` pair together with its window boundaries.

    Emits ``"[<start>–<end>] <key>: <value>"``, where start and end are the
    window bounds converted with ``to_utc_datetime()`` and value is shown
    with two decimal places.
    """

    def process(self, kv, window=beam.DoFn.WindowParam):
        key, value = kv
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        yield f"[{window_start}–{window_end}] {key}: {value:.2f}"

def to_event_ts(sale):
    """Wrap *sale* in a TimestampedValue whose event time comes from ``ts``.

    ``sale["ts"]`` is an ISO-8601 string. A string without a UTC offset
    (as in the sample CSV) is interpreted as UTC. ``datetime.timestamp()``
    would otherwise read a naive value as the host's local time, shifting
    every event (and its window) by the machine's UTC offset. Strings that
    carry an explicit offset keep it.

    Returns:
        beam.window.TimestampedValue wrapping the unchanged ``sale`` dict,
        with a timestamp in seconds since the Unix epoch.
    """
    from datetime import timezone

    event_time = datetime.fromisoformat(sale["ts"])
    if event_time.tzinfo is None:
        # Window labels are rendered with to_utc_datetime(), so treat
        # offset-less input as UTC for consistent, host-independent windows.
        event_time = event_time.replace(tzinfo=timezone.utc)
    return beam.window.TimestampedValue(sale, event_time.timestamp())

# Event-time windowing: sum revenue per store inside 10-second fixed windows,
# then print each total alongside its window bounds.
with beam.Pipeline() as p:
    windowed_sales = (
        p
        | "ReadCSV4" >> beam.io.ReadFromText(sales_path)
        | "ParseForWindow" >> beam.ParDo(ParseSale())
        | "AddRevenue" >> beam.Map(lambda s: {**s, "revenue": s["qty"] * s["unit_price"]})
        | "AttachEventTime" >> beam.Map(to_event_ts)
        | "Fixed10s" >> beam.WindowInto(beam_window.FixedWindows(10))
    )
    windowed_store_rev = (
        windowed_sales
        | "ToKV(store,revenue)" >> beam.Map(lambda s: (s["store"], s["revenue"]))
        | "SumPerStorePerWindow" >> beam.CombinePerKey(sum)
    )

    # Show window boundaries in output.
    _ = (
        windowed_store_rev
        | "FmtWindowed" >> beam.ParDo(ShowWithWindow())
        | "PrintWindowed" >> beam.Map(print)
    )


[2025-01-01 00:00:00–2025-01-01 00:00:10] SFO: 43.00
[2025-01-01 00:00:10–2025-01-01 00:00:20] SFO: 10.50
[2025-01-01 00:00:30–2025-01-01 00:00:40] SFO: 49.00
[2025-01-01 00:00:00–2025-01-01 00:00:10] LA: 25.00
[2025-01-01 00:00:10–2025-01-01 00:00:20] LA: 16.00
[2025-01-01 00:00:20–2025-01-01 00:00:30] LA: 30.00
[2025-01-01 00:00:10–2025-01-01 00:00:20] NYC: 79.00
[2025-01-01 00:00:20–2025-01-01 00:00:30] NYC: 55.00
[2025-01-01 00:00:30–2025-01-01 00:00:40] NYC: 32.00


6) Pipeline I/O (read CSV, write results to files)

In [None]:
out_prefix = "/content/beam_output/revenue_by_store_window"


class FormatStoreWindowLine(beam.DoFn):
    """Render a ``(store, revenue)`` pair as one tab-separated output line.

    Columns: window start, window end (both from ``to_utc_datetime()``, in
    ISO format), store, and revenue with two decimals. Keeping the window
    bounds matters because the same store appears once per window. Without
    them, the per-window totals in the file cannot be told apart.
    """

    def process(self, kv, window=beam.DoFn.WindowParam):
        store, revenue = kv
        start = window.start.to_utc_datetime().isoformat()
        end = window.end.to_utc_datetime().isoformat()
        yield f"{start}\t{end}\t{store}\t{revenue:.2f}"


with beam.Pipeline() as p:
    formatted = (
        p
        | "ReadCSV5" >> beam.io.ReadFromText(sales_path)
        | "ParseForIO" >> beam.ParDo(ParseSale())
        | "AddRevIO" >> beam.Map(lambda s: {**s, "revenue": s["qty"] * s["unit_price"]})
        | "AttachTS_IO" >> beam.Map(to_event_ts)
        | "Fixed10s_IO" >> beam.WindowInto(beam_window.FixedWindows(10))
        | "ToKV_IO" >> beam.Map(lambda s: (s["store"], s["revenue"]))
        | "SumPerStorePerWindow_IO" >> beam.CombinePerKey(sum)
        | "FormatLines" >> beam.ParDo(FormatStoreWindowLine())
    )

    # num_shards=1 produces a single file named
    # <out_prefix>-00000-of-00001.txt (Beam's default shard-name template).
    _ = formatted | "WriteTxt" >> beam.io.WriteToText(out_prefix, file_name_suffix=".txt", num_shards=1)

print("Wrote files under:", out_prefix + "-00000-of-00001.txt")




Wrote files under: /content/beam_output/revenue_by_store_window-00000-of-00001.txt
