# Ejercicio 2. Analísis de datos de vuelos y aeropuertos con Spark SQL

In [66]:
from pyspark.sql import SparkSession

In [67]:
spark = SparkSession.builder.appName('vuelos').getOrCreate()

En este ejercicio trataremos los datos de vuelos que tienen por origen o destino aeropuertos de EEUU, disponibles en las web https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays-on-time-data y https://openflights.org/data.html. Los datos de los vuelos están contenidos en el fichero departuredelays.csv incluyendo entre otros el identificador (código IATA) del aeropuerto origen y el destino, la distancia recorrida y el retraso en minutos. Además, en el archivo airport-codes-na.txt se tiene para cada código IATA, la siguiente información detallada del aeropuerto: Ciudad, Estado, País y Código IATA.

**Carga los datos del archivo departuredelays.csv en un Data Frame**

In [68]:
ruta = "file:///home/ubuntu/datos/sql"

In [69]:
vuelosDf = spark.read.format('csv') \
            .option('header','true') \
            .option('inferSchema','true') \
            .load(ruta + "/departuredelays.csv")

In [70]:
vuelosDf.show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE| 

**Carga los datos del archivo airport-codes-na.txt en un Data Frame.**
Fijate en el archivo de entrada abriéndolo previamente: En este caso el separador no es el estándar ',' si no que es un tabulador '\t'.

In [71]:
aeroDf = spark.read.format('csv') \
            .option('header','true') \
            .option('inferSchema','true') \
            .option('sep','\t') \
            .load(ruta + "/airport-codes-na.txt")

In [72]:
aeroDf.show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows



## Práctica de las operaciones de unión (join), agregación y filtrado

**Calcula las 10 ciudades desde las que han despegado más vuelos**. Para ello, tendrás que: 
1. Hacer un Join entre ambos data frames cruzando el código IATA de origen (origin) con el código IATA del fata frame con información de los aeropuertos.
2. Agrupar por ciudad y hacer el recuento de vuelos. Renombra la columna de recuento a 'NumVuelos' y la columna City a 'CiudadOrigen'
3. Ordenar el resultado para mostrarlo por pantalla.

In [73]:
vuelosOriJoin = vuelosDf.join(aeroDf,vuelosDf['origin']==aeroDf['IATA'])

In [74]:
from pyspark.sql.functions import count, desc

In [75]:
ciudadesVuelos = vuelosOriJoin.groupBy(vuelosOriJoin['City'].alias('CiudadOrigen')) \
        .agg(count(vuelosOriJoin['City']).alias('NumVuelos'))

In [76]:
ciudadesVuelos.orderBy(ciudadesVuelos['NumVuelos'].desc()).show(10)

+-------------+---------+
| CiudadOrigen|NumVuelos|
+-------------+---------+
|      Atlanta|    91484|
|      Chicago|    84284|
|       Dallas|    79754|
|      Houston|    58101|
|  Los Angeles|    54086|
|       Denver|    53148|
|     New York|    49030|
|      Phoenix|    40155|
|San Francisco|    39483|
|    Las Vegas|    33107|
+-------------+---------+
only showing top 10 rows



**Además del número de vuelos 'NumVuelos', añade una columna con los retrasos acumulados por ciudad de origen 'RetrasoAcumulado'**

In [77]:
from pyspark.sql.functions import sum

In [78]:
ciudadesRetrasos = vuelosOriJoin.groupBy(vuelosOriJoin['City'].alias('CiudadOrigen')) \
        .agg(count(vuelosOriJoin['City']).alias('NumVuelos'),sum('delay').alias('RetrasoAcumulado'))

Muestra ahora solo las 10 ciudades con más retrasos acumulados.

In [79]:
ciudadesRetrasos.orderBy(ciudadesRetrasos['RetrasoAcumulado'].desc()).show(10)

+-------------+---------+----------------+
| CiudadOrigen|NumVuelos|RetrasoAcumulado|
+-------------+---------+----------------+
|      Chicago|    84284|         1588183|
|      Atlanta|    91484|         1151087|
|       Denver|    53148|          899406|
|       Dallas|    79754|          849448|
|      Houston|    58101|          808480|
|     New York|    49030|          691817|
|  Los Angeles|    54086|          565490|
|San Francisco|    39483|          501670|
|       Newark|    27656|          452791|
|      Orlando|    28313|          445070|
+-------------+---------+----------------+
only showing top 10 rows



**Haz ahora el cálculo del retraso acumulado por ciudad pero filtrando el resultado para los aeropuertos cuyo país de origen sea 'USA' y el estado sea 'WA' (Washington)**

In [80]:
ciudadesRetrasosWA = vuelosOriJoin.where("Country == 'USA' and State== 'WA'") \
        .groupBy(vuelosOriJoin['City'].alias('CiudadOrigen')) \
        .agg(count(vuelosOriJoin['City']).alias('NumVuelos'),sum('delay').alias('RetrasoAcumulado'))

In [81]:
ciudadesRetrasosWA.orderBy(ciudadesRetrasosWA['RetrasoAcumulado'].desc()).show(10)

+------------+---------+----------------+
|CiudadOrigen|NumVuelos|RetrasoAcumulado|
+------------+---------+----------------+
|     Seattle|    23078|          159086|
|     Spokane|     2044|           12404|
|       Pasco|      623|             949|
+------------+---------+----------------+



## Práctica de la persistencia en disco de los resultados

Trabajaremos con el Data Frame con el número de vuelos y los retrasos calculados para todas las ciudades (sin filtrar).
Antes hemos llamado a ese Data Frame ciudadesRetrasos.

In [82]:
ciudadesRetrasos.show(5)

+------------+---------+----------------+
|CiudadOrigen|NumVuelos|RetrasoAcumulado|
+------------+---------+----------------+
|   Fairbanks|      812|           -2403|
|       Tyler|      661|            2358|
|  Charleston|     3597|           39197|
|  Harrisburg|     1054|           13096|
|       Pasco|      623|             949|
+------------+---------+----------------+
only showing top 5 rows



**Persiste en formato CSV en "file:///home/ubuntu/datos/salida/ciudadesRetrasosCSV"**

Para facilitar el uso de las rutas en los ejercicios de persistencia, puedes usar una variable con la ruta base como la siguiente. Puedes usarla para componer la ruta de salida en cada caso. Ejemplo: rutaSalida + "/ciudadesRetrasosCSV"

In [83]:
rutaSalida = "file:///home/ubuntu/datos/salida"

In [84]:
ciudadesRetrasos.write.format("csv") \
    .save(rutaSalida + "/ciudadesRetrasosCSV")

**Comprueba que los datos se han escrito en el directorio y fijate en el número de archivos que se han generado**

**¿Se ha generado más de un archivo? ¿Por que?**

**Respuesta:** Sí. Porque el DF ciudadesRetrasos está particionado en la memoria del clúster Spark y se generan tantos archivos como particiones

**Persiste de nuevo el DF en la misma ruta sobrescribiendo los datos y almacenando la cabecera**

In [85]:
ciudadesRetrasos.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(rutaSalida + "/ciudadesRetrasosCSV")

Si lo has hecho bien, puedes comprobar que todos los archivos generados en la ruta incluyen la cabecera

**Carga ahora los datos guardados en un nuevo data frame**

In [86]:
retrasosDF = spark.read.format("csv"). \
    option("header", "true"). \
    option("inferSchema", "true"). \
    load(rutaSalida + "/ciudadesRetrasosCSV")

In [87]:
retrasosDF.show(5)

+-------------+---------+----------------+
| CiudadOrigen|NumVuelos|RetrasoAcumulado|
+-------------+---------+----------------+
|    Nashville|    13733|          212243|
|        Flint|      981|            9799|
|Oklahoma City|     4935|           58120|
|    San Diego|    18005|          197190|
|San Francisco|    39483|          501670|
+-------------+---------+----------------+
only showing top 5 rows



Haz un recuento de filas

In [88]:
retrasosDF.count()

229

Ahora prueba a guardar los datos de este nuevo Dataframe en el mismo directorio que los que ya teniamos, como si fuesen nuevos datos (filas) que queremos añadir a los que ya existian.

In [89]:
retrasosDF.write.format("csv") \
    .option("header","True") \
    .mode("append") \
    .save(rutaSalida + "/ciudadesRetrasosCSV")

Carga ahora los datos guardados en disco en nuevo DF y haz un recuento

In [90]:
retrasosDFmas = spark.read.format("csv"). \
    option("header", "true"). \
    option("inferSchema", "true"). \
    load(rutaSalida + "/ciudadesRetrasosCSV")

In [91]:
retrasosDFmas.count()

458

Volvamos ahora al DF ciudadesRetrasos

In [92]:
ciudadesRetrasos.show(5)

+------------+---------+----------------+
|CiudadOrigen|NumVuelos|RetrasoAcumulado|
+------------+---------+----------------+
|   Fairbanks|      812|           -2403|
|       Tyler|      661|            2358|
|  Charleston|     3597|           39197|
|  Harrisburg|     1054|           13096|
|       Pasco|      623|             949|
+------------+---------+----------------+
only showing top 5 rows



Para ver el número de particiones que tiene actualmente, puedes hacer lo siguiente: 

In [93]:
ciudadesRetrasos.rdd.getNumPartitions()

200

Reparticiona ahora el Data Frame para conseguir solo 3 particiones y crea un nuevo DF

In [94]:
ciudadesRetrasosRep3 = ciudadesRetrasos.repartition(3)

Comprueba ahora el número de particiones del nuevo DF

In [95]:
ciudadesRetrasosRep3.rdd.getNumPartitions()

3

Guarda ahora ciudadesRetrasosRep3 a disco en formato CSV, con cabecera y con sobrescritura, usando la misma ruta de antes

In [96]:
ciudadesRetrasosRep3.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(rutaSalida + "/ciudadesRetrasosCSV")

Comprueba cuantos archivos se han generado ahora con el siguiente comando. 

In [97]:
ls /home/ubuntu/datos/salida/ciudadesRetrasosCSV

part-00000-51ea5b23-f94d-4f72-91a3-ba4d78aa0216-c000.csv
part-00001-51ea5b23-f94d-4f72-91a3-ba4d78aa0216-c000.csv
part-00002-51ea5b23-f94d-4f72-91a3-ba4d78aa0216-c000.csv
_SUCCESS


**Respuesta** : 3

**Ahora prueba a escribir el mismo resultado en formato Parquet en la ruta** file:///home/ubuntu/datos/salida/ciudadesRetrasosParquet

In [99]:
ciudadesRetrasosRep3.write.format("parquet") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(rutaSalida + "/ciudadesRetrasosParquet")

**Abre el directorio de salida y comprueba el contenido de los archivos. Responde a las siguientes preguntas** 
1. ¿puedes ver su contenido desde el explorador, notepad, comando cat....? 
2. ¿Están comprimidos? En caso afirmativo, indica la compresión usada.

**Respuesta**: El formato Parquet no es texto plano, por lo que no se puede visualizar con un editor de textos o con cualquier herramienta o comando que no sepa leer este formato. Por defecto, la compresión usada en Spark para el formato Parquet es Snappy.

**Carga ahora desde disco los datos en Parquet a un nuevo DF**

In [100]:
ciudadesFromParquet = spark.read.format('parquet') \
    .load(rutaSalida + "/ciudadesRetrasosParquet")

In [101]:
ciudadesFromParquet.show(5)

+------------+---------+----------------+
|CiudadOrigen|NumVuelos|RetrasoAcumulado|
+------------+---------+----------------+
|   Fairbanks|      812|           -2403|
|  Charleston|     3597|           39197|
| Springfield|     1641|           25896|
|   Allentown|      448|            5113|
|    Valdosta|      253|            2598|
+------------+---------+----------------+
only showing top 5 rows



Cierra ahora la sesión de Spark

In [102]:
spark.stop()