# CT Training Data Pipeline — Validation
## End-to-End Checks (Bronze → Silver)

Purpose:
- Confirm Bronze stream is writing to Delta on MinIO.
- Confirm Silver batch aggregates are present and partitioned.
- Compute core KPIs for a quick health signal.

## 1. Setup — Spark Session
Relies on `spark/apps/spark_config.py` from this repo. The project root is found by searching upward from the current working directory for that file.

In [None]:
import sys
from pathlib import Path

def add_project_root(marker_rel="spark/apps/spark_config.py", max_up=8):
    """Locate the repository root and make it importable.

    Starting at the current working directory and moving to the parent each
    step, check at most ``max_up`` directories for a file at ``marker_rel``.
    The first directory that has it is prepended to ``sys.path`` (only if it
    is not already there) so that ``spark.apps.*`` modules can be imported.

    Returns:
        The resolved root directory as a ``Path``.

    Raises:
        RuntimeError: if no directory within ``max_up`` levels has the marker.
    """
    candidate = Path.cwd().resolve()
    levels_checked = 0
    while levels_checked < max_up:
        if (candidate / marker_rel).is_file():
            root_str = str(candidate)
            if root_str not in sys.path:
                sys.path.insert(0, root_str)
            print("Using project root:", candidate)
            return candidate
        candidate = candidate.parent
        levels_checked += 1
    raise RuntimeError(f"Couldn't find {marker_rel} within {max_up} levels up from {Path.cwd()}")

PROJECT_ROOT = add_project_root()

In [None]:
# Local Spark with Delta (no spark-master)
# Puts Delta + S3A jars on the *driver* classpath and starts Spark local[*].
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pathlib import Path
from datetime import datetime, timezone
import urllib.request, os, gc

# 0) Clean reset to avoid stale configs
# Best-effort: on a fresh kernel `spark` is undefined (NameError), and a
# half-dead session may raise on stop(); either way we carry on.
try:
    spark.stop()
except Exception:
    pass
# Clear PySpark's cached singletons so getOrCreate() below builds a new
# session with this cell's configuration instead of returning an old one.
SparkSession._instantiatedSession = None
SparkContext._active_spark_context = None
gc.collect()

# 1) Find project root (matches the layout) and jars dir
def find_project_root(marker_rel="spark/apps/spark_config.py", max_up=8):
    """Return the nearest directory holding ``marker_rel``, else the cwd.

    Checks the current working directory and up to ``max_up - 1`` of its
    ancestors. Unlike a strict lookup, this falls back to the resolved cwd
    when the marker is not found.
    """
    cur = Path.cwd().resolve()
    for _ in range(max_up):
        if (cur / marker_rel).is_file():
            return cur
        cur = cur.parent
    return Path.cwd().resolve()

PROJECT_ROOT = find_project_root()
JARS_DIR = PROJECT_ROOT / "spark" / "jars"
JARS_DIR.mkdir(parents=True, exist_ok=True)

# 2) Ensure required JARs are present (download if missing)
SCALA_BIN = "2.12"
DELTA_VER = "3.3.0"
HADOOP_AWS_VER = "3.3.4"
AWS_SDK_VER = "1.12.300"

maven = "https://repo1.maven.org/maven2"
jars_to_fetch = {
    f"delta-spark_{SCALA_BIN}-{DELTA_VER}.jar":
        f"{maven}/io/delta/delta-spark_{SCALA_BIN}/{DELTA_VER}/delta-spark_{SCALA_BIN}-{DELTA_VER}.jar",
    f"delta-storage-{DELTA_VER}.jar":
        f"{maven}/io/delta/delta-storage/{DELTA_VER}/delta-storage-{DELTA_VER}.jar",
    f"hadoop-aws-{HADOOP_AWS_VER}.jar":
        f"{maven}/org/apache/hadoop/hadoop-aws/{HADOOP_AWS_VER}/hadoop-aws-{HADOOP_AWS_VER}.jar",
    f"aws-java-sdk-bundle-{AWS_SDK_VER}.jar":
        f"{maven}/com/amazonaws/aws-java-sdk-bundle/{AWS_SDK_VER}/aws-java-sdk-bundle-{AWS_SDK_VER}.jar",
}

def ensure_jar(fname, url):
    """Return the absolute path of ``fname`` in JARS_DIR, downloading it if absent.

    The download is written to a sibling ``<fname>.part`` file and renamed
    into place only after it completes. Any error during download
    (network drop, kernel interrupt) removes the partial file and
    re-raises. This keeps a truncated JAR from appearing "present" on a
    later run and breaking the classpath. ``.part`` files never match the
    ``*.jar`` glob below.
    """
    path = JARS_DIR / fname
    if not path.exists():
        print(f"Downloading {fname} ...")
        tmp = path.with_name(path.name + ".part")
        try:
            urllib.request.urlretrieve(url, tmp)
            tmp.replace(path)  # atomic rename on the same filesystem
        finally:
            # No-op after a successful rename; cleans up after a failure.
            tmp.unlink(missing_ok=True)
    return str(path.resolve())

# Start with required jars, then include any others already in spark/jars
jar_paths = [ensure_jar(n, u) for n, u in jars_to_fetch.items()]
for p in sorted(JARS_DIR.glob("*.jar")):
    sp = str(p.resolve())
    if sp not in jar_paths:
        jar_paths.append(sp)

# Build classpaths (',' for spark.jars, os.pathsep for *extraClassPath)
spark_jars_csv = ",".join(jar_paths)
driver_cp = os.pathsep.join(jar_paths)

# 3) Build Spark local session with Delta extensions
builder = (
    SparkSession.builder
      .appName("validate-pipeline")
      .master("local[*]")
      .config("spark.jars", spark_jars_csv)
      .config("spark.driver.extraClassPath", driver_cp)   # <-- critical for driver
      .config("spark.executor.extraClassPath", driver_cp) # safe for local[*]
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = builder.getOrCreate()

# 4) Sanity checks (must succeed)
# Class.forName raises in the JVM if the Delta JAR is not on the classpath.
_ = spark._jvm.java.lang.Class.forName("io.delta.sql.DeltaSparkSessionExtension")
print("Delta extension loaded")
print("Spark version:", spark.version)
print("Spark master:", spark.sparkContext.master)
print("UTC now:", datetime.now(timezone.utc).isoformat())
print("Extensions:", spark.sparkContext.getConf().get("spark.sql.extensions"))
print("Using", len(jar_paths), "JARs from:", JARS_DIR)

In [None]:
# Bring in `s3a()`
import sys, os
from pathlib import Path

# Prefer the repo's path builder. If it cannot be imported (or fails while
# importing), define a minimal stand-in with the same call shape.
try:
    from spark.apps.spark_config import s3a as _s3a
except Exception as e:
    print("Repo s3a() unavailable, using simple fallback:", e)

    def s3a(*parts):
        """Build an ``s3a://<bucket>/<key>`` URI from path segments.

        The bucket is read from the ``S3_BUCKET`` environment variable
        (default ``'datalake'``) on every call. Falsy segments (``''``,
        ``None``) are skipped. Each remaining segment has leading/trailing
        slashes stripped before joining with ``/``.
        """
        bucket_name = os.getenv("S3_BUCKET", "datalake")
        cleaned = [segment.strip("/") for segment in parts if segment]
        return "s3a://{}/{}".format(bucket_name, "/".join(cleaned))
else:
    s3a = _s3a
    print("Using s3a() from repo: spark/apps/spark_config.py")

In [None]:
# Configure S3A (MinIO) at runtime
import os
hc = spark._jsc.hadoopConfiguration()

# Each setting takes the first non-empty environment variable in priority
# order, else the stock local-MinIO default.
endpoint = os.getenv("S3_ENDPOINT") or os.getenv("MINIO_ENDPOINT") or "http://localhost:9000"
access = os.getenv("S3_ACCESS_KEY") or os.getenv("AWS_ACCESS_KEY_ID") or "minioadmin"
secret = os.getenv("S3_SECRET_KEY") or os.getenv("AWS_SECRET_ACCESS_KEY") or "minioadmin"
pathsty = os.getenv("S3_PATH_STYLE", "true")
# Only an explicit plain-http endpoint disables TLS; anything else keeps it on.
uses_plain_http = endpoint.startswith("http://")
ssl_on = "false" if uses_plain_http else "true"

# Applied in this order to the live Hadoop configuration of the session.
s3a_settings = {
    "fs.s3a.endpoint": endpoint,
    "fs.s3a.path.style.access": pathsty,
    "fs.s3a.access.key": access,
    "fs.s3a.secret.key": secret,
    "fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "fs.s3a.connection.ssl.enabled": ssl_on,
}
for conf_key, conf_value in s3a_settings.items():
    hc.set(conf_key, conf_value)

# Credentials are deliberately not echoed.
print("S3A endpoint      :", endpoint)
print("S3A path-style    :", pathsty)
print("S3A bucket (env)  :", os.getenv("S3_BUCKET", "datalake"))

In [None]:
# Quick library version check
import pyspark
print("PySpark:", pyspark.__version__)

# The delta-spark pip distribution may not be installed (only its JARs are
# required here), so a failed lookup is reported rather than raised.
try:
    import importlib.metadata as importlib_metadata
    delta_spark_version = importlib_metadata.version("delta-spark")
except Exception as e:
    print("delta-spark version check skipped:", e)
else:
    print("delta-spark:", delta_spark_version)

## 2. Paths and Tables

In [None]:
# Table locations in the lake, built with the s3a() helper.
BRONZE = s3a("bronze", "lms_events")
SILVER = s3a("silver", "user_progress")
CHECKPOINTS = s3a("_checkpoints")

for path_label, path_uri in (
    ("Bronze     :", BRONZE),
    ("Silver     :", SILVER),
    ("Checkpoints:", CHECKPOINTS),
):
    print(path_label, path_uri)

## 3. Load — Bronze and Silver

In [None]:
from pyspark.sql.functions import max as F_max, countDistinct, col


def _load_delta(uri):
    """Read the Delta table stored at ``uri``."""
    return spark.read.format("delta").load(uri)


def _column_max(frame, column):
    """Return ``max(column)`` over ``frame`` as a Python value (None if empty)."""
    return frame.agg(F_max(column).alias("ts")).collect()[0]["ts"]


bronze = _load_delta(BRONZE)
silver = _load_delta(SILVER)

# Row counts, then schemas, for each layer.
for count_label, layer_df in (("Bronze count:", bronze), ("Silver count:", silver)):
    print(count_label, layer_df.count())

for schema_header, layer_df in (("\nBronze schema:", bronze), ("\nSilver schema:", silver)):
    print(schema_header)
    layer_df.printSchema()

# Freshness: newest event time in Bronze, newest processing time in Silver.
latest_bronze_ts = _column_max(bronze, "timestamp_iso")
latest_silver_ts = _column_max(silver, "processing_timestamp")
print("\nLatest bronze timestamp_iso:", latest_bronze_ts)
print("Latest silver processing_timestamp:", latest_silver_ts)

unique_bronze_events = bronze.select("event_type").distinct().count()
print("Unique bronze event types:", unique_bronze_events)

## 4. Sanity — Required Event Types Present

In [None]:
# Every one of these event types must appear at least once in Bronze.
required = {
    "course_enrolled",
    "video_started",
    "video_completed",
    "quiz_attempted",
    "quiz_submitted",
    "article_viewed",
}
present = {row[0] for row in bronze.select("event_type").distinct().collect()}
missing = sorted(required.difference(present))
print("Missing required event types:", missing)
assert not missing, f"Missing event types: {missing}"

## 5. KPIs — Aggregates from Silver
Silver holds a daily rollup per `user_id`. The KPIs below average its per-row metrics across all Silver rows, and the column names match the Silver transform contract.

In [None]:
from pyspark.sql.functions import avg as F_avg, round as F_round, desc


def _rounded_mean(column, alias):
    """Mean of ``column`` rounded to one decimal place, named ``alias``."""
    return F_round(F_avg(column), 1).alias(alias)


# Overall averages across every Silver row.
kpi = silver.agg(
    _rounded_mean("quiz_completion_rate_percent", "avg_quiz_completion_%"),
    _rounded_mean("video_completion_rate_percent", "avg_video_completion_%"),
    _rounded_mean("completion_score", "avg_completion_score"),
)
kpi.show(truncate=False)

print("Top courses by enrollments (from Bronze):")
enrollments_by_course = (
    bronze.where(col("event_type") == "course_enrolled")
    .groupBy("course_id")
    .count()
)
enrollments_by_course.orderBy(desc("count")).show(10, truncate=False)

## 6. Engagement Distribution
Counts and percentages by engagement_level (Low/Medium/High).

In [None]:
from pyspark.sql.functions import sum as F_sum, round as F_round, col, desc

# Row count per engagement level, plus each level's share of all Silver rows.
dist = silver.groupBy("engagement_level").count()
total = silver.count()
share_of_total = F_round(col("count") / total * 100, 1)
pct = dist.withColumn("percentage", share_of_total)
pct.orderBy(desc("count")).show(truncate=False)

## 7. Health — Minimal SLOs
- Bronze has data and all required event types.
- Silver has data for at least one partition date.
- Active users and courses are non-zero.

In [None]:
from pyspark.sql.functions import max as F_max

# Minimal SLO signals, rolled up into a single HEALTHY / NEEDS_ATTENTION verdict.
bronze_cnt = bronze.count()
silver_cnt = silver.count()

# Event-type coverage: every type in `required` (the set defined in section 4)
# must appear in Bronze. Extra event types are allowed, so this is a subset
# test rather than a comparison against a fixed number of distinct types.
bronze_event_types = {r[0] for r in bronze.select("event_type").distinct().collect()}
event_types_ok = required.issubset(bronze_event_types)

active_users = bronze.select("user_id").distinct().count()
active_courses = bronze.select("course_id").distinct().count()

print("Bronze records:", bronze_cnt)
print("Silver records:", silver_cnt)
print("Event types OK:", event_types_ok)
if not event_types_ok:
    print("Missing event types:", sorted(required - bronze_event_types))
print("Active users:", active_users)
print("Active courses:", active_courses)

status = all([
    bronze_cnt > 0,
    silver_cnt > 0,
    event_types_ok,
    active_users > 0,
    active_courses > 0,
])
print("\nPIPELINE STATUS:", "HEALTHY" if status else "NEEDS_ATTENTION")

## 8. Chart — Events by Type (from Bronze)

In [None]:
# Plotting is optional: any failure (Spark, pandas conversion, missing
# matplotlib, no display) just reports and skips the chart.
try:
    events_by_type = bronze.groupBy("event_type").count().orderBy("event_type")
    pdf = events_by_type.toPandas()
    import matplotlib.pyplot as plt
    type_labels, type_counts = pdf["event_type"], pdf["count"]
    plt.figure()
    plt.bar(type_labels, type_counts)  # default colors
    plt.title("Events by Type (Bronze)")
    plt.xticks(rotation=30, ha="right")
    plt.tight_layout()
    plt.show()
except Exception as e:
    print("Chart skipped:", e)

## 9. Business KPIs
Concrete questions derived from Bronze events to complement the validation checks.

In [None]:
# Parameters — set once for this section
COURSE_ID = "ct_anatomy_fundamentals"  # change as needed

### 9.1 Average quiz score per course (Bronze)
Uses `quiz_submitted` events' `payload.score`.

In [None]:
from pyspark.sql.functions import avg as F_avg, round as F_round, col, desc

# One row per quiz submission: the course and the score from the payload.
quiz_scores = (
    spark.read.format("delta").load(BRONZE)
    .where(col("event_type") == "quiz_submitted")
    .select("course_id", col("payload.score").alias("score"))
)

# Mean score per course (2 decimals), best-scoring courses first.
avg_quiz_score_per_course = (
    quiz_scores
    .groupBy("course_id")
    .agg(F_round(F_avg("score"), 2).alias("avg_quiz_score"))
    .orderBy(desc("avg_quiz_score"))
)

avg_quiz_score_per_course.show(truncate=False)

### 9.2 Most-watched videos (Bronze)
Ranks videos by total `payload.watch_seconds`, summed across all `video_completed` events.

In [None]:
from pyspark.sql.functions import sum as F_sum, desc, col

# One row per completed video view: which video and how long it was watched.
video_watch_events = (
    spark.read.format("delta").load(BRONZE)
    .where(col("event_type") == "video_completed")
    .select(
        col("payload.video_id").alias("video_id"),
        col("payload.watch_seconds").alias("watch_seconds"),
    )
)

# Total watch time per video, longest first.
most_watched_videos = (
    video_watch_events
    .groupBy("video_id")
    .agg(F_sum("watch_seconds").alias("total_watch_seconds"))
    .orderBy(desc("total_watch_seconds"))
)

most_watched_videos.show(20, truncate=False)

### 9.3 Users who completed a specific course yesterday (UTC)
Two definitions are computed for clarity:
- **Strict**: user submitted a quiz and `passed = true` for that course yesterday.
- **Loose**: user submitted any quiz for that course yesterday.

In [None]:
from pyspark.sql.functions import to_date, col, lit

# Both current_date() and to_date() evaluate in the Spark session time zone,
# which defaults to the JVM's local zone. Pin it to UTC so "yesterday" and
# each event's date really are UTC dates, as this section promises.
# NOTE: this is a session-wide setting and stays in effect for later cells.
spark.conf.set("spark.sql.session.timeZone", "UTC")

bronze_df = spark.read.format("delta").load(BRONZE)
YESTERDAY = spark.sql("select date_sub(current_date(), 1) as d").collect()[0]["d"]

# Quiz submissions for the selected course whose event date is yesterday (UTC).
quiz_submitted_yesterday = (
    bronze_df
    .where((col("event_type") == "quiz_submitted") & (col("course_id") == COURSE_ID))
    .withColumn("event_date", to_date(col("timestamp_iso")))
    .where(col("event_date") == lit(YESTERDAY))
)

# Strict: only submissions flagged passed. Null/False `passed` is excluded.
completed_users_strict = (
    quiz_submitted_yesterday
    .where(col("payload.passed") == True)  # noqa: E712 (Spark Column comparison)
    .select("user_id").distinct()
)

# Loose: any submission counts.
completed_users_loose = quiz_submitted_yesterday.select("user_id").distinct()

print("YESTERDAY (UTC):", YESTERDAY)
print("Course:", COURSE_ID)
print("Completed (strict, passed=True):", completed_users_strict.count())
print("Completed (loose, any quiz submitted):", completed_users_loose.count())

### 9.4 Persist KPI outputs to CSV in MinIO (S3A)
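Writes CSVs with headers under the `analytics/` prefix of the lake bucket (e.g. `s3a://datalake/analytics/...` with the default bucket). Output is partitioned by `metric_date`, with `course_completion` also partitioned by `course_id`.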

In [None]:
from pyspark.sql.functions import lit
from pyspark.sql import Row

METRIC_DATE = YESTERDAY  # reuse yesterday from 9.3


def _write_metric_csv(df, name, partition_cols=("metric_date",)):
    """Write ``df``, stamped with METRIC_DATE, as headed CSV under analytics/<name>.

    The write uses overwrite mode with *dynamic* partition overwrite. Only
    the partitions present in ``df`` (i.e. this metric date, plus the course
    for completions) are replaced, and all other partitions are left alone.
    Re-running the notebook for the same day therefore replaces that day's
    rows instead of appending duplicates. ``coalesce(1)`` yields one file
    per partition.

    Returns the s3a target path that was written.
    """
    target = s3a("analytics", name)
    (
        df
        .withColumn("metric_date", lit(METRIC_DATE))
        .coalesce(1)
        .write
        .mode("overwrite")
        .option("partitionOverwriteMode", "dynamic")
        .option("header", "true")
        .partitionBy(*partition_cols)
        .csv(target)
    )
    return target


# 1) Average quiz score per course
_write_metric_csv(avg_quiz_score_per_course, "avg_quiz_score_per_course")

# 2) Most-watched videos (by total watch time)
_write_metric_csv(most_watched_videos, "most_watched_videos")

# 3) Course completion counts (strict vs loose) for the selected course
strict_count = completed_users_strict.count()
loose_count  = completed_users_loose.count()

completion_rows = [
    Row(definition="strict_passed", course_id=COURSE_ID, users=int(strict_count)),
    Row(definition="loose_any_quiz", course_id=COURSE_ID, users=int(loose_count)),
]
completion_df = spark.createDataFrame(completion_rows)

_write_metric_csv(completion_df, "course_completion", ("metric_date", "course_id"))

print("CSV exports complete:")
print(" -", s3a("analytics", "avg_quiz_score_per_course"))
print(" -", s3a("analytics", "most_watched_videos"))
print(" -", s3a("analytics", "course_completion"))

## 10. Teardown
Stop the Spark session to release resources.

In [None]:
# Stop the session created in the setup cell so its local resources are released.
spark.stop()
print("Spark session closed.")