# Actividad 2: Structured Streaming y Kafka

## Recuerda borrar siempre las líneas que dicen `raise NotImplementedError`

### Punto de partida (final de la actividad 1): función `retrasoMedio` 
***Para los vuelos que llegan con retraso positivo, calcular para cada aeropuerto de llegada el retraso medio.***

Recordatorio: *El código que calcule esto debería ir encapsulado en una función de Python que reciba como argumento un DataFrame y devuelva como resultado el DataFrame con el cálculo del retraso medio por aeropuerto, ordenado de mayor a menor retraso medio. La columna creada con el retraso medio debe llamarse `retraso_medio`.*

**Copia en la siguiente celda el código de tu función retrasoMedio que has completado en la actividad 1**. El DataFrame devuelto por la función debería tener solamente dos columnas: `dest` y `retraso_medio`.

In [58]:
from pyspark.sql.functions import mean

def retrasoMedio(df):
    df_result = df.select('arr_delay', 'dest')\
                  .where(F.col("arr_delay") > 0)\
                  .groupby(F.col("dest"))\
                  .agg(mean("arr_delay").alias("retraso_medio")) 
                      
    df_result = df_result.sort("retraso_medio", ascending=False) 
    return df_result

### Ejercicio 1

Utilizaremos Kafka para actualizar en tiempo real el resultado calculado en el apartado anterior. 

Para simplificar, asumimos que los mensajes leídos de Kafka tiene solamente dos campos que son los únicos necesarios para llevar a cabo la operación anterior: dest y arr_delay. La idea será crear un Streaming DataFrame para leer de Kafka, y después invocar a nuestra función retrasoMedio pasándolo como argumento. Vamos a leer del topic `retrasos` por lo que debes indicar esta opción a continuación.

Se pide crear, en la variable `retrasosStreamingDF`, un Streaming DataFrame leyendo de Apache Kafka, configurando las siguientes opciones:
  * Usar la variable `readStream` (en lugar de `read` como solemos hacer) interna de la SparkSession `spark`
  * Indicar que el formato es `"kafka"` con `.format("kafka")`
  * Indicar cuáles son los brokers de Kafka de los que vamos a leer y el puerto al que queremos conectarnos para leer (9092 es el que usa Kafka por defecto), con `.option("kafka.bootstrap.servers", "<nombre_cluster>-w-0:9092,<nombre_cluster>-w-1:9092")`. De esa manera podremos leer el mensaje si el productor de Kafka lo envía a cualquiera de los dos brokers existentes, que son los nodos del cluster identificados como `<nombre_cluster>-w-0` y `<nombre_cluster>-w-1`
  * Indicar que queremos subscribirnos al topic `"retrasos"` con `.option("subscribe", "retrasos")`.
  * Finalmente ponemos `load()` para realizar la lectura.

In [59]:
# Reemplaza por el código correcto siguiendo las indicaciones anteriores
retrasosStreamingDF = spark.readStream\
                           .format("kafka")\
                           .option("kafka.bootstrap.servers", "cluster-jde-w-0:9092,cluster-jde-w-1:9092")\
                           .option("subscribe", "retrasos")\
                           .load()            

In [60]:
# Mostramos el esquema de este DataFrame
types = retrasosStreamingDF.dtypes
assert(retrasosStreamingDF.isStreaming)
assert((types[0][0] == "key")       & (types[0][1] == "binary"))
assert((types[1][0] == "value")     & (types[1][1] == "binary"))
assert((types[2][0] == "topic")     & (types[2][1] == "string"))
assert((types[3][0] == "partition") & (types[3][1] == "int"))
assert((types[4][0] == "offset")    & (types[4][1] == "bigint"))
assert((types[5][0] == "timestamp") & (types[5][1] == "timestamp"))
assert((types[6][0] == "timestampType") & (types[6][1] == "int"))

In [61]:
retrasosStreamingDF.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Muestra por pantalla el esquema del DataFrame resultante de la lectura con `printSchema()`. Verás que todas estas columnas son creadas automáticamente por Spark cuando leemos de Kafka. De ellas, la que nos interesa es `value` que contiene propiamente el mensaje de Kafka, en formato datos binarios. 

### Ejercicio 2

Tendremos que estructurar estos datos para poder extraer los campos. Para ello sigue los siguientes pasos, ayudándote de la plantilla que hay en la celda siguiente (descoméntala y complétala):

* **Selecciona** la columna `value` y conviértela (`.cast`) a `StringType()` utilizando `withColumn` para reemplazar la columna existente `"value"` por el objeto Column resultante de la conversión. De esta forma tendremos una columna que contendrá en cada **fila** un **fichero JSON completo**, tal como se muestra en cada una de las plantillas anteriores. 
* Para extraer los dos campos de cada uno de los JSON y convertirlos en una columna llamada `parejas`, de tipo `struct` (una estructura formada por dos campos de tipo String e Integer respectivamente), utilizamos la función `from_json` de Spark, que se aplica a cada elemento (cada fila) de la columna "value" y parsea el String según un esquema que le indiquemos, devolviendo una columna de tipo `struct`.
* La columna `parejas` es de tipo `struct` por lo que puedes acceder a cada uno de sus dos campos (`dest` y `arr_delay`) con el operador `.` (punto). Utilizando `withColumn` dos veces, crea dos columnas llamadas `dest` y `arr_delay` como el resultado de acceder a `parejas.dest` y `parejas.arr_delay` respectivamente.

In [62]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F

esquema = StructType([\
  StructField("dest", StringType()),\
  StructField("arr_delay", DoubleType())\
])

parsedDF = retrasosStreamingDF\
            .select('*')\
            .withColumn("value", F.col("value").cast(StringType()))\
            .withColumn("parejas", F.from_json(F.col("value"), esquema))\
            .withColumn("dest", F.col("parejas.dest"))\
            .withColumn("arr_delay", F.col("parejas.arr_delay"))\

In [63]:
tipos = parsedDF.dtypes
print(tipos)

[('key', 'binary'), ('value', 'string'), ('topic', 'string'), ('partition', 'int'), ('offset', 'bigint'), ('timestamp', 'timestamp'), ('timestampType', 'int'), ('parejas', 'struct<dest:string,arr_delay:double>'), ('dest', 'string'), ('arr_delay', 'double')]


In [64]:
assert(("value", "string") in tipos)
assert(('parejas', 'struct<dest:string,arr_delay:double>') in tipos)
assert(('dest', 'string') in tipos)
assert(('arr_delay', 'double') in tipos)

Nuestro DataFrame ya contiene una columna `dest` con el nombre del aeropuerto destino y una columna de números reales `arr_delay` con el retraso. Ya podemos efectuar el mismo tipo de agregación que estamos haciendo en nuestra función `retrasoMedio`. Por tanto, invocamos a `retrasoMedio` pasando `parsedDF` como argumento.

In [65]:
# Evalúa el siguiente código pero no lo modifiques
# Indicamos que este DataFrame se guarde en memoria cuando se va actualizando,
# y arrancamos la ejecución en Streaming con la acción start()

retrasoMedioStreamingDF = retrasoMedio(parsedDF)

consoleOutput = retrasoMedioStreamingDF\
                    .writeStream\
                    .queryName("retrasosAgg")\
                    .outputMode("complete")\
                    .format("memory")\
                    .start()


Una vez evaluada la celda anterior, abre el productor de Kafka console entrando por SSH a cualquiera de las máquinas (revisa el enunciado de la práctica para recordarlo), y copia y pega (literalmente) los siguientes 4 mensajes en formato JSON. Como ves,  tienen un campo `dest` y un campo `arr_delay`, simulando la información que estaríamos recibiendo en tiempo real de los distintos aeropuertos a medida que los vuelos van aterrizando. 

Cada vez que pegues un mensaje, ejecuta la consulta `select * from retrasosAgg` a través del método `spark.sql(...)` y muestra el DataFrame `agregadosDF` devuelto por dicho método. Eso hará una consulta contra la vista temporal (volátil) `retrasosAgg` que se ha creado en el metastore de Hive gracias al `writeStream` del apartado anterior. Ejecuta la celda de `show` tantas veces como sea necesario hasta ver un resultado distinto al que has visto en la ejecución anterior, para asegurarte de que Spark ya ha leído e incorporado el nuevo dato en su cálculo de la agregación y por tanto ha actualizado el resultado.

Recuerda que el método `.sql(...)` es una transformación, y por tanto, se re-ejecuta la consulta cada vez que invocas a la acción `show()` sobre el resultado, ya que **no vamos a cachear nada**, precisamente para forzar la reevaluación de la consulta y poder ver así el contenido actualizado de dicha tabla (en memoria) de Hive cada vez que hacemos `show()`.

Se pide: 
* Cada vez que envíes un mensaje y te hayas asegurado de que Spark ha incorporado el dato a su cálculo, apunta el resultado de la agregación (valor de la columna `retraso_medio`) para MAD y GRX en las variables habilitadas para ello
* No te preocupes por evaluar muchas veces una misma celda, ya que el cálculo sólo se actualizará una vez. Las siguientes veces que la evalúes seguirá mostrando el mismo resultado mientras no envíes otro nuevo mensaje en Kafka.

Los 4 mensajes que hay que introducir sucesivamente en Kafka son:

`
{"dest": "GRX", "arr_delay": 2.6}
{"dest": "MAD", "arr_delay": 5.4}
{"dest": "GRX", "arr_delay": 1.5}
{"dest": "MAD", "arr_delay": 20.0}
`

In [66]:
import time

agregadosDF = spark.sql("select * from retrasosAgg")

# for x in range(5):
#     agregadosDF.show()
#     time.sleep(5)

In [67]:
columnas = agregadosDF.columns
assert(len(columnas) == 2)
assert("dest" in columnas)
assert("retraso_medio" in columnas)

In [80]:
agregadosDF.show()  # Ejecuta varias veces esta celda tras enviar el primer mensaje, hasta ver que el DataFrame no es vacío
retraso_medio_GRX_primer_mensaje = agregadosDF.where(F.col("dest") == "GRX")
retraso_medio_GRX_primer_mensaje.show()

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| GRX|          2.6|
+----+-------------+

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| GRX|          2.6|
+----+-------------+



In [82]:
# Ejecuta varias veces esta celda tras enviar el segundo mensaje, hasta ver que el DataFrame ha cambiado
agregadosDF.show()
retraso_medio_GRX_primer_mensaje = agregadosDF.where(F.col("dest") == "GRX")
retraso_medio_MAD_segundo_mensaje = agregadosDF.where(F.col("dest") == "MAD")

retraso_medio_GRX_primer_mensaje.show()
retraso_medio_MAD_segundo_mensaje.show()

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| MAD|          5.4|
| GRX|          2.6|
+----+-------------+

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| GRX|          2.6|
+----+-------------+

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| MAD|          5.4|
+----+-------------+



In [85]:
# Ejecuta varias veces esta celda tras enviar el tercer mensaje, hasta ver que el DataFrame ha cambiado
agregadosDF.show()
retraso_medio_GRX_tercer_mensaje = agregadosDF.where(F.col("dest") == "GRX")
retraso_medio_MAD_tercer_mensaje = agregadosDF.where(F.col("dest") == "MAD")

retraso_medio_GRX_tercer_mensaje.show()
retraso_medio_MAD_tercer_mensaje.show()

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| MAD|          5.4|
| GRX|         2.05|
+----+-------------+

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| GRX|         2.05|
+----+-------------+

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| MAD|          5.4|
+----+-------------+



In [87]:
# Ejecuta varias veces esta celda tras enviar el cuarto mensaje, hasta ver que el DataFrame ha cambiado
agregadosDF.show()
retraso_medio_GRX_cuarto_mensaje = agregadosDF.where(F.col("dest") == "GRX")
retraso_medio_MAD_cuarto_mensaje = agregadosDF.where(F.col("dest") == "MAD")

retraso_medio_GRX_cuarto_mensaje.show()
retraso_medio_MAD_cuarto_mensaje.show()

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| MAD|         12.7|
| GRX|         2.05|
+----+-------------+

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| GRX|         2.05|
+----+-------------+

+----+-------------+
|dest|retraso_medio|
+----+-------------+
| MAD|         12.7|
+----+-------------+



In [72]:
#import numpy as np
    
# 1D array 
#arr = [5.4, 20]
#arr = [2.6, 1.5]
  
#print("arr : ", arr) 
#print("mean of arr : ", np.mean(arr))

In [73]:
# Resultados: 2.6, 5.4, 12.7, 2.05

In [88]:
consoleOutput.stop()