In [1]:
import os
from time import sleep

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [3]:
USUARIO = "yy"

if not USUARIO:
    raise RuntimeError("Por favor escriba su usuario Uniandes")

# Creamos la sesión de spark

In [4]:
spark = (
    SparkSession.builder 
        .appName("streaming_data") 
        .master("local[1]")
        .config(
            "spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension"
        )
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
        .config(
            "fs.azure.account.key.streamingclass.dfs.core.windows.net",
            os.environ.get("AZURE_KEY")
        )
        .getOrCreate()
)


:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/jovyan/.ivy2.5.2/cache
The jars for the packages stored in: /home/jovyan/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
io.delta#delta-spark_2.13 added as a dependency
org.apache.hadoop#hadoop-azure added as a dependency
com.azure#azure-storage-blob added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e833fc86-88b4-4f96-a062-e696aa8ce2f0;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.0 in central
	found org.apache.kafka#kafka-clients;3.9.0 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.7 in central
	found org.slf4j#slf4j-api;2.0.16 in central
	found org.apache.hadoop#hadoop-client-runtime;3.4.1 in central
	found org.

In [5]:
spark

# Cargar datos de referencia de los sensores

In [None]:
datos_referencia = spark.read.format("json").load(f"abfss://data@streamingclass.dfs.core.windows.net/datos/sensores.json")

datos_referencia.show()

## Conectarse a un tema de Kafka como fuente

In [None]:
CONNECTION_STRING = os.environ["KAFKA_CONNECTION_STRING"]
JAAS_CONFIG = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION_STRING}";'

df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "https://clase-streaming.servicebus.windows.net:9093")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", JAAS_CONFIG)
    .option("subscribe", "sensores")
    .option("startingOffsets","latest") # podría ser "earliest", "latest" o """{"tema":{"0":23,"1":-1}} """
    .load()
)


# Consultar los datos en consola para depuracion

In [None]:
query = (
    df_kafka.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", f"abfss://data@streamingclass.dfs.core.windows.net/{USUARIO}/checkpoint")
    .start()
) 

# Esperamos 10 segundos
sleep(30)

# Finalizamos el proceso
query.stop()

# Procesamiento de cada batch con foreachBatch

En este ejemplo solo imprimimos en pantalla, pero puede incluirse una lógica propia, por ejemplo para hacer un upsert en una tabla 

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Parseamos key y value que son campos binarios

# Definimos el esquema:
json_schema = StructType([
    StructField("temperatura", StringType(), True),
    StructField("funcionamiento", StringType(), True)
])


df_parseado = (
    df_kafka
    .select(
        f.col("key").cast("string"),
        f.from_json(f.col("value").cast("string"), json_schema).alias("data"),
        "timestamp"
    )
    .select(
        "key",
        "data.temperatura",
        "data.funcionamiento",
        "timestamp"
    )
)

df_parseado.printSchema()

In [None]:
def imprimir(x, epoch_id):
    """ 
    Función de ejemplo que simplemente imprime el lote
    """
    print(f"epoch_id: {epoch_id}")
    x.show(truncate=False)

In [None]:
query = (
    df_parseado
    .writeStream
    .outputMode("append")
    .foreachBatch(imprimir)
    .start()
) 

# Esperamos 10 segundos
sleep(30)

# Finalizamos el proceso
query.stop()

# Cálculo de la temperatura promedio de cada sensor

In [None]:
# Agrupamos y calculamos el promedio de deperatura
temperatura_por_sensor_ventana = (
    df_parseado
    .withWatermark("timestamp", "15 seconds")
    .groupby("key", f.window("timestamp", "10 seconds"))
    .agg(f.mean("temperatura"))
)

query = (
    temperatura_por_sensor_ventana
    .writeStream
    .outputMode("update")
    .option("checkpointLocation", f"abfss://data@streamingclass.dfs.core.windows.net/{USUARIO}/checkpoint4")
    .format("console")
    .start()
) 

# Esperamos 10 segundos
sleep(30)

# Finalizamos el proceso
query.stop()

# Retos

* **Escribir en un almacenamiento cada vez que un sensor tenga una temperatura mayor a 35 grados.**

abfss://data@streamingclass.dfs.core.windows.net/{USUARIO}/mayor_a_35


In [None]:
(
    df_parseado
    .filter("temperatura > 35")
    .writeStream
    .format("parquet")
    .start(f"abfss://data@streamingclass.dfs.core.windows.net/{USUARIO}/mayor_a_35")
)

* **Publicar en Kafka cada vez que un sensor supere su variación máxima de temperatura de acuerdo a la tabla de referencia. Debe escribirse cada 10 segundos, pero esperar valores retrasados hasta 10 segundos más.**

In [None]:
(
    df_parseado
    .withWatermark("timestamp", "10 seconds")
    .groupby("key", f.window("timestamp", "10 seconds"))
    .agg((f.max("temperatura") - f.min("temperatura")).alias("variacion"))
    .join(
        datos_referencia,
        on=(f.col("Sensor") == f.col("key"))
    )
    .filter("variacion > variacion_maxima")
    .select(
        f.col("Sensor").alias("key"),
        f.col("variacion").astype("string").alias("value"),
    )
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "https://clase-streaming.servicebus.windows.net:9093")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", JAAS_CONFIG)
    .option("topic", "alertas")
    .start()
)

* **Crear un reporte que muestre cuántas veces cada sensor ha reportado un estado "Incorrecto" en cada minuto.**

En consola



In [None]:
(
    df_parseado
    .withWatermark("timestamp", "10 seconds")
    .filter("estado == 'Incorrecto'")
    .groupby("key", f.window("timestamp", "60 seconds"))
    .count()
    .writeStream
    .format("console")
    .start()
)

* **Calcular la temperatura promedio de cada sensor durante los últimos 30 segundos, actualizando el cálculo cada 10 segundos**

En consola

In [None]:
(
    df_parseado
    .withWatermark("timestamp", "10 seconds")
    .groupby("key", f.window("timestamp", "30 seconds", "10 seconds"))
    .agg(f.mean("temperatura"))
    .writeStream
    .format("console")
    .start()
)