## Paths and Resource Locations

In [0]:
import time
import traceback
import uuid
from datetime import datetime

from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql import functions as F

# --- PATHS ---
# Both containers live in the same ADLS Gen2 account; only the container and
# folder differ between the Silver source and the Gold target.
_ADLS_HOST = "storageaccpiechk.dfs.core.windows.net"
_GOLD_SCHEMA = "databricks_cata.gold"

silver_path = f"abfss://silver@{_ADLS_HOST}/customers/"
gold_path = f"abfss://gold@{_ADLS_HOST}/DimCustomers/"
gold_dim_table = f"{_GOLD_SCHEMA}.DimCustomers"

# Checkpoint bookkeeping: one row per Gold table, keyed by checkpoint_table_name.
checkpoint_log_table = f"{_GOLD_SCHEMA}.checkpoint_log"
checkpoint_table_name = "DimCustomers"

# Per-batch audit log shared by the Gold loaders.
batch_log_table = f"{_GOLD_SCHEMA}.batch_log"

## Configuration

In [0]:
# Column list passed verbatim to OPTIMIZE ... ZORDER BY.
ZORDER_COLS = "customer_sk"

# tune for your environment
TARGET_FILE_SIZE = 128 * 1024 ** 2  # bytes (128 MiB)
MAX_RETRIES = 2                     # total attempts made by the processing loop
RETRY_BACKOFF_SEC = 10              # base delay; doubled after each failed attempt

# Session-level settings, applied in the order listed.
for conf_key, conf_value in (
    ("spark.databricks.delta.optimizeWrite.enabled", "true"),
    ("spark.databricks.delta.autoCompact.enabled", "true"),
    ("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true"),
    ("spark.databricks.delta.targetFileSize", str(TARGET_FILE_SIZE)),
):
    spark.conf.set(conf_key, conf_value)

## Version & Checkpoint Management Functions

In [0]:
def get_current_silver_version():
    """Return the highest version number listed in the Silver table's Delta history.

    Runs DESCRIBE HISTORY against the Delta table at ``silver_path`` and
    returns the maximum of its ``version`` column.
    """
    history_df = spark.sql(f"DESCRIBE HISTORY delta.`{silver_path}`")
    max_version_expr = F.max(F.col('version'))
    result_rows = history_df.select(max_version_expr).collect()
    first_row = result_rows[0]
    return first_row[0]

def get_last_processed_version():
    """Return the checkpointed Silver version for this table, or None if absent.

    Looks up the row for ``checkpoint_table_name`` in ``checkpoint_log_table``.

    Returns:
        The stored ``last_processed_version`` of the first matching row, or
        ``None`` when no row exists for this table.
    """
    # A single action fetches at most one row. Calling count() and then
    # collect() would run the query twice and could raise IndexError if the
    # row disappeared between the two evaluations.
    rows = spark.sql(f"SELECT last_processed_version FROM {checkpoint_log_table} WHERE table_name = '{checkpoint_table_name}'").take(1)
    if not rows:
        return None
    return rows[0][0]

def upsert_checkpoint(version):
    """Record ``version`` as the last processed Silver version for this table.

    Issues a MERGE against ``checkpoint_log_table`` keyed on ``table_name``:
    an existing row has its version and timestamp updated, otherwise a new
    row is inserted. ``version`` is coerced with ``int()`` before being
    embedded in the statement.
    """
    version_literal = int(version)
    merge_stmt = f"""
    MERGE INTO {checkpoint_log_table} AS c
    USING (SELECT '{checkpoint_table_name}' AS table_name, {version_literal} AS last_processed_version, current_timestamp() AS last_processed_ts) AS s
    ON c.table_name = s.table_name
    WHEN MATCHED THEN UPDATE SET c.last_processed_version = s.last_processed_version, c.last_processed_ts = s.last_processed_ts
    WHEN NOT MATCHED THEN INSERT (table_name, last_processed_version, last_processed_ts) VALUES (s.table_name, s.last_processed_version, s.last_processed_ts)
    """
    spark.sql(merge_stmt)

## Batch Logging & ID Utilities

In [0]:
def log_batch_start(batch_id, starting_version, ending_version):
    """Insert a RUNNING row for this batch into ``batch_log_table``.

    The row records the batch id, ``checkpoint_table_name``, the version
    window being processed, a row_count of 0 and the current timestamp as
    ``started_ts``.
    """
    column_list = "batch_id, table_name, starting_version, ending_version, row_count, status, started_ts"
    value_list = (
        f"'{batch_id}', '{checkpoint_table_name}', {starting_version}, "
        f"{ending_version}, 0, 'RUNNING', current_timestamp()"
    )
    insert_stmt = f"INSERT INTO {batch_log_table} ({column_list}) VALUES ({value_list})"
    spark.sql(insert_stmt)

def log_batch_end(batch_id, row_count, status='SUCCESS', error_msg=None):
    """Finalize the batch-log row for ``batch_id``.

    Sets ``row_count``, ``status``, ``finished_ts`` (current timestamp) and
    ``error_msg`` on the row whose ``batch_id`` matches.

    Args:
        batch_id: Identifier of the batch row to update.
        row_count: Number of rows processed; coerced with ``int()``.
        status: Final status string written to the log.
        error_msg: Optional error text. A falsy value (``None`` or ``""``)
            stores SQL NULL.

    Values are bound through named parameter markers instead of being spliced
    into the SQL text. Error text is usually a full traceback and may contain
    quotes and backslashes; hand-rolled quote doubling does not neutralize
    backslashes, which Spark SQL treats as escape characters by default.
    This also avoids the nested-quote f-string that only parses on
    Python 3.12+.

    NOTE(review): relies on ``spark.sql(..., args=...)`` accepting Python
    values (PySpark 3.5+ / recent Databricks runtimes) -- confirm for the
    target cluster.
    """
    params = {
        "batch_id": batch_id,
        "row_count": int(row_count),
        "status": status,
    }
    if error_msg:
        error_expr = ":error_msg"
        params["error_msg"] = error_msg
    else:
        # Emit a literal NULL rather than binding None, so no untyped null
        # parameter is sent.
        error_expr = "NULL"

    spark.sql(
        f"UPDATE {batch_log_table} "
        "SET row_count = :row_count, status = :status, "
        f"finished_ts = current_timestamp(), error_msg = {error_expr} "
        "WHERE batch_id = :batch_id",
        args=params,
    )

def generate_batch_id(prefix="DimCustomers", attempt_num=None):
    """Build a batch identifier from a prefix and the current local time.

    Args:
        prefix: Leading component of the id.
        attempt_num: Optional attempt number. Any value other than ``None``
            (including 0) appends a ``_try<attempt_num>`` suffix.

    Returns:
        ``<prefix>_<YYYYMMDD>_<HHMMSS>`` with the optional ``_try<n>`` suffix.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_id = f"{prefix}_{stamp}"
    suffix = "" if attempt_num is None else f"_try{attempt_num}"
    return base_id + suffix

## Table Creation & Initialization

In [0]:
# DDL for the Gold customer dimension: an identity surrogate key, the customer
# attributes, and the validity columns (valid_from / valid_to / is_current)
# maintained by the load loop. IF NOT EXISTS makes the cell safe to re-run.
create_dim_customers_ddl = f"""
CREATE TABLE IF NOT EXISTS {gold_dim_table} (
    customer_sk BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
    customer_id STRING,
    email STRING,
    city STRING,
    state STRING,
    email_domain STRING,
    full_name STRING,
    hash_value STRING,
    last_update_ts TIMESTAMP,
    valid_from TIMESTAMP,
    valid_to TIMESTAMP,
    is_current BOOLEAN
)
USING DELTA
LOCATION '{gold_path}'
"""

spark.sql(create_dim_customers_ddl)

DataFrame[]

## Main Processing Loop with Retry and Error Handling

In [0]:
# Incremental Silver -> Gold SCD2 load for DimCustomers.
#
# Each attempt resolves the version window from the checkpoint, reads the
# Silver change data feed for that window, expires the superseded Gold rows,
# appends the new versions and then advances the checkpoint. A failed attempt
# is recorded in the batch log and retried with exponential backoff, up to
# MAX_RETRIES attempts in total; the final failure is re-raised.
#
# NOTE(review): the expire-MERGE, the append and the checkpoint update are
# separate commits. A failure between the append and the checkpoint update
# makes the retry re-read the same window -- confirm whether that needs
# additional protection.
attempt = 0
while attempt < MAX_RETRIES:
    attempt += 1
    batch_id = generate_batch_id(prefix=checkpoint_table_name, attempt_num=attempt)

    try:
        print(f"Attempt {attempt} - Batch ID: {batch_id}")

        last_version = get_last_processed_version()
        current_version = get_current_silver_version()

        print(f"Last processed Silver version: {last_version}")
        print(f"Current Silver version: {current_version}")

        if last_version is not None and last_version >= current_version:
            print("No new changes to process.")
            break

        # First run (no checkpoint) starts from version 0; otherwise resume
        # right after the last processed version.
        starting_version = 0 if last_version is None else int(last_version) + 1
        ending_version = current_version

        # Skip if already succeeded for this range
        existing = spark.sql(f"""
            SELECT batch_id FROM {batch_log_table}
            WHERE table_name = '{checkpoint_table_name}'
              AND starting_version = {starting_version}
              AND ending_version = {ending_version}
              AND status = 'SUCCESS'
        """)
        if existing.count() > 0:
            print("This version range already processed. Skipping.")
            break

        # Create batch log
        log_batch_start(batch_id, starting_version, ending_version)

        print(f"Reading CDF from version {starting_version} to {ending_version} ...")

        # Read CDF from silver
        cdf_df = (
            spark.read.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", str(starting_version))
            .option("endingVersion", str(ending_version))
            .load(silver_path)
        )

        # Filter inserts and post-images
        changed_df = cdf_df.filter(F.col("_change_type").isin("insert", "update_postimage"))
        if changed_df.limit(1).count() == 0:
            print("No changes detected in Silver CDF.")
            upsert_checkpoint(ending_version)
            log_batch_end(batch_id, 0, status='SUCCESS')
            break

        # Keep only the most recent change per customer within the window.
        # A customer changed in several Silver versions would otherwise yield
        # several source rows for one Gold row: Delta MERGE rejects that, and
        # the append would leave more than one is_current row per customer.
        latest_change_window = Window.partitionBy("customer_id").orderBy(
            F.col("_commit_version").desc(),
            F.col("_commit_timestamp").desc(),
            F.col("last_update_ts").desc(),
        )
        latest_changes = (
            changed_df
            .withColumn("_change_rank", F.row_number().over(latest_change_window))
            .filter(F.col("_change_rank") == 1)
            .drop("_change_rank")
        )

        # Prepare SCD2 records
        new_records = (
            latest_changes
            .select(
                "customer_id", "email", "city", "state", "email_domain", "full_name",
                "hash_value", "last_update_ts"
            )
            .withColumn("valid_from", F.current_timestamp())
            .withColumn("valid_to", F.lit(None).cast("timestamp"))
            .withColumn("is_current", F.lit(True))
        )

        delta_gold = DeltaTable.forPath(spark, gold_path)

        # Expire old versions
        delta_gold.alias("g") \
            .merge(
                new_records.select("customer_id").alias("s"),
                "g.customer_id = s.customer_id AND g.is_current = true"
            ) \
            .whenMatchedUpdate(set={
                "valid_to": F.current_timestamp(),
                "is_current": F.lit(False)
            }) \
            .execute()

        # Append new versions
        new_records.write.format("delta").mode("append").save(gold_path)

        updated_count = new_records.count()
        print(f"Upserted {updated_count} records to Gold DimCustomers")

        # Update checkpoint and batch log
        upsert_checkpoint(ending_version)
        log_batch_end(batch_id, updated_count, status='SUCCESS')

        # Optimize gold table. This is maintenance on data that is already
        # committed and checkpointed, so a failure here must not reach the
        # generic handler below, which would rewrite the batch as FAILED.
        print("Running OPTIMIZE + ZORDER ...")
        try:
            spark.sql(f"OPTIMIZE delta.`{gold_path}` ZORDER BY {ZORDER_COLS}")
        except Exception as optimize_err:
            print(f"OPTIMIZE failed; batch {batch_id} remains SUCCESS: {optimize_err}")

        print("Batch completed successfully.")
        break

    except Exception as e:

        # Handle errors
        error_trace = traceback.format_exc()
        print(f"Error during batch {batch_id}: {e}")
        print(error_trace)

        # Recording the failure is best-effort: an error while logging must
        # not mask the original exception or cut the retry loop short.
        try:
            log_batch_end(batch_id, 0, status='FAILED', error_msg=error_trace)
        except Exception as log_err:
            print(f"Could not record failure for batch {batch_id}: {log_err}")

        if attempt < MAX_RETRIES:
            # Exponential backoff: base delay doubled for each failed attempt.
            sleep_time = RETRY_BACKOFF_SEC * (2 ** (attempt - 1))
            print(f"Retrying after {sleep_time} sec...")
            time.sleep(sleep_time)
        else:
            raise

print("DimCustomers pipeline finished.")

Attempt 1 - Batch ID: DimCustomers_20251020_091926_try1
Last processed Silver version: 1
Current Silver version: 3
Reading CDF from version 2 to 3 ...
Upserted 10 records to Gold DimCustomers
Running OPTIMIZE + ZORDER ...
Batch completed successfully.
DimCustomers pipeline finished.


## Check Checkpoint Log

In [0]:
%sql
-- Show the checkpoint currently recorded for each Gold table.
SELECT *
FROM databricks_cata.gold.checkpoint_log

table_name,last_processed_version,last_processed_ts
DimCustomers,3,2025-10-20T09:19:37.903199Z
DimProducts,3,2025-10-20T09:19:47.53992Z
FactOrders,1,2025-10-20T09:10:59.105358Z


## Check Batch Log

In [0]:

%sql
-- Show every batch run recorded in the batch log.
SELECT *
FROM databricks_cata.gold.batch_log

batch_id,table_name,starting_version,ending_version,row_count,status,started_ts,finished_ts,error_msg
DimCustomers_20251020_090909_try1,DimCustomers,0,1,1990,SUCCESS,2025-10-20T09:09:15.615289Z,2025-10-20T09:09:28.608432Z,
DimCustomers_20251020_091926_try1,DimCustomers,2,3,10,SUCCESS,2025-10-20T09:19:29.801783Z,2025-10-20T09:19:41.874974Z,
DimProducts_20251020_090913_try1,DimProducts,0,1,490,SUCCESS,2025-10-20T09:09:16.627671Z,2025-10-20T09:09:29.799232Z,
DimProducts_20251020_091934_try1,DimProducts,2,3,10,SUCCESS,2025-10-20T09:19:38.46511Z,2025-10-20T09:19:50.817749Z,
FactOrders_20251020_091037_try1,FactOrders,0,1,9990,SUCCESS,2025-10-20T09:10:40.648664Z,2025-10-20T09:11:01.080474Z,
