# Spark Streaming con Structured Streaming

Structured Streaming emplea el motor de Spark SQL para el procesamiento de Streams.

Comenzaremos creando el Spark Session:

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (SparkSession
         .builder
         .appName("StructuredNetworkWordCountWindowed")
         .getOrCreate())

En una terminal de nuestra máquina, lanzamos el siguiente comando para envíar los mensajes por el puerto 9999:

```nc -lk 9999```

Creamos el DataFrame leyendo desde una conexión al localhost en el puerto 9999:

In [None]:
lines = (spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .option("includeTimestamp", "true")
        .load())

## Ejercicio 1

> Separar las líneas en palabras, transformarlas todas a minúscula y agrupar por palabra:

- Obtenemos las palabras de las líneas:

In [None]:
words_ej1 = lines.select( \
        F.explode( \
            F.split(F.col("value"), " ")) \
        .alias("word"))

- Las transformamos en mínusuculas:

In [None]:
lowerWords_ej1 = words_ej1.withColumn("word", F.lower(F.col("word")))

- Agrupamos por palabras:

In [None]:
wordCounts_ej1 = lowerWords_ej1.groupBy("word").count()

- Lanzamos la query (los resultados se muestran por la consola donde lanzamos el notebook)

In [None]:
query_ej1 = wordCounts_ej1 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query_ej1.awaitTermination(60)

## Ejercicio 2
> Contar las palabras usando ventanas de tiempo.

- Separamos las líneas por palabras:

In [None]:
words_ej2 = lines.select( \
        F.explode( \
            F.split(F.col("value"), " ")) \
        .alias("word"),
        F.col("timestamp"))

- Agrupamos los datos por ventanas de 5 minutos y desplazamiento de 2 minutos y por palabras:

In [None]:
windowedCounts_ej2 = words_ej2 \
    .groupBy(F.window(F.col("timestamp"), "5 minutes", "2 minutes"), F.col("word")) \
    .count() \
    .orderBy('window')

- Lanzamos la query:

In [None]:
query_ej2 = windowedCounts_ej2 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query_ej2.awaitTermination(60)

## Ejercicio 3
> Contar las palabras usando ventanas de tiempo y marcas de agua.

- Separamos las líneas en palabras:

In [None]:
words_ej3 = lines.select(
        F.explode(
            F.split(F.col("value"), " ")) \
        .alias("word"),
        F.col("timestamp"))

- Añadimos las marcas de agua con intervalo de 10 minutos, agrupamos los datos por ventanas de 10 minutos y desplazamiento de 5 minutos y contamos las palabras:

In [None]:
windowedCounts_ej3 = words_ej3 \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window(F.col("timestamp"), "10 minutes", "5 minutes"),
        F.col("word")) \
    .count()

- Lanzamos la query:

In [None]:
query_ej3 = windowedCounts_ej3 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query_ej3.awaitTermination(60)

## Ejercicio 4
> Contar palabras consumiendo a través de un topic de Kafka.

Para realizar este ejercicio, es necesario que el Kernel de Python de Jupyter cargue la dependencia de SparkSQL para Kafka. Para ello, editar el fichero kernel.json y añadir las siguientes dependencias a PYSPARK_SUBMIT_ARGS:
```json
"--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0"
```
- Leemos las líneas desde el topic *wordcount_topic* de Kafka (es necesario crearlo de antemano).

In [None]:
lines_ej4 = spark \
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "wordcount_topic")\
    .load()\
    .selectExpr("CAST(value AS STRING)")

- Separamos por palabras:

In [None]:
words_ej4 = lines_ej4.select(
        F.explode(
            F.split(F.col("value"), " ")) \
        .alias("word"))

- Agrupamos las palabras y las contamos:

In [None]:
wordCounts_ej4 = words_ej4.groupBy("word").count()

- Lanzamos la query:

In [None]:
query_ej4 = wordCounts_ej4 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query_ej4.awaitTermination(60)

## Ejercicio 5
> Listar los datos recibidos en formato JSON a través de un topic de Kafka.

- Definimos el esquema:

In [None]:
import pyspark.sql.types

In [None]:
schema_ej5 = StructType() \
    .add("nombre", StringType()) \
    .add("edad", IntegerType()) \
    .add("peso", FloatType()) \
    .add("direccion", StringType())

- Leemos las líneas desde el topic *json_topic* de Kafka, convertimos la columna value a JSON y seleccionamos sus campos:

In [None]:
lines_ej5 = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "localhost:9092")\
        .option("subscribe", "json_topic")\
        .load()\
        .selectExpr("CAST(value AS STRING)")\
        .select(F.from_json(F.col("value"), schema).alias("parsed_value"))\
        .select("parsed_value.*")

- Lanzamos la query:

In [None]:
query_ej5 = lines_ej5.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query_ej5.awaitTermination(60)