## Leer Data Delta Bronze

In [48]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# Variables
container_name = "datalake"
root_silver_folder = "silver"
root_processed_folder = "processed"

show_debug = False

# Crear sesión Spark
spark = SparkSession.builder.appName("ReadDeltaData").getOrCreate()

# Rutas del Contenedor en ADLS Gen2
sales_delta_path = f"abfss://{container_name}@adlsstoresproject.dfs.core.windows.net/{root_processed_folder}/sales/"
customers_delta_path = f"abfss://{container_name}@adlsstoresproject.dfs.core.windows.net/{root_processed_folder}/customers/"
products_delta_path = f"abfss://{container_name}@adlsstoresproject.dfs.core.windows.net/{root_processed_folder}/products/"
suppliers_delta_path = f"abfss://{container_name}@adlsstoresproject.dfs.core.windows.net/{root_processed_folder}/suppliers/"

# Leer los archivos Delta
df_products = spark.read.format("delta").load(products_delta_path)
df_suppliers = spark.read.format("delta").load(suppliers_delta_path)

# Estos se deben leer filtrados para únicamente obtener los registros que no se han validado
df_sales = (
    spark.read.format("delta")
    .load(sales_delta_path)
    .where((col("is_validated") == False) | (col("is_validated") == 0))
)

df_customers = (
    spark.read.format("delta")
    .load(customers_delta_path)
    .where((col("is_validated") == False) | (col("is_validated") == 0))
)

# Mostrar los primeros registros
if (show_debug == True):
    print("Sales:")
    df_sales.show(3)

    print("Customers:")
    df_customers.show(3)

    print("Products:")
    df_products.show(3)

    print("Suppliers:")
    df_suppliers.show(3)

StatementMeta(sparkpoolnew, 60, 4, Finished, Available, Finished)

## Limpieza - Transformación

In [49]:
from pyspark.sql.functions import col, when, trim, lower, lit, length, regexp_replace, regexp_extract, to_timestamp
from pyspark.sql.types import IntegerType, DoubleType, StringType, TimestampType

StatementMeta(sparkpoolnew, 60, 5, Finished, Available, Finished)

In [50]:
#===============
def remove_null_or_empty(df, column_name):
    """
    Elimina los registros donde una columna tenga valores nulos o vacíos.
    Retorna el DataFrame limpio y muestra cuántos registros fueron eliminados.
    """
    invalid_count = df.filter(col(column_name).isNull() | (col(column_name) == "")).count()

    if invalid_count > 0:
        print(f"Se eliminaron {invalid_count} registros con {column_name} vacío o nulo.")
        df = df.filter(col(column_name).isNotNull() & (col(column_name) != ""))
    else:
        print(f"No se encontraron registros vacíos o nulos en {column_name}.")

    return df

#===============
def is_numeric_int(df, column_name):
    """
    Verifica si una columna tiene valores no numéricos.
    Retorna True si se encontraron valores enteros inválidos, False si todo está bien.
    """
    invalid_count = df.filter(~col(column_name).rlike("^[0-9]+$")).count()
    return invalid_count > 0

#===============
def is_numeric_double(df, column_name):
    """
    Verifica si una columna tiene valores no numéricos.
    Retorna True si se encontraron valores decimales inválidos, False si todo está bien.
    """
    invalid_count = df.filter(~col(column_name).rlike("^[0-9]+(\\.[0-9]+)?$")).count()
    return invalid_count > 0

#===============
def greater_than_zero(df, column_name):
    """
    Verifica si una columna contiene valores menores o iguales a 0.
    Retorna True si hay valores no positivos, False si todo está bien.
    """
    invalid_count = df.filter(col(column_name).cast("double") <= 0).count()
    return invalid_count > 0

#===============
def clean_name(df, column_name):
    """
    Elimina caracteres especiales del campo firstname,
    pero mantiene letras (incluyendo ñ y tildes), espacios, puntos y apóstrofes.
    Retorna el DataFrame con la columna limpia.
    """
    # Permite letras con tildes, ñ/Ñ, espacios, puntos y apóstrofes
    df = df.withColumn(
        column_name,
        regexp_replace(
            col(column_name),
            r"[^a-zA-ZáéíóúÁÉÍÓÚñÑüÜ' .]",  # mantiene caracteres válidos
            ""
        )
    )
    print(f"Caracteres especiales eliminados en la columna '{column_name}'.")
    return df

#===============
def clean_email(df, column_name, default_value="no-email@domain.com", invalid_emails=None):
    """
    Limpia y valida direcciones de correo electrónico.
    
    - Reemplaza valores nulos, vacíos, inválidos o de una lista de correos no válidos.
    - El valor estándar se pasa como parámetro (default: 'no-email@domain.com').
    - Puedes pasar una lista de correos inválidos comunes en el parámetro `invalid_emails`.

    Retorna el DataFrame con el campo limpio.
    """
    # Si no se pasan correos inválidos personalizados, usar algunos por defecto
    if invalid_emails is None:
        invalid_emails = [
            "notengo@hotmail.com",
            "sincorreo@gmail.com",
            "noaplica@hotmail.com",
            "noaplica@gmail.com",
            "ninguno@gmail.com",
            "ninguno@hotmail.com",
            "no.tengo@gmail.com",
            "sinemail@gmail.com"
        ]

    # Convertimos la lista a minúsculas para hacer comparaciones insensibles a mayúsculas
    invalid_emails = [e.lower() for e in invalid_emails]

    # Expresión regular estándar para validar correos electrónicos
    email_regex = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"

    # Limpieza y validación
    df = df.withColumn(
        column_name,
        when(
            # Condición de correo válido
            col(column_name).isNotNull() &
            (trim(col(column_name)) != "") &
            col(column_name).rlike(email_regex) &
            (~lower(col(column_name)).isin(invalid_emails)),
            col(column_name)
        ).otherwise(lit(default_value))
    )

    print(f"Correos inválidos o genéricos en '{column_name}' reemplazados por '{default_value}'.")
    return df

#===============
def clean_phone_number(df, column_name, default_value="0000000000", country_prefix=None):
    """
    Estandariza números de celular:
      - Elimina caracteres no numéricos (espacios, guiones, paréntesis, +, etc.)
      - Opcionalmente agrega un prefijo de país si se define (por ejemplo, '57' para Colombia)
      - Reemplaza valores vacíos o inválidos con un valor estándar

    Parámetros:
        df (DataFrame): DataFrame de entrada
        column_name (str): Nombre de la columna de celular
        default_value (str): Valor a usar si el número es inválido o vacío
        country_prefix (str): Prefijo de país opcional (por ejemplo, "57")

    Retorna:
        DataFrame con el número estandarizado
    """

    # Quitar todos los caracteres que no sean dígitos
    df = df.withColumn(
        column_name,
        regexp_replace(trim(col(column_name)), r"[^0-9]", "")
    )

    # Agregar prefijo de país si aplica y no está vacío
    if country_prefix:
        df = df.withColumn(
            column_name,
            when(
                (col(column_name).isNotNull()) & (col(column_name) != "") & (~col(column_name).startswith(country_prefix)),
                lit(country_prefix) + col(column_name)
            ).otherwise(col(column_name))
        )

    # Reemplazar vacíos, nulos o demasiado cortos con el valor por defecto
    df = df.withColumn(
        column_name,
        when(
            (col(column_name).isNull()) | 
            (trim(col(column_name)) == "") | 
            (length(col(column_name)) < 8) | 
            (length(col(column_name)) > 12),
            lit(default_value)
        ).otherwise(col(column_name))
    )

    print(f"Números de celular en '{column_name}' estandarizados correctamente.")
    return df

#===============

StatementMeta(sparkpoolnew, 60, 6, Finished, Available, Finished)

### df_sales

In [51]:
# --------------------------
df_sales_alert = False

# --------------------------
# AGREGAR CAMPO DE INGESTA INCREMENTAL
df_sales = df_sales.withColumn(
    "is_validated",
    lit(False)
)

# --------------------------
# QUITAR REGISTROS VACIOS/NULOS
df_sales = remove_null_or_empty(df_sales, "sale_id")
df_sales = remove_null_or_empty(df_sales, "customer_id")
df_sales = remove_null_or_empty(df_sales, "product_id")
df_sales = remove_null_or_empty(df_sales, "store")

# --------------------------
# REEMPLAZAR FECHAS INVALIDAS POR FECHA ESTANDAR
df_sales = df_sales.withColumn(
    "created_at",
    when(
        # Se reemplaza por Datetime estándar cuando no tiene el formato correcto, está nulo o vacío
        col("created_at").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$") &
        col("created_at").isNotNull() &
        (trim(col("created_at")) != ""),
        col("created_at")
    ).otherwise("1900-01-01 00:00:00")
)

# --------------------------
# VALIDAR CAMPOS
# Que solo tenga valores numéricos enteros
if (is_numeric_int(df_sales, "sale_id") | is_numeric_int(df_sales, "product_id") | is_numeric_int(df_sales, "customer_id") | is_numeric_int(df_sales, "quantity")):
    df_sales_alert = True
    print("Que solo tenga valores numéricos enteros")

# Que solo tenga valores numéricos decimales
if (is_numeric_double(df_sales, "unit_price") | is_numeric_double(df_sales, "total_amount")):    
    df_sales_alert = True
    print("Que solo tenga valores numéricos decimales")

# ==================
if (df_sales_alert):
    raise Exception("🚫 Ejecución detenida: Se encontraron valores inválidos en la columnas.")
else:
    print("Ejecución Ok")

StatementMeta(sparkpoolnew, 60, 7, Finished, Available, Finished)

No se encontraron registros vacíos o nulos en sale_id.
No se encontraron registros vacíos o nulos en customer_id.
No se encontraron registros vacíos o nulos en product_id.
No se encontraron registros vacíos o nulos en store.
Ejecución Ok


In [52]:
# Definir Schema sales
df_sales_clean = df_sales.select(
    col("sale_id").cast(IntegerType()).alias("sale_id"),
    col("product_id").cast(IntegerType()).alias("product_id"), 
    col("customer_id").cast(IntegerType()).alias("customer_id"),
    col("quantity").cast(StringType()).alias("quantity"),
    regexp_replace(col("unit_price"), ",", ".").cast(DoubleType()).alias("unit_price"),
    regexp_replace(col("total_amount"), ",", ".").cast(DoubleType()).alias("total_amount"),
    col("sale_date").cast(StringType()).alias("sale_date"),
    col("sale_time").cast(StringType()).alias("sale_time"),
    col("store").cast(StringType()).alias("store"),
    to_timestamp(col("created_at"), "yyyy-MM-dd HH:mm:ss").alias("created_at"),
    col("year").cast(StringType()).alias("year"),
    col("month").cast(StringType()).alias("month"),
    col("day").cast(StringType()).alias("day"),
    col("unique_id").cast(StringType()).alias("unique_id"),
    lit(False).alias("is_validated")
)

# Que tenga valores positivos
if (greater_than_zero(df_sales_clean, "sale_id")):
    df_sales_alert = True
    print("Que sale_id tenga valores positivos")

if (greater_than_zero(df_sales_clean, "product_id")):
    df_sales_alert = True
    print("Que product_id tenga valores positivos")

if (greater_than_zero(df_sales_clean, "customer_id")):
    df_sales_alert = True
    print("Que customer_id tenga valores positivos")

# Mostrar esquema y datos
if (show_debug  == True):
    df_sales_clean.printSchema()
    df_sales_clean.show(5)

StatementMeta(sparkpoolnew, 60, 8, Finished, Available, Finished)

### df_customers

In [53]:
# --------------------------
df_customers_alert = False

# --------------------------
# AGREGAR CAMPO DE INGESTA INCREMENTAL
df_customers = df_customers.withColumn(
    "is_validated",
    lit(False)
)

# --------------------------
# QUITAR REGISTROS VACIOS/NULOS
df_customers = remove_null_or_empty(df_customers, "customer_id")
df_customers = remove_null_or_empty(df_customers, "store")

# --------------------------
# REEMPLAZAR FECHAS INVaLIDAS POR FECHA ESTANDAR
df_customers = df_customers.withColumn(
    "created_at",
    when(
        # Se reemplaza por Datetime estándar cuando no tiene el formato correcto, está nulo o vacío
        col("created_at").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$") &
        col("created_at").isNotNull() &
        (trim(col("created_at")) != ""),
        col("created_at")
    ).otherwise("1900-01-01 00:00:00")
)

# --------------------------
# VALIDAR CAMPOS
# Que solo tenga valores numéricos enteros
if (is_numeric_int(df_sales, "customer_id")):
    df_customers_alert = True
    print("Que solo tenga valores numéricos enteros")

# --------------------------
# LIMPIAR NOMBRES Y APELLIDOS
df_customers = clean_name(df_customers, "firstname")
df_customers = clean_name(df_customers, "lastname")

# --------------------------
# LIMPIAR CORREOS
df_customers = clean_email(df_customers, "email")

# --------------------------
# LIMPIAR CELULARES
df_customers = clean_phone_number(df_customers, "phone")

# ==================
if (df_customers_alert):
    raise Exception("🚫 Ejecución detenida: Se encontraron valores inválidos en la columnas.")
else:
    print("Ejecución Ok")

StatementMeta(sparkpoolnew, 60, 9, Finished, Available, Finished)

No se encontraron registros vacíos o nulos en customer_id.
No se encontraron registros vacíos o nulos en store.
Caracteres especiales eliminados en la columna 'firstname'.
Caracteres especiales eliminados en la columna 'lastname'.
Correos inválidos o genéricos en 'email' reemplazados por 'no-email@domain.com'.
Números de celular en 'phone' estandarizados correctamente.
Ejecución Ok


In [54]:
# Definir Schema customers
df_customers_clean = df_customers.select(
    col("customer_id").cast(IntegerType()).alias("customer_id"),
    col("firstname").cast(StringType()).alias("firstname"),
    col("lastname").cast(StringType()).alias("lastname"),
    col("email").cast(StringType()).alias("email"),
    col("phone").cast(StringType()).alias("phone"),
    col("store").cast(StringType()).alias("store"),
    to_timestamp(col("created_at"), "yyyy-MM-dd HH:mm:ss").alias("created_at"),
    col("year").cast(StringType()).alias("year"),
    col("month").cast(StringType()).alias("month"),
    col("day").cast(StringType()).alias("day"),
    col("unique_id").cast(StringType()).alias("unique_id"),
    lit(False).alias("is_validated")
)

# Que tenga valores positivos
if (greater_than_zero(df_sales_clean, "customer_id")):
    df_customers_alert = True
    print("Que customer_id tenga valores positivos")

# Mostrar esquema y datos
if (show_debug  == True):
    df_customers_clean.printSchema()
    df_customers_clean.show(5)

StatementMeta(sparkpoolnew, 60, 10, Finished, Available, Finished)

## Guardar en capa **Silver**

In [55]:
from delta.tables import DeltaTable

silver_sales_path = "abfss://datalake@adlsstoresproject.dfs.core.windows.net/silver/sales"
silver_customers_path = "abfss://datalake@adlsstoresproject.dfs.core.windows.net/silver/customers"

# ===== MERGE SALES SILVER =====
if DeltaTable.isDeltaTable(spark, silver_sales_path):
    delta_sales = DeltaTable.forPath(spark, silver_sales_path)
    
    delta_sales.alias("target").merge(
        df_sales_clean.alias("source"),
        "target.unique_id = source.unique_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    # Si no existe, lo crea la primera vez
    df_sales_clean.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("year", "month", "day") \
        .save(silver_sales_path)

# ===== MERGE CUSTOMERS SILVER =====
if DeltaTable.isDeltaTable(spark, silver_customers_path):
    delta_customers = DeltaTable.forPath(spark, silver_customers_path)
    
    delta_customers.alias("target").merge(
        df_customers_clean.alias("source"),
        "target.unique_id = source.unique_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    df_customers_clean.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("year", "month", "day") \
        .save(silver_customers_path)

StatementMeta(sparkpoolnew, 60, 11, Finished, Available, Finished)

## Actualizar estado de registros validados

In [56]:
# ===== IS_VALIDATED SALES -> TRUE =====
delta_bronze_sales = DeltaTable.forPath(spark, sales_delta_path)
delta_bronze_sales.alias("bronze").merge(
    df_sales.select("unique_id").alias("source"),
    "bronze.unique_id = source.unique_id"
).whenMatchedUpdate(
    set={"is_validated": lit(1)}
).execute()
print("✅ is_validated marcado a 1 en sales.Bronze para registros procesados.")

# ===== IS_VALIDATED CUSTOMERS -> TRUE =====
# Actualizar is_validated = 1 en Bronze para los unique_id procesados
delta_bronze_customers = DeltaTable.forPath(spark, customers_delta_path)
delta_bronze_customers.alias("bronze").merge(
    df_sales.select("unique_id").alias("source"),
    "bronze.unique_id = source.unique_id"
).whenMatchedUpdate(
    set={"is_validated": lit(1)}
).execute()
print("✅ is_validated marcado a 1 en customers.Bronze para registros procesados.")

StatementMeta(sparkpoolnew, 60, 12, Submitted, Running, Running)

## Crear **silver_db**

In [None]:
# Eliminar tablas:
#spark.sql("DROP TABLE IF EXISTS silver_db.customers")
#spark.sql("DROP TABLE IF EXISTS silver_db.sales")

StatementMeta(, , -1, Waiting, , Waiting)

In [None]:
spark.sql("""
    CREATE DATABASE IF NOT EXISTS silver_db
    LOCATION 'abfss://datalake@adlsstoresproject.dfs.core.windows.net/silver'
""")

StatementMeta(, , -1, Waiting, , Waiting)

In [None]:
# Registrar tabla de ventas
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver_db.sales
    USING DELTA
    LOCATION '{silver_sales_path}'
""")

# Registrar tabla de clientes
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver_db.customers
    USING DELTA
    LOCATION '{silver_customers_path}'
""")

StatementMeta(, , -1, Waiting, , Waiting)