In [0]:
from pyspark.sql.functions import col, row_number, max as spark_max
from delta.tables import DeltaTable
from pyspark.sql.window import Window

def read_latest_bronze_table(
    table_name: str,
    business_keys: "list[str] | str",
    order_column: str
):
    """
    Reads an append-only Bronze Delta table and returns
    only the latest record per business key.

    Parameters:
    - table_name (str): Bronze Delta table name
    - business_keys (list[str] | str): Business key columns (single or
      composite). A bare string is treated as a single key column.
    - order_column (str): Column used to define recency (e.g. _ingestion_timestamp)

    Returns:
    - DataFrame with exactly one row per business key: the row with the
      greatest ``order_column`` value.

    Raises:
    - ValueError: if ``business_keys`` is empty.

    Notes:
    - Uses a ``spark`` SparkSession from global scope (presumably provided
      by the notebook runtime).
    - If several rows of the same key share the maximum ``order_column``
      value, which of them is kept is not deterministic.
    - Nulls in ``order_column`` sort last, so such a row is kept only when
      its key has no non-null ``order_column`` value.
    """
    # Without this, a bare string would be unpacked character by character
    # by ``partitionBy(*keys)``.
    keys = [business_keys] if isinstance(business_keys, str) else list(business_keys)
    if not keys:
        # partitionBy() with no columns puts the whole table in one window,
        # which would silently return a single row.
        raise ValueError("business_keys must contain at least one column")

    df = spark.read.table(table_name)

    window_spec = (
        Window
        .partitionBy(*keys)
        .orderBy(col(order_column).desc_nulls_last())
    )

    df_latest = (
        df
        .withColumn("_row_number", row_number().over(window_spec))
        .filter(col("_row_number") == 1)
        .drop("_row_number")
    )

    return df_latest


def deduplicate_by_pk(df, pk_columns, order_column):
    """
    Deduplicates records, keeping exactly one (the latest) row per primary key.

    Parameters:
    - df: Input DataFrame
    - pk_columns (list[str] | str): Primary key column(s). A bare string is
      treated as a single key column.
    - order_column (str): Column used to define recency; the row with the
      greatest value is kept.

    Returns:
    - DataFrame with one row per primary key.

    Raises:
    - ValueError: if ``pk_columns`` is empty.

    Notes:
    - The previous max-window approach kept every row tied on the maximum
      value, so the output could still contain duplicates. It also dropped
      keys whose ``order_column`` was null, because ``null == null`` is not
      true. ``row_number`` guarantees a single row per key.
    - When several rows tie on the maximum ``order_column`` value, which of
      them is kept is not deterministic.
    - Nulls in ``order_column`` sort last, so such a row is kept only when
      its key has no non-null value.
    """
    # Without this, a bare string would be unpacked character by character
    # by ``partitionBy(*keys)``.
    keys = [pk_columns] if isinstance(pk_columns, str) else list(pk_columns)
    if not keys:
        # An empty partition would collapse the whole DataFrame to one row.
        raise ValueError("pk_columns must contain at least one column")

    window_spec = (
        Window
        .partitionBy(*keys)
        .orderBy(col(order_column).desc_nulls_last())
    )

    deduplicated_df = (
        df
        .withColumn("_row_number", row_number().over(window_spec))
        .filter(col("_row_number") == 1)
        .drop("_row_number")
    )
    return deduplicated_df


def write_silver_table(
    df,
    target_table,
    merge_condition,
    optimize=True
):
    """
    Performs an UPSERT (MERGE) into an existing Delta table.

    Source rows that match a target row under ``merge_condition`` overwrite
    all columns of that row (``whenMatchedUpdateAll``). Unmatched source rows
    are inserted (``whenNotMatchedInsertAll``).

    Parameters:
    - df: Source DataFrame; its columns are expected to line up with the
      target table's columns.
    - target_table: Fully qualified Delta table name. It must already exist,
      because it is loaded with ``DeltaTable.forName``.
    - merge_condition: SQL merge condition written against the aliases
      ``target`` and ``source`` (e.g. "target.id = source.id")
    - optimize: Whether to run OPTIMIZE after merge

    Notes:
    - Uses a ``spark`` SparkSession from global scope (presumably provided
      by the notebook runtime).
    - Deduplicate ``df`` on the merge keys first (e.g. with
      ``deduplicate_by_pk``). Delta Lake fails a MERGE in which several
      source rows match the same target row.
    """
    # Load target Delta table
    delta_table = DeltaTable.forName(spark, target_table)

    (
        delta_table.alias("target")
        .merge(
            source=df.alias("source"),
            condition=merge_condition
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    if optimize:
        # Compacts small files produced by the merge.
        spark.sql(f"OPTIMIZE {target_table}")

    print(f"Success: {target_table}")

