In [1]:
import pyspark
from pyspark.sql import SparkSession

# Reuse the SparkContext when the kernel already provides one as `sc`;
# otherwise start one with the 'local[*]' master.
if 'sc' not in globals():
    sc = pyspark.SparkContext(master='local[*]')

In [2]:
# Confirm that `sc` is bound to a SparkContext instance.
type(sc)

pyspark.context.SparkContext

In [3]:
# Build (or reuse) the SparkSession that every read and SQL query in this
# notebook goes through. The chain is parenthesised instead of using
# backslash continuations, which break silently on trailing whitespace.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [4]:
# Confirm that `spark` is a SparkSession instance.
type(spark)

pyspark.sql.session.SparkSession

In [5]:
# Load airports.csv as a DataFrame: comma-separated, header row enabled,
# schema inference enabled.
df_airports = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load(".../flight-delays/airports.csv")
)

In [7]:
# List the column names of the airports DataFrame.
df_airports.columns

['IATA_CODE', 'AIRPORT', 'CITY', 'STATE', 'COUNTRY', 'LATITUDE', 'LONGITUDE']

In [8]:
# Load airlines.csv as a DataFrame: comma-separated, header row enabled,
# schema inference enabled.
df_airlines = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load(".../flight-delays/airlines.csv")
)

In [9]:
# List the column names of the airlines DataFrame.
df_airlines.columns

['IATA_CODE', 'AIRLINE']

In [10]:
# Load flights.csv as a DataFrame: comma-separated, header row enabled,
# schema inference enabled.
df_flights = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load(".../flight-delays/flights.csv")
)

In [11]:
# List the column names of the flights DataFrame.
df_flights.columns

['YEAR',
 'MONTH',
 'DAY',
 'DAY_OF_WEEK',
 'AIRLINE',
 'FLIGHT_NUMBER',
 'TAIL_NUMBER',
 'ORIGIN_AIRPORT',
 'DESTINATION_AIRPORT',
 'SCHEDULED_DEPARTURE',
 'DEPARTURE_TIME',
 'DEPARTURE_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'SCHEDULED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'WHEELS_ON',
 'TAXI_IN',
 'SCHEDULED_ARRIVAL',
 'ARRIVAL_TIME',
 'ARRIVAL_DELAY',
 'DIVERTED',
 'CANCELLED',
 'CANCELLATION_REASON',
 'AIR_SYSTEM_DELAY',
 'SECURITY_DELAY',
 'AIRLINE_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'WEATHER_DELAY']

## Creación de vistas temporales

In [12]:
# Expose each DataFrame to Spark SQL as a temporary view so the queries
# below can refer to them by name: airports, airlines and flights.
df_airports.createOrReplaceTempView("airports")
df_airlines.createOrReplaceTempView("airlines")
df_flights.createOrReplaceTempView("flights")


# Mostrar los 5 aeropuertos de origen que tienen mayor cantidad de cancelaciones.

In [41]:
# Top 5 origin airports by number of cancelled flights: join flights to
# airports on the origin IATA code, keep rows with CANCELLED=1, count the
# flights per airport name and sort the counts in descending order.
query_1='SELECT airports.AIRPORT, COUNT(flights.FLIGHT_NUMBER) as total_originAirport_cancelled\
                       FROM flights INNER JOIN airports\
                       ON flights.ORIGIN_AIRPORT=airports.IATA_CODE\
                       WHERE CANCELLED=1\
                       GROUP BY airports.AIRPORT\
                       ORDER BY total_originAirport_cancelled DESC\
                       LIMIT 5'
# Build the result DataFrame once and display that same object, instead of
# issuing spark.sql(query_1) a second time just to show it.
data_sql = spark.sql(query_1)
data_sql.show()

+--------------------+-----------------------------+
|             AIRPORT|total_originAirport_cancelled|
+--------------------+-----------------------------+
|Chicago O'Hare In...|                         8548|
|Dallas/Fort Worth...|                         6254|
|LaGuardia Airport...|                         4531|
|Newark Liberty In...|                         3110|
|Gen. Edward Lawre...|                         2654|
+--------------------+-----------------------------+



# Mostrar el nombre de las aerolíneas y la cantidad de vuelos desde Atlanta (ATL) a Los Ángeles (LAX), ordenadas por cantidad de vuelos

In [35]:
# Inspect how the ORIGIN_AIRPORT field is encoded in the flights view (10 rows).
spark.sql('SELECT ORIGIN_AIRPORT from flights LIMIT 10').show()

+--------------+
|ORIGIN_AIRPORT|
+--------------+
|           ANC|
|           LAX|
|           SFO|
|           LAX|
|           SEA|
|           SFO|
|           LAS|
|           LAX|
|           SFO|
|           LAS|
+--------------+



In [36]:
# Inspect how the DESTINATION_AIRPORT field is encoded in the flights view (10 rows).
spark.sql('SELECT DESTINATION_AIRPORT from flights LIMIT 10').show()

+-------------------+
|DESTINATION_AIRPORT|
+-------------------+
|                SEA|
|                PBI|
|                CLT|
|                MIA|
|                ANC|
|                MSP|
|                MSP|
|                CLT|
|                DFW|
|                ATL|
+-------------------+



In [37]:
# Inspect the AIRLINE values stored in the flights view (10 rows).
spark.sql('SELECT AIRLINE from flights LIMIT 10').show()

+-------+
|AIRLINE|
+-------+
|     AS|
|     AA|
|     US|
|     AA|
|     AS|
|     DL|
|     NK|
|     US|
|     AA|
|     DL|
+-------+



In [32]:
# Inspect the IATA_CODE values of the airlines view (10 rows); query 2 joins
# flights.AIRLINE against this column.
spark.sql('SELECT IATA_CODE from airlines LIMIT 10').show()

+---------+
|IATA_CODE|
+---------+
|       UA|
|       AA|
|       US|
|       F9|
|       B6|
|       OO|
|       AS|
|       NK|
|       WN|
|       DL|
+---------+



In [38]:
# Inspect the AIRLINE values of the airlines view (10 rows); here the column
# holds the airline's full name rather than its code.
spark.sql('SELECT AIRLINE from airlines LIMIT 10').show()

+--------------------+
|             AIRLINE|
+--------------------+
|United Air Lines ...|
|American Airlines...|
|     US Airways Inc.|
|Frontier Airlines...|
|     JetBlue Airways|
|Skywest Airlines ...|
|Alaska Airlines Inc.|
|    Spirit Air Lines|
|Southwest Airline...|
|Delta Air Lines Inc.|
+--------------------+



***
## Resolución

In [39]:
# Airlines flying from Atlanta (ATL) to Los Angeles (LAX) with their number
# of flights: join flights to airlines on the carrier code, keep the ATL->LAX
# rows, count the flights per airline name and sort the counts descending.
query_2='SELECT airlines.AIRLINE, COUNT(flights.FLIGHT_NUMBER) as total_vuelosATL_LAX\
                       FROM flights INNER JOIN airlines\
                       ON flights.AIRLINE=airlines.IATA_CODE\
                       WHERE flights.ORIGIN_AIRPORT="ATL"\
                             AND flights.DESTINATION_AIRPORT="LAX"\
                       GROUP BY airlines.AIRLINE\
                       ORDER BY total_vuelosATL_LAX DESC'
# Build the result DataFrame once and display that same object, instead of
# issuing spark.sql(query_2) a second time just to show it.
data_sql = spark.sql(query_2)
data_sql.show()

+--------------------+-------------------+
|             AIRLINE|total_vuelosATL_LAX|
+--------------------+-------------------+
|Delta Air Lines Inc.|               3624|
|Southwest Airline...|                962|
|American Airlines...|                765|
|Frontier Airlines...|                215|
|    Spirit Air Lines|                103|
+--------------------+-------------------+



# Mostrar y Analizar el Query Plan del punto 2 para entender las optimizaciones que realiza Catalyst Optimizer, contestando las siguientes preguntas:
### * a) ¿Se realiza alguna optimización lógica, como filter pushdown? ¿En qué etapa?
### * b) ¿Qué tipo de join físico se realiza? ¿En qué etapa?



In [40]:
# Print the extended query plan for query 2 (logical plans plus the physical plan).
spark.sql(query_2).explain(True)

== Parsed Logical Plan ==
'Sort ['total_vuelosATL_LAX DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'COUNT('flights.FLIGHT_NUMBER) AS total_vuelosATL_LAX#341]
   +- 'Filter (('flights.ORIGIN_AIRPORT = ATL) && ('flights.DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
         :- 'UnresolvedRelation `flights`
         +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, total_vuelosATL_LAX: bigint
Sort [total_vuelosATL_LAX#341L DESC NULLS LAST], true
+- Aggregate [AIRLINE#35], [AIRLINE#35, count(FLIGHT_NUMBER#53) AS total_vuelosATL_LAX#341L]
   +- Filter ((ORIGIN_AIRPORT#55 = ATL) && (DESTINATION_AIRPORT#56 = LAX))
      +- Join Inner, (AIRLINE#52 = IATA_CODE#34)
         :- SubqueryAlias flights
         :  +- Relation[YEAR#48,MONTH#49,DAY#50,DAY_OF_WEEK#51,AIRLINE#52,FLIGHT_NUMBER#53,TAIL_NUMBER#54,ORIGIN_AIRPORT#55,DESTINATION_AIRPORT#56,SCHEDULED_DEPARTURE#57,DEPARTURE_TIME#58,DEPART

***
### RPTA a) :
###        Efectivamente se realiza un filter pushdown. Punto de partida (aquí el Filter todavía está por encima del Join):
##      == Analyzed Logical Plan ==
     AIRLINE: string, total_vuelosATL_LAX: bigint
    Sort [total_vuelosATL_LAX#280L DESC NULLS LAST], true
    +- Aggregate [AIRLINE#35], [AIRLINE#35, count(FLIGHT_NUMBER#53) AS total_vuelosATL_LAX#280L]
####    +- Filter ((ORIGIN_AIRPORT#55 = ATL) && (DESTINATION_AIRPORT#56 = LAX))
      +- Join Inner, (AIRLINE#52 = IATA_CODE#34)
      
###      Catalyst mueve el Filter por debajo del Join, de modo que se aplica antes de efectuarse el INNER JOIN y este procesa menos datos.
      
###      El filtro ya empujado lo podemos observar en la etapa de optimización lógica:
      
##      == Optimized Logical Plan ==
    Sort [total_vuelosATL_LAX#280L DESC NULLS LAST], true
    +- Aggregate [AIRLINE#35], [AIRLINE#35, count(FLIGHT_NUMBER#53) AS total_vuelosATL_LAX#280L]
     +- Project [FLIGHT_NUMBER#53, AIRLINE#35]
      +- Join Inner, (AIRLINE#52 = IATA_CODE#34)
         :- Project [AIRLINE#52, FLIGHT_NUMBER#53]
####       :  +- Filter ((((isnotnull(ORIGIN_AIRPORT#55) && isnotnull(DESTINATION_AIRPORT#56)) && (ORIGIN_AIRPORT#55 = ATL)) && (DESTINATION_AIRPORT#56 = LAX)) && isnotnull(AIRLINE#52))
         :     +- Relation[YEAR#48,MONTH#49,DAY#50,DAY_OF_WEEK#51,AIRLINE#52,FLIGHT_NUMBER#53,TAIL_NUMBER#54,ORIGIN_AIRPORT#55,DESTINATION_AIRPORT#56,SCHEDULED_DEPARTURE#57,DEPARTURE_TIME#58,DEPARTURE_DELAY#59,TAXI_OUT#60,WHEELS_OFF#61,SCHEDULED_TIME#62,ELAPSED_TIME#63,AIR_TIME#64,DISTANCE#65,WHEELS_ON#66,TAXI_IN#67,SCHEDULED_ARRIVAL#68,ARRIVAL_TIME#69,ARRIVAL_DELAY#70,DIVERTED#71,... 7 more fields] csv
####      +- Filter isnotnull(IATA_CODE#34)
            +- Relation[IATA_CODE#34,AIRLINE#35] csv
            
###            Además, en ambas ramas se filtran los valores nulos (isnotnull) antes del join, lo que reduce aún más los datos a procesar.


***
### RPTA b) :

### El tipo de join físico que se realiza es BroadcastHashJoin (BuildRight), y lo podemos observar en la etapa de Physical Plan generada por Catalyst.
    == Physical Plan ==
    *(4) Sort [total_vuelosATL_LAX#280L DESC NULLS LAST], true, 0
    +- Exchange rangepartitioning(total_vuelosATL_LAX#280L DESC NULLS LAST, 200)
     +- *(3) HashAggregate(keys=[AIRLINE#35], functions=[count(FLIGHT_NUMBER#53)], output=[AIRLINE#35, total_vuelosATL_LAX#280L])
      +- Exchange hashpartitioning(AIRLINE#35, 200)
         +- *(2) HashAggregate(keys=[AIRLINE#35], functions=[partial_count(FLIGHT_NUMBER#53)], output=[AIRLINE#35, count#286L])
            +- *(2) Project [FLIGHT_NUMBER#53, AIRLINE#35]
####               +- *(2) BroadcastHashJoin [AIRLINE#52], [IATA_CODE#34], Inner, BuildRight
                  :- *(2) Project [AIRLINE#52, FLIGHT_NUMBER#53]
                  :  +- *(2) Filter ((((isnotnull(ORIGIN_AIRPORT#55) && isnotnull(DESTINATION_AIRPORT#56)) && (ORIGIN_AIRPORT#55 = ATL)) && (DESTINATION_AIRPORT#56 = LAX)) &