# Challenge - Spark II

**Name:** Julio Valdés
<br/>
**Cohort:** GC


### Exercise 1: Preliminaries

<ul>
    <li>
    Create a working instance on AWS EMR with the required Spark components, and enable a dynamic port so that a notebook can be used from JupyterHub.
    </li>
    <li>
    Create a SparkSession object and make sure Hive support is enabled.
    </li>
    <li>
    Using the SparkSession object you created, import the Parquet objects located at the following path in the course bucket: s3://bigdata-desafio/challenges/u4act2/.
    Infer the schema of each resulting object.
    </li>
</ul>

```python
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("spark-II")\
    .enableHiveSupport()\
    .getOrCreate()
```

```python
spark
```

```
<pyspark.sql.session.SparkSession object at 0x7f4682886b00>
```

```python
flights = spark.read.parquet('s3://bigdata-desafio/challenges/u4act2/flights.parquet')
airports = spark.read.parquet('s3://bigdata-desafio/challenges/u4act2/airports.parquet')
airlines = spark.read.parquet('s3://bigdata-desafio/challenges/u4act2/airlines.parquet')
```

```python
flights.printSchema()
```

```
root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D
```

```python
airports.printSchema()
```

```
root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
```

```python
airlines.printSchema()
```

```
root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
```

### Exercise 2: Query Implementation

<p>
As a Data Scientist, you receive a series of queries from your manager that you must implement using your knowledge of SparkSQL and DataFrame objects. The only constraint is that you will be working on a cluster enabled only with the PySpark3 kernel, so you cannot use libraries such as pandas, numpy, or matplotlib. Note also that you will not have superuser permissions to install libraries.
</p>

#### Query 1: Number of flights per month. Report the months with the most flights.

```python
flights_by_month = flights\
                    .select('MONTH')\
                    .groupBy(['MONTH'])\
                    .count()\
                    .orderBy(['count'], ascending=False)
```

```python
flights_by_month.show(5)
```

```
+-----+------+
|MONTH| count|
+-----+------+
|    7|520718|
|    8|510536|
|    3|504312|
|    6|503897|
|    5|496993|
+-----+------+
only showing top 5 rows
```

#### Query 2: Number of flights per day and month. Report the days with the most flights.

```python
flights_by_month_day = flights\
                    .select('MONTH', 'DAY')\
                    .groupBy(['MONTH', 'DAY'])\
                    .count()\
                    .orderBy(['count'], ascending=False)
```

```python
flights_by_month_day.show(5)
```

```
+-----+---+-----+
|MONTH|DAY|count|
+-----+---+-----+
|   11| 29|17574|
|    8|  7|17517|
|    7| 24|17474|
|    6| 26|17474|
|    7| 10|17471|
+-----+---+-----+
only showing top 5 rows
```

#### Query 3: Number of airports per state. Report the states with the most airports.

```python
airports_by_state = airports\
                    .select('STATE')\
                    .groupBy(['STATE'])\
                    .count()\
                    .orderBy(['count'], ascending=False)
```

```python
airports_by_state.show(5)
```

```
+-----+-----+
|STATE|count|
+-----+-----+
|   TX|   24|
|   CA|   22|
|   AK|   19|
|   FL|   17|
|   MI|   15|
+-----+-----+
only showing top 5 rows
```

#### Query 4: Excluding airports that do not appear in the `airports` table, identify the airports with the most flights.

```python
flights_by_airports = flights\
                    .join(airports, flights.ORIGIN_AIRPORT == airports.IATA_CODE)\
                    .select('ORIGIN_AIRPORT')\
                    .groupBy(['ORIGIN_AIRPORT'])\
                    .count()\
                    .orderBy(['count'], ascending=False)
```

```python
flights_by_airports.show(5)
```

```
+--------------+------+
|ORIGIN_AIRPORT| count|
+--------------+------+
|           ATL|346836|
|           ORD|285884|
|           DFW|239551|
|           DEN|196055|
|           LAX|194673|
+--------------+------+
only showing top 5 rows
```

#### Query 5: Excluding airports that do not appear in the `airports` table, identify the states with the most flights.

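```python
# Register each DataFrame as a temporary view so spark.sql can query it by name.
# createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0.
flights.createOrReplaceTempView('flights')
airports.createOrReplaceTempView('airports')
airlines.createOrReplaceTempView('airlines')
```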

```python
flights_by_state = spark.sql( """
    SELECT STATE, count(STATE) FROM flights
    JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE    
    GROUP BY STATE
    ORDER BY count(STATE) DESC
""")
```

```python
flights_by_state.show(5)
```

```
+-----+------------+
|STATE|count(STATE)|
+-----+------------+
|   CA|      647911|
|   TX|      631124|
|   FL|      415586|
|   IL|      381644|
|   GA|      360496|
+-----+------------+
only showing top 5 rows
```

#### Query 6: Excluding airports that do not appear in the `airports` table, compute the average departure delay (column `DEPARTURE_DELAY`) and arrival delay (column `ARRIVAL_DELAY`) for each origin airport (column `ORIGIN_AIRPORT`). Report the five airports with the highest average departure delay.

```python
departure_delay_by_airport = spark.sql( """
    SELECT ORIGIN_AIRPORT, avg(DEPARTURE_DELAY), avg(ARRIVAL_DELAY) FROM flights
    JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE    
    GROUP BY ORIGIN_AIRPORT
    ORDER BY avg(DEPARTURE_DELAY) DESC
""")
```

```python
departure_delay_by_airport.show(5)
```

```
+--------------+------------------------------------+----------------------------------+
|ORIGIN_AIRPORT|avg(CAST(DEPARTURE_DELAY AS DOUBLE))|avg(CAST(ARRIVAL_DELAY AS DOUBLE))|
+--------------+------------------------------------+----------------------------------+
|           ILG|                  29.391752577319586|                24.063157894736843|
|           MVY|                   25.90731707317073|                12.980487804878049|
|           HYA|                  23.182926829268293|                 8.621951219512194|
|           STC|                  18.692307692307693|                19.558441558441558|
|           OTH|                  17.777358490566037|                14.333333333333334|
+--------------+------------------------------------+----------------------------------+
only showing top 5 rows
```

#### Query 7: Excluding airports that do not appear in the `airports` table, identify the main reasons for flight cancellations.

```python
flights_by_cancellation_reason = spark.sql( """
    SELECT CANCELLATION_REASON, count(CANCELLATION_REASON) FROM flights
    JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE    
    GROUP BY CANCELLATION_REASON
    ORDER BY count(CANCELLATION_REASON) DESC
""")
```

```python
flights_by_cancellation_reason.show(5)
```

```
+-------------------+--------------------------+
|CANCELLATION_REASON|count(CANCELLATION_REASON)|
+-------------------+--------------------------+
|                  B|                     47874|
|                  A|                     24309|
|                  C|                     15225|
|                  D|                        22|
|               null|                         0|
+-------------------+--------------------------+
```

#### Query 8: Excluding airports that do not appear in the `airports` table, and considering only the five airports with the highest average departure delay, identify the main causes of flight cancellations.

```python
top_cancellation_reason_by_departure_delay = spark.sql( """
    SELECT CANCELLATION_REASON, count(CANCELLATION_REASON) FROM flights
    JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE
    WHERE ORIGIN_AIRPORT IN ('ILG', 'MVY', 'HYA', 'STC', 'OTH')
    GROUP BY CANCELLATION_REASON
    ORDER BY count(CANCELLATION_REASON) DESC
""")
```

```python
top_cancellation_reason_by_departure_delay.show(5)
```

```
+-------------------+--------------------------+
|CANCELLATION_REASON|count(CANCELLATION_REASON)|
+-------------------+--------------------------+
|                  A|                        11|
|                  B|                         9|
|                  C|                         1|
|               null|                         0|
+-------------------+--------------------------+
```

#### Query 9: Excluding airports that do not appear in the `airports` table, compute the average departure and arrival delay for each airline.

```python
delay_by_airline = spark.sql( """
    SELECT airlines.AIRLINE, avg(DEPARTURE_DELAY), avg(ARRIVAL_DELAY) FROM flights
    JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE
    JOIN airlines ON flights.AIRLINE = airlines.IATA_CODE
    GROUP BY airlines.AIRLINE
""")
```

```python
delay_by_airline.show(10)
```

+--------------------+------------------------------------+----------------------------------+
|             AIRLINE|avg(CAST(DEPARTURE_DELAY AS DOUBLE))|avg(CAST(ARRIVAL_DELAY AS DOUBLE))|
+--------------------+------------------------------------+----------------------------------+
|Skywest Airlines ...|                   8.181224917577913|                 6.221438197483382|
|American Eagle Ai...|                  10.685482779292608|                 7.316540271458018|
|      Virgin America|                    9.21121121121121|                 4.979538817121459|
|United Air Lines ...|                   15.13171797238203|                  6.21131564254273|
|Frontier Airlines...|                   14.26797727521563|                13.729467715383208|
|Southwest Airline...|                  10.995318443894886|                  4.83783107128739|
|     JetBlue Airways|                  11.828653922739944|                  6.94933917038418|
|     US Airways Inc.|                   6.1411369