### Taller Evaluado 03
Contexto y objetivo
La empresa LogisPerú se dedica al transporte y entrega de paquetes en todo el país. Actualmente, maneja información de clientes, sucursales, rutas y envíos. El área de Data & Analytics requiere consolidar esta información en Databricks, aplicando transformaciones avanzadas, modelado analítico y procesamiento incremental.

Objetivo: El estudiante deberá diseñar e implementar un flujo de procesamiento de datos en Databricks que:

• Aplique transformaciones con funciones de ventana (ranking, acumulados, variaciones).

• Modele un esquema analítico en formato estrella en la capa Gold.

• Implemente un proceso incremental con MERGE INTO sobre los envíos, manejando duplicados y actualizaciones.

• Registre la trazabilidad en una tabla de auditoría.

**Capas:**
- **Bronze**: ingesta cruda de los 5 CSV .

- **Silver**: llimpieza, transformaciones y deduplicación.

- **Gold**: modelo analítico + auditoría.

In [0]:
## ELIMINAR CATALOGO

from pyspark.sql import SparkSession

# Crear/usar sesión de Spark
spark = SparkSession.builder.getOrCreate()

# Nombre del catálogo a eliminar
catalog_name = "tallerevaluado03"

# Ejecutar el comando SQL para eliminar el catálogo
spark.sql(f"DROP CATALOG IF EXISTS {catalog_name} CASCADE") 

In [0]:
from pyspark.sql.functions import col, trim, initcap, when, lit, year, month, dayofmonth, concat, concat_ws, current_timestamp, to_date, to_timestamp, row_number, current_date, sha2, expr, count, round, avg, rank, trunc, lag, sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable
import uuid, datetime

## Bronze

#### 1. Creamos el catálogo y los esquemas

In [0]:
catalog_name = "tallerevaluado03"
schema_bronze = "bronze"
schema_silver = "silver"
schema_gold = "gold"

In [0]:
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")

In [0]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_bronze}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_silver}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_gold}")

In [0]:
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog_name}.default.input")

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.clientes_raw(
  id_cliente string,
  nombre string,
  apellido string,
  email  string,
  segmento string,
  fecha_registro  string,
  ingest_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.envios_base_raw(
  id_envio string,
  id_linea string,
  id_cliente string,
  id_sucursal string,
  id_ruta string,
  fecha_envio  string,
  estado string,
  peso_kg string,
  costo_envio string,
  updated_at string,
  ingest_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.envios_incremento_raw(
  id_envio string,
  id_linea string,
  id_cliente string,
  id_sucursal string,
  id_ruta string,
  fecha_envio  string,
  estado string,
  peso_kg string,
  costo_envio string,
  updated_at string,
  ingest_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.rutas_raw(
  id_ruta string,
  origen string,
  destino string,
  distancia_km  string,
  tiempo_estimado_horas string,
  ingest_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.sucursales_raw(
  id_sucursal string,
  ciudad string,
  distrito  string,
  region string,
  tipo string,
  fecha_apertura string,
  ingest_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_silver}.clientes(
  id_cliente integer,
  nombre string,
  apellido string,
  email  string,
  segmento string,
  fecha_registro  date
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_silver}.envios_base(
  id_envio string,
  id_linea integer,
  id_cliente integer,
  id_sucursal integer,
  id_ruta integer,
  fecha_envio  date,
  estado string,
  peso_kg double,
  costo_envio double,
  updated_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_silver}.envios_incremento(
  id_envio string,
  id_linea integer,
  id_cliente integer,
  id_sucursal integer,
  id_ruta integer,
  fecha_envio  date,
  estado string,
  peso_kg double,
  costo_envio double,
  updated_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_silver}.envios_incremento_dedup(
  id_envio string,
  id_linea integer,
  id_cliente integer,
  id_sucursal integer,
  id_ruta integer,
  fecha_envio  date,
  estado string,
  peso_kg double,
  costo_envio double,
  updated_at timestamp
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_silver}.rutas(
  id_ruta integer,
  origen string,
  destino string,
  distancia_km  integer,
  tiempo_estimado_horas double
)
"""
)

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_silver}.sucursales(
  id_sucursal integer,
  ciudad string,
  distrito  string,
  region string,
  tipo string,
  fecha_apertura date
)
"""
)

#### 2. Ingestamos la data a Bronze

In [0]:
path_base = "/Volumes/tallerevaluado03/default/input/"

path_clientes = f"{path_base}/clientes.csv"
path_envios_base = f"{path_base}/envios_base.csv"
path_envios_incremento = f"{path_base}/envios_incremento.csv"
path_rutas = f"{path_base}/rutas.csv"
path_sucursales = f"{path_base}/sucursales.csv"

In [0]:
clientes_raw = spark.read.option("header", True).csv(path_clientes).withColumn("ingest_at", current_timestamp())
envios_base_raw = spark.read.option("header", True).csv(path_envios_base).withColumn("ingest_at", current_timestamp())
envios_incremento_raw = spark.read.option("header", True).csv(path_envios_incremento).withColumn("ingest_at", current_timestamp())
rutas_raw = spark.read.option("header", True).csv(path_rutas).withColumn("ingest_at", current_timestamp())
sucursales_raw = spark.read.option("header", True).csv(path_sucursales).withColumn("ingest_at", current_timestamp())

In [0]:
clientes_raw.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_bronze}.clientes_raw")
envios_base_raw.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_bronze}.envios_base_raw")
envios_incremento_raw.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_bronze}.envios_incremento_raw")
rutas_raw.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_bronze}.rutas_raw")
sucursales_raw.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_bronze}.sucursales_raw")

In [0]:
display(spark.table(f"{catalog_name}.{schema_bronze}.clientes_raw"))
display(spark.table(f"{catalog_name}.{schema_bronze}.envios_base_raw")) 
display(spark.table(f"{catalog_name}.{schema_bronze}.envios_incremento_raw")) 
display(spark.table(f"{catalog_name}.{schema_bronze}.rutas_raw")) 
display(spark.table(f"{catalog_name}.{schema_bronze}.sucursales_raw"))

In [0]:
%sql
select * from tallerevaluado03.bronze.clientes_raw

In [0]:
%sql select * from tallerevaluado03.bronze.envios_base_raw 

In [0]:
%sql select * from tallerevaluado03.bronze.envios_incremento_raw 

In [0]:
%sql select * from tallerevaluado03.bronze.rutas_raw 

In [0]:
%sql select * from tallerevaluado03.bronze.sucursales_raw 

## Silver

#### 3. Limpieza, transformaciones y deduplicación.

In [0]:
silver_clientes = (
  clientes_raw
  clientes_raw.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_bronze}.clientes_raw")
  .withColumn("id_cliente", col("id_cliente").cast("int"))
  .withColumn("nombre", initcap(trim(col("nombre"))).cast("string"))
  .withColumn("apellido", initcap(trim(col("apellido"))).cast("string"))
  .withColumn("email", trim(col("email")).cast("string"))
  .withColumn("segmento", trim(col("segmento")).cast("string"))
  .withColumn("fecha_registro", to_date(col("fecha_registro")))
  .dropna(subset=["id_cliente"])
  .dropDuplicates(["id_cliente"])
  .drop("ingest_at")
)

In [0]:
silver_envios_base = (
  envios_base_raw
  .withColumn("id_envio", col("id_envio").cast("string"))
  .withColumn("id_linea", col("id_linea").cast("int"))
  .withColumn("id_cliente", col("id_cliente").cast("int"))
  .withColumn("id_sucursal", col("id_sucursal").cast("int"))
  .withColumn("id_ruta", col("id_ruta").cast("int"))
  .withColumn("fecha_envio", to_date(col("fecha_envio")))
  .withColumn("estado", trim(col("estado")).cast("string"))
  .withColumn("peso_kg", col("peso_kg").cast("double"))
  .withColumn("costo_envio", col("costo_envio").cast("double"))
  .withColumn("updated_at", to_timestamp(col("updated_at")))
  .dropna(subset=["id_envio"])
  .dropDuplicates(["id_envio"])
  .drop("ingest_at")
)

In [0]:
silver_envios_incremento = (
  envios_incremento_raw
  .withColumn("id_envio", col("id_envio").cast("string"))
  .withColumn("id_linea", col("id_linea").cast("int"))
  .withColumn("id_cliente", col("id_cliente").cast("int"))
  .withColumn("id_sucursal", col("id_sucursal").cast("int"))
  .withColumn("id_ruta", col("id_ruta").cast("int"))
  .withColumn("fecha_envio", to_date(col("fecha_envio")))
  .withColumn("estado", trim(col("estado")).cast("string"))
  .withColumn("peso_kg", col("peso_kg").cast("double"))
  .withColumn("costo_envio", col("costo_envio").cast("double"))
  .withColumn("updated_at", to_timestamp(col("updated_at")))
  .dropna(subset=["id_envio"])
  .dropDuplicates(["id_envio"])
  .drop("ingest_at")
)

In [0]:
silver_envios_incremento_dedup = (
  silver_envios_incremento
  .withColumn("id_envio", col("id_envio").cast("string"))
  .withColumn("id_linea", col("id_linea").cast("int"))
  .withColumn("id_cliente", col("id_cliente").cast("int"))
  .withColumn("id_sucursal", col("id_sucursal").cast("int"))
  .withColumn("id_ruta", col("id_ruta").cast("int"))
  .withColumn("fecha_envio", to_date(col("fecha_envio")))
  .withColumn("anio", year(col("fecha_envio")))
  .withColumn("mes", month(col("fecha_envio")))
  .withColumn("estado", trim(col("estado")).cast("string"))
  .withColumn("peso_kg", col("peso_kg").cast("double"))
  .withColumn("costo_envio", col("costo_envio").cast("double"))
  .withColumn("updated_at", to_timestamp(col("updated_at")))
)

In [0]:
silver_rutas = (
  rutas_raw
  .withColumn("id_ruta", col("id_ruta").cast("int"))
  .withColumn("origen", initcap(trim(col("origen"))).cast("string"))
  .withColumn("destino", initcap(trim(col("destino"))).cast("string"))
  .withColumn("distancia_km", col("distancia_km").cast("int"))
  .withColumn("tiempo_estimado_horas", col("tiempo_estimado_horas").cast("double"))
  .dropna(subset=["id_ruta"])
  .dropDuplicates(["id_ruta"])
  .drop("ingest_at")
)

In [0]:
silver_sucursales = (
  sucursales_raw
  .withColumn("id_sucursal", col("id_sucursal").cast("int"))
  .withColumn("ciudad", initcap(trim(col("ciudad"))).cast("string"))
  .withColumn("distrito", initcap(trim(col("distrito"))).cast("string"))
  .withColumn("region", initcap(trim(col("region"))).cast("string"))
  .withColumn("tipo", initcap(trim(col("tipo"))).cast("string"))
  .withColumn("fecha_apertura", to_date(col("fecha_apertura")))
  .dropna(subset=["id_sucursal"])
  .dropDuplicates(["id_sucursal"])
  .drop("ingest_at")
)

In [0]:
display(silver_clientes)
display(silver_envios_base)
display(silver_envios_incremento)
display(silver_envios_incremento_dedup)
display(silver_rutas)
display(silver_sucursales)

#### Aplicando Dedup, Merge y Ranking

In [0]:
windows_dedup = Window.partitionBy("id_envio", "id_linea").orderBy(col("updated_at").desc())

silver_envios_incremento_dedup = (
    silver_envios_incremento
    .withColumn("dedup", row_number().over(windows_dedup))
    .filter(col("dedup") == 1)
    .drop("dedup")
)

In [0]:
target = DeltaTable.forName(spark, f"{catalog_name}.{schema_silver}.envios_base")

merge = (
    target.alias("m")
    .merge(
        silver_envios_incremento_dedup.alias("in"),
        (col("m.id_envio") == col("in.id_envio")) &
        (col("m.id_linea") == col("in.id_linea"))
    )
    .whenMatchedUpdate(
        condition= "in.updated_at > m.updated_at",
        set = {
            "id_cliente": col("in.id_cliente"),
            "id_sucursal": col("in.id_sucursal"),
            "id_ruta": col("in.id_ruta"),
            "fecha_envio": col("in.fecha_envio"),
            "estado": col("in.estado"),
            "peso_kg": col("in.peso_kg"),
            "costo_envio": col("in.costo_envio"),
            "updated_at": col("in.updated_at")
        }
    )
    .whenNotMatchedInsert(
        values = {
            "id_envio": col("in.id_envio"),
            "id_linea": col("in.id_linea"),
            "id_cliente": col("in.id_cliente"),
            "id_sucursal": col("in.id_sucursal"),
            "id_ruta": col("in.id_ruta"),
            "fecha_envio": col("in.fecha_envio"),
            "estado": col("in.estado"),
            "peso_kg": col("in.peso_kg"),
            "costo_envio": col("in.costo_envio"),
            "updated_at": col("in.updated_at")
        }
    )
    .execute()
)

Ranking de clientes por número total de envíos.

In [0]:
df_cliente_envio = (
  silver_envios_incremento_dedup
  .groupBy("id_cliente")
  .agg(count("*").alias("total_envios"))
) 

In [0]:
window_rank = Window.orderBy(col("total_envios").desc())
df_ranking_envio = (
    df_clientes_envios
    .withColumn("ranking", rank().over(window_rank))
    .filter(col("ranking") <= 5)
)
display(df_ranking_envio)

Peso acumulado de envíos por cliente

In [0]:
costo_promedio = (
    silver_envios_incremento_dedup
    .withColumn("mes", trunc(col("fecha_envio"), "month"))
    .groupBy("id_sucursal", "mes")
    .agg(round(avg("costo_envio"),2).alias("costo_promedio"))
)

windows_lag = Window.partitionBy("id_sucursal").orderBy(col("mes"))


df_variacion_costo_sucursal = (
    costo_promedio
    .withColumn("costo_mes_anterior", lag("costo_promedio").over(windows_lag))
    .withColumn("variacion", round(col("costo_promedio") - col("costo_mes_anterior"),2))
)

Variación de costo promedio de envíos por sucursal entre meses consecutivos.

In [0]:
df_peso_acumulado_envio = (
  silver_envios_incremento_dedup
  .groupBy("id_cliente")
  .agg(round(sum("peso_kg"),2).alias("peso_acumulado"))
  .orderBy(col("peso_acumulado").desc())
) 

In [0]:
silver_clientes.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.clientes")
silver_envios_base.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.envios_base")
silver_envios_incremento.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.envios_incremento")
silver_envios_incremento_dedup.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(f"{catalog_name}.{schema_silver}.envios_incremento_dedup")
silver_rutas.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.rutas")
silver_sucursales.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.sucursales")

df_ranking_envio.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.ranking_envio")
df_variacion_costo_sucursal.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.variacion_costo_sucursal")
df_peso_acumulado_envio.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.peso_acumulado_envio")



## Gold

Diseño Dimensional

In [0]:
dim_cliente = (
    spark.table(f"{catalog_name}.{schema_silver}.clientes")
    #.dropDuplicates(["id_clientes"])
)
display(dim_cliente)

In [0]:
dim_sucursal = (
    spark.table(f"{catalog_name}.{schema_silver}.sucursales")
    #.dropDuplicates(["id_sucursales"])
)
display(dim_sucursal)

In [0]:
dim_ruta = (
    spark.table(f"{catalog_name}.{schema_silver}.rutas")
    #.dropDuplicates(["id_rutas"]) = (
)
display(dim_ruta)

In [0]:
dim_tiempo = (
    silver_envios_base.select(col("fecha_envio").alias("fecha"))
    .dropna().distinct()
    .withColumn("anio", year(col("fecha")))
    .withColumn("mes", month(col("fecha")))
    .withColumn("dia", dayofmonth(col("fecha")))
    .withColumn("semestre", concat_ws("-", col("anio"), when(col("mes") <= 7, lit("01")).otherwise(lit("02"))))
    .withColumn("id_tiempo", (col("anio") * 10000 + col("mes") * 100 + col("dia")).cast("int"))
)
display(dim_tiempo)

In [0]:
dim_region = (
    silver_envios_base.join(dim_sucursal, "id_sucursal")
    .groupBy("region")
    .agg(
        count("id_sucursal").alias("num_sucursales"),
        count("id_envio").alias("volumen_envios")
    )
)
display(dim_region)

In [0]:
fact_envio = (
    silver_envios_base.alias("e")
    .join(dim_cliente.alias("c"), "id_cliente")
    .join(dim_sucursal.alias("s"), "id_sucursal")
    .join(dim_ruta.alias("r"), "id_ruta")
    .join(dim_tiempo.alias("t"), col("e.fecha_envio") == col("t.fecha"))
    .select(
        col("e.id_envio"),
        col("c.id_cliente"),
        col("s.id_sucursal"),
        col("r.id_ruta"),
        col("t.id_tiempo"),
        col("e.peso_kg"),
        col("e.costo_envio"),
        col("e.estado"),
        col("e.fecha_envio")
    )
)
display(fact_envio)

In [0]:
schema = StructType([
    StructField("audit_id", StringType(), False),
    StructField("lote_id", StringType(), False),
    StructField("source_path", StringType(), False),
    StructField("records_read", IntegerType(), False),
    StructField("records_deduplicated", IntegerType(), False),
    StructField("records_inserted", IntegerType(), False),
    StructField("records_updated", IntegerType(), False),
    StructField("target_table", StringType(), False),
    StructField("timestamp", TimestampType(), False)
])

audit_ingestion = spark.createDataFrame(
    [
        (
            str(uuid.uuid4()),
            "lote_001",
            "/path/to/source",
            1000,
            900,
            800,
            100,
            f"{catalog_name}.{schema_silver}.envios_base",
            datetime.datetime.now()
        )
    ],
    schema=schema
)
display(audit_ingestion)

In [0]:
dim_cliente.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_cliente")
dim_sucursal.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_sucursal")
dim_tiempo.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_tiempo")
dim_region.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_region")
fact_envio.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.fact_envio")
audit_ingestion.write.format("delta").mode("append").saveAsTable(f"{catalog_name}.{schema_gold}.audit_ingestion")

![](/Volumes/tallerevaluado03/default/input/evidencia_Taller03.jpg)

In [0]:
%sql
-- Top 5 clientes con más envíos.
SELECT 
c.id_cliente,
CONCAT(c.nombre, ' ', c.apellido) AS cliente,
COUNT(f.id_envio) AS total_envios
FROM tallerevaluado03.gold.fact_envio f
LEFT JOIN tallerevaluado03.gold.dim_cliente c 
  ON f.id_cliente = c.id_cliente
GROUP BY c.id_cliente, c.nombre, c.apellido
ORDER BY total_envios DESC
LIMIT 5; 

In [0]:
%sql
--Ranking de sucursales con mayor volumen de envíos.
SELECT
s.id_sucursal,
s.ciudad,
s.region,
COUNT(f.id_envio) AS total_envios
FROM tallerevaluado03.gold.fact_envio f
LEFT JOIN tallerevaluado03.gold.dim_sucursal s
  ON f.id_sucursal = s.id_sucursal
GROUP BY s.id_sucursal, s.ciudad, s.region
ORDER BY total_envios DESC; 