In [1]:
# If you re-run this cell, restart the runtime after install finishes.
!pip -q install apache-beam[gcp]


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.5/173.5 kB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.8/88.8 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m32.4 MB/s[0m eta [3

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam import window
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.utils.timestamp import Timestamp
from datetime import datetime
import json, os, shutil


In [3]:
# Make a small CSV with a header row + some rows having an event_time (ISO 8601).
# The rows are deliberately few so every downstream output can be checked by eye.
os.makedirs("/content/data", exist_ok=True)
csv_path = "/content/data/sales.csv"

rows = [
    "order_id,sku,country,quantity,unit_price,event_time",
    "1,A100,US,1,10.0,2025-10-26T12:00:02",
    "2,A100,US,2,10.0,2025-10-26T12:00:15",
    "3,B200,CA,3,7.5,2025-10-26T12:00:34",
    "4,C300,US,1,12.5,2025-10-26T12:01:03",
    "5,B200,CA,5,7.5,2025-10-26T12:01:18",
    "6,A100,MX,2,10.0,2025-10-26T12:01:42",
]

# Explicit encoding for reproducibility; end the file with a newline like a
# conventional text file (ReadFromText does not emit an empty trailing record).
with open(csv_path, "w", encoding="utf-8") as f:
    f.write("\n".join(rows) + "\n")

print("Wrote:", csv_path)
# Read back through a context manager so the file handle is closed promptly.
with open(csv_path, encoding="utf-8") as f:
    print(f.read(), end="")


Wrote: /content/data/sales.csv
order_id,sku,country,quantity,unit_price,event_time
1,A100,US,1,10.0,2025-10-26T12:00:02
2,A100,US,2,10.0,2025-10-26T12:00:15
3,B200,CA,3,7.5,2025-10-26T12:00:34
4,C300,US,1,12.5,2025-10-26T12:01:03
5,B200,CA,5,7.5,2025-10-26T12:01:18
6,A100,MX,2,10.0,2025-10-26T12:01:42


In [4]:
def parse_csv(line: str) -> "dict | None":
    """Parse one line of the sales CSV into a typed row dict.

    Args:
        line: A single text line, as emitted by ``ReadFromText``.

    Returns:
        ``None`` for the header row (starts with ``order_id``) and for blank or
        whitespace-only lines, so a downstream ``Filter`` can drop them.
        Otherwise a dict with keys ``order_id`` (int), ``sku`` (str),
        ``country`` (str), ``quantity`` (int), ``unit_price`` (float) and
        ``event_time`` (str; converted to a timestamp later).

    Raises:
        ValueError: if a data line has fewer than 6 fields, or if a numeric
            field does not parse.
    """
    # Function-scope import keeps the helper self-contained when Beam pickles it.
    import csv

    # Drop a UTF-8 byte-order mark (spreadsheet exports often prepend one to the
    # header) and any surrounding whitespace, including a stray carriage return.
    text = line.lstrip("\ufeff").strip()
    if not text or text.startswith("order_id"):
        return None

    # csv.reader honours quoted fields such as "D,400" that a bare split would break.
    fields = next(csv.reader([text]))
    if len(fields) < 6:
        raise ValueError(f"expected at least 6 CSV fields, got {len(fields)}: {line!r}")

    order_id, sku, country, quantity, unit_price, event_time = fields[:6]
    return {
        "order_id": int(order_id),
        "sku": sku,
        "country": country,
        "quantity": int(quantity),
        "unit_price": float(unit_price),
        "event_time": event_time,  # string for now
    }

def compute_revenue(row):
    """Return a copy of ``row`` with ``revenue = quantity * unit_price`` added.

    The input mapping is never mutated; Beam elements should be treated as immutable.
    """
    enriched = dict(row)
    enriched.update(revenue=enriched["quantity"] * enriched["unit_price"])
    return enriched

def is_multi_item(row):
    """Predicate for ``beam.Filter``: keep rows that ordered more than one unit."""
    return 1 < row["quantity"]

def partition_by_country(row, n_partitions):
    """Partition function for ``beam.Partition``: 0 = US, 1 = CA, 2 = everything else.

    ``n_partitions`` is part of the ``beam.Partition`` callback signature but is
    not consulted here, so callers must request at least 3 partitions.
    """
    country = row["country"]
    for index, code in enumerate(("US", "CA")):
        if country == code:
            return index
    return 2

class AddDiscountDoFn(beam.DoFn):
    """Example ParDo that attaches a bulk-quantity discount to each row.

    Rows with ``quantity >= 3`` get a 10% discount; all others get 0%. Each
    output row is a copy of the input with ``discount`` and ``final_revenue``
    (``revenue * (1 - discount)``) added. The input must already carry a
    ``revenue`` field.
    """
    def process(self, row):
        out = dict(row)
        rate = 0.0
        if out["quantity"] >= 3:
            rate = 0.1
        out["discount"] = rate
        out["final_revenue"] = out["revenue"] * (1 - rate)
        yield out


In [5]:
class CleanAndEnrich(beam.PTransform):
    """Composite transform: parse -> filter -> enrich -> ParDo.

    Bundles the whole cleaning chain under one reusable, named transform.
    Input: a PCollection of raw CSV lines. Output: multi-item rows carrying
    ``revenue``, ``discount`` and ``final_revenue``.
    """
    def expand(self, pcoll):
        parsed = pcoll | "ParseCSV" >> beam.Map(parse_csv)
        valid = parsed | "DropNone" >> beam.Filter(lambda x: x is not None)   # header / skipped lines
        bulk = valid | "KeepMultiItem" >> beam.Filter(is_multi_item)          # filter
        priced = bulk | "ComputeRevenue" >> beam.Map(compute_revenue)         # map
        return priced | "AddDiscount" >> beam.ParDo(AddDiscountDoFn())        # ParDo


In [9]:
# Clean previous outputs so stale shards from earlier runs don't appear in the listing.
OUT_DIR = "/content/out"
shutil.rmtree(OUT_DIR, ignore_errors=True)
os.makedirs(OUT_DIR, exist_ok=True)

# flags=[]: without it PipelineOptions parses sys.argv, which inside a notebook
# holds the kernel's own arguments (e.g. "-f <connection file>"), not pipeline flags.
pipeline_options = PipelineOptions(
    flags=[],
    streaming=False,
    save_main_session=True,
)

# Partition index -> output label. The order must match partition_by_country
# (0: US, 1: CA, 2: everything else).
PARTITION_LABELS = ("US", "CA", "Other")

with beam.Pipeline(options=pipeline_options) as p:
    # --- READ (Pipeline IO): ReadFromText
    lines = p | "ReadCSV" >> ReadFromText(csv_path)

    # --- Use composite transform (includes map, filter, ParDo)
    enriched = lines | "CleanAndEnrich" >> CleanAndEnrich()

    # --- PARTITION by country
    parts = enriched | "PartitionByCountry" >> beam.Partition(
        partition_by_country, len(PARTITION_LABELS))
    us_rows, ca_rows, other_rows = parts[0], parts[1], parts[2]

    # --- WRITE (Pipeline IO): WriteToText, one file prefix per partition.
    # Labels and paths are identical to the hand-written branches: "US->JSON",
    # "WriteUS" -> /content/out/us-*.json, and so on.
    for label, part_pcoll in zip(PARTITION_LABELS, (us_rows, ca_rows, other_rows)):
        (part_pcoll
         | f"{label}->JSON" >> beam.Map(json.dumps)
         | f"Write{label}" >> WriteToText(
             os.path.join(OUT_DIR, label.lower()), file_name_suffix=".json"))

# Show every file the pipeline wrote. This is pure Python rather than `!find`,
# so it is portable and the listing order is deterministic (sorted by name).
print("Written files under /content/out :")
for name in sorted(os.listdir("/content/out")):
    path = os.path.join("/content/out", name)
    if not os.path.isfile(path):  # mirror `find -maxdepth 1 -type f`: skip subdirs
        continue
    print(path)
    print("----")
    with open(path) as f:
        print(f.read(), end="")




Written files under /content/out :
/content/out/other-00000-of-00001.json
----
{"order_id": 6, "sku": "A100", "country": "MX", "quantity": 2, "unit_price": 10.0, "event_time": "2025-10-26T12:01:42", "revenue": 20.0, "discount": 0.0, "final_revenue": 20.0}
/content/out/us-00000-of-00001.json
----
{"order_id": 2, "sku": "A100", "country": "US", "quantity": 2, "unit_price": 10.0, "event_time": "2025-10-26T12:00:15", "revenue": 20.0, "discount": 0.0, "final_revenue": 20.0}
/content/out/ca-00000-of-00001.json
----
{"order_id": 3, "sku": "B200", "country": "CA", "quantity": 3, "unit_price": 7.5, "event_time": "2025-10-26T12:00:34", "revenue": 22.5, "discount": 0.1, "final_revenue": 20.25}
{"order_id": 5, "sku": "B200", "country": "CA", "quantity": 5, "unit_price": 7.5, "event_time": "2025-10-26T12:01:18", "revenue": 37.5, "discount": 0.1, "final_revenue": 33.75}


In [12]:
# Fix: make event_time tz-aware (UTC) before converting to Beam Timestamp
from datetime import datetime, timezone
import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp

def to_timestamped(row):
    """Wrap ``row`` in a ``TimestampedValue`` using its ISO-8601 ``event_time``.

    Naive times (no offset) are treated as UTC. Times carrying an offset are
    converted to UTC first, because ``Timestamp.from_utc_datetime`` expects a
    UTC datetime. A trailing ``Z`` is accepted on every Python version.

    Raises:
        ValueError: if ``event_time`` is not a valid ISO-8601 string.
    """
    raw = row["event_time"].strip()
    if raw.endswith("Z"):  # datetime.fromisoformat only accepts "Z" on Python 3.11+
        raw = raw[:-1] + "+00:00"
    t = datetime.fromisoformat(raw)
    if t.tzinfo is None:
        t = t.replace(tzinfo=timezone.utc)  # treat as UTC
    else:
        t = t.astimezone(timezone.utc)      # normalise e.g. +02:00 to UTC
    return beam.window.TimestampedValue(row, Timestamp.from_utc_datetime(t))


In [14]:
from datetime import datetime, timezone
import json, os, shutil, glob
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.utils.timestamp import Timestamp

# --- helpers (same as before) ---
def parse_csv(line: str) -> "dict | None":
    """Parse one line of the sales CSV into a typed row dict.

    Args:
        line: A single text line, as emitted by ``ReadFromText``.

    Returns:
        ``None`` for the header row (starts with ``order_id``) and for blank or
        whitespace-only lines, so a downstream ``Filter`` can drop them.
        Otherwise a dict with keys ``order_id`` (int), ``sku`` (str),
        ``country`` (str), ``quantity`` (int), ``unit_price`` (float) and
        ``event_time`` (str; converted to a timestamp later).

    Raises:
        ValueError: if a data line has fewer than 6 fields, or if a numeric
            field does not parse.
    """
    # Function-scope import keeps the helper self-contained when Beam pickles it.
    import csv

    # Drop a UTF-8 byte-order mark (spreadsheet exports often prepend one to the
    # header) and any surrounding whitespace, including a stray carriage return.
    text = line.lstrip("\ufeff").strip()
    if not text or text.startswith("order_id"):
        return None

    # csv.reader honours quoted fields such as "D,400" that a bare split would break.
    fields = next(csv.reader([text]))
    if len(fields) < 6:
        raise ValueError(f"expected at least 6 CSV fields, got {len(fields)}: {line!r}")

    order_id, sku, country, quantity, unit_price, event_time = fields[:6]
    return {
        "order_id": int(order_id),
        "sku": sku,
        "country": country,
        "quantity": int(quantity),
        "unit_price": float(unit_price),
        "event_time": event_time,
    }

def compute_revenue(row):
    """Return a copy of ``row`` with ``revenue = quantity * unit_price`` added.

    The input mapping is never mutated; Beam elements should be treated as immutable.
    """
    enriched = dict(row)
    enriched.update(revenue=enriched["quantity"] * enriched["unit_price"])
    return enriched

def is_multi_item(row):
    """Predicate for ``beam.Filter``: keep rows that ordered more than one unit."""
    return 1 < row["quantity"]

class AddDiscountDoFn(beam.DoFn):
    """Attach a bulk-quantity discount to each row.

    Rows with ``quantity >= 3`` get a 10% discount; all others get 0%. Each
    output row is a copy of the input with ``discount`` and ``final_revenue``
    (``revenue * (1 - discount)``) added. The input must already carry a
    ``revenue`` field.
    """
    def process(self, row):
        out = dict(row)
        rate = 0.0
        if out["quantity"] >= 3:
            rate = 0.1
        out["discount"] = rate
        out["final_revenue"] = out["revenue"] * (1 - rate)
        yield out

class CleanAndEnrich(beam.PTransform):
    """Composite transform: parse -> drop header -> keep multi-item -> revenue -> discount.

    Input: a PCollection of raw CSV lines. Output: multi-item rows carrying
    ``revenue``, ``discount`` and ``final_revenue``.
    """
    def expand(self, pcoll):
        parsed = pcoll | "ParseCSV" >> beam.Map(parse_csv)
        valid = parsed | "DropNone" >> beam.Filter(lambda x: x is not None)
        bulk = valid | "KeepMultiItem" >> beam.Filter(is_multi_item)
        priced = bulk | "ComputeRevenue" >> beam.Map(compute_revenue)
        return priced | "AddDiscount" >> beam.ParDo(AddDiscountDoFn())

# Attach event-time timestamps so WindowInto can bucket rows by when they happened.
def to_timestamped(row):
    """Wrap ``row`` in a ``TimestampedValue`` using its ISO-8601 ``event_time``.

    Naive times (no offset) are treated as UTC. Times carrying an offset are
    converted to UTC first, because ``Timestamp.from_utc_datetime`` expects a
    UTC datetime. A trailing ``Z`` is accepted on every Python version.

    Raises:
        ValueError: if ``event_time`` is not a valid ISO-8601 string.
    """
    raw = row["event_time"].strip()
    if raw.endswith("Z"):  # datetime.fromisoformat only accepts "Z" on Python 3.11+
        raw = raw[:-1] + "+00:00"
    t = datetime.fromisoformat(raw)
    if t.tzinfo is None:
        t = t.replace(tzinfo=timezone.utc)  # treat as UTC
    else:
        t = t.astimezone(timezone.utc)      # normalise e.g. +02:00 to UTC
    return beam.window.TimestampedValue(row, Timestamp.from_utc_datetime(t))

def to_kv_sku_rev(row):
    """Key a row by SKU, carrying its post-discount revenue as the value."""
    key = row["sku"]
    value = row["final_revenue"]
    return key, value

def format_with_window(el, w=beam.DoFn.WindowParam):
    """Serialize a ``(sku, total)`` pair, plus its window bounds, as one JSON line.

    ``w`` is injected by Beam through the ``DoFn.WindowParam`` default. Its
    ``start``/``end`` are rendered with ``to_utc_datetime().isoformat()``.
    """
    key, total = el
    start_iso = w.start.to_utc_datetime().isoformat()
    end_iso = w.end.to_utc_datetime().isoformat()
    return json.dumps(dict(
        window_start=start_iso,
        window_end=end_iso,
        sku=key,
        window_revenue=total,
    ))

# --- paths & pipeline options ---
csv_path = "/content/data/sales.csv"
assert os.path.exists(csv_path), f"CSV not found at {csv_path}. Run the earlier cell that creates it."

out_root = "/content/out"
window_dir = os.path.join(out_root, "windowed")
# WriteToText takes a shard *prefix*, not a directory, so shards land at
# <window_dir>/part-SSSSS-of-NNNNN.json.
window_prefix = os.path.join(window_dir, "part")

os.makedirs(out_root, exist_ok=True)           # parent must exist so the listing below works
shutil.rmtree(window_dir, ignore_errors=True)  # drop shards from a previous run
os.makedirs(window_dir, exist_ok=True)

# flags=[] keeps PipelineOptions from parsing the notebook kernel's sys.argv.
pipeline_options = PipelineOptions(
    flags=[],
    streaming=False,
    save_main_session=True,
)

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | "ReadCSV-win" >> ReadFromText(csv_path)

    # Clean/enrich, stamp each row with its event time, then bucket into 30 s fixed windows.
    windowed = (
        lines
        | "Composite-win" >> CleanAndEnrich()
        | "AttachTS" >> beam.Map(to_timestamped)
        | "WindowInto30s" >> beam.WindowInto(FixedWindows(30))
    )

    # Per-window, per-SKU revenue totals. The historical name `enriched` is kept,
    # but these are (sku, total) pairs, not enriched rows.
    enriched = (
        windowed
        | "KVskuRev" >> beam.Map(to_kv_sku_rev)
        | "SumBySKU" >> beam.CombinePerKey(sum)
    )

    _ = (
        enriched
        | "FmtWindowJSON" >> beam.Map(format_with_window)
        | "WriteWindowed" >> WriteToText(window_prefix, file_name_suffix=".json")
    )

# --- inspect output: print every shard WriteToText produced, in name order ---
shards = sorted(glob.glob(os.path.join(window_dir, "*.json*")))
print("Windowed results shards:", shards)
for shard_path in shards:
    print("----", os.path.basename(shard_path))
    with open(shard_path) as fh:
        print(fh.read())


Windowed results shards: ['/content/out/windowed/part-00000-of-00001.json']
---- part-00000-of-00001.json
{"window_start": "2025-10-26T12:00:00", "window_end": "2025-10-26T12:00:30", "sku": "A100", "window_revenue": 20.0}
{"window_start": "2025-10-26T12:01:30", "window_end": "2025-10-26T12:02:00", "sku": "A100", "window_revenue": 20.0}
{"window_start": "2025-10-26T12:00:30", "window_end": "2025-10-26T12:01:00", "sku": "B200", "window_revenue": 20.25}
{"window_start": "2025-10-26T12:01:00", "window_end": "2025-10-26T12:01:30", "sku": "B200", "window_revenue": 33.75}

