# Operaciones avanzadas con DataFrames

## Descripción de las variables

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType, StringType


In [3]:
# Spark Context
spark = SparkSession.builder.appName("flights").config("spark.sql.caseSensitive", "True").getOrCreate()

In [4]:
# Leemos los datos y quitamos filas con NA y convertimos a numéricas las columnas inferidas incorrectamente
flightsDF = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("/data/flights_jan08.csv")

# Convertimos a enteros y re-categorizamos ArrDelay en una nueva columna ArrDelayCat
# None (< 15 min), Slight(entre 15 y 60 min), Huge (> 60 min)

cleanFlightsDF = flightsDF.where("ArrDelay != 'NA' and DepDelay != 'NA'")\
                          .withColumn("ArrDelay", F.col("ArrDelay").cast(IntegerType()))\
                          .withColumn("DepDelay", F.col("DepDelay").cast(IntegerType()))\
                          .withColumn("ArrDelayCat", F.when(F.col("ArrDelay") < 15, "None")\
                                                      .when((F.col("ArrDelay") >= 15) & (F.col("ArrDelay") < 60), "Slight")\
                                                      .otherwise("Huge"))\
                           .cache()

## Primeras conclusiones

ESCENARIO:

Imaginemos que somos los dueños de una web de viajes que rastrea internet en busca de vuelos en agencias y otras páginas, los compara y recomienda el más adecuado para el aeropuerto. Junto con esta recomendación, querríamos dar también información sobre vuelos fiables y no fiables en lo que respecta a la puntualidad. Esto depende de muchos factores, como el origen y destino, duración del vuelo, hora del día, etc.

### Agrupación y agregaciones

<div class="alert alert-block alert-success">
<p><b>PREGUNTA</b>: ¿Cuáles son los vuelos (origen, destino) con mayor retraso medio? ¿Cuántos vuelos existen entre cada par de aeropuertos?</p>


In [5]:
retrasoMedioDF = flightsDF.groupBy("Origin", "Dest").agg(F.mean(F.col("ArrDelay")).alias("avgDelay"),
                                                       F.count("*").alias("cuantos"))\
                                                       .sort(F.col("avgDelay").desc())
retrasoMedioDF.show()

+------+----+------------------+-------+
|Origin|Dest|          avgDelay|cuantos|
+------+----+------------------+-------+
|   ONT| SAN|             257.0|      2|
|   ABQ| IAH|             149.0|      2|
|   CLE| SDF|              98.0|      1|
|   LCH| IAH|              60.0|      1|
|   TUL| EWR|              57.0|      1|
|   SYR| CLE|              46.0|      1|
|   ORD| EWR|              46.0|      1|
|   MDW| SFO|38.895348837209305|     87|
|   LAS| SFO| 38.85128205128205|    199|
|   IAH| SLC|              36.0|      1|
|   LAX| SFO| 35.08040201005025|    229|
|   SFO| LAS|  32.9639175257732|    199|
|   EWR| RDU|              31.5|      2|
|   MCI| IAH|              31.0|      3|
|   IAH| SAV|              31.0|      1|
|   LAS| BDL|29.310344827586206|     29|
|   SAN| SFO|29.275229357798164|    228|
|   SFO| SAN| 28.80275229357798|    228|
|   SFO| MDW|28.229885057471265|     87|
|   GEG| LGB|              28.0|      1|
+------+----+------------------+-------+
only showing top

<div class="alert alert-block alert-success">
<p><b>PREGUNTA</b>: ¿Es el avión un medio de transporte fiable? Mostrar el número de vuelos en cada categoría de retraso.</p>
En lugar de llamar agg(F.count("*")), podemos llamar a la transformación count() sobre el resultado de groupBy(), y creará
automáticamente una columna llamada "count" con los conteos para cada grupo.
<p> Ahora agrupar también por cada aeropuerto de origen, y mostrando una columna distinta por cada tipo de retraso, con el recuento.</p>

In [6]:
cleanFlightsDF.groupBy("ArrDelayCat").count().show()

+-----------+-----+
|ArrDelayCat|count|
+-----------+-----+
|     Slight|14879|
|       None|79069|
|       Huge| 4750|
+-----------+-----+



In [7]:
cleanFlightsDF.groupBy("Origin", "Dest", "ArrDelayCat").count().show()

+------+----+-----------+-----+
|Origin|Dest|ArrDelayCat|count|
+------+----+-----------+-----+
|   LAS| GEG|       None|   39|
|   OAK| BUR|       Huge|   26|
|   BOI| LAS|       None|   44|
|   BWI| SDF|       None|  109|
|   CMH| MDW|     Slight|   23|
|   DEN| TPA|       Huge|    2|
|   BNA| DEN|       Huge|    2|
|   TPA| IND|     Slight|    4|
|   RNO| MDW|     Slight|    2|
|   MDW| STL|       Huge|   15|
|   MHT| MCO|       None|  127|
|   PHX| OKC|       None|   67|
|   RDU| BWI|     Slight|   17|
|   SEA| BOI|       None|   62|
|   SJC| SAN|       Huge|   15|
|   SMF| PDX|     Slight|   40|
|   BHM| MCO|       None|   76|
|   BNA| PHL|       None|   60|
|   BUF| MCO|       None|   68|
|   CMH| PHL|       None|   67|
+------+----+-----------+-----+
only showing top 20 rows



In [8]:
cleanFlightsDF.groupBy("Origin", "Dest").pivot("ArrDelayCat").count().show()

+------+----+----+----+------+
|Origin|Dest|Huge|None|Slight|
+------+----+----+----+------+
|   SMF| BUR|  12| 218|    55|
|   PHL| MCO|   2| 169|    17|
|   SNA| PHX|   7| 190|    24|
|   LAS| LIT|NULL|  29|  NULL|
|   MCI| IAH|   1|   1|     1|
|   BFL| SAN|   3|  23|     4|
|   AUS| ELP|   2|  98|     8|
|   SJC| ONT|   9| 193|    36|
|   PVD| LAS|NULL|  28|     1|
|   BWI| MDW|  10| 187|    18|
|   HOU| PHL|   2|  38|    11|
|   PVD| TPA|   2|  79|     5|
|   MCO| PVD|   2| 158|    14|
|   FLL| PVD|NULL|  23|     5|
|   IND| TPA|NULL|  28|     1|
|   SMF| PHX|   5| 156|    26|
|   BUF| BWI|   6| 150|    21|
|   PHX| TUL|   6|  65|     9|
|   SEA| RNO|   1|  51|     5|
|   BNA| SAN|   2|  44|    10|
+------+----+----+----+------+
only showing top 20 rows



In [9]:
cleanFlightsDF.groupBy("Origin", "Dest").pivot("ArrDelayCat").agg(
    F.count("*").alias("conteo"),
    F.mean("Distance").alias("avgDist")
).show()

+------+----+-----------+------------+-----------+------------+-------------+--------------+
|Origin|Dest|Huge_conteo|Huge_avgDist|None_conteo|None_avgDist|Slight_conteo|Slight_avgDist|
+------+----+-----------+------------+-----------+------------+-------------+--------------+
|   SMF| BUR|         12|       358.0|        218|       358.0|           55|         358.0|
|   PHL| MCO|          2|       861.0|        169|       861.0|           17|         861.0|
|   SNA| PHX|          7|       338.0|        190|       338.0|           24|         338.0|
|   LAS| LIT|       NULL|        NULL|         29|      1295.0|         NULL|          NULL|
|   MCI| IAH|          1|       643.0|          1|       643.0|            1|         643.0|
|   BFL| SAN|          3|       215.0|         23|       215.0|            4|         215.0|
|   AUS| ELP|          2|       528.0|         98|       528.0|            8|         528.0|
|   SJC| ONT|          9|       333.0|        193|       333.0|       

<div class="alert alert-block alert-success">
<p><b>PREGUNTA</b>: ¿Hay relación entre el día de la semana y el retraso a la salida o a la llegada?</p>
    <p> Ahora haz lo mismo para cada día pero solo con el retraso a la llegada, desagregado por cada aeropuerto de salida, utilizando la función pivot(). </p>
</div>

In [10]:
avgDelaysDF = cleanFlightsDF.groupBy("DayOfWeek").agg(F.mean("DepDelay"), F.mean("ArrDelay"))\
                            .orderBy(F.col("avg(DepDelay)").desc())
avgDelaysDF.show()

+---------+------------------+-------------------+
|DayOfWeek|     avg(DepDelay)|      avg(ArrDelay)|
+---------+------------------+-------------------+
|        7|17.091833613702402|  12.19062547790182|
|        4|16.447597472278527| 13.387921783712889|
|        5|12.010452961672474| 7.6849477351916375|
|        1| 9.130252979301693|  4.568959509373475|
|        6|7.9830827067669174|  2.069280343716434|
|        2| 5.136341648814871|0.44647916522700576|
|        3|3.6755377826806397| -1.646511307225593|
+---------+------------------+-------------------+



Como esta variable tiene 7 valores distintos, se crearán 7 columnas nuevas. De esta manera, visualizaremos toda la información de cada combinación (Origen, Dest) condensada en una fila con 7 columnas con los 7 retrasos promedio correspondientes a ese (Origen, Dest) en cada día de la semana.

In [11]:
byOriginDestWithPivot = cleanFlightsDF.groupBy("Origin", "Dest")\
                                 .pivot("DayOfWeek")\
                                 .agg(F.round(F.mean("ArrDelay"), 2))\
                                 .orderBy("Origin", "Dest")

byOriginDestWithPivot.show()

+------+----+-----+------+------+------+------+------+------+
|Origin|Dest|    1|     2|     3|     4|     5|     6|     7|
+------+----+-----+------+------+------+------+------+------+
|   ABQ| AMA| 7.25| -6.25|  -3.5|  29.0| 32.75| 14.75|  7.75|
|   ABQ| AUS|  7.5| -13.5| -10.5| -5.63| -5.63|-11.25|  5.25|
|   ABQ| BWI|-15.5|-15.75|-16.75|-13.33|-19.25|-15.25|-15.75|
|   ABQ| DAL| 6.14| -1.78| -0.39| -2.46|  5.31|  -0.9|  7.61|
|   ABQ| DEN|-7.25| -3.45|  -6.0| -7.25|  -1.0|  2.08|-10.42|
|   ABQ| ELP|  5.5|  2.08| -1.33| 11.75| -1.33| 13.17| 10.63|
|   ABQ| HOU|19.08| -1.92| -2.08|  8.92| -6.42|  14.5| -4.33|
|   ABQ| IAH| NULL|  NULL|  NULL| 278.0|  20.0|  NULL|  NULL|
|   ABQ| LAS| 5.29|  5.61|   1.5|   2.0|  8.46| 10.33| 35.61|
|   ABQ| LAX|-0.31|  0.44| -0.13|  9.19| 18.19|   9.0| 13.69|
|   ABQ| LBB| 2.75| 12.75|  -4.0|   3.0|  8.75|  1.75| 16.75|
|   ABQ| MAF| 14.5| -2.25|  -0.5|   7.0|  6.25|  18.5|  60.5|
|   ABQ| MCI|-4.75| -8.63|-11.88|  -9.0| 13.63|  1.33| 21.63|
|   ABQ|

### Join operations

It would be nice to have the average delay of a route appended next to each flight, so that we can see which flights had a delay that was above or below the average delay of that route.

<div class="alert alert-block alert-success">
    <b> PREGUNTA </b>:
Usa el averageDelayOriginDestDF creado anteriormente, elimina la columna de conteo y luego únerlo con cleanFlightsDF, utilizando Origin y Dest como columnas de enlace.

Finalmente, selecciona solo las columnas Origin, Dest, DayOfWeek, ArrDelay y avgDelay del resultado.
</div>

In [12]:
flightsWithAverage = cleanFlightsDF.join(retrasoMedioDF, on = ["Origin", "Dest"])
flightsWithAverage.select("Origin", "Dest", "ArrDelay", "avgDelay").show()

+------+----+--------+-------------------+
|Origin|Dest|ArrDelay|           avgDelay|
+------+----+--------+-------------------+
|   IAD| TPA|     -14|-10.310344827586206|
|   IAD| TPA|       2|-10.310344827586206|
|   IND| BWI|      14|0.49382716049382713|
|   IND| BWI|      -6|0.49382716049382713|
|   IND| BWI|      34|0.49382716049382713|
|   IND| JAX|      11|  7.862068965517241|
|   IND| LAS|      57|-2.2413793103448274|
|   IND| LAS|     -18|-2.2413793103448274|
|   IND| MCI|       2| 3.1296296296296298|
|   IND| MCI|     -16| 3.1296296296296298|
|   IND| MCO|       1|-0.5172413793103449|
|   IND| MCO|      80|-0.5172413793103449|
|   IND| MDW|       1| 11.514018691588785|
|   IND| MDW|      10| 11.514018691588785|
|   IND| MDW|      -4| 11.514018691588785|
|   IND| MDW|      11| 11.514018691588785|
|   IND| PHX|      15| -4.103448275862069|
|   IND| PHX|     -15| -4.103448275862069|
|   IND| TPA|      16| -9.172413793103448|
|   ISP| BWI|      37|  1.764102564102564|
+------+---

In [13]:
flightsComparedWithAverage = flightsWithAverage.withColumn("belowAverage", F.col("ArrDelay") < F.col("avgDelay"))

flightsComparedWithAverage.select("Origin", "Dest", "ArrDelay", "avgDelay", "belowAverage").show()

+------+----+--------+-------------------+------------+
|Origin|Dest|ArrDelay|           avgDelay|belowAverage|
+------+----+--------+-------------------+------------+
|   IAD| TPA|     -14|-10.310344827586206|        true|
|   IAD| TPA|       2|-10.310344827586206|       false|
|   IND| BWI|      14|0.49382716049382713|       false|
|   IND| BWI|      -6|0.49382716049382713|        true|
|   IND| BWI|      34|0.49382716049382713|       false|
|   IND| JAX|      11|  7.862068965517241|       false|
|   IND| LAS|      57|-2.2413793103448274|       false|
|   IND| LAS|     -18|-2.2413793103448274|        true|
|   IND| MCI|       2| 3.1296296296296298|        true|
|   IND| MCI|     -16| 3.1296296296296298|        true|
|   IND| MCO|       1|-0.5172413793103449|       false|
|   IND| MCO|      80|-0.5172413793103449|       false|
|   IND| MDW|       1| 11.514018691588785|        true|
|   IND| MDW|      10| 11.514018691588785|        true|
|   IND| MDW|      -4| 11.514018691588785|      

<div class="alert alert-block alert-success">
<b> PREGUNTA </b>: Vamos a construir otro DF con información sobre los aeropuertos (en una situación real, tendríamos otra tabla en la base de datos como la tabla de la entidad Aeropuerto).

</div>

In [14]:
airportsDF = spark.createDataFrame([
    ("JFK", "John F. Kennedy International Airport", 1948),
    ("LIT", "Little Rock National Airport", 1931),
    ("SEA", "Seattle-Tacoma International Airport", 1949),
], ["IATA", "FullName", "Year"])

In [17]:
joinedFlightsDF = cleanFlightsDF.join(airportsDF, cleanFlightsDF.Origin == airportsDF.IATA, "inner").select('FullName','Origin','Dest','ArrDelay')

joinedFlightsDF.show()

+--------------------+------+----+--------+
|            FullName|Origin|Dest|ArrDelay|
+--------------------+------+----+--------+
|Little Rock Natio...|   LIT| STL|      40|
|Little Rock Natio...|   LIT| STL|      -3|
|Little Rock Natio...|   LIT| PHX|     -12|
|Little Rock Natio...|   LIT| MDW|     -18|
|Little Rock Natio...|   LIT| LAS|      21|
|Little Rock Natio...|   LIT| HOU|      50|
|Little Rock Natio...|   LIT| DAL|      53|
|Little Rock Natio...|   LIT| DAL|       2|
|Little Rock Natio...|   LIT| DAL|      -1|
|Little Rock Natio...|   LIT| DAL|       4|
|Little Rock Natio...|   LIT| DAL|     113|
|Little Rock Natio...|   LIT| DAL|      85|
|Little Rock Natio...|   LIT| DAL|       0|
|Little Rock Natio...|   LIT| BWI|     115|
|Little Rock Natio...|   LIT| STL|      -2|
|Little Rock Natio...|   LIT| STL|      -9|
|Little Rock Natio...|   LIT| PHX|       3|
|Little Rock Natio...|   LIT| MDW|     -16|
|Little Rock Natio...|   LIT| LAS|      -7|
|Little Rock Natio...|   LIT| HO

## User-defined functions (UDFs)

Vamos a construir un UDF para convertir millas a kilómetros. Esto podría hacerse fácilmente multiplicando directamente la columna de millas por 1.6 (y sería mucho más eficiente), ya que Spark permite el producto entre una columna y un número. En todos los casos en los que Spark proporciona funciones integradas para realizar una tarea (como esta), debes usar esas funciones y no una UDF.

Las UDF deben emplearse solo cuando no hay otra opción.
La razón es que las funciones integradas de Spark están optimizadas y Catalyst, el optimizador automático de código integrado en Spark, puede optimizarlo aún más. Sin embargo, las UDF son una caja negra para Catalyst y su contenido no se optimizará, y por lo tanto, generalmente son mucho más lentas.

In [None]:
def milesToKm(miles):
    return miles*1.6

In [None]:
udfMilesToKm = F.udf(milesToKm, DoubleType())

# Con esto, Spark será capaz de llamar a nuestra función milesToKm sobre cada uno de los valores de una columna numérica.
# Spark enviará el código de nuestra función a los executors a través de la red, y cada executor la ejecutará sobre las
# particiones (una por una) que estén en ese executor

flightsWithKm = cleanFlightsDF.withColumn("DistKm", udfMilesToKm(F.col("Distance")))

flightsWithKm.select("Origin", "Dest", "Distance", "DistKM")\
             .distinct()\
             .show(5)

8.0
+------+----+--------+------------------+
|Origin|Dest|Distance|            DistKM|
+------+----+--------+------------------+
|   LAS| ALB|    2237|3579.2000000000003|
|   LIT| LAS|    1295|            2072.0|
|   PHX| PHL|    2075|            3320.0|
|   HOU| SAT|     192|307.20000000000005|
|   TUS| OMA|    1046|1673.6000000000001|
+------+----+--------+------------------+
only showing top 5 rows



### UDF que convierta DayOfWeek en una cadena.

In [None]:
def dayOfWeekToString(dayInteger):

    daysOfWeek = ["", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

    return daysOfWeek[dayInteger]

In [None]:
dayOfWeekStringUDF = F.udf(dayOfWeekToString, StringType())

flightsWithDayOfWeekStr = cleanFlightsDF.withColumn("DayOfWeekString", dayOfWeekStringUDF(F.col("DayOfWeek")))

flightsWithDayOfWeekStr.select("Origin", "Dest", "DayOfWeek", "DayOfWeekString")\
                       .distinct()\
                       .show()

+------+----+---------+---------------+
|Origin|Dest|DayOfWeek|DayOfWeekString|
+------+----+---------+---------------+
|   MCI| MDW|        4|       Thursday|
|   OAK| PDX|        4|       Thursday|
|   LAS| PDX|        5|         Friday|
|   OAK| SLC|        5|         Friday|
|   ORF| LAS|        5|         Friday|
|   PHX| OAK|        5|         Friday|
|   SLC| PDX|        5|         Friday|
|   BDL| MDW|        6|       Saturday|
|   DAL| HOU|        6|       Saturday|
|   SJC| LAS|        6|       Saturday|
|   SLC| BWI|        6|       Saturday|
|   STL| SDF|        6|       Saturday|
|   BDL| MCO|        7|         Sunday|
|   BUR| OAK|        7|         Sunday|
|   HOU| LAX|        7|         Sunday|
|   IAD| MDW|        7|         Sunday|
|   MCI| PHX|        7|         Sunday|
|   DAL| STL|        1|         Monday|
|   OMA| STL|        1|         Monday|
|   PHX| ONT|        1|         Monday|
+------+----+---------+---------------+
only showing top 20 rows

