# Read data file and create data schema

In [1]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

#create session
appName = "data preprocessing in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

flights = spark.read.csv('dataset/flights.csv', 
                         schema=flightSchema, header=True)
flights.show(2)

2024-03-16 10:21:03,301 WARN util.Utils: Your hostname, BDS-2023 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2024-03-16 10:21:03,331 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-03-16 10:21:12,125 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-03-16 10:21:24,910 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows



In [2]:
airportSchema = StructType([
  StructField("airport_id", IntegerType(), False),
  StructField("city", StringType(), False),
  StructField("state", StringType(), False),
  StructField("name", StringType(), False),
])

airports = spark.read.csv('dataset/airports.csv', header=True, 
                          schema=airportSchema)
airports.show(2)

                                                                                

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows



Merge two dataFrame (flight and airports), and show how many flights from each City

In [10]:
flightsByOrigin = flights.join(airports,
                               flights.OriginAirportID == 
                               airports.airport_id).groupBy("city").count()
flightsByOrigin.show(3)

                                                                                

+--------------+-----+
|          city|count|
+--------------+-----+
|       Phoenix|89720|
|         Omaha|13487|
|Raleigh/Durham|28301|
+--------------+-----+
only showing top 3 rows



# Handle duplicated data

Drop duplicated data and calculate how many duplicated data

In [3]:
#count the number of original data rows
n1 = flights.count()
print("number of original data rows: ", n1)
#count the number of data rows after deleting duplicated data
n2 = flights.dropDuplicates().count()
print("number of data rows after deleting duplicated data: ", n2)
n3 = n1 - n2
print("number of duplicated data: ", n3)

                                                                                

number of original data rows:  2702218




number of data rows after deleting duplicated data:  2696657
number of duplicated data:  5561


                                                                                

We can also specify the duplicate criterion by certain column

In [4]:
df = spark.createDataFrame([("Rony",27, 168), 
                            ("Rony",15, 165), 
                            ("Rony",27, 168)], 
                           ["name","age","height"])
df.show()
df.dropDuplicates().show()

                                                                                

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
|Rony| 27|   168|
+----+---+------+



[Stage 16:>                                                         (0 + 1) / 1]

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
+----+---+------+



                                                                                

In [5]:
df.dropDuplicates(['name']).show()

                                                                                

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
+----+---+------+



# Handle missing data

1. Delete row if there is at least one (column) missing data

In [6]:
flightsNoMissingValue = flights.dropDuplicates().dropna(
    how="any", subset=["ArrDelay", "DepDelay"])# use how="all" for all column missing data
numberOfMissingValueAny = n1 - flightsNoMissingValue.count()
print("number of missing value rows: ", numberOfMissingValueAny)



number of missing value rows:  5561


                                                                                

2. Fill the missing data using mean value of each corresponding column data

In [7]:
#take mean value
meanArrDelay = flights.groupBy().avg("ArrDelay").take(1)[0][0]
print("mean ArrDelay: ", meanArrDelay)
meanDepDelay = flights.groupBy().avg("DepDelay").take(1)[0][0]
print("mean DepDelay: ", meanDepDelay)
#drop duplicated data and fill missing data with mean value
flightsCleanData=flights.fillna(
    {'ArrDelay': meanArrDelay, 'DepDelay': meanDepDelay})
#just for experiment
flights.groupBy().avg("ArrDelay").show()

                                                                                

mean ArrDelay:  6.6550108096386005


                                                                                

mean DepDelay:  10.510732294729737


                                                                                

+------------------+
|     avg(ArrDelay)|
+------------------+
|6.6550108096386005|
+------------------+



# Explore the statistic of our data

In [8]:
flightsCleanData.describe('DepDelay','ArrDelay').show()

                                                                                

+-------+------------------+------------------+
|summary|          DepDelay|          ArrDelay|
+-------+------------------+------------------+
|  count|           2702218|           2702218|
|   mean|10.510732294729737|6.6550108096386005|
| stddev| 36.02975608466116| 38.54758423679124|
|    min|               -63|               -94|
|    max|              1863|              1845|
+-------+------------------+------------------+



We can also calculate the correlation between two variables to know whether the varible is related each other or not

In [9]:
correlation = flightsCleanData.corr('DepDelay', 'ArrDelay')
print("correlation between departure delay and arrival delay: ", 
      correlation)

                                                                                

correlation between departure delay and arrival delay:  0.9392560468773492
