In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, round

# Detener sesiones previas para evitar conflictos de configuraci√≥n
try:
    spark.stop()
except:
    pass

# Inicializar sesi√≥n de Spark con el conector de Cassandra
# Usamos el paquete oficial de Datastax para asegurar compatibilidad
spark = SparkSession.builder \
    .appName("Pipeline_BigData_Fase3") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0") \
    .config("spark.cassandra.connection.host", "cassandra_db") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .getOrCreate()

print("‚úÖ Sesi√≥n de Spark inicializada con el conector de Cassandra.")

‚úÖ Sesi√≥n de Spark inicializada con el conector de Cassandra.


In [2]:
# Fase 3.1: Lectura distribuida desde el Keyspace de Cassandra
df_crudo = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="ventas_crudas", keyspace="proyecto_bigdata") \
    .load()

# Fase 3.3: Limpieza de datos (eliminaci√≥n de montos inv√°lidos)
df_limpio = df_crudo.filter(col("monto_total") > 0)

# Fase 3.2: L√≥gica de Agregaci√≥n
# Consolidamos 100k registros en un resumen diario por categor√≠a
df_resumen = df_limpio.groupBy("fecha_venta", "categoria") \
    .agg(
        round(sum("monto_total"), 2).alias("ventas_totales"),
        count("id_venta").alias("cantidad_transacciones")
    )

print("üìä Resumen anal√≠tico generado (Vista previa):")
df_resumen.show(10)
df_resumen.printSchema()

üìä Resumen anal√≠tico generado (Vista previa):
+-------------------+-----------+--------------+----------------------+
|        fecha_venta|  categoria|ventas_totales|cantidad_transacciones|
+-------------------+-----------+--------------+----------------------+
|2025-12-16 00:00:00|  Alimentos|     329990.06|                   656|
|2025-12-28 00:00:00|  Alimentos|     336338.33|                   642|
|2026-02-03 00:00:00|       Ropa|     334491.98|                   678|
|2026-01-21 00:00:00|  Alimentos|     331743.40|                   666|
|2026-01-09 00:00:00|   Deportes|     342667.70|                   690|
|2025-12-28 00:00:00|   Deportes|     325962.41|                   640|
|2026-01-05 00:00:00|  Alimentos|     320445.81|                   642|
|2026-01-25 00:00:00|Electr√≥nica|     339704.86|                   689|
|2026-02-03 00:00:00|Electr√≥nica|     369185.57|                   688|
|2025-12-22 00:00:00|      Hogar|     320625.39|                   631|
+------------

In [3]:
# Instalaci√≥n de cliente ligero para ClickHouse (v√≠a protocolo HTTP)
!pip install clickhouse-connect



In [4]:
import clickhouse_connect

print("üì¶ Extrayendo registros de la capa de procesamiento...")
# Convertimos el DataFrame a una lista de Python para la inserci√≥n masiva
registros = [list(row) for row in df_resumen.collect()]

try:
    # Conexi√≥n directa al Data Warehouse ClickHouse
    client = clickhouse_connect.get_client(
        host='172.18.0.3', 
        port=8123, 
        username='default', 
        password=''
    )

    # Creamos el esquema dw_analitico seg√∫n especificaciones de la Tarea 4.1
    client.command("CREATE DATABASE IF NOT EXISTS dw_analitico")
    
    # Creamos la tabla con motor MergeTree para optimizar consultas de agregaci√≥n
    client.command("""
        CREATE TABLE IF NOT EXISTS dw_analitico.ventas_resumen (
            fecha_venta Date,
            categoria String,
            ventas_totales Float64,
            cantidad_transacciones Int64
        ) ENGINE = MergeTree() ORDER BY (fecha_venta, categoria)
    """)

    # Inserci√≥n de los datos transformados
    client.insert('dw_analitico.ventas_resumen', registros, 
                  column_names=['fecha_venta', 'categoria', 'ventas_totales', 'cantidad_transacciones'])

    print(f"‚úÖ CARGA EXITOSA: {len(registros)} filas migradas a dw_analitico.ventas_resumen.")
    print("üöÄ Pipeline completo: Cassandra -> Spark -> ClickHouse operativo.")

except Exception as e:
    print(f"‚ùå Error durante la carga: {e}")

üì¶ Extrayendo registros de la capa de procesamiento...
‚úÖ CARGA EXITOSA: 300 filas migradas a dw_analitico.ventas_resumen.
üöÄ Pipeline completo: Cassandra -> Spark -> ClickHouse operativo.
