# Structured Streaming: convirtiendo consultas batch en streaming

<div class="alert alert-block alert-info">
<p><b>PARA SABER MÁS</b>: Notebook interesante sobre Structured Streaming hecho por Databricks 
    <a target = "_blank" href="https://docs.databricks.com/_static/notebooks/structured-streaming-python.html">aquí</a>
</p>
</div>


## Descripción de las variables

El dataset está compuesto por las siguientes variables:

1. **Year** 2008
2. **Month** 1
3. **DayofMonth** 1-31
4. **DayOfWeek** 1 (Monday) - 7 (Sunday)
5. **DepTime** hora real de salida (local, hhmm)
6. **CRSDepTime** hora prevista de salida (local, hhmm)
7. **ArrTime** hora real de llegada (local, hhmm)
8. **CRSArrTime** hora prevista de llegada (local, hhmm)
9. **UniqueCarrier** código del aparato
10. **FlightNum** número de vuelo
11. **TailNum** identificador de cola: aircraft registration, unique aircraft identifier
12. **ActualElapsedTime** tiempo real invertido en el vuelo
13. **CRSElapsedTime** en minutos
14. **AirTime** en minutos
15. **ArrDelay** retraso a la llegada, en minutos: se considera que un vuelo ha llegado "on time" si aterrizó menos de 15 minutos más tarde de la hora prevista en el Computerized Reservations Systems (CRS).
16. **DepDelay** retraso a la salida, en minutos
17. **Origin** código IATA del aeropuerto de origen
18. **Dest** código IATA del aeropuerto de destino
19. **Distance** en millas
20. **TaxiIn** taxi in time, in minutes
21. **TaxiOut** taxi out time in minutes
22. **Cancelled** *si el vuelo fue cancelado (1 = sí, 0 = no)
23. **CancellationCode** razón de cancelación (A = aparato, B = tiempo atmosférico, C = NAS, D = seguridad)
24. **Diverted** *si el vuelo ha sido desviado (1 = sí, 0 = no)
25. **CarrierDelay** en minutos: El retraso del transportista está bajo el control del transportista aéreo. Ejemplos de sucesos que pueden determinar el retraso del transportista son: limpieza de la aeronave, daño de la aeronave, espera de la llegada de los pasajeros o la tripulación de conexión, equipaje, impacto de un pájaro, carga de equipaje, servicio de comidas, computadora, equipo del transportista, problemas legales de la tripulación (descanso del piloto o acompañante) , daños por mercancías peligrosas, inspección de ingeniería, abastecimiento de combustible, pasajeros discapacitados, tripulación retrasada, servicio de inodoros, mantenimiento, ventas excesivas, servicio de agua potable, denegación de viaje a pasajeros en mal estado, proceso de embarque muy lento, equipaje de mano no válido, retrasos de peso y equilibrio.
26. **WeatherDelay** en minutos: causado por condiciones atmosféricas extremas o peligrosas, previstas o que se han manifestado antes del despegue, durante el viaje, o a la llegada.
27. **NASDelay** en minutos: retraso causado por el National Airspace System (NAS) por motivos como condiciones meteorológicas (perjudiciales pero no extremas), operaciones del aeropuerto, mucho tráfico aéreo, problemas con los controladores aéreos, etc.
28. **SecurityDelay** en minutos: causado por la evacuación de una terminal, re-embarque de un avión debido a brechas en la seguridad, fallos en dispositivos del control de seguridad, colas demasiado largas en el control de seguridad, etc.
29. **LateAircraftDelay** en minutos: debido al propio retraso del avión al llegar, problemas para conseguir aterrizar en un aeropuerto a una hora más tardía de la que estaba prevista.

In [1]:
!wget https://github.com/olbapjose/xapi-clojure/raw/master/flightsFolder.zip
!unzip flightsFolder.zip
!hdfs dfs -copyFromLocal flightsFolder /tmp

--2020-11-14 08:42:53--  https://github.com/olbapjose/xapi-clojure/raw/master/flightsFolder.zip
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flightsFolder.zip [following]
--2020-11-14 08:42:54--  https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flightsFolder.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2728655 (2.6M) [application/zip]
Saving to: ‘flightsFolder.zip’


2020-11-14 08:42:54 (28.8 MB/s) - ‘flightsFolder.zip’ saved [2728655/2728655]

Archive:  flightsFolder.zip
   creating: flightsFolder/
  inflating: flightsFolder/flights0.csv  
  inf

In [2]:
!hdfs dfs -ls /tmp/flightsFolder

Found 10 items
-rw-r--r--   2 root hadoop    1043070 2020-11-14 08:43 /tmp/flightsFolder/flights0.csv
-rw-r--r--   2 root hadoop    1042032 2020-11-14 08:43 /tmp/flightsFolder/flights1.csv
-rw-r--r--   2 root hadoop    1042990 2020-11-14 08:43 /tmp/flightsFolder/flights2.csv
-rw-r--r--   2 root hadoop    1048563 2020-11-14 08:43 /tmp/flightsFolder/flights3.csv
-rw-r--r--   2 root hadoop    1043098 2020-11-14 08:43 /tmp/flightsFolder/flights4.csv
-rw-r--r--   2 root hadoop    1028731 2020-11-14 08:43 /tmp/flightsFolder/flights5.csv
-rw-r--r--   2 root hadoop    1034014 2020-11-14 08:43 /tmp/flightsFolder/flights6.csv
-rw-r--r--   2 root hadoop    1068041 2020-11-14 08:43 /tmp/flightsFolder/flights7.csv
-rw-r--r--   2 root hadoop    1028067 2020-11-14 08:43 /tmp/flightsFolder/flights8.csv
-rw-r--r--   2 root hadoop    1044834 2020-11-14 08:43 /tmp/flightsFolder/flights9.csv


In [3]:
!wget https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flights_jan08.csv
!hdfs dfs -copyFromLocal flights_jan08.csv /tmp

--2020-11-14 08:44:54--  https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flights_jan08.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9719583 (9.3M) [text/plain]
Saving to: ‘flights_jan08.csv’


2020-11-14 08:44:54 (74.2 MB/s) - ‘flights_jan08.csv’ saved [9719583/9719583]



In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Leemos los datos, quitamos filas con NA y convertimos a numérico
flightsDF = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("/tmp/flights_jan08.csv")

cleanFlightsDF = flightsDF.where("ArrDelay != 'NA' and DepDelay != 'NA'")\
                          .withColumn("ArrDelay", F.col("ArrDelay").cast(IntegerType()))\
                          .withColumn("DepDelay", F.col("DepDelay").cast(IntegerType()))\
                          .withColumn("ArrDelayCat", F.when(F.col("ArrDelay") < 15, "None")\
                                                      .when((F.col("ArrDelay") >= 15) & (F.col("ArrDelay") < 60), "Slight")\
                                                      .otherwise("Huge"))\
                          .cache() # we will be working with it from now on!

In [5]:
cleanFlightsDF.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- 

### Agregaciones en streaming

<div class="alert alert-block alert-success">
<p><b>PREGUNTA</b>: ¿Cuál es el retraso medio por cada destino para vuelos que salen de SFO?
    Convierte esta consulta en streaming</p>
</div>

In [10]:
def retrasoMedioDestDesdeSFO(df):
    return df.filter(F.col("Origin") == "SFO")\
              .groupBy("Dest").agg(F.mean("ArrDelay").alias("Average_delay"))

res = retrasoMedioDestDesdeSFO(cleanFlightsDF)
res.show()

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
| LAS|  32.9639175257732|
| MDW|28.229885057471265|
| LAX|23.899497487437184|
| SAN| 28.80275229357798|
+----+------------------+



In [11]:
flightsSchema = cleanFlightsDF.schema # in Structured Streaming the schema is mandatory

streamingFlights = spark.readStream.schema(flightsSchema)\
                        .option("maxFilesPerTrigger", 1)\
                        .csv("/tmp/flightsFolder") # read 1 file in each operation

# Operación de agregación, igual que con un DataFrame convencional
# COMPLETA LA CONSULTA: de los vuelos que salen de SFO, calcular el retraso medio para cada destino
largestAverageSFOstreamingDF = retrasoMedioDestDesdeSFO(streamingFlights)

# largestAverageSFOstreamingDF = streamingFlights.where("Origin = 'SFO'")\
#                                                .groupBy("Dest")\
#                                                .agg(F.mean("ArrDelay").alias("Retraso"))

# Ahora escribimos continuamente el resultado. Solo para test, escribimos en memoria
countQuery = largestAverageSFOstreamingDF\
                .writeStream\
                .queryName("meanArrDelaySFO")\
                .format("memory")\
                .outputMode("complete")\
                .start() # esto lanza el stream

# Puesto que el driver es este notebook de Jupyter y no lo pensamos cerrar,
# hasta que hayamos visualizado correctamente todas las salidas, entonces podemos omitir la línea siguiente

#countQuery.awaitTermination() # obligatorio en aplicaciones en producción para evitar que finalice el driver

### Escribir en memoria implica que se crea una tabla automáticamente en memoria con el nombre de la consulta

In [12]:
import time 

resultDF = spark.sql("select * from meanArrDelaySFO")

time.sleep(10)

resultDF.show()

time.sleep(10)

resultDF.show()

for i in range(10):
    time.sleep(1)
#    resultDF.write.csv("/tmp/result" + str(i) + ".csv")
    resultDF.show()


+----+-------------+
|Dest|Average_delay|
+----+-------------+
+----+-------------+

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
| LAS|              18.8|
| MDW|41.416666666666664|
| LAX|              40.0|
| SAN|24.925925925925927|
+----+------------------+

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
| LAS|           40.9375|
| MDW|29.136363636363637|
| LAX|              26.5|
| SAN|20.796610169491526|
+----+------------------+

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
| LAS| 41.42307692307692|
| MDW|26.870967741935484|
| LAX| 25.47761194029851|
| SAN|25.864197530864196|
+----+------------------+

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
| LAS| 43.04054054054054|
| MDW|27.027027027027028|
| LAX|24.402298850574713|
| SAN|26.427083333333332|
+----+------------------+

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
|

In [13]:
resultDF.show()

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
| LAS|  32.9639175257732|
| MDW|28.229885057471265|
| LAX|23.899497487437184|
| SAN| 28.80275229357798|
+----+------------------+



In [14]:
resultDF.show()

+----+------------------+
|Dest|     Average_delay|
+----+------------------+
| LAS|  32.9639175257732|
| MDW|28.229885057471265|
| LAX|23.899497487437184|
| SAN| 28.80275229357798|
+----+------------------+

