In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 1. ConfiguraciÃ³n de Spark (Misma estructura que tu archivo de pedidos)
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262"
]

spark = SparkSession.builder \
    .appName("AlertasTemperaturaStreaming") \
    .config("spark.jars.packages", ",".join(packages)) \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

In [2]:
from pyspark.sql.functions import col, split, to_timestamp, avg, count

# 1. Lectura de Kafka (Se mantiene igual)
kafka_hum_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "humidity-sensors") \
    .option("startingOffsets", "latest") \
    .load()

# 2. Parseo para formato CSV (separado por comas)
# Dividimos el valor por comas y asignamos cada posiciÃ³n a su columna
parsed_hum_df = kafka_hum_df.selectExpr("CAST(value AS STRING) as raw_data") \
    .select(split(col("raw_data"), ",").alias("fields")) \
    .select(
        col("fields").getItem(0).alias("sensor_id"),
        col("fields").getItem(1).cast("double").alias("humidity"),
        col("fields").getItem(2).alias("original_timestamp"),
        col("fields").getItem(3).alias("location")
    ) \
    .withColumn("event_timestamp", to_timestamp(col("original_timestamp")))

# Variables para el contador
total_acumulado = 0

# 3. FunciÃ³n de MonitorizaciÃ³n (Adaptada)
def monitor_humedad(batch_df, batch_id):
    global total_acumulado
    
    # Limpiamos filas vacÃ­as
    clean_df = batch_df.filter(col("sensor_id").isNotNull())
    
    if not clean_df.isEmpty():
        conteo_lote = clean_df.count()
        total_acumulado += conteo_lote
        
        print(f"\n--- ðŸ’§ MONITOR DE HUMEDAD CSV (Batch: {batch_id}) ---")
        print(f"Total de mensajes procesados: {total_acumulado}")
        
        print("Promedio de humedad (Ãšltimas 10 del lote):")
        clean_df.orderBy(col("event_timestamp").desc()) \
                .limit(10) \
                .agg(avg("humidity").alias("Humedad_Promedio_Reciente")) \
                .show()
        
        print("Lecturas actuales por sensor:")
        clean_df.groupBy("sensor_id").agg(
            avg("humidity").alias("Media_Humedad"),
            count("*").alias("Total_Lecturas")
        ).show()
    else:
        print(f"--- Batch {batch_id}: Esperando datos vÃ¡lidos... ---")

# 4. Inicio del Stream
query_humedad = parsed_hum_df.writeStream \
    .foreachBatch(monitor_humedad) \
    .trigger(processingTime="5 seconds") \
    .start()

print("âœ… Monitor de Humedad (CSV) activo en segundo plano.")
print("Usa la celda de abajo para detenerlo cuando quieras.")

âœ… Monitor de Humedad (CSV) activo en segundo plano.
Usa la celda de abajo para detenerlo cuando quieras.
--- Batch 0: Esperando datos vÃ¡lidos... ---

--- ðŸ’§ MONITOR DE HUMEDAD CSV (Batch: 1) ---
Total de mensajes procesados: 1
Promedio de humedad (Ãšltimas 10 del lote):
+-------------------------+
|Humedad_Promedio_Reciente|
+-------------------------+
|                    46.96|
+-------------------------+

Lecturas actuales por sensor:
+---------+-------------+--------------+
|sensor_id|Media_Humedad|Total_Lecturas|
+---------+-------------+--------------+
|    hum-1|        46.96|             1|
+---------+-------------+--------------+


--- ðŸ’§ MONITOR DE HUMEDAD CSV (Batch: 2) ---
Total de mensajes procesados: 6
Promedio de humedad (Ãšltimas 10 del lote):
+-------------------------+
|Humedad_Promedio_Reciente|
+-------------------------+
|       43.846000000000004|
+-------------------------+

Lecturas actuales por sensor:
+---------+------------------+--------------+
|senso

In [3]:
for query in spark.streams.active:
    print(f"Deteniendo query: {query.name} (ID: {query.id})")
    query.stop()

print("ðŸ›‘ Todos los procesos detenidos.")

Deteniendo query: None (ID: ca2c6363-c5cd-4d90-a5a1-f087be39b6f6)
ðŸ›‘ Todos los procesos detenidos.
