In [0]:
# ======================================
# SILVER TRANSFORMATION - ORDERS
# ======================================
# Objetivo:
# Tomar datos crudos desde la capa Bronze (bronze_orders)
# y generar una tabla Silver con:
# - Tipos de datos correctos
# - Métricas numéricas válidas
# - Duplicados eliminados con criterio
# - Datos listos para consumo analítico / dbt
# ======================================

from pyspark.sql.functions import (
    col,
    to_date,
    row_number
)

from pyspark.sql.window import Window


In [0]:
# -----------------------------
# Catalog and schema settings
# -----------------------------
# Table names are fully qualified as <catalog>.<schema>.<table>.

CATALOG = "workspace"
SCHEMA = "retail_medallion_pipeline_schema"

# Source (Bronze) and destination (Silver) tables for the orders pipeline.
BRONZE_TABLE = ".".join((CATALOG, SCHEMA, "bronze_orders"))
SILVER_TABLE = ".".join((CATALOG, SCHEMA, "silver_orders"))

In [0]:
# -----------------------------
# Load the Bronze layer
# -----------------------------

# Read the Bronze orders table named by BRONZE_TABLE.
df_bronze = spark.read.table(BRONZE_TABLE)

# Optional inspection of the input data.
display(df_bronze)


In [0]:
# -----------------------------
# Silver transformations
# -----------------------------

# Target Spark SQL type for every column that is cast in place.
column_types = {
    "order_id": "int",
    "customer_id": "int",
    "product_id": "int",
    "quantity": "int",
    "unit_price": "double",
    "total_amount": "double",
}

# Always start from df_bronze so re-running this cell yields the same result.
df_clean = df_bronze
for column_name, spark_type in column_types.items():
    # withColumn on an existing name replaces that column with the cast value.
    df_clean = df_clean.withColumn(column_name, col(column_name).cast(spark_type))

# Date conversion: to_date is called without an explicit format.
df_clean = df_clean.withColumn("order_date", to_date(col("order_date")))


In [0]:
# -----------------------------
# Duplicate removal
# -----------------------------
# Rule:
# When several records share the same order_id, keep the one with the
# most recent order_date.
#
# order_date is a plain date (see the to_date conversion), so duplicates
# created on the same day tie on it. Ordering by order_date alone would let
# row_number() pick an arbitrary row among the ties, and the pick could change
# from run to run. The remaining known columns are therefore used as
# tie-breakers so the surviving row is chosen deterministically with respect
# to those columns.
#
# NOTE(review): rows whose order_id is null all land in the same window
# partition, so at most one of them survives this step - confirm that this
# is the intended handling of null keys.

tie_breakers = ["total_amount", "quantity", "unit_price", "customer_id", "product_id"]

window_spec = (
    Window
    .partitionBy("order_id")
    .orderBy(
        # Nulls sort last, so a record with a real date always wins.
        col("order_date").desc_nulls_last(),
        *[col(name).desc_nulls_last() for name in tie_breakers]
    )
)

df_silver = (
    df_clean
    # rn == 1 marks the preferred record inside each order_id group.
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)
    # The helper column is not part of the Silver schema.
    .drop("rn")
)


In [0]:
# -----------------------------
# Quality checks
# -----------------------------
# All three metrics are computed in a single aggregation so that df_silver
# (and the deduplication work behind it) is evaluated once, instead of once
# per separate count() action.
#
# count(CASE WHEN ... THEN 1 END) counts only the rows matching the condition
# and yields 0 (not NULL) when nothing matches, so the printed values are the
# same integers the individual filter(...).count() calls would produce.

quality_metrics = df_silver.selectExpr(
    "count(*) AS total_rows",
    "count(CASE WHEN order_id IS NULL THEN 1 END) AS null_order_ids",
    "count(CASE WHEN total_amount < 0 THEN 1 END) AS negative_totals",
).first()

print("Total registros Silver:", quality_metrics["total_rows"])

print(
    "Orders con order_id nulo:",
    quality_metrics["null_order_ids"]
)

print(
    "Orders con total_amount negativo:",
    quality_metrics["negative_totals"]
)


In [0]:
# -----------------------------
# Write the Silver table
# -----------------------------

# Replace the contents of SILVER_TABLE with df_silver, stored in Delta format.
df_silver.write.saveAsTable(SILVER_TABLE, format="delta", mode="overwrite")


In [0]:
# -----------------------------
# Final verification
# -----------------------------

# Read the table back from the catalog and render it.
display(spark.table(SILVER_TABLE))
