In [4]:
import os
# PYSPARK_PYTHON is the environment variable PySpark consults to pick the
# Python executable it launches; 'python' means "whatever `python` resolves to
# on PATH". NOTE(review): presumably this must run before the SparkSession is
# created for it to take effect -- keep this cell at the top.
os.environ['PYSPARK_PYTHON'] = 'python'

from pyspark.sql.functions import *
from pyspark.sql.types import *


In [7]:
from pyspark.sql import SparkSession
# Create the session for this notebook, or reuse one that is already running;
# "Join" is the application name passed to the builder.
spark = (
    SparkSession.builder
    .appName("Join")
    .getOrCreate()
)

In [25]:
# Read the three CSV files. header=True takes column names from the first row;
# inferSchema=True asks Spark to infer each column's type from the data.
flights = spark.read.csv(
    "dataset/nyc_flights.csv",
    header=True,
    inferSchema=True,
)
airlines = spark.read.csv(
    "dataset/nyc_airlines.csv",
    header=True,
    inferSchema=True,
)
airports = spark.read.csv(
    "dataset/nyc_airports.csv",
    header=True,
    inferSchema=True,
)

In [39]:
# Register every DataFrame as a temporary view so the SQL queries below can
# refer to it by name.
for view_name, frame in (
    ("flights_table", flights),
    ("airlines_table", airlines),
    ("airports_table", airports),
):
    frame.createOrReplaceTempView(view_name)

In [43]:
# Sanity check: select every column of the flights view and print a preview.
query1 = "SELECT * FROM flights_table"
result = spark.sql(query1)
# n=20 and truncate=True are show()'s defaults, written out for the reader.
result.show(n=20, truncate=True)

+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|
+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|2013|    1|  1|     517|           515|        2|     830|           819|       11|     UA|  1545| N14228|   EWR| IAH|     227|    1400|   5|    15|2013-01-01 15:45:00|
|2013|    1|  1|     533|           529|        4|     850|           830|       20|     UA|  1714| N24211|   LGA| IAH|     227|    1416|   5|    29|2013-01-01 15:45:00|
|2013|    1|  1|     542|           540|        2|     923|           850|       33|     AA|  1141| N619AA|   JFK| MIA|     160|    1089|   5|    40|2

In [42]:
# Challenge 1: Which airline had the highest no. of delayed departures?
#
# A departure counts as delayed when dep_delay > 0. try_cast yields NULL for
# values that cannot be read as an integer, and NULL > 0 is not true, so such
# rows are left out of the count.
#
# Flights are grouped by carrier code (not only by airline name) so that
# carriers missing from airlines_table are not lumped into one NULL-named
# group; for those, the carrier code is displayed in place of the name.
query2 = """
SELECT COALESCE(a.name, f.carrier) AS name,
       COUNT(*) AS delayed_departures
FROM flights_table f
LEFT JOIN airlines_table a
  ON f.carrier = a.carrier
WHERE try_cast(f.dep_delay AS INT) > 0
GROUP BY f.carrier, a.name
ORDER BY delayed_departures DESC
LIMIT 1;
"""

result2 = spark.sql(query2)
# truncate=False prints the full airline name; by default show() cuts strings
# at 20 characters, which hides part of the answer.
result2.show(truncate=False)



+--------------------+------------------+
|                name|delayed_departures|
+--------------------+------------------+
|United Air Lines ...|             27261|
+--------------------+------------------+



In [53]:
# Challenge 2: Which airport has flights arriving most early on average?
#
# Flights are matched to their destination airport (dest = faa) and the mean
# arrival delay is computed per airport name; the most negative mean wins.
# try_cast yields NULL for values that cannot be read as an integer, and the
# WHERE clause drops those rows together with every late arrival.
#
# NOTE(review): because of the `<= 0` filter the average only covers flights
# that were early or on time, not all flights into the airport -- confirm this
# is the intended reading of "on average".
query3 = """SELECT a.name,
            AVG(try_cast(f.arr_delay AS INT)) AS avg_arr_delay
            FROM flights_table f
            JOIN airports_table a on f.dest = a.faa
            WHERE try_cast(f.arr_delay AS INT) <= 0
            GROUP BY a.name
            ORDER BY avg_arr_delay ASC
            LIMIT 1;"""

# Keep the DataFrame itself: show() returns None, so assigning its return
# value would store None (and previously clobbered result2 from challenge 1).
result3 = spark.sql(query3)
# truncate=False prints the full airport name instead of cutting it at 20 chars.
result3.show(truncate=False)



+--------------------+-------------------+
|                name|      avg_arr_delay|
+--------------------+-------------------+
|Ted Stevens Ancho...|-27.333333333333332|
+--------------------+-------------------+



In [59]:
# Exercise: Find out the avg, min, and max air time when travelling from JFK to SEA airports.
#
# try_cast yields NULL for air_time values that cannot be read as an integer;
# AVG / MIN / MAX skip NULLs, so such rows do not affect the statistics.
query4 = """SELECT 
            AVG(TRY_CAST(air_time AS INT)) AS avg_air_time,
            MIN(TRY_CAST(air_time AS INT)) AS min_air_time,
            MAX(TRY_CAST(air_time AS INT)) AS max_air_time
            FROM flights_table
            WHERE origin = 'JFK' AND dest = 'SEA';"""

# Keep the DataFrame itself: show() returns None, so assigning its return
# value would store None (and previously overwrote `result` with None).
result4 = spark.sql(query4)
result4.show()

+-----------------+------------+------------+
|     avg_air_time|min_air_time|max_air_time|
+-----------------+------------+------------+
|329.3744578313253|         275|         389|
+-----------------+------------+------------+

