In [1]:
import pyspark
from pyspark.sql import *

# Create a local SparkContext (using all local cores) only when the kernel does
# not already hold one named `sc`, so re-running this cell does not try to
# start a second context.
if "sc" not in globals():
    sc = pyspark.SparkContext('local[*]')

In [2]:
# Obtain the SparkSession used by every SQL cell below; getOrCreate() reuses an
# already-active session/context instead of building a new one.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [3]:
# Reader options shared by all three CSV files: comma separator, first row is
# the header, and column types inferred from the data. Per the Spark docs,
# schema inference costs one extra pass over each file.
csv_options = {"sep": ",", "inferSchema": "true", "header": "true"}

df_airline = spark.read.csv("airlines.csv", **csv_options)
df_airports = spark.read.csv("airports.csv", **csv_options)
df_flights = spark.read.csv("flights.csv", **csv_options)

In [4]:
# Register each DataFrame as a temporary view so spark.sql() queries can refer
# to it by table name.
for view_name, frame in (
    ("airlines", df_airline),
    ("airports", df_airports),
    ("flights", df_flights),
):
    frame.createOrReplaceTempView(view_name)

In [5]:
# Column names of the airlines table, read from its schema.
df_airline.schema.names

['IATA_CODE', 'AIRLINE']

In [6]:
# Column names of the flights table, read from its schema.
df_flights.schema.names

['YEAR',
 'MONTH',
 'DAY',
 'DAY_OF_WEEK',
 'AIRLINE',
 'FLIGHT_NUMBER',
 'TAIL_NUMBER',
 'ORIGIN_AIRPORT',
 'DESTINATION_AIRPORT',
 'SCHEDULED_DEPARTURE',
 'DEPARTURE_TIME',
 'DEPARTURE_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'SCHEDULED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'WHEELS_ON',
 'TAXI_IN',
 'SCHEDULED_ARRIVAL',
 'ARRIVAL_TIME',
 'ARRIVAL_DELAY',
 'DIVERTED',
 'CANCELLED',
 'CANCELLATION_REASON',
 'AIR_SYSTEM_DELAY',
 'SECURITY_DELAY',
 'AIRLINE_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'WEATHER_DELAY']

# PUNTO 1

In [7]:
# PUNTO 1: the 5 origin airports with the most cancelled flights.
# Each flight's ORIGIN_AIRPORT code is matched against airports.IATA_CODE to get
# the airport name; because this is an INNER JOIN, flights whose origin code has
# no match in the airports table are not counted.
query = """
    SELECT airports.AIRPORT, COUNT(*) AS total_cancelled
    FROM airports
    INNER JOIN flights
        ON airports.IATA_CODE = flights.ORIGIN_AIRPORT
    WHERE flights.CANCELLED = 1
    GROUP BY airports.AIRPORT
    ORDER BY total_cancelled DESC
    LIMIT 5
"""

df_sql = spark.sql(query)

# Reuse the DataFrame built above instead of parsing and planning the query twice.
df_sql.show()

+--------------------+---------------+
|             AIRPORT|total_cancelled|
+--------------------+---------------+
|Chicago O'Hare In...|           8548|
|Dallas/Fort Worth...|           6254|
|LaGuardia Airport...|           4531|
|Newark Liberty In...|           3110|
|Gen. Edward Lawre...|           2654|
+--------------------+---------------+



# PUNTO 2

In [8]:
# PUNTO 2: number of ATL -> LAX flights per airline, most frequent first.
# flights.AIRLINE holds the airline's IATA code; the join resolves it to the
# airline name. String literals use standard SQL single quotes.
# NOTE: `query` is reused by the explain() cell below, so keep this name.
query = """
    SELECT airlines.AIRLINE, COUNT(*) AS total_vuelos
    FROM flights
    INNER JOIN airlines
        ON airlines.IATA_CODE = flights.AIRLINE
    WHERE flights.ORIGIN_AIRPORT = 'ATL'
      AND flights.DESTINATION_AIRPORT = 'LAX'
    GROUP BY airlines.AIRLINE
    ORDER BY total_vuelos DESC
"""

df_sql = spark.sql(query)

# Reuse the DataFrame built above instead of parsing and planning the query twice.
df_sql.show()

+--------------------+------------+
|             AIRLINE|total_vuelos|
+--------------------+------------+
|Delta Air Lines Inc.|        3624|
|Southwest Airline...|         962|
|American Airlines...|         765|
|Frontier Airlines...|         215|
|    Spirit Air Lines|         103|
+--------------------+------------+



# PUNTO 3

In [9]:
# PUNTO 3: print the extended plan (parsed, analyzed, optimized and physical)
# for the PUNTO 2 query held in `query`.
punto2_df = spark.sql(query)
punto2_df.explain(extended=True)

== Parsed Logical Plan ==
'Sort ['total_vuelos DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'COUNT(1) AS total_vuelos#159]
   +- 'Filter (('ORIGIN_AIRPORT = ATL) && ('DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('airlines.IATA_CODE = 'flights.AIRLINE)
         :- 'UnresolvedRelation `flights`
         +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, total_vuelos: bigint
Sort [total_vuelos#159L DESC NULLS LAST], true
+- Aggregate [AIRLINE#11], [AIRLINE#11, count(1) AS total_vuelos#159L]
   +- Filter ((ORIGIN_AIRPORT#55 = ATL) && (DESTINATION_AIRPORT#56 = LAX))
      +- Join Inner, (IATA_CODE#10 = AIRLINE#52)
         :- SubqueryAlias flights
         :  +- Relation[YEAR#48,MONTH#49,DAY#50,DAY_OF_WEEK#51,AIRLINE#52,FLIGHT_NUMBER#53,TAIL_NUMBER#54,ORIGIN_AIRPORT#55,DESTINATION_AIRPORT#56,SCHEDULED_DEPARTURE#57,DEPARTURE_TIME#58,DEPARTURE_DELAY#59,TAXI_OUT#60,WHEELS_OFF#61,SCHEDULED_TIME#62,ELAPSED_TIME#63,AIR_TIME#64,DI

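a) En el Optimized Logical Plan se produce un *filter pushdown*: el filtro sobre `ORIGIN_AIRPORT` y `DESTINATION_AIRPORT` solo usa columnas de `flights`, así que se aplica en ese lado antes del join en lugar de después.
b) En el plan físico se produce un `BroadcastHashJoin`: la tabla `airlines` es lo bastante pequeña (por debajo de `spark.sql.autoBroadcastJoinThreshold`) como para enviarse completa a cada ejecutor y evitar el shuffle de `flights`.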