In [16]:
import pyspark
from pyspark.sql import SparkSession

sc = pyspark.SparkContext.getOrCreate()

spark = SparkSession.builder.appName('finger3').getOrCreate()

flights = spark.read.csv('data/flights.csv', header = True)
airports = spark.read.csv('data/airports.csv', header = True)
airlines = spark.read.csv('data/airlines.csv', header = True)

flights.createOrReplaceTempView('flights')
airports.createOrReplaceTempView('airports')
airlines.createOrReplaceTempView('airlines')

## Exploro un poco los sets de datos

In [11]:
flights.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

In [12]:
airlines.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)



In [13]:
airports.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)



# Punto 1. Quiero entender cómo presenta los datos el campo 'CANCELLED'

In [66]:
spark.sql('SELECT CANCELLED, COUNT(CANCELLED) FROM flights GROUP BY (CANCELLED)').show()

+---------+----------------+
|CANCELLED|count(CANCELLED)|
+---------+----------------+
|        0|         5729195|
|        1|           89884|
+---------+----------------+



### Los aeropuertos de origen con mayor cantidad de cancelaciones son:

In [39]:
spark.sql('SELECT airports.AIRPORT, COUNT(airports.AIRPORT) as CANTIDAD FROM airports \
          INNER JOIN flights \
          ON airports.IATA_CODE = flights.ORIGIN_AIRPORT \
          WHERE flights.CANCELLED = 1 \
          GROUP BY airports.AIRPORT \
          ORDER BY CANTIDAD DESC \
          LIMIT 5').show()

+--------------------+--------+
|             AIRPORT|CANTIDAD|
+--------------------+--------+
|Chicago O'Hare In...|    8548|
|Dallas/Fort Worth...|    6254|
|LaGuardia Airport...|    4531|
|Newark Liberty In...|    3110|
|Gen. Edward Lawre...|    2654|
+--------------------+--------+



# Punto 2. Vuelos desde Atlanta a Los Angeles

In [42]:
spark.sql('SELECT ORIGIN_AIRPORT FROM flights GROUP BY (ORIGIN_AIRPORT)').show()

+--------------+
|ORIGIN_AIRPORT|
+--------------+
|           BGM|
|           PSE|
|           INL|
|           DLG|
|         12888|
|           MSY|
|           PPG|
|         12003|
|         15041|
|           GEG|
|           SNA|
|           BUR|
|           GRB|
|           GTF|
|         14986|
|         13851|
|           IDA|
|         11150|
|         15412|
|           GRR|
+--------------+
only showing top 20 rows



### Averiguo qué formato posee la referencia de la Aerolínea en el vuelo:

In [57]:
spark.sql('SELECT flights.AIRLINE FROM flights LIMIT 10').show()

+-------+
|AIRLINE|
+-------+
|     AS|
|     AA|
|     US|
|     AA|
|     AS|
|     DL|
|     NK|
|     US|
|     AA|
|     DL|
+-------+



In [59]:
spark.sql('SELECT airlines.IATA_CODE FROM airlines LIMIT 10').show()

+---------+
|IATA_CODE|
+---------+
|       UA|
|       AA|
|       US|
|       F9|
|       B6|
|       OO|
|       AS|
|       NK|
|       WN|
|       DL|
+---------+



**La referencia en la tabla flights de la Aerolinea está en formato IATA_CODE**

Las aerolineas que realizaron vuelos de ATL a LAX en orden descendiente son las siguientes

In [65]:
spark.sql('SELECT airlines.AIRLINE, COUNT(airlines.AIRLINE) as CANTIDAD FROM airlines \
            JOIN flights \
            ON airlines.IATA_CODE = flights.AIRLINE \
            WHERE flights.ORIGIN_AIRPORT = "ATL" \
            AND flights.DESTINATION_AIRPORT = "LAX" \
            GROUP BY airlines.AIRLINE \
            ORDER BY CANTIDAD DESC').show()

+--------------------+--------+
|             AIRLINE|CANTIDAD|
+--------------------+--------+
|Delta Air Lines Inc.|    3624|
|Southwest Airline...|     962|
|American Airlines...|     765|
|Frontier Airlines...|     215|
|    Spirit Air Lines|     103|
+--------------------+--------+



# Punto 3. Optimizaciones

In [64]:
# Analizamos la query del punto 2.

query = 'SELECT airlines.AIRLINE, COUNT(airlines.AIRLINE) as CANTIDAD FROM airlines \
            JOIN flights \
            ON airlines.IATA_CODE = flights.AIRLINE \
            WHERE flights.ORIGIN_AIRPORT = "ATL" \
            AND flights.DESTINATION_AIRPORT = "LAX" \
            GROUP BY airlines.AIRLINE \
            ORDER BY CANTIDAD DESC'

spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Sort ['CANTIDAD DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'COUNT('airlines.AIRLINE) AS CANTIDAD#784]
   +- 'Filter (('flights.ORIGIN_AIRPORT = ATL) && ('flights.DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('airlines.IATA_CODE = 'flights.AIRLINE)
         :- 'UnresolvedRelation `airlines`
         +- 'UnresolvedRelation `flights`

== Analyzed Logical Plan ==
AIRLINE: string, CANTIDAD: bigint
Sort [CANTIDAD#784L DESC NULLS LAST], true
+- Aggregate [AIRLINE#453], [AIRLINE#453, count(AIRLINE#453) AS CANTIDAD#784L]
   +- Filter ((ORIGIN_AIRPORT#363 = ATL) && (DESTINATION_AIRPORT#364 = LAX))
      +- Join Inner, (IATA_CODE#452 = AIRLINE#360)
         :- SubqueryAlias airlines
         :  +- Relation[IATA_CODE#452,AIRLINE#453] csv
         +- SubqueryAlias flights
            +- Relation[YEAR#356,MONTH#357,DAY#358,DAY_OF_WEEK#359,AIRLINE#360,FLIGHT_NUMBER#361,TAIL_NUMBER#362,ORIGIN_AIRPORT#363,DESTINATION_AIRPORT#364,SCHEDUL

**A.** Se produce un Filter pushdown en el *Optimized Logical Plan*, para reducir la cantidad de elementos que van a tener que analizarse en el JOIN.

**B.** Se realiza un join de tipo 'inner' en la etapa *Analyzed Logical Plan*