# 02 - Ingest ESPN Athletes (Bronze)

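- Collect athlete IDs from the upstream `ingest_fights` task (fallback: scan raw ESPN event JSON)
- Fetch ESPN Core v2 athlete profiles (with `expand`, dereferencing `$ref` links where needed)
- Save the raw JSON to landing/raw, then upsert (MERGE) into the bronze Delta table `espn_athletes`
- Show a sample of the table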


In [None]:
from datetime import datetime, timezone
import json, time
from pyspark.sql import functions as F

# Notebook parameters: widget name -> default value (created in this order).
# Creation is best-effort: any failure is ignored and the reads below still run
# against whatever widgets exist.
_WIDGET_DEFAULTS = {
    "storage_account": "storagetmufc",
    "secret_scope": "kv-scope",
    "key_name": "adls-account-key",
    "db_name": "ufc_bronze",
    "max_concurrency": "5",
    "http_timeout_sec": "30",
    "http_retries": "3",
    "user_agent": "ufc-pipeline/1.0 (+databricks)",
}
try:
    for _widget_name, _widget_default in _WIDGET_DEFAULTS.items():
        dbutils.widgets.text(_widget_name, _widget_default)
except Exception:
    pass

_widget_value = dbutils.widgets.get
storage_account = _widget_value("storage_account")
secret_scope    = _widget_value("secret_scope")
key_name        = _widget_value("key_name")
DB_NAME         = _widget_value("db_name")
MAX_CONC        = int(_widget_value("max_concurrency"))
HTTP_TIMEOUT    = int(_widget_value("http_timeout_sec"))
HTTP_RETRIES    = int(_widget_value("http_retries"))
USER_AGENT      = _widget_value("user_agent")

# Authenticate Spark against the ADLS Gen2 account with the key from the secret scope.
account_key = dbutils.secrets.get(secret_scope, key_name)
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    account_key,
)

def abfss(container, path="", account=None):
    """Build an ``abfss://`` URI for a container in the ADLS Gen2 account.

    Args:
        container: Container (filesystem) name, e.g. ``"bronze"``.
        path: Optional path inside the container. Leading and trailing
            slashes are ignored, so ``"/a/b/"`` and ``"a/b"`` are equivalent.
        account: Storage account name; defaults to the notebook's
            ``storage_account`` widget value.

    Returns:
        ``abfss://<container>@<account>.dfs.core.windows.net[/<path>]``
        without a trailing slash.
    """
    acct = storage_account if account is None else account
    base = f"abfss://{container}@{acct}.dfs.core.windows.net"
    # Strip both ends so a leading "/" cannot produce "...windows.net//path".
    rel = path.strip("/")
    return f"{base}/{rel}" if rel else base

# Delta location of the bronze athletes table, plus the landing (per run) and
# raw (per ingestion day) JSON roots.
PATH_ATHLETES_DELTA = abfss("bronze", f"{DB_NAME}/espn_athletes")
PATH_LANDING_ROOT   = abfss("landing", "espn/ufc/athletes")
PATH_RAW_ROOT       = abfss("raw",     "espn/ufc/athletes")

# Derive both identifiers from ONE timestamp: two separate now() calls could
# straddle midnight UTC and give a RUN_ID from one day with an
# INGESTION_DATE from the next.
_run_started_at = datetime.now(timezone.utc)
RUN_ID = _run_started_at.strftime("%Y%m%dT%H%M%SZ")
INGESTION_DATE = _run_started_at.date().isoformat()

for _dir in [f"{PATH_LANDING_ROOT}/run_id={RUN_ID}", f"{PATH_RAW_ROOT}/ingestion_date={INGESTION_DATE}"]:
    dbutils.fs.mkdirs(_dir)



In [None]:
from pyspark.sql import functions as F

# Root of the RAW ESPN event JSON (files live under ingestion_date=* folders).
PATH_EVENTS_RAW = abfss("raw", "espn/ufc/events")


def collect_athlete_ids_from_raw(events_root=None):
    """Return the distinct athlete IDs referenced by competitors in RAW event JSON.

    Reads ``<events_root>/ingestion_date=*/event_*.json``, takes each
    ``competitions[].competitors[].athlete.$ref`` URL and extracts the digits
    following ``/athletes/``.

    This is defined but not executed here. The next cell prefers the IDs
    published by the upstream ``ingest_fights`` task and scans RAW only when
    none are available. An unconditional scan at this point would read and
    collect every event file on every run for nothing.

    Args:
        events_root: RAW events root; defaults to ``PATH_EVENTS_RAW``.

    Returns:
        List of athlete ID strings (order not guaranteed).
    """
    root = PATH_EVENTS_RAW if events_root is None else events_root
    events = spark.read.json(f"{root}/ingestion_date=*/event_*.json")
    refs = (events
        .select(F.explode(F.col("competitions")).alias("comp"))
        .select(F.explode(F.col("comp.competitors")).alias("cmp"))
        .select(F.col("cmp.athlete.$ref").alias("ref"))
        .where(F.col("ref").isNotNull())
    )
    ids = (refs
        .select(F.regexp_extract(F.col("ref"), r"/athletes/(\d+)", 1).alias("athlete_id"))
        .where(F.col("athlete_id") != "")
        .distinct()
    )
    return [row["athlete_id"] for row in ids.collect()]


In [None]:
# Read athlete IDs from upstream task (fallback to RAW scan).
# The upstream `ingest_fights` task is expected to publish them as the task value
# `athlete_ids_json` (a JSON-encoded list). IDs are normalized to unique,
# non-empty strings so they match the STRING `athlete_id` column and the string
# IDs the RAW fallback extracts.


def _normalize_athlete_ids(values) -> list:
    """Return ``values`` as de-duplicated, non-empty strings in first-seen order."""
    cleaned = (str(v).strip() for v in (values or []) if v is not None)
    return list(dict.fromkeys(s for s in cleaned if s))


try:
    ids_json = dbutils.jobs.taskValues.get(taskKey="ingest_fights", key="athlete_ids_json", debugValue="[]")
    # Accept the JSON string this notebook expects, or a value that is already a list.
    decoded = json.loads(ids_json) if isinstance(ids_json, str) and ids_json else ids_json
    upstream_ids = _normalize_athlete_ids(decoded if isinstance(decoded, list) else [])
except Exception as e:
    # Best-effort: any problem reading the task value just triggers the RAW
    # fallback, but report why instead of failing silently.
    print(f"Could not read upstream athlete IDs ({type(e).__name__}: {e})")
    upstream_ids = []

if upstream_ids:
    athlete_ids = upstream_ids
    print(f"Using upstream athlete IDs: {len(athlete_ids)}")
else:
    print("No upstream IDs found. Falling back to RAW scan (distinct).")
    PATH_EVENTS_RAW = abfss("raw", "espn/ufc/events")
    df_events = spark.read.json(f"{PATH_EVENTS_RAW}/ingestion_date=*/event_*.json")
    refs_df = (df_events
        .select(F.explode(F.col("competitions")).alias("comp"))
        .select(F.explode(F.col("comp.competitors")).alias("cmp"))
        .select(F.col("cmp.athlete.$ref").alias("ref"))
        .where(F.col("ref").isNotNull())
    )
    athlete_ids_df = refs_df.select(F.regexp_extract(F.col("ref"), r"/athletes/(\d+)", 1).alias("athlete_id")).where(F.col("athlete_id") != "").distinct()
    athlete_ids = [r["athlete_id"] for r in athlete_ids_df.collect()]
    print(f"Found unique athlete IDs via RAW: {len(athlete_ids)}")


In [None]:
# Fetch athlete profiles (Core v2)
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

# Request headers sent with every ESPN call, the Core v2 athlete endpoint
# template, and the query parameters used for profile requests.
HEADERS = dict([
    ("User-Agent", USER_AGENT),
    ("Accept", "application/json"),
])
BASE_ATHLETE = "https://sports.core.api.espn.com/v2/sports/mma/athletes/{athleteId}"
EXPAND = dict(
    lang="en",
    region="us",
    contentorigin="espn",
    expand="records,teams,athlete,statistics",
)

# HTTP statuses treated as transient (rate limiting / server-side hiccups) and retried.
_RETRYABLE_STATUS = (429, 500, 502, 503, 504)


def http_get(url, params=None, timeout=HTTP_TIMEOUT, retries=HTTP_RETRIES, backoff=1.5):
    """GET ``url`` and return its decoded JSON body, retrying transient failures.

    Args:
        url: Absolute URL to fetch.
        params: Optional query-string parameters.
        timeout: Per-request timeout in seconds.
        retries: Extra attempts after the first one (negative is treated as 0).
        backoff: Base of the exponential pause between attempts
            (``backoff ** attempt`` seconds).

    Returns:
        The parsed JSON body of a non-error response.

    Raises:
        requests.HTTPError: Raised immediately for a non-retryable error status
            (e.g. 404). For a retryable status, raised after the last attempt.
        requests.ConnectionError, requests.Timeout, ValueError: If a network
            error or an undecodable body persists through the last attempt.
    """
    last_attempt = max(retries, 0)
    for attempt in range(last_attempt + 1):
        try:
            r = requests.get(url, params=params, headers=HEADERS, timeout=timeout)
            r.raise_for_status()
            return r.json()
        except requests.HTTPError as exc:
            status = getattr(exc.response, "status_code", None)
            # Client errors such as 404 will not fix themselves: fail fast
            # instead of sleeping through every retry.
            if status not in _RETRYABLE_STATUS or attempt == last_attempt:
                raise
        except (requests.ConnectionError, requests.Timeout, ValueError):
            # Network blips and truncated/garbled bodies are worth another try.
            if attempt == last_attempt:
                raise
        time.sleep(backoff ** attempt)

# Fetch all athlete profiles concurrently. A failed fetch is recorded as
# {"error": "<message>"} so it still shows up in the landing/raw files.
profiles = {}
if athlete_ids:
    # ThreadPoolExecutor rejects max_workers <= 0; clamp a "0"/negative widget value to 1.
    n_workers = max(1, min(MAX_CONC, len(athlete_ids)))
    with ThreadPoolExecutor(max_workers=n_workers) as ex:
        futs = {ex.submit(http_get, BASE_ATHLETE.format(athleteId=aid), EXPAND): aid for aid in athlete_ids}
        for f in as_completed(futs):
            aid = futs[f]
            try:
                profiles[aid] = f.result()
            except Exception as e:
                # Some exceptions stringify to ""; keep the type name so the record is never blank.
                profiles[aid] = {"error": str(e) or type(e).__name__}

n_failed = sum(1 for v in profiles.values() if isinstance(v, dict) and set(v) == {"error"})
print(f"Fetched profiles: {len(profiles)}")
if n_failed:
    print(f"Failed profile fetches: {n_failed}")


In [None]:
# Landing and raw mirror: write one JSON file per fetched profile into this
# run's landing folder, then copy the folder into today's raw partition.
landing_dir = f"{PATH_LANDING_ROOT}/run_id={RUN_ID}"
raw_dir = f"{PATH_RAW_ROOT}/ingestion_date={INGESTION_DATE}"

for athlete_id, body in profiles.items():
    target = f"{landing_dir}/athlete_{athlete_id}.json"
    dbutils.fs.put(target, json.dumps(body, ensure_ascii=False), True)
print(f"landing saved: {len(profiles)} files")

try:
    dbutils.fs.cp(landing_dir, raw_dir, recurse=True)
except Exception:
    # If the recursive copy fails, copy the listed entries one by one instead.
    for entry in dbutils.fs.ls(landing_dir):
        dbutils.fs.cp(entry.path, f"{raw_dir}/{entry.name}")


In [None]:
# Bronze table and upsert: ensure the Delta table exists at its fixed ADLS
# location (IF NOT EXISTS leaves an existing table untouched) before rows are
# merged into it further down.
athletes_ddl = f"""
CREATE TABLE IF NOT EXISTS hive_metastore.{DB_NAME}.espn_athletes (
  athlete_id STRING,
  full_name STRING,
  display_name STRING,
  birth_date DATE,
  country STRING,
  height STRING,
  height_cm INT,
  reach_cm INT,
  stance STRING,
  weight_class STRING,
  team STRING,
  combat_style STRING,
  image_url STRING,
  ingestion_date DATE,
  run_id STRING,
  raw_payload STRING
) USING DELTA LOCATION '{PATH_ATHLETES_DELTA}'
"""
spark.sql(athletes_ddl)

# Memo of dereferenced `$ref` URLs for this run; failed lookups are cached as {} too.
ref_cache = {}

def ref_get(url: str):
    """Fetch (once) the JSON behind an ESPN ``$ref`` URL via ``http_get``.

    Returns ``{}`` for non-string input or when the fetch raises; every outcome
    is cached in ``ref_cache`` so a URL is requested at most once.
    """
    if not isinstance(url, str):
        return {}
    try:
        return ref_cache[url]
    except KeyError:
        pass
    try:
        payload = http_get(url)
    except Exception:
        payload = {}
    ref_cache[url] = payload
    return payload

import re

def extract_country(p: dict) -> str | None:
    if isinstance(p.get("citizenship"), str) and p.get("citizenship"):
        return p.get("citizenship")
    assoc = p.get("association")
    if isinstance(assoc, dict):
        loc = assoc.get("location")
        if isinstance(loc, dict) and isinstance(loc.get("country"), str):
            return loc.get("country")
    bp = p.get("birthPlace")
    if isinstance(bp, dict):
        if "$ref" in bp:
            d = ref_get(bp["$ref"]) or {}
            return d.get("country") or d.get("name")
        return bp.get("country") or bp.get("name")
    cc = p.get("citizenshipCountry")
    if isinstance(cc, dict):
        return cc.get("alt") or cc.get("name")
    return None

def _parse_display_height_inches(txt: str) -> float | None:
    try:
        m = re.match(r"^(\d+)\s*'\s*(\d+)?\s*\"?", txt.strip())
        if not m:
            return None
        feet = int(m.group(1)); inches = int(m.group(2) or 0)
        return feet * 12 + inches
    except Exception:
        return None

def extract_display_height_str(p: dict) -> str | None:
    dh = p.get("displayHeight")
    if isinstance(dh, str) and dh.strip():
        return dh.strip()
    h = p.get("height")
    try:
        inches = float(h) if isinstance(h, (int, float)) else None
        if inches is not None:
            feet = int(inches // 12); rest = int(round(inches - feet * 12))
            return f"{feet}' {rest}\""
    except Exception:
        pass
    return None

def extract_height_cm(p: dict) -> int | None:
    v = p.get("heightCentimeters")
    if isinstance(v, (int, float)): return int(v)
    h = p.get("height")
    if isinstance(h, dict):
        v = h.get("centimeters") or h.get("cm")
        if isinstance(v, (int, float)): return int(v)
    if isinstance(h, (int, float)):
        return int(round(float(h) * 2.54))
    dh = p.get("displayHeight")
    if isinstance(dh, str):
        inches = _parse_display_height_inches(dh)
        if isinstance(inches, (int, float)):
            return int(round(float(inches) * 2.54))
    meas = p.get("measurements")
    if isinstance(meas, list):
        for m in meas:
            if isinstance(m, dict) and str(m.get("type")).lower() == "height":
                v = m.get("value") or m.get("centimeters")
                if isinstance(v, (int, float)):
                    return int(v)
    return None

def extract_reach_cm(p: dict) -> int | None:
    v = p.get("reachCentimeters")
    if isinstance(v, (int, float)): return int(v)
    r = p.get("reach")
    if isinstance(r, dict):
        v = r.get("centimeters") or r.get("cm")
        if isinstance(v, (int, float)): return int(v)
    if isinstance(r, (int, float)):
        return int(round(float(r) * 2.54))
    dr = p.get("displayReach")
    if isinstance(dr, str):
        try:
            v = float(dr.replace('"','').strip())
            return int(round(v * 2.54))
        except Exception:
            pass
    meas = p.get("measurements")
    if isinstance(meas, list):
        for m in meas:
            if isinstance(m, dict) and str(m.get("type")).lower() == "reach":
                v = m.get("value") or m.get("centimeters")
                if isinstance(v, (int, float)):
                    return int(v)
    return None

def extract_stance(p: dict) -> str | None:
    st = p.get("stance")
    if isinstance(st, dict):
        return st.get("text") or st.get("name")
    if isinstance(st, str):
        return st
    return p.get("fightingStyle") or None

def extract_team(p: dict) -> str | None:
    assoc = p.get("association")
    if isinstance(assoc, dict):
        return assoc.get("name")
    return None

def extract_combat_style(p: dict) -> str | None:
    styles = p.get("styles")
    if isinstance(styles, list):
        texts = []
        for s in styles:
            if isinstance(s, dict):
                t = s.get("text") or s.get("name")
                if t: texts.append(str(t))
        if texts:
            return ", ".join(sorted(set(texts)))
    return None

# Parse profiles into rows, then MERGE them into the bronze table.


def _is_fetch_error(payload) -> bool:
    """Return True for the ``{"error": ...}`` placeholder stored when a profile fetch failed."""
    return isinstance(payload, dict) and set(payload) == {"error"}


def _profile_to_row(aid, p: dict) -> tuple:
    """Flatten one athlete profile into a tuple ordered like ``schema`` below."""
    wc = p.get("weightClass")
    weight_class = (wc.get("text") or wc.get("shortName")) if isinstance(wc, dict) else None
    headshot = p.get("headshot")
    image_url = headshot.get("href") if isinstance(headshot, dict) else None
    return (
        str(aid),  # athlete_id is a STRING column
        p.get("fullName") or p.get("name"),
        p.get("displayName"),
        p.get("birthDate") or p.get("dateOfBirth"),
        extract_country(p),
        extract_display_height_str(p),
        extract_height_cm(p),
        extract_reach_cm(p),
        extract_stance(p),
        weight_class,
        extract_team(p),
        extract_combat_style(p),
        image_url,
        INGESTION_DATE,
        RUN_ID,
        json.dumps(p, ensure_ascii=False),
    )


rows_by_id = {}
skipped_failed = 0
for aid, p in profiles.items():
    if not isinstance(p, dict):
        continue
    if _is_fetch_error(p):
        # Without this, the all-NULL row would overwrite a previously good
        # athlete row via MERGE ... UPDATE SET *. The error payload is still
        # kept in the landing/raw JSON.
        skipped_failed += 1
        continue
    row = _profile_to_row(aid, p)
    # Key on the string id: MERGE fails when several source rows match one target row.
    rows_by_id[row[0]] = row
rows = list(rows_by_id.values())

schema = "athlete_id STRING, full_name STRING, display_name STRING, birth_date STRING, country STRING, height STRING, height_cm INT, reach_cm INT, stance STRING, weight_class STRING, team STRING, combat_style STRING, image_url STRING, ingestion_date STRING, run_id STRING, raw_payload STRING"
df = spark.createDataFrame(rows, schema)
df = (df
    .withColumn("birth_date", F.to_date("birth_date"))
    .withColumn("ingestion_date", F.to_date("ingestion_date"))
)

if rows:
    # Upsert (avoid disk staging; use temp view)
    view_name = f"_s_athletes_{RUN_ID}"
    df.createOrReplaceTempView(view_name)
    try:
        spark.sql(f"""
MERGE INTO hive_metastore.{DB_NAME}.espn_athletes t
USING (SELECT * FROM {view_name}) s
ON t.athlete_id = s.athlete_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
    finally:
        # Drop the view even when the MERGE fails so it does not linger in the session.
        spark.catalog.dropTempView(view_name)

# df is built from `rows`, so len(rows) equals df.count() without another Spark job.
print(f"Upserted athletes: {len(rows)}")
if skipped_failed:
    print(f"Skipped failed profile fetches: {skipped_failed}")


In [None]:
# Show 20 athletes ordered by full_name. Built with the DataFrame API rather
# than a SQL cell so DB_NAME needs no ${...} widget interpolation.
athletes_sample = (
    spark.table(f"hive_metastore.{DB_NAME}.espn_athletes")
    .orderBy("full_name")
    .limit(20)
)
display(athletes_sample)