* The Airline Analysis project covers a range of DataFrame operations, such as reading, filtering, aggregation, sorting, joining, window functions and writing.
* The project is based on two datasets: airline flight details and airport codes. The flight dataset is very large (about 1.2 billion rows, per the count in Problem 1), which makes it a good use case for a Spark application.
* The project demonstrates these DataFrame operations as solutions to a series of problem scenarios, whose outputs could feed further analysis or future predictions.

#### Creating Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
    builder.\
    config('spark.ui.port', '0').\
    appName('Airlines Analysis').\
    master('yarn').\
    getOrCreate()

#### Reading Airlines dataset into a DF

In [3]:
airlines_data_path = '/user/monahadoop/airlines_all/airlines0821'

In [4]:
airport_codes_path = '/public/airlines_all/airport-codes'

In [5]:
airlinesSchema = spark.read.\
                format('csv').\
                option('header', True).\
                option('inferSchema', True).\
                load(airlines_data_path + '/part-00000*').\
                schema

In [11]:
type(airlinesSchema)

pyspark.sql.types.StructType

In [6]:
airlinesDF = spark.read.\
                format('csv').\
                schema(airlinesSchema).\
                option('header', True).\
                load(airlines_data_path)

In [6]:
airlinesDF.rdd.getNumPartitions() # a full scan runs one task per partition, so the Spark UI shows this many tasks

999

In [6]:
airlinesDF.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Ca

In [8]:
airlinesDF.show(5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2001|    8|    

In [7]:
airportCodesDF = spark.read.\
                    format('csv').\
                    option('inferSchema', True).\
                    option('sep', '\t').\
                    option('header', True).\
                    load(airport_codes_path)

In [None]:
%%sh
hdfs dfs -ls /public/airlines_all/airport-codes

In [None]:
%%sh
hdfs dfs -cat /public/airlines_all/airport-codes/airport-codes-na.txt | more

In [7]:
airportCodesDF.show(10)

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+
only showing top 10 rows



#### DataFrame Transformations - Filtering, Aggregations, Sorting

#### Problem 1 
Get the total number of flights, the number of flights delayed on departure and the number of flights delayed on arrival.
* Output should contain 3 columns - FlightCount, DepDelayedCount, ArrDelayedCount

#### Solution1

In [11]:
from pyspark.sql.functions import col, lit, count, sum, expr
airlinesOP = airlinesDF. \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       )

In [11]:
#airlinesOP.rdd.getNumPartitions()

1

In [12]:
#df = airlinesOP.repartition(16) # same effect; didn't improve anything

In [13]:
#df.rdd.getNumPartitions()

16

In [12]:
airlinesOP.show()

+-----------+---------------+---------------+
|FlightCount|DepDelayedCount|ArrDelayedCount|
+-----------+---------------+---------------+
| 1235347780|      522956034|      604003720|
+-----------+---------------+---------------+
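The Problem 1 aggregation counts every row with `count(lit(1))` and counts a flag by summing a `CASE WHEN ... THEN 1 ELSE 0 END` expression. The plain-Python model below mirrors that logic on toy rows (not taken from the dataset); the helper name `case_when_yes` is hypothetical. A missing flag falls into the `ELSE 0` branch, just as `NULL = 'YES'` does in SQL.

```python
# Toy rows, not taken from the dataset.
rows = [
    {"IsDepDelayed": "YES", "IsArrDelayed": "YES"},
    {"IsDepDelayed": "YES", "IsArrDelayed": "NO"},
    {"IsDepDelayed": "NO", "IsArrDelayed": "NO"},
    {"IsDepDelayed": "NO", "IsArrDelayed": None},  # missing value
]


def case_when_yes(value):
    """Mirror of CASE WHEN value = 'YES' THEN 1 ELSE 0 END (None -> 0)."""
    return 1 if value == "YES" else 0


result = {
    # count(lit(1)) counts every row, whatever its column values
    "FlightCount": len(rows),
    # sum(CASE WHEN ...) adds 1 only for rows whose flag is exactly 'YES'
    "DepDelayedCount": sum(case_when_yes(r["IsDepDelayed"]) for r in rows),
    "ArrDelayedCount": sum(case_when_yes(r["IsArrDelayed"]) for r in rows),
}
print(result)
```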



#### Problem 2
Get the number of flights delayed on departure and the number delayed on arrival for each day, along with the total number of flights that departed that day.
* Output should contain 4 columns - FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount
* FlightDate should be in YYYY-MM-dd format.
* Data should be sorted in ascending order by FlightDate

#### Solution2



In [13]:
airlinesDF.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [5]:
from pyspark.sql.functions import concat_ws, col, lpad, sum, expr, lit, count

df = airlinesDF.groupBy(concat_ws('-',col('Year'), lpad(col('Month'),2,'0'), lpad(col('DayofMonth'),2,'0')).alias('FlightDate')).\
    agg(count(lit(1)).alias('FlightCount'),
        sum(expr('CASE WHEN IsDepDelayed = "YES" THEN 1 ELSE 0 END')).alias('DepDelayedCount'),
        sum(expr('CASE WHEN IsArrDelayed = "YES" THEN 1 ELSE 0 END')).alias('ArrDelayedCount')
       ).\
    orderBy('FlightDate')

#df.printSchema()

df.show()

root
 |-- FlightDate: string (nullable = false)
 |-- FlightCount: long (nullable = false)
 |-- DepDelayedCount: long (nullable = true)
 |-- ArrDelayedCount: long (nullable = true)

+----------+-----------+---------------+---------------+
|FlightDate|FlightCount|DepDelayedCount|ArrDelayedCount|
+----------+-----------+---------------+---------------+
|2001-04-22|     167310|          84170|          90450|
|2001-11-16|     144780|          72330|          70920|
|1994-12-17|     134760|          84230|          73770|
|1987-11-11|     147780|          69790|          93380|
|1999-01-16|     129010|          51130|          51380|
|2000-11-13|     161850|          72380|          83530|
|2003-11-06|     185580|          66490|          94400|
|1994-03-01|     140920|          75260|          86500|
|2007-04-29|     199979|          67010|          72340|
|1999-11-18|     158410|          54000|          64080|
|2008-10-26|     176910|          66470|          67550|
|2008-11-19|     1825
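Problem 2 sorts on FlightDate as a string, which matches chronological order only because Month and DayofMonth are zero-padded with `lpad`. A small plain-Python illustration with made-up dates (the helper names `unpadded` and `padded` are hypothetical):

```python
# Made-up (Year, Month, DayofMonth) tuples, not taken from the dataset.
parts = [(2008, 10, 1), (2008, 2, 1), (2008, 1, 9), (2008, 1, 10)]


def unpadded(y, m, d):
    # like concat_ws('-', Year, Month, DayofMonth) without lpad
    return f"{y}-{m}-{d}"


def padded(y, m, d):
    # like concat_ws('-', Year, lpad(Month, 2, '0'), lpad(DayofMonth, 2, '0'))
    return f"{y}-{m:02d}-{d:02d}"


unpadded_sorted = sorted(unpadded(*p) for p in parts)
padded_sorted = sorted(padded(*p) for p in parts)

print(unpadded_sorted)  # string order puts day 10 before day 9 and October before February
print(padded_sorted)    # fixed-width parts: string order equals date order
```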

#### Problem 3
Get all the flights that departed late but did not arrive late (IsArrDelayed is NO).
* Output should contain - FlightCRSDepTime, UniqueCarrier, FlightNum, Origin, Dest, DepDelay, ArrDelay
* FlightCRSDepTime needs to be computed from Year, Month, DayOfMonth and CRSDepTime
* FlightCRSDepTime should be displayed in YYYY-MM-dd HH:mm format.
* Output should be sorted by FlightCRSDepTime and then by the difference between DepDelay and ArrDelay
* Also get the count of such flights.

#### Solution3

In [None]:
airlinesDF.printSchema()

In [7]:
airlinesDF.select('CRSDepTime', 'UniqueCarrier', 'FlightNum', 'Origin', 'Dest', 'DepDelay', 'ArrDelay').show(10)

+----------+-------------+---------+------+----+--------+--------+
|CRSDepTime|UniqueCarrier|FlightNum|Origin|Dest|DepDelay|ArrDelay|
+----------+-------------+---------+------+----+--------+--------+
|      1047|           AA|     1056|   MCI| ORD|       1|     -12|
|      1047|           AA|     1056|   MCI| ORD|      -4|     -23|
|      1047|           AA|     1056|   MCI| ORD|      -4|     -19|
|      1047|           AA|     1056|   MCI| ORD|      -2|     -23|
|      1047|           AA|     1056|   MCI| ORD|       0|     -14|
|      1047|           AA|     1056|   MCI| ORD|       0|     -19|
|      1047|           AA|     1056|   MCI| ORD|       7|       2|
|      1047|           AA|     1056|   MCI| ORD|       5|     -17|
|      1047|           AA|     1056|   MCI| ORD|      -2|     -17|
|      1047|           AA|     1056|   MCI| ORD|      35|      13|
+----------+-------------+---------+------+----+--------+--------+
only showing top 10 rows



In [9]:
#l = [(2008, 1, 23, 700),
#     (2008, 1, 10, 1855),
#    ]
#df = spark.createDataFrame(l, "Year INT, Month INT, DayOfMonth INT, DepTime INT")
#df.show()

+----+-----+----------+-------+
|Year|Month|DayOfMonth|DepTime|
+----+-----+----------+-------+
|2008|    1|        23|    700|
|2008|    1|        10|   1855|
+----+-----+----------+-------+



In [9]:
from pyspark.sql.functions import lit, col, concat, lpad

flightsFiltered = airlinesDF. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    select(concat("Year", lit("-"), 
                  lpad("Month", 2, "0"), lit("-"), 
                  lpad("DayOfMonth", 2, "0"), lit(" "),
                  lpad("CRSDepTime", 4, "0")
                 ).alias("FlightCRSDepTime"),
           "UniqueCarrier", "FlightNum", "Origin", 
           "Dest", "DepDelay", "ArrDelay"
          ). \
    orderBy("FlightCRSDepTime", col("DepDelay") - col("ArrDelay"))

# Assign first, then display: chaining .show() onto the assignment would set flightsFiltered to None
flightsFiltered.show()

+----------------+-------------+---------+------+----+--------+--------+
|FlightCRSDepTime|UniqueCarrier|FlightNum|Origin|Dest|DepDelay|ArrDelay|
+----------------+-------------+---------+------+----+--------+--------+
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0015|           TW|      545|   DFW| SAT|       5|       0|
| 1987-10-01 0030|           HP|      170|   LAS| O

In [22]:
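from pyspark.sql.functions import concat_ws, col, lpad, date_format, substring

# Pad CRSDepTime to 4 digits (e.g. 15 -> '0015') before splitting it into HH and mm.
# Use 'yyyy' (calendar year): 'YYYY' is the week-based year, which can differ near
# year boundaries and may be rejected by Spark 3.x.
# With default (non-ANSI) settings, strings that are not a valid timestamp become NULL and are filtered out.
airlinesDF.filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    select(date_format(concat_ws(' ',
                                 concat_ws('-', col('Year'),
                                           lpad(col('Month'), 2, '0'),
                                           lpad(col('DayOfMonth'), 2, '0')),
                                 concat_ws(':',
                                           substring(lpad(col('CRSDepTime'), 4, '0'), 1, 2),
                                           substring(lpad(col('CRSDepTime'), 4, '0'), 3, 2))
                                 ), 'yyyy-MM-dd HH:mm').alias('FlightCRSDepTime'),
           'UniqueCarrier', 'FlightNum', 'Origin', 'Dest', 'DepDelay', 'ArrDelay'). \
    where("FlightCRSDepTime IS NOT NULL"). \
    orderBy('FlightCRSDepTime'). \
    show()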

+----------------+-------------+---------+------+----+--------+--------+
|FlightCRSDepTime|UniqueCarrier|FlightNum|Origin|Dest|DepDelay|ArrDelay|
+----------------+-------------+---------+------+----+--------+--------+
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:30|           HP|      170|   LAS| O

In [None]:
#from pyspark.sql.functions import col
#airlinesDF.\
#    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
#    select('CRSDepTime','UniqueCarrier', 'FlightNum', 'Origin', 'Dest', 'DepDelay', 'ArrDelay').\
#    orderBy('CRSDepTime', (col('DepDelay')-col('ArrDelay'))).\
#    show()

In [27]:
from pyspark.sql.functions import concat_ws, col, lpad, date_format, substring

# Pad CRSDepTime to 4 digits before splitting it into HH and mm, and use 'yyyy'
# (calendar year) rather than 'YYYY' (week-based year).
# Then sort by FlightCRSDepTime and by the gap between departure and arrival delay.
airlinesDF. \
    filter((col('IsArrDelayed') == 'NO') & (col('IsDepDelayed') == 'YES')). \
    select(date_format(concat_ws(' ',
                                 concat_ws('-', col('Year'),
                                           lpad(col('Month'), 2, '0'),
                                           lpad(col('DayOfMonth'), 2, '0')),
                                 concat_ws(':',
                                           substring(lpad(col('CRSDepTime'), 4, '0'), 1, 2),
                                           substring(lpad(col('CRSDepTime'), 4, '0'), 3, 2))
                                 ), 'yyyy-MM-dd HH:mm').alias('FlightCRSDepTime'),
           'UniqueCarrier', 'FlightNum', 'Origin', 'Dest', 'DepDelay', 'ArrDelay'). \
    where("FlightCRSDepTime IS NOT NULL"). \
    orderBy('FlightCRSDepTime', col('DepDelay') - col('ArrDelay')). \
    show()

+----------------+-------------+---------+------+----+--------+--------+
|FlightCRSDepTime|UniqueCarrier|FlightNum|Origin|Dest|DepDelay|ArrDelay|
+----------------+-------------+---------+------+----+--------+--------+
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:15|           TW|      545|   DFW| SAT|       5|       0|
|1987-10-01 00:30|           HP|      170|   LAS| O
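A plain-Python model of the HHMM-to-HH:mm step used in the solutions above. CRSDepTime is an integer such as 15 (00:15) or 1855 (18:55), so it has to be left-padded to four digits before it is split; taking the last two characters of the unpadded value leaves a single digit for minutes below 10. A value such as 2400, if present, is not a valid hour, which is presumably why the solutions drop NULL FlightCRSDepTime values after `date_format`. The function names are hypothetical and only mirror the Spark expressions.

```python
from datetime import datetime


def hhmm_to_clock(crs_dep_time):
    """Mirror of substring(lpad(t, 4, '0'), 1, 2) + ':' + substring(lpad(t, 4, '0'), 3, 2)."""
    padded = str(crs_dep_time).rjust(4, "0")  # lpad(t, 4, '0'): 15 -> '0015'
    return padded[:2] + ":" + padded[2:4]


def minutes_without_padding(crs_dep_time):
    """Mirror of substring(t, -2, 2) applied to the unpadded value."""
    return str(crs_dep_time)[-2:]


def is_valid_clock(clock):
    """True if the HH:mm string is a real time of day (00:00 to 23:59)."""
    try:
        datetime.strptime(clock, "%H:%M")
        return True
    except ValueError:
        return False


for t in (15, 5, 1855, 2400):
    clock = hhmm_to_clock(t)
    print(t, clock, is_valid_clock(clock))
```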

#### Joining Dataframes

* The following problems need only January 2008 data.
* The cell below filters it out of airlinesDF.

In [10]:
from pyspark.sql.functions import col

# Year and Month are integer columns, so compare them with integer literals
Jan_2008_Data = airlinesDF.filter((col('Year') == 2008) & (col('Month') == 1))
#Jan_2008_Data.show()

#### Problem 4
* Get the number of flights that departed from each airport in the USA in January 2008

#### Solution 4

In [10]:
from pyspark.sql.functions import col, count

USA_airport_departed_count = airportCodesDF.filter((col('Country') == 'USA') & ((col('State') != 'Hawaii') | (col('IATA') != 'Big'))). \
                       join(Jan_2008_Data, airportCodesDF.IATA == Jan_2008_Data.Origin). \
                       groupBy(Jan_2008_Data.Origin). \
                       agg(count(Jan_2008_Data.Origin).alias('Flight_Count'))

In [11]:
USA_airport_departed_count.show()

+------+-------------+
|Origin|count(Origin)|
+------+-------------+
|   BGM|          620|
|   MSY|        34529|
|   GEG|        13730|
|   SNA|        42730|
|   BUR|        27970|
|   GRB|         6750|
|   GTF|         2170|
|   IDA|         3000|
|   GRR|        12730|
|   EUG|         5520|
|   PVD|        19830|
|   GSO|        10830|
|   MYR|         3600|
|   OAK|        59320|
|   COD|          930|
|   MSN|        11020|
|   BTM|          620|
|   FAR|         4020|
|   FSM|         2400|
|   MQT|          850|
+------+-------------+
only showing top 20 rows



#### Problem 5
* Get the number of flights that departed from each state in the USA in January 2008

#### Solution 5

In [21]:
from pyspark.sql.functions import col, count, lit

USA_state_departed_count = airportCodesDF.filter((col('Country') == 'USA') & ((col('State') != 'Hawaii') | (col('IATA') != 'Big'))). \
                            join(Jan_2008_Data, airportCodesDF.IATA == Jan_2008_Data.Origin). \
                            groupBy(airportCodesDF.State). \
                            agg(count(lit(1)).alias('Flight_Count'))

In [22]:
USA_state_departed_count.show()

+-----+------------+
|State|Flight_Count|
+-----+------------+
|   AZ|      207679|
|   SC|       35250|
|   LA|       58839|
|   MN|      123570|
|   NJ|      124979|
|   OR|       62210|
|   VA|       40930|
| null|      140900|
|   RI|       19830|
|   KY|       27770|
|   WY|        6860|
|   NH|       16219|
|   MI|      178240|
|   NV|      177629|
|   WI|       53560|
|   ID|       24970|
|   CA|      728529|
|   CT|       27290|
|   NE|       25470|
|   MT|       17340|
+-----+------------+
only showing top 20 rows



#### Problem 6
* Get the list of airports in the USA from which no flights departed in January 2008

#### Solution 6

In [19]:
from pyspark.sql.functions import col

airports_noflight_departed = airportCodesDF.filter((col('Country') == 'USA') & ((col('State') != 'Hawaii') | (col('IATA') != 'Big'))). \
                                join(Jan_2008_Data, airportCodesDF.IATA == Jan_2008_Data.Origin, 'LEFT'). \
                                filter("Origin IS NULL"). \
                                select('City', 'State', 'Country')

In [20]:
airports_noflight_departed.show()

+-------------------+-----+-------+
|               City|State|Country|
+-------------------+-----+-------+
|            Lebanon|   NH|    USA|
|         Dillingham|   AK|    USA|
|International Falls|   MN|    USA|
|         Wolf Point|   MT|    USA|
|              Havre|   MT|    USA|
|         Fort Dodge|   IA|    USA|
|              Homer|   AK|    USA|
|         Greenbrier|   WV|    USA|
|             Joplin|   MO|    USA|
|          Watertown|   SD|    USA|
|            Kearney|   NE|    USA|
|           Rockland|   ME|    USA|
|           Escanaba|   MI|    USA|
|            Kingman|   AZ|    USA|
|           Columbus|   NE|    USA|
|       Saranac Lake|   NY|    USA|
|             Laurel|   MS|    USA|
|   Sault Ste. Marie|   MI|    USA|
|  Lanai City, Lanai|   HI|    USA|
|        Silver City|   NM|    USA|
+-------------------+-----+-------+
only showing top 20 rows
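The LEFT join followed by `Origin IS NULL` is the classic anti-join pattern: it keeps the left-side rows that found no match. PySpark can also express this directly with `how='left_anti'` in `DataFrame.join`, which returns only the left-side columns. The plain-Python model below (toy rows, not from the dataset; function names are hypothetical) shows that the two approaches select the same rows.

```python
# Toy rows, not taken from the dataset: airport-codes as (IATA, City),
# flights as Origin values.
airports = [("ABE", "Allentown"), ("ABR", "Aberdeen"), ("ALS", "Alamosa")]
flights = ["ABE", "ABE", "ALS"]


def left_join_is_null(airports, flights):
    """LEFT JOIN airports to flights on IATA == Origin, keep rows with Origin NULL."""
    joined = []
    for iata, city in airports:
        matches = [origin for origin in flights if origin == iata]
        if matches:
            # one joined row per matching flight
            joined.extend((iata, city, origin) for origin in matches)
        else:
            # an unmatched left row is kept once, with Origin = NULL (None)
            joined.append((iata, city, None))
    return [(iata, city) for iata, city, origin in joined if origin is None]


def anti_join(airports, flights):
    """What how='left_anti' returns: left rows with no match, left columns only."""
    origins = set(flights)
    return [(iata, city) for iata, city in airports if iata not in origins]


print(left_join_is_null(airports, flights))
print(anti_join(airports, flights))
```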



#### Problem 7
* Check whether any origins in the January 2008 airlines data have no record in airport-codes

#### Solution 7

In [15]:
no_airport_codes = airportCodesDF. \
                    join(Jan_2008_Data, airportCodesDF.IATA == Jan_2008_Data.Origin, 'RIGHT'). \
                    filter("IATA IS NULL"). \
                    select('Origin').distinct()

In [16]:
no_airport_codes.show()

+------+
|Origin|
+------+
|   PSE|
|   PSG|
|   SCC|
|   ADK|
|   CEC|
|   CDC|
|   SJU|
|   ITO|
|   SLE|
|   STX|
|   HDN|
|   BQN|
|   KOA|
|   OTZ|
|   PMD|
|   STT|
+------+



#### Problem 8
* Get the total number of January 2008 flights departing from airports that have no entry in airport-codes

#### Solution 8

In [13]:
from pyspark.sql.functions import count, lit

noAirportCodes_FlightCount = airportCodesDF. \
                                join(Jan_2008_Data, airportCodesDF.IATA == Jan_2008_Data.Origin, 'RIGHT'). \
                                filter("IATA IS NULL"). \
                                agg(count(lit(1)).alias('Flight_Count'))

In [14]:
noAirportCodes_FlightCount.show()

+------------+
|Flight_Count|
+------------+
|       55850|
+------------+



#### Problem 9
* Get the number of January 2008 flights per origin airport, for airports that have no entry in airport-codes

#### Solution 9

In [17]:
from pyspark.sql.functions import count, lit

flights_per_airport_count = Jan_2008_Data. \
                                join(airportCodesDF, airportCodesDF.IATA == Jan_2008_Data.Origin, 'LEFT'). \
                                filter("IATA IS NULL"). \
                                groupBy('Origin'). \
                                agg(count(lit(1)).alias('Flight_Count'))
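Problems 7, 8 and 9 all start from the same set of rows, flights whose Origin has no matching IATA code in airport-codes, and differ only in how those rows are summarised: distinct origins, a total count, and a count per origin. A plain-Python sketch with made-up codes (XAA and XBB are placeholders, not real data):

```python
from collections import Counter

# Toy data, not taken from the dataset.
airport_iata = {"ABE", "ALS", "ORD"}                          # IATA codes in airport-codes
flight_origins = ["ABE", "XBB", "XBB", "XAA", "ORD", "XBB"]   # Origin of each flight

# join on IATA == Origin, keep rows where IATA IS NULL
unmatched = [o for o in flight_origins if o not in airport_iata]

distinct_origins = sorted(set(unmatched))  # Problem 7: select('Origin').distinct()
total = len(unmatched)                     # Problem 8: agg(count(lit(1)))
per_origin = Counter(unmatched)            # Problem 9: groupBy('Origin').agg(count(lit(1)))

print(distinct_origins, total, dict(per_origin))
```

The per-origin counts always add up to the Problem 8 total, which makes a quick consistency check between the two outputs.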

In [1]:
flights_per_airport_count.show()


#### Windowing Functions

* The following problems need only January 2008 data.
* The cell below filters it out of airlinesDF.

In [9]:
from pyspark.sql.functions import col

# Year and Month are integer columns, so compare them with integer literals
Jan_2008_Data = airlinesDF.filter((col('Year') == 2008) & (col('Month') == 1))
#Jan_2008_Data.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2008|    1|    

#### Problem 10

* Get the total, minimum, maximum and average departure delay for each day and each airport in January 2008.

#### Solution 10

In [29]:
from pyspark.sql.functions import col, lit, lpad, concat_ws, date_format, substring

In [23]:
from pyspark.sql.functions import min, max, sum, avg

In [24]:
from pyspark.sql.window import Window

In [33]:
spec = Window. \
    partitionBy('FlightCRSDepDate', 'Origin')
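The window spec partitions rows by (FlightCRSDepDate, Origin) and, when an aggregate is applied `.over(spec)`, attaches that partition's result to every row. `groupBy(...).agg(...)` computes the same aggregates but collapses each partition to a single row. That is why the Problem 10 output repeats the same Min/Max/Avg/Total values on every flight for an airport and day. The toy plain-Python model below (rows not taken from the dataset) contrasts the two result shapes.

```python
from collections import defaultdict

# Toy delayed-departure rows: (FlightCRSDepDate, Origin, DepDelay)
rows = [
    ("2008-01-01", "ABE", 1),
    ("2008-01-01", "ABE", 9),
    ("2008-01-01", "ORD", 20),
    ("2008-01-02", "ABE", 4),
]

# Collect DepDelay values per partition key, like partitionBy('FlightCRSDepDate', 'Origin')
partitions = defaultdict(list)
for date, origin, delay in rows:
    partitions[(date, origin)].append(delay)

# groupBy + agg: one summary row per key, as (Min, Max, Avg, Total)
grouped = {
    key: (min(d), max(d), sum(d) / len(d), sum(d))
    for key, d in partitions.items()
}

# Window functions: every input row is kept, with its partition's aggregates appended
windowed = [(date, origin, delay) + grouped[(date, origin)] for date, origin, delay in rows]

print(len(grouped), "grouped rows vs", len(windowed), "windowed rows")
```

If only one row per day and airport is needed, a `groupBy('FlightCRSDepDate', 'Origin')` aggregation would be the more direct choice; the window form is useful when each flight's own details must stay alongside the partition statistics.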

In [34]:
# Use 'yyyy' (calendar year) rather than 'YYYY' (week-based year), and pad CRSDepTime
# to 4 digits before splitting it so that e.g. 5 becomes '00:05' rather than '00:5'.
dep_delay_data = Jan_2008_Data. \
                    filter((col('IsDepDelayed') == 'YES') & (col('Cancelled') == 0)). \
                    select(date_format(concat_ws('-', col('Year'),
                                 lpad(col('Month'), 2 ,'0'),
                                 lpad(col('DayOfMonth'), 2, '0')), 'yyyy-MM-dd').alias('FlightCRSDepDate'),
                          'Origin', 
                          'UniqueCarrier',
                          'FlightNum', 
                           concat_ws(':', substring(lpad(col('CRSDepTime'), 4, '0'), 1, 2), 
                                          substring(lpad(col('CRSDepTime'), 4, '0'), 3, 2)).alias('FlightCRSDepTime'),
                           'IsDepDelayed',
                           'DepDelay'    
                          ). \
                    withColumn('DepDelay', col('DepDelay').cast('int')). \
                    withColumn('Min_Dep_Delay', min('DepDelay').over(spec)). \
                    withColumn('Max_Dep_Delay', max('DepDelay').over(spec)). \
                    withColumn('Avg_Dep_Delay', avg('DepDelay').over(spec)). \
                    withColumn('Total_Dep_Delay', sum('DepDelay').over(spec)). \
                    orderBy('FlightCRSDepDate', 'Origin', 'DepDelay')

In [35]:
dep_delay_data.show()

+----------------+------+-------------+---------+----------------+------------+--------+-------------+-------------+-------------+---------------+
|FlightCRSDepDate|Origin|UniqueCarrier|FlightNum|FlightCRSDepTime|IsDepDelayed|DepDelay|Min_Dep_Delay|Max_Dep_Delay|Avg_Dep_Delay|Total_Dep_Delay|
+----------------+------+-------------+---------+----------------+------------+--------+-------------+-------------+-------------+---------------+
|      2008-01-01|   ABE|           OO|     5873|           07:20|         YES|       1|            1|          175|       60.875|           4870|
|      2008-01-01|   ABE|           OO|     5873|           07:20|         YES|       1|            1|          175|       60.875|           4870|
|      2008-01-01|   ABE|           OO|     5873|           07:20|         YES|       1|            1|          175|       60.875|           4870|
|      2008-01-01|   ABE|           OO|     5873|           07:20|         YES|       1|            1|          175|  