#### Imports

In [0]:
from pyspark.sql.functions import (
    col,
    lit,
    concat_ws,
    when,
    to_timestamp,
    current_timestamp
)
from delta.tables import DeltaTable
from pyspark.sql.utils import AnalysisException
import datetime

#### 0. Configuration

In [0]:
# Define widgets (the defaults let the notebook run stand-alone for local/dev testing)
dbutils.widgets.text("bronze_container", "bronze")
dbutils.widgets.text("silver_container", "silver")
dbutils.widgets.text("sgaccount", "sgonpremtoclouddev")

dbutils.widgets.text("source_schema", "dbo")
dbutils.widgets.text("source_table", "Inventory")

dbutils.widgets.text("CATALOG", "onpremtocloud_dev_catalog")
dbutils.widgets.text("SILVER_SCHEMA", "silver")

# Read values passed from ADF (interactive runs fall back to the widget defaults above)
bronze_container = dbutils.widgets.get("bronze_container")
silver_container = dbutils.widgets.get("silver_container")
sgaccount        = dbutils.widgets.get("sgaccount")

source_schema    = dbutils.widgets.get("source_schema")
source_table     = dbutils.widgets.get("source_table")

CATALOG          = dbutils.widgets.get("CATALOG")
SILVER_SCHEMA    = dbutils.widgets.get("SILVER_SCHEMA")

# Fail fast on blank parameters: they would otherwise surface later as
# confusing path or SQL errors.
_required_params = {
    "bronze_container": bronze_container,
    "silver_container": silver_container,
    "sgaccount": sgaccount,
    "source_schema": source_schema,
    "source_table": source_table,
    "CATALOG": CATALOG,
    "SILVER_SCHEMA": SILVER_SCHEMA,
}
_blank_params = [name for name, value in _required_params.items() if not value.strip()]
if _blank_params:
    raise ValueError(f"Missing required notebook parameters: {', '.join(_blank_params)}")


def _quote_ident(name: str) -> str:
    """Return *name* as a back-quoted Spark SQL identifier.

    Embedded backticks are doubled, so a pipeline parameter cannot break out of
    the identifier. Quoting also permits names containing characters such as '-'.
    """
    return "`" + name.replace("`", "``") + "`"


# Fully-qualified, quoted target table name: catalog.schema.table
silver_table_fqn = ".".join(_quote_ident(part) for part in (CATALOG, SILVER_SCHEMA, source_table))

spark.sql(f"USE CATALOG {_quote_ident(CATALOG)}")
spark.sql(f"USE {_quote_ident(SILVER_SCHEMA)}")

#### 1. ADLS path framing

In [0]:
# Bronze files are partitioned by the run date (the driver's local calendar date).
# NOTE(review): presumably this must match the date folder ADF writes to — confirm time zone.
today = datetime.date.today()
year, month, day = (today.strftime(fmt) for fmt in ("%Y", "%m", "%d"))

# Bronze: the day's parquet file landed by ADF -> <yyyy>/<mm>/<dd>/<schema>_<table>.parquet
bronze_root = f"abfss://{bronze_container}@{sgaccount}.dfs.core.windows.net/"
bronze_path = bronze_root + f"{year}/{month}/{day}/{source_schema}_{source_table}.parquet"
print(f"Bronze path: {bronze_path}")

# Silver: one stable Delta location per table (deliberately no date in the path)
silver_path = "abfss://{}@{}.dfs.core.windows.net/{}".format(
    silver_container, sgaccount, source_table
)
print(f"Silver path: {silver_path}")

#### 2. Read raw inventory from bronze

In [0]:
# Load today's raw inventory snapshot from the bronze parquet landing file
df_bz_inventory = spark.read.parquet(bronze_path)

#### 3. Basic transformations & SCD columns

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Products with StockQuantity strictly below this value are flagged IsLowStock.
LOW_STOCK_THRESHOLD = 10

# current_timestamp() is evaluated once per query, so every row written by a
# single write/MERGE action carries the same LoadDate.
load_ts = current_timestamp()

# One row per business key is required (MERGE rejects duplicate source keys).
# dropDuplicates() would keep an arbitrary row per key, so order explicitly and
# keep the most recently modified version. Rows that tie on both timestamps
# are still kept arbitrarily.
# Adjust the partition columns if the business key is composite.
_latest_first = Window.partitionBy("ProductID").orderBy(
    col("ModifiedDate").desc_nulls_last(),
    col("CreatedDate").desc_nulls_last(),
)

df_stage = (
    df_bz_inventory
    # Core business columns; timestamps are parsed before deduplication so the
    # ordering above is chronological rather than lexicographic.
    .fillna({"StockQuantity": 0.0})
    .withColumn("CreatedDate", to_timestamp("CreatedDate"))
    .withColumn("ModifiedDate", to_timestamp("ModifiedDate"))
    # Deduplicate by business key, keeping the latest version of each product
    .withColumn("__dedup_rn", row_number().over(_latest_first))
    .filter(col("__dedup_rn") == 1)
    .drop("__dedup_rn")
    # when/otherwise keeps the flag non-null even if the comparison yields NULL
    .withColumn(
        "IsLowStock",
        when(col("StockQuantity") < LOW_STOCK_THRESHOLD, lit(True)).otherwise(lit(False)),
    )
    .withColumn("LoadDate", load_ts)
)


#### 4. Check if silver table already exists

In [0]:
# Probe the catalog: spark.table() raises AnalysisException when the table is absent.
table_exists = False
silver_schema = None
try:
    silver_df = spark.table(silver_table_fqn)
except AnalysisException:
    print(f"Silver table DOES NOT exist yet: {silver_table_fqn}")
else:
    # Table resolved: remember its schema so the incoming batch can be aligned to it
    table_exists = True
    silver_schema = silver_df.schema
    print(f"Silver table exists: {silver_table_fqn}")


#### 6. Load data into silver layer (initial create or SCD1 merge)

In [0]:
if not table_exists:
    # First run: write the initial full snapshot as Delta files at silver_path,
    # then register them in Unity Catalog as an external table.
    # NOTE: mode("overwrite") replaces any Delta data already at silver_path
    # (e.g. left behind by a previously dropped table).
    print("First load: creating Delta at silver_path and registering UC table...")

    df_stage.write.format("delta").mode("overwrite").save(silver_path)

    spark.sql(f"""
        CREATE TABLE {silver_table_fqn}
        USING DELTA
        LOCATION '{silver_path}'
    """)

    print(f"Initial silver table created: {silver_table_fqn}")

else:
    # Subsequent runs: align the batch to the existing silver schema, then
    # upsert it with an SCD type-1 MERGE. Matched rows are overwritten in place
    # and new keys are inserted; no history is kept.
    print("Aligning schema with existing silver and applying SCD1 MERGE...")

    # 6.1 Align incoming data (df_stage) to the existing silver schema in one projection:
    #     - silver columns missing from the batch become typed NULLs
    #     - columns present in both are cast to the silver data type
    #     - column order follows the silver table
    incoming_cols = set(df_stage.columns)
    aligned_exprs = [
        (col(field.name) if field.name in incoming_cols else lit(None))
        .cast(field.dataType)
        .alias(field.name)
        for field in silver_schema
    ]
    updates_df = df_stage.select(*aligned_exprs).alias("updates")

    # The silver schema is not evolved: batch-only columns are dropped.
    # Report them rather than discarding them silently.
    silver_cols = {field.name for field in silver_schema}
    dropped_cols = [c for c in df_stage.columns if c not in silver_cols]
    if dropped_cols:
        print(f"WARNING: columns not present in {silver_table_fqn} are ignored: {dropped_cols}")

    # 6.2 Resolve the target Delta table by the same name used to detect it,
    #     not by path, so the MERGE always hits the registered table's location.
    delta_inventory = DeltaTable.forName(spark, silver_table_fqn)

    # SCD1 upsert keyed on ProductID (adjust if the business key is composite)
    (
        delta_inventory.alias("existing")
        .merge(updates_df, "existing.ProductID = updates.ProductID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    print(f"SCD1 merge for {silver_table_fqn} completed")
