In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder\
                    .appName("Analysing Airline Data")\
                    .getOrCreate()

In [3]:
from pyspark.sql.types import Row
from datetime import datetime

In [4]:
airlinesPath = './Datasets/airlines.csv'
flightsPath = './Datasets/flights.csv'
airportsPath = './Datasets/airports.csv'

In [5]:
airlines = spark.read\
                .format('csv')\
                .option('header', 'true')\
                .load(airlinesPath)

In [6]:
airlines.show()

+-----+--------------------+
| Code|         Description|
+-----+--------------------+
|19031|Mackey Internatio...|
|19032|Munz Northern Air...|
|19033|Cochise Airlines ...|
|19034|Golden Gate Airli...|
|19035|  Aeromech Inc.: RZZ|
|19036|Golden West Airli...|
|19037|Puerto Rico Intl ...|
|19038|Air America Inc.:...|
|19039|Swift Aire Lines ...|
|19040|American Central ...|
|19041|Valdez Airlines: VEZ|
|19042|Southeast Alaska ...|
|19043|Altair Airlines I...|
|19044|Chitina Air Servi...|
|19045|Marco Island Airw...|
|19046|Caribbean Air Ser...|
|19047|Sundance Airlines...|
|19048|Seair Alaska Airl...|
|19049|Southeast Airline...|
|19050|Alaska Aeronautic...|
+-----+--------------------+
only showing top 20 rows



In [7]:
sqlcontext = SQLContext(sc)

In [26]:
airlines.createOrReplaceTempView('airlines')

In [27]:
airlines = sqlcontext.sql('Select * from airlines')
airlines.columns

['Code', 'Description']

In [28]:
airlines.show(5)

+-----+--------------------+
| Code|         Description|
+-----+--------------------+
|19031|Mackey Internatio...|
|19032|Munz Northern Air...|
|19033|Cochise Airlines ...|
|19034|Golden Gate Airli...|
|19035|  Aeromech Inc.: RZZ|
+-----+--------------------+
only showing top 5 rows



In [22]:
flights = spark.read\
               .format('csv')\
               .option('header', 'true')\
               .load(flightsPath) 

In [31]:
flights.createOrReplaceTempView('flights')
flights.columns

['date',
 'airlines',
 'flight_number',
 'origin',
 'destination',
 'departure',
 'departure_delay',
 'arrival',
 'arrival_delay',
 'air_time',
 'distance']

In [32]:
flights.show(5)

+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|      date|airlines|flight_number|origin|destination|departure|departure_delay|arrival|arrival_delay|air_time|distance|
+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|2014-04-01|   19805|            1|   JFK|        LAX|     0854|          -6.00|   1217|         2.00|  355.00| 2475.00|
|2014-04-01|   19805|            2|   LAX|        JFK|     0944|          14.00|   1736|       -29.00|  269.00| 2475.00|
|2014-04-01|   19805|            3|   JFK|        LAX|     1224|          -6.00|   1614|        39.00|  371.00| 2475.00|
|2014-04-01|   19805|            4|   LAX|        JFK|     1240|          25.00|   2028|       -27.00|  264.00| 2475.00|
|2014-04-01|   19805|            5|   DFW|        HNL|     1300|          -5.00|   1650|        15.00|  510.00| 3784.00|
+----------+--------+-----------

In [33]:
flights.count(), airlines.count()

(476881, 1579)

In [34]:
flights_count = spark.sql('select count(*) from flights')
airlines_count = spark.sql('select count(*) from airlines')

In [35]:
flights_count, airlines_count

(DataFrame[count(1): bigint], DataFrame[count(1): bigint])

In [36]:
flights_count.collect()[0][0], airlines_count.collect()[0][0]

(476881, 1579)

In [37]:
total_distance_df = spark.sql('select distance from flights')\
                         .agg({'distance':'sum'})\
                         .withColumnRenamed('sum(distance)', 'total_distance')

In [38]:
total_distance_df.show()

+--------------+
|total_distance|
+--------------+
|  3.79052917E8|
+--------------+



In [47]:
all_delays_2012 = spark.sql(
'select date, airlines, flight_number, departure_delay ' +
'from flights where departure_delay > 0 and year(date) = 2012')

In [48]:
all_delays_2012.show(5)

+----+--------+-------------+---------------+
|date|airlines|flight_number|departure_delay|
+----+--------+-------------+---------------+
+----+--------+-------------+---------------+



In [50]:
all_delays_2014 = spark.sql(
'select date, airlines, flight_number, departure_delay ' +
'from flights where departure_delay > 0 and year(date) = 2014')

all_delays_2014.show()

+----------+--------+-------------+---------------+
|      date|airlines|flight_number|departure_delay|
+----------+--------+-------------+---------------+
|2014-04-01|   19805|            2|          14.00|
|2014-04-01|   19805|            4|          25.00|
|2014-04-01|   19805|            6|         126.00|
|2014-04-01|   19805|            7|         125.00|
|2014-04-01|   19805|            8|           4.00|
|2014-04-01|   19805|           10|          21.00|
|2014-04-01|   19805|           14|           5.00|
|2014-04-01|   19805|           16|          17.00|
|2014-04-01|   19805|           18|         112.00|
|2014-04-01|   19805|           20|         135.00|
|2014-04-01|   19805|           24|          24.00|
|2014-04-01|   19805|           27|         136.00|
|2014-04-01|   19805|           30|          48.00|
|2014-04-01|   19805|           32|           3.00|
|2014-04-01|   19805|           33|           4.00|
|2014-04-01|   19805|           34|           8.00|
|2014-04-01|

In [51]:
all_delays_2014.createOrReplaceTempView('all_delays')


In [55]:
all_delays_2014.orderBy(all_delays_2014.departure_delay.desc()).show()

+----------+--------+-------------+---------------+
|      date|airlines|flight_number|departure_delay|
+----------+--------+-------------+---------------+
|2014-04-20|   20398|         3001|          99.00|
|2014-04-24|   19393|         2261|          99.00|
|2014-04-22|   20398|         2981|          99.00|
|2014-04-18|   20409|         1205|          99.00|
|2014-04-22|   20355|         1994|          99.00|
|2014-04-18|   20366|         4971|          99.00|
|2014-04-22|   19393|          583|          99.00|
|2014-04-18|   19977|          560|          99.00|
|2014-04-23|   19805|          197|          99.00|
|2014-04-18|   19393|          108|          99.00|
|2014-04-23|   19930|           70|          99.00|
|2014-04-17|   19393|          661|          99.00|
|2014-04-23|   19393|         4551|          99.00|
|2014-04-19|   20355|          839|          99.00|
|2014-04-23|   19393|          344|          99.00|
|2014-04-20|   19790|         2043|          99.00|
|2014-04-23|

In [56]:
delay_count = spark.sql('select count(departure_delay) from all_delays')

In [57]:
delay_count.show()

+----------------------+
|count(departure_delay)|
+----------------------+
|                179015|
+----------------------+



In [58]:
delay_count()

TypeError: 'DataFrame' object is not callable

In [59]:
delay_count.collect()[0][0]

179015

In [62]:
delay_percent = delay_count.collect()[0][0] / flights_count.collect()[0][0] * 100
delay_percent

37.53871510922012

In [65]:
delay_per_airline = spark.sql ('select airlines, departure_delay from flights')\
                         .groupBy('airlines')\
                         .agg({'departure_delay' : 'avg'})\
                         .withColumnRenamed('avg(departure_delay)', 'departure_delay')   

In [66]:
delay_per_airline.orderBy(delay_per_airline.departure_delay.desc()).show(5)

+--------+------------------+
|airlines|   departure_delay|
+--------+------------------+
|   19393|13.429567657134724|
|   20366|12.296210112379818|
|   19977| 8.818392620527979|
|   20436| 8.716275167785234|
|   20409|  8.31110357194785|
+--------+------------------+
only showing top 5 rows

