In [1]:
from pyspark.sql.types import * 

In [3]:
appName = "Data Preprocessing"
spark = SparkSession.builder.appName(appName).config("spark.some.config.option","some-value").getOrCreate()

In [4]:
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

flights = spark.read.csv('dataset/raw-flight-data.csv', 
                         schema=flightSchema, header=True)
flights.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 5 rows



In [5]:
airportSchema = StructType([
  StructField("airport_id", IntegerType(), False),
  StructField("city", StringType(), False),
  StructField("state", StringType(), False),
  StructField("name", StringType(), False),
])

airports = spark.read.csv('dataset/airports.csv', header=True, 
                          schema=airportSchema)
airports.show(2)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows



In [7]:
flightsByOrigin = flights.join(airports,
                               flights.OriginAirportID == 
                               airports.airport_id).groupBy("city").count()
flightsByOrigin.show(5)

+--------------+-----+
|          city|count|
+--------------+-----+
|       Phoenix|90281|
|         Omaha|13537|
|Raleigh/Durham|28436|
|     Anchorage| 7777|
|        Dallas|19503|
+--------------+-----+
only showing top 5 rows



In [8]:
#count the number of original data rows
n1 = flights.count()
print("number of original data rows: ", n1)
#count the number of data rows after deleting duplicated data
n2 = flights.dropDuplicates().count()
print("number of data rows after deleting duplicated data: ", n2)
n3 = n1 - n2
print("number of duplicated data: ", n3)

number of original data rows:  2719418
number of data rows after deleting duplicated data:  2696983
number of duplicated data:  22435


In [9]:
df = spark.createDataFrame([("Jan",27, 168), 
                            ("Jan",15, 165), 
                            ("Jan",27, 168)], 
                           ["name","age","height"])
df.show()
df.dropDuplicates().show()

+----+---+------+
|name|age|height|
+----+---+------+
| Jan| 27|   168|
| Jan| 15|   165|
| Jan| 27|   168|
+----+---+------+

+----+---+------+
|name|age|height|
+----+---+------+
| Jan| 27|   168|
| Jan| 15|   165|
+----+---+------+



In [10]:

df.dropDuplicates(['name']).show()

+----+---+------+
|name|age|height|
+----+---+------+
| Jan| 27|   168|
+----+---+------+



In [11]:
flightsNoMissingValue = flights.dropDuplicates().dropna(
    how="any", subset=["ArrDelay", "DepDelay"])# use how="all" for all column missing data
numberOfMissingValueAny = n1 - flightsNoMissingValue.count()
print("number of missing value rows: ", numberOfMissingValueAny)

number of missing value rows:  46233


In [12]:
#take mean value
meanArrDelay = flights.groupBy().avg("ArrDelay").take(1)[0][0]
print("mean ArrDelay: ", meanArrDelay)
meanDepDelay = flights.groupBy().avg("DepDelay").take(1)[0][0]
print("mean DepDelay: ", meanDepDelay)
#drop duplicated data and fill missing data with mean value
flightsCleanData=flights.fillna(
    {'ArrDelay': meanArrDelay, 'DepDelay': meanDepDelay})
#just for experiment
flights.groupBy().avg("ArrDelay").show()

mean ArrDelay:  6.63768791455498
mean DepDelay:  10.53686662649788
+----------------+
|   avg(ArrDelay)|
+----------------+
|6.63768791455498|
+----------------+



In [13]:
flightsCleanData.describe('DepDelay','ArrDelay').show()

+-------+------------------+-----------------+
|summary|          DepDelay|         ArrDelay|
+-------+------------------+-----------------+
|  count|           2719418|          2719418|
|   mean|10.531448640848888|6.630879842672218|
| stddev| 35.91695039008106|38.44200618946955|
|    min|               -63|              -94|
|    max|              1863|             1845|
+-------+------------------+-----------------+



In [14]:

correlation = flightsCleanData.corr('DepDelay', 'ArrDelay')
print("correlation between departure delay and arrival delay: ", 
      correlation)

correlation between departure delay and arrival delay:  0.9393538215572607
