In [1]:
sc

In [2]:
spark

#### a) Create a new Spark Session with new SparkConfig

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, HiveContext

In [4]:
sc.stop()

In [5]:
config = SparkConf().setAppName('Flights Delay Analysis').setMaster('local[4]')
sc = SparkContext.getOrCreate(conf=config)

In [6]:
#Spark integration with Hive with Spark Session
spark = (SparkSession.builder.appName("pyspark-hive-integration")
         .config('spark.sql.warehouse.dir','/user/hive/warehouse/')
        .enableHiveSupport().getOrCreate())

In [7]:
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|  banking_db|
|     default|
+------------+



In [8]:
spark

#### b) Create new instance of Spark SQL session and define new DataFrame using Flights_Delay.csv dataset.

In [9]:
flights_df = spark.read.csv("file:///home/hadoop/Downloads/Flights_Delay.csv", header=True, inferSchema=True)

#### Create table Spark HIVE table flights_table

In [10]:
flights_df.createOrReplaceTempView('flights_table')

#### d.Show Top 10 Rows

In [11]:

spark.sql("select * from flights_table").show(10)

+---+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
| ID|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+---+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+-----------

In [12]:
#d.Describe the table
#spark.sql("EXEC sp_help N'flights_table'").show
flights_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true

#### e) Apply Query performance optimization techniques like – creating Partitioning DataFrame by
#### a specific column, parquet data, caching, predicate pushdown methods etc.

In [13]:
partitioned_df = flights_df.repartition(3)

In [14]:
#partitioned_df.write.parquet("/FlightPartition/")

In [15]:
flights_df.cache()

DataFrame[ID: int, YEAR: int, MONTH: int, DAY: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: int, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: int, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: int, SECURITY_DELAY: int, AIRLINE_DELAY: int, LATE_AIRCRAFT_DELAY: int, WEATHER_DELAY: int]

In [16]:
#Persistance of dataframe with a specific storage level
from pyspark import StorageLevel
flights_df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[ID: int, YEAR: int, MONTH: int, DAY: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: int, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: int, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: int, SECURITY_DELAY: int, AIRLINE_DELAY: int, LATE_AIRCRAFT_DELAY: int, WEATHER_DELAY: int]

#### f) Average arrival delay caused by airlines

In [17]:
spark.sql("select AIRLINE , round(avg(ARRIVAL_DELAY),2) as average_arrival_delay  from flights_table group by AIRLINE having average_arrival_delay > 1").show()

+-------+---------------------+
|AIRLINE|average_arrival_delay|
+-------+---------------------+
|     UA|                  6.7|
|     NK|                14.21|
|     AA|                 8.39|
|     EV|                10.88|
|     B6|                13.96|
|     DL|                 2.81|
|     OO|                10.15|
|     F9|                 24.1|
|     US|                 5.98|
|     MQ|                19.23|
|     HA|                 4.07|
|     VX|                 5.13|
|     WN|                  3.7|
+-------+---------------------+



#### g) Days of months with respected to average of arrival delays

In [18]:
spark.sql("select DAY,round(avg(ARRIVAL_DELAY),2) as average_arrival_delay from flights_table group by DAY order by DAY").show()

+---+---------------------+
|DAY|average_arrival_delay|
+---+---------------------+
|  1|                14.81|
|  2|                15.05|
|  3|                18.14|
|  4|                17.16|
|  5|                16.24|
|  6|                10.61|
|  7|                 2.83|
|  8|                 5.23|
|  9|                 4.42|
| 10|                -0.05|
| 11|                 3.99|
| 12|                11.25|
| 13|                 3.38|
| 14|                 1.33|
| 15|                 2.97|
| 16|                 9.12|
| 17|                 8.76|
| 18|                 3.57|
| 19|                 1.63|
| 20|                 3.88|
+---+---------------------+
only showing top 20 rows



#### h) Arrange weekdays with respect to the average arrival delays caused

In [19]:
spark.sql("select DAY_OF_WEEK , round(avg(ARRIVAL_DELAY),2) as average_arrival_delay from flights_table group by DAY_OF_WEEK order by DAY_OF_WEEK").show()

+-----------+---------------------+
|DAY_OF_WEEK|average_arrival_delay|
+-----------+---------------------+
|          1|                10.81|
|          2|                 8.03|
|          3|                 5.59|
|          4|                 7.17|
|          5|                 6.01|
|          6|                 4.89|
|          7|                10.11|
+-----------+---------------------+



#### i) Arrange Days of month as per cancellations done in Descending

In [20]:
spark.sql("select DAY,count(CANCELLED) as cancellation_done from flights_table group by DAY order by cancellation_done desc").show()

+---+-----------------+
|DAY|cancellation_done|
+---+-----------------+
|  4|             2660|
|  2|             2644|
|  9|             2620|
|  5|             2619|
|  6|             2596|
|  3|             2481|
|  8|             2403|
|  7|             2267|
|  1|             2248|
| 10|             1813|
| 13|             1788|
| 16|             1724|
| 23|             1712|
| 26|             1700|
| 20|             1700|
| 27|             1678|
| 22|             1671|
| 18|             1663|
| 12|             1655|
| 19|             1654|
+---+-----------------+
only showing top 20 rows



#### j) Find Top 10 busiest airports with respect to day of week

In [21]:
spark.sql("select DAY_OF_WEEK from flights_table group by DAY_OF_WEEK \
          limit 10").show()

+-----------+
|DAY_OF_WEEK|
+-----------+
|          1|
|          6|
|          3|
|          5|
|          4|
|          7|
|          2|
+-----------+



#### k) Finding airlines that make the maximum number of cancellations    

In [22]:
spark.sql("select AIRLINE,COUNT(CANCELLED) as cancellation from flights_table group by AIRLINE order by cancellation desc limit 1").show()

+-------+------------+
|AIRLINE|cancellation|
+-------+------------+
|     WN|       11738|
+-------+------------+



#### l) Find and order airlines in descending that make the most number of diversions

In [23]:
spark.sql("select AIRLINE,COUNT(DIVERTED) as total_diversion FROM flights_table \
          group by AIRLINE ORDER BY total_diversion desc ").show()

+-------+---------------+
|AIRLINE|total_diversion|
+-------+---------------+
|     WN|          11738|
|     DL|           7989|
|     EV|           5916|
|     OO|           5708|
|     AA|           5250|
|     UA|           4701|
|     US|           3925|
|     MQ|           3502|
|     B6|           2548|
|     AS|           1586|
|     NK|           1048|
|     F9|            794|
|     HA|            722|
|     VX|            573|
+-------+---------------+



#### m) Finding days of month that see the most number of diversion

In [24]:
spark.sql("select DAY, count(DIVERTED) as most_diversion from flights_table \
           group by DAY order by most_diversion desc").show()

+---+--------------+
|DAY|most_diversion|
+---+--------------+
|  4|          2660|
|  2|          2644|
|  9|          2620|
|  5|          2619|
|  6|          2596|
|  3|          2481|
|  8|          2403|
|  7|          2267|
|  1|          2248|
| 10|          1813|
| 13|          1788|
| 16|          1724|
| 23|          1712|
| 26|          1700|
| 20|          1700|
| 27|          1678|
| 22|          1671|
| 18|          1663|
| 12|          1655|
| 19|          1654|
+---+--------------+
only showing top 20 rows



#### n) Calculating mean and standard deviation of departure delay for all flights in minutes

In [25]:
spark.sql("select round(avg(DEPARTURE_DELAY),2) as mean ,round(stddev(DEPARTURE_DELAY),2) as standard_deviation from flights_table group by AIRLINE").show()

+-----+------------------+
| mean|standard_deviation|
+-----+------------------+
|14.29|             36.37|
|15.58|              46.0|
| 11.5|             50.59|
|11.53|              40.6|
|16.07|             44.45|
| 9.94|             44.58|
| 11.6|              41.9|
|23.51|             55.22|
| 7.81|             29.95|
|17.07|             43.47|
| 1.18|             30.28|
| 2.34|             29.15|
| 9.86|             35.18|
|10.12|             28.66|
+-----+------------------+



#### o) Calculating mean and standard deviation of arrival delay for all flights in minutes

In [26]:
spark.sql("select round(avg(ARRIVAL_DELAY),2) as mean ,round(stddev(ARRIVAL_DELAY),2) as standard_deviation from flights_table group by AIRLINE").show()

+-----+------------------+
| mean|standard_deviation|
+-----+------------------+
|  6.7|             38.97|
|14.21|             47.58|
| 8.39|             53.57|
|10.88|             43.39|
|13.96|             47.64|
| 2.81|             46.96|
|10.15|             43.76|
| 24.1|             56.27|
| 5.98|             34.11|
|19.23|              46.4|
| 4.07|             32.38|
|-1.53|             31.37|
| 5.13|             40.87|
|  3.7|             31.23|
+-----+------------------+



#### p) Finding all diverted Route from a source to destination Airport &amp; which route is the most diverted

In [50]:
spark.sql("SELECT ORIGIN_AIRPORT, DESTINATION_AIRPORT, COUNT(DIVERTED) AS diversion_count \
FROM flights_table WHERE DIVERTED = 1 GROUP BY ORIGIN_AIRPORT, DESTINATION_AIRPORT order by diversion_count desc").show()

+--------------+-------------------+---------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|diversion_count|
+--------------+-------------------+---------------+
|           HOU|                DAL|              2|
|           PHL|                SAN|              2|
|           STT|                PHL|              2|
|           IAH|                ASE|              2|
|           TPA|                LGA|              2|
|           JFK|                EGE|              2|
|           JFK|                SEA|              2|
|           ORD|                ASE|              2|
|           CLT|                IAH|              2|
|           CAK|                LGA|              1|
|           FLL|                BWI|              1|
|           SFO|                BOI|              1|
|           KOA|                SFO|              1|
|           BOS|                LAS|              1|
|           SLC|                SUN|              1|
|           EWR|                STL|          

#### q) Finding AIRLINES with its total flight count, total number of flights arrival delayed by more
#### than 30 Minutes, % of such flights delayed by more than 30 minutes when it is not
#### Weekends with minimum count of flights from Airlines by more than 10. Also Exclude some
#### of Airlines &#39;AK&#39;, &#39;HI&#39;, &#39;PR&#39;, &#39;VI&#39; and arrange output in descending order by % of such #### count of
#### flights.

In [28]:
spark.sql("SELECT AIRLINE,COUNT(*) AS total_flights,\
SUM(CASE WHEN ARRIVAL_DELAY > 30 AND DAY_OF_WEEK NOT IN (6, 7) THEN 1 ELSE 0 END) AS delayed_flights,\
round(CAST(SUM(CASE WHEN ARRIVAL_DELAY > 30 AND DAY_OF_WEEK NOT IN (6, 7) THEN 1 ELSE 0 END) AS float) / COUNT(*) * 100,2) \
AS delay_percentage FROM flights_table WHERE AIRLINE NOT IN ('AK', 'HI', 'PR', 'VI') \
GROUP BY AIRLINE HAVING COUNT(*) > 10 order by delay_percentage desc").show()


+-------+-------------+---------------+----------------+
|AIRLINE|total_flights|delayed_flights|delay_percentage|
+-------+-------------+---------------+----------------+
|     F9|          794|            139|           17.51|
|     MQ|         3502|            601|           17.16|
|     B6|         2548|            360|           14.13|
|     NK|         1048|            139|           13.26|
|     EV|         5916|            665|           11.24|
|     OO|         5708|            633|           11.09|
|     UA|         4701|            497|           10.57|
|     AA|         5250|            484|            9.22|
|     VX|          573|             47|             8.2|
|     US|         3925|            310|             7.9|
|     DL|         7989|            592|            7.41|
|     WN|        11738|            869|             7.4|
|     AS|         1586|             64|            4.04|
|     HA|          722|             23|            3.19|
+-------+-------------+--------

#### r) Finding AIRLINES with its total flight count with total number of flights departure delayed by
##### less than 30 Minutes, % of such flights delayed by less than 30 minutes when it is Weekends
##### with minimum count of flights from Airlines by more than 10. Also Exclude some of Airlines
##### AK,HI,PR,VI  and arrange output in descending order by % of such count of  flights.  

In [29]:
spark.sql("SELECT AIRLINE,COUNT(*) AS total_flights,\
SUM(CASE WHEN DEPARTURE_DELAY < 30 AND DAY_OF_WEEK IN (6, 7) THEN 1 ELSE 0 END) AS delayed_flights,\
round(CAST(SUM(CASE WHEN DEPARTURE_DELAY < 30 AND DAY_OF_WEEK IN (6, 7) THEN 1 ELSE 0 END) AS float) / COUNT(*) * 100,2) \
AS delay_percentage FROM flights_table WHERE AIRLINE NOT IN ('AK', 'HI', 'PR', 'VI') \
GROUP BY AIRLINE HAVING COUNT(*) > 10 order by delay_percentage desc").show()


+-------+-------------+---------------+----------------+
|AIRLINE|total_flights|delayed_flights|delay_percentage|
+-------+-------------+---------------+----------------+
|     AS|         1586|            412|           25.98|
|     HA|          722|            179|           24.79|
|     NK|         1048|            253|           24.14|
|     AA|         5250|           1214|           23.12|
|     DL|         7989|           1814|           22.71|
|     VX|          573|            129|           22.51|
|     WN|        11738|           2636|           22.46|
|     US|         3925|            867|           22.09|
|     OO|         5708|           1244|           21.79|
|     B6|         2548|            543|           21.31|
|     EV|         5916|           1203|           20.33|
|     UA|         4701|            950|           20.21|
|     MQ|         3502|            622|           17.76|
|     F9|          794|            133|           16.75|
+-------+-------------+--------

#### s) When is the best time of day/day of week/time of a year to fly with minimum delays?

In [49]:
spark.sql("SELECT DAY_OF_WEEK,round(AVG(DEPARTURE_DELAY),2) as DepDelay,round(AVG(ARRIVAL_DELAY),2) AS ArrivalDelay \
FROM flights_table \
WHERE DEPARTURE_DELAY IS NOT NULL and ARRIVAL_DELAY IS NOT NULL \
GROUP BY DAY_OF_WEEK order by DepDelay,ArrivalDelay ").show()

+-----------+--------+------------+
|DAY_OF_WEEK|DepDelay|ArrivalDelay|
+-----------+--------+------------+
|          3|    9.11|        5.59|
|          6|    9.63|        4.89|
|          5|   10.06|        6.01|
|          4|   10.82|        7.17|
|          2|   11.23|        8.03|
|          7|   13.61|       10.11|
|          1|   14.09|       10.81|
+-----------+--------+------------+



#### t) Which airlines are best airline to travel considering number of cancellations, arrival,
#### departure delays and all reasons affecting performance of airline industry.

In [44]:
spark.sql(" SELECT AIRLINE,\
        CAST(SUM(CANCELLED) AS float) / COUNT(*) AS CancellationRate, \
        round(AVG(CASE WHEN ARRIVAL_DELAY IS NOT NULL THEN ARRIVAL_DELAY ELSE 0 END),2) AS AvgArrivalDelay, \
        round(AVG(CASE WHEN DEPARTURE_DELAY IS NOT NULL THEN DEPARTURE_DELAY ELSE 0 END),2) AS AvgDepartureDelay,\
1 - (AVG(CASE WHEN ELAPSED_TIME IS NOT NULL THEN ELAPSED_TIME ELSE 0 END) / AVG(SCHEDULED_TIME)) AS OnTimePerformance,\
CAST(SUM(DIVERTED) AS float) / COUNT(*) AS DiversionRate \
FROM Flights_table \
WHERE CANCELLED IS NOT NULL AND ARRIVAL_DELAY IS NOT NULL AND DEPARTURE_DELAY IS NOT NULL \
GROUP BY AIRLINE").show()

+-------+----------------+---------------+-----------------+--------------------+-------------+
|AIRLINE|CancellationRate|AvgArrivalDelay|AvgDepartureDelay|   OnTimePerformance|DiversionRate|
+-------+----------------+---------------+-----------------+--------------------+-------------+
|     UA|             0.0|            6.7|            14.17| 0.03801405586052342|          0.0|
|     NK|             0.0|          14.21|            15.53|0.008174748040650881|          0.0|
|     AA|             0.0|           8.39|            11.41|0.017303799929101582|          0.0|
|     EV|             0.0|          10.88|            11.42|0.005404058690049007|          0.0|
|     B6|             0.0|          13.96|            15.89|0.011217511592565232|          0.0|
|     DL|             0.0|           2.81|             9.92| 0.04750602378645363|          0.0|
|     OO|             0.0|          10.15|            11.47|0.013052100816226986|          0.0|
|     F9|             0.0|           24.