In [1]:
import pyspark
from pyspark.sql import SparkSession

# Reuse the SparkContext if the notebook already defines one; otherwise create a local one.
try:
    sc
except NameError:
    sc = pyspark.SparkContext('local[*]')


In [25]:
spark = SparkSession.builder.appName("finger3-magali-marijuan").getOrCreate()

flights = spark.read.load("data/flights.csv",format="csv", sep=",", inferSchema="true", header="true")
airlines = spark.read.load("data/airlines.csv",format="csv", sep=",", inferSchema="true", header="true")
airports = spark.read.load("data/airports.csv",format="csv", sep=",", inferSchema="true", header="true")
# Register the flights / airlines / airports DataFrames as temporary SQL views
flights.createOrReplaceTempView("flights")
airlines.createOrReplaceTempView("airlines")
airports.createOrReplaceTempView("airports")


**Show the 5 origin airports with the highest number of cancellations**

In [28]:
# Keep only cancelled flights; the origin airport is the only column needed.
flightsCanceladosYOrigen = spark.sql('SELECT ORIGIN_AIRPORT FROM flights WHERE CANCELLED > 0')
print("The solution is:")
# Count cancellations per origin airport using the DataFrame API.
flightsCanceladosYOrigen = flightsCanceladosYOrigen.groupBy("ORIGIN_AIRPORT").count().orderBy("count", ascending=False)
flightsCanceladosYOrigen.createOrReplaceTempView("flightsCanceladosYOrigen")
# Join with airports to obtain the airport name.
flightsCanceladosYOrigen = spark.sql('FROM flightsCanceladosYOrigen JOIN airports WHERE flightsCanceladosYOrigen.ORIGIN_AIRPORT = airports.IATA_CODE')
flightsCanceladosYOrigen.createOrReplaceTempView("flightsCanceladosYOrigen")
# show() returns None, so its result is not assigned. Sort explicitly:
# row order is not guaranteed to survive the join.
spark.sql('SELECT AIRPORT, count FROM flightsCanceladosYOrigen ORDER BY count DESC').show(5)

The solution is:
+--------------------+-----+
|             AIRPORT|count|
+--------------------+-----+
|Chicago O'Hare In...| 8548|
|Dallas/Fort Worth...| 6254|
|LaGuardia Airport...| 4531|
|Newark Liberty In...| 3110|
|Gen. Edward Lawre...| 2654|
+--------------------+-----+
only showing top 5 rows
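
The steps above (filter, count, join, sort, limit) can also be written as a single SQL statement. The sketch below is only an illustration under stated assumptions: it uses Python's built-in `sqlite3` with a few made-up rows instead of Spark, so it runs without a cluster, and the alias `cancellations` is a name chosen for this example. The `SELECT` itself is standard SQL and could probably be passed to `spark.sql` against the views registered earlier, although that was not run here.

```python
import sqlite3

# Hypothetical miniature tables; the notebook itself reads
# data/flights.csv and data/airports.csv, and its real counts differ.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE flights (ORIGIN_AIRPORT TEXT, CANCELLED INTEGER);
CREATE TABLE airports (IATA_CODE TEXT, AIRPORT TEXT);
INSERT INTO flights VALUES
  ('ORD', 1), ('ORD', 1), ('ORD', 0),
  ('DFW', 1), ('LGA', 0);
INSERT INTO airports VALUES
  ('ORD', 'Chicago O''Hare International Airport'),
  ('DFW', 'Dallas/Fort Worth International Airport'),
  ('LGA', 'LaGuardia Airport');
""")

# Filter, join, aggregate, sort and limit in one statement.
TOP_CANCELLATIONS = """
SELECT a.AIRPORT, COUNT(*) AS cancellations
FROM flights AS f
JOIN airports AS a ON f.ORIGIN_AIRPORT = a.IATA_CODE
WHERE f.CANCELLED > 0
GROUP BY f.ORIGIN_AIRPORT, a.AIRPORT
ORDER BY cancellations DESC
LIMIT 5
"""

top_rows = conn.execute(TOP_CANCELLATIONS).fetchall()
for airport, cancellations in top_rows:
    print(airport, cancellations)
```

Grouping by the airport code as well as the name keeps two airports with the same name from being merged into one row.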




**Show the airline names and the number of flights from Atlanta (ATL) to Los Angeles (LAX),
ordered by number of flights**

In [68]:
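# Flights from Atlanta (ATL) to Los Angeles (LAX); in flights, AIRLINE holds the airline's IATA code.
flightAerolineas = spark.sql("SELECT AIRLINE AS AIR FROM flights WHERE ORIGIN_AIRPORT = 'ATL' AND DESTINATION_AIRPORT = 'LAX'")

print("The solution is:")
# Count flights per airline code using the DataFrame API.
flightAerolineas = flightAerolineas.groupBy("AIR").count().orderBy("count", ascending=False)
flightAerolineas.createOrReplaceTempView('flightAerolineas')
# Join with airlines to replace the code with the airline name.
flightAerolineas = spark.sql('FROM flightAerolineas JOIN airlines WHERE flightAerolineas.AIR = airlines.IATA_CODE')
flightAerolineas.createOrReplaceTempView('flightAerolineas')
# Sort explicitly: row order is not guaranteed to survive the join.
flightAerolineas = spark.sql('SELECT AIRLINE, COUNT FROM flightAerolineas ORDER BY COUNT DESC')
flightAerolineas.show()

The solution is: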
+--------------------+-----+
|             AIRLINE|COUNT|
+--------------------+-----+
|Delta Air Lines Inc.| 3624|
|Southwest Airline...|  962|
|American Airlines...|  765|
|Frontier Airlines...|  215|
|    Spirit Air Lines|  103|
+--------------------+-----+



**Show and analyze the query plan from point 2 to understand the optimizations performed by the Catalyst Optimizer, answering the following questions:**
    The Catalyst Optimizer is the component of Spark SQL's execution framework that parses, analyzes, and optimizes the submitted query, and then turns it into code that runs on the RDD API.
    - Is any logical optimization performed, such as filter pushdown? At which stage?
    Yes. Filter pushdown is applied before the join, so the join has fewer rows to process. Being a logical optimization, it is applied when Catalyst produces the Optimized Logical Plan.
    - What type of physical join is performed? At which stage?
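
The filter pushdown mentioned in the first answer can be illustrated without Spark. The following is a minimal pure-Python sketch, not Catalyst's implementation: the rows and the `inner_join` helper are hypothetical and exist only for this example. It shows that filtering before the join gives the same result as filtering after it while feeding fewer rows into the join.

```python
# Hypothetical rows standing in for the flights and airports tables.
flights = [
    {"ORIGIN_AIRPORT": "ORD", "CANCELLED": 1},
    {"ORIGIN_AIRPORT": "ORD", "CANCELLED": 0},
    {"ORIGIN_AIRPORT": "DFW", "CANCELLED": 1},
    {"ORIGIN_AIRPORT": "LGA", "CANCELLED": 0},
]
airports = [
    {"IATA_CODE": "ORD", "AIRPORT": "Chicago O'Hare"},
    {"IATA_CODE": "DFW", "AIRPORT": "Dallas/Fort Worth"},
    {"IATA_CODE": "LGA", "AIRPORT": "LaGuardia"},
]


def inner_join(left, right):
    """Naive inner join on ORIGIN_AIRPORT = IATA_CODE.

    Returns the joined rows and the number of left-side rows it had to read.
    """
    joined = [
        {**flight, **airport}
        for flight in left
        for airport in right
        if flight["ORIGIN_AIRPORT"] == airport["IATA_CODE"]
    ]
    return joined, len(left)


# Plan as written: join every flight, then filter.
joined_all, rows_in_unoptimized = inner_join(flights, airports)
late_filter = [row for row in joined_all if row["CANCELLED"] > 0]

# Plan after pushdown: filter first, then join only the surviving rows.
cancelled = [row for row in flights if row["CANCELLED"] > 0]
early_filter, rows_in_optimized = inner_join(cancelled, airports)

print(late_filter == early_filter)             # both plans give the same rows
print(rows_in_unoptimized, rows_in_optimized)  # left-side rows fed into the join
```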


In [69]:
query = 'FROM flightsCanceladosYOrigen JOIN airports WHERE flightsCanceladosYOrigen.ORIGIN_AIRPORT = airports.IATA_CODE'
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Filter ('flightsCanceladosYOrigen.ORIGIN_AIRPORT = 'airports.IATA_CODE)
+- 'Join Inner
   :- 'UnresolvedRelation `flightsCanceladosYOrigen`
   +- 'UnresolvedRelation `airports`

== Analyzed Logical Plan ==
ORIGIN_AIRPORT: string, count: bigint, IATA_CODE: string, AIRPORT: string, CITY: string, STATE: string, COUNTRY: string, LATITUDE: double, LONGITUDE: double, IATA_CODE: string, AIRPORT: string, CITY: string, STATE: string, COUNTRY: string, LATITUDE: double, LONGITUDE: double
Filter (ORIGIN_AIRPORT#1293 = IATA_CODE#2150)
+- Join Inner
   :- SubqueryAlias flightscanceladosyorigen
   :  +- Filter (ORIGIN_AIRPORT#1293 = IATA_CODE#1372)
   :     +- Join Inner
   :        :- SubqueryAlias flightscanceladosyorigen
   :        :  +- Sort [COUNT#1503L DESC NULLS LAST], true
   :        :     +- Aggregate [ORIGIN_AIRPORT#1293], [ORIGIN_AIRPORT#1293, count(1) AS count#1503L]
   :        :        +- Project [ORIGIN_AIRPORT#1293]
   :        :           +- Filter (CANCE

**The output of the previous command includes:**
...
== Physical Plan ==
*(5) BroadcastHashJoin [ORIGIN_AIRPORT#1293], [IATA_CODE#2150], Inner, BuildRight
:- *(5) BroadcastHashJoin [ORIGIN_AIRPORT#1293], [IATA_CODE#1372], Inner, BuildRight
:  :- *(5) Sort [COUNT#1503L DESC NULLS LAST], true, 0
:  :  +- Exchange rangepartitioning(COUNT#1503L DESC NULLS LAST, 200)
:  :     +- *(2) HashAggregate(keys=[ORIGIN_AIRPORT#1293], functions=[count(1)], output=[ORIGIN_AIRPORT#1293, count#1503L])
...

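So the join used is **BroadcastHashJoin**, selected at the physical plan stage. `BuildRight` indicates that the right-hand relation (`airports`) is the side used to build the hash table that is broadcast. The operator appears twice because the `flightsCanceladosYOrigen` view had already been re-registered with the result of the join with `airports`, so this query joins with `airports` a second time.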
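
To make the idea behind a broadcast hash join concrete, here is a minimal pure-Python sketch. It is a conceptual illustration only, not Spark's implementation: the function `broadcast_hash_join`, the partition layout, and the sample rows (including the unmatched code `XXX`) are all assumptions made for this example. The small relation is hashed once (the build side) and every partition of the large relation probes that table locally, which is why the large side does not need to be shuffled for the join.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner join where the small side is hashed once and shared with every partition."""
    # Build phase: hash the small (broadcast) relation by its join key.
    hash_table = defaultdict(list)
    for row in small_rows:
        hash_table[row[small_key]].append(row)

    # Probe phase: each partition of the large side looks up matches locally.
    result = []
    for partition in large_partitions:
        for row in partition:
            for match in hash_table.get(row[large_key], []):
                result.append({**row, **match})
    return result


# Hypothetical data: per-airport counts split into two partitions,
# and a small airports table playing the role of the build (right) side.
counts_partitions = [
    [{"ORIGIN_AIRPORT": "ORD", "count": 3}],
    [{"ORIGIN_AIRPORT": "DFW", "count": 2}, {"ORIGIN_AIRPORT": "XXX", "count": 1}],
]
airports = [
    {"IATA_CODE": "ORD", "AIRPORT": "Chicago O'Hare"},
    {"IATA_CODE": "DFW", "AIRPORT": "Dallas/Fort Worth"},
]

joined = broadcast_hash_join(counts_partitions, airports, "ORIGIN_AIRPORT", "IATA_CODE")
for row in joined:
    print(row["AIRPORT"], row["count"])
```

Rows whose key has no entry in the hash table (`XXX` here) are dropped, which matches the `Inner` join type shown in the plan.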