In [1]:
import datetime, warnings, scipy 
import pandas as pd
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
warnings.filterwarnings("ignore")
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

pd.set_option("display.max_columns", 50)

In [2]:
# Local Spark session on every core. The driver limits are generous because the
# section 2.2 helpers collect() whole columns of the flights table to the driver.
spark = (
    SparkSession.builder
    .appName('my-cool-app')
    .master('local[*]')
    .config("spark.driver.memory", "16g")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)



## 1. Data description 

### 1.1  Flight.csv
Contains one row per flight: the date, origin and destination airports, scheduled and actual times, and delays.

In [3]:
# Raw flights table; without schema inference every column is read as a string.
flights_df = spark.read.option("header", True).csv("../flight_data/flights.csv")
rows, column = flights_df.count(), len(flights_df.columns)
print('Dataframe dimensions:', (rows, column))
flights_df.printSchema()


Dataframe dimensions: (5819079, 31)
root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DE

In [4]:
# One-row frame: for each column, how many entries are null or NaN.
null_columns = flights_df.select(
    [
        F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
        for c in flights_df.columns
    ]
)


In [5]:
# Percentage of missing entries per column, as (column name, percent) pairs.
# `rows` is the flights row count from the flights load cell; the airlines/airports load
# cells reassign `rows`, so re-run the flights load cell first if executing out of order.
# The comprehension variables stay local, so the global `column` (column count) is not
# overwritten as it was by the former `for column in df:` loop.
df = null_columns.toPandas()
pc_missing = [(name, (n_missing / rows) * 100) for name, n_missing in df.iloc[0].items()]

In [6]:
# Summary table: one column per flights field, with three rows:
# Spark data type, number of missing entries, and percentage of missing entries.
dtype_by_column = pd.Series(dict(flights_df.dtypes), name='column data type')
pct_missing_by_column = pd.Series(dict(pc_missing), name='null values (%)')
# pd.concat replaces DataFrame.append (removed in pandas 2.0). Naming each Series
# up front gives every row its intended label. Series.rename(index={0: ...}) only
# relabels index entries, which is why the percentage row used to show up as `1`.
column_info = pd.concat([
    dtype_by_column.to_frame().T,
    df.rename(index={0: 'null values (nb)'}),
    pct_missing_by_column.to_frame().T,
])
column_info


Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
column data type,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string
null values (nb),0,0,0,0,0,0,14721,0,0,0,86153,86153,89047,89047,6,105071,105071,0,92513,92513,0,92513,105071,0,0,5729195,4755640,4755640,4755640,4755640,4755640
1,0.0,0.0,0.0,0.0,0.0,0.0,0.252978,0.0,0.0,0.0,1.480526,1.480526,1.530259,1.530259,0.000103,1.805629,1.805629,0.0,1.589822,1.589822,0.0,1.589822,1.805629,0.0,0.0,98.455357,81.72496,81.72496,81.72496,81.72496,81.72496


### 1.2 Airline.csv
Contains the airline names with their IATA codes.

In [7]:
# Airline lookup table: IATA_CODE -> AIRLINE (carrier name).
airlines_names = spark.read.option("header", True).csv("../flight_data/airlines.csv")
rows, column = airlines_names.count(), len(airlines_names.columns)
print('Dataframe dimensions:', (rows, column))
airlines_names.printSchema()

Dataframe dimensions: (14, 2)
root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)



In [8]:
# Peek at the first five airlines.
airlines_names.show(n=5)

+---------+--------------------+
|IATA_CODE|             AIRLINE|
+---------+--------------------+
|       UA|United Air Lines ...|
|       AA|American Airlines...|
|       US|     US Airways Inc.|
|       F9|Frontier Airlines...|
|       B6|     JetBlue Airways|
+---------+--------------------+
only showing top 5 rows



### 1.3 Airports.csv
Contains each airport's IATA code and name, plus location data (city, state, country, latitude, longitude).

In [9]:
# Airport lookup table: IATA code, name and location columns.
airports = spark.read.option("header", True).csv("../flight_data/airports.csv")
rows, column = airports.count(), len(airports.columns)
print('Dataframe dimensions:', (rows, column))
airports.printSchema()

Dataframe dimensions: (322, 7)
root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)



In [10]:
# Peek at the first five airports.
airports.show(n=5)

+---------+--------------------+-----------+-----+-------+--------+----------+
|IATA_CODE|             AIRPORT|       CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+---------+--------------------+-----------+-----+-------+--------+----------+
|      ABE|Lehigh Valley Int...|  Allentown|   PA|    USA|40.65236| -75.44040|
|      ABI|Abilene Regional ...|    Abilene|   TX|    USA|32.41132| -99.68190|
|      ABQ|Albuquerque Inter...|Albuquerque|   NM|    USA|35.04022|-106.60919|
|      ABR|Aberdeen Regional...|   Aberdeen|   SD|    USA|45.44906| -98.42183|
|      ABY|Southwest Georgia...|     Albany|   GA|    USA|31.53552| -84.19447|
+---------+--------------------+-----------+-----+-------+--------+----------+
only showing top 5 rows



## 2. Data cleaning and wrangling

#### 2.1 Merging year, month and day into a date

In [11]:
# Join YEAR, MONTH and DAY with "-" and cast the result to a Spark date column.
flights_df = flights_df.withColumn(
    "DATE",
    F.concat_ws("-", "YEAR", "MONTH", "DAY").cast("date"),
)

#### 2.2 Converting HHMM times to hh:mm format

In [12]:
# Row count before the per-row time rewrites below; they should leave it unchanged.
n_flights = flights_df.count()
n_flights

5819079

In [13]:
def merge_column(df,coulmn_name,column_value):
    """Swap column `coulmn_name` of Spark DataFrame `df` for the values in `column_value`.

    Rows are paired by position. Both `df` and a new one-column frame built from
    `column_value` get a 1-based `row_idx` from row_number() ordered by
    monotonically_increasing_id(). The two frames are inner-joined on `row_idx`, and the
    helper column is dropped. The replacement column ends up last in the result.

    NOTE(review): correct pairing assumes `column_value` was collected from `df` in the
    same row order, with one entry per row (the section 2.2 cells build it that way).
    A length mismatch silently drops rows in the inner join. The un-partitioned window
    also moves every row into a single partition.
    """
    def with_row_idx(frame):
        # Each call builds its own window/ID expression for the frame it numbers.
        return frame.withColumn(
            "row_idx", row_number().over(Window.orderBy(monotonically_increasing_id()))
        )

    replacement = with_row_idx(
        spark.createDataFrame([(value,) for value in column_value], [coulmn_name])
    )
    indexed = with_row_idx(df).drop(coulmn_name)
    return indexed.join(replacement, "row_idx").drop("row_idx")
    
    
def format_heure(chaine):
    """Convert an 'HHMM' clock value (e.g. "0530", "530" or 530) to a datetime.time.

    Parameters
    ----------
    chaine : str, int or None
        Clock time written as HHMM digits; leading zeros may be omitted.

    Returns
    -------
    datetime.time for a present value, or "" when `chaine` is None
    (apply_format_heure wraps results in str(), so a missing time becomes "").

    Values of 2400 and above wrap past midnight while keeping their minutes,
    e.g. 2400 -> 00:00 and 2430 -> 00:30.

    Raises
    ------
    ValueError if `chaine` is not an integer, or if the minute part is 60 or more.
    """
    if chaine is None:
        return ""
    # Zero-pad to four digits so the hour/minute slices below are well defined.
    hhmm = "{0:04d}".format(int(chaine) % 2400)
    return datetime.time(int(hhmm[:2]), int(hhmm[2:]))
def combine_date_heure(date,time):
    """Merge a datetime.date and a datetime.time and return the result as a string.

    The string is str() of the combined datetime.datetime, e.g. '2015-01-01 00:05:00'.
    """
    moment = datetime.datetime.combine(date, time)
    return str(moment)
def apply_format_heure(df, field):
    """Collect column `field` of Spark DataFrame `df` and turn each 'HHMM' value into a string.

    Returns a Python list, in collect() order, of str(format_heure(value)). That is
    "HH:MM:SS" for a present value and "" for a null. Every row is pulled to the driver.
    """
    return [str(format_heure(row[field])) for row in df.select(field).collect()]
        
    
def create_flight_time(df, field):
    """Build 'YYYY-MM-DD HH:MM:SS' strings from the DATE column and an 'HHMM' column.

    Collects `DATE` and `field` from Spark DataFrame `df` to the driver. Returns a list of
    strings in collect() order, one per row. Any time of 2400 or later is moved to the
    following day, so "2400" on 2015-01-01 becomes "2015-01-02 00:00:00".

    Raises TypeError if a time value is null (there is no time to combine with the date).
    """
    liste = []
    for date, time in df.select('DATE', field).collect():
        # Numeric check, so every value >= 2400 rolls over, not just the exact string "2400".
        if time is not None and int(time) >= 2400:
            # format_heure only yields the time of day for these values,
            # so the day change is applied here.
            date += datetime.timedelta(days=1)
        liste.append(combine_date_heure(date, format_heure(time)))
    return liste

In [14]:
# Overwrite the HHMM column with full "YYYY-MM-DD HH:MM:SS" strings. The converted list
# (one entry per flight) is passed straight through instead of being kept in a global,
# so the kernel does not keep it in memory after the Spark frame has been rebuilt.
flights_df = merge_column(
    flights_df,
    'SCHEDULED_DEPARTURE',
    create_flight_time(flights_df, 'SCHEDULED_DEPARTURE'),
)

In [15]:
# Convert DEPARTURE_TIME to "HH:MM:SS" strings ("" where missing). The driver-side list
# is passed straight through rather than kept alive in a global afterwards.
flights_df = merge_column(
    flights_df,
    'DEPARTURE_TIME',
    apply_format_heure(flights_df, 'DEPARTURE_TIME'),
)

In [16]:
# Convert SCHEDULED_ARRIVAL to "HH:MM:SS" strings ("" where missing). The driver-side list
# is passed straight through rather than kept alive in a global afterwards.
flights_df = merge_column(
    flights_df,
    'SCHEDULED_ARRIVAL',
    apply_format_heure(flights_df, 'SCHEDULED_ARRIVAL'),
)


In [17]:
# Convert ARRIVAL_TIME to "HH:MM:SS" strings ("" where missing). The driver-side list
# is passed straight through rather than kept alive in a global afterwards.
flights_df = merge_column(
    flights_df,
    'ARRIVAL_TIME',
    apply_format_heure(flights_df, 'ARRIVAL_TIME'),
)

#### 2.3 Removing cancelled or diverted flights

In [18]:
# Keep only rows whose CANCELLED and DIVERTED flags both compare equal to 0.
flights_df = flights_df.filter((F.col("CANCELLED") == 0) & (F.col("DIVERTED") == 0))

#### 2.4 Removing unused columns

In [19]:
# Columns this analysis drops: taxi/wheels timings, date parts, the delay breakdown,
# the cancellation/diversion flags (already filtered on above) and flight/tail identifiers.
# NOTE(review): DEPARTURE_TIME, SCHEDULED_ARRIVAL and ARRIVAL_TIME hold only a time of day,
# so after dropping DATE only SCHEDULED_DEPARTURE still carries the calendar date.
variables_to_remove = (
    'TAXI_OUT', 'TAXI_IN', 'WHEELS_ON', 'WHEELS_OFF',
    'YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'DATE',
    'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY',
    'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY',
    'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON',
    'FLIGHT_NUMBER', 'TAIL_NUMBER',
)
flights_df = flights_df.drop(*variables_to_remove)
flights_df.printSchema()

root
 |-- AIRLINE: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)



In [20]:
# First 100 rows as a pandas frame for a quick visual check.
view_data = (
    flights_df
    .limit(100)
    .toPandas()
)

In [21]:
# Display the 100-row sample (pandas elides the middle rows in the rendered output).
view_data

Unnamed: 0,AIRLINE,ORIGIN_AIRPORT,DESTINATION_AIRPORT,DEPARTURE_DELAY,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,ARRIVAL_DELAY,SCHEDULED_DEPARTURE,DEPARTURE_TIME,SCHEDULED_ARRIVAL,ARRIVAL_TIME
0,AS,ANC,SEA,-11,205,194,169,1448,-22,2015-01-01 00:05:00,23:54:00,04:30:00,04:08:00
1,AA,LAX,PBI,-8,280,279,263,2330,-9,2015-01-01 00:10:00,00:02:00,07:50:00,07:41:00
2,US,SFO,CLT,-2,286,293,266,2296,5,2015-01-01 00:20:00,00:18:00,08:06:00,08:11:00
3,AA,LAX,MIA,-5,285,281,258,2342,-9,2015-01-01 00:20:00,00:15:00,08:05:00,07:56:00
4,AS,SEA,ANC,-1,235,215,199,1448,-21,2015-01-01 00:25:00,00:24:00,03:20:00,02:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,EV,ORD,DTW,15,85,67,41,235,-3,2015-01-01 05:35:00,05:50:00,08:00:00,07:57:00
96,OO,FAT,LAX,-5,75,80,42,209,0,2015-01-01 05:35:00,05:30:00,06:50:00,06:50:00
97,UA,SMF,DEN,56,142,164,129,909,78,2015-01-01 05:38:00,06:34:00,09:00:00,10:18:00
98,OO,AUS,LAX,-3,212,199,173,1242,-16,2015-01-01 05:38:00,05:35:00,07:10:00,06:54:00


## 3. Storing cleaned data to disk

In [23]:
# Spark writes a directory of part-files at this path, despite the .csv suffix.
# mode('overwrite') lets Restart & Run All succeed when the output already exists;
# the default save mode raises an error on an existing path.
flights_df.write.mode('overwrite').options(header='True', delimiter=',').csv('../flight_data/cleaned_flight_data.csv')