#### Import Libraries

In [None]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import count_distinct,lit, col, udf, regexp_replace

from pyspark.sql.types import IntegerType

import matplotlib.pyplot as plt                                                                              

import os

spark = SparkSession.builder.config("spark.driver.memory", "20g").appName("project").getOrCreate()

#### Merge data files and Save to Parquet file

In [9]:
# create a SparkSession
spark = SparkSession.builder.appName("Merged_csv").getOrCreate()

# define the path to the CSV files
path = r"2004_2008"

# load each CSV file and add a column for the year
dfs = []
for year in range(2004, 2009):
    filename = f"{year}.csv"
    files_path = os.path.join(path, filename)
    df = spark.read.csv(files_path, header=True, inferSchema=True)
    df = df.withColumn("year", lit(year))
    dfs.append(df)

# union all the dataframes together
merged_df = dfs[0]
for df in dfs[1:]:
    merged_df = merged_df.union(df)


In [10]:
merged_df.show(n=30)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2004|    1|        12|        1|    623|       630|    901|       915|           UA|      462

In [11]:
merged_df.tail(5)

[Row(year=2008, Month=4, DayofMonth=17, DayOfWeek=4, DepTime='1025', CRSDepTime=1025, ArrTime='1234', CRSArrTime=1237, UniqueCarrier='DL', FlightNum=1207, TailNum='N393DA', ActualElapsedTime='129', CRSElapsedTime='132', AirTime='108', ArrDelay='-3', DepDelay='0', Origin='BOS', Dest='CVG', Distance=752, TaxiIn='5', TaxiOut='16', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(year=2008, Month=4, DayofMonth=17, DayOfWeek=4, DepTime='1319', CRSDepTime=1320, ArrTime='1527', CRSArrTime=1524, UniqueCarrier='DL', FlightNum=1208, TailNum='N952DL', ActualElapsedTime='128', CRSElapsedTime='124', AirTime='107', ArrDelay='3', DepDelay='-1', Origin='CVG', Dest='BOS', Distance=752, TaxiIn='9', TaxiOut='12', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(year=2008, Month=4, DayofMonth=17, DayOfWee

In [12]:
merged_df.count()

25173525

In [13]:
# write the merged dataframe to parquet format
parquet_path = '2004_2008_data.parquet'

merged_df.write.parquet(parquet_path)

# stop the SparkSession
spark.stop()


#### To load the parquet file into a dataframe use this code 

In [3]:
# load the Parquet file as a DataFrame
df = spark.read.parquet("2004_2008_data.parquet")

# show the first 20 rows of the DataFrame
df.show(5)



23/04/11 15:15:57 WARN Utils: Your hostname, mbzuai-metaverse-06 resolves to a loopback address: 127.0.1.1; using 10.127.94.181 instead (on interface eno2)
23/04/11 15:15:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/11 15:15:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

23/04/11 15:16:04 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------

In [12]:
print(f'The combined dataset has a total of {df.count()} rows and {len(df.columns)} columns.')

The combined dataset has a total of 25173525 rows and 29 columns.


In [6]:
# stop the SparkSession
spark.stop()

#### Data Description

In [33]:
# load the Parquet file as a DataFrame
df = spark.read.parquet("2004_2008_data.parquet")

In [21]:
print('Data Schema')
df.printSchema()

Data Schema
root
 |-- year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = tr

In [22]:
print('Data Statistics')
df.describe().show()

Data Statistics




+-------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------+------------------+--------+------------------+------------------+-----------------+-----------------+------------------+--------+--------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+------------------+-----------------+------------------+--------------------+------------------+
|summary|              year|            Month|       DayofMonth|         DayOfWeek|           DepTime|        CRSDepTime|           ArrTime|        CRSArrTime|UniqueCarrier|         FlightNum| TailNum| ActualElapsedTime|    CRSElapsedTime|          AirTime|         ArrDelay|          DepDelay|  Origin|    Dest|         Distance|           TaxiIn|           TaxiOut|           Cancelled|CancellationCode|            Diverted|      CarrierDelay|     WeatherDelay|          NASD

                                                                                

From the statistics summary, some columns seems to include null values, so we'll be examining those columns further.

##### Data Pre-Processing

In [24]:
columns_with_null = [column for column in df.columns if df.where(df[column].isNull()).count() > 0]
for column in columns_with_null:
    print(f'The column {column} has {df.where(df[column].isNull()).count()} null values, \
          which is {round(df.where(df[column].isNull()).count() / df.count() * 100, 2)}% of the total.')

The column TailNum has 42601 null values,           which is 0.17% of the total.
The column CancellationCode has 24667125 null values,           which is 97.99% of the total.


In [25]:
columns_with_na = [column for column in df.columns if df.where( df[column] == 'NA').count() > 0]
for column in columns_with_na:
    print(f'Column {column} has {df.where(df[column] == "NA").count()} missing values,which is {round(df.where(df[column] == "NA").count() / df.count() * 100, 2)}% of the total.')


Column DepTime has 506397 missing values,which is 2.01% of the total.
Column ArrTime has 561058 missing values,which is 2.23% of the total.
Column ActualElapsedTime has 561058 missing values,which is 2.23% of the total.
Column CRSElapsedTime has 1405 missing values,which is 0.01% of the total.
Column AirTime has 561058 missing values,which is 2.23% of the total.
Column ArrDelay has 561058 missing values,which is 2.23% of the total.
Column DepDelay has 506397 missing values,which is 2.01% of the total.
Column TaxiIn has 70096 missing values,which is 0.28% of the total.
Column TaxiOut has 64442 missing values,which is 0.26% of the total.
Column CarrierDelay has 1804634 missing values,which is 7.17% of the total.
Column WeatherDelay has 1804634 missing values,which is 7.17% of the total.
Column NASDelay has 1804634 missing values,which is 7.17% of the total.
Column SecurityDelay has 1804634 missing values,which is 7.17% of the total.
Column LateAircraftDelay has 1804634 missing values,whi

Looking at columns that have equal percentage of missing values: these columns have to do with cancellation or delay

In [7]:
df.select([count_distinct(column).alias(column) for column in columns_with_na]).show()



+-------+-------+-----------------+--------------+-------+--------+--------+------+-------+------------+------------+--------+-------------+-----------------+
|DepTime|ArrTime|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|TaxiIn|TaxiOut|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+-------+-------+-----------------+--------------+-------+--------+--------+------+-------+------------+------------+--------+-------------+-----------------+
|   1604|   1683|              880|           634|   1549|    1470|    1592|   955|    449|        1258|         794|     724|          245|              700|
+-------+-------+-----------------+--------------+-------+--------+--------+------+-------+------------+------------+--------+-------------+-----------------+



                                                                                

In [8]:
df.select('CancellationCode').distinct().show()

+----------------+
|CancellationCode|
+----------------+
|            null|
|               B|
|               D|
|               C|
|               A|
+----------------+



In [40]:
df.select('ArrDelay').distinct().sort(col('DepDelay').desc()).show(5)



+--------+
|ArrDelay|
+--------+
|      NA|
|     978|
|     997|
|    1022|
|    1002|
+--------+
only showing top 5 rows



                                                                                

Looking at the unique values in CancellationCode, it seems null was used for flights that weren't cancelled.

We'll look at how the other missing values relates with whether the fight was cancelled after replacing null with 'NA' in CancellationCode and TailNum.

In [47]:
df = df.fillna('UNK', subset=['TailNum', 'CancellationCode'])

In [48]:
df.select('CancellationCode').distinct().show()

+----------------+
|CancellationCode|
+----------------+
|               B|
|               D|
|               C|
|             UNK|
|               A|
+----------------+



#### Exploring null values in canceled flight

ArrTime,AirTime,ArrDelay,ActualElapsedTime will be replaced with 0 since they don't count for canceled flights.


Also for canceled flights, it makes sense that some factors that cause delays are not recorded and could be 0 since the flight never took place and since it's same percentage, we replace with 0.


In [11]:
cancelled_flights = df.where(df['Cancelled'] == 1).select(columns_with_na)

In [12]:

columns_with_na = [column for column in cancelled_flights.columns if cancelled_flights.where( cancelled_flights[column] == 'NA').count() > 0]
print('Cancelled Flights missing Values: \n \n') 
for column in columns_with_na:
    print(f'Column {column} has {cancelled_flights.where(cancelled_flights[column] == "NA").count()} NA values,which is {round(cancelled_flights.where(cancelled_flights[column] == "NA").count() / cancelled_flights.count() * 100, 2)}% of the total.')


Cancelled Flights missing Values: 
 

Column DepTime has 506397 NA values,which is 100.0% of the total.
Column ArrTime has 506397 NA values,which is 100.0% of the total.
Column ActualElapsedTime has 506397 NA values,which is 100.0% of the total.
Column CRSElapsedTime has 390 NA values,which is 0.08% of the total.
Column AirTime has 506397 NA values,which is 100.0% of the total.
Column ArrDelay has 506397 NA values,which is 100.0% of the total.
Column DepDelay has 506397 NA values,which is 100.0% of the total.
Column TaxiIn has 64442 NA values,which is 12.73% of the total.
Column TaxiOut has 64442 NA values,which is 12.73% of the total.
Column CarrierDelay has 64442 NA values,which is 12.73% of the total.
Column WeatherDelay has 64442 NA values,which is 12.73% of the total.
Column NASDelay has 64442 NA values,which is 12.73% of the total.
Column SecurityDelay has 64442 NA values,which is 12.73% of the total.
Column LateAircraftDelay has 64442 NA values,which is 12.73% of the total.


In [46]:
df = df.withColumns({
    'ArrDelay': regexp_replace('ArrDelay', 'NA', '0').cast(IntegerType()), \
    'DepDelay': regexp_replace('DepDelay', 'NA', '0').cast(IntegerType()), \
    'ActualElapsedTime': regexp_replace('ActualElapsedTime', 'NA', '0').cast(IntegerType()), \
    'AirTime': regexp_replace('AirTime', 'NA', '0').cast(IntegerType()), \
    'TaxiIn': regexp_replace('TaxiIn', 'NA', '0').cast(IntegerType()), \
    'TaxiOut': regexp_replace('TaxiOut', 'NA', '0').cast(IntegerType()), \
    'CarrierDelay': regexp_replace('CarrierDelay', 'NA', '0').cast(IntegerType()), \
    'WeatherDelay': regexp_replace('WeatherDelay', 'NA', '0').cast(IntegerType()), \
    'NASDelay': regexp_replace('NASDelay', 'NA', '0').cast(IntegerType()), \
    'SecurityDelay': regexp_replace('SecurityDelay', 'NA', '0').cast(IntegerType()), \
    'LateAircraftDelay': regexp_replace('LateAircraftDelay', 'NA', '0').cast(IntegerType()), \
    'DepTime': regexp_replace('DepTime', 'NA', '0').cast(IntegerType()), \
    'CRSElapsedTime': regexp_replace('CRSElapsedTime', 'NA', '0').cast(IntegerType()), \
    'ArrTime': regexp_replace('ArrTime', 'NA', '0').cast(IntegerType()), \
})

In [15]:
df.where(df['Cancelled'] == 0).groupBy('CancellationCode').count().show()



+----------------+--------+
|CancellationCode|   count|
+----------------+--------+
|             UNK|24667125|
|               B|       2|
|               A|       1|
+----------------+--------+



                                                                                

It's observed that 3 flights that weren't cancelled have cancellation codes

In [49]:
df.describe().show()



+-------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------+------------------+--------+-----------------+------------------+------------------+-----------------+-----------------+--------+--------+-----------------+------------------+------------------+--------------------+----------------+--------------------+------------------+------------------+------------------+--------------------+------------------+
|summary|              year|            Month|       DayofMonth|         DayOfWeek|           DepTime|        CRSDepTime|           ArrTime|        CRSArrTime|UniqueCarrier|         FlightNum| TailNum|ActualElapsedTime|    CRSElapsedTime|           AirTime|         ArrDelay|         DepDelay|  Origin|    Dest|         Distance|            TaxiIn|           TaxiOut|           Cancelled|CancellationCode|            Diverted|      CarrierDelay|      WeatherDelay|          NA

                                                                                

In [17]:
del cancelled_flights

In [None]:
# write the cleaned dataframe to parquet format
parquet_path = 'cleaned_data.parquet'

df.write.parquet(parquet_path)

spark.stop()

                                                                                

#### Adding columns.

In [2]:
df = spark.read.parquet("cleaned_data.parquet")

##### Add column to indicate if flight was delayed or not

In [50]:
def delayed(row):
    return True if int(row) < 15 else False    

delayedUDF = udf(lambda x:delayed(x))

df = df.withColumn('OnTime', delayedUDF(col('ArrDelay')))

In [4]:
df.show(5)

23/04/12 22:49:51 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------+
|year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|OnTime|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+----

##### Export updated data for subsequent use

In [51]:
# write the cleaned dataframe to parquet format
parquet_path = 'updated_data.parquet'

df.write.parquet(parquet_path)

spark.stop()

                                                                                