Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [None]:
NAME = "Sergio Funes Olaria"
COLLABORATORS = ""

---

![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)  ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)

# PEC_5_2: Structured Streaming.

En esta PEC vamos a trabajar con [Spark Structured Streaming](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html), un motor de procesamiento de flujo escalable y tolerante a fallos construido sobre el motor Spark SQL. 

Spark Structured Streaming nos permite realizar nuestro análisis de datos en streaming de la misma manera que lo hacemos con el procesamiento por lotes sobre datos estáticos. Ahora bien, hay que tener en cuenta que el structured streaming tiene una serie de ventajas. Por ejemplo, que el motor Spark SQL se encargará de ejecutar los analísis programados de forma incremental y continua, generando el resultado final a medida que datos de transmisión. Spark Streaming se basa en la API de Dataset/DataFrame que se puede utilizar Scala, Java, Python o R para expresar agregaciones de transmisión, ventanas de tiempo de eventos, etc. Finalmente, el sistema asegura garantías de tolerancia a fallos de un extremo a otro a través de puntos de control y registros de escritura anticipada.


**IMPORTANTE: Para realizar esta práctica debes hacerlo mediante SSH desde terminal o VSCODE, y poner el código de la misma en este NOTEBOOK solo para su corrección.**

### entrega: 
El formato de entrega será un directorio comprimido en formato gnuzip bajo el nombre `PEC5_username.tar.gz`, substituyendo username por vuestro	nombre	de	usuario. El contenido debe ser un fichero para cada programa Python por cada ejercicio indicando el apartado de los notebooks de enunciado, por ejemplo `PEC5_username_2_1_4.py` .Adjuntar los dos notebooks de la PEC, con el nombre `PEC5_1_username`, y `PEC5_2_username` con las salidas obtenidas que se piden, y las respuestas a las preguntas conceptuales planteadas.

1. PARTE 1. Word Count con Structured Streaming
1. PARTE 2. Operaciones de ventana sobre eventos temporales
1. PARTE 3. Captura y procesamiento de datos en tiempo real de la API OpenSky

## PARTE 1. Word Count con Structured Streaming

En esta primera parte de la PEC vamos a ver como implementar un word count conectando por sockets mediante un proceso netcat https://en.wikipedia.org/wiki/Netcat corriendo en una terminal vía SSH o VSCode y donde vais a ir escribiendo palabras, que posteriormente van a ser contadas.

Para empezar, vamos a realizar un primer ejercicio guiado donde vamos a contar las palabras haciendo uso de los DataFrames que nos ofrece Structured Streaming. 

La siguiente celda de Jupyter Notebook crea un objeto spark que corresponde a una instancia de SparkSession. En las versiones modernas de Spark, la clase SparkSession es el punto de entrada a una aplicación Spark para cualquier tipo de Spark API (RDD, SparkSQL, Streaming, etc). Se pide ejecutar la siguiente celda y comprobar que se ha ejecutado correctamente.

In [None]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

# Introducid el nombre de la app PEC5_ seguido de vuestro nombre de usuario
spark = SparkSession \
    .builder \
    .appName("PEC5_sgraul") \
    .getOrCreate()

Mediante el objeto spark vamos a configurar una lectura de datos en streaming reportados en el puerto que tenéis asociado, dado que es donde está el netcat estará funcionando. En el código de la siguiente celda debéis cambiar \<PUERTO_ASIGNADO\> por vuestro puerto.

El DataFrame `linesDF` representa una tabla ilimitada que contiene la transmisión de datos de texto. Esta tabla contiene una columna de cadenas denominada `value`, y cada línea de los datos de texto de transmisión se convierte en una fila de la tabla. Tened en cuenta que todavía no está recibiendo ningún dato ya que solo estamos configurando la transformación y aún no hemos comenzado a recibir datos. Se pide al estudiante leer el código con detalle, revisar que se entienden todas las operaciones (consultar documentación en caso necesario) y ejecutar la celda.

In [None]:
# Creamos el DataFrame representando el streaming de las lineas que nos entran por host:port
linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', <PUERTO_ASIGNADO>)\
    .load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra')
)

# Generamos el word count en tiempo de ejecución
wordCountsDF = wordsDF.groupBy('palabra').count()

Ahora que hemos configurado la consulta (análisis) sobre los datos de transmisión, declaramos la consulta para comenzar a recibir los datos y contar las palabras. Para hacer esto, vamos a configurar la salida del análisis para que imprima el conjunto completo de recuentos, especificado por `outputMode("complete")` y configurado para trabajar en memoria cada vez que se actualizan. Finalmente iniciamos el cálculo de streaming usando `start()`.

In [None]:
# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console
query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

#en una ejecución desde el terminal de sistema, necesitamos evitar que el programa finalice mientras 
#se está ejecutando la consulta en un Thread separado y en segundo plano. 
#query.awaitTermination() 

En una **sesión de terminal mediante SSH, no mediante Jupyter terminal** debéis ejecutar un netcat `$ nc -lk <puerto_asignado>`.

Mediante esta celda podemos mostrar en el Notebook los datos de la consulta a la tabla `palabras` en una celda, y vamos actualizando esta celda cada 5 segundos. En este caso utilizamos una sentencia SQL. Como se trata de un bucle sobre el Notebook deberéis parar el kernel una vez vista la salida.

In [None]:
from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM palabras').show())
    sleep(5)

Salida de ejemplo:

`{'isDataAvailable': False,
 'isTriggerActive': True,
 'message': 'Waiting for data to arrive'}
+-------+-----+
|palabra|count|
+-------+-----+
|   Data|    2|
|    UOC|    2|
|    Big|    2|
|  Spark|    1|
+-------+-----+`

Alternativamente podemos consultar los datos que estamos recibiendo por streaming mediante la tabla `palabras`, pero tendremos que actualizar manualmente el show(). Tarda un tiempo en aparecer la primera salida en Jupyter Notebook.

In [None]:
spark.table("palabras").show()

Salida de ejemplo:

`+-------+-----+
|palabra|count|
+-------+-----+
|   Data|    2|
|    UOC|    2|
|    Big|    2|
|  Spark|    1|
+-------+-----+`

A partir de este ejemplo que hemos visto y que el alumno debe ejecutar para probar su funcionamiento, se pide:

> **Pregunta 1. (1 punto)** Realiza un programa en Python que cuente las palabras que empiezan por A y que tengan más de 5 counts. Debéis ejecutarlo el programa en una **terminal**, no dentro del Jupyter, y en otra terminal el netcat. Para ello utilizamos un programa Python en local mediante `./python3 PEC5_2_1_1.py localhost <puerto_asignado>`  

> Adjunta el código y la salida obtenida en **forma textual**. 

Salida de ejemplo:

`+-------+-----+
|palabra|count|
+-------+-----+
| Albert|    6|
+-------+-----+`

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20068)\
    .load()

wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra')
)

wordCountsDF = wordsDF.groupBy('palabra').count()

query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql("SELECT * FROM palabras WHERE palabra LIKE 'A%' AND count > 5").show())
    sleep(5)

#raise NotImplementedError()

Copia la salida obtenida en formato de texto:

# YOUR CODE HERE

{'message': 'Waiting for data to arrive', 'isTriggerActive': False, 'isDataAvailable': False}
+-------+-----+
|palabra|count|
+-------+-----+
|  Adios|    6|
+-------+-----+

#raise NotImplementedError()

Ahora vamos a realizar un ejercicio que nos permita realizar una consulta SQL sobre los datos recibidos.  Además, utilizaremos el mecanismo de control de fallos que Spark utiliza, los [*checkpoint*](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing), que van guardando información en el HDFS por si es necesario recuperarla. 

>**Pregunta 2. (1 punto)** Crea una tabla temporal para poder realizar una consulta SQL sobre las palabras que estamos obteniendo mediante streaming. El programa debe extraer las diferentes palabras de una frase y solo mostrar por consola aquellas que tengan una longitud superior a 3 caracteres. Tenéis que mostrar el tiempo de adquisición y poner un checkpoint en HDFS que se denomine `punto_control_pec5`.

Salida de ejemplo:

`
+-------+--------------------+
|palabra|              tiempo|
+-------+--------------------+
|   Data|2021-12-2  12:21:...|
+-------+--------------------+
`

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20068)\
    .load()

from pyspark.sql.functions import current_timestamp

wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra'), current_timestamp().alias('tiempo')
)

wordCountsDF = wordsDF.groupBy('palabra', 'tiempo').count()

query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .option("checkpointLocation", "/user/sfunesolaria/PEC5/punto_control_pec5") \
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT palabra, tiempo FROM palabras WHERE LENGTH(palabra) > 3').show())
    sleep(5)

raise NotImplementedError()

Copia la salida obtenida en formato de texto:

# YOUR CODE HERE

{'message': 'Waiting for data to arrive', 'isTriggerActive': False, 'isDataAvailable': False}
+-------+--------------------+
|palabra|              tiempo|
+-------+--------------------+
|   Data|2022-01-17 12:27:...|
+-------+--------------------+

#raise NotImplementedError()

>**Pregunta 3. (1 punto)** Modifica el programa para que haga uso del outputMode *append*. Debemos de guardar cada entrada en un fichero de texto en HDFS. Adjunta la salida del HDFS del contenido del directorio creado.

Salida de ejemplo:

`
hdfs dfs -ls /user/<usuario>/pec5_1_3
Found 4 items
drwxr-xr-x   - usuario usuario          0 2021-12-02 12:43 /user/<usuario>/pec5_1_3/_spark_metadata
-rw-r--r--   3 usuario usuario          9 2021-12-02 12:43 /user/<usuario>/pec5_1_3/part-00000-499014ff-cf00-4f2f-a8a4-d282cdac1a19-c000.txt
-rw-r--r--   3 usuario usuario          7 2021-12-02 12:43 /user/<usuario>/pec5_1_3/part-00000-7d8cb984-95ad-4e50-8887-a72f4d2814a2-c000.txt
-rw-r--r--   3 usuario usuario          0 2021-12-02 12:43 /user/<usuario>/pec5_1_3/part-00000-a4918ce0-c930-439f-a5cd-a1bd777f609b-c000.txt
    `

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20068)\
    .load()

from pyspark.sql.functions import current_timestamp

wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra'), current_timestamp().alias('tiempo')
)

wordCountsDF = wordsDF.withWatermark("tiempo", "1 second").groupBy('palabra', 'tiempo').count()

query = wordCountsDF\
    .writeStream\
    .outputMode('append')\
    .option("checkpointLocation", "/user/sfunesolaria/pec5_1_3") \
    .option("hdfs_url", "hdfs:///user/sfunesolaria/pec5_1_3") \
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT palabra, tiempo FROM palabras WHERE LENGTH(palabra) > 3').show())
    sleep(5)

raise NotImplementedError()

Copia la salida obtenida en el HDFS en formato de texto:

In [None]:
# YOUR CODE HERE



#raise NotImplementedError()

>**Pregunta 4.(1 punto)** Realiza un programa en Python para que haga uso del outputMode update, y que los datos entrantes por consola. La lectura debe realizar en intervalos de 5 segundos

Salida de ejemplo:

<code>
-------------------------------------------
Batch: 5
-------------------------------------------
+-------+
|palabra|
+-------+
|    Big|
|   Data|
| Hadoop|
+-------+
<5 ... segundos>
-------------------------------------------
Batch: 6
-------------------------------------------
+-------+
|palabra|
+-------+
|  Spark|
+-------+
    </code>

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

Copia la salida obtenida en formato de texto:

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

>**Pregunta 5.(1 punto)** Explica las diferencias y similitudes entre los tipos de salidas existentes en Structured Streaming (complete, update y append). El texto debe ser claro, explicativo y tener una extensión de 10 líneas aproximadamente.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

## PARTE 2. Operaciones de ventana sobre eventos temporales
En esta parte vamos a trabajar con operaciones de ventana sobre eventos temporales. Para ello vamos a utilizar el formato *rate*. El source [RateStreamSource](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#api-using-datasets-and-dataframes) es una fuente de transmisión que genera números consecutivos con marca de tiempo y es utilizada habitualmente para hacer pruebas y PoC (*Proof of Concept*). Para configurar un RateStreamSource utilizaremos  `format('rate')`, y el esquema de los datos entrantes es el adjunto siguiente. A diferencia de los ejercicios de la parte 1 no tendremos dos programas corriendo simultáneamente en el terminal, solo tendremos una, la de nuestro programa pyspark, dado que el formato rate se controla directamente desde la configuración del source Spark.


`root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true) `


>**Pregunta 1. (1 punto)** Realiza un programa mediante Structured Streaming que genere números mediante un formato *rate* como origen del streaming y donde debéis realizar el tratamiento de los mismos para acumularlos. Los números deben generarse cada segundo, y debemos utilizar una ventana de agrupación del streaming de 10 segundos y que se actualice cada 5 segundos.

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('rate')\
    .option('rowsPerSecond', '1')\
    .load()

wordsDF = linesDF.withColumn("window", linesDF.timestamp)\
                .withColumn("value", linesDF.value)

from pyspark.sql.functions import window

wordCountsDF = wordsDF.groupBy(window('window', "10 seconds", "5 seconds"), 'value').count()

query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM palabras').show())
    sleep(5)

#raise NotImplementedError()

>**Pregunta 2. (1 punto)** Comenta el código, muestra la salida que has obtenido y coméntala en una extensión en 4 y 8 líneas.

Salida de ejemplo:

<code>
-------------------------------------------                                     
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |value|count|
+------------------------------------------+-----+-----+
|[2021-12-03 10:33:40, 2021-12-03 10:33:50]|0    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|5    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|0    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|3    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|4    |1    |
</code>

# YOUR CODE HERE

Se ha utilizado un código similar al explicado en el primer ejercicio. A la variable spark se le añade el formato rate para configurar un RateStreamSource y la opción rowsPerSecond para indicar que los numeros se generen cada segundo. A la variable linesDF se le indica que la columna window sea un timestamp y value contenga el numero generado. La variable wordsDF se agrupa por la variable window y value y se cuentan en la columna count. El resto del codigo es exactamente igual al del primer ejercicio.

{'message': 'Processing new data', 'isTriggerActive': True, 'isDataAvailable': True}
+--------------------+-----+-----+                                              
|              window|value|count|
+--------------------+-----+-----+
|[2022-01-18 03:31...|    8|    1|
|[2022-01-18 03:31...|   24|    1|
|[2022-01-18 03:31...|   25|    1|
|[2022-01-18 03:31...|    6|    1|
|[2022-01-18 03:31...|    1|    1|
|[2022-01-18 03:31...|   10|    1|
|[2022-01-18 03:31...|   22|    1|
|[2022-01-18 03:31...|   11|    1|
|[2022-01-18 03:31...|    8|    1|
|[2022-01-18 03:31...|   18|    1|
|[2022-01-18 03:31...|   17|    1|
|[2022-01-18 03:31...|   24|    1|
|[2022-01-18 03:31...|    7|    1|
|[2022-01-18 03:31...|    7|    1|
|[2022-01-18 03:31...|   15|    1|
|[2022-01-18 03:31...|   21|    1|
|[2022-01-18 03:31...|    9|    1|
|[2022-01-18 03:31...|    3|    1|
|[2022-01-18 03:31...|   15|    1|
|[2022-01-18 03:31...|   23|    1|
+--------------------+-----+-----+
only showing top 20 rows

En la salida puede verse en cada fila de la columna window el timestamp de cuando se genera y cuando se recibe, con una diferencia de 5 segundos entre los dos timestamp. En la columna value se encuentra el valor generado y la columna count contiene el contado de los valores según el valor de la columna window y value. El valor de count siempre será 1 si no coinciden el timestamp y el número. En el ejemplo de salida, no pueden verse todos los valores porque la tabla es muy ancha.

#raise NotImplementedError()

En la ejecución de las consultas es muy interesante poder ir obteniendo información sobre el progreso realizado en el último disparador del flujo: qué datos se procesaron, cuáles fueron las tasas de procesamiento, latencias, etc.

>**Pregunta 3 (1 punto).** Modifica el programa para que muestre 3 [métricas](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#monitoring-streaming-queries) del streaming mientras este se realiza. Solo se deben mostrar métricas mientras la consulta está activa.

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('rate')\
    .option('rowsPerSecond', '1')\
    .load()

wordsDF = linesDF.withColumn("window", linesDF.timestamp)\
                .withColumn("value", linesDF.value)

from pyspark.sql.functions import window

wordCountsDF = wordsDF.groupBy(window('window', "10 seconds", "5 seconds"), 'value').count()

query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(query.status)
    display(query.lastProgress)
    display(query.recentProgress)
    sleep(5)

#raise NotImplementedError()

>**Pregunta 4 (1 punto).** Explica las 3 métricas aplicadas con una extensión entre 5 y 10 líneas propias.

Salida de ejemplo:

<code>
{'isDataAvailable': False, 'isTriggerActive': False, 'message': 'Initializing sources'}
{'stateOperators': [{'customMetrics': {'loadedMapCacheHitCount': 0, 'stateOnCurrentVersionSizeBytes': 25198, 'loadedMapCacheMissCount': 0}, 'numRowsUpdated': 0, 'memoryUsedBytes': 82798, 'numRowsTotal': 0}], 'timestamp': '2021-12-03T10:16:01.657Z', 'sources': [{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'endOffset': 0, 'startOffset': None, 'processedRowsPerSecond': 0.0, 'numInputRows': 0}], 'runId': '9134994e-c17f-4277-974a-21cd09cf9aea', 'durationMs': {'triggerExecution': 36450, 'walCommit': 48, 'getB[Stage 8:======>        (22 + 2) / 200]
</code>

# YOUR CODE HERE

Salida de ejemplo:

{'isTriggerActive': True, 'message': 'Processing new data', 'isDataAvailable': True}
{'batchId': 0, 'sources': [{'numInputRows': 0, 'processedRowsPerSecond': 0.0, 'endOffset': 0, 'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'startOffset': None}], 'processedRowsPerSecond': 0.0, 'numInputRows': 0, 'stateOperators': [{'numRowsUpdated': 0, 'customMetrics': {'stateOnCurrentVersionSizeBytes': 12599, 'loadedMapCacheHitCount': 0, 'loadedMapCacheMissCount': 0}, 'memoryUsedBytes': 41399, 'numRowsTotal': 0}], 'id': '86598058-bb77-471c-8f24-7a7e5dbc6138', 'runId': 'db0a1e30-5e1f-4a2a-bb24-4005ee7496de', 'durationMs': {'setOffsetRange': 1, 'getBatch': 7, 'addBatch': 25323, 'getEndOffset': 0, 'triggerExecution': 25906, 'walCommit': 148, 'queryPlanning': 323}, 'sink': {'description': 'MemorySink'}, 'name': 'palabras', 'timestamp': '2022-01-18T03:06:35.539Z'}
[{'batchId': 0, 'sources': [{'numInputRows': 0, 'processedRowsPerSecond': 0.0, 'endOffset': 0, 'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'startOffset': None}], 'processedRowsPerSecond': 0.0, 'numInputRows': 0, 'stateOperators': [{'numRowsUpdated': 0, 'customMetrics': {'stateOnCurrentVersionSizeBytes': 12599, 'loadedMapCacheHitCount': 0, 'loadedMapCacheMissCount': 0}, 'memoryUsedBytes': 41399, 'numRowsTotal': 0}], 'id': '86598058-bb77-471c-8f24-7a7e5dbc6138', 'runId': 'db0a1e30-5e1f-4a2a-bb24-4005ee7496de', 'durationMs': {'setOffsetRange': 1, 'getBatch': 7, 'addBatch': 25323, 'getEndOffset': 0, 'triggerExecution': 25906, 'walCommit': 148, 'queryPlanning': 323}, 'sink': {'description': 'MemorySink'}, 'name': 'palabras', 'timestamp': '2022-01-18T03:06:35.539Z'}]

Utilizando lastProgress devuelve un objeto StreamingQueryProgress en Scalay Java y un diccionario en Python. Tiene toda la información sobre el progreso realizado en el último disparo del flujo (qué datos fueron procesados, cuáles fueron las tasas de procesamiento, latencias, etc.). Usando recentProgress devuelve un array de los últimos progresos. Utilizando status da información sobre lo que la consulta está haciendo inmediatamente (es un disparador activo, los datos que están siendo procesados, etc.)

Fuente: https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#monitoring-streaming-queries

#raise NotImplementedError()

## PARTE 3. Captura y procesamiento de datos en tiempo real con la API OpenSky

Es esta parte de la práctica vamos a trabajar la adquisición de datos en tiempo real de [OpenSky](https://opensky-network.org/). OpenSky Network es una asociación sin ánimo de lucro con sede en Suiza que brinda acceso abierto a los datos de control de seguimiento de vuelos.  Fue creado como un proyecto de investigación por varias universidades y entidades gubernamentales con el objetivo de mejorar la seguridad, confiabilidad y eficiencia del espacio aéreo. Su función principal es recopilar, procesar y almacenar datos de control de tráfico aéreo y proporcionar acceso abierto a estos datos al público. Esencialmente los datos de los aviones se obtienen vía satélite haciendo uso de  Automatic Dependent Surveillance–Broadcast (ADS–B). Para realizar este ejercicio no es necesario registrarse en el sistema OpenSky dado que vamos ha relizar actualizaciones de la información e vuelo sobre la superficie de España cada 10 segundos. La API está disponible este [enlace](https://openskynetwork.github.io/opensky-api/python.html). El parámetro bbox es una tupla que indica la latitud mínima, máxima, y las longitudes mínimas y máximas.

Primeramente, vamos a utilizar el servicio OpenSkyApi para leer un rectángulo con las latitudes y longitudes que engloban la península ibérica.

Para ello debéis [instalar](https://github.com/openskynetwork/opensky-api) la biblioteca en vuestro directorio del servidor Cloudera

1. Descargar en formato .zip el repositorio
1. Subir a vuestro directorio personal del servidor de Cloudera el zip. 
1. Descomprimirlo.
1. Dentro del directorio que ha creador ejecutar `pip install -e ./python`

Una vez instalada el módulo anterior, la siguiente celda os mostrará los vuelos registrados sobre la península ibérica en estos momentos. Observad con detenimiento las propiedades del diccionario de cada vuelo.

In [None]:
import json
from random import sample

from opensky_api import OpenSkyApi
api = OpenSkyApi()
states = api.get_states(bbox=(36.173357, 44.024422,-10.137019, 1.736138))
#recuperamos codigo, pais_origen, long, lat, altitud, velocidad, ratio_vertical
#atención en este ejemplo solo estamos mostrando 5 vuelos aleatorios, 
#en vuestros ejercicios deberéis eliminar la función sample
for s in sample(states.states,5):
    vuelo_dict = {
                'callsign':s.callsign,
                'country': s.origin_country,
                'longitude': s.longitude,
                'latitude': s.latitude,
                'velocity': s.velocity,
                'vertical_rate': s.vertical_rate,
            }
    vuelo_encode_data = json.dumps(vuelo_dict, indent=2).encode('utf-8')
    print("(%r, %r,%r, %r, %r, %r)" % (s.callsign, s.origin_country, s.longitude, s.latitude,s.velocity,s.vertical_rate))

Salida de ejemplo:

`('BAW457  ', 'United Kingdom',-3.5196, 40.4292, 86.45, 10.73)
('BLX245  ', 'Sweden',-6.0307, 43.8266, 252.51, 0)
('CFG1HE  ', 'Germany',-8.4689, 40.2967, 236.56, 0)
('TOM3MK  ', 'United Kingdom',-7.2687, 41.5878, 247.02, 0)
('AEA57MC ', 'Spain',-0.5364, 38.2791, 64.7, -3.9)`

Ahora vamos a crear un programa en Python para poder enviar cada 10 segundos por el puerto que tenéis asignado información de los vuelos que hay sobre la península ibérica. Deberéis poner en marcha primero el programa Python con el servidor de sockets que lee de Opensky y después el programa de Spark con structured streaming, es decir, en esta parte volvemos a tener dos terminales abiertas a la vez, y lo podéis realizar con el VSCode o con el SSH.

>**Pregunta 1. (1 punto)** Modifica el programa Python para enviar datos de los vuelos en formato JSON. Os podéis auxiliar de la función [json.dumps](https://docs.python.org/3/library/json.html) que nos permite crear un JSON binario de cada diccionario con las propiedades del vuelo. Prestad atención al salto de línea, '\n', que se adjunta al final de cada envío, es fundamental para cerrar la transmisión de datos a Spark.

In [None]:
# YOUR CODE HERE

from time import sleep
import socket
import json
from opensky_api import OpenSkyApi

HOST = 'localhost'  # hostname o IP address
PORT = 20068         # puerto socket server

api = OpenSkyApi()
states = api.get_states(bbox=(36.173357, 44.024422,-10.137019, 1.736138))

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
while True:
    print('\nEscoltant per un client a',HOST , PORT)
    conn, addr = s.accept()
    print('\Connectat per', addr)
    try:
        while(True):
            v = {}
            for vuelo in states.states:
                v = {
                    'callsign':vuelo.callsign,
                    'country': vuelo.origin_country,
                    'longitude': vuelo.longitude,
                    'latitude': vuelo.latitude,
                    'velocity': vuelo.velocity,
                    'vertical_rate': vuelo.vertical_rate,
                }
                print(v)
                conn.send(json.dumps(v).encode('utf-8'))
                conn.send(b'\n')
            sleep(10)       
    except socket.error:
        print ('Error .\n\nClient desconnectat.\n')
conn.close()

#raise NotImplementedError()

>**Pregunta 2. (1 punto)** Se pide leer los datos recibidos mediante structured streaming y mostrar el esquema de los datos recibidos. En este primer ejercicio solo vamos a tener una cadena con el JSON recibido de cada vuelo y un esquema con un único elemento. Debéis utilizar la función "printSchema()".

>Una vez comprobada que la transmisión funciona, se pide realizar un pre-procesado antes del envío de los datos mediante el socket para eliminar aquellas líneas de datos que no sean útiles ni convenientes.

Salida de ejemplo:

<code>
root
 |-- value: string (nullable = true)

|value             |

|{"velocity": 210.12, "vertical_rate": 0, "latitude": 43.6082, "callsign": "TAP441  ", "longitude": -1.3992, "country": "Portugal"}         |
|{"velocity": 246.5, "vertical_rate": 0, "latitude": 40.5836, "callsign": "TAP844  ", "longitude": -3.8452, "country": "Portugal"}          |
|{"velocity": 0, "vertical_rate": null, "latitude": 40.487, "callsign": "IBE2800 ", "longitude": -3.5889, "country": "Spain"}      
</code>

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20068)\
    .load()

wordsDF = linesDF.withColumn("value", linesDF.value)

wordCountsDF = wordsDF.groupBy('value').count()

query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(wordsDF.printSchema())
    display(spark.sql('SELECT value FROM palabras').show())
    sleep(5)

#raise NotImplementedError()

Copia la salida obtenida en formato de texto:

# YOUR CODE HERE

root
 |-- value: string (nullable = true)

None
+--------------------+                                                          
|               value|
+--------------------+
|{"country": "Spai...|
|{"country": "Irel...|
|{"country": "Aust...|
|{"country": "Germ...|
|{"country": "Spai...|
|{"country": "Spai...|
|{"country": "Port...|
|{"country": "Spai...|
|{"country": "Spai...|
|{"country": "Swit...|
|{"country": "Spai...|
|{"country": "Spai...|
|{"country": "Spai...|
|{"country": "Spai...|
|{"country": "Swed...|
|{"country": "Germ...|
|{"country": "Spai...|
|{"country": "King...|
|{"country": "Spai...|
|{"country": "Port...|
+--------------------+
only showing top 20 rows

#raise NotImplementedError()

>**Pregunta 3. (1 punto)** De pide mostrar la información en forma de tabla, con las columnas, country|callsign|longitude|latitude|velocity|vertical_rate. Para ello vais a tener que crear un esquema mediante [StructType](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html) y aplicarlo a la función SQL [from_json](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.from_json.html). De esta manera podremos pasar de una columna string con todo el JSON, a 6 columnas con el tipo ajustado al valor contenido.

Salida de ejemplo:

<code>
root
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- longitude: long (nullable = true)
 |-- latitude: long (nullable = true)
 |-- velocity: long (nullable = true)
 |-- vertical_rate: long (nullable = true)


+--------------+--------+---------+--------+--------+-------------+
|       country|callsign|longitude|latitude|velocity|vertical_rate|
+--------------+--------+---------+--------+--------+-------------+
|      Portugal|TAP441  |  -1.3992| 43.6082|  210.12|          0.0|
|      Portugal|TAP844  |  -3.8452| 40.5836|   246.5|          0.0|
|         Spain|IBE2800 |  -3.5889|  40.487|     0.0|         null|
|United Kingdom|ABW713  |  -0.5287|  41.899|  155.82|        -7.48|
|         Spain|FYS161  |   -0.188| 38.8394|   64.77|        -2.93|
|         Spain|IBE3242 |  -3.5896| 40.4919|    0.77|         null|
|         Spain|AEA4025 |   0.1208| 38.5346|  125.53|         3.25|
|         Spain|P21     |  -3.5728| 40.4751|   10.29|         null|
|         Spain|IBE30EA |  -2.6086|  40.914|  181.29|        -6.18|
</code>

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20068)\
    .load()

wordsDF = linesDF.withColumn("value", linesDF.value)

wordCountsDF = wordsDF.groupBy('value').count()

from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from pyspark.sql.functions import from_json, col
jsonSchema = StructType([ StructField("callsign", StringType(), True),
                          StructField("velocity", DoubleType(), True),
                          StructField("longitude", DoubleType(), True),
                          StructField("latitude", DoubleType(), True),
                          StructField("country", StringType(), True),
                          StructField("vertical_rate", DoubleType(), True)
                        ])

df = wordCountsDF.withColumn("value", from_json(col("value"), jsonSchema))
df_select = df.select("value.callsign", "value.country", "value.longitude", "value.latitude", "value.velocity",
                      "value.vertical_rate")

query = df_select\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(df_select.printSchema())
    display(spark.sql('SELECT country, callsign, longitude, latitude, velocity, vertical_rate FROM palabras').show())
    sleep(5)

#raise NotImplementedError()

Copia la salida obtenida en formato de texto:

# YOUR CODE HERE

root
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- vertical_rate: double (nullable = true)

None
+--------------------+--------+---------+--------+--------+-------------+       
|             country|callsign|longitude|latitude|velocity|vertical_rate|
+--------------------+--------+---------+--------+--------+-------------+
|               Spain|VLG73WB |  -5.3908| 38.7958|  234.17|        -0.33|
|             Ireland|RYR2NT  |  -2.6438| 37.8678|  228.89|          0.0|
|             Austria|EJU7674 |  -5.7166| 39.6921|  211.93|        -0.33|
|             Germany|OCN300  |  -2.3122| 40.5742|  239.12|          0.0|
|               Spain|VLG78LM |    0.162|  39.954|  231.12|          3.9|
|               Spain|ANE18LY |  -3.3822| 39.5037|   234.2|         6.18|
|            Portugal|TAP846F |  -2.5077| 40.9977|  229.33|          0.0|
|               Spain|SWT7773 |   1.5802| 38.9577|   106.1|        -3.25|
|               Spain|AEA056  |  -6.1163| 41.2638|  243.94|        -0.33|
|               Spain|IBS39UC |  -6.0401| 37.5052|  112.74|         -2.6|
|         Switzerland|CAZ601  |   -2.805| 38.0225|   247.8|        -13.0|
|               Spain|IBS3946 |  -8.1522| 38.1785|  225.48|         0.33|
|               Spain|IBE05MV |  -8.3019| 42.4179|  139.09|        -7.48|
|               Spain|IBE3490 |   1.5487| 43.7652|  227.59|          0.0|
|              Sweden|NOZ5070 |  -1.4336| 43.9072|  230.64|          0.0|
|             Germany|GAF906  |  -0.4259| 42.7904|  247.98|          0.0|
|               Spain|FAUNA2  |  -3.5442| 40.4783|    6.43|         null|
|Kingdom of the Ne...|TFL605  |  -7.5212| 40.7335|  235.57|          0.0|
|               Spain|AEA33TV |  -2.6347| 40.9556|  180.86|         -5.2|
|            Portugal|TAP088  |  -9.1433| 38.7677|    68.5|        -3.25|
+--------------------+--------+---------+--------+--------+-------------+
only showing top 20 rows

#raise NotImplementedError()

>**Pregunta 4. (1 punto)** Muestra el total de vuelos para cada destino agrupados por país de destino que hay en cada momento. Los datos deben mostrarse ordenados por país alfabéticamente. Tened en cuenta que podemos recibir datos duplicados dado que el script de OpenSky lee cada 10 segundos todos los vuelos existentes y los envía por socket. Por defecto Spark crea 200 tareas (cada una implicará una partición de los datos) por stage en el procesamiento en Structured Streaming. Para acelerar el proceso de captura, se pide que [ajustéis](https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options) el parámetro en la configuración de SparkSession a un valor de 4 particiones.

Salida de ejemplo:

<code>
-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------------+-----+
|country                   |count|
+--------------------------+-----+
|Algeria                   |1    |
|Austria                   |3    |
|Belgium                   |1    |
|Chile                     |2    |
|Denmark                   |1    |
|France                    |15   |
|Germany                   |15   |
|Hungary                   |1    |
|Ireland                   |26   |
|Kingdom of the Netherlands|2    |
|Lithuania                 |1    |
|Luxembourg                |1    |
|Malta                     |5    |
|Mexico                    |1    |
</code>

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .master("local[4]") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20068)\
    .load()

wordsDF = linesDF.withColumn("value", linesDF.value)

from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from pyspark.sql.functions import from_json, col, asc
jsonSchema = StructType([ StructField("callsign", StringType(), True),
                          StructField("velocity", DoubleType(), True),
                          StructField("longitude", DoubleType(), True),
                          StructField("latitude", DoubleType(), True),
                          StructField("country", StringType(), True),
                          StructField("vertical_rate", DoubleType(), True)
                        ])

df = wordsDF.withColumn("value", from_json(col("value"), jsonSchema))
df_select = df.select("value.country").groupBy('country').count()

query = df_select\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(spark.sql('SELECT country, count FROM palabras').orderBy(asc("country")).show())
    sleep(5)

#raise NotImplementedError()

Copia la salida obtenida en formato de texto:

# YOUR CODE HERE

None
+--------------------+-----+                                                    
|             country|count|
+--------------------+-----+
|             Austria|    6|
|             Belgium|    6|
|              Brazil|    6|
|               Chile|    6|
|      Czech Republic|    6|
|             Denmark|    9|
|               Egypt|    3|
|             Finland|    3|
|              France|   21|
|             Germany|   30|
|             Hungary|    3|
|             Ireland|   87|
|               Italy|    3|
|Kingdom of the Ne...|    6|
|               Malta|   12|
|              Mexico|    3|
|             Morocco|    3|
|              Poland|    9|
|            Portugal|   48|
|               Qatar|    6|
+--------------------+-----+
only showing top 20 rows

#raise NotImplementedError()

>**Pregunta 5. (1 punto)** Agrupa todos los vuelos que están subiendo en altura, los que están bajando y los que están en tierra. Indica su numero. Deberás auxiliarte de una consulta SQL para poder indicar con -1 que un vuelo está descendiendo, +1 si está subiendo, y 0 si está en tierra.

Salida de ejemplo:

<code>
+------+-----+
|estado|count|
+------+-----+
|     0|   96|
|    -1|   54|
|     1|   41|
+------+-----+
</code>

In [None]:
# YOUR CODE HERE

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_sfunesolaria") \
    .master("local[4]") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20068)\
    .load()

wordsDF = linesDF.withColumn("value", linesDF.value)

from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from pyspark.sql.functions import from_json, col, when
jsonSchema = StructType([ StructField("callsign", StringType(), True),
                          StructField("velocity", DoubleType(), True),
                          StructField("longitude", DoubleType(), True),
                          StructField("latitude", DoubleType(), True),
                          StructField("country", StringType(), True),
                          StructField("vertical_rate", DoubleType(), True)
                        ])

df = wordsDF.withColumn("value", from_json(col("value"), jsonSchema))
df_select = df.select("value.vertical_rate").filter(col("vertical_rate").isNotNull()) \
            .withColumn("estado", when((col("vertical_rate") < 0), -1).when((col("vertical_rate") > 0), 1).otherwise(0) ) \
            .groupBy('estado').count()

query = df_select\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(spark.sql('SELECT * FROM palabras').show())
    sleep(5)


#raise NotImplementedError()

Copia la salida obtenida en formato de texto:

# YOUR CODE HERE

None
+------+-----+                                                                  
|estado|count|
+------+-----+
|    -1|  189|
|     1|  144|
|     0|  201|
+------+-----+

#raise NotImplementedError()

>**Pregunta 6. (1 punto)** ¿De que manera podemos identificar que aparece un nuevo vuelo en el espacio aéreo?. No hace falta escribir el código sino describir con palabras como se plantearía la solución.

# YOUR CODE HERE

Identificaremos los vuelos por el valor de callsign que identifica la señal del avión. Si el vuelo tiene valor de callsign significa que la señal se recibe y si callsign es None, quiere decir que no se recibe la señal.

Si utilizamos la información de la tabla de la pregunta 3, tendremos todos los valores que se reciben. Si agrupamos los valores por callsign, obtendremos todos los vuelos actuales. Si obtenemos un valor nuevo de callsign en la llegada de datos, quiere decir que es un nuevo vuelo.

Por otro lado, si usamos la API de OpenSky, la solución más sencilla es comparar los valores de la variable on_ground y comprobar cuando el valor pasa a TRUE, querrá decir que el avión no está en tierra.

#raise NotImplementedError()

>**Pregunta 7. (1 punto)** Explica brevemente en una extensión de entre 5 y 10 lineas las ventajas e inconvenientes de utilizar Structured Streaming versus Spark Streaming.

# YOUR CODE HERE

Si hablamos de la transmisión real de datos, Spark Streaming funciona en lotes y los datos se envían después de cada duración del lote, en cambio, en Structured Streaming los datos fluyen continuamente, por tanto, se decanta más por la transmisión real.
Si hablamos de rendimiento, Spark Streaming utiliza internamente RDD y Structured Streaming utiliza Dataframe para realizar operaciones de transmisión. Los DataFrames (Structured Streaming) están más optimizados y brindan más opciones de agregación.
Si hablamos de la latencia que hay entre la generación de datos y en la entrega de los datos, Spark Streaming coloca los datos en un lote en función de la marca de tiempo lo que puede generar pérdida de datos. Por otro lado, Structured Streaming tiene la funcionalidad de procesar los datos cuando la marca de tiempo se incluye en los datos recibidos.
Si hablamos de restricciones y flexibilidad de eso, en Spark Streaming no hay restricción de uso de sink, ya que tiene el método foreachRDD, pero en Structures Streaming hay un número limitado de sink, porque no tiene un método del estilo.
Se puede concluir que Structured Streaming es una mejor plataforma de Streaming en comparación con Spark Streaming.

Fuente: https://blog.knoldus.com/spark-streaming-vs-structured-streaming/

#raise NotImplementedError()