In [2]:
import pyspark

# Reuse the SparkContext that is already running in this kernel, or start a
# local one that uses every available core. getOrCreate checks for a live
# context, whereas testing only whether the name `sc` exists would keep a
# stale or unrelated object bound to that name.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [3]:
# Obtain the SparkSession used by every later cell; getOrCreate returns the
# existing session when one is already active.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [4]:
# Load the flights CSV: comma-separated, first row is the header, and column
# types are inferred from the data.
df_flight = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load("flight-delays/flights.csv")
)

In [5]:
# Load the airlines CSV: comma-separated, first row is the header, and column
# types are inferred from the data.
df_airlines = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load("flight-delays/airlines.csv")
)

In [6]:
# Load the airports CSV: comma-separated, first row is the header, and column
# types are inferred from the data.
df_airports = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load("flight-delays/airports.csv")
)

In [7]:
# Show the column names of the flights DataFrame (bare expression, so the
# notebook displays the resulting list).
df_flight.columns

['YEAR',
 'MONTH',
 'DAY',
 'DAY_OF_WEEK',
 'AIRLINE',
 'FLIGHT_NUMBER',
 'TAIL_NUMBER',
 'ORIGIN_AIRPORT',
 'DESTINATION_AIRPORT',
 'SCHEDULED_DEPARTURE',
 'DEPARTURE_TIME',
 'DEPARTURE_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'SCHEDULED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'WHEELS_ON',
 'TAXI_IN',
 'SCHEDULED_ARRIVAL',
 'ARRIVAL_TIME',
 'ARRIVAL_DELAY',
 'DIVERTED',
 'CANCELLED',
 'CANCELLATION_REASON',
 'AIR_SYSTEM_DELAY',
 'SECURITY_DELAY',
 'AIRLINE_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'WEATHER_DELAY']

In [8]:
# Show the column names of the airlines DataFrame; IATA_CODE is the column
# the later SQL joins against flights.AIRLINE.
df_airlines.columns

['IATA_CODE', 'AIRLINE']

In [10]:
# Show the column names of the airports DataFrame; IATA_CODE is the column
# the later SQL joins against flights.ORIGIN_AIRPORT.
df_airports.columns

['IATA_CODE', 'AIRPORT', 'CITY', 'STATE', 'COUNTRY', 'LATITUDE', 'LONGITUDE']

In [7]:
# Expose each DataFrame to Spark SQL under the table name the queries use.
for view_name, frame in (
    ("airlines", df_airlines),
    ("flights", df_flight),
    ("airports", df_airports),
):
    frame.createOrReplaceTempView(view_name)

TOP 5 DE AEROPUERTOS CON MÁS CANCELACIONES

In [9]:
# Top 5 origin airports ranked by number of cancelled flights.
# The SQL lives in a triple-quoted string so it can span lines without
# backslash continuations inside the literal (which silently glue tokens
# together if the indentation of a continued line is ever removed).
query = """
    SELECT airports.AIRPORT, COUNT(*) AS total_cancelled
    FROM flights
    INNER JOIN airports
        ON airports.IATA_CODE = flights.ORIGIN_AIRPORT
    WHERE flights.CANCELLED = 1
    GROUP BY airports.AIRPORT
    ORDER BY total_cancelled DESC
    LIMIT 5
"""

# Build the DataFrame once and display that same object, instead of issuing
# a second, redundant spark.sql call just to show the result.
df_sql = spark.sql(query)
df_sql.show()

+--------------------+---------------+
|             AIRPORT|total_cancelled|
+--------------------+---------------+
|Chicago O'Hare In...|           8548|
|Dallas/Fort Worth...|           6254|
|LaGuardia Airport...|           4531|
|Newark Liberty In...|           3110|
|Gen. Edward Lawre...|           2654|
+--------------------+---------------+



NOMBRE DE LAS AEROLÍNEAS QUE VUELAN DE ATLANTA A LOS ÁNGELES Y CANTIDAD DE VUELOS

In [30]:
# Airlines operating Atlanta (ATL) -> Los Angeles (LAX) and how many flights
# each one has, most flights first.
# String literals use single quotes, the standard SQL form; double-quoted
# text is parsed as an identifier when Spark's ANSI double-quoted-identifier
# option is enabled. The triple-quoted Python string also removes the need
# for backslash continuations inside the literal.
query1 = """
    SELECT airlines.AIRLINE, COUNT(flights.FLIGHT_NUMBER) AS total_flights
    FROM flights
    INNER JOIN airlines
        ON flights.AIRLINE = airlines.IATA_CODE
    WHERE flights.ORIGIN_AIRPORT = 'ATL'
      AND flights.DESTINATION_AIRPORT = 'LAX'
    GROUP BY airlines.AIRLINE
    ORDER BY total_flights DESC
"""

# Build the DataFrame once and display that same object, instead of issuing
# a second, redundant spark.sql call just to show the result.
df_sql = spark.sql(query1)
df_sql.show()

+--------------------+-------------+
|             AIRLINE|total_flights|
+--------------------+-------------+
|Delta Air Lines Inc.|         3624|
|Southwest Airline...|          962|
|American Airlines...|          765|
|Frontier Airlines...|          215|
|    Spirit Air Lines|          103|
+--------------------+-------------+



ANÁLISIS DEL QUERY PLAN DEL PUNTO ANTERIOR

In [31]:
# Print the extended explanation of query1: the logical plans followed by the
# physical plan Spark will execute.
spark.sql(query1).explain(extended=True)

== Parsed Logical Plan ==
'Sort ['total_flights DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'COUNT('flights.FLIGHT_NUMBER) AS total_flights#273]
   +- 'Filter (('ORIGIN_AIRPORT = ATL) && ('DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
         :- 'UnresolvedRelation `flights`
         +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, total_flights: bigint
Sort [total_flights#273L DESC NULLS LAST], true
+- Aggregate [AIRLINE#83], [AIRLINE#83, count(FLIGHT_NUMBER#15) AS total_flights#273L]
   +- Filter ((ORIGIN_AIRPORT#17 = ATL) && (DESTINATION_AIRPORT#18 = LAX))
      +- Join Inner, (AIRLINE#14 = IATA_CODE#82)
         :- SubqueryAlias flights
         :  +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDUL

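En la etapa Optimized Logical Plan se produce un filter pushdown: el filtro sobre ORIGIN_AIRPORT y DESTINATION_AIRPORT se aplica antes del join, de modo que se combinan menos filas.
En la etapa Physical Plan el join se ejecuta como un BroadcastHashJoin.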