# Fingers 3

## Reading the data

In [1]:
from pyspark.sql import SparkSession
from os.path import join

In [2]:
spark = SparkSession \
    .builder \
    .getOrCreate()

In [3]:
# Load each CSV using its first row as the header (without inferSchema, every column is read as a string)
airlines = spark.read.csv(join('datasets', 'airlines.csv'), header=True)
flights = spark.read.csv(join('datasets', 'flights.csv'), header=True)
airports = spark.read.csv(join('datasets', 'airports.csv'), header=True)

In [4]:
airlines.createOrReplaceTempView("airlines")
flights.createOrReplaceTempView("flights")
airports.createOrReplaceTempView("airports")

Next, we check which columns each dataset contains:

In [5]:
spark.sql('SELECT * from airlines LIMIT 5').show()

+---------+--------------------+
|IATA_CODE|             AIRLINE|
+---------+--------------------+
|       UA|United Air Lines ...|
|       AA|American Airlines...|
|       US|     US Airways Inc.|
|       F9|Frontier Airlines...|
|       B6|     JetBlue Airways|
+---------+--------------------+



In [6]:
spark.sql('SELECT * from airports LIMIT 5').show()

+---------+--------------------+-----------+-----+-------+--------+----------+
|IATA_CODE|             AIRPORT|       CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+---------+--------------------+-----------+-----+-------+--------+----------+
|      ABE|Lehigh Valley Int...|  Allentown|   PA|    USA|40.65236| -75.44040|
|      ABI|Abilene Regional ...|    Abilene|   TX|    USA|32.41132| -99.68190|
|      ABQ|Albuquerque Inter...|Albuquerque|   NM|    USA|35.04022|-106.60919|
|      ABR|Aberdeen Regional...|   Aberdeen|   SD|    USA|45.44906| -98.42183|
|      ABY|Southwest Georgia...|     Albany|   GA|    USA|31.53552| -84.19447|
+---------+--------------------+-----------+-----+-------+--------+----------+



In [7]:
spark.sql('SHOW COLUMNS from flights').show()

+-------------------+
|           col_name|
+-------------------+
|               YEAR|
|              MONTH|
|                DAY|
|        DAY_OF_WEEK|
|            AIRLINE|
|      FLIGHT_NUMBER|
|        TAIL_NUMBER|
|     ORIGIN_AIRPORT|
|DESTINATION_AIRPORT|
|SCHEDULED_DEPARTURE|
|     DEPARTURE_TIME|
|    DEPARTURE_DELAY|
|           TAXI_OUT|
|         WHEELS_OFF|
|     SCHEDULED_TIME|
|       ELAPSED_TIME|
|           AIR_TIME|
|           DISTANCE|
|          WHEELS_ON|
|            TAXI_IN|
+-------------------+
only showing top 20 rows



## Airports with the most cancellations

Next, we show the 5 origin airports with the most cancelled flights. Since the assignment does not specify which identifier to use, we show both the IATA code and the airport name:

In [8]:
# Group by the IATA code (the unique airport key) and the name, rather than taking first() per name
spark.sql('''
    SELECT airports.IATA_CODE, airports.AIRPORT, COUNT(*) AS total_cancelled
    FROM flights
    INNER JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE
    WHERE flights.CANCELLED = 1
    GROUP BY airports.IATA_CODE, airports.AIRPORT
    ORDER BY total_cancelled DESC
    LIMIT 5
''').show()

+---------+--------------------+---------------+
|IATA_CODE|             AIRPORT|total_cancelled|
+---------+--------------------+---------------+
|      ORD|Chicago O'Hare In...|           8548|
|      DFW|Dallas/Fort Worth...|           6254|
|      LGA|LaGuardia Airport...|           4531|
|      EWR|Newark Liberty In...|           3110|
|      BOS|Gen. Edward Lawre...|           2654|
+---------+--------------------+---------------+
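
As a rough illustration of what this query computes (inner join, filter, group by, count, top 5), here is a plain-Python sketch on a few hypothetical toy rows. It is not how Spark executes the query, the values do not come from the dataset, and the helper name top_cancelled is illustrative only. It groups by the IATA code, the unique key of airports, and compares CANCELLED with the string "1" because spark.read.csv without inferSchema loads every column as a string.

```python
from collections import Counter

# Hypothetical toy data, NOT taken from the real flights/airports CSV files.
flights = [
    {"ORIGIN_AIRPORT": "ORD", "CANCELLED": "1"},
    {"ORIGIN_AIRPORT": "ORD", "CANCELLED": "1"},
    {"ORIGIN_AIRPORT": "ORD", "CANCELLED": "0"},
    {"ORIGIN_AIRPORT": "DFW", "CANCELLED": "1"},
    {"ORIGIN_AIRPORT": "XYZ", "CANCELLED": "1"},  # unknown code: dropped by the inner join
]
airports = {"ORD": "Chicago O'Hare", "DFW": "Dallas/Fort Worth"}  # IATA_CODE -> AIRPORT


def top_cancelled(flights, airports, k=5):
    """Return (IATA_CODE, AIRPORT, total_cancelled) for the k origins with most cancellations."""
    counts = Counter(
        f["ORIGIN_AIRPORT"]
        for f in flights
        # WHERE CANCELLED = 1 (CSV columns are strings without inferSchema)
        # plus the INNER JOIN condition: the origin must exist in airports
        if f["CANCELLED"] == "1" and f["ORIGIN_AIRPORT"] in airports
    )
    # GROUP BY the IATA code, ORDER BY count DESC, LIMIT k
    return [(code, airports[code], n) for code, n in counts.most_common(k)]


print(top_cancelled(flights, airports))
# [('ORD', "Chicago O'Hare", 2), ('DFW', 'Dallas/Fort Worth', 1)]
```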



## Airlines with the most flights from Atlanta to Los Angeles

Next, we show each airline's name and its number of flights from Atlanta (ATL) to Los Angeles (LAX), ordered by number of flights:

In [9]:
query = spark.sql('''
    SELECT airlines.AIRLINE, COUNT(*) AS cantidad_de_vuelos
    FROM flights
    INNER JOIN airlines ON flights.AIRLINE = airlines.IATA_CODE
    WHERE flights.ORIGIN_AIRPORT = 'ATL' AND flights.DESTINATION_AIRPORT = 'LAX'
    GROUP BY airlines.AIRLINE
    ORDER BY cantidad_de_vuelos DESC
''')

query.show()

+--------------------+------------------+
|             AIRLINE|cantidad_de_vuelos|
+--------------------+------------------+
|Delta Air Lines Inc.|              3624|
|Southwest Airline...|               962|
|American Airlines...|               765|
|Frontier Airlines...|               215|
|    Spirit Air Lines|               103|
+--------------------+------------------+



## Query analysis
We now analyze the query executed in the previous step. First, we print the plans produced at each stage of the Catalyst Optimizer:

In [10]:
query.explain(True)

== Parsed Logical Plan ==
'Sort ['cantidad_de_vuelos DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'count(1) AS cantidad_de_vuelos#194]
   +- 'Filter (('flights.ORIGIN_AIRPORT = ATL) && ('flights.DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
         :- 'UnresolvedRelation `flights`
         +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, cantidad_de_vuelos: bigint
Sort [cantidad_de_vuelos#194L DESC NULLS LAST], true
+- Aggregate [AIRLINE#11], [AIRLINE#11, count(1) AS cantidad_de_vuelos#194L]
   +- Filter ((ORIGIN_AIRPORT#31 = ATL) && (DESTINATION_AIRPORT#32 = LAX))
      +- Join Inner, (AIRLINE#28 = IATA_CODE#10)
         :- SubqueryAlias flights
         :  +- Relation[YEAR#24,MONTH#25,DAY#26,DAY_OF_WEEK#27,AIRLINE#28,FLIGHT_NUMBER#29,TAIL_NUMBER#30,ORIGIN_AIRPORT#31,DESTINATION_AIRPORT#32,SCHEDULED_DEPARTURE#33,DEPARTURE_TIME#34,DEPARTURE_DELAY#35,TAXI_OUT#36,WHEELS_OFF#37,SC

**Is any logical optimization, such as filter pushdown, performed? At which stage?**

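Yes, a filter pushdown is performed. It is a logical optimization, so Catalyst applies it when producing the Optimized Logical Plan, and the Physical Plan inherits it. In the Analyzed Logical Plan the Filter on ORIGIN_AIRPORT and DESTINATION_AIRPORT sits above the Join, while after optimization it is applied to the flights side before the join. This is valid because the predicate only references columns of flights.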
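
To see why moving the filter is safe, here is a plain-Python sketch on hypothetical toy tables. The predicate only references flights columns, so filtering before or after the inner join yields the same rows, but fewer rows reach the join when the filter runs first. The helper names are illustrative only; this is not Catalyst's implementation.

```python
# Hypothetical toy tables shaped like flights and airlines (NOT the real data).
flights = [
    {"AIRLINE": "DL", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "LAX"},
    {"AIRLINE": "DL", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "JFK"},
    {"AIRLINE": "WN", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "LAX"},
    {"AIRLINE": "AA", "ORIGIN_AIRPORT": "ORD", "DESTINATION_AIRPORT": "LAX"},
]
airlines = [
    {"IATA_CODE": "DL", "AIRLINE": "Delta"},
    {"IATA_CODE": "WN", "AIRLINE": "Southwest"},
    {"IATA_CODE": "AA", "AIRLINE": "American"},
]


def atl_to_lax(row):
    # The predicate only uses columns from flights, which is what makes pushdown legal.
    return row["ORIGIN_AIRPORT"] == "ATL" and row["DESTINATION_AIRPORT"] == "LAX"


def inner_join(flight_rows, airline_rows):
    # flights.AIRLINE = airlines.IATA_CODE; the airline name is kept as AIRLINE_NAME
    return [
        {**f, "AIRLINE_NAME": a["AIRLINE"]}
        for f in flight_rows
        for a in airline_rows
        if f["AIRLINE"] == a["IATA_CODE"]
    ]


def filter_after_join(flight_rows, airline_rows):
    # Analyzed Logical Plan order: Join, then Filter
    return [row for row in inner_join(flight_rows, airline_rows) if atl_to_lax(row)]


def filter_before_join(flight_rows, airline_rows):
    # Optimized order: Filter pushed down to the flights side, then Join
    return inner_join([f for f in flight_rows if atl_to_lax(f)], airline_rows)


assert filter_after_join(flights, airlines) == filter_before_join(flights, airlines)
print(len(flights), "flight rows reach the join without pushdown,",
      sum(map(atl_to_lax, flights)), "with pushdown")
```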

**What type of physical join is performed? At which stage?**

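The plans before the Physical Plan only show a logical inner join (Join Inner), which does not say how the join is executed. The physical strategy is chosen in the Physical Plan stage, where Spark implements it as a BroadcastHashJoin: the airlines table is small enough (below spark.sql.autoBroadcastJoinThreshold, 10 MB by default) to be sent in full to every executor, so the much larger flights table does not need to be shuffled.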
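
The idea behind a broadcast hash join can be sketched in plain Python: build a hash table from the small relation once, give the same copy to every partition of the large relation, and probe it row by row. This is a simplified single-process illustration on hypothetical toy data; the partitions are plain lists standing in for Spark partitions, and build_hash_table and broadcast_hash_join are illustrative names, not Spark APIs.

```python
def build_hash_table(rows, key):
    """Build phase: index the small relation by its join key."""
    table = {}
    for row in rows:
        table.setdefault(row[key], []).append(row)
    return table


def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner hash join where every partition of the large side probes the same small-side table."""
    # Built once and shared with every partition, like the broadcast hash table in Spark
    table = build_hash_table(small_rows, small_key)
    result = []
    for partition in large_partitions:  # large-side partitions are not repartitioned (no shuffle)
        for row in partition:
            # Probe phase: expected O(1) lookup; rows without a match are dropped (inner join)
            for match in table.get(row[large_key], []):
                result.append((row, match))
    return result


# Hypothetical toy data: flights split into two partitions, airlines as the small side.
flight_partitions = [
    [{"FLIGHT_NUMBER": "1", "AIRLINE": "DL"}, {"FLIGHT_NUMBER": "2", "AIRLINE": "XX"}],
    [{"FLIGHT_NUMBER": "3", "AIRLINE": "WN"}],
]
airline_rows = [{"IATA_CODE": "DL", "AIRLINE": "Delta"}, {"IATA_CODE": "WN", "AIRLINE": "Southwest"}]

for flight, airline in broadcast_hash_join(flight_partitions, airline_rows, "AIRLINE", "IATA_CODE"):
    print(flight["FLIGHT_NUMBER"], airline["AIRLINE"])
```

A SortMergeJoin, by contrast, shuffles both sides by the join key. The broadcast approach avoids that shuffle at the cost of holding the whole small side in memory on every executor.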