In [0]:
from pyspark.sql.functions import when, coalesce, to_date, substring, initcap, trim, col, lit, lower, current_timestamp, broadcast, expr
from pyspark.sql.types import LongType, StringType, DateType, TimestampType, DecimalType, IntegerType
from delta.tables import DeltaTable

In [0]:
# Catalog used as the first part of the three-level table names
# (catalog.schema.table) that the other cells build with f-strings.
catalog_name = "webinar"

In [0]:
# Selected columns of silver.tiendas plus an `updated_at` load timestamp.
# NOTE(review): `df` is not referenced by any other cell visible in this
# notebook; confirm whether it is still needed here.
df = spark.table(f"{catalog_name}.silver.tiendas").select(
    "id_tienda",
    "nombre",
    "ciudad",
    "region",
    current_timestamp().alias("updated_at"),
)

# Sales rows from the silver schema: the left-hand side of every join below.
df_ventas = spark.read.table(f"{catalog_name}.silver.ventas")

# Lookup tables from the gold schema, each reduced to its `sk_*` column plus
# the column it is matched on when joined to the sales rows.
dim_cliente = spark.read.table(f"{catalog_name}.gold.clientes").select(["sk_cliente", "id_cliente"])
dim_tienda = spark.read.table(f"{catalog_name}.gold.tiendas").select(["sk_tienda", "id_tienda"])
dim_producto = spark.read.table(f"{catalog_name}.gold.productos").select(["sk_producto", "id_producto"])
dim_tiempo = spark.read.table(f"{catalog_name}.gold.tiempo").select(["sk_tiempo", "fecha"])


# Attach each lookup table's `sk_*` column to the sales rows.
# All joins are left joins, so a sale with no matching lookup row is kept and
# simply gets a null `sk_*` value. Every lookup table carries a broadcast hint.
df_enriched = (
    df_ventas
    .join(broadcast(dim_cliente), on="id_cliente", how="left")
    .join(broadcast(dim_tienda), on="id_tienda", how="left")
    .join(broadcast(dim_producto), on="id_producto", how="left")
    # The time lookup is matched on the calendar date of the sale.
    .join(
        broadcast(dim_tiempo),
        on=dim_tiempo["fecha"] == to_date(df_ventas["fecha_venta"]),
        how="left",
    )
)


# Project the columns written to the target table: business columns with
# explicit types, followed by load metadata.
df_enriched = df_enriched.select(
    *[
        col(column_name).cast(target_type)
        for column_name, target_type in [
            ("id_venta", LongType()),
            ("sk_cliente", StringType()),
            ("sk_tienda", StringType()),
            ("sk_producto", StringType()),
            ("sk_tiempo", StringType()),
            ("cantidad", IntegerType()),
            ("monto", DecimalType(18, 2)),
            ("fecha_venta", TimestampType()),
        ]
    ],
    # Both timestamps are taken at load time.
    current_timestamp().alias("seq_ts"),
    current_timestamp().alias("updated_at"),
    lit("ETL").alias("source_system"),
    # A row whose `monto` is null is labelled DELETE; any other row UPSERT.
    when(col("monto").isNotNull(), "UPSERT").otherwise("DELETE").alias("operation"),
)

In [0]:
# Merge the enriched sales into the gold fact table, matching on id_venta.
delta_table = DeltaTable.forName(spark, f"{catalog_name}.gold.ventas")

# Every source row carries operation = 'DELETE' (null monto) or 'UPSERT'.
# Honor that flag instead of upserting every row unconditionally:
#   - matched   + DELETE -> remove the target row
#   - matched   + UPSERT -> overwrite all columns
#   - unmatched + UPSERT -> insert
#   - unmatched + DELETE -> ignored (nothing to delete, and it must not be inserted)
# Matched clauses are evaluated in order, so the conditional delete goes first
# and the unconditional update acts as the fallback.
# NOTE(review): this is a hard delete; if `operation` is meant to be stored as
# a soft-delete marker instead, revert to the unconditional upsert.
(
    delta_table.alias("t")
    .merge(df_enriched.alias("s"), "t.id_venta = s.id_venta")
    .whenMatchedDelete(condition="s.operation = 'DELETE'")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll(condition="s.operation <> 'DELETE'")
    .execute()
)

In [0]:
# Show the Delta history of the fact table. The table name is built from
# `catalog_name`, like every other cell, instead of hard-coding the catalog.
display(spark.sql(f"DESCRIBE HISTORY {catalog_name}.gold.ventas"))