#### Consumo de datos desde Kafka

En esta etapa se realiza el consumo de datos en tiempo real utilizando **Apache Kafka**. Para ello, se importan las librerías necesarias, incluyendo `KafkaConsumer` y PySpark. Posteriormente, se configura el consumidor estableciendo la conexión con el servidor de Kafka y se define el proceso de deserialización de los mensajes, permitiendo transformar los datos recibidos en un formato adecuado para su posterior procesamiento y análisis.

#### Almacenamiento y procesamiento de datos

En esta etapa se realiza el consumo de datos provenientes de **Apache Kafka** mediante el uso de `KafkaConsumer`. A partir de los mensajes recibidos, se construye un DataFrame que permite almacenar los datos y aplicar las transformaciones requeridas para el análisis.

Los datos provenientes de Kafka se leen dentro de un bucle de consumo continuo y se almacenan temporalmente en una estructura denominada `data_batch`. Con el fin de optimizar el procesamiento, se limita el número de registros por lote, y mediante una lógica condicional se transforma cada lote de datos en un DataFrame de Spark sobre el cual se aplican las siguientes operaciones:

- Limpieza de registros con valores nulos.
- Unión (*join*) con la tabla de ubicaciones utilizando los identificadores de origen y destino de los viajes.
- Cálculo de las ganancias de la plataforma por cada viaje.
- Cálculo del acumulado de las ganancias generadas por la plataforma.


In [None]:
from kafka import KafkaConsumer
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType, FloatType
)
from pyspark.sql.functions import col, sum, first
import time
from datetime import datetime
import os

# =====================================================
# 1. CONFIGURACIÓN GENERAL
# =====================================================

KAFKA_BROKER = '100.68.89.127:9092'
TOPICS = ['uber_trips_clean', 'uber_trips_prod']
CONSUMER_GROUP = 'uber_realtime_analysis'
SIZE_BATCH = 10

REALTIME_OUTPUT_DIR = "output_realtime_analysis"
CLEAN_DIR = f"{REALTIME_OUTPUT_DIR}/clean_data"
AGG_DIR = f"{REALTIME_OUTPUT_DIR}/aggregated_data"

os.makedirs(CLEAN_DIR, exist_ok=True)
os.makedirs(AGG_DIR, exist_ok=True)

# =====================================================
# 2. ESQUEMA DE DATOS
# =====================================================

ENRICHED_SCHEMA = StructType([
    StructField("index_trip", IntegerType(), True),
    StructField("request_datetime", StringType(), True),
    StructField("pickup_datetime", StringType(), True),
    StructField("dropoff_datetime", StringType(), True),
    StructField("pulocationid", IntegerType(), True),
    StructField("dolocationid", IntegerType(), True),
    StructField("base_passenger_fare", FloatType(), True),
    StructField("tips", FloatType(), True),
    StructField("driver_pay", FloatType(), True),
    StructField("hour", FloatType(), True),
    StructField("year", FloatType(), True),
    StructField("month", FloatType(), True),
    StructField("day", FloatType(), True),
    StructField("on_time_pickup", IntegerType(), True),

    # Enriquecidas
    StructField("uber_sales", FloatType(), True),
    StructField("pickup_zone", StringType(), True),
    StructField("delivery_zone", StringType(), True),
    StructField("source", StringType(), True),
    StructField("sent_at", StringType(), True),
])

# =====================================================
# 3. INICIALIZACIÓN SPARK Y KAFKA
# =====================================================

spark = (
    SparkSession.builder
    .appName("KafkaTripsRealtimeProcessor")
    .getOrCreate()
)

print("SparkSession iniciada")

consumer = KafkaConsumer(
    *TOPICS,
    bootstrap_servers=KAFKA_BROKER,
    auto_offset_reset='earliest',
    group_id=CONSUMER_GROUP,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    consumer_timeout_ms=10000
)

print(f"Consumidor Kafka suscrito a: {TOPICS}")
print("\nINICIANDO CONSUMO EN TIEMPO REAL\n")

# =====================================================
# 4. BUCLE PRINCIPAL
# =====================================================

data_batch = []

try:
    for message in consumer:

        if message.value is None or not isinstance(message.value, dict):
            continue

        data_batch.append(message.value)

        print(
            f"Mensaje recibido [{message.topic}] "
            f"Trip ID: {message.value.get('index_trip', 'N/A')}"
        )

        # -------------------------------------------------
        # PROCESAR BATCH
        # -------------------------------------------------
        if len(data_batch) >= SIZE_BATCH:

            print(
                f"\n Batch completo "
                f"({len(data_batch)} registros) "
                f"{datetime.now().strftime('%H:%M:%S')}"
            )

            df = spark.createDataFrame(data_batch, ENRICHED_SCHEMA)

            # ------------------------------
            # AGREGACIÓN
            # ------------------------------
            df_agg_zone = (
                df.groupBy("pickup_zone", "source")
                .agg(
                    sum("uber_sales").alias("total_sales_zone"),
                    first("year").alias("year"),
                    first("hour").alias("hour")
                )
                .filter(col("total_sales_zone") > 0)
                .orderBy(col("total_sales_zone").desc())
            )

            # ------------------------------
            # ESCRITURA CLEAN DATA
            # ------------------------------
            df.coalesce(1).write.mode("append").option(
                "header", True
            ).csv(CLEAN_DIR)

            # ------------------------------
            # ESCRITURA AGREGADOS (SOLO SI HAY DATOS)
            # ------------------------------
            if df_agg_zone.count() > 0:
                df_agg_zone.coalesce(1).write.mode("append").option(
                    "header", True
                ).csv(AGG_DIR)

                print("CSV agregado escrito correctamente")
                print(f"Ruta: {os.path.abspath(AGG_DIR)}")
            else:
                print("Batch sin datos agregables — no se escribe CSV")

            # ------------------------------
            # DEBUG VISUAL
            # ------------------------------
            print("\nTop Zonas por Ventas (Batch)")
            df_agg_zone.select(
                "pickup_zone", "total_sales_zone", "hour"
            ).limit(3).show(truncate=False)

            print("-" * 50)

            data_batch.clear()
            time.sleep(1)

except Exception as e:
    print(f"ERROR CRÍTICO EN CONSUMER: {e}")

finally:
    consumer.close()
    spark.stop()
    print("\nConsumer detenido correctamente")


SparkSession iniciada
Consumidor Kafka suscrito a: ['uber_trips_clean', 'uber_trips_prod']

INICIANDO CONSUMO EN TIEMPO REAL

Mensaje recibido [uber_trips_clean] Trip ID: 987
Mensaje recibido [uber_trips_clean] Trip ID: 878
Mensaje recibido [uber_trips_prod] Trip ID: 987
Mensaje recibido [uber_trips_prod] Trip ID: 878
Mensaje recibido [uber_trips_clean] Trip ID: 110
Mensaje recibido [uber_trips_prod] Trip ID: 110
Mensaje recibido [uber_trips_clean] Trip ID: 91
Mensaje recibido [uber_trips_prod] Trip ID: 91
Mensaje recibido [uber_trips_clean] Trip ID: 128
Mensaje recibido [uber_trips_prod] Trip ID: 128

 Batch completo (10 registros) 00:57:03


                                                                                

CSV agregado escrito correctamente
Ruta: /home/admin/Proyectos/ProyectoFInal/output_realtime_analysis/aggregated_data

Top Zonas por Ventas (Batch)
+------------------------+-----------------+----+
|pickup_zone             |total_sales_zone |hour|
+------------------------+-----------------+----+
|Financial District North|7.65000057220459 |NULL|
|Financial District North|7.65000057220459 |NULL|
|Kips Bay                |5.970000267028809|NULL|
+------------------------+-----------------+----+

--------------------------------------------------
Mensaje recibido [uber_trips_clean] Trip ID: 363
Mensaje recibido [uber_trips_prod] Trip ID: 363
Mensaje recibido [uber_trips_prod] Trip ID: 251
Mensaje recibido [uber_trips_clean] Trip ID: 251
Mensaje recibido [uber_trips_prod] Trip ID: 744
Mensaje recibido [uber_trips_clean] Trip ID: 744
Mensaje recibido [uber_trips_prod] Trip ID: 778
Mensaje recibido [uber_trips_clean] Trip ID: 778
Mensaje recibido [uber_trips_prod] Trip ID: 819
Mensaje recibi

                                                                                

CSV agregado escrito correctamente
Ruta: /home/admin/Proyectos/ProyectoFInal/output_realtime_analysis/aggregated_data

Top Zonas por Ventas (Batch)
+-------------------------+------------------+----+
|pickup_zone              |total_sales_zone  |hour|
+-------------------------+------------------+----+
|Crown Heights North      |1.4200000762939453|NULL|
|Crown Heights North      |1.4200000762939453|NULL|
|Williamsburg (North Side)|1.3400006294250488|NULL|
+-------------------------+------------------+----+

--------------------------------------------------
Mensaje recibido [uber_trips_prod] Trip ID: 310
Mensaje recibido [uber_trips_clean] Trip ID: 310
