### Clean up tables, schemas, paths

In [0]:
# ================================================================
# 🧹 CLEANUP CELL — Drop tables, schema, and/or physical paths
# ================================================================
# Resets the demo in three independent, best-effort steps: drop the table,
# drop the schema (CASCADE), delete the storage folders. A failure in one
# step is printed and the remaining steps still run.

# ==== CONFIGURE THESE ====
drop_tables = "yes"     # "yes" / "no" — Drop tables
drop_schema = "yes"      # "yes" / "no" — Drop schema
drop_paths  = "yes"      # "yes" / "no" — Delete all data folders (irreversible!)

catalog = "main"        # or "hive_metastore" if not using Unity Catalog
schema  = "demo_schema_migration"
table   = "bronze_customers_demo"

full_table = f"{catalog}.{schema}.{table}"

# ==== 1️⃣ DROP TABLE(S) =======================================================
if drop_tables.lower() == "yes":
    print(f"🗂 Dropping table if exists: {full_table}")
    try:
        spark.sql(f"DROP TABLE IF EXISTS {full_table}")
        print("   ✅ Table dropped (or didn’t exist)")
    except Exception as e:
        print(f"   ⚠️ DROP TABLE failed: {e}")
else:
    print("🚫 Skipping table drop (drop_tables != 'yes')")

# ==== 2️⃣ DROP SCHEMA =========================================================
if drop_schema.lower() == "yes":
    print(f"🏷  Dropping schema if exists: {catalog}.{schema}")
    try:
        # Note: USE CATALOG also switches the session's current catalog.
        spark.sql(f"USE CATALOG {catalog}")
        # UC: DROP SCHEMA, HMS: DROP DATABASE — both accept CASCADE in Databricks
        spark.sql(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
        print("   ✅ Dropped schema (and all contained objects)")
    except Exception as e:
        print(f"   ⚠️ DROP SCHEMA failed: {e}")
else:
    print("🚫 Skipping schema drop (drop_schema != 'yes')")

# ==== 3️⃣ DELETE PHYSICAL PATHS ==============================================
if drop_paths.lower() == "yes":
    print("🗑 Deleting data and auxiliary folders...")
    # The path variables are created by the environment setup cell further
    # down. Look them up by name so that running this cell in a fresh session
    # reports what is missing instead of aborting with a bare NameError.
    path_var_names = ["raw_path", "schema_loc_path", "delta_path", "chk1", "chk2", "base_path"]
    missing_vars = [name for name in path_var_names if name not in globals()]
    if missing_vars:
        print(f"   ⚠️ Path variables not defined: {', '.join(missing_vars)}")
        print("      Run the environment setup cell first, then re-run this cell.")
    else:
        # Sub-folders first, the table's root folder last.
        paths_to_delete = [globals()[name] for name in path_var_names]
        for p in paths_to_delete:
            try:
                print(f"   ➜ Deleting: {p}")
                # dbutils.fs.rm returns a boolean; only claim success when it is true.
                if dbutils.fs.rm(p, recurse=True):
                    print("      ✅ Deleted successfully")
                else:
                    print("      ℹ️ Nothing removed (rm returned False — path may not exist)")
            except Exception as e:
                print(f"      ⚠️ Could not delete {p}: {e}")
else:
    print("🚫 Skipping path deletion (drop_paths != 'yes')")

print("\n✅ Cleanup completed (based on selected parameters).")


### 1️⃣ Environment Setup & SAS Authentication


In [0]:
# ==== EDIT THESE ==== # <-- Section you customize per environment/run

# ADLS Gen2 storage account and container (filesystem); both are used to
# build the abfss:// URLs below.
storage_account = "peterunitystorage"
container       = "peterunitycontainer"

# SAS token handed to the fixed-token provider configured below.
# NOTE(review): the token is written inline here — consider loading it from a
# secret store instead of keeping it in the notebook.
sas_token       = "?sp=...&se=...&sig=..."

# Schema (database) and table names. table_name also names the storage folder.
# NOTE(review): schema_name is not referenced by any other visible cell.
schema_name     = "demo_schema_migration"
table_name      = "bronze_customers_demo"

# --- Spark auth for SAS (Azure ADLS Gen2) ---
# All three settings are scoped to this storage account's DFS endpoint.
account_host = f"{storage_account}.dfs.core.windows.net"
sas_settings = (
    # Authenticate to this account with SAS.
    (f"fs.azure.account.auth.type.{account_host}", "SAS"),
    # Use the fixed (pre-generated) SAS token provider.
    (
        f"fs.azure.sas.token.provider.type.{account_host}",
        "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
    ),
    # The token value that provider hands out.
    (f"fs.azure.sas.fixed.token.{account_host}", sas_token),
)
for conf_key, conf_value in sas_settings:
    spark.conf.set(conf_key, conf_value)

# --- Paths ---
# Root folder for this table; everything else hangs off it.
base_path       = f"abfss://{container}@{account_host}/{table_name}"

raw_path        = base_path + "/raw"        # landing zone for raw source files
schema_loc_path = base_path + "/_schemas"   # Auto Loader schema location
delta_path      = base_path + "/delta"      # Delta table files
chk1            = base_path + "/_chk_v1"    # streaming checkpoint, pipeline v1
chk2            = base_path + "/_chk_v2"    # separate checkpoint, pipeline v2

### 1️⃣/A Inspect All Paths and Show Their Contents

In [0]:
# ================================================================
# INSPECT STORAGE STRUCTURE FOR CURRENT TABLE PIPELINE
# ================================================================

# Print every storage location configured for this pipeline, one per line
print("=== PATHS CONFIGURED ===")
configured_paths = (
    ("Base Path", base_path),
    ("Raw Path", raw_path),
    ("Schema Log Path", schema_loc_path),
    ("Delta Table Path", delta_path),
    ("Checkpoint v1", chk1),
    ("Checkpoint v2", chk2),
)
for label, location in configured_paths:
    # Labels are left-aligned in a 17-character column so the colons line up
    print(f"{label:<17}: {location}")
print("=========================================\n")

# Helper function to list a folder's contents safely
def show_folder(path: str):
    """Print the entries directly under *path*, one per line with size in KB.

    Prints "(empty)" when the listing has no entries. Any exception raised
    while listing or printing is caught and reported on a single line, so the
    call never raises.
    """
    print(f"\n📁 {path}")
    try:
        entries = dbutils.fs.ls(path)
        if not entries:
            print("   (empty)")
            return
        for entry in entries:
            size_kb = entry.size / 1024
            print(f"   {entry.name:<40}  {size_kb:.1f} KB")
    except Exception as err:
        print(f"   ❌ Could not list: {err}")

# Walk the pipeline folders, root first, and print what each one holds
folders_to_inspect = (base_path, raw_path, schema_loc_path, delta_path, chk1, chk2)
for folder in folders_to_inspect:
    show_folder(folder)


### Drop the table(s) first (idempotent)

In [0]:
# Table to drop — adjust if your names differ.
# Use "hive_metastore" as the catalog if you used legacy HMS.
catalog = "main"
schema  = "demo_schema_migration"
table   = "bronze_customers_demo"

# Three-part name: catalog.schema.table
full_table = ".".join((catalog, schema, table))
drop_table_sql = f"DROP TABLE IF EXISTS {full_table}"

print(f"🗂 Dropping table if exists: {full_table}")
try:
    # IF EXISTS keeps this idempotent; a failure is reported, not raised
    spark.sql(drop_table_sql)
    print("   ✅ Dropped (or didn’t exist)")
except Exception as err:
    print("   ⚠️ DROP TABLE failed:", err)


### Drop the schema (catalog) entry (idempotent)

In [0]:
# Schema to drop; use "hive_metastore" as the catalog for legacy HMS
catalog = "main"
schema  = "demo_schema_migration"

# Run in order: switch to the catalog, then drop the schema inside it.
schema_statements = (
    f"USE CATALOG {catalog}",
    # UC uses DROP SCHEMA; HMS uses DROP DATABASE — both accept CASCADE on Databricks
    f"DROP SCHEMA IF EXISTS {schema} CASCADE",
)

print(f"🏷  Dropping schema if exists: {catalog}.{schema}")
try:
    for statement in schema_statements:
        spark.sql(statement)
    print("   ✅ Dropped schema (and all catalog objects within)")
except Exception as err:
    print("   ⚠️ DROP SCHEMA failed:", err)

### Drop the paths

In [0]:
# Delete every storage folder used by the pipeline: sub-folders first, the
# table's root folder last. Each delete is best-effort — a failure is printed
# and the loop moves on to the next path.
paths_to_delete = [raw_path, schema_loc_path, delta_path, chk1, chk2, base_path]

for p in paths_to_delete:
    try:
        print(f"🗑 Deleting: {p}")
        # dbutils.fs.rm returns a boolean; only claim success when it is true.
        if dbutils.fs.rm(p, recurse=True):
            print("   ✅ Deleted successfully")
        else:
            print("   ℹ️ Nothing removed (rm returned False — path may not exist)")
    except Exception as e:
        print(f"   ⚠️ Could not delete {p}: {e}")

## 2️⃣ Prepare Sample Input Files (v1 & v2)

In [0]:
# Purpose: write two small newline-delimited JSON datasets under raw_path
#   v1 — base schema (id, name, age)
#   v2 — adds `email`, has one non-numeric `age`, and one unexpected `country`

customers_v1 = """
{"id":1,"name":"Alice","age":31}
{"id":2,"name":"Bob","age":28}
"""

customers_v2 = """
{"id":3,"name":"Chao","age":35,"email":"chao@example.com"}
{"id":4,"name":"Dana","age":"unknown","email":"dana@example.com","country":"PL"}
"""

# (version folder, file name, file contents) — written in this order
sample_files = (
    ("v1", "customers_1.json", customers_v1),
    ("v2", "customers_2.json", customers_v2),
)

for version, file_name, payload in sample_files:
    version_dir = f"{raw_path}/{version}"
    dbutils.fs.mkdirs(version_dir)
    # Third argument is passed as True, as in the original call (overwrite flag)
    dbutils.fs.put(f"{version_dir}/{file_name}", payload, True)

## 3️⃣ Auto Loader – Initial Ingest (v1, v2 → Delta Bronze)

In [0]:
from pyspark.sql.functions import input_file_name, lit

# Ingest both raw folders (v1, v2) into the same Delta location with Auto
# Loader, then register that location as a table and show the result.
#
# --- Config you already have ---
# raw_path, delta_path, table_name, schema_loc_path, chk1, chk2 must exist

# Describe the two independent ingests: each has its own source folder and
# checkpoint; only v1 adds the `_source_file` column.
ingests = [
    {
        "version": "v1",
        "source_path": f"{raw_path}/v1",
        "checkpoint": chk1,
        "add_source_file": True,
    },
    {
        "version": "v2",
        "source_path": f"{raw_path}/v2",
        "checkpoint": chk2,
        "add_source_file": False,
    },
]

# Run each ingest sequentially with trigger(once)
# NOTE(review): recent runtimes document trigger(availableNow=True) as the
# replacement for trigger(once=True) — confirm the minimum runtime before switching.
for cfg in ingests:
    stream_df = (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            # both ingests point at the same schema location
            .option("cloudFiles.schemaLocation", schema_loc_path)
            .option("cloudFiles.inferColumnTypes", "true")
            # "rescue" mode with a `_rescued_data` column
            .option("cloudFiles.schemaEvolutionMode", "rescue")
            .option("rescuedDataColumn", "_rescued_data")
            .load(cfg["source_path"])
            # tag every row with the ingest it came from
            .withColumn("_ingest_version", lit(cfg["version"]))
    )
    if cfg["add_source_file"]:
        stream_df = stream_df.withColumn("_source_file", input_file_name())

    q = (
        stream_df.writeStream
            .format("delta")
            .option("checkpointLocation", cfg["checkpoint"])
            # the two ingests do not produce identical columns (`_source_file`)
            .option("mergeSchema", "true")
            .outputMode("append")
            .trigger(once=True)
            .start(delta_path)
    )
    q.awaitTermination()  # ensures v1 finishes before v2 starts (or vice versa)

# Register the Delta table (idempotent). This runs AFTER the ingest on purpose:
# the statement gives no column list, so it relies on a Delta table already
# existing at delta_path. On a first run (or right after cleanup) the location
# is still empty before the streams have written to it.
spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{delta_path}'")

# Show results
display(spark.read.table(table_name).orderBy("id"))

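## 4️⃣ Auto Loader – Schema Evolution & Rescue Mode (v2 → Delta Bronze)

> No separate cell for this step: the v2 files are ingested by the loop in step 3️⃣ above, which runs Auto Loader with `cloudFiles.schemaEvolutionMode = "rescue"` and a `_rescued_data` column.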

### 5️⃣ Schema Enforcement – Delta Rejects Incompatible Write

In [0]:
from pyspark.sql import Row

# One-row DataFrame carrying an extra `vip` column
vip_row = Row(id=5, name="Eve", age=29, vip=True)
df_newcol = spark.createDataFrame([vip_row])

# Plain append with no mergeSchema option. The write is expected to be
# rejected; the first 500 characters of the error are printed.
try:
    strict_writer = df_newcol.write.format("delta").mode("append")
    strict_writer.save(delta_path)
except Exception as err:
    print("Expected failure (schema enforcement):\n", str(err)[:500])

### 6️⃣ Schema Evolution – Add New Column with mergeSchema

In [0]:
# Append df_newcol (built in the previous cell) again, this time with
# mergeSchema enabled on the write.
evolving_writer = (
    df_newcol.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
)
evolving_writer.save(delta_path)

# Read the registered table back, ordered by id
evolved_table = spark.read.table(table_name)
display(evolved_table.orderBy("id"))


### 7️⃣ Schema Merging – Union Two Sources with Different Columns

In [0]:
from pyspark.sql import Row

# Two single-row DataFrames whose schemas only partly overlap:
#   df_a -> id, name, age
#   df_b -> id, name, email
row_a = Row(id=6, name="Fran", age=40)
row_b = Row(id=7, name="Göran", email="g@example.com")
df_a = spark.createDataFrame([row_a])
df_b = spark.createDataFrame([row_b])

# Spark has two main row-wise unions:
#   DataFrame.union()       — matches columns by position, so both sides need
#                             the same number of columns in the same order,
#                             with compatible types.
#   DataFrame.unionByName() — matches columns by name. With
#                             allowMissingColumns=True a column missing on one
#                             side is filled with NULL.
# Columns present on both sides must still have compatible types (Spark may
# upcast, but it won’t merge fundamentally incompatible types).
union_merged = df_a.unionByName(df_b, allowMissingColumns=True)

# Resulting schema: id, name, age, email
#   row from df_a -> email = null
#   row from df_b -> age   = null

# Append the union to the existing Delta table at delta_path.
# mergeSchema lets Delta evolve the table schema to include columns present in
# the DataFrame (here: email) instead of raising a schema mismatch error.
# Existing columns keep their order; new columns go at the end and are nullable.
merged_writer = (
    union_merged.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
)
merged_writer.save(delta_path)

display(spark.read.table(table_name).orderBy("id"))

### 8️⃣ Normalize Schema Before Union (Handling Type Mismatch)

In [0]:
from pyspark.sql.functions import expr, lit

# Read the raw JSON folders written earlier
df_a = spark.read.json(f"{raw_path}/v1")  # id, name, age (numeric)
df_b = spark.read.json(f"{raw_path}/v2")  # id, name, age (sometimes "unknown"), email
# NOTE(review): one v2 record also carries `country`, so df_b presumably has
# that column too — confirm against the inferred schema.

# Column expressions shared by both sides
age_as_long = expr("try_cast(age as long)")      # non-numeric age -> NULL
null_email = lit(None).cast("string")            # typed NULL for a missing email
email_as_string = expr("cast(email as string)")  # explicit, for clarity

# 1) Normalize schemas BEFORE the union
#    df_a: add the missing `email`, then cast `age`
#    df_b: cast `age`, make `email` explicitly a string
df_a_norm = df_a.withColumn("email", null_email).withColumn("age", age_as_long)
df_b_norm = df_b.withColumn("age", age_as_long).withColumn("email", email_as_string)

# 2) Union by name, allowing missing columns (shared columns now have matching types)
merged = df_a_norm.unionByName(df_b_norm, allowMissingColumns=True)

# 3) Write back: `age` stays BIGINT and "unknown" becomes NULL.
#    overwrite -> replaces the table contents with this combined result
#    mergeSchema -> allows adding columns the table does not have yet
overwrite_writer = (
    merged.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
)
overwrite_writer.save(delta_path)

display(spark.read.table(table_name).orderBy("id"))
