# Notebook 03: Procesamiento ETL con Spark

**Universidad:** Universidad Nacional Experimental de Guayana (UNEG)  
**Asignatura:** Sistemas de Bases de Datos II  
**Proyecto:** Proyecto N¬∞ 2 - Data Pipeline Escalable

---

**Descripci√≥n:**  
Extract (Cassandra), Transform (Spark), Load (ClickHouse).

In [3]:
import time
import json
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DecimalType, DateType, IntegerType

# Archivo de m√©tricas compartido (en el directorio docs montado)
METRICS_FILE = '../docs/metricas.json'

# Crear directorio docs si no existe
os.makedirs('../docs', exist_ok=True)

# Configuraci√≥n de Spark con conectores de Cassandra y ClickHouse
spark = SparkSession.builder \
    .appName("ETL_Cassandra_Spark_ClickHouse") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0,com.clickhouse:clickhouse-jdbc:0.5.0") \
    .config("spark.cassandra.connection.host", "cassandra") \
    .config("spark.cassandra.connection.localDC", "dc1") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

In [4]:
print("--- 1. Leyendo datos de Cassandra (ventas_db.ventas_crudas) ---")
start_read = time.time()

df_raw = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="ventas_crudas", keyspace="ventas_db") \
    .load()

count_raw = df_raw.count()
end_read = time.time()

print(f"‚úÖ Lectura completada en {end_read - start_read:.2f} segundos")
print(f"Total de registros crudos: {count_raw}")
df_raw.printSchema()
df_raw.show(5)

--- 1. Leyendo datos de Cassandra (ventas_db.ventas_crudas) ---
‚úÖ Lectura completada en 8.60 segundos
Total de registros crudos: 411474
root
 |-- fecha_venta: timestamp (nullable = false)
 |-- categoria: string (nullable = true)
 |-- id_cliente: string (nullable = true)
 |-- id_producto: string (nullable = true)
 |-- id_venta: string (nullable = true)
 |-- monto_total: decimal(38,18) (nullable = true)

+-------------------+-----------+----------+-----------+--------------------+--------------------+
|        fecha_venta|  categoria|id_cliente|id_producto|            id_venta|         monto_total|
+-------------------+-----------+----------+-----------+--------------------+--------------------+
|2024-07-23 14:11:00|  Alimentos|   CLI-247|   PROD-287|da6424d3-e816-44e...|1338.780000000000...|
|2024-08-14 04:42:00|Electronica|   CLI-243|   PROD-123|785b34af-7ce3-4f8...|822.9700000000000...|
|2024-04-16 06:00:00|      Hogar|   CLI-323|   PROD-355|4fcb7a53-1bd9-421...|970.5700000000000...

In [None]:
print("--- 2. Transformando datos (Agregaci√≥n por Fecha y Categor√≠a) ---")
start_transform = time.time()

# Transformaci√≥n: Casting, GroupBy, Aggregation
df_aggregated = df_raw \
    .withColumn("fecha_dia", col("fecha_venta").cast(DateType())) \
    .groupBy("fecha_dia", "categoria") \
    .agg(
        sum("monto_total").alias("ventas_totales"),
        count("id_venta").alias("cantidad_transacciones")
    )

df_result = df_aggregated.select(
    col("fecha_dia").alias("fecha_venta"),
    col("categoria"),
    col("ventas_totales").cast(DecimalType(18, 2)),
    col("cantidad_transacciones").cast(IntegerType())
)

# Forzamos una acci√≥n para medir el tiempo real de transformaci√≥n (Spark es lazy)
count_result = df_result.count()
end_transform = time.time()

print(f"‚úÖ Transformaci√≥n completada en {end_transform - start_transform:.2f} segundos")
print(f"Total de filas agregadas: {count_result}")
df_result.show(5)

--- 2. Transformando datos (Agregaci√≥n por Fecha y Categor√≠a) ---
‚úÖ Transformaci√≥n completada en 5.03 segundos
Total de filas agregadas: 1830


In [None]:
print("--- 3. Escribiendo en ClickHouse (dw_analitico.ventas_resumen) ---")
start_write = time.time()

jdbc_url = "jdbc:clickhouse://clickhouse:8123/dw_analitico"
properties = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

try:
    df_result.write \
        .mode("append") \
        .jdbc(url=jdbc_url, table="ventas_resumen", properties=properties)
    
    end_write = time.time()
    print(f"‚úÖ Carga en ClickHouse exitosa en {end_write - start_write:.2f} segundos")
except Exception as e:
    end_write = time.time()
    print(f"‚ùå Error al escribir en ClickHouse: {e}")

In [None]:
# Calcular tiempos
tiempo_lectura = end_read - start_read
tiempo_transformacion = end_transform - start_transform
tiempo_escritura = end_write - start_write
tiempo_total_etl = tiempo_lectura + tiempo_transformacion + tiempo_escritura

print("--- üìä Resumen de M√©tricas de Rendimiento ---")
print(f"1. Lectura (Cassandra):    {tiempo_lectura:.2f} s")
print(f"2. Transformaci√≥n (Spark): {tiempo_transformacion:.2f} s")
print(f"3. Carga (ClickHouse):     {tiempo_escritura:.2f} s")
print(f"-------------------------------------------")
print(f"Tiempo Total ETL:          {tiempo_total_etl:.2f} s")

In [None]:
# =====================================================
# üìä GUARDAR M√âTRICAS PARA NOTEBOOK 04
# =====================================================

# Leer m√©tricas existentes o crear nuevo diccionario
if os.path.exists(METRICS_FILE):
    with open(METRICS_FILE, 'r') as f:
        metricas = json.load(f)
else:
    metricas = {}

# Actualizar con m√©tricas de este notebook
metricas['etl_spark'] = {
    'tiempo_lectura': round(tiempo_lectura, 2),
    'tiempo_transformacion': round(tiempo_transformacion, 2),
    'tiempo_escritura': round(tiempo_escritura, 2),
    'tiempo_total': round(tiempo_total_etl, 2),
    'registros_entrada': count_raw,
    'registros_salida': count_result,
    'timestamp': datetime.now().isoformat()
}

# Guardar en docs
with open(METRICS_FILE, 'w') as f:
    json.dump(metricas, f, indent=2)

print(f"‚úÖ M√©tricas guardadas en: {METRICS_FILE}")
print(f"   - Tiempo total ETL: {tiempo_total_etl:.2f} segundos")