# Operaciones avanzadas con DataFrames

## Descripción de las variables

El dataset, obtenido de <a target = "_blank" href="https://www.transtats.bts.gov/Fields.asp?Table_ID=236">este link</a> está compuesto por las siguientes variables referidas siempre al año 2018:

1. **Month** 1-4
2. **DayofMonth** 1-31
3. **DayOfWeek** 1 (Monday) - 7 (Sunday)
4. **FlightDate** fecha del vuelo
5. **Origin** código IATA del aeropuerto de origen
6. **OriginCity** ciudad donde está el aeropuerto de origen
7. **Dest** código IATA del aeropuerto de destino
8. **DestCity** ciudad donde está el aeropuerto de destino  
9. **DepTime** hora real de salida (local, hhmm)
10. **DepDelay** retraso a la salida, en minutos
11. **ArrTime** hora real de llegada (local, hhmm)
12. **ArrDelay** retraso a la llegada, en minutos: se considera que un vuelo ha llegado "on time" si aterrizó menos de 15 minutos más tarde de la hora prevista en el Computerized Reservations Systems (CRS).
13. **Cancelled** si el vuelo fue cancelado (1 = sí, 0 = no)
14. **CancellationCode** razón de cancelación (A = aparato, B = tiempo atmosférico, C = NAS, D = seguridad)
15. **Diverted** si el vuelo ha sido desviado (1 = sí, 0 = no)
16. **ActualElapsedTime** tiempo real invertido en el vuelo
17. **AirTime** en minutos
18. **Distance** en millas
19. **CarrierDelay** en minutos: El retraso del transportista está bajo el control del transportista aéreo. Ejemplos de sucesos que pueden determinar el retraso del transportista son: limpieza de la aeronave, daño de la aeronave, espera de la llegada de los pasajeros o la tripulación de conexión, equipaje, impacto de un pájaro, carga de equipaje, servicio de comidas, computadora, equipo del transportista, problemas legales de la tripulación (descanso del piloto o acompañante) , daños por mercancías peligrosas, inspección de ingeniería, abastecimiento de combustible, pasajeros discapacitados, tripulación retrasada, servicio de inodoros, mantenimiento, ventas excesivas, servicio de agua potable, denegación de viaje a pasajeros en mal estado, proceso de embarque muy lento, equipaje de mano no válido, retrasos de peso y equilibrio.
20. **WeatherDelay** en minutos: causado por condiciones atmosféricas extremas o peligrosas, previstas o que se han manifestado antes del despegue, durante el viaje, o a la llegada.
21. **NASDelay** en minutos: retraso causado por el National Airspace System (NAS) por motivos como condiciones meteorológicas (perjudiciales pero no extremas), operaciones del aeropuerto, mucho tráfico aéreo, problemas con los controladores aéreos, etc.
22. **SecurityDelay** en minutos: causado por la evacuación de una terminal, re-embarque de un avión debido a brechas en la seguridad, fallos en dispositivos del control de seguridad, colas demasiado largas en el control de seguridad, etc.
23. **LateAircraftDelay** en minutos: debido al propio retraso del avión al llegar, problemas para conseguir aterrizar en un aeropuerto a una hora más tardía de la que estaba prevista.

In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Leemos los datos y quitamos filas con NA y convertimos a numéricas las columnas inferidas incorrectamente
flightsDF = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("gs://ucmbucket2023/datos/flights-jan-apr-2018.csv")

# Convertimos a enteros y re-categorizamos ArrDelay en una nueva columna ArrDelayCat
# None (< 15 min), Slight(entre 15 y 60 min), Huge (> 60 min)

cleanFlightsDF = flightsDF.withColumn("ArrDelayCat", F.when(F.col("ArrDelay") < 15, "None")\
                                                      .when((F.col("ArrDelay") >= 15) & (F.col("ArrDelay") < 60), "Slight")\
                                                      .otherwise("Huge"))\
                           .cache()

                                                                                

## Hagamos algunas preguntas a los datos para obtener conclusiones

Imaginemos que somos los dueños de una web de viajes que rastrea internet en busca de vuelos en agencias y otras páginas, los compara y recomienda el más adecuado para el aeropuerto. Junto con esta recomendación, querríamos dar también información sobre vuelos fiables y no fiables en lo que respecta a la puntualidad. Esto depende de muchos factores, como el origen y destino, duración del vuelo, hora del día, etc.

### Agrupación y agregaciones

<div class="alert alert-block alert-success">
<p><b>PREGUNTA</b>: ¿Cuáles son los vuelos (origen, destino) con mayor retraso medio? ¿Cuántos vuelos existen entre cada par de aeropuertos?</p>  Guardar el resultado en la variable averageDelayOriginDest
<p><b>PISTA</b>: Tras hacer las agregaciones para cada pareja "Origin", "Dest" (una agregación para el retraso medio y otra para contar), aplica el método sort(F.col("avgDelay").desc()) para ordenar de forma decreciente por la nueva columna del retraso medio.
</div>

**Nota:** vamos a practicar con la función `F.round(columna, cifras_decimales)` que recibe un objeto columna numérica y el número de cifras decimales a las que queremos redondearlo, y devuelve otro objeto columna numérico redondeado. OJO: el nuevo objeto columna devuelto no conserva el nombre que tenía el objeto original, sino que trae el nombre por defecto "round(col_original, n_cifras)" así que conviene renombrarlo con `alias()`.

In [2]:
averageDelayOriginDest = cleanFlightsDF.groupBy("Origin", "Dest")\
                                       .agg(F.round(F.mean("ArrDelay"), 3).alias("meanDelay"),
                                            F.count("*").alias("n")
                                           )\
                                       .orderBy("meanDelay", ascending=False)
averageDelayOriginDest.show()



+------+----+---------+---+
|Origin|Dest|meanDelay|  n|
+------+----+---------+---+
|   RDM| MFR|   1347.0|  2|
|   MDT| HPN|    798.0|  1|
|   ORD| GTF|    212.0|  1|
|   ICT| DAY|    210.0|  1|
|   ELM| ATL|    169.0|  2|
|   DSM| PIA|    168.0|  1|
|   ERI| ITH|    160.0|  1|
|   YNG| PIE|    141.0|  1|
|   CMH| HOU|    120.0|  1|
|   HRL| DAL|    111.0|  1|
|   PPG| HNL|  109.857| 35|
|   HNL| PPG|  105.857| 35|
|   PIE| YNG|    104.0|  1|
|   AVP| SFB|     93.0|  1|
|   ACY| MSY|   87.455| 11|
|   CPR| LAS|     85.0|  1|
|   LAS| CPR|     82.0|  1|
|   TTN| BNA|     76.5| 10|
|   MSP| PVD|     74.0|  1|
|   TUL| OKC|     69.0|  1|
+------+----+---------+---+
only showing top 20 rows



                                                                                

<div class="alert alert-block alert-success">
    <p><b>PREGUNTA</b>: ¿Es el avión un medio de transporte fiable?</p>
    <p>(a) Mostrar, para cada aeropuerto de destino, el número de vuelos que hay en cada <i>categoría de retraso</i> (variable ArrDelayCat). En lugar de llamar agg(F.count("*")), podemos llamar a la transformación count() sobre el resultado de groupBy(), y creará automáticamente una columna llamada "count" con los conteos para cada grupo. Guardarlo en la variable arr_delay_cat_df.
<p> (b) Ahora agrupa por cada aeropuerto de origen y de destino, y mostrando una columna distinta por cada tipo de retraso, con el recuento del número de vuelos en cada combinación. Guardarlo en la variable arr_delay_cat_pivot_df. PISTA: utilizar la función pivot("colName").</p>

In [3]:
array_delay_cat_df = cleanFlightsDF\
    .groupBy('Dest','ArrDelayCat')\
    .count()

array_delay_cat_df.show()



+----+-----------+-----+
|Dest|ArrDelayCat|count|
+----+-----------+-----+
| SRQ|       Huge|  123|
| OMA|     Slight| 1030|
| LRD|       None|  664|
| CIU|       None|  163|
| BGR|       Huge|  247|
| BPT|       Huge|   28|
| PPG|       None|   23|
| MMH|       None|  239|
| FNT|       Huge|  154|
| COU|       Huge|  100|
| SRQ|       None| 1789|
| ISN|       None|  448|
| DBQ|       Huge|   36|
| TXK|       None|  269|
| ASE|       None| 2424|
| ABI|     Slight|   87|
| TOL|       None|  559|
| FAT|       Huge|  202|
| ROC|     Slight|  780|
| IAG|     Slight|   26|
+----+-----------+-----+
only showing top 20 rows



                                                                                

In [4]:
array_delay_cat_pivot_df = cleanFlightsDF\
    .groupBy('Origin','Dest')\
    .pivot('ArrDelayCat', ["Slight", "None", "Huge"])\
    .count()

array_delay_cat_pivot_df.show()



+------+----+------+----+----+
|Origin|Dest|Slight|None|Huge|
+------+----+------+----+----+
|   LBB| DEN|    20| 184|  20|
|   TPA| ACY|     4| 112|   4|
|   PHL| MCO|   273|1291| 162|
|   ORD| PDX|    85| 528|  23|
|   SNA| PHX|   256| 967|  57|
|   MDW| MEM|    42| 172|  22|
|   DSM| EWR|    14|  94|  10|
|   SPI| ORD|    37| 255|  50|
|   FSD| ATL|     9|  83|   9|
|   MCI| IAH|    54| 487|  38|
|   SJC| LIH|     5|  83|   1|
|   DSM| MCO|    10|  30|   1|
|   LAS| LIT|    22|  93|   5|
|   PBG| PGD|     6|  19|   1|
|   SHD| LWB|  null|  25|   2|
|   IAD| ILM|     8|  34|   2|
|   EWR| STT|  null|   4|null|
|   CPR| DEN|    55| 330|  51|
|   CVG| BDL|    11|  78|   5|
|   ICT| IAH|    37| 375|  37|
+------+----+------+----+----+
only showing top 20 rows



                                                                                

<div class="alert alert-block alert-success">
<p><b>PREGUNTA</b>: ¿Hay relación entre el día de la semana y el retraso a la salida o a la llegada?</p>
    <p> (a) Sin usar la función pivot, calcula el retraso medio a la salida y a la llegada para cada día de la semana y ordena por una de ellas descendentemente.</p>
    <p> (b) Ahora haz lo mismo para cada día pero calculando solamente el retraso medio a la llegada, desagregado por cada aeropuerto de origen y destino, utilizando la función pivot() para generar un DF con tantas columnas como días de la semana, más dos (el origen y destino). </p>
</div>

<div class="alert alert-block alert-info">
<p><b>LA FUNCIÓN PIVOT</b>: Puede ser interesante ver, para cada (Origin, Dest), el retraso promedio por
día de la semana. Si agrupamos por esas tres variables (Origin, Dest, DayOfWeek), nuestro resultado tendría demasiadas filas para ser fácil de visualizar (7 x 1009 ya que hay 1009 combinaciones de (Origin, DayOfWeek)). En cambio, vamos a crear 7 columnas, una por día de la semana, en nuestro resultado DF. Lo haremos utilizando una de las variables de agrupación (DayOfWeek) como <i> variable pivot</i>. Como esta variable tiene 7 valores distintos, se crearán 7 columnas nuevas. De esta manera, visualizaremos toda la información de cada combinación (Origen, Dest) condensada en una fila con 7 columnas con los 7 retrasos promedio correspondientes a ese (Origen, Dest) en cada día de la semana.
</div>

In [5]:
ej4a = cleanFlightsDF.groupBy('Origin','Dest', 'DayOfWeek')\
                     .agg(
                          F.mean("ArrDelay").alias("AvgArrDelay"),
                          F.count("*").alias("groupSize")
                     )\
                     .sort(F.col('AvgArrDelay').desc())

ej4a.show()



+------+----+---------+------------------+---------+
|Origin|Dest|DayOfWeek|       AvgArrDelay|groupSize|
+------+----+---------+------------------+---------+
|   RDM| MFR|        7|            1347.0|        2|
|   MDT| HPN|        5|             798.0|        1|
|   RSW| TTN|        2|             393.0|        1|
|   GRK| ATL|        3|             392.5|        2|
|   IND| AZA|        6|             382.0|        1|
|   SJU| SFB|        3|             360.0|        1|
|   SFB| SJU|        3|             357.0|        1|
|   ELM| ATL|        2|             346.0|        1|
|   HNL| IAD|        5|243.33333333333334|        6|
|   PPG| HNL|        5|238.45454545454547|       11|
|   HNL| PPG|        5|238.36363636363637|       11|
|   FCA| ORD|        2|             224.0|        1|
|   OAK| DFW|        2|223.33333333333334|        6|
|   BLV| JAX|        2|             213.0|        1|
|   ORD| GTF|        1|             212.0|        1|
|   ICT| DAY|        2|             210.0|    

                                                                                

In [6]:
ej4b = cleanFlightsDF.groupBy('Origin','Dest')\
               .pivot("DayOfWeek")\
               .agg(
                    F.mean("ArrDelay").alias("AvgArrDelay")  # este alias será ignorado
                )
ej4b.show()



+------+----+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+
|Origin|Dest|                   1|                   2|                  3|                  4|                  5|                  6|                   7|
+------+----+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+
|   LBB| DEN|  12.057142857142857| -1.9393939393939394|  4.454545454545454|-11.294117647058824| 27.558823529411764|  7.413793103448276| -4.2272727272727275|
|   TPA| ACY|  -6.647058823529412|  -4.529411764705882|-3.6470588235294117|           -12.5625|            -11.125|-10.588235294117647|  -6.235294117647059|
|   MCI| IAH|               -4.75|-0.17721518987341772|  1.654320987654321| 11.607142857142858| -8.047619047619047|-10.486486486486486| -1.1168831168831168|
|   DSM| MCO|                null|   9.428571428571429|   

                                                                                

### Operaciones JOIN y de ventana

Estaría bien tener el retraso promedio de una ruta junto a cada vuelo, para que podamos ver qué vuelos tuvieron un retraso que fue superior o inferior al retraso promedio de esa ruta.

<div class="alert alert-block alert-success">
    <b> PREGUNTA </b>:
Usa el averageDelayOriginDestDF creado anteriormente, elimina la columna de conteo y luego únerlo con cleanFlightsDF, utilizando Origin y Dest como columnas de enlace. Finalmente, selecciona solo las columnas Origin, Dest, DayOfWeek, ArrDelay y avgDelay del resultado.
</div>

**PREGUNTA**: ¿qué tipo de JOIN utilizarías? ¿Es relevante en este caso?

In [7]:
joinedDF = cleanFlightsDF.join(averageDelayOriginDest, on = ["Origin", "Dest"], how = "left_outer")

# podríamos usar inner también! En este caso da igual, porque todas las combinaciones de Origin y Dest que existan en
# cleanFlightsDF sabemos con seguridad que también existirán en averageDelayOriginDest, ya que este proviene de cleanFlightsDF

joinedDF.show()

23/05/02 09:21:08 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+------+----+-----+----------+---------+----------+--------------------+--------------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+-----------+---------+---+
|Origin|Dest|Month|DayofMonth|DayOfWeek|FlightDate|          OriginCity|      DestCity|DepTime|DepDelay|ArrTime|ArrDelay|Cancelled|CancellationCode|Diverted|ActualElapsedTime|AirTime|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|ArrDelayCat|meanDelay|  n|
+------+----+-----+----------+---------+----------+--------------------+--------------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+-----------+---------+---+
|   BQN| MCO|    1|         1|        1|2018-01-01|       Aguadilla, PR|   Orlando, FL|    412|    -2.0|    618|     9.0|      0.0|           

<div class="alert alert-block alert-info">
    <p><b>BONUS (OPCIONAL)</b>: crear una nueva columna <i>belowAverage</i> que tenga valor True si ArrDelay es menor que el avgDelay de esa ruta, y False en caso contrario. No utilizar la función when() sino el operador de comparación directamente entre columnas, la cual devolverá una columna booleana.
</div>

In [8]:
# Al final seleccionamos solo algunas columnas para mostrar el resultado sin que sea demasiado ancho
ej5 = joinedDF.withColumn('belowAverage', F.col('ArrDelay') < F.col('MeanDelay'))\
              .select("Origin", "Dest", "FlightDate", "ArrDelay", "MeanDelay", "belowAverage") 

ej5.show()

                                                                                

+------+----+----------+--------+---------+------------+
|Origin|Dest|FlightDate|ArrDelay|MeanDelay|belowAverage|
+------+----+----------+--------+---------+------------+
|   BQN| MCO|2018-01-01|     9.0|    5.763|       false|
|   PBI| DCA|2018-01-01|    48.0|   -0.391|       false|
|   PBI| DCA|2018-01-01|    -6.0|   -0.391|        true|
|   PBI| DCA|2018-01-01|    64.0|   -0.391|       false|
|   BQN| MCO|2018-01-02|    67.0|    5.763|       false|
|   PBI| DCA|2018-01-02|    -1.0|   -0.391|        true|
|   PBI| DCA|2018-01-02|    33.0|   -0.391|       false|
|   PBI| DCA|2018-01-02|   -22.0|   -0.391|        true|
|   BQN| MCO|2018-01-03|    29.0|    5.763|       false|
|   PBI| DCA|2018-01-03|    11.0|   -0.391|       false|
|   PBI| DCA|2018-01-03|   362.0|   -0.391|       false|
|   PBI| DCA|2018-01-03|    43.0|   -0.391|       false|
|   BQN| MCO|2018-01-04|    19.0|    5.763|       false|
|   PBI| DCA|2018-01-04|   -12.0|   -0.391|        true|
|   BQN| MCO|2018-01-05|   -14.

<div class="alert alert-block alert-success">
    <b> PREGUNTA </b>:repetir la operación utilizando funciones de ventana, sin usar `join`.
</div>

In [9]:
from pyspark.sql import Window

w = Window().partitionBy(["Origin","Dest"])
flightsWindowDF = flightsDF\
                    .withColumn("MeanDelay", F.mean("ArrDelay").over(w))\
                    .withColumn("belowAverage", F.col("ArrDelay") < F.col("MeanDelay"))\
                    .select("Origin", "Dest", "FlightDate", "ArrDelay", "MeanDelay")\

flightsWindowDF.show(truncate=False)

[Stage 43:>                                                         (0 + 1) / 1]

+------+----+----------+--------+-----------------+
|Origin|Dest|FlightDate|ArrDelay|MeanDelay        |
+------+----+----------+--------+-----------------+
|ABE   |CLT |2018-02-01|36.0    |5.876494023904383|
|ABE   |CLT |2018-02-02|-14.0   |5.876494023904383|
|ABE   |CLT |2018-02-03|9.0     |5.876494023904383|
|ABE   |CLT |2018-02-04|347.0   |5.876494023904383|
|ABE   |CLT |2018-02-05|0.0     |5.876494023904383|
|ABE   |CLT |2018-02-06|2.0     |5.876494023904383|
|ABE   |CLT |2018-02-07|12.0    |5.876494023904383|
|ABE   |CLT |2018-02-08|12.0    |5.876494023904383|
|ABE   |CLT |2018-02-09|-7.0    |5.876494023904383|
|ABE   |CLT |2018-02-10|46.0    |5.876494023904383|
|ABE   |CLT |2018-02-11|null    |5.876494023904383|
|ABE   |CLT |2018-02-12|4.0     |5.876494023904383|
|ABE   |CLT |2018-02-13|31.0    |5.876494023904383|
|ABE   |CLT |2018-02-14|-17.0   |5.876494023904383|
|ABE   |CLT |2018-02-15|19.0    |5.876494023904383|
|ABE   |CLT |2018-02-16|11.0    |5.876494023904383|
|ABE   |CLT 

                                                                                

### Funciones de agregación para traer, en cada grupo, todos los valores de cierta columna que hay en ese grupo. Esto genera una columna de tipo lista o de tipo conjunto

#### Funciones F.collect_list() y F.collect_set(), que pueden utilizarse dentro de agg() en groupBy().agg() y también en una ventana, como ocurre con cualquier función de agregación

#### Funciones para manejar columnas de tipo vector: todas las que empiezan por array_ [aquí](https://spark.apache.org/docs/3.1.3/api/python/reference/pyspark.sql.html#functions)

In [10]:
todasFechasDF = flightsDF.groupBy("Origin", "Dest")\
                    .agg(F.collect_list("FlightDate").alias("todas_fechas"),
                         F.collect_set("FlightDate").alias("fechas_sin_repetidos")
                    )\
                    .withColumn("primer_elemento", F.element_at("todas_fechas", 1)) # primer elemento de cada array

# mostramos 3 filas y solo algunas columnas para que no sea muy largo
todasFechasDF.select("Origin", "Dest", "fechas_sin_repetidos", "primer_elemento").show(3, truncate=False)



+------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

### Funciones para crear una columna de tipo estructura

Una columna de tipo estructura representa un tipo de dato creado por el usuario, que puede ser "plano", es decir, ser simplemente una tupla, o puede ser "jerárquico" porque dentro de la estructura existan campos que son, a su vez, otras estructuras. El caso más simple es crear una tupla con los valores de varias columnas, fila a fila. Es posible "ordenar" tuplas, porque ordena en base al primer elemento de cada tupla (el más a la izquierda), y si empatan entonces se fija en el segundo, y así sucesivamente.

Esto es útil cuando queremos ordenar dentro de un grupo en base a cierta columna, y después queremos ver el valor concreto de otra columna diferente que estaba en la misma fila, emparejado con el valor máximo o mínimo (es decir, cuando buscamos máximo o mínimo pero *no queremos perder la correspondencia* de ese valor con otros valores de su fila).

Se puede acceder a cada uno de los campos de una tupla con el operador `.` (punto)

**Ejemplo**: para cada par de aeropuertos origen y destino, encontrar la fecha en la que tuvo lugar el vuelo con más retraso a la llegada. Como estamos creando una columna de tipo estructura con dos campos, de los cuales el primero es el ArrDelay, al buscar dentro de cada grupo el máximo de dicha columna de tipo estructura, se fijará justamente en el ArrDelay.

In [11]:
from pyspark.sql import Window

fechaMaxDelayDF = flightsDF\
                    .withColumn("estructura", F.struct("ArrDelay", "FlightDate"))\
                    .groupBy("Origin", "Dest")\
                    .agg(F.max("estructura").alias("max_struct"))\
                    .withColumn("fecha_max_retraso", F.col("max_struct.FlightDate"))

fechaMaxDelayDF.show(truncate=False)



+------+----+--------------------+-----------------+
|Origin|Dest|max_struct          |fecha_max_retraso|
+------+----+--------------------+-----------------+
|ABE   |CLT |{347.0, 2018-02-04} |2018-02-04       |
|ABE   |FLL |{340.0, 2018-03-07} |2018-03-07       |
|ABI   |DFW |{397.0, 2018-01-01} |2018-01-01       |
|ABQ   |ORD |{460.0, 2018-04-16} |2018-04-16       |
|ABY   |ATL |{641.0, 2018-01-07} |2018-01-07       |
|ACY   |FLL |{1385.0, 2018-04-05}|2018-04-05       |
|ACY   |MCO |{296.0, 2018-04-27} |2018-04-27       |
|ACY   |MYR |{352.0, 2018-04-21} |2018-04-21       |
|ACY   |PBI |{132.0, 2018-02-10} |2018-02-10       |
|ADQ   |ANC |{221.0, 2018-03-20} |2018-03-20       |
|AEX   |DFW |{467.0, 2018-02-22} |2018-02-22       |
|AGS   |ATL |{1366.0, 2018-01-23}|2018-01-23       |
|AGS   |DFW |{80.0, 2018-04-03}  |2018-04-03       |
|AGS   |PHL |{215.0, 2018-04-04} |2018-04-04       |
|ALB   |EWR |{390.0, 2018-02-05} |2018-02-05       |
|ALB   |MSP |{131.0, 2018-03-29} |2018-03-29  

                                                                                

<div class="alert alert-block alert-success">
<b> PREGUNTA </b>: Vamos a construir otro DF con información sobre los aeropuertos (en una situación real, tendríamos otra tabla en la base de datos como la tabla de la entidad Aeropuerto). Sin embargo, solo tenemos información sobre algunos aeropuertos. Nos gustaría agregar esta información a cleanFlightsDF como nuevas columnas, teniendo en cuenta que queremos que la información del aeropuerto coincida con el aeropuerto de origen de flightsDF. Utilizar la operación de unión adecuada para asegurarse de que no se perderá ninguna de las filas existentes de cleanFlightsDF después de la unión.
</div>

In [12]:
airportsDF = spark.createDataFrame([
    ("JFK", "John F. Kennedy International Airport", 1948),
    ("LIT", "Little Rock National Airport", 1931),
    ("SEA", "Seattle-Tacoma International Airport", 1949),
], ["IATA", "FullName", "Year"])

airportsDF.show(truncate=False)



+----+-------------------------------------+----+
|IATA|FullName                             |Year|
+----+-------------------------------------+----+
|JFK |John F. Kennedy International Airport|1948|
|LIT |Little Rock National Airport         |1931|
|SEA |Seattle-Tacoma International Airport |1949|
+----+-------------------------------------+----+



                                                                                

In [13]:
joinedFlightsDF = cleanFlightsDF.join(airportsDF, on=cleanFlightsDF.Origin == airportsDF.IATA, how="left_outer")

# PREGUNTA: mostrar algunas filas donde FullName no sea null
joinedFlightsDF.where("FullName is not null").show()

                                                                                

+-----+----------+---------+----------+------+-----------+----+------------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+-----------+----+--------------------+----+
|Month|DayofMonth|DayOfWeek|FlightDate|Origin| OriginCity|Dest|    DestCity|DepTime|DepDelay|ArrTime|ArrDelay|Cancelled|CancellationCode|Diverted|ActualElapsedTime|AirTime|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|ArrDelayCat|IATA|            FullName|Year|
+-----+----------+---------+----------+------+-----------+----+------------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+-----------+----+--------------------+----+
|    2|         1|        4|2018-02-01|   SEA|Seattle, WA| JFK|New York, NY|    733|    -2.0|   1526|   -38.0|      0.0|    

## User-defined functions (UDFs)

Vamos a construir un UDF para convertir millas a kilómetros. Ten en cuenta que esto podría hacerse fácilmente multiplicando directamente la columna de millas por 1.6 (y sería mucho más eficiente), ya que Spark permite el producto entre una columna y un número. En todos los casos en los que Spark proporciona funciones integradas para realizar una tarea (como esta), debes usar esas funciones y no una UDF. Las UDF deben emplearse solo cuando no hay otra opción.

La razón es que las funciones integradas de Spark están optimizadas y Catalyst, el optimizador automático de código integrado en Spark, puede optimizarlo aún más. Sin embargo, las UDF son una caja negra para Catalyst y su contenido no se optimizará, y por lo tanto, generalmente son mucho más lentas.

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Primer paso: crear una función de Python que reciba UN número y lo multiplique por 1.6
def milesToKm(miles):
    return miles*1.6

# Vamos a probarla
print(milesToKm(5)) # 5 millas a km: 8 km

# Segundo paso: crear un objeto UDF que envuelva a nuestra función. 
# Hay que especificar el tipo de dato que devuelve nuestra función
udfMilesToKm = F.udf(milesToKm, DoubleType())

# Con esto, Spark será capaz de llamar a nuestra función milesToKm sobre cada uno de los valores de una columna numérica.
# Spark enviará el código de nuestra función a los executors a través de la red, y cada executor la ejecutará sobre las
# particiones (una por una) que estén en ese executor

# Tercer paso: vamos a probar la UDF añadiendo una nueva columna con el resultado de la conversión
flightsWithKm = cleanFlightsDF.withColumn("DistKm", udfMilesToKm(F.col("Distance")))

flightsWithKm = cleanFlightsDF.withColumn("DistKm", 1.6 * F.col("Distance"))


flightsWithKm.select("Origin", "Dest", "Distance", "DistKM")\
             .distinct()\
             .show(5)

8.0




+------+----+--------+------------------+
|Origin|Dest|Distance|            DistKM|
+------+----+--------+------------------+
|   FAT| SFO|   158.0|             252.8|
|   EWR| MYR|   550.0|             880.0|
|   IAH| CLT|   912.0|            1459.2|
|   BUF| EWR|   282.0|451.20000000000005|
|   ROP| GUM|    56.0| 89.60000000000001|
+------+----+--------+------------------+
only showing top 5 rows



                                                                                

<div class="alert alert-block alert-info">
<p><b>BONUS</b>: Crea tu propia UDF que convierta DayOfWeek en una cadena.
Puedes hacerlo creando una función de Python que reciba un número entero y devuelva el día de la semana,
simplemente leyendo desde un vector de cadenas de longitud 7 el valor en la posición indicada por el argumento entero. Para la UDF, recuerda que tu función devuelve un StringType(). Finalmente, prueba tu UDF creando una nueva columna "DayOfWeekString".
</div>

In [15]:
from pyspark.sql.types import StringType

from pyspark.sql import types as T

# Primer paso: creamos una función de python que convierte un número entero en el día de la semana como cadena
def dayOfWeekToString(dayInteger):
    # En nuestros datos Monday es 1 pero las listas de python empiezan en el 0 y 
    # queremos usar el dayInteger como índice del vector
    daysOfWeek = ["", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    return daysOfWeek[dayInteger]
    
# Segundo paso: ajustamos nuestra función con un Spark UDF para que Spark pueda invocarlo en cada valor de una columna completa
# De esta manera, Spark puede enviar nuestra función a los ejecutores, que eventualmente ejecutarán la función en las particiones
# de los datos que tiene cada ejecutor
dayOfWeekStringUDF = F.udf(dayOfWeekToString, T.StringType())

# Tercer paso: intentemos nuestro UDF agregando una nueva columna que resulta de transformar (a través del UDF) el
# columna existente DayOfWeek
flightsWithDayOfWeekStr = cleanFlightsDF.withColumn("DayOfWeekString", dayOfWeekStringUDF(F.col("DayOfWeek")))

flightsWithDayOfWeekStr.select("Origin", "Dest", "DayOfWeek", "DayOfWeekString")\
                       .distinct()\
                       .show()



+------+----+---------+---------------+
|Origin|Dest|DayOfWeek|DayOfWeekString|
+------+----+---------+---------------+
|   BQK| ATL|        4|       Thursday|
|   CVG| PHL|        3|      Wednesday|
|   DTW| DFW|        5|         Friday|
|   SEA| JFK|        2|        Tuesday|
|   JAX| JFK|        2|        Tuesday|
|   RDU| BOS|        3|      Wednesday|
|   SEA| BOS|        3|      Wednesday|
|   AUS| FLL|        3|      Wednesday|
|   JFK| LAS|        5|         Friday|
|   SLC| BOS|        6|       Saturday|
|   BOS| HOU|        6|       Saturday|
|   BDL| MCO|        7|         Sunday|
|   SJU| TPA|        7|         Sunday|
|   PGD| TYS|        6|       Saturday|
|   PIE| CVG|        6|       Saturday|
|   ABE| SFB|        7|         Sunday|
|   LAS| BIS|        7|         Sunday|
|   ROC| PGD|        1|         Monday|
|   EWR| CVG|        1|         Monday|
|   CVG| SFB|        1|         Monday|
+------+----+---------+---------------+
only showing top 20 rows



                                                                                

## UDF con argumentos que no son columnas (usando currificación)

In [None]:
from pyspark.sql.types import DoubleType

def udf_currificada(param_config):
    """
    El parámetro param_config configura si la función se comporta como suma o como producto. 
    Este parámetro no proviene de una columna del DF sino que lo indicamos nosotros de antemano
    """
    def funcion_interna(x1, x2):
        if param_config == "suma":
            return x1+x2
        elif param_config == "producto":
            return x1*x2

    # Envolvemos como UDF el objeto función junto al parámetro que hemos pasado. Aunque param_config
    # es externo a la función "funcion_interna", dicho parámetro es necesario para ejecutar esa función y por
    # tanto, forma parte de la "clausura" de la función interna: es un "dato adjunto" de la función interna
    return F.udf(funcion_interna, DoubleType())


def funcion_interna(tipo, x1, x2):
    if tipo == "suma":
        return x1+x2
    elif tipo == "producto":
        return x1*x2

udf_para_usar = udf_currificada("producto")

cleanFlightsDF.where("ArrDelay is not null and DepDelay is not null")\
              .select(udf_currificada("producto")(F.col("ArrDelay"), F.col("DepDelay")).alias("suma_o_producto"))\
              .show()