# Spark Structured Streaming - Event-Time and Stateful Processing

El Structured Streaming de Apache Spark es un motor de procesamiento de streaming scalable y tolerante a fallas en el motor de Spark SQL. Cuando definimos a una fuente de datos como streaming, Spark se encarga de hacer la ejecución de manera incremental y de actualizar el resultado a medida que los datos llegan a la fuente.

Podemos procesar los datos en streaming de diversas maneras, tales como:
* Hacer transformaciones a los datos y expresar agregaciones
* Procesar los datos en ventanas de event time
* Realizar joins stream-to-batch
* Entre otros

Por defecto, Spark procesa los datos en **micro-batches**, que son pequeños trabajos que se realizan de manera sequencial a medida que los datos llegan. Pero también es posible sacar provecho del **Continuous Streaming** (modo de procesamiento de baja latencia), sin sacrificar los transformaciones en los DataFrames/Datasets.

#### Objetivo de este notebook

Este notebook tiene por objetivo guiarnos en el API de Structured Streaming, para el procesamiento en base a eventos (event time processing) con un ejemplo sencillo de ingesta y transformación de *tweets* en Twitter mediante su API. Utilizaremos **Apache Kafka** como motor de almacenamiento en streaming distribuido para hacer la lectura de los datos.

Nos enfocaremos en el modelo de procesamiento micro-batch.

## 1. Leyendo los datos del Kafka

#### Iniciar SparkSession

In [None]:
# Encontrar la instalación de Spark
import findspark
import os
from dotenv import load_dotenv

load_dotenv()
spark_installation = os.environ["SPARK_HOME"]
findspark.init(spark_installation)

# Iniciar SparkSession
from pyspark.sql import SparkSession
spark = spark_session = SparkSession \
    .builder \
    .appName("Twitter Kafka Stream") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Reducir el número de Shuffle partitions para 5
spark.conf.set("spark.sql.shuffle.partitions", "5")

#### Cargar Kafka como fuente de streaming

In [None]:
# Definir el tópico de Kafka
topic_consumer = "delatam-streaming"

# Hacer la lectura
streaming_in = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", topic_consumer)\
.option("startingOffsets", "latest")\
.load()

## Transformaciones y agregaciones en streaming

Podemos emplear virtualmente todas las transformaciones que se realizan a los Dataframes y Datasets estáticos. Una gran ventaja del Structured Streaming es que se ejecuta en el mismo motor de Spark SQL, lo que nos posibilita expresar las transformaciones de la misma manera de cómo expresaríamos en un procesamiento en batch.

Para ver la transformación en acción, es necesario ejecutar el `producer.py` para Kafka pase a recibir los datos.

#### Convertir el valor al DataFrame

Definimos el *schema* para cargar los datos almacenados en la columna `value`. Es preferible no inferir el esquema de los datos para no crear jobs desnecesarios, principalmente si leemos archivos con grandes volúmenes de datos.

Luego, hacemos la conversión del `creation_time`, cuyo tipo pasaría de *string* a *timestamp*

In [None]:
import pyspark.sql.types as t
import pyspark.sql.functions as f


# Definir el esquema
schema = t.StructType([
    t.StructField("creation_time", t.StringType(), True),
    t.StructField("tweet_text", t.StringType(), False)])

# Seleccionar la columna de valor
recent_tweets = streaming_in.selectExpr("CAST(value AS STRING)")

# Transformar los datos con from_json
recent_tweets = recent_tweets.select(f.from_json(f.col("value"), schema).alias("data"))\
.select("data.*")

# Convertir la fecha de creación a timestamp
recent_tweets = recent_tweets.withColumn("creation_time",f.to_timestamp("creation_time"))

#### Aplicar una transformación de ejemplo (wordcount)

Aplicamos una transformación de wordcount para contar las palabras que aparecen en cada tweet.

In [None]:
wordcount = recent_tweets.withColumn('word', f.explode(f.split(f.col('tweet_text'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

#### Escribir el output al console sink para prueba y definir la variable de consulta

In [None]:
query_console = wordcount \
    .writeStream \
    .format("console") \
    .outputMode("complete")\
    .start()

#### Ver los streams activos

In [None]:
spark.streams.active

#### Detener el stream utilizando la variable de consulta

In [None]:
query_console.stop()

## Procesamiento de *event time* y *stateful processing*

Spark Streaming nos permite realizar transformaciones a los datos que caen en un periodo tardío en el motor de streaming. En este caso, se toma el *event time* como atributo para el procesamiento. El *event time* está incorporado dentro de los datos (cada evento es un registro), y representa el periodo de tiempo en que se generó el registro o evento.

Comúnmente se aplican funciones de ventana y  agregaciones para procesar los datos en intervalos de tiempo.

#### Aplicar una transformación en base a *event time*

Las transformaciones con *event time* funcionan de manera muy similar a las agregraciones con *groupBy*. De hecho, se aplica el *groupBy* y la función *window* para procesar los datos que caen dentro del rango del evento especificado.

En el siguiente ejemplo, hacemos un conteo de palabras en cada tweet cuyo *event time* se encuentra en el rango o intervalo definido por *window*. El cálculo se hace para cada intervalo, razón por la cual un mismo registro puede ser calculado dentro de uno o más intervalos de *event time*.

In [None]:
# Hacer la escritura
query_event_time = recent_tweets\
    .groupBy(f.window(f.col("creation_time"), "5 minutes", "1 minutes")).count()\
    .writeStream\
    .queryName("tweets_per_window")\
    .format("console")\
    .outputMode("complete")\
    .start()

In [None]:
# Detener el streaming
query_event_time.stop()

#### Aplicar una transformación en base a event time con *watermark*

El *watermark* es un técnica que nos permite descartar los datos que llegan muy tardes al motor de motor de Streaming, en vez de procesarlos en base a una tabla intermedia en memória que sólo crece con el pasar del tiempo. 

En otras palabras, si un dato llega y su *timestamp* cae en el intervalo cuyo término con respecto a periodo máximo (más actual) es mayor al rango del *watermark*, Spark no procesaría ese dato y eliminaría el intervalo antiguo.

En el siguiente ejemplo, los tweets más recientes se leen primero y son procesados. No obstante, a medida que Spark va leyendo los tweets más recientes, se va eliminando los intervalos más antiguos y los tweets que cairían en los rangos eliminados simplemente no se procesarían.

In [None]:
# Hacer la escritura con watermarking
query_event_time = recent_tweets\
    .withWatermark("creation_time", "1 minutes")\
    .groupBy(f.window(f.col("creation_time"), "5 minutes", "1 minutes")).count()\
    .writeStream\
    .queryName("tweets_per_window")\
    .format("console")\
    .outputMode("update")\
    .start()

### Conclusión

Spark Streaming nos ofrece una variedad de formas para procesar los datos en streaming. Una de estas formas es procesar los datos que pueden llegar tarde al motor de streaming mediante funciones de agregación con groupBy y con la función window para especificar los intervalos de tiempo para realizar el cálculo.

Hemos visto también como Spark descarta los datos que llegan muy tardes al motor de procesamiento mediante el *watermarking*. El *watermarking* previene un gasto mayor de recursos  por mantener y procesar datos relacionados a un intervalo de tiempo que es muy lejano al periodo actual.