In [0]:
# ============================================
# ETL Pipeline - Ingesta y Transformación
# ============================================

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from datetime import datetime

# Pipeline configuration: catalog, schema and table names that are combined
# into the dotted source and target table identifiers.
CATALOG, SCHEMA = "datasets_github_projects", "default"
SOURCE_TABLE, TARGET_TABLE = "ventas_raw", "ventas_transformed"

# Log when the run started and which source table it reads from.
print(
    f"üöÄ Pipeline iniciado: {datetime.now()}",
    f"üìä Origen: {CATALOG}.{SCHEMA}.{SOURCE_TABLE}",
    sep="\n",
)

üöÄ Pipeline iniciado: 2025-11-02 04:12:55.258776
üìä Origen: datasets_github_projects.default.ventas_raw


In [0]:
# Load the source Delta table, addressed by its catalog.schema.table name.
df_raw = spark.table(".".join([CATALOG, SCHEMA, SOURCE_TABLE]))

# Report the row count, then the schema, then a five-row preview.
print(f"‚úÖ Datos cargados: {df_raw.count()} registros")

print("\nüìã Esquema:")
df_raw.printSchema()

print("\nüëÄ Primeras 5 filas:")
display(df_raw.limit(5))

‚úÖ Datos cargados: 10 registros

üìã Esquema:
root
 |-- order_id: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- country: string (nullable = true)


üëÄ Primeras 5 filas:


order_id,customer_id,product_name,category,quantity,price,order_date,country
1001,C001,Laptop,Electronics,1,1200.5,2024-01-15,Colombia
1002,C002,Mouse,Electronics,2,25.99,2024-01-15,Mexico
1003,C001,Keyboard,Electronics,1,75.0,2024-01-16,Colombia
1004,C003,Chair,Furniture,4,150.0,2024-01-16,USA
1005,C002,Monitor,Electronics,1,300.0,2024-01-17,Mexico


In [0]:
# Initial validations on the raw data
print("üîç AN√ÅLISIS EXPLORATORIO\n")

# Row count per category, most frequent category first
print("üì¶ Productos por categor√≠a:")
category_counts = df_raw.groupBy("category").count()
category_counts.orderBy(F.col("count").desc()).show()

# Null check: count() skips nulls, so counting a marker that is emitted only
# where the value is null yields the number of nulls in each column.
print("\nüö® Valores nulos por columna:")
null_count_exprs = [
    F.count(F.when(F.col(name).isNull(), F.lit(1))).alias(name)
    for name in df_raw.columns
]
df_raw.select(*null_count_exprs).show()

# Earliest and latest order date
print("\nüìÖ Rango temporal:")
df_raw.agg(F.min("order_date"), F.max("order_date")).show()

üîç AN√ÅLISIS EXPLORATORIO

üì¶ Productos por categor√≠a:
+-----------+-----+
|   category|count|
+-----------+-----+
|Electronics|    6|
|  Furniture|    3|
| Stationery|    1|
+-----------+-----+


üö® Valores nulos por columna:
+--------+-----------+------------+--------+--------+-----+----------+-------+
|order_id|customer_id|product_name|category|quantity|price|order_date|country|
+--------+-----------+------------+--------+--------+-----+----------+-------+
|       0|          0|           0|       0|       0|    0|         0|      0|
+--------+-----------+------------+--------+--------+-----+----------+-------+


üìÖ Rango temporal:
+---------------+---------------+
|min(order_date)|max(order_date)|
+---------------+---------------+
|     2024-01-15|     2024-01-19|
+---------------+---------------+



In [0]:
# ============================================
# TRANSFORMATIONS
# ============================================

df_transformed = (
    df_raw
    # Total amount per order: units times unit price
    .withColumn("total_amount", F.col("quantity") * F.col("price"))

    # Calendar components of the order date.
    # Spark's dayofweek() numbers days 1 (Sunday) through 7 (Saturday).
    .withColumn("year", F.year("order_date"))
    .withColumn("month", F.month("order_date"))
    .withColumn("day_of_week", F.dayofweek("order_date"))

    # Classify by purchase volume.
    # The final branch is an explicit NOT NULL test instead of otherwise():
    # a comparison against a null quantity is never true, so otherwise("Small")
    # would label rows with a missing quantity as "Small". With no otherwise(),
    # unmatched rows (null quantity) get a null label.
    .withColumn("order_size",
        F.when(F.col("quantity") >= 10, "Large")
         .when(F.col("quantity") >= 5, "Medium")
         .when(F.col("quantity").isNotNull(), "Small")
    )

    # Categorize by unit price; same null handling as order_size, so a
    # missing price yields a null category rather than "Budget".
    .withColumn("price_category",
        F.when(F.col("price") >= 500, "Premium")
         .when(F.col("price") >= 100, "Standard")
         .when(F.col("price").isNotNull(), "Budget")
    )

    # Processing timestamp
    .withColumn("processed_at", F.current_timestamp())
)

print("‚úÖ Transformaciones aplicadas")
print(f"üìä Total de columnas: {len(df_transformed.columns)}")
print("\nüìã Nuevas columnas agregadas:")
print("  - total_amount: quantity * price")
print("  - year, month, day_of_week: componentes temporales")
print("  - order_size: clasificaci√≥n por volumen")
print("  - price_category: clasificaci√≥n por precio")
print("  - processed_at: timestamp de procesamiento")

‚úÖ Transformaciones aplicadas
üìä Total de columnas: 15

üìã Nuevas columnas agregadas:
  - total_amount: quantity * price
  - year, month, day_of_week: componentes temporales
  - order_size: clasificaci√≥n por volumen
  - price_category: clasificaci√≥n por precio
  - processed_at: timestamp de procesamiento


In [0]:
# Sanity-check the transformed DataFrame
print("üîç VALIDACI√ìN DE TRANSFORMACIONES\n")

# Preview the source measures next to the derived columns
print("üëÄ Muestra de datos transformados:")
preview_columns = [
    "order_id", "product_name", "quantity", "price",
    "total_amount", "order_size", "price_category",
]
display(df_transformed.select(*preview_columns).limit(10))

# Minimum, maximum, mean and sum of the per-order total
print("\nüí∞ Estad√≠sticas de montos totales:")
amount_stats = df_transformed.agg(
    F.min("total_amount").alias("min_amount"),
    F.max("total_amount").alias("max_amount"),
    F.avg("total_amount").alias("avg_amount"),
    F.sum("total_amount").alias("total_revenue"),
)
amount_stats.show()

# Number of orders in each size bucket, sorted by bucket label
print("\nüìä Distribuci√≥n por tama√±o de orden:")
df_transformed.groupBy("order_size").count().sort("order_size").show()

üîç VALIDACI√ìN DE TRANSFORMACIONES

üëÄ Muestra de datos transformados:


order_id,product_name,quantity,price,total_amount,order_size,price_category
1001,Laptop,1,1200.5,1200.5,Small,Premium
1002,Mouse,2,25.99,51.98,Small,Budget
1003,Keyboard,1,75.0,75.0,Small,Budget
1004,Chair,4,150.0,600.0,Small,Standard
1005,Monitor,1,300.0,300.0,Small,Standard
1006,Desk,2,450.0,900.0,Small,Standard
1007,Headphones,3,80.0,240.0,Small,Budget
1008,Lamp,5,35.5,177.5,Medium,Budget
1009,USB Cable,10,5.99,59.900000000000006,Large,Budget
1010,Notebook,20,2.5,50.0,Large,Budget



üí∞ Estad√≠sticas de montos totales:
+----------+----------+----------+-------------+
|min_amount|max_amount|avg_amount|total_revenue|
+----------+----------+----------+-------------+
|      50.0|    1200.5|   365.488|      3654.88|
+----------+----------+----------+-------------+


üìä Distribuci√≥n por tama√±o de orden:
+----------+-----+
|order_size|count|
+----------+-----+
|     Large|    2|
|    Medium|    1|
|     Small|    7|
+----------+-----+



In [0]:
# ============================================
# BUSINESS METRICS
# ============================================

# Aggregations and ordering shared by both summaries below
num_orders_expr = F.count("order_id").alias("num_orders")
total_sales_expr = F.sum("total_amount").alias("total_sales")
by_total_sales_desc = F.col("total_sales").desc()

# Sales by country: order count, revenue and average order value,
# highest revenue first
ventas_por_pais = (
    df_transformed
    .groupBy("country")
    .agg(
        num_orders_expr,
        total_sales_expr,
        F.avg("total_amount").alias("avg_order_value"),
    )
    .sort(by_total_sales_desc)
)

print("üåé VENTAS POR PA√çS:")
display(ventas_por_pais)

# Sales by category: order count, revenue and units sold,
# highest revenue first
ventas_por_categoria = (
    df_transformed
    .groupBy("category")
    .agg(
        num_orders_expr,
        total_sales_expr,
        F.sum("quantity").alias("total_units"),
    )
    .sort(by_total_sales_desc)
)

print("\nüì¶ VENTAS POR CATEGOR√çA:")
display(ventas_por_categoria)

üåé VENTAS POR PA√çS:


country,num_orders,total_sales,avg_order_value
Colombia,4,2235.4,558.85
USA,2,777.5,388.75
Mexico,2,351.98,175.99
Spain,1,240.0,240.0
Argentina,1,50.0,50.0



üì¶ VENTAS POR CATEGOR√çA:


category,num_orders,total_sales,total_units
Electronics,6,1927.38,18
Furniture,3,1677.5,11
Stationery,1,50.0,20


In [0]:
# ============================================
# WRITE TO DELTA LAKE
# ============================================

# Dotted catalog.schema.table name of the output table
output_table = f"{CATALOG}.{SCHEMA}.{TARGET_TABLE}"

# Write as a Delta table, replacing any existing data and schema
(
    df_transformed
    .write
    .format("delta")
    .mode("overwrite")  # in production you would use append or merge
    .option("overwriteSchema", "true")
    .saveAsTable(output_table)
)

print(f"‚úÖ Datos guardados exitosamente en: {output_table}")
# Count the rows of the table that was just written. df_transformed is lazy,
# so df_transformed.count() would re-run the source read and every
# transformation and report that result instead of what was persisted.
print(f"üìä Total de registros escritos: {spark.table(output_table).count()}")

‚úÖ Datos guardados exitosamente en: datasets_github_projects.default.ventas_transformed
üìä Total de registros escritos: 10


In [0]:
# Read the table back to confirm it exists and is accessible
df_check = spark.table(output_table)

print("üîç VALIDACI√ìN DE TABLA CREADA\n")
print(f"üìã Tabla: {output_table}")
row_total = df_check.count()
print(f"üìä Registros: {row_total}")
column_total = len(df_check.columns)
print(f"üìÅ Columnas: {column_total}")

# Schema of the final table
print("\nüìã Esquema de tabla transformada:")
df_check.printSchema()

# Delta metadata: show up to five entries of the table history
print("\nüìú Historial de la tabla (Delta Lake):")
table_history = spark.sql(f"DESCRIBE HISTORY {output_table}")
display(table_history.limit(5))

üîç VALIDACI√ìN DE TABLA CREADA

üìã Tabla: datasets_github_projects.default.ventas_transformed
üìä Registros: 10
üìÅ Columnas: 15

üìã Esquema de tabla transformada:
root
 |-- order_id: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- order_size: string (nullable = true)
 |-- price_category: string (nullable = true)
 |-- processed_at: timestamp (nullable = true)


üìú Historial de la tabla (Delta Lake):


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-11-02T04:17:07.000Z,78303099390903,msosa@unal.edu.co,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(300599389944397),1102-040923-m4ryxk2n-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 4507)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13


In [0]:
# ============================================
# PIPELINE SUMMARY
# ============================================

banner = "=" * 60
print(banner)
print("‚úÖ PIPELINE COMPLETADO EXITOSAMENTE")
print(banner)

# Source: table name and current row count
print("\nüì• Origen:")
print(f"   Tabla: {CATALOG}.{SCHEMA}.{SOURCE_TABLE}")
print(f"   Registros: {df_raw.count()}")

# Destination: count the rows of the persisted table itself.
# df_transformed.count() would recompute the transformation from the source
# and so would not reflect what the destination table actually contains.
print("\nüì§ Destino:")
print(f"   Tabla: {output_table}")
print(f"   Registros: {spark.table(output_table).count()}")

print("\nüîß Transformaciones aplicadas:")
print("   - C√°lculo de monto total (quantity √ó price)")
print("   - Extracci√≥n de componentes temporales")
print("   - Clasificaci√≥n por tama√±o de orden")
print("   - Categorizaci√≥n por precio")
print("   - Timestamp de procesamiento")

print(f"\n‚è±Ô∏è  Pipeline finalizado: {datetime.now()}")
print(banner)

‚úÖ PIPELINE COMPLETADO EXITOSAMENTE

üì• Origen:
   Tabla: datasets_github_projects.default.ventas_raw
   Registros: 10

üì§ Destino:
   Tabla: datasets_github_projects.default.ventas_transformed
   Registros: 10

üîß Transformaciones aplicadas:
   - C√°lculo de monto total (quantity √ó price)
   - Extracci√≥n de componentes temporales
   - Clasificaci√≥n por tama√±o de orden
   - Categorizaci√≥n por precio
   - Timestamp de procesamiento

‚è±Ô∏è  Pipeline finalizado: 2025-11-02 04:17:22.510562


In [None]:
# New test cell
# NOTE(review): looks like a leftover scratch cell (it has no execution count
# and does not touch the pipeline) — confirm whether it should be removed.
print("üéâ Editado desde VS Code!")