In [0]:
import pyspark.sql.functions as F
from delta.tables import DeltaTable
from pyspark.sql.window import Window

In [0]:
# Names of the tables used by this notebook: the bronze table that is read, the
# silver table that is merged into, and the control table that logs each load.
# All three share the same leading name part, so it is spelled out only once.
_cad001p002_name_root = "data_intelligence"
table_cad001p002_bronze  = f"{_cad001p002_name_root}.bronze.cad001p002"
table_cad001p002_silver  = f"{_cad001p002_name_root}.silver.cad001p002"
# The control table sits next to the silver table and only adds a suffix to its name.
table_cad001p002_control = f"{table_cad001p002_silver}_control"

In [0]:
def get_last_cad_date_control():
    """Return the greatest ``cad_date`` recorded in the control table.

    Runs a ``MAX(cad_date)`` query against ``table_cad001p002_control`` through the
    notebook-global ``spark`` session and returns the single aggregated value.
    The calling cell compares the result with ``None`` to choose between a full
    and an incremental load.
    """
    query = f"SELECT MAX(cad_date) AS last_cad_date_control FROM {table_cad001p002_control}"
    result_row = spark.sql(query).first()
    return result_row['last_cad_date_control']

In [0]:
# Load cad001p002 from bronze into silver as SCD2 history, then log the load.
#
#   1. Read the high-water mark: the latest cad_date already logged in the control table.
#   2. Read bronze - every row on the first run, otherwise only rows with a later cad_date.
#   3. Derive is_active / hash_columns / _created_at and keep one row per (id, hash_columns).
#   4. MERGE 1 inserts each (id, hash_columns) pair that silver does not contain yet.
#   5. MERGE 2 closes older, still-open silver rows whose hash differs from the newest row of the id.
#   6. If nothing failed, append per-cad_date row counts to the control table and show merge metrics.
last_cad_date_control = get_last_cad_date_control()

type_of_load = "incremental" if last_cad_date_control is not None else "full"

try:
    if last_cad_date_control is None:
        # The control table holds no cad_date yet: first run, take all of bronze.
        print("FULL")
        df_bronze = spark.table(table_cad001p002_bronze)
    else:
        # Later runs: only bronze rows newer than the high-water mark.
        print("Incremental")
        df_bronze = spark.table(table_cad001p002_bronze).filter(F.col("cad_date") > F.lit(last_cad_date_control))

    # Accent-folding table: character i of the first string is replaced by character i of the second.
    _aux_accents_from = "áàâãäçéèêëíìîïóòôõöúùûüÁÀÂÃÄÇÉÈÊËÍÌÎÏÓÒÔÕÖÚÙÛÜ"
    _aux_accents_to   = "aaaaaceeeeiiiiooooouuuuAAAAACEEEEIIIIOOOOOUUUU"

    # Text columns normalized for change detection: fold accents FIRST, then drop everything
    # that is not an ASCII letter or digit, then lower-case.
    # The previous order stripped non-ASCII characters before translate() ran, so accented
    # letters were deleted instead of folded ("João" -> "joo") and the accent table was a no-op.
    # NOTE: hash_columns therefore changes for values containing accented letters. Silver rows
    # hashed by the old logic will not match and will get one extra version the next time such
    # a record arrives from bronze.
    _aux_normalized_text = [
        F.lower(
            F.regexp_replace(
                F.translate(F.col(text_col), _aux_accents_from, _aux_accents_to),
                "[^a-zA-Z0-9]", ""
            )
        )
        for text_col in ("nome", "endereco")
    ]

    df_bronze = (
        df_bronze
            .withColumns({
                # Every incoming row starts as the open version; MERGE 2 closes superseded ones.
                "is_active"    : F.lit(True),
                # SHA-256 over the normalized nome, normalized endereco and salario.
                # NOTE(review): concat_ws skips NULL inputs, so a NULL in one column can make
                # different rows hash alike - confirm whether these columns are nullable.
                "hash_columns" : F.sha2(F.concat_ws("||", *_aux_normalized_text, F.col("salario")), 256),
                "_created_at"  : F.current_timestamp(),
            })
            # One row per (id, hash_columns) within the batch; which duplicate survives is not controlled here.
            .dropDuplicates(["id", "hash_columns"])
    )

    # Handle on the silver Delta table, aliased so the merge conditions can refer to "silver".
    delta_silver = DeltaTable.forName(spark, table_cad001p002_silver).alias("silver")

    # MERGE 1 - insert only. A bronze row is inserted when silver has no row with the same id
    # AND the same hash_columns, i.e. the id is new or its tracked attributes changed.
    # Rows that do match are left untouched.
    # NOTE(review): the result of execute() is used below as a merge-metrics DataFrame
    # (num_affected_rows, ...) - confirm that the runtime's Delta version returns one.
    delta_silver_insert = delta_silver.merge(
        df_bronze.alias("bronze"),
        "silver.id = bronze.id AND silver.hash_columns = bronze.hash_columns"
    ).whenNotMatchedInsert(values={c: f"bronze.{c}" for c in df_bronze.columns}).execute()

    # Rows that drive MERGE 2 and the control-table log:
    #   full load        -> the prepared bronze batch itself
    #   incremental load -> silver rows newer than the high-water mark
    df_full_or_incr = df_bronze

    if last_cad_date_control is not None:
        df_full_or_incr = spark.table(table_cad001p002_silver).filter(F.col("cad_date") > F.lit(last_cad_date_control))

    # Rank the rows of each id from newest to oldest cad_date; rank 1 is the latest version.
    # (Sorting by id inside a window already partitioned by id added nothing, so it was dropped.)
    window_spec = Window.partitionBy("id").orderBy(F.col("cad_date").desc())

    df_latest_per_id = (
        df_full_or_incr
            .withColumn("row_number", F.row_number().over(window_spec))
            .filter(F.col("row_number") == 1)
            .select("id", "cad_date", "hash_columns")
    )

    # MERGE 2 - apply SCD2 closing. A silver row is closed when, for the same id, the latest
    # version has a different hash_columns and a later cad_date, and the silver row has not
    # been closed before (cad_updated IS NULL). Closing sets is_active to False and
    # cad_updated to the cad_date of the latest version.
    delta_silver_update = delta_silver.merge(
        df_latest_per_id.alias("source"),
        "silver.id = source.id AND silver.hash_columns != source.hash_columns AND silver.cad_date < source.cad_date AND silver.cad_updated IS NULL"
    ).whenMatchedUpdate(set={
        "is_active" : F.lit(False),
        "cad_updated": "source.cad_date"
    }).execute()

except Exception as e:
    # Any failure above is only reported (first 300 characters) and the notebook carries on.
    # Setting the frame to None skips the control-table write below, so the high-water mark
    # does not advance for this run.
    # NOTE(review): the error is not re-raised, so a scheduler would see this run as successful.
    print(str(e)[:300])
    df_full_or_incr = None

if df_full_or_incr is not None:
    # Log the load: one control row per cad_date with the number of rows processed.
    # The MAX(cad_date) of this table is what the next run uses as its high-water mark.
    (df_full_or_incr
        .groupBy(F.col("cad_date"))
        .agg(F.count("*").cast("int").alias("rows_count"))
        .write.option("mergeSchema", "true")
        .mode("append")
        .saveAsTable(table_cad001p002_control))

    # Show the metrics of both merges side by side, labelled by operation.
    display(
        delta_silver_insert
            .withColumn("operation", F.lit("insert"))
            .unionByName(delta_silver_update.withColumn("operation", F.lit("update")))
            .select("operation", "num_affected_rows", "num_updated_rows", "num_inserted_rows")
    )