In [None]:
from kafka import KafkaConsumer
import json

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.kafka:kafka-clients:3.5.1 pyspark-shell"



Este bloque configura e inicializa Spark y Spark Streaming para trabajar con datos de Kafka.  

Incluye la importación de módulos necesarios, la inicialización de SparkContext y StreamingContext, y la creación de una SparkSession con los paquetes requeridos para la integración con Kafka.  

Esto permite procesar flujos de datos en tiempo real provenientes de Kafka usando PySpark.

In [None]:
import findspark
findspark.init('/Users/joseaguilar/Documents/Development/spark/spark-3.5.1-bin-hadoop3')
from pyspark import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, TimestampType, LongType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType, DecimalType
from pyspark.sql.functions import regexp_extract, split, from_unixtime, col, avg, min, max
from pyspark.sql.functions import grouping_id, window, explode, to_json, from_json
from pyspark.sql.functions import udf, lit, current_timestamp, current_date, date_format



from pyspark import SparkContext
from pyspark.streaming import StreamingContext
#from pyspark.streaming.kafka import KafkaUtils

# Crear el contexto de Spark
sc = None
try:
    sc = SparkContext(appName="kafkademo")
except:
    sc = SparkContext.getOrCreate("kafkademo")
ssc = StreamingContext(sc, 10)  # Intervalo de 10 segundos


spark = SparkSession.builder \
    .appName("kafkademo") \
    .config("spark.jars.packages", 
            ",".join([
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
                "org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.5.1",
                "org.apache.kafka:kafka-clients:3.5.1"
            ])) \
    .getOrCreate()

# Structured Streaming

Para conectarse con Kafka, se utilia un paradigma diferente llamado **Structured Streaming**.

Documentación: [Spark Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)

Mas general, documentación sobre Structured Streaming: [Spark Streaming Programming Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html) sobre todo la sección de [Basic Concepts](https://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-concepts)

In [None]:
spark.sparkContext

In [None]:
# Mostrar los JARs cargados en el contexto de Spark
# Validar que esté cargado el paquete de Kafka
print("JARs cargados en el contexto de Spark:")
print("========================================")
java_jars = spark.sparkContext._jsc.sc().listJars()
jars = [java_jars.apply(i) for i in range(java_jars.length())]
for jar in jars:
    print(jar)

In [None]:
# Definir la dirección del servidor de Kafka
kafka_server = "http://localhost:9092"

In [None]:
# Leer datos en streaming desde el tópico "temperatura" de Kafka
rawDF = spark.readStream\
              .format("kafka")\
              .option("kafka.bootstrap.servers", kafka_server)\
              .option("subscribe", "temperatura")\
              .option("startingOffsets", "latest")\
              .load()

In [None]:
# Extraer y transformar los datos del stream de Kafka
# Convertir el valor del mensaje de Kafka a JSON y extraer los campos
from pyspark.sql.functions import from_json
valores = (rawDF
           .selectExpr("CAST(value AS STRING) as json")
           .select(from_json("json", "sensor STRING, value INT").alias("data"))
           .select("data.*"))

In [None]:
# Calcular el promedio de los valores cada 10 segundos usando una ventana de tiempo
# Para esto, utilizamos la función `window` de PySpark
# que permite "ver" los datos agrupados de cierta forma
agregados = (valores
             .groupBy(window(current_timestamp(), "10 seconds"))
             .avg("value"))

In [None]:
# Iniciar la consulta de streaming para mostrar los resultados agregados en la consola
query = (agregados.writeStream
         .outputMode("complete")
         .format("console")
         .option("truncate", False)
         .start())

In [None]:
# Esperar a que la consulta de streaming termine (por ejemplo, durante 60 segundos)
query.awaitTermination(60)
query.stop()
