In [0]:
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/mariapastora.alvarez@bosonit.com/flights_jan_apr_2018.csv")

In [0]:
df1.show(5)

+-----+----------+---------+----------+------+------------+----+------------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+
|Month|DayofMonth|DayOfWeek|FlightDate|Origin|  OriginCity|Dest|    DestCity|DepTime|DepDelay|ArrTime|ArrDelay|Cancelled|CancellationCode|Diverted|ActualElapsedTime|AirTime|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+-----+----------+---------+----------+------+------------+----+------------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+
|    1|        14|        7|2018-01-14|   SYR|Syracuse, NY| DTW| Detroit, MI|   null|    null|   null|    null|     1.00|               B|    0.00|             null|   null|  374.00|        null|        null|    null|         null|             null|


In [0]:
# Convertimos a enteros y re-categorizamos ArrDelay en una nueva columna ArrDelayCat
# None (< 15 min), Slight(entre 15 y 60 min), Huge (> 60 min)
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
dfclean = df1.withColumn("ArrDelayCat", F.when(F.col("ArrDelay")<15, "None")\
                                         .when((F.col("ArrDelay")>=15)&(F.col("ArrDelay")<=60), "Slight")\
                                         .otherwise("Huge"))\
                            .cache()

In [0]:
#¿Cuáles son los vuelos (origen, destino) con mayor retraso medio? ¿Cuántos vuelos existen entre cada par de aeropuertos?
masretrasoDF = dfclean.groupBy("Origin", "Dest")\
                      .agg(
                            F.mean("ArrDelay").alias("AvgDelay"),\
                            F.count("ArrDelay").alias("Cantidad"))\
                      .sort(F.col("AvgDelay").desc())
                        
masretrasoDF.show()

+------+----+------------------+--------+
|Origin|Dest|          AvgDelay|Cantidad|
+------+----+------------------+--------+
|   RDM| MFR|            1347.0|       1|
|   MDT| HPN|             798.0|       1|
|   ORD| GTF|             212.0|       1|
|   ICT| DAY|             210.0|       1|
|   ELM| ATL|             169.0|       2|
|   DSM| PIA|             168.0|       1|
|   ERI| ITH|             160.0|       1|
|   YNG| PIE|             141.0|       1|
|   CMH| HOU|             120.0|       1|
|   HRL| DAL|             111.0|       1|
|   PPG| HNL|109.85714285714286|      35|
|   HNL| PPG|105.85714285714286|      35|
|   PIE| YNG|             104.0|       1|
|   AVP| SFB|              93.0|       1|
|   ACY| MSY| 87.45454545454545|      11|
|   CPR| LAS|              85.0|       1|
|   LAS| CPR|              82.0|       1|
|   TTN| BNA|              76.5|      10|
|   MSP| PVD|              74.0|       1|
|   TUL| OKC|              69.0|       1|
+------+----+------------------+--

In [0]:
#¿Es el avión un medio de transporte fiable? Mostrar el número de vuelos en cada categoría de retraso.
#En lugar de llamar agg(F.count("*")), podemos llamar a la transformación count() sobre el resultado de groupBy(), y creará automáticamente una columna llamada "count" con los conteos para cada grupo.

delays = dfclean.groupBy("ArrDelayCat").count().show()
    


+-----------+-------+
|ArrDelayCat|  count|
+-----------+-------+
|     Slight| 300807|
|       None|2004727|
|       Huge| 197579|
+-----------+-------+



In [0]:
#Ahora agrupar también por cada aeropuerto de origen, y mostrando una columna distinta por cada tipo de retraso, con el recuento. PISTA: utilizar la función pivot("colName")

delays2 = dfclean.groupBy("Origin", "Dest")\
                 .pivot("ArrDelayCat").agg(
                    F.count("ArrDelayCat").alias("Cantidad")).show()

+------+----+----+----+------+
|Origin|Dest|Huge|None|Slight|
+------+----+----+----+------+
|   MCI| MKE|   8| 172|    18|
|   PBI| DCA|  36| 393|    50|
|   MDW| MEM|  22| 172|    42|
|   ORD| PDX|  23| 528|    85|
|   SMF| BUR|  61| 720|   124|
|   STS| PHX|   9| 105|    14|
|   MCI| IAH|  36| 487|    56|
|   FSD| ATL|   9|  83|     9|
|   PHL| MCO| 160|1291|   275|
|   ATL| GSP|  45|1080|   108|
|   SJC| LIH|   1|  83|     5|
|   DSM| EWR|  10|  94|    14|
|   DSM| MCO|   1|  30|    10|
|   LBB| DEN|  20| 184|    20|
|   SNA| PHX|  55| 967|   258|
|   BQN| MCO|   9|  85|    25|
|   TPA| ACY|   4| 112|     4|
|   PIE| AVP|null|   1|  null|
|   LAS| LIT|   5|  93|    22|
|   PBG| PGD|   1|  19|     6|
+------+----+----+----+------+
only showing top 20 rows



In [0]:
#¿Hay relación entre el día de la semana y el retraso a la salida o a la llegada?

#PISTA: Calcula el retraso medio a la salida y a la llegada para cada día de la semana y ordena por una de ellas descendentemente.

daydelay = df1.groupBy("DayOfWeek").agg(
                    F.mean("DepDelay").alias("AvgDepDelay"),
                    F.mean("ArrDelay").alias("AvgArrDelay")
            ).sort("DayOfWeek")
daydelay.show()

+---------+------------------+-------------------+
|DayOfWeek|       AvgDepDelay|        AvgArrDelay|
+---------+------------------+-------------------+
|        1|10.430177708665964|  5.391113068725289|
|        2| 8.246502522185226| 2.8412409647873806|
|        3|  8.47071347600168| 3.0525338339576717|
|        4|  8.35856546210902| 2.7390527404801026|
|        5|10.220785437977693|  5.027363815430113|
|        6| 6.278199328016013|-0.5748593305876211|
|        7| 9.142161259888235| 3.2344449424598207|
+---------+------------------+-------------------+



In [0]:
#Ahora haz lo mismo para cada día pero solo con el retraso a la llegada, desagregado por cada aeropuerto de salida, utilizando la función pivot()
daydelay2 = df1.groupBy("Origin").pivot("DayOfWeek").agg(
            F.mean("ArrDelay").alias("AvgArrDelay")
            ).sort("Origin").toPandas()

daydelay2


Unnamed: 0,Origin,1,2,3,4,5,6,7
0,ABE,14.953307,13.893519,15.276786,5.924370,18.183333,6.940299,10.027149
1,ABI,10.650000,16.364706,-0.547619,0.376344,4.641304,16.492063,4.606383
2,ABQ,1.442438,0.363406,1.736409,3.111111,-0.176892,-1.655721,-0.252019
3,ABR,10.571429,12.468750,6.294118,16.441176,8.529412,0.125000,9.117647
4,ABY,14.207547,3.978723,12.804348,10.645833,9.040816,29.562500,27.234043
...,...,...,...,...,...,...,...,...
351,XNA,9.592857,9.375580,6.838462,1.854015,3.877941,-0.977208,5.438861
352,YAK,-9.451613,-13.000000,-10.727273,2.848485,-9.470588,-12.088235,-10.090909
353,YKM,0.338462,-0.490909,5.666667,1.359375,6.269841,5.941176,1.603175
354,YNG,9.000000,,,141.000000,,,


In [0]:
#LA FUNCIÓN PIVOT: Puede ser interesante ver, para cada (Origin, Dest), el retraso promedio por día de la semana. Si agrupamos por esas tres variables (Origin, Dest, DayOfWeek), nuestro resultado tendría demasiadas filas para ser fácil de visualizar (7 x 1009 ya que hay 1009 combinaciones de (Origin, DayOfWeek)). En cambio, vamos a crear 7 columnas, una por día de la semana, en nuestro resultado DF. Lo haremos utilizando una de las variables de agrupación (DayOfWeek) como variable pivot. Como esta variable tiene 7 valores distintos, se crearán 7 columnas nuevas. De esta manera, visualizaremos toda la información de cada combinación (Origen, Dest) condensada en una fila con 7 columnas con los 7 retrasos promedio correspondientes a ese (Origen, Dest) en cada día de la semana.

daydelay3 = df1.groupBy("Origin", "Dest").pivot("DayOfWeek").agg(
                      F.mean("ArrDelay").alias("AvgArrDelay"),
                      F.mean("DepDelay").alias("AvgDepDelay")
                ).sort("Origin").toPandas()

daydelay3

Unnamed: 0,Origin,Dest,1_AvgArrDelay,1_AvgDepDelay,2_AvgArrDelay,2_AvgDepDelay,3_AvgArrDelay,3_AvgDepDelay,4_AvgArrDelay,4_AvgDepDelay,5_AvgArrDelay,5_AvgDepDelay,6_AvgArrDelay,6_AvgDepDelay,7_AvgArrDelay,7_AvgDepDelay
0,ABE,CLT,0.050000,-1.375000,7.921053,6.078947,5.181818,1.909091,0.542857,2.685714,-0.361111,0.361111,16.090909,12.212121,12.888889,9.277778
1,ABE,ATL,16.000000,17.230769,17.119048,19.952381,6.256410,6.384615,-1.380000,4.880000,4.458333,14.711111,-12.148148,0.518519,5.846154,10.394737
2,ABE,PGD,-0.222222,0.888889,,,0.727273,3.909091,,,72.176471,71.176471,,,,
3,ABE,SFB,-0.160000,6.360000,27.705882,28.058824,23.625000,29.875000,2.000000,12.166667,32.086957,41.260870,21.464286,24.107143,1.666667,8.750000
4,ABE,PIE,,,17.800000,18.600000,,,-3.588235,3.823529,,,-3.200000,0.700000,2.777778,3.555556
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5790,YAK,CDV,-8.642857,-2.866667,-5.117647,-5.176471,-3.250000,-3.750000,20.176471,15.882353,1.529412,2.352941,-15.647059,-14.058824,-14.562500,-17.250000
5791,YKM,SEA,0.338462,-0.630769,-0.490909,-0.781818,5.666667,6.140351,1.359375,-0.828125,6.269841,3.317460,5.941176,7.980392,1.603175,1.555556
5792,YNG,SFB,9.000000,15.000000,,,,,,,,,,,,
5793,YNG,PIE,,,,,,,141.000000,111.000000,,,,,,


In [0]:
#PREGUNTA : Usa el averageDelayOriginDestDF creado anteriormente, elimina la columna de conteo y luego únerlo con cleanFlightsDF, utilizando Origin y Dest como columnas de enlace. Finalmente, selecciona solo las columnas Origin, Dest, DayOfWeek, ArrDelay y avgDelay del resultado

averageDelayOriginDestDF = dfclean.groupBy("Origin", "Dest").agg(
    F.mean("ArrDelay").alias("avgArrDelay")
)

join = dfclean.join(averageDelayOriginDestDF, on = (dfclean["Origin"] == averageDelayOriginDestDF["Origin"]) & (dfclean["Dest"] == averageDelayOriginDestDF["Dest"]))\
        .drop(dfclean["Origin"]).drop(dfclean["Dest"])\
              .select("Origin","Dest", "DayOfWeek","ArrDelay", "AvgArrDelay")

join.show()

+------+----+---------+--------+------------------+
|Origin|Dest|DayOfWeek|ArrDelay|       AvgArrDelay|
+------+----+---------+--------+------------------+
|   BQN| MCO|        1|    9.00| 5.762711864406779|
|   PBI| DCA|        1|   48.00|-0.391025641025641|
|   PBI| DCA|        1|   -6.00|-0.391025641025641|
|   PBI| DCA|        1|   64.00|-0.391025641025641|
|   BQN| MCO|        2|   67.00| 5.762711864406779|
|   PBI| DCA|        2|   -1.00|-0.391025641025641|
|   PBI| DCA|        2|   33.00|-0.391025641025641|
|   PBI| DCA|        2|  -22.00|-0.391025641025641|
|   BQN| MCO|        3|   29.00| 5.762711864406779|
|   PBI| DCA|        3|   11.00|-0.391025641025641|
|   PBI| DCA|        3|  362.00|-0.391025641025641|
|   PBI| DCA|        3|   43.00|-0.391025641025641|
|   BQN| MCO|        4|   19.00| 5.762711864406779|
|   PBI| DCA|        4|  -12.00|-0.391025641025641|
|   BQN| MCO|        5|  -14.00| 5.762711864406779|
|   BQN| MCO|        1|  -23.00| 5.762711864406779|
|   PBI| DCA

In [0]:
#BONUS (OPCIONAL): crear una nueva columna belowAverage que tenga valor True si ArrDelay es menor que el avgDelay de esa ruta, y False en caso contrario. No utilizar la función when() sino el operador de comparación directamente entre columnas, la cual devolverá una columna booleana.
from pyspark.sql import Window
w = Window().partitionBy("Origin", "Dest")
joinwindow = dfclean.withColumn("avgArrDelay", F.mean("ArrDelay").over(w))\
                    .select("Origin", "Dest", "DayOfWeek", "ArrDelay", "avgArrDelay")\
                    .withColumn("belowAverage", F.col("ArrDelay") < F.col("avgArrDelay"))

joinwindow.show()

+------+----+---------+--------+-----------------+------------+
|Origin|Dest|DayOfWeek|ArrDelay|      avgArrDelay|belowAverage|
+------+----+---------+--------+-----------------+------------+
|   ABE| CLT|        1|   -4.00|5.876494023904383|        true|
|   ABE| CLT|        2|  -14.00|5.876494023904383|        true|
|   ABE| CLT|        3|    1.00|5.876494023904383|        true|
|   ABE| CLT|        4|  -14.00|5.876494023904383|        true|
|   ABE| CLT|        5|  -17.00|5.876494023904383|        true|
|   ABE| CLT|        6|  -18.00|5.876494023904383|        true|
|   ABE| CLT|        7|   -2.00|5.876494023904383|        true|
|   ABE| CLT|        1|   -7.00|5.876494023904383|        true|
|   ABE| CLT|        2|   -5.00|5.876494023904383|        true|
|   ABE| CLT|        3|   -5.00|5.876494023904383|        true|
|   ABE| CLT|        4|  -22.00|5.876494023904383|        true|
|   ABE| CLT|        5|  -11.00|5.876494023904383|        true|
|   ABE| CLT|        6|   -5.00|5.876494