In [0]:
# ============================================================
# Lectura de la tabla Bronze
# ============================================================
# Usamos spark.read.table para cargar la tabla Delta previamente creada
# en la capa Bronze. Esta tabla contiene los datos crudos consolidados
# de todas las sucursales, junto con columnas de auditoría como
# 'archivo_fuente' y 'fecha_ingesta'.

df_TechMart_SilverStage = spark.read.table("workspace.default.df_techmart_broze_ingestion")

In [0]:
# ============================================================
# Transformaciones sobre la tabla SilverStage
# ============================================================

import pyspark.sql.functions as F

df_TechMart_SilverStage = (
    df_TechMart_SilverStage
    # 1. Convertir la columna 'fecha' a tipo timestamp
    #    Se especifica el formato 'yyyy-MM-dd HH:mm:ss' para asegurar
    #    que Spark interprete correctamente la fecha y hora.
    .withColumn(
        "fecha",
        F.to_timestamp("fecha", "yyyy-MM-dd HH:mm:ss")
    )
    # 2. Eliminar la columna 'fecha'
    #    Aquí se elimina la columna recién creada, lo cual normalmente
    #    no tendría sentido porque acabamos de transformarla.
    #    (Probablemente se buscaba limpiar duplicados o redefinir la columna).
    .drop("fecha")
    # 3. Renombrar la columna 'fecha_ingesta' como 'fecha'
    #    Esto reemplaza la columna de ingesta por el nombre genérico 'fecha',
    #    dejando como referencia la fecha de carga en lugar de la fecha original.
    .withColumnRenamed("fecha_ingesta", "fecha")
)


In [0]:
# ============================================================
# Derivación de atributos de fecha en la capa SilverStage
# ============================================================

df_TechMart_SilverStage = (
    df_TechMart_SilverStage
    # 1. Extraer el año desde la columna 'fecha'
    .withColumn("año", F.year("fecha"))
    # 2. Extraer el mes (valor numérico 1–12)
    .withColumn("mes", F.month("fecha"))
    # 3. Extraer el día del mes (1–31)
    .withColumn("dia", F.dayofmonth("fecha"))
    # 4. Obtener el nombre completo del día de la semana (ej. Monday, Tuesday)
    .withColumn("dia_semana", F.date_format("fecha", "EEEE"))
)



In [0]:
# ============================================================
# Normalización de texto en la capa SilverStage
# ============================================================

df_TechMart_SilverStage = (
    df_TechMart_SilverStage
    # 1. Convertir la columna 'sucursal' a mayúsculas
    #    Esto asegura consistencia en los nombres de sucursal,
    #    evitando problemas de comparación por diferencias de formato.
    .withColumn("sucursal", F.upper(F.col("sucursal")))
    
    # 2. Convertir la columna 'producto' a minúsculas
    #    Se estandariza el nombre de los productos para facilitar
    #    búsquedas, agrupaciones y evitar duplicados por diferencias
    #    de capitalización (ej. "Laptop" vs "laptop").
    .withColumn("producto", F.lower(F.col("producto")))
)


In [0]:
# ============================================================
# Eliminación de duplicados en la capa SilverStage
# ============================================================

# Usamos dropDuplicates indicando la columna 'id_venta' como clave única.
# Esto asegura que, si existen múltiples registros con el mismo id_venta,
# solo se conserve uno y se eliminen los duplicados.
# Es una práctica común en la capa Silver para garantizar
# la integridad y consistencia de los datos antes de pasar a la capa Gold.

df_TechMart_SilverStage = df_TechMart_SilverStage.dropDuplicates(["id_venta"])

In [0]:
# ============================================================
# Filtrado de registros inválidos en la capa SilverStage
# ============================================================

# Usamos filter para conservar únicamente las filas donde la columna 'total' > 0.
# Esto elimina registros con valores nulos, cero o negativos en el campo 'total',
# garantizando que solo se mantengan ventas válidas en el DataFrame SilverStage.

df_TechMart_SilverStage = df_TechMart_SilverStage.filter(F.col("total") > 0)


In [0]:
# ============================================================
# Validación de campos obligatorios en la capa SilverStage
# ============================================================

# Definimos una lista con los nombres de los campos que deben ser obligatorios.
# Estos campos no pueden contener valores nulos, ya que son críticos para el análisis:
# - id_venta   → identificador único de la transacción
# - fecha      → fecha de la venta
# - sucursal   → sucursal donde ocurrió la venta
# - producto   → producto vendido
# - total      → monto total de la transacción

campos_obligatorios = ["id_venta", "fecha", "sucursal", "producto", "total"]

# Iteramos sobre cada campo obligatorio y aplicamos un filtro
# para eliminar las filas donde ese campo sea nulo.
# De esta forma, garantizamos que el DataFrame SilverStage
# solo contenga registros válidos y completos.
for campo in campos_obligatorios:
    df_TechMart_SilverStage = df_TechMart_SilverStage.filter(F.col(campo).isNotNull())

In [0]:
# ============================================================
# Escritura de la tabla Silver particionada
# ============================================================

# Guardamos el DataFrame SilverStage en formato Delta Lake.
# Se utiliza 'append' para permitir cargas incrementales sin sobrescribir datos previos.
# La partición se realiza por la columna 'mes', lo que optimiza las consultas
# que filtran o agrupan por mes, ya que Spark puede leer solo las particiones necesarias.
# Finalmente, se registra la tabla en el catálogo con el nombre 'df_techmart_silver_partitioned'.

df_TechMart_SilverStage.write.format("delta") \
    .mode("append") \
    .partitionBy("mes") \
    .saveAsTable("df_techmart_silver_partitioned")