# In [0]:
# covid_lakehouse_.py
# -------------------------------------------------------------------
# End-to-end COVID Lakehouse demo 
# -------------------------------------------------------------------

# ───────────────────────────────────────────────────────────────────────────
# Step 1: Bootstrap Spark + Hive Metastore & Prep Workspace
# ───────────────────────────────────────────────────────────────────────────
from pyspark.sql import SparkSession
import shutil, os

# 1) Build (or reuse) a Spark session with Hive support switched on
_session_builder = SparkSession.builder.appName("COVID_Lakehouse_COVIDExercise")
_session_builder = _session_builder.enableHiveSupport()
_session_builder = _session_builder.config("spark.sql.catalogImplementation", "hive")
spark = _session_builder.getOrCreate()

# 2) Select the catalog, make sure the covid database exists, and make it
#    the current database for every unqualified table name below
for _bootstrap_sql in (
    "USE CATALOG hive_metastore",
    "CREATE DATABASE IF NOT EXISTS covid",
    "USE covid",
):
    spark.sql(_bootstrap_sql)

# 3) Exercise folder in DBFS; remove whatever is already stored under it
base = "dbfs:/tmp/COVID_exercise"
dbutils.fs.rm(base, recurse=True)

# 4) Confirmation
print(f"✅ Workspace initialized under {base}")
print("   Hive database 'covid' is ready, Spark session active.")


# ───────────────────────────────────────────────────────────────────────────
# Step 2: Read real CSV files & show samples, then Bronze ingestion
# ───────────────────────────────────────────────────────────────────────────
from pyspark.sql.functions import lit, col, current_timestamp, to_date
from delta.tables import DeltaTable

# Source folder and the Delta locations used by the three Bronze tables
data_dir = "dbfs:/databricks-datasets/COVID/covid-19-data"
bronze_counties_path = f"{base}/bronze_counties"
bronze_recent_path   = f"{base}/bronze_recent"
bronze_states_path   = f"{base}/bronze_states"

# 1) Read each CSV: first row is the header, column types are inferred
_csv_options = {"header": True, "inferSchema": True}
counties_df = spark.read.options(**_csv_options).csv(f"{data_dir}/us-counties.csv")
recent_df   = spark.read.options(**_csv_options).csv(f"{data_dir}/us-counties-recent.csv")
states_df   = spark.read.options(**_csv_options).csv(f"{data_dir}/us-states.csv")

# 2) Print five untruncated rows and the inferred schema of every file
for _file_name, _sample_df in (
    ("us-counties.csv", counties_df),
    ("us-counties-recent.csv", recent_df),
    ("us-states.csv", states_df),
):
    print(f"👉 {_file_name} sample:")
    _sample_df.show(5, truncate=False)
    _sample_df.printSchema()

# 3) Bronze | counties
dbutils.fs.rm(bronze_counties_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS bronze_counties")

# Change Data Feed is enabled in the CREATE statement itself rather than by a
# later ALTER TABLE. Delta only records change data for commits made while the
# property is set, and the Silver upsert below reads the feed starting from an
# epoch watermark, i.e. from the table's very first version. Enabling it after
# the initial load would leave those first versions without change data.
spark.sql(f"""
  CREATE TABLE bronze_counties (
    date        DATE,
    county      STRING,
    state       STRING,
    fips        INT,
    cases       INT,
    deaths      INT,
    source      STRING,
    ingest_ts   TIMESTAMP
  ) USING DELTA
  LOCATION '{bronze_counties_path}'
  TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
print("✅ CDF enabled on bronze_counties")

# Initial load: normalise the date column, tag every row with its origin and
# the ingestion time, then overwrite the table's storage location.
counties_df \
  .withColumn("date", to_date("date")) \
  .withColumn("source", lit("counties_full")) \
  .withColumn("ingest_ts", current_timestamp()) \
  .write.format("delta") \
  .mode("overwrite") \
  .save(bronze_counties_path)

print("✅ bronze_counties ready")
spark.table("bronze_counties").show(5, truncate=False)

# 4) Bronze | recent
# Start from a clean slate: remove old files and the old table definition.
dbutils.fs.rm(bronze_recent_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS bronze_recent")

_bronze_recent_ddl = f"""
  CREATE TABLE bronze_recent (
    date        DATE,
    county      STRING,
    state       STRING,
    fips        INT,
    cases       INT,
    deaths      INT,
    source      STRING,
    ingest_ts   TIMESTAMP
  ) USING DELTA
  LOCATION '{bronze_recent_path}'
"""
spark.sql(_bronze_recent_ddl)

# Convert the date column and append the lineage columns before writing.
_bronze_recent_rows = (
    recent_df
    .withColumn("date", to_date("date"))
    .withColumn("source", lit("counties_recent"))
    .withColumn("ingest_ts", current_timestamp())
)
_bronze_recent_rows.write.format("delta").mode("overwrite").save(bronze_recent_path)

print("✅ bronze_recent ready")
spark.table("bronze_recent").show(5, truncate=False)

# 5) Bronze | states
# Start from a clean slate: remove old files and the old table definition.
dbutils.fs.rm(bronze_states_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS bronze_states")

_bronze_states_ddl = f"""
  CREATE TABLE bronze_states (
    date        DATE,
    state       STRING,
    fips        INT,
    cases       INT,
    deaths      INT,
    source      STRING,
    ingest_ts   TIMESTAMP
  ) USING DELTA
  LOCATION '{bronze_states_path}'
"""
spark.sql(_bronze_states_ddl)

# Convert the date column and append the lineage columns before writing.
_bronze_states_rows = (
    states_df
    .withColumn("date", to_date("date"))
    .withColumn("source", lit("states"))
    .withColumn("ingest_ts", current_timestamp())
)
_bronze_states_rows.write.format("delta").mode("overwrite").save(bronze_states_path)

print("✅ bronze_states ready")
spark.table("bronze_states").show(5, truncate=False)


# ───────────────────────────────────────────────────────────────────────────
# Step 3: Create Silver 1 & checkpoint table
# (from new3_Silver1.py)
# ───────────────────────────────────────────────────────────────────────────
# Storage location of the county-level Silver table
silver1_path = f"{base}/silver_county"

# Remove leftovers (files and table definitions) before re-creating
dbutils.fs.rm(silver1_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS silver_county")

# silver_county: one Delta table at silver1_path, partitioned by state
_silver_county_ddl = f"""
  CREATE TABLE silver_county (
    report_date DATE,
    county      STRING,
    state       STRING,
    cases       INT,
    deaths      INT,
    source      STRING,
    ingest_ts   TIMESTAMP
  ) USING DELTA
  PARTITIONED BY (state)
  LOCATION '{silver1_path}'
"""
spark.sql(_silver_county_ddl)

# silver_county_ckpt: single-column table holding the upsert watermark.
# No LOCATION is given, so its storage is left to the metastore.
spark.sql("DROP TABLE IF EXISTS silver_county_ckpt")
_silver_ckpt_ddl = """
  CREATE TABLE silver_county_ckpt (
    last_ts TIMESTAMP
  ) USING DELTA
"""
spark.sql(_silver_ckpt_ddl)

print("✅ silver_county + silver_county_ckpt created")


# ───────────────────────────────────────────────────────────────────────────
# Step 4: Define the upsert function (now using CDF)
# (from new4_Func.py, with CDF swap)
# ───────────────────────────────────────────────────────────────────────────
from datetime import datetime
from pyspark.sql.functions import lit, col, to_date

def process_silver_county():
    """Incrementally upsert new Bronze county rows into ``silver_county``.

    Steps:
      1. Read the last watermark from ``silver_county_ckpt`` (epoch when the
         checkpoint table is empty or holds a NULL).
      2. Read the Change Data Feed of ``bronze_counties`` from that watermark
         and keep only row images that represent current data.
      3. Read rows of ``bronze_recent`` ingested after the watermark.
      4. Union both inputs, drop invalid rows, and keep one row per
         (report_date, county, state).
      5. MERGE the result into ``silver_county``.
      6. Overwrite ``silver_county_ckpt`` with the new watermark.

    Relies on the module-level ``spark`` session. Takes no arguments and
    returns nothing; progress is reported with a ``print``.
    """
    # Imported locally because only this function needs them.
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number

    bronze_cols = ["date", "county", "state", "fips",
                   "cases", "deaths", "source", "ingest_ts"]

    # 1) Read last watermark (or default to epoch)
    ck = spark.table("silver_county_ckpt").collect()
    last_ts = ck[0]["last_ts"] if ck and ck[0]["last_ts"] else datetime(1970, 1, 1)

    # Capture the next watermark BEFORE reading the inputs. Anything that
    # lands in Bronze while this run is in progress then falls after the
    # stored watermark and is picked up by the next run instead of skipped.
    new_ts = spark.sql("SELECT current_timestamp() AS ts").first()["ts"]

    # 2) Read only the change events from bronze_counties since last_ts.
    #    The _change_type filter must run here, while the column still
    #    exists: it is a CDF metadata column that bronze_recent does not
    #    have. 'delete' and 'update_preimage' rows describe values that are
    #    no longer current, so only inserts and post-update images are kept.
    # NOTE(review): Delta rejects a startingTimestamp later than the table's
    # latest commit unless the out-of-range timestamp setting is enabled;
    # confirm the cluster configuration before scheduling repeated runs.
    cdf_df = (
        spark.read.format("delta")
             .option("readChangeData", "true")
             .option("startingTimestamp", str(last_ts))
             .table("bronze_counties")
             .filter(col("_change_type").isin("insert", "update_postimage"))
             .select(*bronze_cols)
    )

    # 3) Also include any new rows from bronze_recent
    recent_rows = (
        spark.table("bronze_recent")
             .filter(col("ingest_ts") > lit(last_ts))
             .select(*bronze_cols)
    )

    # 4) Union CDF + recent (identical column sets after the selects above)
    new_df = cdf_df.unionByName(recent_rows)

    # 5) Clean & cast
    clean_df = (
        new_df
          .filter(col("county").isNotNull() & (col("cases") >= 0))
          .withColumn("report_date", to_date(col("date")))
          .select("report_date", "county", "state", "cases", "deaths",
                  "source", "ingest_ts")
    )

    # The same (date, county, state) can arrive from both Bronze feeds or
    # from several CDF versions. MERGE fails when more than one source row
    # matches a target row, so keep only the most recently ingested row per
    # key; 'source' is a tie-breaker that keeps the choice deterministic.
    newest_first = (
        Window.partitionBy("report_date", "county", "state")
              .orderBy(col("ingest_ts").desc(), col("source"))
    )
    deduped_df = (
        clean_df
          .withColumn("_rn", row_number().over(newest_first))
          .filter(col("_rn") == 1)
          .drop("_rn")
    )

    # 6) MERGE into Silver. 'state' is part of the key because county names
    #    repeat across states; it is also the table's partition column.
    silver = DeltaTable.forName(spark, "silver_county")
    (
      silver.alias("t")
            .merge(deduped_df.alias("s"),
                   "t.report_date = s.report_date "
                   "AND t.state = s.state "
                   "AND t.county = s.county")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
    )

    # 7) Advance watermark to the timestamp captured before the reads
    spark.createDataFrame([(new_ts,)], ["last_ts"]) \
         .write.format("delta") \
         .mode("overwrite") \
         .saveAsTable("silver_county_ckpt")

    print(f"✅ Silver County CDF-upsert from {last_ts} to {new_ts}")

# Execute one Silver upsert pass, then preview five rows of the result
process_silver_county()
_silver_preview = spark.table("silver_county")
_silver_preview.show(5, truncate=False)


# ───────────────────────────────────────────────────────────────────────────
# Step 5: Build Silver 2 & Gold tables
# (unchanged)
# ───────────────────────────────────────────────────────────────────────────
from pyspark.sql.functions import sum as _sum, window, avg, date_trunc, col

# Storage location of the state-level daily Silver table
silver2_path = f"{base}/silver_state_daily"

# Remove leftovers (files and table definition) before re-creating
dbutils.fs.rm(silver2_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS silver_state_daily")

_silver_state_ddl = f"""
  CREATE TABLE silver_state_daily (
    report_date  DATE,
    state        STRING,
    total_cases  LONG,
    total_deaths LONG
  ) USING DELTA
  PARTITIONED BY (report_date)
  LOCATION '{silver2_path}'
"""
spark.sql(_silver_state_ddl)

# Roll the county rows up to one row per (report_date, state)
_state_daily_totals = (
    spark.table("silver_county")
    .groupBy("report_date", "state")
    .agg(
        _sum("cases").alias("total_cases"),
        _sum("deaths").alias("total_deaths"),
    )
)

# NOTE(review): this write passes overwriteSchema but no partitionBy, while
# the DDL above declares PARTITIONED BY (report_date) - confirm the resulting
# table layout is the intended one.
_state_daily_writer = _state_daily_totals.write.format("delta").mode("overwrite")
_state_daily_writer.option("overwriteSchema", "true").save(silver2_path)

print("✅ silver_state_daily built")
spark.table("silver_state_daily").show(10, truncate=False)

# Gold 1: per-state averages of the daily totals over sliding 7-day windows
# that advance one day at a time.
gold1_path = f"{base}/gold_state_ma"
dbutils.fs.rm(gold1_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS gold_state_ma")

# The former withWatermark("report_date", "1 day") call was removed:
# watermarks bound state in Structured Streaming queries, whereas this is a
# plain batch read of a table, and report_date is a DATE column rather than
# the timestamp an event-time watermark is defined on.
gold1_df = (
    spark.table("silver_state_daily")
         .groupBy(
             col("state"),
             window(col("report_date"), "7 days", "1 day").alias("w")
         )
         .agg(
             avg("total_cases").alias("avg_cases"),
             avg("total_deaths").alias("avg_deaths")
         )
         # Each output row is labelled with the end bound of its window.
         .selectExpr("state","w.end AS report_date","avg_cases","avg_deaths")
)

# NOTE(review): gold1_path is cleaned above but is not passed to the writer,
# so the table's storage location is chosen by the metastore - confirm whether
# the table was meant to live under gold1_path like the Bronze/Silver tables.
gold1_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema","true") \
    .saveAsTable("gold_state_ma")
print("✅ gold_state_ma built")
spark.table("gold_state_ma").show(5, truncate=False)

# Gold 2: per-state sums of the daily totals, grouped by calendar week
gold2_path = f"{base}/gold_weekly_summary"
dbutils.fs.rm(gold2_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS gold_weekly_summary")

# Tag every daily row with the start of its week, then sum per state/week
_daily_with_week = spark.table("silver_state_daily").withColumn(
    "week_start", date_trunc("week", col("report_date"))
)
gold2_df = _daily_with_week.groupBy("state", "week_start").agg(
    _sum("total_cases").alias("total_cases"),
    _sum("total_deaths").alias("total_deaths"),
)

# NOTE(review): gold2_path is cleaned above but not passed to the writer;
# the table location is therefore left to the metastore.
_gold2_writer = gold2_df.write.format("delta").mode("overwrite")
_gold2_writer.option("overwriteSchema", "true").saveAsTable("gold_weekly_summary")
print("✅ gold_weekly_summary built")

_weekly_preview = spark.table("gold_weekly_summary").orderBy("week_start", "state")
_weekly_preview.show(5, truncate=False)
