# Importing Spark SQL and creating a session

In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
# importing required modules

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# create SparkSession
spark = SparkSession.builder.appName("Data Preprocessing in Apache Spark") \
.config("spark.some.config.option", "some-value").getOrCreate()

# Loading flights and airports data

In [3]:
# Create flights db schema
flightSchema = StructType([
StructField("DayofMonth", IntegerType(), False),
StructField("DayOfWeek", IntegerType(), False),
StructField("Carrier", StringType(), False),
StructField("OriginAirportID", IntegerType(), False),
StructField("DestAirportID", IntegerType(), False),
StructField("DepDelay", IntegerType(), False),
StructField("ArrDelay", IntegerType(), False)])

# Read data from file into DataFrame
flightDataFrame = spark.read.csv('raw-flight-data.csv', schema=flightSchema, header=True)
flightDataFrame.show(3)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows



In [4]:
# Create airports db schema
airportSchema = StructType([
StructField("Airport_ID", IntegerType(), False),
StructField("City", StringType(), False),
StructField("State", StringType(), False),
StructField("Name", StringType(), False),
])

# Reading data from file into DataFrame
airportDataFrame = spark.read.csv('airports.csv', schema=airportSchema, header=True)
airportDataFrame.show(3)

+----------+-----------+-----+--------------------+
|Airport_ID|       City|State|                Name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
+----------+-----------+-----+--------------------+
only showing top 3 rows



# Handling duplicated data

In [5]:
# Count the number of flights
n1 = flightDataFrame.count()
print("Number of rows in Flight DataFrame: ", n1)

# Count unique flights
n2 = flightDataFrame.dropDuplicates().count()
print("Number of rows after removing duplicates: ", n2)

# Count the number of duplicated flights
n3 = n1 - n2
print("Number of duplicate rows in Flight DataFrame: ", n3)

Number of rows in Flight DataFrame:  2719418
Number of rows after removing duplicates:  2696983
Number of duplicate rows in Flight DataFrame:  22435


In [6]:
# Drop duplicated flights
UniqueflightDataFrame = flightDataFrame.dropDuplicates()
UniqueflightDataFrame.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          12889|        13487|       7|      16|
|        18|        4|     DL|          12892|        13487|       5|      -4|
|        18|        4|     DL|          14635|        11193|      -7|     -12|
|        18|        4|     DL|          10397|        11298|      -3|      44|
|        18|        4|     DL|          10397|        12451|       1|      -7|
|        19|        5|     DL|          13487|        11433|      -3|     -21|
|        19|        5|     DL|          14771|        13487|     340|     325|
|        19|        5|     DL|          10397|        11298|      20|      20|
|        19|        5|     DL|          12953|        13487|       4|     -28|
|        19|        5|     DL|          14869|      

# Handling missing data

In [7]:
# Remove the flight if the day of the month or the day of the week is missing
flightsNoMissingDays = UniqueflightDataFrame.dropDuplicates().dropna(how="any", subset=["DayofMonth", "DayofWeek"]) 
numberOfFlightsWithoutDays = n1 - flightsNoMissingDays.count()
print("Number of flights to be removed: ", numberOfFlightsWithoutDays)

Number of flights to be removed:  22435


In [8]:
# Count the number of flights with missing departure or arrival delays
flightsNoMissingDelay = flightsNoMissingDays.dropDuplicates().dropna(how="any", subset=["DepDelay", "ArrDelay"]) 
numberOfMissingDelay = n1 - flightsNoMissingDelay.count()
print("Number of flights to be corrected: ", numberOfMissingDelay)

Number of flights to be corrected:  46233


In [9]:
# Filling missing data with the average value in the respective column
meanArrDelay = flightsNoMissingDays.groupBy().avg("ArrDelay").take(1)[0][0]
print("Average value of ArrDelay column: ", meanArrDelay)
meanDepDelay = flightsNoMissingDays.groupBy().avg("DepDelay").take(1)[0][0]
print("Average value of DepDelay column: ", meanDepDelay)

# Removing duplicate data and filling missing data with the average value in the respective column
flightsCleanData = flightsNoMissingDays.fillna({'ArrDelay': meanArrDelay, 'DepDelay': meanDepDelay})

Average value of ArrDelay column:  6.7272897311633875
Average value of DepDelay column:  10.618575625454712


# Displaying dataset statistics

In [10]:
flightsCleanData.describe('DepDelay','ArrDelay').show()

+-------+------------------+------------------+
|summary|          DepDelay|          ArrDelay|
+-------+------------------+------------------+
|  count|           2696983|           2696983|
|   mean|10.613481805409972|  6.72087217457433|
| stddev| 36.04900147972999|38.578791794541594|
|    min|               -63|               -94|
|    max|              1863|              1845|
+-------+------------------+------------------+



In [11]:
# view the correlation value of the DepDelay and ArrDelay columns
correlation = flightsCleanData.corr('DepDelay', 'ArrDelay')
print("Correlation value of DepDelay and ArrDelay columns: ", correlation)

Correlation value of DepDelay and ArrDelay columns:  0.9394056759019337


# Display flight count per city

In [12]:
flightByOrigin = flightsCleanData.join(airportDataFrame,flightsCleanData.OriginAirportID == airportDataFrame.Airport_ID) \
                                .groupBy("City").count() \
                                .withColumnRenamed("count", "Total Flights") \
                                .sort(desc("Total Flights"))

flightByOrigin.show()

+-----------------+-------------+
|             City|Total Flights|
+-----------------+-------------+
|          Chicago|       176144|
|          Atlanta|       148462|
|      Los Angeles|       117446|
|         New York|       117392|
|Dallas/Fort Worth|       103947|
|          Houston|       102187|
|           Denver|        97208|
|          Phoenix|        89686|
|    San Francisco|        83818|
|       Washington|        80300|
|        Las Vegas|        77841|
|        Charlotte|        76383|
|           Boston|        66567|
|           Newark|        63995|
|          Detroit|        62423|
|          Seattle|        60149|
|      Minneapolis|        60106|
|          Orlando|        58972|
|        Baltimore|        51711|
|     Philadelphia|        47386|
+-----------------+-------------+
only showing top 20 rows

