In [0]:
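# UFC Lakehouse - Ingestion from "landing" files stored in Workspace Files (WSFS)
#
# Params:
#   landing_dir : /Users/<email>/ufc-lakehouse/landing/dt=YYYY-MM-DD
#                 (a path already prefixed with /Workspace/ is also accepted)
#   run_date    : YYYY-MM-DD (added to every row and used as the partition column)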

# Notebook parameters are supplied as Databricks text widgets (default "").
dbutils.widgets.text("landing_dir", "")
dbutils.widgets.text("run_date", "")

# Surrounding whitespace is stripped, so a blank-only value counts as missing.
landing_dir = dbutils.widgets.get("landing_dir").strip()
run_date = dbutils.widgets.get("run_date").strip()

# Validate with explicit raises instead of `assert`: assert statements are
# removed when Python runs with -O, which would let empty params through.
if not landing_dir:
    raise ValueError("Param landing_dir vazio")
if not run_date:
    raise ValueError("Param run_date vazio")

from pyspark.sql import functions as F

# Spark reads workspace files through file:/Workspace/...
# Normalize landing_dir into an absolute /Workspace/... directory path:
#  - /Users/...           -> /Workspace/Users/...
#  - Users/... (no "/")   -> /Workspace/Users/...
#  - /Workspace/...       -> unchanged
#  - file:/Workspace/...  -> /Workspace/... (the scheme is re-added when the
#                            file URIs are built below)
#  - trailing slashes are dropped so joined file names never contain "//"
_landing_path = landing_dir
if _landing_path.startswith("file:"):
    _landing_path = _landing_path[len("file:"):]
_landing_path = _landing_path.strip("/")

if _landing_path == "Workspace" or _landing_path.startswith("Workspace/"):
    ws_dir = "/" + _landing_path
else:
    # rstrip covers the degenerate case where nothing but slashes was given.
    ws_dir = ("/Workspace/" + _landing_path).rstrip("/")

# Build the file: URIs of the three landing files inside the workspace dir.
_landing_uri = f"file:{ws_dir}"
events_path = f"{_landing_uri}/events.json"
fights_path = f"{_landing_uri}/fights.jsonl"
fighters_path = f"{_landing_uri}/fighters.jsonl"

# Log the resolved inputs so a run can be traced from the notebook output.
for _label, _value in (
    ("events_path", events_path),
    ("fights_path", fights_path),
    ("fighters_path", fighters_path),
    ("run_date", run_date),
):
    print(f"[ingest] {_label}=", _value)

# Read the landing files and stamp every row with the run_date parameter.
# NOTE(review): spark.read.json parses line-delimited JSON by default; confirm
# events.json (unlike the .jsonl files) really holds one JSON object per line.
def _read_landing_json(path):
    """Load one landing JSON file and add the literal run_date column."""
    raw_df = spark.read.json(path)
    return raw_df.withColumn("run_date", F.lit(run_date))


events_df = _read_landing_json(events_path)
fights_df = _read_landing_json(fights_path)
fighters_df = _read_landing_json(fighters_path)

# Persist to the Bronze Delta tables, partitioned by run_date.
# The writes use append mode, so running the notebook again for the same
# run_date adds the same rows a second time.
for _bronze_table, _landing_df in (
    ("bronze_ufc_events", events_df),
    ("bronze_ufc_fights", fights_df),
    ("bronze_ufc_fighters", fighters_df),
):
    _writer = _landing_df.write.mode("append").format("delta")
    _writer.partitionBy("run_date").saveAsTable(_bronze_table)

# Show a small sample of the fights data in the notebook output.
_fights_preview = fights_df.limit(10)
display(_fights_preview)
