In [2]:
# Stage 1: mount Google Drive, then peek at the first 5 rows of the raw click CSV.
from google.colab import drive
drive.mount('/content/drive')

import os, pandas as pd
FILE_PATH = '/content/drive/MyDrive/e-Shop Click Dataset/e-shop clothing 2008.csv'  # exact dataset location on Drive

assert os.path.exists(FILE_PATH), "Path not found; check the folder/name."
# sep=None with the python engine makes pandas sniff the delimiter; nrows keeps this a cheap peek.
df = pd.read_csv(FILE_PATH, sep=None, engine='python', nrows=5)
print("Using:", FILE_PATH)
print("Columns:", df.columns.tolist())
df.head()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Using: /content/drive/MyDrive/e-Shop Click Dataset/e-shop clothing 2008.csv
Columns: ['year', 'month', 'day', 'order', 'country', 'session ID', 'page 1 (main category)', 'page 2 (clothing model)', 'colour', 'location', 'model photography', 'price', 'price 2', 'page']


Unnamed: 0,year,month,day,order,country,session ID,page 1 (main category),page 2 (clothing model),colour,location,model photography,price,price 2,page
0,2008,4,1,1,29,1,1,A13,1,5,1,28,2,1
1,2008,4,1,2,29,1,1,A16,1,6,1,33,2,1
2,2008,4,1,3,29,1,2,B4,10,2,1,52,1,1
3,2008,4,1,4,29,1,2,B17,6,6,2,38,2,1
4,2008,4,1,5,29,1,2,B8,4,3,2,52,1,1


In [3]:
# Stage 2: I build a small JSONL sample with 'ts' for windowing.
import pandas as pd, json, numpy as np

cols = ['year','month','day','order','country','session ID',
        'page 1 (main category)','page 2 (clothing model)','price','price 2','page']
df = pd.read_csv(FILE_PATH, usecols=cols, engine='python', sep=None)

df = df.rename(columns={'session ID':'session_id',
                        'page 1 (main category)':'category',
                        'page 2 (clothing model)':'model',
                        'price 2':'price2'})
# 'price 2' appears to hold a 1/2 code (see the Stage 1 preview), not an amount, so it must
# not stand in for a missing price. Missing prices stay NaN, are written as null below, and
# fall into Stage 7's 'missing' partition.
df['price_final'] = df['price']
df = df.sample(n=min(5000, len(df)), random_state=1).reset_index(drop=True)

dt = pd.to_datetime(dict(year=df.year, month=df.month, day=df.day))
# Synthetic intra-day time: 'order' modulo 1800 gives an offset of 0..1799 s, i.e. within
# the first 30 minutes of each day.
secs = (df['order'] % (60*30)).astype(int)
df['ts'] = (dt + pd.to_timedelta(secs, unit='s')).dt.tz_localize('UTC').astype(str)

out = '/content/eshop_sample.jsonl'
export_cols = ['session_id','country','category','model','price_final','page','ts']
with open(out, 'w', encoding='utf-8') as f:
  # Named fields instead of r[0]..r[6], so reordering export_cols cannot silently shift values.
  for r in df[export_cols].itertuples(index=False):
    record = {
        "session_id": r.session_id,
        "country": r.country,
        "category": r.category,
        "model": r.model,
        "price": None if pd.isna(r.price_final) else float(r.price_final),
        "page": int(r.page),
        "ts": r.ts,
        "event_type": "view",
    }
    f.write(json.dumps(record) + "\n")

print("Wrote records:", len(df), "->", out)


Wrote records: 5000 -> /content/eshop_sample.jsonl


In [4]:
# Stage 3: I install Beam and do minimal read→write to confirm I/O.
!pip -q install apache-beam==2.58.0

import apache_beam as beam, json, os
from apache_beam.options.pipeline_options import PipelineOptions

SRC = '/content/eshop_sample.jsonl'
OUT_DIR = '/content/beam_out/io'
os.makedirs(OUT_DIR, exist_ok=True)

def parse_json(line):
  """Decode one JSONL text line into the Python object it encodes."""
  decoded = json.loads(line)
  return decoded

# Round-trip every line (text -> dict -> text) to prove both the read and the write work.
direct_opts = PipelineOptions(['--runner=DirectRunner'])
with beam.Pipeline(options=direct_opts) as pipeline:
  (pipeline
   | 'ReadJSONL' >> beam.io.ReadFromText(SRC)
   | 'Parse' >> beam.Map(parse_json)
   | 'BackToJSON' >> beam.Map(json.dumps)
   | 'WriteEcho' >> beam.io.WriteToText(f'{OUT_DIR}/echo', num_shards=1))


     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 89.7/89.7 kB 3.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 152.0/152.0 kB 9.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 43.5/43.5 kB 2.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 61.0/61.0 kB 3.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 15.2/15.2 MB 9.8 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.5/3.5 MB 80.1 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━



In [5]:
# Stage 4: Map + Filter pass that normalizes a few fields and drops incomplete events.
import apache_beam as beam
import json
import os
from apache_beam.options.pipeline_options import PipelineOptions

SRC = '/content/eshop_sample.jsonl'      # JSONL sample written in Stage 2
OUT_DIR = '/content/beam_out/mapfilter'  # this stage's output folder
os.makedirs(OUT_DIR, exist_ok=True)

def parse(line):
  """Turn a single JSONL line into the corresponding Python object (normally an event dict)."""
  return json.loads(s=line)
def norm(e):  # I normalize simple fields.
  """Return a copy of event dict `e` with 'country' upper-cased and 'category' stripped.

  Both fields are coerced to str. A missing key or a null value becomes '' rather than the
  literal text 'NONE'/'None'. The input dict is left untouched, because Beam transforms
  should not mutate the elements they receive.
  """
  def _as_text(value):
    # str(None) would produce 'None'; treat null as empty instead.
    return '' if value is None else str(value)

  # Existing keys keep their position in the dict, so json.dumps field order is unchanged.
  return {**e,
          'country': _as_text(e.get('country')).upper(),
          'category': _as_text(e.get('category')).strip()}
def is_valid(e):  # I keep rows with essential fields.
  """Return True when the event has a truthy session_id, a truthy ts and a page > 0.

  A non-dict element, or a 'page' that int() cannot convert (None, non-numeric text,
  infinity), yields False instead of raising. beam.Filter then drops the record rather
  than failing the whole pipeline.
  """
  if not isinstance(e, dict):
    return False
  try:
    page = int(e.get('page', 0))
  except (TypeError, ValueError, OverflowError):
    return False
  return bool(e.get('session_id')) and bool(e.get('ts')) and page > 0

with beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner'])) as pipeline:
  # Text lines -> event dicts.
  events = (pipeline
            | 'Read' >> beam.io.ReadFromText(SRC)
            | 'Parse' >> beam.Map(parse))
  # Normalize fields, then keep only events with the essential fields.
  cleaned = (events
             | 'Normalize' >> beam.Map(norm)
             | 'FilterValid' >> beam.Filter(is_valid))
  # Back to JSONL on disk as a single shard.
  _ = (cleaned
       | 'ToJSON' >> beam.Map(json.dumps)
       | 'WriteClean' >> beam.io.WriteToText(f'{OUT_DIR}/clean', num_shards=1))


In [6]:
# Stage 4 check: list the Map/Filter output shards and echo the head of the first one.
import glob
import itertools
import os

files = sorted(glob.glob('/content/beam_out/mapfilter/clean*'))
shard_names = list(map(os.path.basename, files))
print("Files:", shard_names)
assert files, "No output files found. Re-run Stage 4 cell."

with open(files[0]) as first_shard:
  head = itertools.islice(first_shard, 5)
  for raw in head:
    print(raw.strip())


Files: ['clean-00000-of-00001']
{"session_id": 13625, "country": "29", "category": "2", "model": "B4", "price": 52.0, "page": 1, "ts": "2008-06-09 00:00:01+00:00", "event_type": "view"}
{"session_id": 23329, "country": "29", "category": "2", "model": "B12", "price": 38.0, "page": 1, "ts": "2008-08-08 00:00:09+00:00", "event_type": "view"}
{"session_id": 19509, "country": "29", "category": "1", "model": "A3", "price": 72.0, "page": 1, "ts": "2008-07-16 00:00:03+00:00", "event_type": "view"}
{"session_id": 14918, "country": "29", "category": "4", "model": "P1", "price": 38.0, "page": 1, "ts": "2008-06-17 00:00:09+00:00", "event_type": "view"}
{"session_id": 1221, "country": "29", "category": "2", "model": "B24", "price": 57.0, "page": 2, "ts": "2008-04-03 00:00:04+00:00", "event_type": "view"}


In [7]:
# Stage 5: a ParDo with a tagged side output splits events into good vs bad.
import apache_beam as beam
import glob
import json
import os
from apache_beam.options.pipeline_options import PipelineOptions

SRC = '/content/eshop_sample.jsonl'
OUT = '/content/beam_out/pardo'
os.makedirs(OUT, exist_ok=True)

class ValidateDoFn(beam.DoFn):
  """Route each event to the main ('good') output or to the 'bad' tagged output.

  An event is good when it is a dict with a truthy session_id, a truthy ts, and a 'page'
  that int() converts to a value > 0. Anything else, including a malformed page such as
  None, non-numeric text or infinity, or a non-dict element, goes to 'bad' instead of
  raising. One broken record therefore cannot fail the whole pipeline.
  """
  def process(self, e):
    if not isinstance(e, dict):
      yield beam.pvalue.TaggedOutput('bad', e)
      return
    try:
      page_ok = int(e.get('page', 0)) > 0
    except (TypeError, ValueError, OverflowError):
      page_ok = False
    if bool(e.get('session_id')) and bool(e.get('ts')) and page_ok:
      yield e
    else:
      yield beam.pvalue.TaggedOutput('bad', e)

direct_runner = PipelineOptions(['--runner=DirectRunner'])
with beam.Pipeline(options=direct_runner) as pipeline:
  parsed = (pipeline
            | 'Read' >> beam.io.ReadFromText(SRC)
            | 'Parse' >> beam.Map(json.loads))
  tagged = parsed | 'Validate' >> beam.ParDo(ValidateDoFn()).with_outputs('bad', main='good')
  # Serialize each tagged stream back to JSONL; step labels and paths derive from the tag.
  for tag, label in (('good', 'Good'), ('bad', 'Bad')):
    (tagged[tag]
     | f'{label}→JSON' >> beam.Map(json.dumps)
     | f'Write{label}' >> beam.io.WriteToText(f'{OUT}/{tag}', num_shards=1))

written = sorted(glob.glob(f'{OUT}/*'))
print(written[:4])  # confirm which shard files landed on disk


['/content/beam_out/pardo/bad-00000-of-00001', '/content/beam_out/pardo/good-00000-of-00001']


In [8]:
# Stage 6: bundle parse → normalize → validate into one reusable composite PTransform.
import apache_beam as beam
import json
import os
from apache_beam.options.pipeline_options import PipelineOptions

SRC = '/content/eshop_sample.jsonl'
OUT = '/content/beam_out/composite'
os.makedirs(OUT, exist_ok=True)

def norm(e):  # I normalize a couple fields.
  """Return a copy of event dict `e` with 'country' upper-cased and 'category' stripped.

  Both fields are coerced to str. A missing key or a null value becomes '' rather than the
  literal text 'NONE'/'None'. The input dict is left untouched, because Beam transforms
  should not mutate the elements they receive.
  """
  def _as_text(value):
    # str(None) would produce 'None'; treat null as empty instead.
    return '' if value is None else str(value)

  # Existing keys keep their position in the dict, so json.dumps field order is unchanged.
  return {**e,
          'country': _as_text(e.get('country')).upper(),
          'category': _as_text(e.get('category')).strip()}

class CleanEnrich(beam.PTransform):
  """Composite: JSONL text lines -> parsed dicts -> normalized -> split by validation.

  expand() returns the tagged outputs of the validation ParDo: the main output is
  reachable as `.good` and rejected events as `.bad`. It uses `norm` from this cell and
  `ValidateDoFn` from the Stage 5 cell, so both must already be defined in the kernel.
  """
  def expand(self, lines):
    return (lines
            | 'Parse' >> beam.Map(json.loads)
            | 'Normalize' >> beam.Map(norm)
            | 'Validate' >> beam.ParDo(ValidateDoFn()).with_outputs('bad', main='good'))

with beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner'])) as pipeline:
  outputs = (pipeline
             | 'ReadJSONL' >> beam.io.ReadFromText(SRC)
             | 'CleanEnrich' >> CleanEnrich())
  # Write both branches of the composite as JSONL; labels and paths derive from the tag.
  for tag, label in (('good', 'Good'), ('bad', 'Bad')):
    (outputs[tag]
     | f'{label}→JSON' >> beam.Map(json.dumps)
     | f'Write{label}' >> beam.io.WriteToText(f'{OUT}/{tag}', num_shards=1))


In [9]:
# Stage 7: route validated events into high / low / missing price partitions.
import apache_beam as beam
import json
import os
from apache_beam.options.pipeline_options import PipelineOptions

SRC = '/content/eshop_sample.jsonl'
OUT = '/content/beam_out/partition'
os.makedirs(OUT, exist_ok=True)

def part_by_price(e, n, threshold=50.0):
  """Partition function for beam.Partition: 0 = high price, 1 = low price, 2 = missing.

  Args:
    e: event dict; 'price' may be absent, None, a number, or a numeric string.
    n: number of partitions, passed in by beam.Partition. It is unused here because the
       three buckets are fixed.
    threshold: prices >= threshold go to partition 0, lower prices to partition 1.
      Defaults to 50; extra args given to beam.Partition are forwarded here.

  Returns:
    The partition index 0, 1 or 2. A NaN price counts as missing, not as low.
  """
  import math

  price = e.get('price')
  if price is None:
    return 2
  price = float(price)
  if math.isnan(price):  # NaN >= threshold is False, which would misfile it as 'low'
    return 2
  return 0 if price >= threshold else 1

with beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner'])) as p:
  lines = p | 'Read' >> beam.io.ReadFromText(SRC)
  res = lines | 'CleanEnrich' >> CleanEnrich()
  parts = res.good | 'Partition' >> beam.Partition(part_by_price, 3)
  # One (step-label prefix, file stem) pair per index that part_by_price returns.
  for idx, (tag, stem) in enumerate([('H', 'high'), ('L', 'low'), ('M', 'missing')]):
    (parts[idx]
     | f'{tag}→json' >> beam.Map(json.dumps)
     | f'Write{tag}' >> beam.io.WriteToText(f'{OUT}/{stem}', num_shards=1))
  # The bad branch also carries dicts. Serialize them so the bad file is JSONL like every
  # other output; otherwise WriteToText stringifies each dict as a Python repr, not JSON.
  (res.bad
   | 'B→json' >> beam.Map(json.dumps)
   | 'WriteBadP' >> beam.io.WriteToText(f'{OUT}/bad', num_shards=1))


In [10]:
# Stage 8: stamp events with event time, apply 60 s fixed windows, count per category.
import apache_beam as beam
import datetime
import json
import os
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, TimestampedValue

SRC = '/content/eshop_sample.jsonl'
OUT = '/content/beam_out/window'
os.makedirs(OUT, exist_ok=True)

def to_ts(e):  # I attach event-time from 'ts'.
  """Wrap event `e` in a TimestampedValue whose event time is parsed from e['ts'].

  'ts' is an ISO-8601 string such as '2008-06-09 00:00:01+00:00' (the form Stage 2 writes).
  A trailing 'Z' is read as UTC, and a string without an offset is assumed to be UTC.
  Without that, datetime.timestamp() would interpret a naive value in the machine's local
  timezone and shift every window.
  """
  raw = e['ts']
  if raw.endswith('Z'):
    # fromisoformat() before Python 3.11 rejects 'Z'; spell the UTC offset out instead.
    raw = raw[:-1] + '+00:00'
  dt = datetime.datetime.fromisoformat(raw)
  if dt.tzinfo is None:
    dt = dt.replace(tzinfo=datetime.timezone.utc)
  return TimestampedValue(e, dt.timestamp())

def _window_count_json(kv, window=beam.DoFn.WindowParam):
  """Serialize one (category, count) pair together with the bounds of its event-time window.

  Without window_start/window_end, counts for the same category from different minutes
  would be indistinguishable in the output file.
  """
  category, count = kv
  return json.dumps({'category': category, 'count': count,
                     'window_start': window.start.to_rfc3339(),
                     'window_end': window.end.to_rfc3339()})

with beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner'])) as p:
  good = (p | 'Read' >> beam.io.ReadFromText(SRC)
            | 'CleanEnrich' >> CleanEnrich()).good
  win = (good | 'AttachTS' >> beam.Map(to_ts)
              | 'FixedWin' >> beam.WindowInto(FixedWindows(60)))
  (win
   | 'KeyByCat' >> beam.Map(lambda e: (e['category'], 1))
   | 'Count' >> beam.CombinePerKey(sum)  # one count per (category, 60 s window)
   | 'ToJSON' >> beam.Map(_window_count_json)
   | 'WriteWin' >> beam.io.WriteToText(f'{OUT}/per_minute_cat', num_shards=1))


In [11]:
# Stage 9: I quickly peek at outputs from all stages.
import glob, itertools

def peek(prefix, n=5):
  """Print the first `n` lines of the lexically first file whose path starts with `prefix`.

  A '== <path> ==' header (preceded by a blank line) is printed before the lines. When
  nothing matches, it prints 'No files for <prefix>' and returns.
  """
  matches = sorted(glob.glob(prefix + "*"))
  if matches:
    first = matches[0]
    print("\n==", first, "==")
    with open(first) as handle:
      head = list(itertools.islice(handle, n))
    for line in head:
      print(line.strip())
  else:
    print("No files for", prefix)

# Peek at the main outputs of the Map/Filter, ParDo, Partition and windowing stages, in order.
for out_prefix in (
    '/content/beam_out/mapfilter/clean',
    '/content/beam_out/pardo/good',
    '/content/beam_out/pardo/bad',
    '/content/beam_out/partition/high',
    '/content/beam_out/partition/low',
    '/content/beam_out/partition/missing',
    '/content/beam_out/window/per_minute_cat',
):
  peek(out_prefix)



== /content/beam_out/mapfilter/clean-00000-of-00001 ==
{"session_id": 13625, "country": "29", "category": "2", "model": "B4", "price": 52.0, "page": 1, "ts": "2008-06-09 00:00:01+00:00", "event_type": "view"}
{"session_id": 23329, "country": "29", "category": "2", "model": "B12", "price": 38.0, "page": 1, "ts": "2008-08-08 00:00:09+00:00", "event_type": "view"}
{"session_id": 19509, "country": "29", "category": "1", "model": "A3", "price": 72.0, "page": 1, "ts": "2008-07-16 00:00:03+00:00", "event_type": "view"}
{"session_id": 14918, "country": "29", "category": "4", "model": "P1", "price": 38.0, "page": 1, "ts": "2008-06-17 00:00:09+00:00", "event_type": "view"}
{"session_id": 1221, "country": "29", "category": "2", "model": "B24", "price": 57.0, "page": 2, "ts": "2008-04-03 00:00:04+00:00", "event_type": "view"}

== /content/beam_out/pardo/good-00000-of-00001 ==
{"session_id": 13625, "country": 29, "category": 2, "model": "B4", "price": 52.0, "page": 1, "ts": "2008-06-09 00:00:01+00

In [12]:
# Stage 10: I zip my outputs so I can download/submit.
import shutil, os

OUT_ROOT = "/content/beam_out"
ZIP_PATH = "/content/beam_results.zip"

# make_archive appends '.zip' itself, so give it the path without the extension.
# splitext strips only the final suffix; str.replace would also remove '.zip' found
# anywhere else in the path (e.g. in a directory name).
ARCHIVE_BASE, _ = os.path.splitext(ZIP_PATH)

# I remove an old zip if present.
if os.path.exists(ZIP_PATH):
  os.remove(ZIP_PATH)

# I zip the whole output folder (every stage's subfolder under OUT_ROOT).
zipped = shutil.make_archive(ARCHIVE_BASE, 'zip', OUT_ROOT)

print("Zipped:", zipped)  # the path make_archive actually wrote, not an assumed one
print("Folders included:", sorted(os.listdir(OUT_ROOT)))  # sorted for a stable listing


Zipped: /content/beam_results.zip
Folders included: ['composite', 'io', 'pardo', 'mapfilter', 'window', 'partition']
