Read data File and Create Data Schema

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark=SparkSession.builder.appName("Data Processing").getOrCreate()

In [2]:
df=spark.read.csv("C:/Users/User/Desktop/SparkFolder/Data/raw-flight-data.csv",inferSchema=True,header=True)

In [3]:
df.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [4]:
df.printSchema()

root
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)



Redefine Schema for Flights

In [5]:
from pyspark.sql.types import *
flightschema=StructType([

                            StructField("DayofMonth",IntegerType(), False),
                            StructField("DayOfWeek",IntegerType(), False),
                            StructField("Carrier",StringType(), False),
                            StructField("OriginAirportID",IntegerType(), False),
                            StructField("DestAirportID",IntegerType(), False),
                            StructField("DepDelay",IntegerType(), False),
                            StructField("ArrDelay",IntegerType(), False)
])

In [6]:
df=spark.read.csv("C:/Users/User/Desktop/SparkFolder/Data/raw-flight-data.csv",schema=flightschema,header=True)

In [7]:
df.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

Load the Airport Data

In [8]:
df1=spark.read.csv("C:/Users/User/Desktop/SparkFolder/Data/airports.csv",inferSchema=True,header=True)

In [9]:
df1.show(5)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
+----------+-----------+-----+--------------------+
only showing top 5 rows



In [10]:
df1.printSchema()

root
 |-- airport_id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)



Redefine Schema for Airports

In [11]:
airportschema=StructType([

                            StructField("airport_id",IntegerType(), False),
                            StructField("city",StringType(), False),
                            StructField("state",StringType(), False),
                            StructField("name",StringType(), False)
                            
])

In [12]:
df1=spark.read.csv("C:/Users/User/Desktop/SparkFolder/Data/airports.csv",schema=airportschema,header=True)

In [13]:
df1.printSchema()

root
 |-- airport_id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)



In [14]:
df1.show(5)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
+----------+-----------+-----+--------------------+
only showing top 5 rows



Merge the Two Data to show the flights from each city

In [15]:
flightbyOrigin=df1.join(df,df["OriginAirportID"]==df1["airport_id"]).groupBy("city").count()

In [16]:
flightbyOrigin.show()

+-----------------+------+
|             city| count|
+-----------------+------+
|          Phoenix| 90281|
|            Omaha| 13537|
|   Raleigh/Durham| 28436|
|        Anchorage|  7777|
|           Dallas| 19503|
|          Oakland| 25503|
|      San Antonio| 23090|
|     Philadelphia| 47659|
|       Louisville| 10953|
|Dallas/Fort Worth|105024|
|      Los Angeles|118684|
|       Sacramento| 25193|
|     Indianapolis| 18099|
|        Cleveland| 25261|
|        San Diego| 45783|
|    San Francisco| 84675|
|        Nashville| 34927|
|    Oklahoma City| 13967|
|          Detroit| 62879|
|         Portland| 30640|
+-----------------+------+
only showing top 20 rows



Handling Duplicated Data
* Drop duplicated and calculate duplicated Data

In [17]:
#Total number of rows
n1=df.count()
print("The number of rows is :",n1)
#Drop Duplicates
n2=df.dropDuplicates().count()
print("The number of duplicates dropped :",n2)
n3=n1-n2
print("The number of duplicated data :", n3)

The number of rows is : 2719418
The number of duplicates dropped : 2696983
The number of duplicated data : 22435


Create DataFrame from Tuples

In [18]:
from pyspark.sql.functions import *
df2=spark.createDataFrame([
                        ("Sol", 54,58),
                        ("Sol", 57,55),
                        ("Sol", 54,58)],
                        ["Name","age","height"])

In [19]:
# df2=df2.withColumn("ID",monotonically_increasing_id())

In [20]:
df2.show()
df2.dropDuplicates().show()

+----+---+------+
|Name|age|height|
+----+---+------+
| Sol| 54|    58|
| Sol| 57|    55|
| Sol| 54|    58|
+----+---+------+

+----+---+------+
|Name|age|height|
+----+---+------+
| Sol| 54|    58|
| Sol| 57|    55|
+----+---+------+



In [21]:
df2.dropDuplicates(["Name"]).show()

+----+---+------+
|Name|age|height|
+----+---+------+
| Sol| 54|    58|
+----+---+------+



Handling Missing Data
* Delete row iff there is at least one ( column) missing Data

In [24]:
dfmissingvalue=df.dropDuplicates().dropna(how="any",subset=["ArrDelay","DepDelay"])
numberofmissingvalue=n1-dfmissingvalue.count()
print("the number of missing value is", numberofmissingvalue)

the number of missing value is 46233


In [23]:
dfmissingvalue.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|         6|        1|     WN|          10821|        10140|       1|     -22|
|         8|        1|     AA|          11298|        10140|       0|       6|
|        15|        1|     WN|          14747|        10140|      -6|       3|
|        27|        1|     AA|          11298|        10140|     113|     117|
|         7|        2|     OO|          12266|        10140|      -3|     -11|
|        28|        2|     WN|          14107|        10140|      -3|       0|
|        30|        2|     OO|          12266|        10140|      -4|     -11|
|         1|        3|     EV|          12266|        10140|     -11|     -26|
|         3|        3|     OO|          12892|        10140|      -4|      -2|
|        17|        3|     WN|          12892|      

Filling missing value with mean value of the corresponding column

In [26]:
df.groupby().avg("ArrDelay").show()

+----------------+
|   avg(ArrDelay)|
+----------------+
|6.63768791455498|
+----------------+



In [33]:
mean_arrdelay=df.groupby().avg("ArrDelay").collect()[0][0]

In [32]:
#Alternatively
df.groupby().avg("ArrDelay").take(1)[0][0]

6.63768791455498

In [37]:
type(df.groupby().avg("ArrDelay").take(1)[0][0])

float

In [38]:
mean_depdelay=df.groupby().avg("ArrDelay").collect()[0][0]

In [44]:
# # Fill missing values with mean
flightsclean=df.fillna({"ArrDelay":mean_arrdelay,"DepDelay":mean_depdelay})



Explore Stats of the Data

In [41]:
df1.describe().show()

+-------+------------------+--------+-----+--------------------+
|summary|        airport_id|    city|state|                name|
+-------+------------------+--------+-----+--------------------+
|  count|               365|     365|  365|                 365|
|   mean|12761.016438356164|    null| null|                null|
| stddev|1660.7966378387537|    null| null|                null|
|    min|             10135|Aberdeen|   AK|   Aberdeen Regional|
|    max|             16440|    Yuma|   WY|Yuma MCAS/Yuma In...|
+-------+------------------+--------+-----+--------------------+



In [51]:
correlation=flightsclean.select(flightsclean["ArrDelay"].cast("integer"),
                                flightsclean["DepDelay"].cast("integer"))
correlation.show()

+--------+--------+
|ArrDelay|DepDelay|
+--------+--------+
|       1|      -3|
|      -8|       0|
|     -15|      -4|
|      24|      28|
|     -11|      -6|
|     -19|      -1|
|      -1|       0|
|      24|      15|
|      34|      33|
|     322|     323|
|     -13|      -7|
|      41|      22|
|      20|      40|
|      -7|      -2|
|      75|      71|
|      57|      75|
|      10|      -1|
|     -10|      -3|
|      38|      31|
|      25|       8|
+--------+--------+
only showing top 20 rows



In [62]:
print(
    correlation.corr('ArrDelay','DepDelay') * 100, "%")

93.92984568919158 %
