In [1]:
# Initialize the Spark environment and the SparkSession
import pyspark
from pyspark.sql import SparkSession  # needed below; missing in the original cell

sc = pyspark.SparkContext.getOrCreate()

spark = SparkSession \
    .builder \
    .appName("Spark SQL Finger 3") \
    .getOrCreate()

In [2]:
df_flights = spark.read.csv("data/flights.csv",
                    sep=",", inferSchema="true", header="true")
df_flights.createOrReplaceTempView("flights")

df_airlines = spark.read.csv("data/airlines.csv",
                    sep=",", inferSchema="true", header="true")
df_airlines.createOrReplaceTempView("airlines")

df_airports = spark.read.csv("data/airports.csv",
                    sep=",", inferSchema="true", header="true")
df_airports.createOrReplaceTempView("airports")

## Let's look at the columns in each table.

In [3]:
df_flights.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [4]:
df_airlines.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)



In [5]:
df_airports.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)



# Point 1: Show the 5 origin airports with the most cancellations.

In [6]:
top5_origin_airports = spark.sql("""
    SELECT airports.AIRPORT AS origin_airport, COUNT(*) AS total_cancelled
    FROM flights
    INNER JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE
    WHERE flights.CANCELLED = 1
    GROUP BY airports.AIRPORT
    ORDER BY total_cancelled DESC
""")
# Display only the first 5 rows of the sorted result
top5_origin_airports.show(5)

+--------------------+---------------+
|      origin_airport|total_cancelled|
+--------------------+---------------+
|Chicago O'Hare In...|           8548|
|Dallas/Fort Worth...|           6254|
|LaGuardia Airport...|           4531|
|Newark Liberty In...|           3110|
|Gen. Edward Lawre...|           2654|
+--------------------+---------------+
only showing top 5 rows
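To make the semantics of the Point 1 query explicit, here is a minimal pure-Python sketch of the same logic on toy data: inner join on the airport code, filter `CANCELLED = 1`, group by airport name, count, and sort by count in descending order. The function name `top_cancelled_origins` and all rows are hypothetical; the sketch only mirrors what the SQL computes, not how Spark executes it.

```python
from collections import Counter


def top_cancelled_origins(flights, airports, n=5):
    """Toy equivalent of the Point 1 query: INNER JOIN on the airport code,
    WHERE CANCELLED = 1, GROUP BY airport name, COUNT(*), ORDER BY count DESC,
    keeping only the first n rows (what show(5) displays)."""
    # Right side of the join: IATA_CODE -> AIRPORT name
    name_by_code = {a["IATA_CODE"]: a["AIRPORT"] for a in airports}
    counts = Counter()
    for f in flights:
        code = f["ORIGIN_AIRPORT"]
        # INNER JOIN: flights whose origin code has no matching airport are dropped
        if code in name_by_code and f["CANCELLED"] == 1:
            counts[name_by_code[code]] += 1
    # most_common(n) returns (name, count) pairs sorted by count, descending
    return counts.most_common(n)


# Hypothetical toy rows, not taken from the real dataset
airports = [
    {"IATA_CODE": "ORD", "AIRPORT": "Chicago"},
    {"IATA_CODE": "DFW", "AIRPORT": "Dallas"},
]
flights = [
    {"ORIGIN_AIRPORT": "ORD", "CANCELLED": 1},
    {"ORIGIN_AIRPORT": "ORD", "CANCELLED": 1},
    {"ORIGIN_AIRPORT": "DFW", "CANCELLED": 1},
    {"ORIGIN_AIRPORT": "DFW", "CANCELLED": 0},
    {"ORIGIN_AIRPORT": "XXX", "CANCELLED": 1},  # no matching airport: dropped
]
print(top_cancelled_origins(flights, airports))  # [('Chicago', 2), ('Dallas', 1)]
```

One design note: `Counter.most_common` breaks ties by first occurrence, whereas Spark's `ORDER BY total_cancelled DESC` makes no guarantee about the relative order of tied rows.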



# Point 2: Show the airline names and the number of flights from Atlanta (ATL) to Los Angeles (LAX), sorted by number of flights.

In [7]:
query = """
    SELECT airlines.AIRLINE AS airline_name, COUNT(flights.FLIGHT_NUMBER) AS total_flights_atl_lax
    FROM flights
    INNER JOIN airlines ON flights.AIRLINE = airlines.IATA_CODE
    WHERE flights.ORIGIN_AIRPORT = 'ATL' AND flights.DESTINATION_AIRPORT = 'LAX'
    GROUP BY airlines.AIRLINE
    ORDER BY total_flights_atl_lax DESC
"""
# Keep the query string in a variable so it can be analyzed in Point 3
airlines_flights_atl_lax = spark.sql(query)
airlines_flights_atl_lax.show()

+--------------------+---------------------+
|        airline_name|total_flights_atl_lax|
+--------------------+---------------------+
|Delta Air Lines Inc.|                 3624|
|Southwest Airline...|                  962|
|American Airlines...|                  765|
|Frontier Airlines...|                  215|
|    Spirit Air Lines|                  103|
+--------------------+---------------------+



# Point 3: Analyze the query from Point 2.

In [8]:
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Sort ['total_flights_atl_lax DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE AS airline_name#152, 'COUNT('flights.FLIGHT_NUMBER) AS total_flights_atl_lax#153]
   +- 'Filter (('flights.ORIGIN_AIRPORT = ATL) && ('flights.DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
         :- 'UnresolvedRelation `flights`
         +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
airline_name: string, total_flights_atl_lax: bigint
Sort [total_flights_atl_lax#153L DESC NULLS LAST], true
+- Aggregate [AIRLINE#83], [AIRLINE#83 AS airline_name#152, count(FLIGHT_NUMBER#15) AS total_flights_atl_lax#153L]
   +- Filter ((ORIGIN_AIRPORT#17 = ATL) && (DESTINATION_AIRPORT#18 = LAX))
      +- Join Inner, (AIRLINE#14 = IATA_CODE#82)
         :- SubqueryAlias flights
         :  +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIR

1. Yes, a logical optimization is applied, specifically a filter (predicate) pushdown. It takes place in Catalyst's logical optimization phase, which works on the plan produced by the analysis phase.
As the Optimized Logical Plan shows, the filter on origin and destination is applied before the JOIN (and checks that the columns are not null are added as well), whereas the Analyzed Logical Plan performs the JOIN first and the FILTER afterwards.
This way, the join operates on far fewer rows.

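2. The _physical_ join is a BroadcastHashJoin. This is decided in the Physical Planning phase, where Catalyst turns the optimized logical plan into physical plans; in principle it can generate several candidates and compare them by _cost_, but in practice the join strategy is chosen mainly from size estimates. Because `airlines` is a small table, its estimated size falls below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so Spark broadcasts it to every executor and builds a hash table from it, which avoids shuffling the much larger `flights` table.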
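Both points can be illustrated with a small pure-Python sketch (no Spark involved). It models `flights` as a list of partitions and `airlines` as the small side of the join, implements a broadcast hash join (build a hash table once from the small table, then probe it from each partition), and compares the analyzed-plan order (join, then filter) with the optimized order (filter pushed below the join). All function names and rows are hypothetical, and `choose_join` is a deliberately simplified stand-in for Spark's join selection, which also considers join type, hints and configuration, not just one size threshold.

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB)
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def choose_join(left_size_bytes, right_size_bytes, threshold=BROADCAST_THRESHOLD_BYTES):
    """Hypothetical, heavily simplified size rule for an inner equi-join."""
    if min(left_size_bytes, right_size_bytes) <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


def broadcast_hash_join(large_partitions, small, large_key, small_key):
    """Inner hash join: build a hash table once from the small ("broadcast")
    side, then probe it from each partition of the large side independently."""
    table = {}
    for row in small:  # build phase
        table.setdefault(row[small_key], []).append(row)
    out = []
    for partition in large_partitions:  # probe phase: the large side is never shuffled
        for row in partition:
            for match in table.get(row[large_key], []):
                # Prefix right-side columns so names such as AIRLINE do not clash
                out.append({**row, **{"r_" + k: v for k, v in match.items()}})
    return out


def is_atl_lax(row):
    # WHERE predicate; a None (NULL) airport never equals 'ATL'/'LAX', as in SQL
    return row["ORIGIN_AIRPORT"] == "ATL" and row["DESTINATION_AIRPORT"] == "LAX"


def join_then_filter(partitions, airlines):
    """Analyzed-plan order: JOIN every flight, then FILTER."""
    rows_into_join = sum(len(p) for p in partitions)
    joined = broadcast_hash_join(partitions, airlines, "AIRLINE", "IATA_CODE")
    return [r for r in joined if is_atl_lax(r)], rows_into_join


def filter_then_join(partitions, airlines):
    """Optimized-plan order: FILTER pushed below the JOIN."""
    pushed = [[r for r in p if is_atl_lax(r)] for p in partitions]
    rows_into_join = sum(len(p) for p in pushed)
    return broadcast_hash_join(pushed, airlines, "AIRLINE", "IATA_CODE"), rows_into_join


# Hypothetical toy data: two partitions of flights and a tiny airlines table
airlines = [
    {"IATA_CODE": "DL", "AIRLINE": "Delta"},
    {"IATA_CODE": "AA", "AIRLINE": "American"},
]
flight_partitions = [
    [
        {"AIRLINE": "DL", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "LAX"},
        {"AIRLINE": "DL", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "JFK"},
    ],
    [
        {"AIRLINE": "AA", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "LAX"},
        {"AIRLINE": "UA", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "LAX"},  # no airline match
        {"AIRLINE": "AA", "ORIGIN_AIRPORT": "DEN", "DESTINATION_AIRPORT": None},
    ],
]

late, n_late = join_then_filter(flight_partitions, airlines)
early, n_early = filter_then_join(flight_partitions, airlines)
print(late == early, n_late, n_early)  # True 5 3
print([r["r_AIRLINE"] for r in early])  # ['Delta', 'American']
print(choose_join(2_000, 500_000_000))  # BroadcastHashJoin
```

Both orders return the same rows, but with the pushdown only 3 of the 5 toy flights reach the join; on the real `flights` table the difference is much larger, since only ATL to LAX flights survive the filter.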