# HDFS, Spark SQL y MLlib

Utilizamos Databricks para este proyecto.

In [None]:
# Location of the flights CSV in DBFS.
ruta_hdfs = "dbfs:/FileStore/flights_act1.csv"

# Read the CSV using its header row for column names and let Spark infer the
# column types. Columns containing the text "NA" will be inferred as string.
flightsDF = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(ruta_hdfs)
)

Imprimimos el esquema para comprobar los tipos de datos que ha inferido Spark:

In [None]:
# Print the schema Spark inferred, to check each column's data type.
flightsDF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



Mostramos el número de filas que tiene el DataFrame para hacernos una idea de su tamaño:

In [None]:
# Total number of rows, to get an idea of the dataset size.
flightsDF.count()

Out[78]: 162049

Tenemos 162049 filas. Si imprimimos por pantalla las 5 primeras filas, veremos qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [None]:
# First 5 rows: shows which string-typed columns actually hold numeric values.
flightsDF.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|       70|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|      -23|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|       -4|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|      -23|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|       43|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

In [None]:
from pyspark.sql import functions as F

# Number of rows whose dep_time holds the literal text "NA"
# (a missing-value marker stored as a string).
filas_dep_time_na = flightsDF.filter(F.col("dep_time") == "NA")
cuantos_NA = filas_dep_time_na.count()
cuantos_NA

Out[80]: 857

In [None]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

# Keep only the rows in which none of the columns above contains "NA".
# A single AND-ed predicate is equivalent to chaining one `where` per column
# (rows where any comparison is null or false are dropped either way).
sin_na = F.lit(True)
for columna in columnas_limpiar:
    sin_na = sin_na & (F.col(columna) != "NA")

flightsLimpiado = flightsDF.where(sin_na)
flightsLimpiado.cache()

Out[81]: DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

Tras eliminar las filas con `NA`, el número de filas disminuye ligeramente (de 162049 a 160748). Aun así, sigue siendo un volumen considerable para realizar analítica y sacar conclusiones sobre estos datos.

In [None]:
# Row count after dropping the rows that contained "NA" (compare with the raw count above).
flightsLimpiado.count()

Out[82]: 160748

Convertimos a tipo entero cada una de las columnas que eran de tipo string, salvo `arr_delay`, que pasamos a `double` porque después calcularemos su media.

In [None]:
from pyspark.sql.types import IntegerType, DoubleType

# Target type for every column that was read as string because of the "NA" text.
# arr_delay is cast directly to double (its mean is computed later); casting it
# to IntegerType first would silently drop any fractional part.
tipos_destino = {c: IntegerType() for c in columnas_limpiar}
tipos_destino["arr_delay"] = DoubleType()

flightsConvertido = flightsLimpiado
for c, tipo in tipos_destino.items():
    # withColumn on an existing name replaces it in place, so column order is kept.
    flightsConvertido = flightsConvertido.withColumn(c, F.col(c).cast(tipo))

flightsConvertido.cache()

Out[99]: DataFrame[year: int, month: int, day: int, dep_time: int, dep_delay: int, arr_time: int, arr_delay: double, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: int, distance: int, hour: int, minute: int]

In [None]:
# Confirm the casts: the cleaned columns should now be integer, and arr_delay double.
flightsConvertido.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



Mostramos las 5 primeras filas del DataFrame limpio (`flightsConvertido`):

In [None]:
# First 5 rows of the cleaned and typed DataFrame.
flightsConvertido.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|    -23.0|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|     -4.0|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|    -23.0|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|     43.0|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

* Nuevo DataFrame llamado `aeropuertosOrigenDF` con una columna `origin` y que tenga tantas filas como aeropuertos distintos de *origen* existan. ¿Cuántas filas tiene? 
* Nuevo DataFrame llamado `rutasDistintasDF` con dos columnas `origin`, `dest` y que tenga tantas filas como rutas diferentes existan 

In [None]:
# Expose the typed DataFrame to Spark SQL under the view names used by the queries below.
for nombre_vista in ("flight_origin", "flight_rutas"):
    flightsConvertido.createOrReplaceTempView(nombre_vista)

In [None]:
# Distinct origin airports and distinct (origin, dest) routes.
aeropuertosOrigenDF = spark.sql("""SELECT DISTINCT origin FROM flight_origin""")
rutasDistintasDF = spark.sql("""SELECT DISTINCT origin, dest FROM flight_rutas""")

# How many of each there are.
n_origen = aeropuertosOrigenDF.count()
n_rutas = rutasDistintasDF.count()

In [None]:
# Expected sizes of the two result sets.
assert n_origen == 2
assert n_rutas == 115
# Recounting must give the same numbers as the counts stored earlier.
assert aeropuertosOrigenDF.count() == n_origen
assert rutasDistintasDF.count() == n_rutas

In [None]:
# List the distinct origin airports (2 are expected, per the assertions above).
aeropuertosOrigenDF.show()

+------+
|origin|
+------+
|   SEA|
|   PDX|
+------+



In [None]:
# Show the distinct routes; 120 is above the 115 routes asserted above, so none are cut off.
rutasDistintasDF.show(120)

+------+----+
|origin|dest|
+------+----+
|   SEA| RNO|
|   SEA| DTW|
|   SEA| CLE|
|   SEA| LAX|
|   PDX| SEA|
|   SEA| BLI|
|   PDX| IAH|
|   PDX| PHX|
|   SEA| SLC|
|   SEA| SBA|
|   SEA| BWI|
|   PDX| IAD|
|   PDX| SFO|
|   SEA| KOA|
|   SEA| JAC|
|   PDX| MCI|
|   SEA| SJC|
|   SEA| ABQ|
|   SEA| SAT|
|   PDX| ONT|
|   SEA| LAS|
|   SEA| GEG|
|   SEA| ANC|
|   PDX| DEN|
|   PDX| JFK|
|   PDX| ATL|
|   PDX| MDW|
|   PDX| LMT|
|   SEA| HDN|
|   SEA| PHL|
|   PDX| PHL|
|   SEA| SMF|
|   PDX| SJC|
|   SEA| MSP|
|   SEA| SFO|
|   SEA| PDX|
|   SEA| CLT|
|   SEA| IAH|
|   SEA| PSP|
|   SEA| DCA|
|   PDX| KOA|
|   SEA| TUS|
|   PDX| SBA|
|   PDX| SAN|
|   PDX| EWR|
|   PDX| BOS|
|   SEA| LIH|
|   PDX| DCA|
|   SEA| BZN|
|   SEA| COS|
|   PDX| LGB|
|   PDX| ANC|
|   SEA| HNL|
|   PDX| HNL|
|   PDX| LIH|
|   PDX| RNO|
|   SEA| DEN|
|   PDX| OAK|
|   PDX| ABQ|
|   SEA| PHX|
|   SEA| LGB|
|   SEA| FLL|
|   PDX| LAX|
|   PDX| SMF|
|   SEA| MCI|
|   SEA| ATL|
|   SEA| ORD|
|   PDX| DTW|
|   SE

- Hay 2 aeropuertos distintos de origen (SEA y PDX).
- Existen 115 rutas diferentes.

Sólo para los vuelos que llegan con **retraso positivo**, calculamos el retraso medio a la llegada para cada aeropuerto de destino.


In [None]:
# Register the typed DataFrame as the SQL view queried by the delay analysis below.
flightsConvertido.createOrReplaceTempView("flight_retrasos")

import pyspark
from pyspark.sql.functions import avg
from pyspark.sql.functions import col, desc, asc

In [None]:
# Flights with a positive arrival delay. They are computed here rather than read
# from a variable defined in a later cell, so this cell also works on a fresh kernel.
retrasosPositivosDF = flightsConvertido.where(F.col("arr_delay") > 0).select("arr_delay", "dest")

# Average positive delay per destination, ascending (smallest averages first).
# A distinct name avoids clashing with the `retrasoMedio` function defined below.
retrasoMedioDF = retrasosPositivosDF.groupBy("dest").avg("arr_delay").sort("avg(arr_delay)")
retrasoMedioDF.show(5)

+----+------------------+
|dest|    avg(arr_delay)|
+----+------------------+
| LMT|              10.0|
| SAT|12.564705882352941|
| MSY|12.851851851851851|
| SNA|14.538327526132404|
| BLI|15.457627118644067|
+----+------------------+
only showing top 5 rows



In [None]:
def retrasoMedio(flightsConvertido):
    """Average arrival delay per destination, over flights that arrived late.

    Only rows with ``arr_delay > 0`` are taken into account.

    Args:
        flightsConvertido: flights DataFrame containing at least the columns
            ``dest`` and ``arr_delay``.

    Returns:
        DataFrame with columns ``dest`` and ``retraso_medio``, sorted by
        ``retraso_medio`` in descending order (largest average delay first).
    """
    con_retraso = flightsConvertido.where("arr_delay > 0").select("dest", "arr_delay")
    return (
        con_retraso
        .groupBy("dest")
        .agg(F.avg("arr_delay").alias("retraso_medio"))
        .orderBy(F.col("retraso_medio").desc())
    )

In [None]:
# The three destinations with the largest average positive arrival delay.
lista = retrasoMedio(flightsConvertido).take(3)
assert len(lista) == 3
assert lista[0].dest == "BOI" and lista[0].retraso_medio == 64.75
assert lista[1].dest == "HDN" and lista[1].retraso_medio == 46.8
# SFO's average is not a short decimal, so it is compared rounded to 2 places.
assert lista[2].dest == "SFO" and round(lista[2].retraso_medio, 2) == 41.19

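Ahora comprobamos con Spark SQL el resultado de nuestra función `retrasoMedio` (invocada arriba con `flightsConvertido` como argumento). Primero seleccionamos los vuelos con retraso positivo y después calculamos su media por destino. ¿Cuáles son los tres aeropuertos con mayor retraso medio? ¿Cuáles son sus retrasos medios en minutos?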

In [None]:
# Flights that arrived late (arr_delay > 0), largest delays first.
consulta_retrasos = """SELECT arr_delay, dest FROM flight_retrasos WHERE arr_delay > 0 ORDER BY arr_delay DESC"""
retrasosPositivos = spark.sql(consulta_retrasos)
retrasosPositivos.show(n=5)

+---------+----+
|arr_delay|dest|
+---------+----+
|   1539.0| DFW|
|   1454.0| JFK|
|    900.0| MSP|
|    866.0| OGG|
|    844.0| PHL|
+---------+----+
only showing top 5 rows



In [None]:
# Average positive arrival delay per destination, in ascending order, so the
# largest averages end up in the last rows.
# NOTE(review): this rebinds `retrasoMedio`, shadowing the function of the same
# name defined earlier; re-running a cell that calls `retrasoMedio(flightsConvertido)`
# after this one will fail.
retrasoMedio = (
    retrasosPositivos
    .groupBy("dest")
    .agg(F.avg("arr_delay"))
    .orderBy("avg(arr_delay)")
)
retrasoMedio.show(n=5)

+----+------------------+
|dest|    avg(arr_delay)|
+----+------------------+
| LMT|              10.0|
| SAT|12.564705882352941|
| MSY|12.851851851851851|
| SNA|14.538327526132404|
| BLI|15.457627118644067|
+----+------------------+
only showing top 5 rows



In [None]:
# The ordering is ascending, so the last three rows are the three largest average delays.
retrasoMedio.tail(num=3)

Out[97]: [Row(dest='SFO', avg(arr_delay)=41.193768844221104),
 Row(dest='HDN', avg(arr_delay)=46.8),
 Row(dest='BOI', avg(arr_delay)=64.75)]

Los 3 aeropuertos con mayor retraso medio (considerando sólo los vuelos con retraso positivo) son:
1. BOI, con 64.75 minutos.
2. HDN, con 46.8 minutos.
3. SFO, con 41.19 minutos.