In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [3]:
spark = SparkSession.builder \
    .appName("HumidityStreamingFinal_Snapshot6") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

In [4]:
schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("timestamp", StringType(), True),
    StructField("location", StringType(), True)
])

In [5]:
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "humidity-sensors") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

In [6]:
parsed_df = df_kafka.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

monitor = parsed_df.limit(10).agg(
    avg("humidity").alias("Promedio_Ultimas_10_Lecturas"),
    count("*").alias("Total_Lecturas_Recibidas")
)

In [7]:
query = monitor.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("tabla_humedad_final") \
    .start()

In [9]:
import time
print("Esperando a que los datos Sfluyan...")
time.sleep(10)
spark.sql("select * from tabla_humedad_final").show()

Esperando a que los datos fluyan...
+----------------------------+------------------------+
|Promedio_Ultimas_10_Lecturas|Total_Lecturas_Recibidas|
+----------------------------+------------------------+
|                      47.477|                      10|
+----------------------------+------------------------+

