In [0]:
import json
import requests
import os, uuid, pathlib, shutil
from datetime import datetime
from typing import List, Dict, Tuple

from pyspark.sql import functions as F
from pyspark.sql import types as T

In [0]:
# Declare the notebook parameters before reading them so `widgets.get` does not fail on an
# undeclared widget. `match_id` is optional (blank means "all matches"), so it needs an
# empty default rather than having to be supplied by every caller.
# NOTE(review): assumes the standard Databricks behaviour where a value passed by a job or
# caller takes precedence over the declared default — confirm.
dbutils.widgets.text("env", "")
dbutils.widgets.text("match_id", "")

env = dbutils.widgets.get("env").strip()
match_id = dbutils.widgets.get("match_id").strip()
if not env:
    raise ValueError("Widget 'env' is required, e.g. 'dev', staging, prod")
# `env` is spliced into catalog/schema identifiers that are sent to `spark.sql`, so reject
# anything that is not valid inside an unquoted identifier instead of failing later with
# a SQL parse error (or running unintended SQL).
if not all(ch.isascii() and (ch.isalnum() or ch == "_") for ch in env):
    raise ValueError(f"Widget 'env' must contain only letters, digits and underscores, got {env!r}")

In [0]:
# The catalog name is derived from the `env` widget; SCHEMA is the fully qualified
# "<catalog>.raw" name used as the prefix of every table identifier in this notebook.
CATALOG = "skillcorner_" + env
SCHEMA = ".".join((CATALOG, "raw"))

In [0]:
# Switch to the environment's catalog, create the raw schema if it is missing,
# then make that schema the session's current one. Order matters.
for _statement in (
    f"USE CATALOG {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}",
    f"USE {SCHEMA}",
):
    spark.sql(_statement)

In [0]:
# Generic Spark setting for safe partition overwrites (engine-agnostic):
# switch the session's partition overwrite mode to "dynamic".
_PARTITION_OVERWRITE_MODE_KEY = "spark.sql.sources.partitionOverwriteMode"
spark.conf.set(_PARTITION_OVERWRITE_MODE_KEY, "dynamic")

In [0]:
# Root of the SkillCorner open-data files on GitHub, and the JSON index of matches under it.
base_url = "https://raw.githubusercontent.com/SkillCorner/opendata/master/data"
matches_index_url = base_url + "/matches.json"

In [0]:
def list_match_ids() -> List[str]:
    """
    Fetch the matches index and return the id of every entry that has one.

    Returns:
        The "id" of each index entry as a string, in index order. Entries
        without an "id" key are skipped.

    Raises:
        requests.HTTPError: if the index request returns an error status.
    """
    with requests.get(matches_index_url, timeout=60) as response:
        # Fail with a clear HTTP error (as `download` does) instead of trying to
        # parse an error page as JSON.
        response.raise_for_status()
        entries = response.json()
    return [str(entry["id"]) for entry in entries if "id" in entry]

def urls_for_match(mid: str) -> Dict[str, str]:
    """
    Build the download URL of each per-match artifact.

    Returns a dict with the keys "meta", "tracking", "events" and "phases"
    (in that order), each mapped to a URL under `{base_url}/matches/{mid}/`.
    """
    file_prefix = f"{base_url}/matches/{mid}/{mid}"
    suffix_by_kind = (
        ("meta", "_match.json"),
        ("tracking", "_tracking_extrapolated.jsonl"),
        ("events", "_dynamic_events.csv"),
        ("phases", "_phases_of_play.csv"),
    )
    return {kind: file_prefix + suffix for kind, suffix in suffix_by_kind}

def table_exists(ident: str) -> bool:
    """
    Return True when the table identified by `ident` exists in the session catalog.

    Prefers the public `spark.catalog.tableExists` API and only falls back to the
    private JVM session handle when the running PySpark does not expose it.
    """
    public_check = getattr(spark.catalog, "tableExists", None)
    if public_check is not None:
        return bool(public_check(ident))
    # Older PySpark without Catalog.tableExists: use the original JVM-side call.
    return spark._jsparkSession.catalog().tableExists(ident)

def ensure_table(table_ident: str, schema: T.StructType, partition_by: str = None):
    """
    Create `table_ident` as an empty table with `schema` unless it already exists.

    The table is created by saving an empty DataFrame with `saveAsTable`; no
    format or path is specified. When `partition_by` is given, the table is
    partitioned by that column. Write mode "error" makes the save fail if the
    table appears between the existence check and the write.
    """
    if not table_exists(table_ident):
        empty_writer = spark.createDataFrame([], schema).write.mode("error")
        if partition_by:
            empty_writer = empty_writer.partitionBy(partition_by)
        empty_writer.saveAsTable(table_ident)

def overwrite_partition_via_insert(df, table_ident: str, partition_col: str, partition_value: str):
    """
    Replace exactly one partition of `table_ident` with the matching rows of `df`.

    Rows of `df` whose `partition_col` equals `partition_value` are exposed through a
    uniquely named temp view (without the partition column) and written with a static
    `INSERT OVERWRITE TABLE ... PARTITION (...)` statement.

    Args:
        df: Source DataFrame; must contain `partition_col`.
        table_ident: Target table identifier (inserted into the SQL as-is).
        partition_col: Name of the partition column.
        partition_value: Value of the partition to overwrite.

    Note:
        The SELECT list is `df`'s non-partition columns in `df` order, so the caller
        is responsible for that order matching the target table.
    """
    def quote_ident(name: str) -> str:
        # Backtick-quote so names with spaces, dots or keywords survive; `` escapes a backtick.
        return "`" + name.replace("`", "``") + "`"

    non_part_cols = [c for c in df.columns if c != partition_col]
    tmp_view = f"_tmp_{partition_col}_{uuid.uuid4().hex}"
    cols_csv = ", ".join(quote_ident(c) for c in non_part_cols)
    # Escape backslashes and single quotes so the value cannot terminate the SQL literal.
    # NOTE(review): relies on Spark's default backslash escaping in string literals
    # (spark.sql.parser.escapedStringLiterals=false) — confirm for this workspace.
    value_literal = str(partition_value).replace("\\", "\\\\").replace("'", "\\'")

    (df.filter(F.col(partition_col) == partition_value)
       .select(*[F.col(quote_ident(c)) for c in non_part_cols])
       .createOrReplaceTempView(tmp_view))
    try:
        spark.sql(f"""
            INSERT OVERWRITE TABLE {table_ident}
            PARTITION ({quote_ident(partition_col)}='{value_literal}')
            SELECT {cols_csv} FROM {tmp_view}
        """)
    finally:
        # Always drop the view, even when the INSERT fails, so failed runs do not
        # accumulate temp views in the session.
        spark.catalog.dropTempView(tmp_view)

In [0]:

# Ensure RAW tables (portable)

def _raw_table_schema(*payload: Tuple[str, T.DataType]) -> T.StructType:
    """
    Build a RAW table schema: `match_id`, then the given payload columns, then the
    `source_url` / `ingested_at` bookkeeping columns. Every field is nullable.
    """
    columns = [
        ("match_id", T.StringType()),
        *payload,
        ("source_url", T.StringType()),
        ("ingested_at", T.TimestampType()),
    ]
    return T.StructType([T.StructField(name, dtype, True) for name, dtype in columns])


_RAW_TABLE_PAYLOADS = (
    # 1. Catalog of matches (from matches.json) - one row per match entry (json string kept verbatim)
    ("raw_matches_catalog", (("json", T.StringType()),)),
    # 2. Per-match match.json (verbatim blob string)
    ("raw_match_json", (("json", T.StringType()),)),
    # 3. Per-match tracking JSONL (one JSON line per row)
    ("raw_tracking_jsonl", (("line_number", T.LongType()), ("json_line", T.StringType()))),
    # 4. Per-match events CSV: only the bookkeeping columns are declared here;
    #    the CSV's own columns are not part of this initial schema.
    ("raw_events_csv", ()),
    # 5. Per-match phases CSV: same shape as the events table.
    ("raw_phases_csv", ()),
)

# Every RAW table is partitioned by match_id.
for _table_name, _payload in _RAW_TABLE_PAYLOADS:
    ensure_table(
        f"{SCHEMA}.{_table_name}",
        _raw_table_schema(*_payload),
        partition_by="match_id"
    )



In [0]:
# Tiny downloader (to /tmp)

# Scratch directory for downloaded files; created (with parents) if it does not exist yet.
TMP_DIR = "/tmp/skillcorner_raw"
os.makedirs(TMP_DIR, exist_ok=True)

def _tmp(name: str) -> str:
    """Return a path inside TMP_DIR whose file name is `name` prefixed with a random hex id."""
    unique_prefix = uuid.uuid4().hex
    return os.path.join(TMP_DIR, "_".join((unique_prefix, name)))

def download(url: str, to_path: str) -> str:
    """
    Stream `url` to the local file `to_path` and return `to_path`.

    The body is written via `iter_content`, which applies any Content-Encoding
    (gzip/deflate) decoding; copying `response.raw` directly would store the
    still-compressed bytes whenever the server compresses the response.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(to_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
    return to_path


In [0]:

# Ingest: matches catalog (all)

# Fetch the index once and fail fast on an HTTP error instead of parsing an error page as JSON.
_index_response = requests.get(matches_index_url, timeout=60)
_index_response.raise_for_status()
matches = _index_response.json()
now = datetime.utcnow()

# One row per index entry that has a non-empty id; the entry is kept verbatim as a JSON string.
rows = []
for obj in matches:
    mid = str(obj.get("id", ""))
    if mid:
        rows.append((mid, json.dumps(obj, ensure_ascii=False), matches_index_url, now))

df_catalog = spark.createDataFrame(rows, schema="match_id string, json string, source_url string, ingested_at timestamp")

# Overwrite each partition (safe to re-run).
# Pass df_catalog unchanged: the helper filters on match_id itself. Overwriting the
# match_id column with a literal here would relabel every row as `mid`, so each
# partition would receive the JSON of all matches.
for mid in sorted({r[0] for r in rows}):
    overwrite_partition_via_insert(
        df_catalog,
        f"{SCHEMA}.raw_matches_catalog",
        partition_col="match_id",
        partition_value=mid
    )

# Ingest: per-match artifacts

def _fetch_to_tmp(url: str, name: str, created: List[str]) -> str:
    """Download `url` to a fresh temp path and record that path in `created` for cleanup."""
    path = _tmp(name)
    created.append(path)  # recorded before downloading so a partial file is cleaned up too
    download(url, path)
    return path


def _quote_col(name: str) -> str:
    """Backtick-quote a column name (doubling embedded backticks)."""
    return "`" + name.replace("`", "``") + "`"


def _conform_to_table(df, table_ident: str):
    """
    Make `df` insertable into `table_ident` by position.

    Columns of `df` that the table does not have yet are added to the table as STRING
    (the CSV payload columns are all cast to string before this is called). The result
    has exactly the table's columns, in the table's order; table columns absent from
    `df` are filled with typed NULLs. Names are matched case-insensitively.
    """
    df_col_by_lower = {c.lower(): c for c in df.columns}
    known = {c.lower() for c in spark.table(table_ident).columns}
    new_cols = [c for c in df.columns if c.lower() not in known]
    if new_cols:
        ddl = ", ".join(f"{_quote_col(c)} STRING" for c in new_cols)
        spark.sql(f"ALTER TABLE {table_ident} ADD COLUMNS ({ddl})")

    projected = []
    for field in spark.table(table_ident).schema.fields:  # re-read: includes columns just added
        source = df_col_by_lower.get(field.name.lower())
        if source is None:
            projected.append(F.lit(None).cast(field.dataType).alias(field.name))
        else:
            projected.append(F.col(_quote_col(source)).alias(field.name))
    return df.select(projected)


def _ingest_raw_csv(url: str, name: str, table_ident: str, mid: str, now_ts, created: List[str]):
    """
    Download one CSV, keep every column as STRING, add the bookkeeping columns and
    overwrite the `mid` partition of `table_ident`. Returns the DataFrame written.
    """
    path = _fetch_to_tmp(url, name, created)
    df_csv = spark.read.csv(f"file:{path}", header=True, inferSchema=False)
    df_csv = (df_csv.select([F.col(_quote_col(c)).cast("string").alias(c) for c in df_csv.columns])
              .withColumn("source_url", F.lit(url))
              .withColumn("ingested_at", now_ts)
              .withColumn("match_id", F.lit(mid)))
    # The table is created with only the bookkeeping columns, so the CSV's own columns
    # must be added to it (and the DataFrame ordered like the table) before the
    # positional INSERT; otherwise the column counts do not match.
    df_csv = _conform_to_table(df_csv, table_ident)
    overwrite_partition_via_insert(
        df=df_csv,
        table_ident=table_ident,
        partition_col="match_id",
        partition_value=mid
    )
    return df_csv


# A non-empty `match_id` widget restricts the run to that match; otherwise ingest every indexed match.
target_ids = [match_id] if match_id else [str(x["id"]) for x in matches if "id" in x]

summary = []
for mid in target_ids:
    urls = urls_for_match(mid)
    now_ts = F.current_timestamp()
    tmp_paths = []  # every file downloaded for this match, removed in `finally`
    try:
        # 2) match.json -> single row
        meta_path = _fetch_to_tmp(urls["meta"], "match.json", tmp_paths)
        with open(meta_path, "r", encoding="utf-8") as meta_file:
            meta_text = meta_file.read()
        df_meta = (spark.createDataFrame([(meta_text, urls["meta"])], "json string, source_url string")
                   .withColumn("ingested_at", now_ts)
                   .withColumn("match_id", F.lit(mid))
                   .select("json", "source_url", "ingested_at", "match_id"))
        overwrite_partition_via_insert(
            df=df_meta,
            table_ident=f"{SCHEMA}.raw_match_json",
            partition_col="match_id",
            partition_value=mid
        )

        # 3) tracking JSONL -> (line_number, json_line), line numbers starting at 1
        # NOTE(review): the file lives on the driver's local disk; reading it through a
        # `file:` path presumably requires that the tasks can see that path (e.g. a
        # single-node cluster) — confirm for the target cluster.
        trk_path = _fetch_to_tmp(urls["tracking"], "tracking.jsonl", tmp_paths)
        rdd = (spark.sparkContext.textFile(f"file:{trk_path}")
               .zipWithIndex()
               .map(lambda x: (x[1] + 1, x[0])))
        df_trk = (spark.createDataFrame(rdd, "line_number long, json_line string")
                  .withColumn("source_url", F.lit(urls["tracking"]))
                  .withColumn("ingested_at", now_ts)
                  .withColumn("match_id", F.lit(mid))
                  .select("line_number", "json_line", "source_url", "ingested_at", "match_id"))
        overwrite_partition_via_insert(
            df=df_trk,
            table_ident=f"{SCHEMA}.raw_tracking_jsonl",
            partition_col="match_id",
            partition_value=mid
        )

        # 4) events CSV -> all columns as STRING
        df_evt = _ingest_raw_csv(urls["events"], "events.csv", f"{SCHEMA}.raw_events_csv",
                                 mid, now_ts, tmp_paths)

        # 5) phases CSV -> all columns as STRING
        df_pha = _ingest_raw_csv(urls["phases"], "phases.csv", f"{SCHEMA}.raw_phases_csv",
                                 mid, now_ts, tmp_paths)

        # The counts re-read the downloaded files, so they must run before the cleanup below.
        summary.append((mid, df_trk.count(), df_evt.count(), df_pha.count()))
    finally:
        # Remove this match's downloads so repeated runs do not fill up the temp directory.
        for tmp_path in tmp_paths:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

spark.createDataFrame(summary, schema="match_id string, tracking_lines long, events_rows long, phases_rows long").show(truncate=False)
print(f"Loaded {len(summary)} match(es) into {SCHEMA}")