In [0]:
table = dbutils.widgets.text("table", "")
table = dbutils.widgets.get("table")

In [0]:
from pyspark.sql.functions import col, to_date, to_timestamp, row_number, current_timestamp, current_date, sha2, concat_ws, lit, expr
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable
import uuid

In [0]:
# Creamos catalogo y schema
catalog_name = "sesion_08"
schema_bronze = "bronze"

In [0]:
ventas=spark.table(f"{catalog_name}.{schema_bronze}.{table}")

In [0]:
silver_ventas = (
    ventas
    .withColumn("id_linea", col("id_linea").cast("int"))
    .withColumn("id_tienda", col("id_tienda").cast("int"))
    .withColumn("id_cliente", col("id_cliente").cast("int"))
    .withColumn("id_producto", col("id_producto").cast("int"))
    .withColumn("fecha_venta", to_date(col("fecha_venta"), "yyyy-MM-dd"))
    .withColumn("cantidad", col("cantidad").cast("int"))
    .withColumn("precio_unitario", col("precio_unitario").cast("decimal(18,2)"))
    .withColumn("updated_at", to_timestamp(col("updated_at")))
    .drop("ingest_at")
)

In [0]:
merge = (
    target.alias("m")
    .merge(
        silver_ventas_dedup.alias("in"),
        (col("m.id_venta") == col("in.id_venta")) &
        (col("m.id_linea") == col("in.id_linea"))
    )
    .whenMatchedUpdate(
        condition="in.updated_at > m.updated_at",
        set={
            "id_tienda": col("in.id_tienda"),
            "id_cliente": col("in.id_cliente"),
            "id_producto": col("in.id_producto"),
            "fecha_venta": col("in.fecha_venta"),
            "cantidad": col("in.cantidad"),
            "precio_unitario": col("in.precio_unitario"),
            "updated_at": col("in.updated_at")
        }
    )
    .whenNotMatchedInsert(
        values={
            "id_venta": col("in.id_venta"),
            "id_linea": col("in.id_linea"),
            "id_tienda": col("in.id_tienda"),
            "id_cliente": col("in.id_cliente"),
            "id_producto": col("in.id_producto"),
            "fecha_venta": col("in.fecha_venta"),
            "cantidad": col("in.cantidad"),
            "precio_unitario": col("in.precio_unitario"),
            "updated_at": col("in.updated_at")
        }
    )
    .execute()
)