Bootstrap: repo root, paths, logger

In [1]:
# [CELL 02-00] Bootstrap: repo root + paths + logger

import json
import time
import uuid
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd

# Bootstrap banner: log when and from which working directory the notebook started.
# Note: here `t0` is a datetime used only for this banner; later cells rebind `t0`
# to the float returned by cell_start().
t0 = datetime.now()
print(f"[CELL 02-00] start={t0.isoformat(timespec='seconds')}")
print("[CELL 02-00] CWD:", Path.cwd().resolve())

def find_repo_root(start: Path) -> Path:
    """Locate the repository root by walking upward from ``start``.

    The root is the nearest directory (``start`` itself first, then each
    ancestor) that contains an entry named ``PROJECT_STATE.md``.

    Args:
        start: Directory to begin the search from; it is resolved first.

    Returns:
        The resolved path of the first directory holding the marker file.

    Raises:
        RuntimeError: If neither ``start`` nor any ancestor holds the marker.
    """
    origin = start.resolve()
    candidates = (origin, *origin.parents)
    root = next((d for d in candidates if (d / "PROJECT_STATE.md").exists()), None)
    if root is None:
        raise RuntimeError("Could not find PROJECT_STATE.md. Open notebook from within the repo.")
    return root

# Resolve the repo root from the current working directory (raises if the
# notebook was not opened from inside the repo).
REPO_ROOT = find_repo_root(Path.cwd())
print("[CELL 02-00] REPO_ROOT:", REPO_ROOT)

# Central registry of repo-relative locations; later cells read paths from here
# instead of rebuilding them.
PATHS = {
    "PROJECT_STATE": REPO_ROOT / "PROJECT_STATE.md",
    "META_REGISTRY": REPO_ROOT / "meta.json",
    "DATA_RAW": REPO_ROOT / "data" / "raw",
    "DATA_INTERIM": REPO_ROOT / "data" / "interim",
    "DATA_PROCESSED": REPO_ROOT / "data" / "processed",
    "REPORTS": REPO_ROOT / "reports",
}
# Echo every configured path so the run log records where artifacts will land.
for k, v in PATHS.items():
    print(f"[CELL 02-00] {k}={v}")

def cell_start(cell_id: str, title: str, **kwargs: Any) -> float:
    """Print the opening log banner for a notebook cell and start its timer.

    Args:
        cell_id: Tag placed in square brackets at the start of every log line.
        title: Human-readable description printed on the first banner line.
        **kwargs: Extra ``key=value`` pairs to log, one per line, in call order.

    Returns:
        ``time.time()`` captured on entry; pass it to ``cell_end`` as ``t0``.
    """
    started = time.time()
    prefix = f"[{cell_id}]"
    print(f"\n{prefix} {title}")
    print(f"{prefix} start={datetime.now().isoformat(timespec='seconds')}")
    for key in kwargs:
        print(f"{prefix} {key}={kwargs[key]}")
    return started

def cell_end(cell_id: str, t0: float, **kwargs: Any) -> None:
    """Print the closing log lines for a notebook cell.

    Args:
        cell_id: Tag placed in square brackets at the start of every log line.
        t0: Start time in ``time.time()`` seconds, as returned by ``cell_start``.
        **kwargs: Extra ``key=value`` pairs to log before the elapsed time.
    """
    prefix = f"[{cell_id}]"
    for key in kwargs:
        print(f"{prefix} {key}={kwargs[key]}")
    elapsed = time.time() - t0
    print(f"{prefix} elapsed={elapsed:.2f}s")
    print(f"{prefix} done")

# Bootstrap finished: helpers and PATHS are now available to the following cells.
print("[CELL 02-00] done")


[CELL 02-00] start=2026-01-06T22:11:27
[CELL 02-00] CWD: C:\anonymous-users-mooc-session-meta\notebooks
[CELL 02-00] REPO_ROOT: C:\anonymous-users-mooc-session-meta
[CELL 02-00] PROJECT_STATE=C:\anonymous-users-mooc-session-meta\PROJECT_STATE.md
[CELL 02-00] META_REGISTRY=C:\anonymous-users-mooc-session-meta\meta.json
[CELL 02-00] DATA_RAW=C:\anonymous-users-mooc-session-meta\data\raw
[CELL 02-00] DATA_INTERIM=C:\anonymous-users-mooc-session-meta\data\interim
[CELL 02-00] DATA_PROCESSED=C:\anonymous-users-mooc-session-meta\data\processed
[CELL 02-00] REPORTS=C:\anonymous-users-mooc-session-meta\reports
[CELL 02-00] done


JSON IO + hashing + safe artifact hash

In [2]:
# [CELL 02-01] JSON IO + hashing + safe hash (Windows locks) + JSON serializer for Timestamp

# Start this cell's log banner/timer; t0 is consumed by cell_end() at the bottom of the cell.
t0 = cell_start("CELL 02-01", "JSON IO + hashing helpers (Timestamp-safe)")

import json
import uuid
import hashlib
from pathlib import Path
from typing import Any, Dict

def _json_default(o):
    # pandas Timestamp
    try:
        import pandas as pd
        if isinstance(o, (pd.Timestamp,)):
            return o.isoformat()
    except Exception:
        pass
    # numpy scalars
    try:
        import numpy as np
        if isinstance(o, (np.integer,)):
            return int(o)
        if isinstance(o, (np.floating,)):
            return float(o)
        if isinstance(o, (np.bool_,)):
            return bool(o)
    except Exception:
        pass
    # python datetime/date
    try:
        from datetime import datetime, date
        if isinstance(o, (datetime, date)):
            return o.isoformat()
    except Exception:
        pass
    # fallback
    return str(o)

def write_json_atomic(path: Path, obj: Any, indent: int = 2) -> None:
    """Serialize ``obj`` as JSON to ``path`` via a temp file plus rename.

    The payload is written to a uniquely named sibling temp file and then moved
    over ``path`` with ``Path.replace``, so ``path`` is only touched once the
    full document has been written.

    Args:
        path: Destination file; parent directories are created if missing.
        obj: Object to serialize; values the stock encoder rejects go through
            ``_json_default``.
        indent: Indentation width passed to ``json.dump``.

    Raises:
        Whatever ``json.dump`` or the filesystem raises. On any failure the
        temp file is removed (best effort) before the error propagates, so
        failed writes no longer leave ``*.tmp_<hex>`` files behind.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Unique suffix so concurrent writers never share a temp file.
    tmp = path.with_suffix(path.suffix + f".tmp_{uuid.uuid4().hex}")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(obj, f, ensure_ascii=False, indent=indent, default=_json_default)
        tmp.replace(path)
    except BaseException:
        # Serialization or rename failed: do not leave an orphaned temp file.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise

def read_json(path: Path) -> Any:
    """Load and return the JSON document stored at ``path`` (read as UTF-8).

    Raises:
        RuntimeError: If ``path`` does not exist.
    """
    if path.exists():
        with path.open("r", encoding="utf-8") as fh:
            payload = json.load(fh)
        return payload
    raise RuntimeError(f"Missing JSON file: {path}")

def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in binary mode in pieces of ``chunk_size`` bytes, so the
    whole file is never held in memory at once.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()

def safe_artifact_record(path: Path) -> Dict[str, Any]:
    """Build a manifest entry (path, size, SHA-256) for an artifact file.

    A ``PermissionError`` raised while hashing is recorded in ``sha256_error``
    (and a warning is printed) instead of being propagated; ``sha256`` then
    stays ``None``. Errors from ``path.stat()`` are not caught.

    Returns:
        Dict with keys ``path``, ``bytes``, ``sha256`` and ``sha256_error``.
    """
    location = str(path)
    size_bytes = int(path.stat().st_size)
    digest = None
    failure = None
    try:
        digest = sha256_file(path)
    except PermissionError as exc:
        failure = f"PermissionError: {exc}"
        print("[CELL 02-01] WARN: locked, cannot hash now:", path)
    return {"path": location, "bytes": size_bytes, "sha256": digest, "sha256_error": failure}

# Close this cell's log block (prints elapsed time since the cell_start() call above).
cell_end("CELL 02-01", t0)



[CELL 02-01] JSON IO + hashing helpers (Timestamp-safe)
[CELL 02-01] start=2026-01-06T22:11:27
[CELL 02-01] elapsed=0.00s
[CELL 02-01] done


Start run + init report/config/manifest + meta.json append

In [3]:
# [CELL 02-02] Start run + init files + meta.json append-only

# Purpose: create the per-run output folder, write the initial config/report/manifest
# JSON files, and append this run to the repo-level meta.json registry.
t0 = cell_start("CELL 02-02", "Start run")

NOTEBOOK_NAME = "02_sessionize_mars"
# RUN_TAG has one-second resolution and names the output folder; RUN_ID is the unique id.
# NOTE(review): because OUT_DIR is created with exist_ok=True, two runs started within
# the same second would share (and overwrite) the same folder.
RUN_TAG = datetime.now().strftime("%Y%m%d_%H%M%S")
RUN_ID = uuid.uuid4().hex

OUT_DIR = PATHS["REPORTS"] / NOTEBOOK_NAME / RUN_TAG
OUT_DIR.mkdir(parents=True, exist_ok=True)

REPORT_PATH = OUT_DIR / "report.json"
CONFIG_PATH = OUT_DIR / "config.json"
MANIFEST_PATH = OUT_DIR / "manifest.json"

# Inputs from Notebook 01 outputs (fixed)
DUCKDB_PATH = PATHS["DATA_INTERIM"] / "mars.duckdb"
RAW_VIEW = "mars_events_raw"  # created in Notebook 01

# Outputs (fixed structure)
SESS_DIR = PATHS["DATA_PROCESSED"] / "mars" / "sessions"
SESS_DIR.mkdir(parents=True, exist_ok=True)

# Run configuration; CELL 02-07 later overwrites chosen_gap_minutes and rewrites config.json.
CFG = {
    "notebook": NOTEBOOK_NAME,
    "run_id": RUN_ID,
    "run_tag": RUN_TAG,
    "inputs": {"duckdb_path": str(DUCKDB_PATH), "raw_view": RAW_VIEW},
    "outputs": {"sessions_dir": str(SESS_DIR), "reports_out_dir": str(OUT_DIR)},
    "sessionization": {
        "gap_candidates_minutes": [5, 10, 30, 60],
        "chosen_gap_minutes": 30,  # default; we will justify using stats in this notebook
    }
}

write_json_atomic(CONFIG_PATH, CFG)

# Skeleton report; later cells re-read this file, fill in sections and write it back.
report = {
    "run_id": RUN_ID,
    "notebook": NOTEBOOK_NAME,
    "run_tag": RUN_TAG,
    "created_at": datetime.now().isoformat(timespec="seconds"),
    "repo_root": str(REPO_ROOT),
    "metrics": {},
    "key_findings": [],
    "sanity_samples": {},
    "data_fingerprints": {},
    "notes": [],
}
write_json_atomic(REPORT_PATH, report)

# Empty manifest; artifact records are added in CELL 02-11.
manifest = {"run_id": RUN_ID, "notebook": NOTEBOOK_NAME, "run_tag": RUN_TAG, "artifacts": []}
write_json_atomic(MANIFEST_PATH, manifest)

# meta.json append-only: create the registry on first use, then append this run.
# NOTE(review): this is a plain read-modify-write with no locking, and it assumes an
# existing meta.json already contains a "runs" list.
META_PATH = PATHS["META_REGISTRY"]
if not META_PATH.exists():
    write_json_atomic(META_PATH, {"schema_version": 1, "runs": []})
meta = read_json(META_PATH)
meta["runs"].append({
    "run_id": RUN_ID,
    "notebook": NOTEBOOK_NAME,
    "run_tag": RUN_TAG,
    "out_dir": str(OUT_DIR),
    "created_at": datetime.now().isoformat(timespec="seconds"),
})
write_json_atomic(META_PATH, meta)

cell_end("CELL 02-02", t0, out_dir=str(OUT_DIR), report=str(REPORT_PATH))



[CELL 02-02] Start run
[CELL 02-02] start=2026-01-06T22:11:27
[CELL 02-02] out_dir=C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127
[CELL 02-02] report=C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127\report.json
[CELL 02-02] elapsed=0.02s
[CELL 02-02] done


DuckDB open + verify raw view exists

In [4]:
# [CELL 02-03] DuckDB open + verify raw view exists (writeable for CREATE VIEW later)

# Purpose: open the DuckDB file produced by Notebook 01 and confirm the raw view is queryable.
# Leaves two globals for later cells: `con` (open, writable connection) and `schema_df`.
t0 = cell_start("CELL 02-03", "Open DuckDB + verify view (read_only=False)", duckdb=str(DUCKDB_PATH), view=RAW_VIEW)

import duckdb

if not DUCKDB_PATH.exists():
    raise RuntimeError(f"Missing DuckDB file: {DUCKDB_PATH}. Run Notebook 01 first.")

# Writable because later cells create/drop views on this same connection.
con = duckdb.connect(str(DUCKDB_PATH), read_only=False)

# Verify view exists and count rows
# (there is no explicit existence check: the COUNT(*) query itself is expected to fail
# if RAW_VIEW is missing).
n = con.execute(f"SELECT COUNT(*) FROM {RAW_VIEW}").fetchone()[0]
schema_df = con.execute(f"DESCRIBE {RAW_VIEW}").fetchdf()

print("[CELL 02-03] rows:", int(n))
print("[CELL 02-03] schema head:")
# Only the first 40 schema rows are printed to keep the log short.
print(schema_df.head(40).to_string(index=False))

cell_end("CELL 02-03", t0, rows=int(n), n_cols=int(schema_df.shape[0]))



[CELL 02-03] Open DuckDB + verify view (read_only=False)
[CELL 02-03] start=2026-01-06T22:11:27
[CELL 02-03] duckdb=C:\anonymous-users-mooc-session-meta\data\interim\mars.duckdb
[CELL 02-03] view=mars_events_raw
[CELL 02-03] rows: 3659
[CELL 02-03] schema head:
     column_name column_type null  key default extra
         user_id      BIGINT  YES None    None  None
         item_id      BIGINT  YES None    None  None
watch_percentage      BIGINT  YES None    None  None
      created_at     VARCHAR  YES None    None  None
          rating      BIGINT  YES None    None  None
   __source_file     VARCHAR  YES None    None  None
[CELL 02-03] rows=3659
[CELL 02-03] n_cols=6
[CELL 02-03] elapsed=0.06s
[CELL 02-03] done


Auto-detect user/item/timestamp/rating columns (name-based heuristics; fails fast if user, item or timestamp cannot be found)

In [5]:
# [CELL 02-04] Auto-detect key columns (user/item/timestamp/rating)

t0 = cell_start("CELL 02-04", "Detect user/item/timestamp/rating columns")

# Column names of the raw view (from DESCRIBE in CELL 02-03); guess_col() searches this list.
cols = schema_df["column_name"].tolist()

def guess_col(candidates, columns: Optional[List[str]] = None):
    """Return the first column whose name contains one of ``candidates``.

    Matching is a case-insensitive substring test. Patterns are tried in the
    order given, so an earlier pattern wins over a later one even if the later
    pattern matches a column that appears first in the column list.

    Args:
        candidates: Substrings to look for, in priority order.
        columns: Column names to search. Defaults to the notebook-level ``cols``
            list, which keeps existing ``guess_col([...])`` calls working while
            allowing the helper to be used (and tested) without that global.

    Returns:
        The matching column name exactly as it appears in ``columns``, or
        ``None`` when nothing matches.
    """
    names = cols if columns is None else list(columns)
    for pat in candidates:
        # Lower-case the pattern too; otherwise an upper-case pattern could never match.
        needle = str(pat).lower()
        for c in names:
            if needle in str(c).lower():
                return c
    return None

# Candidate substrings per role, in priority order (first pattern that matches any column wins).
guess = {
    "user": guess_col(["user", "learner", "student", "uid"]),
    "item": guess_col(["item", "course", "resource", "content", "cid", "iid"]),
    "rating": guess_col(["rating", "rate", "score", "stars"]),
    "ts": guess_col(["timestamp", "time", "date", "created", "ts"]),
}

print("[CELL 02-04] guessed:", guess)

# user, item and timestamp are mandatory for sessionization; rating may be absent.
missing = [k for k in ["user", "item", "ts"] if guess[k] is None]
if missing:
    raise RuntimeError(f"Cannot sessionize: missing required columns {missing}. Columns={cols}")

# Detected column names; these are interpolated into the SQL built by later cells.
USER_COL = guess["user"]
ITEM_COL = guess["item"]
TS_COL = guess["ts"]
RATING_COL = guess["rating"]  # optional

print("[CELL 02-04] USER_COL:", USER_COL)
print("[CELL 02-04] ITEM_COL:", ITEM_COL)
print("[CELL 02-04] TS_COL:", TS_COL)
print("[CELL 02-04] RATING_COL:", RATING_COL)

cell_end("CELL 02-04", t0)



[CELL 02-04] Detect user/item/timestamp/rating columns
[CELL 02-04] start=2026-01-06T22:11:27
[CELL 02-04] guessed: {'user': 'user_id', 'item': 'item_id', 'rating': 'rating', 'ts': 'created_at'}
[CELL 02-04] USER_COL: user_id
[CELL 02-04] ITEM_COL: item_id
[CELL 02-04] TS_COL: created_at
[CELL 02-04] RATING_COL: rating
[CELL 02-04] elapsed=0.00s
[CELL 02-04] done


Normalize timestamps into a reliable ts (TIMESTAMP) + validate parse

In [6]:
# [CELL 02-05] Normalize timestamps -> ts (TIMESTAMP) + validate parse

# Purpose: create the view `mars_events_norm` with canonical column names
# (user_id, item_id, optional rating, ts_raw, ts) and stop if any key or timestamp is unusable.
t0 = cell_start("CELL 02-05", "Timestamp normalization + parse validation")

# DuckDB normalization strategy (the CASE below has two branches plus a NULL fallback):
# 1) TRY_CAST to TIMESTAMP
# 2) otherwise TRY_CAST to BIGINT and treat it as an epoch: values > 1e12 are taken as
#    milliseconds (divided by 1000), anything else as seconds
# 3) otherwise NULL (reported as a parse failure further down)
# NOTE(review): the recorded output of this cell shows min/max ts carrying a time zone
# (Asia/Singapore), so `ts` appears to be timezone-aware and tied to the DuckDB session
# TimeZone of the machine that ran it — confirm, and consider pinning the time zone if
# ts / ts_epoch must be reproducible across machines.

norm_view = "mars_events_norm"

# Reopen the connection before (re)creating the view.
con.close()
con = duckdb.connect(str(DUCKDB_PATH), read_only=False)
con.execute(f"DROP VIEW IF EXISTS {norm_view};")

ts_expr = f"""
CASE
  -- already timestamp-ish
  WHEN TRY_CAST({TS_COL} AS TIMESTAMP) IS NOT NULL THEN TRY_CAST({TS_COL} AS TIMESTAMP)

  -- string digits -> epoch sec/ms
  WHEN TRY_CAST({TS_COL} AS BIGINT) IS NOT NULL THEN
    CASE
      WHEN TRY_CAST({TS_COL} AS BIGINT) > 1000000000000 THEN to_timestamp(TRY_CAST({TS_COL} AS BIGINT) / 1000)
      ELSE to_timestamp(TRY_CAST({TS_COL} AS BIGINT))
    END

  -- fallback: NULL
  ELSE NULL
END
"""

# The rating column is optional; when absent this fragment is empty and the SELECT still parses.
rating_select = f", {RATING_COL} AS rating" if RATING_COL is not None else ""

con.execute(f"""
CREATE VIEW {norm_view} AS
SELECT
  {USER_COL} AS user_id,
  {ITEM_COL} AS item_id
  {rating_select},
  {TS_COL} AS ts_raw,
  {ts_expr} AS ts
FROM {RAW_VIEW}
""")

# Row count plus NULL counts for the three columns sessionization depends on.
parse_stats = con.execute(f"""
SELECT
  COUNT(*) AS n,
  SUM(CASE WHEN ts IS NULL THEN 1 ELSE 0 END) AS n_ts_null,
  SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) AS n_user_null,
  SUM(CASE WHEN item_id IS NULL THEN 1 ELSE 0 END) AS n_item_null
FROM {norm_view}
""").fetchdf().iloc[0].to_dict()

print("[CELL 02-05] parse_stats:", {k:int(v) for k,v in parse_stats.items()})

if int(parse_stats["n_user_null"]) > 0 or int(parse_stats["n_item_null"]) > 0:
    raise RuntimeError("Found NULL user_id or item_id. Cannot proceed safely.")

# We require timestamp to sessionize; if failures exist, show examples and stop.
n_ts_null = int(parse_stats["n_ts_null"])
if n_ts_null > 0:
    bad = con.execute(f"""
    SELECT user_id, item_id, ts_raw
    FROM {norm_view}
    WHERE ts IS NULL
    LIMIT 20
    """).fetchdf()
    print("[CELL 02-05] ts_parse_failed_examples (first 20):")
    print(bad.to_string(index=False))
    raise RuntimeError(f"Timestamp parse failed for {n_ts_null} rows. Fix TS parsing before sessionization.")

# Log the covered time range as a quick plausibility check of the parsed timestamps.
minmax = con.execute(f"SELECT MIN(ts) AS min_ts, MAX(ts) AS max_ts FROM {norm_view}").fetchdf().iloc[0].to_dict()
print("[CELL 02-05] ts_minmax:", minmax)

cell_end("CELL 02-05", t0, min_ts=str(minmax["min_ts"]), max_ts=str(minmax["max_ts"]))



[CELL 02-05] Timestamp normalization + parse validation
[CELL 02-05] start=2026-01-06T22:11:27
[CELL 02-05] parse_stats: {'n': 3659, 'n_ts_null': 0, 'n_user_null': 0, 'n_item_null': 0}
[CELL 02-05] ts_minmax: {'min_ts': Timestamp('2018-09-28 14:38:15+0800', tz='Asia/Singapore'), 'max_ts': Timestamp('2021-09-20 16:26:06+0800', tz='Asia/Singapore')}
[CELL 02-05] min_ts=2018-09-28 14:38:15+08:00
[CELL 02-05] max_ts=2021-09-20 16:26:06+08:00
[CELL 02-05] elapsed=0.11s
[CELL 02-05] done


Gap sensitivity (5/10/30/60m): sessions + mean length

In [7]:
# [CELL 02-06] Gap sensitivity analysis (correct n_events + session stats)

# Purpose: for each candidate inactivity gap, sessionize the normalized events in SQL and
# collect session-count / session-length statistics into the `sens` DataFrame.
# Leaves `sens` and `GAP_RECOMMENDED` for later cells.
t0 = cell_start("CELL 02-06", "Gap sensitivity analysis (5/10/30/60m)")

gap_minutes_list = CFG["sessionization"]["gap_candidates_minutes"]
rows = []

for gm in gap_minutes_list:
    gap_sec = int(gm * 60)

    # ordered    : previous event time per user (ties on ts broken by item_id)
    # flags      : 1 on a user's first event or when the gap to the previous event exceeds gap_sec
    # sess       : running sum of the flags = per-user session number
    # sess_sizes : number of events per (user, session)
    q = f"""
    WITH ordered AS (
      SELECT
        user_id, item_id, ts,
        LAG(ts) OVER (PARTITION BY user_id ORDER BY ts, item_id) AS prev_ts
      FROM {norm_view}
    ),
    flags AS (
      SELECT
        user_id, item_id, ts,
        CASE
          WHEN prev_ts IS NULL THEN 1
          WHEN EXTRACT(EPOCH FROM (ts - prev_ts)) > {gap_sec} THEN 1
          ELSE 0
        END AS new_sess
      FROM ordered
    ),
    sess AS (
      SELECT
        user_id,
        item_id,
        ts,
        SUM(new_sess) OVER (PARTITION BY user_id ORDER BY ts, item_id ROWS UNBOUNDED PRECEDING) AS sess_num
      FROM flags
    ),
    sess_sizes AS (
      SELECT user_id, sess_num, COUNT(*) AS cnt
      FROM sess
      GROUP BY 1,2
    )
    SELECT
      {gm} AS gap_min,
      (SELECT COUNT(*) FROM sess) AS n_events,
      (SELECT COUNT(DISTINCT user_id) FROM sess) AS n_users,
      (SELECT COUNT(*) FROM sess_sizes) AS n_sessions,
      AVG(cnt) AS avg_events_per_session,
      approx_quantile(cnt, 0.50) AS p50_events_per_session,
      approx_quantile(cnt, 0.90) AS p90_events_per_session,
      approx_quantile(cnt, 0.99) AS p99_events_per_session
    FROM sess_sizes
    """
    # Convert column-wise (orient="records") instead of via .iloc[0]: taking the row as a
    # Series upcasts the mixed int/float row to float, which is why counts used to be
    # reported as 1836.0 / 3659.0.
    out = con.execute(q).fetchdf().to_dict(orient="records")[0]
    rows.append(out)

    print(f"[CELL 02-06] gap_min {gm} -> "
          f"n_sessions={out['n_sessions']}, avg={out['avg_events_per_session']:.4f}, "
          f"p50={out['p50_events_per_session']}, p90={out['p90_events_per_session']}")

sens = pd.DataFrame(rows).sort_values("gap_min").reset_index(drop=True)

# Sanity: n_events should be constant across gaps
if sens["n_events"].nunique() != 1:
    raise RuntimeError(f"n_events differs across gaps (unexpected). Values={sens['n_events'].tolist()}")

print("\n[CELL 02-06] sensitivity table (correct n_events):")
print(sens.to_string(index=False))

# Decision rule (elbow/plateau heuristic)
# We choose the smallest gap where further increases yield small session-count reduction.
# The deltas below are printed to support that judgement; the choice itself is made by hand.
sens2 = sens.copy()
sens2["delta_sessions_vs_prev"] = sens2["n_sessions"].diff()
sens2["rel_drop_vs_prev"] = (sens2["n_sessions"].diff() / sens2["n_sessions"].shift(1)).abs()
print("\n[CELL 02-06] deltas:")
print(sens2[["gap_min", "n_sessions", "delta_sessions_vs_prev", "rel_drop_vs_prev"]].to_string(index=False))

# Default decision: 30m (config will be updated later if you decide otherwise)
GAP_RECOMMENDED = 30
print(f"\n[CELL 02-06] recommended_gap_minutes={GAP_RECOMMENDED} (stable region beyond 30m)")

cell_end("CELL 02-06", t0, recommended_gap_minutes=GAP_RECOMMENDED)



[CELL 02-06] Gap sensitivity analysis (5/10/30/60m)
[CELL 02-06] start=2026-01-06T22:11:27
[CELL 02-06] gap_min 5 -> n_sessions=1836.0, avg=1.9929, p50=1.0, p90=4.0
[CELL 02-06] gap_min 10 -> n_sessions=1523.0, avg=2.4025, p50=1.0, p90=5.0
[CELL 02-06] gap_min 30 -> n_sessions=1322.0, avg=2.7678, p50=1.0, p90=6.0
[CELL 02-06] gap_min 60 -> n_sessions=1275.0, avg=2.8698, p50=1.0, p90=6.0

[CELL 02-06] sensitivity table (correct n_events):
 gap_min  n_events  n_users  n_sessions  avg_events_per_session  p50_events_per_session  p90_events_per_session  p99_events_per_session
     5.0    3659.0    822.0      1836.0                1.992919                     1.0                     4.0                    13.0
    10.0    3659.0    822.0      1523.0                2.402495                     1.0                     5.0                    18.0
    30.0    3659.0    822.0      1322.0                2.767776                     1.0                     6.0                    26.0
    60.0    3

Report cell: write Gap Sensitivity analysis into report.json

In [8]:
# [CELL 02-06A] Report: Gap sensitivity analysis (write rationale + table)

# Purpose: record the gap-sensitivity table, its interpretation and the chosen gap in report.json.
t0 = cell_start("CELL 02-06A", "Write Gap sensitivity analysis section to report.json")

report = read_json(REPORT_PATH)

# Use the decision made in CELL 02-06 instead of repeating the literal.
recommended_gap = int(GAP_RECOMMENDED)

# Store count-like columns as integers in the report. They can arrive as floats
# (e.g. 3659.0); a column is only cast when every value is a whole number, so nothing is
# ever truncated.
count_cols = [
    "gap_min", "n_events", "n_users", "n_sessions",
    "p50_events_per_session", "p90_events_per_session", "p99_events_per_session",
]
int_casts = {
    c: "int64"
    for c in count_cols
    if c in sens.columns and sens[c].notna().all() and (sens[c] % 1 == 0).all()
}
sens_records = sens.astype(int_casts)

gap_section = {
    "what_we_tested": "Inactivity-gap sessionization sensitivity on MARS (user-level) using candidate gaps in minutes.",
    "candidates_minutes": CFG["sessionization"]["gap_candidates_minutes"],
    "table": sens_records.to_dict(orient="records"),
    "interpretation": (
        "We examine how the number of inferred sessions and session-length statistics change with the gap threshold. "
        "A very small gap fragments behavior into many micro-sessions; a very large gap risks merging separate intents. "
        "We choose a gap in the stable/plateau region where increasing the gap further yields only small reductions in "
        "session count and similar session-length quantiles."
    ),
    "decision": {
        "recommended_gap_minutes": recommended_gap,
        # NOTE(review): the reasoning text below is written for a 30-minute choice; revise
        # it by hand if GAP_RECOMMENDED is ever changed.
        "reasoning": (
            "The session count drops substantially from 5→10 minutes, and still decreases from 10→30, "
            "but changes only slightly from 30→60 minutes (near-plateau). "
            "Therefore 30 minutes is a defensible default that avoids excessive fragmentation (5/10m) "
            "and avoids unnecessary over-merging (60m)."
        ),
        "reporting_commitment": f"We will use {recommended_gap}m as primary and report sensitivity at 10m and 60m."
    }
}

report["sanity_samples"]["mars_gap_sensitivity"] = gap_section

# Append the finding only once so re-running this cell does not duplicate it.
gap_finding = (
    f"Completed gap sensitivity analysis; recommend {recommended_gap}-minute inactivity threshold "
    "for MARS sessionization (with sensitivity reporting)."
)
if gap_finding not in report["key_findings"]:
    report["key_findings"].append(gap_finding)

write_json_atomic(REPORT_PATH, report)
print("[CELL 02-06A] updated_report:", REPORT_PATH)

cell_end("CELL 02-06A", t0)



[CELL 02-06A] Write Gap sensitivity analysis section to report.json
[CELL 02-06A] start=2026-01-06T22:11:28
[CELL 02-06A] updated_report: C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127\report.json
[CELL 02-06A] elapsed=0.01s
[CELL 02-06A] done


Choose the gap (default 30m), then build the sessionized events table and the sessions table

In [10]:
# [CELL 02-07] Sessionize with recommended gap (lock + persist) + build event/session views (UPDATED)

# Purpose: apply the chosen inactivity gap, build event-level and session-level views,
# export both to Parquet, and register the stable views `mars_events_sessionized` and
# `mars_sessions` on top of those Parquet files.
# Note: this cell already writes the Parquet files and registers the stable views;
# CELL 02-08 and CELL 02-09 repeat those two steps.
t0 = cell_start("CELL 02-07", "Sessionize using recommended gap (lock + persist)")

# Lock the gap decision (from sensitivity analysis)
GAP_MIN = int(GAP_RECOMMENDED)  # set in CELL 02-06
GAP_SEC = int(GAP_MIN * 60)

# Persist choice into CFG + config.json (reproducibility)
CFG["sessionization"]["chosen_gap_minutes"] = GAP_MIN
write_json_atomic(Path(CONFIG_PATH), CFG)
print("[CELL 02-07] Updated config.json chosen_gap_minutes:", GAP_MIN)

# Output paths (deterministic)
events_out = SESS_DIR / f"events_gap{GAP_MIN}m.parquet"
sessions_out = SESS_DIR / f"sessions_gap{GAP_MIN}m.parquet"

# Names for internal views (optional; for debugging)
events_view = f"mars_events_sessionized_gap{GAP_MIN}m"
sessions_view = f"mars_sessions_gap{GAP_MIN}m"

# Drop old views if exist
con.execute(f"DROP VIEW IF EXISTS {events_view};")
con.execute(f"DROP VIEW IF EXISTS {sessions_view};")

# Build event-level sessionization view
#   ordered  : previous event time per user (ties on ts broken by item_id)
#   flags    : new_sess = 1 on a user's first event or when the gap exceeds GAP_SEC
#   sess     : running sum of new_sess = per-user session number
#   with_pos : position of each event inside its session + session length
# session_id is "<user_id>_<sess_num zero-padded to 6>".
# NOTE(review): presumably sess_num never needs more than 6 digits; verify how LPAD
# behaves for longer values before relying on session_id uniqueness at larger scale.
con.execute(f"""
CREATE VIEW {events_view} AS
WITH ordered AS (
  SELECT
    user_id, item_id, ts
    {", rating" if RATING_COL is not None else ""},
    ts_raw,
    LAG(ts) OVER (PARTITION BY user_id ORDER BY ts, item_id) AS prev_ts
  FROM {norm_view}
),
flags AS (
  SELECT
    *,
    CASE
      WHEN prev_ts IS NULL THEN 1
      WHEN EXTRACT(EPOCH FROM (ts - prev_ts)) > {GAP_SEC} THEN 1
      ELSE 0
    END AS new_sess
  FROM ordered
),
sess AS (
  SELECT
    *,
    SUM(new_sess) OVER (
      PARTITION BY user_id ORDER BY ts, item_id ROWS UNBOUNDED PRECEDING
    ) AS sess_num
  FROM flags
),
with_pos AS (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY user_id, sess_num ORDER BY ts, item_id) AS pos_in_sess,
    COUNT(*) OVER (PARTITION BY user_id, sess_num) AS sess_len
  FROM sess
)
SELECT
  user_id,
  item_id
  {", rating" if RATING_COL is not None else ""},
  ts,
  EXTRACT(EPOCH FROM ts)::BIGINT AS ts_epoch,
  sess_num,
  (CAST(user_id AS VARCHAR) || '_' || LPAD(CAST(sess_num AS VARCHAR), 6, '0')) AS session_id,
  pos_in_sess,
  sess_len,
  ts_raw
FROM with_pos
""")

# Build session-level view: one row per session with start/end, event count,
# duration in seconds and number of distinct items.
con.execute(f"""
CREATE VIEW {sessions_view} AS
SELECT
  session_id,
  user_id,
  sess_num,
  MIN(ts) AS session_start_ts,
  MAX(ts) AS session_end_ts,
  COUNT(*) AS n_events,
  (EXTRACT(EPOCH FROM (MAX(ts) - MIN(ts))))::BIGINT AS duration_sec,
  COUNT(DISTINCT item_id) AS n_unique_items
FROM {events_view}
GROUP BY 1,2,3
""")

# Materialize to Parquet (real artifacts)
SESS_DIR.mkdir(parents=True, exist_ok=True)

# Double any single quote so the paths are safe inside SQL string literals.
events_out_sql = str(events_out).replace("'", "''")
sessions_out_sql = str(sessions_out).replace("'", "''")

con.execute(f"COPY (SELECT * FROM {events_view}) TO '{events_out_sql}' (FORMAT PARQUET);")
con.execute(f"COPY (SELECT * FROM {sessions_view}) TO '{sessions_out_sql}' (FORMAT PARQUET);")

print("[CELL 02-07] wrote:", events_out)
print("[CELL 02-07] wrote:", sessions_out)

# Register stable views pointing to the Parquet outputs (repo-wide downstream dependency)
con.execute("DROP VIEW IF EXISTS mars_events_sessionized;")
con.execute("DROP VIEW IF EXISTS mars_sessions;")

con.execute(f"""
CREATE VIEW mars_events_sessionized AS
SELECT * FROM read_parquet('{events_out_sql}')
""")

con.execute(f"""
CREATE VIEW mars_sessions AS
SELECT * FROM read_parquet('{sessions_out_sql}')
""")

# Quick checks (read back through the stable views, i.e. from the Parquet files)
n_events = int(con.execute("SELECT COUNT(*) FROM mars_events_sessionized").fetchone()[0])
n_sessions = int(con.execute("SELECT COUNT(*) FROM mars_sessions").fetchone()[0])
n_users = int(con.execute("SELECT COUNT(DISTINCT user_id) FROM mars_events_sessionized").fetchone()[0])

print("[CELL 02-07] n_events:", n_events)
print("[CELL 02-07] n_sessions:", n_sessions)
print("[CELL 02-07] n_users:", n_users)

cell_end("CELL 02-07", t0, gap_minutes=GAP_MIN, n_events=n_events, n_sessions=n_sessions, n_users=n_users)



[CELL 02-07] Sessionize using recommended gap (lock + persist)
[CELL 02-07] start=2026-01-06T22:14:00
[CELL 02-07] Updated config.json chosen_gap_minutes: 30
[CELL 02-07] wrote: C:\anonymous-users-mooc-session-meta\data\processed\mars\sessions\events_gap30m.parquet
[CELL 02-07] wrote: C:\anonymous-users-mooc-session-meta\data\processed\mars\sessions\sessions_gap30m.parquet
[CELL 02-07] n_events: 3659
[CELL 02-07] n_sessions: 1322
[CELL 02-07] n_users: 822
[CELL 02-07] gap_minutes=30
[CELL 02-07] n_events=3659
[CELL 02-07] n_sessions=1322
[CELL 02-07] n_users=822
[CELL 02-07] elapsed=0.15s
[CELL 02-07] done


Save processed Parquet outputs (events + sessions)

In [11]:
# [CELL 02-08] Save processed Parquet outputs

# Purpose: export the event-level and session-level views to Parquet under SESS_DIR.
# Requires `con`, `events_view`, `sessions_view` and `GAP_MIN` from CELL 02-07.
t0 = cell_start("CELL 02-08", "Write processed parquet (events + sessions)")

events_out = SESS_DIR / f"events_gap{GAP_MIN}m.parquet"
sessions_out = SESS_DIR / f"sessions_gap{GAP_MIN}m.parquet"

# Double any single quote so a path containing ' cannot break (or alter) the COPY
# statement — the same escaping the other cells apply to these paths.
events_out_sql = str(events_out).replace("'", "''")
sessions_out_sql = str(sessions_out).replace("'", "''")

# Export via DuckDB COPY (fast + consistent)
con.execute(f"COPY (SELECT * FROM {events_view}) TO '{events_out_sql}' (FORMAT PARQUET);")
con.execute(f"COPY (SELECT * FROM {sessions_view}) TO '{sessions_out_sql}' (FORMAT PARQUET);")

print("[CELL 02-08] wrote:", events_out)
print("[CELL 02-08] wrote:", sessions_out)

cell_end("CELL 02-08", t0, events_parquet=str(events_out), sessions_parquet=str(sessions_out))



[CELL 02-08] Write processed parquet (events + sessions)
[CELL 02-08] start=2026-01-06T22:14:04
[CELL 02-08] wrote: C:\anonymous-users-mooc-session-meta\data\processed\mars\sessions\events_gap30m.parquet
[CELL 02-08] wrote: C:\anonymous-users-mooc-session-meta\data\processed\mars\sessions\sessions_gap30m.parquet
[CELL 02-08] events_parquet=C:\anonymous-users-mooc-session-meta\data\processed\mars\sessions\events_gap30m.parquet
[CELL 02-08] sessions_parquet=C:\anonymous-users-mooc-session-meta\data\processed\mars\sessions\sessions_gap30m.parquet
[CELL 02-08] elapsed=0.09s
[CELL 02-08] done


Create DuckDB views over processed Parquet (processed → duckdb)

In [12]:
# [CELL 02-09] Register processed parquet as views (writeable connection, then close)

# Purpose: (re)register the stable views `mars_events_sessionized` and `mars_sessions`
# over the processed Parquet files, then release the DuckDB file.
t0 = cell_start("CELL 02-09", "Register processed parquet as DuckDB views")

# Swap the long-lived connection for a fresh writable one. (The previous connection was
# also opened with read_only=False.)
con.close()
con = duckdb.connect(str(DUCKDB_PATH), read_only=False)

# Double any single quote so the paths are safe inside SQL string literals.
events_parquet_sql = str(events_out).replace("'", "''")
sessions_parquet_sql = str(sessions_out).replace("'", "''")

try:
    # Views pointing to processed parquet (no duplication)
    con.execute("DROP VIEW IF EXISTS mars_events_sessionized;")
    con.execute("DROP VIEW IF EXISTS mars_sessions;")

    con.execute(f"""
CREATE VIEW mars_events_sessionized AS
SELECT * FROM read_parquet('{events_parquet_sql}')
""")
    con.execute(f"""
CREATE VIEW mars_sessions AS
SELECT * FROM read_parquet('{sessions_parquet_sql}')
""")

    # quick check
    chk = con.execute("SELECT COUNT(*) AS n_events FROM mars_events_sessionized").fetchone()[0]
    chk2 = con.execute("SELECT COUNT(*) AS n_sessions FROM mars_sessions").fetchone()[0]
    print("[CELL 02-09] registered views ok:", int(chk), int(chk2))
finally:
    # Always release the file, even if a statement above fails; otherwise the open
    # handle keeps the database locked for the cells that reopen it.
    con.close()
    print("[CELL 02-09] closed DuckDB connection (avoid Windows lock)")

cell_end("CELL 02-09", t0)



[CELL 02-09] Register processed parquet as DuckDB views
[CELL 02-09] start=2026-01-06T22:14:07
[CELL 02-09] registered views ok: 3659 1322
[CELL 02-09] closed DuckDB connection (avoid Windows lock)
[CELL 02-09] elapsed=0.11s
[CELL 02-09] done


Plots: sensitivity + session length histogram (Matplotlib)

In [14]:
# [CELL 02-10] Plots (matplotlib only) + save under run artifacts (re-open DuckDB)

# Purpose: save two figures under the run's plots/ folder:
#   1) session count vs. gap threshold (from `sens`)
#   2) distribution of session lengths for the chosen gap
t0 = cell_start("CELL 02-10", "Plots: gap sensitivity + session length distribution")

import matplotlib.pyplot as plt
import duckdb

PLOTS_DIR = OUT_DIR / "plots"
PLOTS_DIR.mkdir(parents=True, exist_ok=True)

# Plot 1: sessions vs gap
plt.figure()
plt.plot(sens["gap_min"], sens["n_sessions"], marker="o")
plt.xlabel("Gap (minutes)")
plt.ylabel("# sessions")
plt.title("MARS: session count vs gap threshold")
plt.tight_layout()
p1 = PLOTS_DIR / "gap_sensitivity_sessions.png"
plt.savefig(p1, dpi=200)
plt.close()
print("[CELL 02-10] saved:", p1)

# Plot 2: session length histogram (chosen gap) — open a fresh connection
# Read one value per SESSION (mars_sessions.n_events). The event-level table repeats
# sess_len on every event of a session, so using it would count a session of length k
# k times and skew the histogram towards long sessions.
con_plot = duckdb.connect(str(DUCKDB_PATH), read_only=True)
try:
    lens = con_plot.execute(
        "SELECT n_events FROM mars_sessions"
    ).fetchdf()["n_events"].astype(int)
finally:
    # Release the file even if the query fails.
    con_plot.close()
    print("[CELL 02-10] closed DuckDB connection (plots)")

plt.figure()
plt.hist(lens, bins=50)
plt.xlabel("Session length (#events)")
plt.ylabel("Count")
plt.title(f"MARS: session length distribution (gap={GAP_MIN}m)")
plt.tight_layout()
p2 = PLOTS_DIR / f"session_length_hist_gap{GAP_MIN}m.png"
plt.savefig(p2, dpi=200)
plt.close()
print("[CELL 02-10] saved:", p2)

cell_end("CELL 02-10", t0, plots_dir=str(PLOTS_DIR))



[CELL 02-10] Plots: gap sensitivity + session length distribution
[CELL 02-10] start=2026-01-06T22:14:46
[CELL 02-10] saved: C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127\plots\gap_sensitivity_sessions.png
[CELL 02-10] saved: C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127\plots\session_length_hist_gap30m.png
[CELL 02-10] closed DuckDB connection (plots)
[CELL 02-10] plots_dir=C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127\plots
[CELL 02-10] elapsed=0.35s
[CELL 02-10] done


Update report + manifest + close out

In [16]:
# [CELL 02-11] Write report + manifest (self-contained; opens its own DuckDB connection)

# Purpose: finalize the run — add artifact records (size + SHA-256) to manifest.json and
# add findings / sanity samples to report.json. Safe to re-run: entries are replaced or
# skipped instead of being appended a second time.
t0 = cell_start("CELL 02-11", "Write report + manifest")

import duckdb

report = read_json(Path(REPORT_PATH))
manifest = read_json(Path(MANIFEST_PATH))

# Add artifacts (parquets + plots)
artifact_paths = [
    events_out,
    sessions_out,
    PLOTS_DIR / "gap_sensitivity_sessions.png",
    PLOTS_DIR / f"session_length_hist_gap{GAP_MIN}m.png",
]
new_records = [safe_artifact_record(p) for p in artifact_paths]
new_paths = {rec["path"] for rec in new_records}
# Replace any earlier record for the same path so a re-run refreshes the entry
# rather than duplicating it.
manifest["artifacts"] = [
    rec for rec in manifest["artifacts"] if rec.get("path") not in new_paths
] + new_records

# Key findings / sanity
sessionized_finding = (
    f"Sessionized MARS with gap={GAP_MIN} minutes; wrote processed parquets and registered DuckDB views "
    f"mars_events_sessionized, mars_sessions."
)
if sessionized_finding not in report["key_findings"]:
    report["key_findings"].append(sessionized_finding)
report["sanity_samples"]["gap_sensitivity_table"] = sens.to_dict(orient="records")
report["sanity_samples"]["chosen_gap_minutes"] = int(GAP_MIN)
report["sanity_samples"]["outputs"] = {
    "events_parquet": str(events_out),
    "sessions_parquet": str(sessions_out),
}

# Open a fresh connection just for this cell
con_rep = duckdb.connect(str(DUCKDB_PATH), read_only=True)
try:
    # session_id breaks ties so the sample is the same on every run.
    sample_sessions_df = con_rep.execute("""
SELECT *
FROM mars_sessions
ORDER BY n_events DESC, session_id
LIMIT 5
""").fetchdf()
finally:
    # Release the file even if the query fails.
    con_rep.close()
    print("[CELL 02-11] closed DuckDB connection (report)")

report["sanity_samples"]["top5_sessions_by_events"] = sample_sessions_df.to_dict(orient="records")

write_json_atomic(Path(REPORT_PATH), report)
write_json_atomic(Path(MANIFEST_PATH), manifest)

print("[CELL 02-11] updated:", REPORT_PATH)
print("[CELL 02-11] updated:", MANIFEST_PATH)

cell_end("CELL 02-11", t0, n_artifacts=len(manifest["artifacts"]))



[CELL 02-11] Write report + manifest
[CELL 02-11] start=2026-01-06T22:16:47
[CELL 02-11] closed DuckDB connection (report)
[CELL 02-11] updated: C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127\report.json
[CELL 02-11] updated: C:\anonymous-users-mooc-session-meta\reports\02_sessionize_mars\20260106_221127\manifest.json
[CELL 02-11] n_artifacts=4
[CELL 02-11] elapsed=0.09s
[CELL 02-11] done
