##### 01 — EDA (Source MOOC Dataset: XuetangX raw)

This notebook validates and profiles the **source MOOC dataset** (XuetangX raw user activity) and produces **reproducible EDA artifacts**.

Scope (this notebook):
- Verify `data/raw/xuetangx` contents (raw JSON)
- Stream-parse raw JSON → **Parquet shards** (one-time conversion)
- DuckDB EDA: counts, time range, top actions, distributions
- Export **filtered course-level interactions** (for later notebooks)
- Save plots + tables + dataset metadata into `./reports/01_eda_source_mooc/<RUN_TAG>/`

Out of scope (planned notebooks 04/05):
- Session-gap analysis, sessionization, prefix/target building


Imports (core shared imports; optional dependencies and plotting libraries are imported in later cells)

In [None]:
# [CELL 01-01] Imports (UPDATED for XuetangX raw JSON -> Parquet -> DuckDB)

import os
import sys
import json
import time
import math
import platform
from pathlib import Path
from datetime import datetime

import yaml
import pandas as pd

def log(msg: str):
    """Print ``msg`` to stdout, prefixed with the ``[01]`` tag and the current local time."""
    stamp = format(datetime.now(), "%Y-%m-%d %H:%M:%S")
    print(f"[01] {stamp} | {msg}")


Bootstrap: force repo root + sys.path

In [None]:
# [CELL 01-02] Bootstrap: locate repo root reliably (Windows-safe)

# Absolute working directory of the kernel; used below as the start of the repo-root search.
CWD = Path.cwd().resolve()
log(f"Initial CWD: {CWD}")

def find_repo_root(start: Path) -> Path:
    """
    Walk from ``start`` up through its parents and return the repo root.

    Markers are tried in priority order: the nearest directory containing
    PROJECT_STATE.md wins; only if none exists anywhere upward is the nearest
    directory containing .git used.

    Raises FileNotFoundError when neither marker is found.
    """
    lineage = (start, *start.parents)
    for marker in ("PROJECT_STATE.md", ".git"):
        match = next((folder for folder in lineage if (folder / marker).exists()), None)
        if match is not None:
            return match
    raise FileNotFoundError("Could not find repo root (PROJECT_STATE.md or .git not found upward).")

REPO_ROOT = find_repo_root(CWD)
log(f"REPO_ROOT detected: {REPO_ROOT}")

SRC_DIR = REPO_ROOT / "src"

# Make imports work from any launch directory: the repo root goes to the front
# of sys.path first, then src/ (only when it exists) goes in front of it.
for candidate, wanted in ((REPO_ROOT, True), (SRC_DIR, SRC_DIR.exists())):
    if wanted and str(candidate) not in sys.path:
        sys.path.insert(0, str(candidate))

# Presence of the expected repo layout, keyed by the label printed below.
checks = {
    rel: (REPO_ROOT / rel).exists()
    for rel in ("src/", "notebooks/", "PROJECT_STATE.md", "src/configs/project.yaml")
}

log("Validation checks:")
for name, exists in checks.items():
    print(f"  {'✅' if exists else '❌'} {name}")

# Only these two entries are fatal; the others are informational.
required = {
    "PROJECT_STATE.md": "PROJECT_STATE.md not found in detected repo root — wrong working directory?",
    "src/configs/project.yaml": "src/configs/project.yaml not found — please ensure the config exists.",
}
for name, message in required.items():
    if not checks[name]:
        raise FileNotFoundError(message)


Load project config (YAML)

In [None]:
# [CELL 01-03] Load config (single source of truth): src/configs/project.yaml

CFG_PATH = REPO_ROOT / "src" / "configs" / "project.yaml"
log(f"Loading config: {CFG_PATH}")

with open(CFG_PATH, "r", encoding="utf-8") as f:
    loaded = yaml.safe_load(f)

# An empty YAML document loads as None; treat it as an empty config so the
# lookups below fall back to their defaults instead of failing on None.
CFG = {} if loaded is None else loaded

# Everything downstream assumes a mapping at the top level; reject anything else
# here with a message that names the file.
if not isinstance(CFG, dict):
    raise TypeError(
        f"Expected a mapping at the top level of {CFG_PATH}, got {type(CFG).__name__}"
    )

# Print top-level keys (no assumptions)
log("Config loaded. Top-level keys:")
print(sorted(CFG))

def get_cfg(d, path, default=None):
    """
    Look up a dotted ``path`` in nested dicts without raising.

    Example: get_cfg(CFG, "paths.data_raw")

    Returns ``default`` as soon as a segment is missing or the current node
    is not a dict; otherwise returns the value found at the end of the path.
    """
    node = d
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node

def _first_cfg(*paths):
    """Return the first truthy value found in CFG at any of the dotted ``paths``, else None."""
    for dotted in paths:
        value = get_cfg(CFG, dotted)
        if value:
            return value
    return None


# Attempt to identify source dataset fields without guessing schema:
# several candidate key layouts are probed and the first non-empty hit is used.
SOURCE_NAME = _first_cfg(
    "datasets.source.name",
    "dataset.source.name",
    "source_dataset.name",
    "source.name",
)
SOURCE_RAW_SUBDIR = _first_cfg(
    "datasets.source.raw_subdir",
    "dataset.source.raw_subdir",
    "source_dataset.raw_subdir",
    "source.raw_subdir",
)

RAW_DIR_CFG = _first_cfg("paths.data_raw", "paths.raw_dir", "data.raw_dir")

if RAW_DIR_CFG:
    RAW_DIR = Path(RAW_DIR_CFG).expanduser()
    # A relative path in the config is anchored at the repo root, not at the
    # directory Jupyter happened to be launched from.
    if not RAW_DIR.is_absolute():
        RAW_DIR = REPO_ROOT / RAW_DIR
else:
    RAW_DIR = REPO_ROOT / "data" / "raw"
RAW_DIR = RAW_DIR.resolve()

log(f"SOURCE_NAME (from config): {SOURCE_NAME if SOURCE_NAME else 'we don’t know yet'}")
log(f"RAW_DIR resolved: {RAW_DIR}")

# If a dataset subdir is provided, resolve it (but don't assume it exists)
SOURCE_DIR = (RAW_DIR / SOURCE_RAW_SUBDIR).resolve() if SOURCE_RAW_SUBDIR else RAW_DIR
log(f"SOURCE_DIR resolved: {SOURCE_DIR}")


## A) Raw data inventory + selection

In [None]:
# [CELL 01-04] Inventory data/raw (recursive) + size summary (REAL files only)

# Fail fast before scanning: the inventory below needs a resolved RAW_DIR.
if RAW_DIR is None:
    raise RuntimeError("RAW_DIR is None. Config loader did not resolve RAW_DIR.")

# Stop with an explicit message when the raw-data directory is missing.
if not RAW_DIR.exists():
    raise FileNotFoundError(f"RAW_DIR does not exist: {RAW_DIR}")

def human_bytes(n: int) -> str:
    """
    Format a byte count with two decimals and a binary-scaled unit (B..TB).

    The value is divided by 1024 until it drops below 1024; anything that is
    still larger after reaching TB is reported in TB.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = float(n)
    for unit in units[:-1]:
        if value < 1024:
            return f"{value:.2f} {unit}"
        value /= 1024
    return f"{value:.2f} {units[-1]}"

log(f"Scanning RAW_DIR: {RAW_DIR}")

# (path, size_in_bytes) for every regular file below RAW_DIR; size is None
# when stat() fails so the file is still counted but not summed.
files = []
for p in RAW_DIR.rglob("*"):
    if p.is_file():
        try:
            files.append((p, p.stat().st_size))
        except OSError:
            files.append((p, None))

total = sum(s for _, s in files if s is not None)
log(f"Found files: {len(files)}")
log(f"Total size: {human_bytes(total)}")

subdirs = sorted(d for d in RAW_DIR.iterdir() if d.is_dir())
log(f"Top-level subdirs under RAW_DIR ({len(subdirs)}): {[d.name for d in subdirs]}")

# Size by top-level subdir (1 level). One pass over the files: the first path
# component below RAW_DIR names the owning subdir. Files sitting directly in
# RAW_DIR have a single component and belong to no subdir.
size_by_subdir = {d.name: 0 for d in subdirs}
for p, s in files:
    if s is None:
        continue
    rel_parts = p.relative_to(RAW_DIR).parts
    if len(rel_parts) > 1 and rel_parts[0] in size_by_subdir:
        size_by_subdir[rel_parts[0]] += s

print("\nSize by top-level subdir:")
for k, v in sorted(size_by_subdir.items(), key=lambda kv: kv[1], reverse=True):
    print(f"  - {k:20s} {human_bytes(v)}")


In [None]:
# [CELL 01-05] Select source raw subdir for this notebook (XuetangX)

# NOTE(review): this hard-coded subdir replaces whatever SOURCE_DIR was derived
# from the config in the earlier cell — confirm that the override is intended.
SOURCE_DATASET_SUBDIR = "xuetangx"
SOURCE_DIR = (RAW_DIR / SOURCE_DATASET_SUBDIR).resolve()

source_dir_present = SOURCE_DIR.exists()
log(f"SOURCE_DIR: {SOURCE_DIR}")
log(f"SOURCE_DIR exists: {source_dir_present}")
if not source_dir_present:
    raise FileNotFoundError("Expected data/raw/xuetangx folder not found. Place XuetangX raw JSON files there.")

# Only files whose name contains "raw_user_activity" are treated as input.
json_files = sorted(SOURCE_DIR.glob("*raw_user_activity*.json"))
log(f"Raw XuetangX JSON files found: {len(json_files)}")
if not json_files:
    raise FileNotFoundError("No *raw_user_activity*.json files found in data/raw/xuetangx/")

for p in json_files:
    size_txt = human_bytes(p.stat().st_size)
    print(f"  - {p.name:45s} {size_txt}")


In [None]:
# [CELL 01-06] Quick HEAD/TAIL check (confirm JSON array + structure)

def head_tail_bytes(path: Path, n=220):
    """
    Return ``(head, tail)``: the first and last ``n`` bytes of the file at ``path``.

    For files shorter than ``n`` bytes both values are the whole content.
    """
    size = path.stat().st_size
    tail_start = size - n if size > n else 0
    with open(path, "rb") as handle:
        first = handle.read(n)
        handle.seek(tail_start)
        last = handle.read(n)
    return first, last

# Only the first raw file is inspected.
p = json_files[0]
log(f"Inspecting: {p.name}")
head, tail = head_tail_bytes(p)

print("HEAD:", head)
print("TAIL:", tail)

# A JSON array must open with '[' (leading whitespace allowed) ...
if not head.lstrip().startswith(b"["):
    log("⚠️ File does not appear to start with '['; structure might differ.")
# ... and its last non-whitespace byte must be ']'. Merely finding a ']'
# somewhere in the tail is not enough: the nested event lists contain many,
# so a truncated file would pass a containment test.
if not tail.rstrip().endswith(b"]"):
    log("⚠️ File does not appear to end with ']'; structure might differ.")

log("Expected structure per top-level item: [course_id (str), user_map (dict)]")


## B) One-time conversion: raw JSON → Parquet shards

In [None]:
# [CELL 01-07] Ensure required libs are available: ijson, pyarrow, duckdb

import importlib
import subprocess

def ensure(pkg: str, import_name: str = None):
    """
    Import a module, installing its package with pip on first failure.

    Parameters:
        pkg: name passed to ``pip install``.
        import_name: dotted module name to import; defaults to ``pkg``.

    Returns the imported module.

    Raises:
        subprocess.CalledProcessError: the pip install failed.
        ImportError: the module still cannot be imported after installing.

    Only ImportError triggers an install. Any other exception raised while
    importing an already-installed package propagates unchanged, because
    reinstalling would not fix it and would hide the real error.
    """
    name = import_name or pkg
    try:
        return importlib.import_module(name)
    except ImportError:
        log(f"Installing {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
        # Let the import system see files that appeared after the interpreter started.
        importlib.invalidate_caches()
        return importlib.import_module(name)

# Bind the third-party modules used by the later cells, going through ensure()
# so a missing package is installed on first use.
ijson = ensure("ijson")
duckdb = ensure("duckdb")
# For pyarrow the pip package name and the dotted import name are passed separately.
pa = ensure("pyarrow", "pyarrow")
pq = ensure("pyarrow", "pyarrow.parquet")

log("All dependencies ready: ijson, pyarrow, duckdb")


In [None]:
# [CELL 01-08] Stream-sample raw JSON (K course blocks) and summarize actions/time range

from collections import Counter

def iter_course_blocks(path: Path):
    """Lazily yield each top-level array element of the JSON file at ``path`` via ijson."""
    with open(path, "rb") as stream:
        yield from ijson.items(stream, "item")

def sample_summary(path: Path, K=5, max_users_per_course=50, max_events_per_object=500):
    """
    Print a quick profile of the first ``K`` well-formed course blocks in ``path``.

    Per course at most ``max_users_per_course`` users are visited, and per
    object at most ``max_events_per_object`` events. Blocks, users, objects
    and events that do not have the expected shape are skipped. Users and
    objects are counted before their payload is validated.

    Prints the sampled course/user/object/event counts, the observed time
    range and the 20 most frequent actions. Returns None.
    """
    per_action = Counter()
    n_courses = n_users = n_objects = n_events = 0
    earliest = latest = None

    for block in iter_course_blocks(path):
        # Expected block shape: [course_id, {user_id: {object_id: [[action, ts], ...]}}]
        if not isinstance(block, list) or len(block) != 2:
            continue
        course_id, user_map = block
        if not isinstance(course_id, str) or not isinstance(user_map, dict):
            continue

        n_courses += 1

        for position, obj_map in enumerate(user_map.values()):
            if position >= max_users_per_course:
                break
            n_users += 1
            if not isinstance(obj_map, dict):
                continue

            for events in obj_map.values():
                n_objects += 1
                if not isinstance(events, list):
                    continue

                for event in events[:max_events_per_object]:
                    if not isinstance(event, list) or len(event) != 2:
                        continue
                    action, raw_ts = event
                    per_action[str(action)] += 1
                    n_events += 1

                    # Unparseable timestamps still count as events but do not
                    # contribute to the time range.
                    stamp = pd.to_datetime(raw_ts, errors="coerce")
                    if pd.notna(stamp):
                        if earliest is None or stamp < earliest:
                            earliest = stamp
                        if latest is None or stamp > latest:
                            latest = stamp

        # Checked after the block is processed, so at least one valid block is
        # always sampled when the file contains one.
        if n_courses >= K:
            break

    print(f"\nSampled file: {path.name}")
    print("Sampled courses:", n_courses)
    print("Sampled users:", n_users)
    print("Sampled objects:", n_objects)
    print("Sampled events:", n_events)
    print("Sample time range:", earliest, "->", latest)
    print("\nTop actions:")
    for action_name, count in per_action.most_common(20):
        print(f"  {action_name:25s} {count}")

# Profile only the first raw file, limited to K=5 course blocks.
sample_summary(json_files[0], K=5)


In [None]:
# [CELL 01-09] Stream parse -> Parquet shards (one-time conversion)

# Destination folder for the Parquet shards; created up front, including
# missing parents, and reused if it already exists.
OUT_DIR = (REPO_ROOT / "data" / "processed" / "xuetangx_events_parquet").resolve()
OUT_DIR.mkdir(parents=True, exist_ok=True)
log(f"OUT_DIR: {OUT_DIR}")

def write_shard(rows, shard_path: Path):
    """
    Write ``rows`` to ``shard_path`` as one Parquet file and return the row count written.

    ``rows`` are [course_id, user_id, object_id, action, ts] records. ``ts`` is
    parsed with pandas (unparseable values become NaT), and rows missing any of
    course_id, user_id, action or ts are dropped before writing. A missing
    object_id does not drop the row.
    """
    columns = ["course_id", "user_id", "object_id", "action", "ts"]
    required = ["course_id", "user_id", "action", "ts"]

    frame = pd.DataFrame(rows, columns=columns)
    frame["ts"] = pd.to_datetime(frame["ts"], errors="coerce")
    frame = frame.dropna(subset=required)

    pq.write_table(pa.Table.from_pandas(frame, preserve_index=False), shard_path)
    return len(frame)

def convert_file_to_parquet_shards(path: Path, shard_rows=1_000_000, skip_if_exists=True):
    """
    Stream one raw JSON file into Parquet shards under OUT_DIR.

    Shards are named ``<path.stem>_shard_<NNNN>.parquet`` and hold up to
    ``shard_rows`` buffered events each. Rows dropped by write_shard make a
    shard smaller than that.

    Parameters:
        path: raw JSON file whose top-level array holds [course_id, user_map] blocks.
        shard_rows: number of buffered events that triggers a shard write.
        skip_if_exists: when True and shards for this file already exist,
            nothing is converted.

    Returns:
        ``(None, n_existing_shards)`` when skipped, otherwise
        ``(total_rows_written, n_shards_written)``.

    NOTE(review): the skip test only checks that some shard exists, so shards
    left behind by an interrupted run are treated as a finished conversion.
    """
    existing = sorted(OUT_DIR.glob(f"{path.stem}_shard_*.parquet"))
    if existing:
        if skip_if_exists:
            log(f"SKIP {path.name} (found {len(existing)} existing shards)")
            return None, len(existing)
        # Re-converting: delete the old shards first. Otherwise, if the new run
        # produces fewer shards, the leftover higher-numbered files would still
        # match the downstream *.parquet glob and their events would be counted twice.
        for stale in existing:
            stale.unlink()

    shard_idx = 0
    buf = []
    total_written = 0
    t0 = time.time()

    def flush():
        """Write the buffered rows as the next numbered shard and reset the buffer."""
        nonlocal shard_idx, total_written, buf
        shard_path = OUT_DIR / f"{path.stem}_shard_{shard_idx:04d}.parquet"
        total_written += write_shard(buf, shard_path)
        buf = []
        shard_idx += 1

    with open(path, "rb") as f:
        for blk in ijson.items(f, "item"):
            # Skip anything that is not a [course_id (str), user_map (dict)] pair.
            if not (isinstance(blk, list) and len(blk) == 2):
                continue
            course_id, user_map = blk[0], blk[1]
            if not (isinstance(course_id, str) and isinstance(user_map, dict)):
                continue

            for user_id, obj_map in user_map.items():
                if not isinstance(obj_map, dict):
                    continue
                for object_id, evs in obj_map.items():
                    if not isinstance(evs, list):
                        continue
                    for ev in evs:
                        # Each event must be an [action, ts] pair.
                        if not (isinstance(ev, list) and len(ev) == 2):
                            continue
                        action, ts = ev[0], ev[1]
                        buf.append([course_id, str(user_id), str(object_id), str(action), ts])

                        if len(buf) >= shard_rows:
                            flush()
                            elapsed = time.time() - t0
                            log(f"{path.name} | shards={shard_idx} | written={total_written:,} | elapsed={elapsed/60:.1f}m")

    # Remainder that never reached a full shard.
    if buf:
        flush()

    elapsed = time.time() - t0
    log(f"DONE {path.name} | shards={shard_idx} | total_written={total_written:,} | elapsed={elapsed/60:.1f}m")
    return total_written, shard_idx

# One (file name, rows written or None when skipped, shard count) entry per raw file.
totals = []
for p in json_files:
    outcome = convert_file_to_parquet_shards(p, shard_rows=1_000_000, skip_if_exists=True)
    totals.append((p.name, *outcome))

print("\nConversion summary:")
for name, rows_written, shards in totals:
    if isinstance(rows_written, int):
        rows_txt = f"{rows_written:,}"
    else:
        rows_txt = "(skipped)"
    print(f"- {name}: rows={rows_txt} shards={shards}")


## C) DuckDB EDA + filtered interactions export

In [None]:
# [CELL 01-10] DuckDB EDA on Parquet shards (fast)

PARQUET_GLOB = str(OUT_DIR / "*.parquet")
log(f"Reading Parquet via DuckDB: {PARQUET_GLOB}")

con = duckdb.connect(database=":memory:")

# The glob is embedded in a SQL string literal. Double any single quote so a
# path such as C:\Users\O'Brien\... does not terminate the literal early.
parquet_glob_sql = PARQUET_GLOB.replace("'", "''")

# `events` is a view over every shard in OUT_DIR.
con.execute(f"""
CREATE OR REPLACE VIEW events AS
SELECT * FROM read_parquet('{parquet_glob_sql}');
""")

# Dataset-wide totals and time range.
df_counts = con.execute("""
SELECT
  COUNT(*) AS n_events,
  COUNT(DISTINCT user_id) AS n_users,
  COUNT(DISTINCT course_id) AS n_courses,
  COUNT(DISTINCT object_id) AS n_objects,
  MIN(ts) AS min_ts,
  MAX(ts) AS max_ts
FROM events;
""").df()

# 30 most frequent actions.
df_top_actions = con.execute("""
SELECT action, COUNT(*) AS n
FROM events
GROUP BY action
ORDER BY n DESC
LIMIT 30;
""").df()

# Approximate median / p90 / p99 of events per user.
df_user_quant = con.execute("""
SELECT approx_quantile(cnt, [0.5, 0.9, 0.99]) AS q_events_per_user
FROM (
  SELECT user_id, COUNT(*) cnt
  FROM events
  GROUP BY user_id
);
""").df()

display(df_counts)
display(df_top_actions)
display(df_user_quant)


In [None]:
# [CELL 01-11] Export filtered course-level interactions (for later notebooks)

# Actions kept in the course-level interaction export.
KEEP_ACTIONS = {
    "click_courseware",
    "load_video",
    "play_video",
    "problem_get",
    "problem_check",
    "click_info",
    "click_about",
    "click_progress",
    "click_forum",
}


def _sql_quote(text):
    """Return ``text`` as a single-quoted SQL string literal with embedded quotes doubled."""
    return "'" + str(text).replace("'", "''") + "'"


actions_list = ",".join(_sql_quote(a) for a in sorted(KEEP_ACTIONS))
filtered_path = (REPO_ROOT / "data" / "processed" / "xuetangx_interactions_course_filtered.parquet").resolve()

# The output path goes into SQL as a literal; quote it so an apostrophe
# anywhere in the repo path cannot break the statement.
filtered_path_sql = _sql_quote(filtered_path)

# course_id is exported under the name item_id.
con.execute(f"""
COPY (
  SELECT
    user_id,
    course_id AS item_id,
    ts,
    action
  FROM events
  WHERE action IN ({actions_list})
) TO {filtered_path_sql} (FORMAT PARQUET);
""")

log(f"Wrote filtered interactions parquet: {filtered_path}")

# `inter` reads the exported file back.
con.execute(f"CREATE OR REPLACE VIEW inter AS SELECT * FROM read_parquet({filtered_path_sql});")


## D) Reports + reproducibility artifacts

In [None]:
# [CELL 01-12] Reports output folder (per-run)

# Each run writes into its own subfolder of reports/01_eda_source_mooc/,
# named by a local-time stamp with one-second resolution (YYYYmmdd_HHMMSS).
REPORT_DIR = (REPO_ROOT / "reports" / "01_eda_source_mooc").resolve()
RUN_TAG = datetime.now().strftime("%Y%m%d_%H%M%S")
OUT = REPORT_DIR / RUN_TAG
OUT.mkdir(parents=True, exist_ok=True)

log(f"REPORT OUT: {OUT}")


In [None]:
# [CELL 01-13] Save dataset metadata for reproducibility

def _raw_file_entry(fp):
    """Describe one raw JSON file (name, path, size, mtime) for the metadata record."""
    info = fp.stat()
    return {
        "name": fp.name,
        "path": str(fp),
        "size_bytes": int(info.st_size),
        "mtime": datetime.fromtimestamp(info.st_mtime).isoformat(timespec="seconds"),
    }


# Raw files inventory (every *.json in SOURCE_DIR, sorted by path)
raw_files = [_raw_file_entry(fp) for fp in sorted(SOURCE_DIR.glob("*.json"))]

# Parquet shards inventory
parquet_files = sorted(OUT_DIR.glob("*.parquet"))
parquet_total_bytes = sum(shard.stat().st_size for shard in parquet_files)

# Same aggregates as the EDA cell, fetched as plain Python values for JSON.
counts_row = con.execute("""
SELECT
  COUNT(*) AS n_events,
  COUNT(DISTINCT user_id) AS n_users,
  COUNT(DISTINCT course_id) AS n_courses,
  COUNT(DISTINCT object_id) AS n_objects,
  MIN(ts) AS min_ts,
  MAX(ts) AS max_ts
FROM events;
""").fetchone()

top_actions = con.execute("""
SELECT action, COUNT(*) AS n
FROM events
GROUP BY action
ORDER BY n DESC
LIMIT 30;
""").df().to_dict(orient="records")

user_q = con.execute("""
SELECT approx_quantile(cnt, [0.5, 0.9, 0.99]) AS q_events_per_user
FROM (SELECT user_id, COUNT(*) cnt FROM events GROUP BY user_id);
""").fetchone()[0]

# The first four columns are integer counts; the last two are timestamps stored as text.
count_keys = ("n_events", "n_users", "n_courses", "n_objects")
counts = {key: int(value) for key, value in zip(count_keys, counts_row)}
counts["min_ts"] = str(counts_row[4])
counts["max_ts"] = str(counts_row[5])

env_info = {
    "python": sys.version.split()[0],
    "platform": platform.platform(),
}
for label, module in (("pandas", pd), ("duckdb", duckdb), ("pyarrow", pa), ("ijson", ijson)):
    env_info[label] = getattr(module, "__version__", "unknown")

meta = {
    "generated_at": datetime.now().isoformat(timespec="seconds"),
    "repo_root": str(REPO_ROOT),
    "source_dir": str(SOURCE_DIR),
    "raw_files": raw_files,
    "events_parquet_dir": str(OUT_DIR),
    "events_parquet_shards": len(parquet_files),
    "events_parquet_total_bytes": int(parquet_total_bytes),
    "duckdb_events_view": str(OUT_DIR / "*.parquet"),
    "counts": counts,
    "top_actions": top_actions,
    "events_per_user_quantiles": user_q,
    "filtered_interactions_parquet": str(filtered_path),
    "filtered_actions": sorted(KEEP_ACTIONS),
    "env": env_info,
    "notes": [
        "Notebook 01 covers EDA + JSON→Parquet conversion + DuckDB summaries only.",
        "Session-gap analysis and sessionization are planned for notebooks 04/05.",
    ],
}

# The same document is stored next to the shards and in this run's report folder.
meta_proc_path = OUT_DIR / "dataset_metadata.json"
meta_rep_path = OUT / "dataset_metadata.json"

meta_json = json.dumps(meta, indent=2)
for target in (meta_proc_path, meta_rep_path):
    target.write_text(meta_json, encoding="utf-8")

log(f"Saved metadata: {meta_proc_path}")
log(f"Saved metadata copy: {meta_rep_path}")


In [None]:
# [CELL 01-14] Plot: Top actions (save PNG + CSV)

import matplotlib.pyplot as plt

# 20 most frequent actions across all events.
df_top20 = con.execute("""
SELECT action, COUNT(*) AS n
FROM events
GROUP BY action
ORDER BY n DESC
LIMIT 20;
""").df()

df_top20.to_csv(OUT / "top_actions_top20.csv", index=False)

fig, ax = plt.subplots(figsize=(10, 6))
# Reversed so the most frequent action is drawn at the top of the chart.
ax.barh(df_top20["action"][::-1], df_top20["n"][::-1])
ax.set_xlabel("Event count")
ax.set_title("Top 20 actions (XuetangX raw events)")
fig.tight_layout()

png = OUT / "plot_top20_actions.png"
fig.savefig(png, dpi=200)
plt.show()

log(f"Saved: {png}")


In [None]:
# [CELL 01-15] Plot: Daily event volume (save PNG + CSV)

# Number of events per calendar day, in chronological order.
df_daily = con.execute("""
SELECT DATE_TRUNC('day', ts) AS day, COUNT(*) AS n
FROM events
GROUP BY day
ORDER BY day;
""").df()

df_daily.to_csv(OUT / "daily_event_volume.csv", index=False)

fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(df_daily["day"], df_daily["n"])
ax.set_xlabel("Day")
ax.set_ylabel("Events")
ax.set_title("Daily event volume (XuetangX raw events)")
fig.tight_layout()

png = OUT / "plot_daily_event_volume.png"
fig.savefig(png, dpi=200)
plt.show()

log(f"Saved: {png}")


In [None]:
# [CELL 01-16] Plot: User activity distribution (log10 bins) (save PNG + CSV)

# Users grouped by floor(log10(events per user)): bin 0 is 1-9 events, bin 1 is 10-99, ...
df_bins = con.execute("""
WITH u AS (
  SELECT user_id, COUNT(*) AS cnt
  FROM events
  GROUP BY user_id
),
b AS (
  SELECT
    CAST(FLOOR(LOG10(cnt)) AS INTEGER) AS log10_bin,
    COUNT(*) AS n_users
  FROM u
  GROUP BY log10_bin
)
SELECT * FROM b
ORDER BY log10_bin;
""").df()

df_bins.to_csv(OUT / "user_activity_log10_bins.csv", index=False)

fig, ax = plt.subplots(figsize=(8, 4))
ax.bar(df_bins["log10_bin"], df_bins["n_users"])
ax.set_xlabel("log10(events per user) bin")
ax.set_ylabel("Number of users")
ax.set_title("User activity distribution (binned by log10)")
fig.tight_layout()

png = OUT / "plot_user_activity_log10_bins.png"
fig.savefig(png, dpi=200)
plt.show()

log(f"Saved: {png}")


In [None]:
# [CELL 01-17] Plot: Top courses by event count (save PNG + CSV)

# 20 courses with the most events.
df_courses = con.execute("""
SELECT course_id, COUNT(*) AS n
FROM events
GROUP BY course_id
ORDER BY n DESC
LIMIT 20;
""").df()

df_courses.to_csv(OUT / "top_courses_top20.csv", index=False)

fig, ax = plt.subplots(figsize=(10, 7))
# Reversed so the busiest course is drawn at the top of the chart.
ax.barh(df_courses["course_id"][::-1], df_courses["n"][::-1])
ax.set_xlabel("Event count")
ax.set_title("Top 20 courses by event count")
fig.tight_layout()

png = OUT / "plot_top20_courses.png"
fig.savefig(png, dpi=200)
plt.show()

log(f"Saved: {png}")


In [None]:
# [CELL 01-18] Write Markdown report (artifact)

def _df_to_md(df):
    """
    Render ``df`` as a Markdown table.

    DataFrame.to_markdown needs the optional ``tabulate`` package. When it is
    missing, fall back to a fenced plain-text table so the report is still
    written instead of the cell failing at the very end of the run.
    """
    try:
        return df.to_markdown(index=False)
    except ImportError:
        return "```\n" + df.to_string(index=False) + "\n```"


# Report sections, joined with newlines below.
md = []
md.append(f"# 01 — EDA Source MOOC (XuetangX raw) — {RUN_TAG}\n")
md.append("## Overview\n")
md.append(_df_to_md(df_counts))
md.append("\n\n## Top actions (top 20)\n")
md.append(_df_to_md(df_top20))
md.append("\n\n## User event quantiles\n")
md.append(_df_to_md(df_user_quant))
md.append("\n\n## Artifacts\n")
md.append("- dataset_metadata.json (also saved under data/processed/xuetangx_events_parquet/)\n")
md.append("- plot_top20_actions.png\n")
md.append("- plot_daily_event_volume.png\n")
md.append("- plot_user_activity_log10_bins.png\n")
md.append("- plot_top20_courses.png\n")

report_path = OUT / "report_01_eda_source_mooc.md"
report_path.write_text("\n".join(md), encoding="utf-8")
log(f"Wrote: {report_path}")
