In [0]:
import requests

def build_session(api_key: str, timeout: float = 60) -> requests.Session:
    """Create a ``requests.Session`` that authenticates with a bearer token.

    ``requests`` does not read a ``timeout`` attribute assigned to a Session, so
    the default timeout is enforced through a mounted transport adapter instead.
    An explicit ``timeout=`` passed to an individual request still wins.

    Args:
        api_key: Token sent as ``Authorization: Bearer <api_key>`` on every request.
        timeout: Default per-request timeout in seconds (default 60).

    Returns:
        The configured session. ``session.timeout`` mirrors the default so callers
        can also pass it explicitly.
    """

    class _DefaultTimeoutAdapter(requests.adapters.HTTPAdapter):
        def send(self, request, **kwargs):
            # timeout arrives as None when the caller did not specify one.
            if kwargs.get("timeout") is None:
                kwargs["timeout"] = timeout
            return super().send(request, **kwargs)

    session = requests.Session()
    session.headers.update({"Authorization": f"Bearer {api_key}"})
    adapter = _DefaultTimeoutAdapter()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    # Kept for backward compatibility; requests itself never reads this attribute.
    session.timeout = timeout
    return session

def fetch_json(session: requests.Session, base_url: str, path: str, timeout: float | None = None):
    """GET ``base_url + path`` and return the decoded JSON body as a list.

    Args:
        session: Session used for the request (carries auth headers).
        base_url: Scheme + host, without a trailing slash.
        path: Request path including any query string.
        timeout: Seconds before giving up. When None, uses the session's
            ``timeout`` attribute if it has one, otherwise 60. Without a timeout a
            stalled connection would block the notebook indefinitely.

    Returns:
        The JSON array unchanged. Any other JSON value (object, scalar, null) is
        wrapped in a one-element list.

    Raises:
        requests.HTTPError: on a 4xx/5xx response (via ``raise_for_status``).
        ValueError: if the body is not valid JSON.
    """
    if timeout is None:
        timeout = getattr(session, "timeout", None) or 60
    resp = session.get(f"{base_url}{path}", timeout=timeout)
    resp.raise_for_status()
    payload = resp.json()
    return payload if isinstance(payload, list) else [payload]


In [0]:
import json, datetime
from pyspark.sql import functions as F

# Layout of every bronze "<endpoint>_raw" table: the untouched JSON record plus
# audit columns recording when it was loaded, for which slice, and from where.
AUDIT_SCHEMA = "raw string, _ingest_ts string, _season int, _week int, _source string"

def write_bronze_raw(spark, records, endpoint_name: str, bronze_schema: str, season: int, week: int):
    """Land one season/week slice of API records in ``<bronze_schema>.<endpoint_name>_raw``.

    Each record is stored as a JSON string in ``raw`` next to the audit columns
    of ``AUDIT_SCHEMA``. The first write creates the Delta table partitioned by
    ``_season``/``_week``. Later writes overwrite only the matching slice
    (``replaceWhere``), so reruns for the same slice are idempotent.

    Args:
        spark: Active SparkSession.
        records: Iterable of JSON-serialisable API records.
        endpoint_name: Table base name; also stored in ``_source``.
        bronze_schema: ``catalog.schema`` that holds the bronze tables.
        season, week: Slice identifiers (coerced to int).

    Returns:
        The fully qualified target table name.
    """
    season, week = int(season), int(week)
    # Naive UTC ISO-8601 string: same format utcnow().isoformat() produced,
    # without the deprecated API.
    ingest_ts = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
    rows = [(json.dumps(rec), ingest_ts, season, week, endpoint_name) for rec in records]
    df = spark.createDataFrame(rows, AUDIT_SCHEMA)
    target = f"{bronze_schema}.{endpoint_name}_raw"

    writer = df.write.format("delta").mode("overwrite")
    if spark.catalog.tableExists(target):
        # Replace only this slice; other seasons/weeks stay untouched.
        # Values are ints, so the SQL predicate cannot be injected into.
        writer = writer.option("replaceWhere", f"_season = {season} AND _week = {week}")
    else:
        writer = writer.partitionBy("_season", "_week")
    writer.saveAsTable(target)
    return target


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, ArrayType

def infer_schema_from_sample(spark, raw_tbl: str, season: int, week: int):
    """Infer a JSON schema from one raw record of a season/week slice.

    Args:
        spark: Active SparkSession.
        raw_tbl: Bronze table with a ``raw`` JSON-string column and ``_season``/``_week``.
        season, week: Slice to sample from.

    Returns:
        The DDL schema string produced by ``schema_of_json``, or None if the slice is empty.

    NOTE(review): only one arbitrary record is inspected. Fields absent, null or
    empty in that record may be missing or typed too loosely for the other rows.
    """
    slice_filter = (F.col("_season") == season) & (F.col("_week") == week)
    first_rows = (
        spark.table(raw_tbl)
        .where(slice_filter)
        .select("raw")
        .limit(1)
        .collect()
    )
    if not first_rows:
        return None
    sample_json = first_rows[0]["raw"]
    inferred = spark.range(1).select(F.schema_of_json(F.lit(sample_json)).alias("s"))
    return inferred.collect()[0]["s"]

def parse_raw_to_cols(spark, raw_tbl: str, season: int, week: int, schema: StructType):
    """Decode one season/week slice of a bronze raw table into typed columns.

    The ``raw`` JSON is parsed with ``schema``. Its top-level fields become
    columns, followed by the bronze audit columns.

    Args:
        spark: Active SparkSession.
        raw_tbl: Bronze table holding ``raw`` plus the audit columns.
        season, week: Slice to read.
        schema: Schema for ``from_json``. In this notebook it is the DDL string
            returned by ``infer_schema_from_sample``, despite the StructType hint.
    """
    slice_filter = (F.col("_season") == season) & (F.col("_week") == week)
    audit_cols = ["_ingest_ts", "_season", "_week", "_source"]
    decoded = (
        spark.table(raw_tbl)
        .where(slice_filter)
        .withColumn("obj", F.from_json(F.col("raw"), schema))
    )
    return decoded.select("obj.*", *audit_cols)

def flatten_structs_until_done(df, sep="_"):
    """Repeatedly expand struct columns into ``<parent><sep><child>`` columns.

    Each pass replaces every struct column with its children, keeping column
    order. Passes repeat until no struct columns remain. Arrays are left as-is,
    so structs nested inside arrays are not expanded.
    """
    keep_going = True
    while keep_going:
        keep_going = False
        projection = []
        for field in df.schema.fields:
            if not isinstance(field.dataType, StructType):
                projection.append(F.col(field.name))
                continue
            keep_going = True
            projection.extend(
                F.col(f"{field.name}.{child.name}").alias(f"{field.name}{sep}{child.name}")
                for child in field.dataType.fields
            )
        df = df.select(*projection)
    return df

def flatten_and_explode_all(df, sep="_"):
    """
    Flatten structs and explode arrays, one column per step, until neither remains.

    Each step first looks for a struct column. If one exists, it is replaced by
    its children, named ``<parent><sep><child>`` and appended at the end of the
    column list. Only when no struct is left is the first array column exploded
    with ``explode_outer``, which keeps rows whose array is null or empty.
    Map columns are not touched.

    Note: exploding two sibling array columns multiplies their element counts
    (cartesian product per row).
    """
    def _first_field_of(frame, kind):
        return next((fld for fld in frame.schema.fields if isinstance(fld.dataType, kind)), None)

    while True:
        struct_field = _first_field_of(df, StructType)
        if struct_field is not None:
            name = struct_field.name
            others = [c for c in df.columns if c != name]
            children = [
                F.col(f"{name}.{child.name}").alias(f"{name}{sep}{child.name}")
                for child in struct_field.dataType.fields
            ]
            df = df.select(*others, *children)
            continue

        array_field = _first_field_of(df, ArrayType)
        if array_field is None:
            return df
        df = df.withColumn(array_field.name, F.explode_outer(F.col(array_field.name)))

from delta.tables import DeltaTable

def upsert_merge(df, target_table: str, keys: list[str], partition_by: list[str] | None = None):
    """
    Upsert ``df`` into the Delta table ``target_table``, matching rows on ``keys``.

    - If the table does not exist, it is created from ``df`` and partitioned by
      ``partition_by`` when given. ``keys`` are not needed in that case.
    - Otherwise a MERGE updates every non-key column of matched rows and
      inserts unmatched rows with all of ``df``'s columns.

    Keys are compared with the null-safe ``<=>`` operator. Flattened data built
    with ``explode_outer`` can carry NULL key parts, and plain ``=`` never
    matches NULL, so those rows would be re-inserted on every rerun.

    Raises:
        ValueError: if the table exists and ``keys`` is empty.
    """
    spark = df.sparkSession

    if not spark.catalog.tableExists(target_table):
        writer = df.write.format("delta").mode("overwrite")
        if partition_by:
            writer = writer.partitionBy(*partition_by)
        writer.saveAsTable(target_table)
        return

    if not keys:
        raise ValueError(f"upsert_merge into {target_table} requires at least one key column")

    def _q(name: str) -> str:
        # Backtick-quote so unusual column names stay a single identifier.
        return "`" + name.replace("`", "``") + "`"

    cond = " AND ".join(f"t.{_q(k)} <=> s.{_q(k)}" for k in keys)
    set_map = {c: F.col(f"s.{_q(c)}") for c in df.columns if c not in keys}

    merge = DeltaTable.forName(spark, target_table).alias("t").merge(df.alias("s"), cond)
    if set_map:
        # When every column is a key there is nothing to update on a match.
        merge = merge.whenMatchedUpdate(set=set_map)
    merge.whenNotMatchedInsertAll().execute()



In [0]:
#from http_connector import build_session, fetch_json
#from bronze import write_bronze_raw
#from silver import infer_schema_from_sample, parse_raw_to_cols, flatten_structs_until_done, flatten_and_explode_all, upsert_merge
#from endpoints import ENDPOINTS
from pyspark.sql import functions as F

# Root of the CollegeFootballData REST API; endpoint paths are appended to it.
BASE_URL = "https://api.collegefootballdata.com"

def run_ingest_to_bronze(spark, api_key: str, bronze_schema: str, endpoint: str, season: int, week: int):
    """Fetch one season/week slice of ``endpoint`` and land it in the bronze layer.

    Args:
        spark: Active SparkSession.
        api_key: API bearer token.
        bronze_schema: ``catalog.schema`` holding the bronze tables.
        endpoint: Key into ``ENDPOINTS``.
        season, week: Slice to fetch.

    Returns:
        The bronze table name ``<bronze_schema>.<target>_raw``, built from the
        endpoint's configured ``target``. It is returned whether or not any rows
        were written.

    Raises:
        KeyError: if ``endpoint`` is not registered in ``ENDPOINTS``.
    """
    conf = ENDPOINTS[endpoint]
    target_tbl = f"{bronze_schema}.{conf['target']}_raw"
    url_path = conf["url"]({"season": season, "week": week})
    print(f"[{endpoint}] GET {BASE_URL}{url_path}")

    # Release the session's pooled connections once the slice is fetched.
    with build_session(api_key) as session:
        records = fetch_json(session, BASE_URL, url_path)

    if not records:
        print(f"[{endpoint}] no rows for season={season} week={week}")
        return target_tbl
    write_bronze_raw(spark, records, conf["target"], bronze_schema, season, week)
    return target_tbl

# Column-name prefixes produced by flattening that are stripped in silver.
_SILVER_PREFIXES = ("teams_", "stats_", "linescores_", "plays_", "events_")
# Merge keys used when an endpoint config has no "keys" entry (the list that was
# previously hard-coded; assumes the games_teams shape — TODO confirm per endpoint).
_DEFAULT_SILVER_KEYS = ("id", "teamId", "stats_category")


def _strip_known_prefixes(df, prefixes=_SILVER_PREFIXES):
    """Drop one leading known prefix from each column name, skipping renames that would collide."""
    taken = set(df.columns)
    for old in list(df.columns):
        prefix = next((p for p in prefixes if old.startswith(p)), None)
        if prefix is None:
            continue
        new = old[len(prefix):]
        # Renaming onto an existing column (e.g. teams_id -> id) would create
        # duplicate column names and break the merge, so keep the prefixed name.
        if not new or new in taken:
            continue
        df = df.withColumnRenamed(old, new)
        taken.discard(old)
        taken.add(new)
    return df


def run_build_silver_flat(spark, silver_schema: str, bronze_schema: str, endpoint: str, season: int, week: int, mode: str = "explode_all"):
    """Build ``<silver_schema>.<target>_flat_rows`` from one bronze season/week slice.

    Pipeline: infer a schema from a sample record, parse the raw JSON, flatten
    it, shorten known column prefixes, then upsert into the silver table
    (partitioned by ``_season``/``_week``).

    Args:
        spark: Active SparkSession.
        silver_schema, bronze_schema: ``catalog.schema`` names.
        endpoint: Key into ``ENDPOINTS``. An optional ``"keys"`` entry overrides
            the merge keys.
        season, week: Slice to process.
        mode: ``"explode_all"`` flattens structs and explodes arrays. Any other
            value only flattens structs.

    Returns:
        The silver table name, or None when the bronze slice has no rows.
    """
    conf = ENDPOINTS[endpoint]
    raw_tbl = f"{bronze_schema}.{conf['target']}_raw"
    silver_tbl = f"{silver_schema}.{conf['target']}_flat_rows"

    schema = infer_schema_from_sample(spark, raw_tbl, season, week)
    if not schema:
        print(f"[{endpoint}] no sample to infer schema; skipping")
        return None

    parsed = parse_raw_to_cols(spark, raw_tbl, season, week, schema)
    if mode == "explode_all":
        flat = flatten_and_explode_all(parsed)
    else:
        flat = flatten_structs_until_done(parsed)
    flat = _strip_known_prefixes(flat)

    keys = list(conf.get("keys", _DEFAULT_SILVER_KEYS))
    upsert_merge(flat, silver_tbl, keys=keys, partition_by=["_season", "_week"])
    return silver_tbl


In [0]:
from typing import Dict, Callable

def _season_week_path(resource: str, p) -> str:
    """Build ``/<resource>?year=<season>&week=<week>`` from a mapping with "season" and "week"."""
    return f"/{resource}?year={p['season']}&week={p['week']}"


def url_games(p):
    """Path for ``/games`` filtered to one season/week."""
    return _season_week_path("games", p)


def url_games_teams(p):
    """Path for ``/games/teams`` filtered to one season/week."""
    return _season_week_path("games/teams", p)


def url_games_players(p):
    """Path for ``/games/players`` filtered to one season/week."""
    return _season_week_path("games/players", p)


# Registry of supported API endpoints. Every URL here is a season + week slice.
#   url    -> callable taking {"season": ..., "week": ...} and returning the request path
#   target -> base table name (bronze "<target>_raw", silver "<target>_flat_rows")
#   mode   -> intended flattening strategy ("explode_all" or "flatten_only");
#             NOTE(review): run_build_silver_flat takes mode as an argument and does not read this
ENDPOINTS: Dict[str, Dict] = {
    "games": {
        "url": url_games,
        "target": "games",
        "mode": "explode_all",
    },
    "games_teams": {
        "url": url_games_teams,
        "target": "games_teams",
        "mode": "explode_all",
    },
    "games_players": {
        "url": url_games_players,
        "target": "games_players",
        "mode": "explode_all",
    },
}


In [0]:


# Databricks notebook: land one endpoint/season/week slice in the bronze layer.
dbutils.widgets.text("endpoint", "games_teams")
dbutils.widgets.text("season", "2024")
dbutils.widgets.text("week", "1")
dbutils.widgets.text("division", "fbs")
dbutils.widgets.text("catalog", "cfbd")
# Storage root used only if the catalog has to be created; override per workspace.
dbutils.widgets.text(
    "managed_location",
    "s3://databricks-workspace-stack-fed2d-bucket/unity-catalog/455082732875091",
)

endpoint = dbutils.widgets.get("endpoint")
season = int(dbutils.widgets.get("season"))
week = int(dbutils.widgets.get("week"))
division = dbutils.widgets.get("division")  # NOTE(review): read but not used by the ingest below
catalog = dbutils.widgets.get("catalog")
managed_location = dbutils.widgets.get("managed_location")

bronze_schema = f"{catalog}.bronze"
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog} MANAGED LOCATION '{managed_location}'")
spark.sql(f"CREATE SCHEMA  IF NOT EXISTS {bronze_schema}")

api_key = dbutils.secrets.get("cfbd", "api-key")  # secret scope 'cfbd', key 'api-key'

tbl = run_ingest_to_bronze(spark, api_key, bronze_schema, endpoint, season, week)
print(f"Bronze table: {tbl}")


In [0]:

# Databricks notebook: build the flattened silver table for one endpoint/season/week slice.
dbutils.widgets.text("endpoint", "games_teams")
dbutils.widgets.text("season", "2024")
dbutils.widgets.text("week", "1")
dbutils.widgets.text("catalog", "cfbd")
dbutils.widgets.text("mode", "explode_all")  # "flatten_only" or "explode_all"
# Storage root used only if the catalog has to be created; override per workspace.
dbutils.widgets.text(
    "managed_location",
    "s3://databricks-workspace-stack-fed2d-bucket/unity-catalog/455082732875091",
)

endpoint = dbutils.widgets.get("endpoint")
season = int(dbutils.widgets.get("season"))
week = int(dbutils.widgets.get("week"))
catalog = dbutils.widgets.get("catalog")
mode = dbutils.widgets.get("mode")
managed_location = dbutils.widgets.get("managed_location")

# Any value other than "explode_all" would silently fall back to flatten-only,
# so reject typos up front.
if mode not in ("explode_all", "flatten_only"):
    raise ValueError(f"mode must be 'explode_all' or 'flatten_only', got {mode!r}")

bronze_schema = f"{catalog}.bronze"
silver_schema = f"{catalog}.silver"
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog} MANAGED LOCATION '{managed_location}'")
spark.sql(f"CREATE SCHEMA  IF NOT EXISTS {bronze_schema}")
spark.sql(f"CREATE SCHEMA  IF NOT EXISTS {silver_schema}")

target = run_build_silver_flat(spark, silver_schema, bronze_schema, endpoint, season, week, mode)
print(f"Silver table: {target}")
