# Actividad 2: Structured Streaming y Kafka

## Es muy importante eliminar siempre las líneas que contienen el código `raise NotImplementedError` ❌

### Iniciamos Actividad 2 desde el final de la actividad 1: Copiamos función `retrasoMedioLlegada` de la Actividad 1 para la actividad 2.

De los trayectos de llegada positiva del bus, calcular el retraso medio de llegada de cada estación.

Importante, se debe de copiar la función de la actividad 1 llamada `retrasoMedioLlegada` en la siguiente celda para esta actividad 2. Lo que devuelve esta función debería de ser 2 columnas: `destination` y `retraso_medio`.

In [1]:
from confluent_kafka.admin import AdminClient, NewTopic

bootstrap_servers = "kafka:9092"
kafka_admin = AdminClient({"bootstrap.servers": bootstrap_servers})

topic_name = "retrasos"
retrasos_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)

fs = kafka_admin.create_topics([retrasos_topic])

for topic, f in fs.items():
    try:
        f.result()
        print(f"Topic '{topic}' creado correctamente.")
    except Exception as e:
        print(f"Error creando topic '{topic}': {e}")

kafka_admin.list_topics().topics

Error creando topic 'retrasos': KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'retrasos' already exists."}


{'retrasos': TopicMetadata(retrasos, 1 partitions)}

In [1]:
from pyspark.sql import SparkSession

bootstrap_servers = "kafka:9092"

spark = (SparkSession.builder
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
         .getOrCreate())

print("Spark version:", spark.version)

Spark version: 3.5.0


In [None]:
from pyspark.sql import functions as F

def retrasoMedioLlegada(df):
    return (df.where(F.col("delay_end_M") > 0)
                .groupBy("destination")
                .agg(F.avg("delay_end_M").alias("retraso_medio_llegada"))
                .orderBy(F.desc("retraso_medio_llegada")))

### Ejercicio 1

Para implementar una solución en tiempo real utilizando Kafka de manera que nos aseguremos que los cálculos hechos anteriormente "Actividad 1" se actualicen continuamente y en tiempo real, es esencial la configuración adecuada de un DataFrame de Streaming en Spark. Aquí, nuestro objetivo es procesar eficientemente los datos provenientes de Kafka, enfocándonos en los campos `destination` y `delay_end_M`. El proceso implica la utilización de un Streaming DataFrame. El Dataframe será almacenado en la variable `delayStreamingDF` y será el encargado de leer datos del topic `retrasos` directamente desde Apache Kafka. Este Dataframe se construirá de la siguiente manera:

**1. Uso de `readStream` en lugar de `read`:**

- En nuestro entorno de Spark, en lugar de utilizar el método estándar read, optaremos por readStream. Esta elección es fundamental para establecer un flujo de datos continuo, permitiendo que nuestro DataFrame procese datos en tiempo real. readStream es una característica clave de Spark que habilita el procesamiento de streaming, esencial para trabajar con fuentes de datos en tiempo real como Kafka.
    
**2. Establecer el formato de origen a Kafka:**

- Con el uso de .format("kafka"), estamos instruyendo a Spark para que interprete los datos entrantes como provenientes de un servidor Kafka. Este paso es crucial porque define la manera en que Spark va a leer y decodificar los datos, asegurando la compatibilidad y la correcta interpretación del flujo de datos desde Kafka.
    
**3. Configuración de los brokers de Kafka y el puerto:**

- Mediante la opción .option("kafka.bootstrap.servers", "<nombre-cluster>-w-0:9092,<nombre-cluster>-w-1:9092"), especificamos los nodos de Kafka desde los cuales vamos a leer los datos. Aquí, <nombre-cluster>-w-0:9092,<nombre-cluster>-w-1:9092 se refiere a los brokers (nodos) de Kafka en nuestro clúster, con el puerto 9092, que es el puerto por defecto de Kafka. Este paso es vital para establecer una conexión efectiva con Kafka, permitiendo que nuestro DataFrame acceda a los datos distribuidos a través de múltiples nodos en el clúster.


**4. Suscripción al tópico `retrasos`:**

- Utilizando .option("subscribe", "retrasos"), definimos explícitamente el tópico de Kafka al que nuestro DataFrame debe suscribirse. Al especificar "retrasos", nos aseguramos de que solo se lea y procese la información relevante para nuestra tarea, filtrando cualquier otro flujo de datos que no sea pertinente. Esta suscripción es esencial para dirigir el flujo de datos hacia los mensajes específicos que necesitamos.


**5. Cargar la configuración con load():**

- El último paso es invocar load(), lo cual inicia el proceso de lectura y procesamiento de los datos conforme a las configuraciones previas. Este comando es el que activa todas las opciones establecidas anteriormente, y es el paso final para que nuestro DataFrame de streaming comience a funcionar y procesar los datos transmitidos desde Kafka.

In [None]:
delayStreamingDF = None # Sustituye con el código adecuado conforme a las instrucciones previas

# Tu código hazlo aquí
delayStreamingDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrap_servers).option("subscribe", "retrasos").load()

In [None]:
# Obtenemos y almacenamos los tipos (columnTypes) de datos de cada columna en 'delayStreamingDF'
columnTypes = delayStreamingDF.dtypes

# Comprobamos que 'delayStreamingDF' es un DataFrame de streaming.
assert(delayStreamingDF.isStreaming)

# Verificamos que los tipos de datos de cada columna sean los esperados.
assert((columnTypes[0][0] == "key")             & (columnTypes[0][1] == "binary"))
assert((columnTypes[1][0] == "value")           & (columnTypes[1][1] == "binary"))
assert((columnTypes[2][0] == "topic")           & (columnTypes[2][1] == "string"))
assert((columnTypes[3][0] == "partition")       & (columnTypes[3][1] == "int"))
assert((columnTypes[4][0] == "offset")          & (columnTypes[4][1] == "bigint"))
assert((columnTypes[5][0] == "timestamp")       & (columnTypes[5][1] == "timestamp"))
assert((columnTypes[6][0] == "timestampType")   & (columnTypes[6][1] == "int"))

### Ejercicio 2



**1. Transformación de la Columna `value`:**

- Selecciona la columna `value` y convierte `value` a `StringType` usando `withColumn`. Esto reemplazará la columna `value` existente con la nueva versión convertida. Este paso es crucial ya que permitirá manejar cada fila de esta columna como un fichero JSON completo, facilitando la extracción de datos específicos más adelante.
    
**2. Extracción de Datos de los JSON:**

- Para obtener los campos individuales de cada JSON, usaremos la función `from_json` de Spark. Aplicaremos esta función a cada fila de la columna `value`, lo que nos permitirá parsear el String de JSON según un esquema definido. El resultado será una nueva columna `jsonData` de tipo `struct`, que incluye dos campos, uno de tipo `String` y otro de tipo `Integer`.
    
**3. Acceso a Campos de la Estructura `jsonData`:**

- La columna `jsonData`, al ser un tipo `struct`, nos permite acceder a sus campos mediante el operador punto (.). Utilizando `withColumn` dos veces, crearemos dos nuevas columnas: `destination` y `delay_end_M`. Cada una de estas será el resultado de acceder a los campos respectivos de `jsonData`, es decir, `jsonData.destination` y `jsonData.delay_end_M`.
    
    
Con estos pasos, convertiremos datos en bruto en una forma estructurada y utilizable, preparando el camino para análisis y operaciones más avanzadas.
    


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F

esquema = StructType([\
  StructField("destination", StringType()),\
  StructField("delay_end_M", DoubleType())\
])

destinationDelaysDF = None   # Elimina y descomenta las siguientes lineas y verás que debes completar el código " <COMPLETAR> "

destinationDelaysDF = delayStreamingDF\
    .select("value")\
    .withColumn("value", F.col("value").cast(StringType()))\
    .withColumn("jsonData", F.from_json(F.col("value"), esquema))\
    .withColumn("destination", F.col("jsonData.destination"))\
    .withColumn("delay_end_M", F.col("jsonData.delay_end_M"))


In [None]:
# Almacena en 'columnTypes' los tipos (columnTypes) de datos de cada columna del DataFrame 'destinationDelaysDF'.
columnTypes = destinationDelaysDF.dtypes
assert(("value", "string") in columnTypes)
assert(('jsonData', 'struct<destination:string,delay_end_M:double>') in columnTypes)
assert(('destination', 'string') in columnTypes)
assert(('delay_end_M', 'double') in columnTypes)

Ahora que nuestro DataFrame incluye una columna destination, que identifica la estación de destino, y una columna delay_end_M, que registra los retrasos en formato de números reales, estamos en condiciones de realizar un tipo de análisis similar al que se lleva a cabo en la función retrasoMedioLlegadaStreamingDF. Por lo tanto, procederemos a aplicar retrasoMedioLlegadaStreamingDF, utilizando destinationDelaysDF como su argumento.

In [None]:
# No modifiques el la celda de código
# Se crea el DataFrame 'retrasoMedioLlegadaStreamingDF' mediante la función 'retrasoMedioLlegada'
# sobre 'destinationDelaysDF', con el objetivo de calcular el promedio del tiempo de retraso en las llegadas a destino.
retrasoMedioLlegadaStreamingDF = retrasoMedioLlegada(destinationDelaysDF)

# Configuramos y arrancamos la ejecución en streaming.
# .writeStream configura el flujo de escritura.
# .queryName asigna un nombre a la consulta en streaming para su referencia futura.
# .outputMode establece el modo de salida 'complete', actualizando toda la tabla con cada disparador.
# .format especifica el formato de los resultados de la salida, en este caso, en memoria.
# .start inicia la ejecución de la consulta en streaming.
consoleOutput = retrasoMedioLlegadaStreamingDF\
                    .writeStream\
                    .queryName("retrasosAgg")\
                    .outputMode("complete")\
                    .format("memory")\
                    .start()

IllegalArgumentException: Cannot start query with name retrasosAgg as a query with that name is already active in this SparkSession

Después de completar la ejecución de la celda previa, inicia sesión en el productor de Kafka a través de SSH en una de las máquinas (puedes revisar las instrucciones de la práctica para recordar cómo hacerlo). Luego, inserta exactamente estos 4 mensajes a Kafka en formato JSON. Observarás que incluyen un campo 'destination' y un campo 'delay_end_M', representando los datos que recibiríamos en vivo de diferentes estaciones de bus conforme los trayectos van llegando a destino.

Cada vez que introduzcas un mensaje, realiza una consulta utilizando `select * from retrasosAgg` mediante `spark.sql(...)` y observa los resultados en el DataFrame `retrasosAggregadosDF `. Esto ejecutará una consulta contra la vista temporal 'retrasosAgg' que se creó en el metastore de Hive gracias al `writeStream` del paso anterior. Continúa ejecutando el comando `show()` en esa celda hasta que observes un cambio en los resultados, lo cual confirmará que Spark ha procesado el nuevo dato en su cálculo de agregación y actualizado el resultado.

Ten en cuenta que `spark.sql(...)` es una transformación, así que la consulta se ejecutará nuevamente cada vez que utilices `show()` en el resultado. No cachearás nada, lo cual es intencional para forzar la reevaluación de la consulta y así poder ver el contenido actualizado de la tabla (en memoria) en Hive cada vez que hagas `show()`.

Lo que se solicita es:

- Anota el resultado de la agregación (el valor de la columna 'retraso_medio_llegada') para SAO PAULO y RIO DE JANEIRO en las variables designadas para ello cada vez que envíes un mensaje y confirmes que Spark ha añadido esa información a su cálculo.
- No te preocupes si necesitas ejecutar la misma celda varias veces. El cálculo solo se actualizará una vez con cada nuevo mensaje enviado a Kafka. Las ejecuciones subsiguientes mostrarán el mismo resultado hasta que envíes otro mensaje nuevo.

Los 4 mensajes en formato JSON que debes ingresar sucesivamente en Kafka son los siguientes:

- {"destination": "SAO PAULO", "delay_end_M": 4.1}
- {"destination": "RIO DE JANEIRO", "delay_end_M": 8.6}
- {"destination": "SAO PAULO", "delay_end_M": 2.1}
- {"destination": "RIO DE JANEIRO", "delay_end_M": 42.2}
`

In [None]:
retrasosAggregadosDF = spark.sql("select * from retrasosAgg")   # Cambia esta linea para llamaar al método .sql de la sesión de Spark session

# Después de enviar el primer mensaje a Kafka que contiene datos para 'SAO PAULO'
# Ejecutamos la consulta y mostramos los resultados.
retrasosAggregadosDF.show()

# Tu código hazlo aquí
#raise NotImplementedError

+-----------+---------------------+
|destination|retraso_medio_llegada|
+-----------+---------------------+
+-----------+---------------------+



In [None]:
columnas = retrasosAggregadosDF .columns
assert(len(columnas) == 2)
assert("destination" in columnas)
assert("retraso_medio_llegada" in columnas)

In [None]:
retrasosAggregadosDF.show() # Muestra el DataFrame una vez Spark Streaming ya procesado la actualización (enviado a Kafka).
result = spark.sql("select retraso_medio_llegada from retrasosAgg where destination = 'SAO PAULO'")

retraso_medio_SAO_PAULO_primer_mensaje = result.head()["retraso_medio_llegada"]

# Tu código hazlo aquí
retraso_medio_SAO_PAULO_primer_mensaje

+-----------+---------------------+
|destination|retraso_medio_llegada|
+-----------+---------------------+
|  SAO PAULO|                  4.1|
+-----------+---------------------+



4.1

In [None]:
retrasosAggregadosDF.show()

result = spark.sql("select retraso_medio_llegada from retrasosAgg order by destination").collect()

retraso_medio_SAO_PAULO_segundo_mensaje = result[1][0]  # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
retraso_medio_RIO_JANEIRO_segundo_mensaje = result[0][0]  # Cambia esta linea para llamaar al método .sql de la sesión de Spark session

# Tu código hazlo aquí
print(retraso_medio_SAO_PAULO_segundo_mensaje)
print(retraso_medio_RIO_JANEIRO_segundo_mensaje)

+--------------+---------------------+
|   destination|retraso_medio_llegada|
+--------------+---------------------+
|RIO DE JANEIRO|                  8.6|
|     SAO PAULO|                  4.1|
+--------------+---------------------+

4.1
8.6


In [None]:
retrasosAggregadosDF.show()

result = spark.sql("select retraso_medio_llegada from retrasosAgg order by destination").collect()

retraso_medio_SAO_PAULO_tercer_mensaje = result[1][0]  # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
retraso_medio_RIO_JANEIRO_tercer_mensaje = result[0][0]  # Cambia esta linea para llamaar al método .sql de la sesión de Spark session

# Tu código hazlo aquí
print(retraso_medio_SAO_PAULO_tercer_mensaje)
print(retraso_medio_RIO_JANEIRO_tercer_mensaje)

+--------------+---------------------+
|   destination|retraso_medio_llegada|
+--------------+---------------------+
|RIO DE JANEIRO|                  8.6|
|     SAO PAULO|   3.0999999999999996|
+--------------+---------------------+

3.0999999999999996
8.6


In [None]:
# Ejecuta varias veces esta celda tras enviar el cuarto mensaje, hasta ver que el DataFrame ha cambiado
retrasosAggregadosDF.show()

result = spark.sql("select retraso_medio_llegada from retrasosAgg order by destination").collect()

retraso_medio_SAO_PAULO_cuarto_mensaje = result[1][0]  # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
retraso_medio_RIO_JANEIRO_cuarto_mensaje = result[0][0]  # Cambia esta linea para llamaar al método .sql de la sesión de Spark session

# Tu código hazlo aquí
print(retraso_medio_SAO_PAULO_cuarto_mensaje)
print(retraso_medio_RIO_JANEIRO_cuarto_mensaje)

+--------------+---------------------+
|   destination|retraso_medio_llegada|
+--------------+---------------------+
|RIO DE JANEIRO|   25.400000000000002|
|     SAO PAULO|   3.0999999999999996|
+--------------+---------------------+

3.0999999999999996
25.400000000000002


## Ejercicio 3

**Procesamiento de Datos CSV en Tiempo Real con Spark y Kafka**

El ejercicio 3 es parecido al ejercicio 2 pero en vez de utilizar el formato típico JSON se va a recibir a través de formato CSV. 

Objetivo: Procesar en tiempo real datos de estaciones de autobuses enviados a Kafka en formato CSV. Cada mensaje incluirá un `id_estacion`, `nombre_estacion`, y `numero_pasajeros`.

Lo que se solicita: 

- Calcular el número total de pasajeros por estación.
- Mostrar la ejecución de la celda, mostrará una tabla con el resultado de los 4 mensajes.

Los 4 mensajes en formato CSV que debes ingresar sucesivamente en Kafka son los siguientes:

- 101,Estacion Central,120
- 102,Estacion Norte,90
- 101,Estacion Central,130
- 102,Estacion Norte,110

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

bootstrap_servers = "kafka:9092"

# Inicializar la sesión de Spark
spark = SparkSession.builder.appName("CsvDataProcessing").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0").getOrCreate()

csvStreamingDF = None # Sustituye con el código adecuado conforme a las instrucciones previas

# Instrucciones:
# 1. Uso de readStream en lugar de read:
#    Utiliza readStream para crear un DataFrame que procese datos en tiempo real.
# 2. Establecer el formato de origen a Kafka:
#    Configura el DataFrame para leer datos desde Kafka.
# 3. Configuración de los brokers de Kafka y el puerto:
#    Especifica los nodos de Kafka y el puerto para la conexión.
# 4. Suscripción al tópico "datosCSV":
#    Define el tópico de Kafka al que debe suscribirse el DataFrame.
# 5. Cargar la configuración con load():
#    Inicia la lectura y procesamiento de los datos.

# Descomenta las siguientes lineas y verás que debes completar el código " <COMPLETAR> "
# csvStreamingDF = spark.readStream\
#     .format(<COMPLETAR>)\
#     .option(<COMPLETAR>, <COMPLETAR>)\
#     .option(<COMPLETAR>, <COMPLETAR>)\
#     .<COMPLETAR>()


# Tu código hazlo aquí
csvStreamingDF = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)\
    .option("subscribe", "datosCSV")\
    .load()

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.2.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0839d46d-dc46-43dd-96f1-0a4addc45394;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 630ms :: artifacts dl 14ms
	:: modules in us

### Instrucciones:
1. Dividir cada línea por comas para obtener campos individuales.
 - Usa la función `split()` para dividir la cadena `value` en cada `coma`. Esto te dará un array de valores que lo almacenarás/asignarás a `fields`.
2. Utiliza withColumn para crear tres nuevas columnas: 'id_estacion', 'nombre_estacion' y 'numero_pasajeros'.
 - 'id_estacion' será un entero, obtenido del primer elemento tras dividir 'value'.
 - 'nombre_estacion' será una cadena, obtenida del segundo elemento.
 - 'numero_pasajeros' será un entero, obtenido del tercer elemento.

In [None]:
# Convertir los datos binarios a texto
csvStreamingDF = csvStreamingDF.selectExpr("CAST(value AS STRING)")

fields = None   # Elimina y descomenta las siguientes lineas y verás que debes completar el código " <COMPLETAR> "

fields =  fields = split(col("value"), ",")
csvStreamingDF = csvStreamingDF\
    .withColumn("id_estacion", fields.getItem(0).cast("integer"))\
    .withColumn("nombre_estacion", fields.getItem(1))\
    .withColumn("numero_pasajeros", fields.getItem(2).cast("integer"))

# Tu código hazlo aquí
#raise NotImplementedError

### Instrucciones:
    1. Realiza una agregación sobre 'csvStreamingDF' para calcular el número total de pasajeros por estación.
    2. Usa groupBy sobre la columna 'nombre_estacion'.
    3. Calcula la suma de 'numero_pasajeros' y nómbrala como 'total_pasajeros'.
    4. Este proceso creará un nuevo DataFrame 'aggregatedDF'.

In [None]:
from pyspark.sql import functions as F

# Ejemplo de código a completar:
# Calcular el número total de pasajeros por estación
aggregatedDF = csvStreamingDF.groupBy("nombre_estacion").agg(F.sum("numero_pasajeros").alias("total_pasajeros"))

In [None]:
# Configura la salida del stream para guardar los resultados en una tabla en memoria
query = aggregatedDF.writeStream\
                    .queryName("agregadosPasajeros")\
                    .outputMode("complete")\
                    .format("memory")\
                    .start()

25/11/11 17:54:10 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e96e87ce-1561-4e53-8f2b-f1ba3ffe3703. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/11/11 17:54:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/11/11 17:54:10 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Después de completar la ejecución de la celda previa, inicia sesión en el productor de Kafka a través de SSH en una de las máquinas (puedes revisar las instrucciones de la práctica para recordar cómo hacerlo). Luego, inserta exactamente estos 4 mensajes a Kafka en formato CSV. Observarás que incluyen un campo `id_estacion`, `nombre_estacion` y un campo `numero_pasajeros`, representando los datos que recibiríamos en vivo de diferentes estaciones de bus conforme los trayectos van llegando a destino.


### Instrucciones:
    1. Utiliza spark.sql para ejecutar una consulta SQL que recupere todos los datos de la tabla 'agregadosPasajeros'.
    2. Almacena el resultado de la consulta en el DataFrame 'retrasosAggregadosDF'.
    3. Utiliza el método .show() para mostrar los resultados de la consulta.


In [None]:
retrasosAggregadosDF = None   # Cambia esta linea para llamaar al método .sql de la sesión de Spark session

retrasosAggregadosDF = spark.sql("select * from agregadosPasajeros")
retrasosAggregadosDF.show()

+---------------+---------------+
|nombre_estacion|total_pasajeros|
+---------------+---------------+
+---------------+---------------+



In [None]:
# Verifica que las columnas sean correctas
columnas = retrasosAggregadosDF.columns
assert(len(columnas) == 2)
assert("nombre_estacion" in columnas)
assert("total_pasajeros" in columnas)

[Stage 3:=====>                                                 (94 + 1) / 1000]

### Instrucciones:
    1. Inicializa una variable para almacenar el total de pasajeros para la "Estacion Central".
    2. Usa el DataFrame 'retrasosAggregadosDF' para filtrar los datos de la "Estacion Central".
    3. Selecciona la columna 'total_pasajeros' y usa .collect() para obtener el valor.


In [None]:
# Mostrar los resultados de la consulta
retrasosAggregadosDF.show()
total_pasajeros_estacion_central_primer_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session


total_pasajeros_estacion_central_primer_mensaje = retrasosAggregadosDF.where(F.col("nombre_estacion") == "Estacion Central")\
                                               .select(F.col("total_pasajeros"))\
                                               .collect()[0]['total_pasajeros']
print(total_pasajeros_estacion_central_primer_mensaje)

+----------------+---------------+
| nombre_estacion|total_pasajeros|
+----------------+---------------+
|Estacion Central|            120|
+----------------+---------------+

120


25/11/11 18:08:40 WARN TaskSetManager: Lost task 585.0 in stage 13.0 (TID 2801) (uax-cluster-w-0.europe-southwest1-a.c.tough-zoo-473614-g1.internal executor 7): java.lang.IllegalStateException: Error reading delta file file:/tmp/temporary-e96e87ce-1561-4e53-8f2b-f1ba3ffe3703/state/0/585/1.delta of HDFSStateStoreProvider[id = (op=0,part=585),dir = file:/tmp/temporary-e96e87ce-1561-4e53-8f2b-f1ba3ffe3703/state/0/585]: file:/tmp/temporary-e96e87ce-1561-4e53-8f2b-f1ba3ffe3703/state/0/585/1.delta does not exist
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:464)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$4(HDFSBackedStateStoreProvider.scala:420)
	at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
	at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:75)
	at org.apache.spark.sql.execution.streaming.state.HDFSBa

### Instrucciones:
    1. Inicializa variables para almacenar el total de pasajeros para las estaciones "Estacion Central" y "Estacion Norte".
    2. Utiliza el DataFrame 'retrasosAggregadosDF' para filtrar los datos de cada estación.
    3. Selecciona la columna 'total_pasajeros' y usa .collect() para obtener los valores.


In [None]:
# Mostrar los resultados de la consulta
retrasosAggregadosDF.show()
total_pasajeros_estacion_central_segundo_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
total_pasajeros_estacion_norte_segundo_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session

total_pasajeros_estacion_central_segundo_mensaje = retrasosAggregadosDF.where(col("nombre_estacion") == "Estacion Central")\
                                                        .select(col("total_pasajeros"))\
                                                        .collect()[0]['total_pasajeros']

total_pasajeros_estacion_norte_segundo_mensaje = retrasosAggregadosDF.where(col("nombre_estacion") == "Estacion Norte")\
                                                      .select(col("total_pasajeros"))\
                                                      .collect()[0]['total_pasajeros']

print(total_pasajeros_estacion_central_segundo_mensaje)
print(total_pasajeros_estacion_norte_segundo_mensaje)

+----------------+---------------+
| nombre_estacion|total_pasajeros|
+----------------+---------------+
|  Estacion Norte|             90|
|Estacion Central|            120|
+----------------+---------------+

120
90


25/11/11 18:11:16 WARN TaskSetManager: Lost task 494.0 in stage 19.0 (TID 3783) (uax-cluster-w-1.europe-southwest1-a.c.tough-zoo-473614-g1.internal executor 11): java.lang.IllegalStateException: Error reading delta file file:/tmp/temporary-e96e87ce-1561-4e53-8f2b-f1ba3ffe3703/state/0/494/1.delta of HDFSStateStoreProvider[id = (op=0,part=494),dir = file:/tmp/temporary-e96e87ce-1561-4e53-8f2b-f1ba3ffe3703/state/0/494]: file:/tmp/temporary-e96e87ce-1561-4e53-8f2b-f1ba3ffe3703/state/0/494/1.delta does not exist
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:464)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$4(HDFSBackedStateStoreProvider.scala:420)
	at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
	at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:75)
	at org.apache.spark.sql.execution.streaming.state.HDFSB

In [None]:
# Mostrar los resultados de la consulta
retrasosAggregadosDF.show()
total_pasajeros_estacion_central_tercero_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
total_pasajeros_estacion_norte_tercero_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
total_pasajeros_estacion_sur_tercero_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session 

total_pasajeros_estacion_central_tercero_mensaje = retrasosAggregadosDF.where(col("nombre_estacion") == "Estacion Central")\
                                                        .select(col("total_pasajeros"))\
                                                        .collect()[0]['total_pasajeros']

total_pasajeros_estacion_norte_tercero_mensaje = retrasosAggregadosDF.where(col("nombre_estacion") == "Estacion Norte")\
                                                      .select(col("total_pasajeros"))\
                                                      .collect()[0]['total_pasajeros']

#total_pasajeros_estacion_sur_tercero_mensaje = retrasosAggregadosDF.where(col("nombre_estacion") == "Estacion Sur")\
#                                                      .select(col("total_pasajeros"))\
#                                                      .collect()[0]['total_pasajeros']

print(total_pasajeros_estacion_central_tercero_mensaje)
print(total_pasajeros_estacion_norte_tercero_mensaje)
#print(total_pasajeros_estacion_sur_tercero_mensaje)

+----------------+---------------+
| nombre_estacion|total_pasajeros|
+----------------+---------------+
|  Estacion Norte|             90|
|Estacion Central|            250|
+----------------+---------------+

250
90


In [None]:
# Mostrar los resultados de la consulta
retrasosAggregadosDF.show()
total_pasajeros_estacion_central_quarto_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
total_pasajeros_estacion_norte_quarto_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session
total_pasajeros_estacion_sur_quarto_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session 
otal_pasajeros_estacion_este_quarto_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session 

total_pasajeros_estacion_central_quarto_mensaje = retrasosAggregadosDF.where(col("nombre_estacion") == "Estacion Central")\
                                                        .select(col("total_pasajeros"))\
                                                        .collect()[0]['total_pasajeros']

total_pasajeros_estacion_norte_quarto_mensaje = retrasosAggregadosDF.where(col("nombre_estacion") == "Estacion Norte")\
                                                      .select(col("total_pasajeros"))\
                                                      .collect()[0]['total_pasajeros']

# total_pasajeros_estacion_sur_quarto_mensaje = retrasosAggregadosDF.where(<COMPLETAR>("nombre_estacion") == "Estacion Sur")\
#                                                       .<COMPLETAR>(<COMPLETAR>("total_pasajeros"))\
#                                                       .<COMPLETAR>()[0]['total_pasajeros']

# total_pasajeros_estacion_este_quarto_mensaje = retrasosAggregadosDF.where(<COMPLETAR>("nombre_estacion") == "Estacion Este")\
#                                                       .<COMPLETAR>(<COMPLETAR>("total_pasajeros"))\
#                                                       .<COMPLETAR>()[0]['total_pasajeros']
print(total_pasajeros_estacion_central_quarto_mensaje)
print(total_pasajeros_estacion_norte_quarto_mensaje)

+----------------+---------------+
| nombre_estacion|total_pasajeros|
+----------------+---------------+
|  Estacion Norte|            200|
|Estacion Central|            250|
+----------------+---------------+

250
200


**Insertar en el quinto mensaje la Estacion Central de nuevo para comprobar que realiza correctamente la suma**

In [None]:
# Mostrar los resultados de la consulta
retrasosAggregadosDF.show()
total_pasajeros_estacion_central_quinto_mensaje = None # Cambia esta linea para llamaar al método .sql de la sesión de Spark session

# total_pasajeros_estacion_central_quinto_mensaje = retrasosAggregadosDF.where(<COMPLETAR>("nombre_estacion") == "Estacion Central")\
#                                                         .<COMPLETAR>(<COMPLETAR>("total_pasajeros"))\
#                                                         .<COMPLETAR>()[0]['total_pasajeros']

In [None]:
# Aserción para verificar que el total de pasajeros para 'Estacion Central' es 240
assert total_pasajeros_estacion_central_quinto_mensaje == 240, "El total de pasajeros para Estacion Central debería ser 240"