## Variables de entorno

In [0]:
dbutils.widgets.removeAll()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [0]:
dbutils.widgets.text("CATALOGO", "principal")
dbutils.widgets.text("ESQUEMA_SOURCE", "bronze")
dbutils.widgets.text("ESQUEMA_DEST", "silver")

In [0]:
catalogo = dbutils.widgets.get("CATALOGO")
esquema_source = dbutils.widgets.get("ESQUEMA_SOURCE")
esquema_dest = dbutils.widgets.get("ESQUEMA_DEST")

## Limpieza de tablas


### Limpieza de products

In [0]:
df_products_clean = (
    spark.table(f"{catalogo}.{esquema_source}.products")
    
    # eliminar duplicados
    .dropDuplicates(["ProductID"])

    # trimming
    .withColumn("ProductName", F.trim("ProductName"))
    .withColumn("QuantityPerUnit", F.trim("QuantityPerUnit"))

    # valores nulos en precios → 0
    .withColumn("UnitPrice", F.coalesce("UnitPrice", F.lit(0.0)))

    # UnitPrice no puede ser negativo
    .withColumn("UnitPrice", F.when(F.col("UnitPrice") < 0, 0).otherwise(F.col("UnitPrice")))
)


### Limpieza de orders

In [0]:
df_orders_clean = (
    spark.table(f"{catalogo}.{esquema_source}.orders")

    # quitar duplicados
    .dropDuplicates(["OrderID"])

    # trimming
    .withColumn("CustomerID", F.trim("CustomerID"))
    .withColumn("ShipCountry", F.trim("ShipCountry"))

    # fechas inválidas a null
    .withColumn("OrderDate", F.to_date("OrderDate"))
)


### Limpieza de order details

In [0]:
df_order_details_clean = (
    spark.table(f"{catalogo}.{esquema_source}.order_details")

    # duplicados
    .dropDuplicates(["OrderID", "ProductID"])

    # UnitPrice no puede ser negativo
    .withColumn("UnitPrice", F.when(F.col("UnitPrice") < 0, 0).otherwise(F.col("UnitPrice")))

    # Quantity mínima 1
    .withColumn("Quantity", F.when(F.col("Quantity") < 1, 1).otherwise(F.col("Quantity")))

    # descuento entre 0 y 1
    .withColumn("Discount", 
                F.when((F.col("Discount") < 0) | (F.col("Discount") > 1), 0)
                 .otherwise(F.col("Discount")))
)


In [0]:
df_products_clean.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_dest}.products")
df_orders_clean.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_dest}.orders")
df_order_details_clean.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_dest}.order_details")