### Importing libraries

In [5]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql.types import DateType, TimestampType
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import findspark as fs

### Create a SparkContext that loads settings from system properties 

A SparkContext is used to programmatically create RDDs, accumulators, and broadcast variables on the cluster. The Spark driver program creates the SparkContext and uses it to connect to the cluster manager (for example YARN) and submit Spark jobs. It is the heart of a Spark application.

In [6]:
sc = SparkContext.getOrCreate()

### Configuration of the session

- setMaster(): sets where the Spark application runs, either locally or on a cluster.
- setMaster('local[*]'): runs Spark locally with as many worker threads as there are logical cores on the machine.
- SparkSession: the entry point to Spark for working with RDDs, DataFrames, and Datasets.

In [7]:
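conf = pyspark.SparkConf()\
       .setAppName('SparkApp')\
       .setMaster('local[*]')
# note: conf is defined but not applied, because sc was already created above
# to apply it, the context would have to be created from it instead:
#sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)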

### Read the data


- The statement below creates the DataFrame fd from the CSV file.
- A temporary view (fdTable) is then created from that DataFrame.
- After that we can query the data in SQL with the spark.sql function (spark is our SparkSession variable), which conveniently returns a new DataFrame.

In [8]:
fd = spark.read.format("csv")\
.option("inferSchema", "true")\
.option("header", "true")\
.load("C:/Users/User/OneDrive/Desktop/Toshiba/Studies/UP/2022 S2/MIT805/Assignment/Assignment 2/Cleaned_Airline_Data.csv")
fd.cache() #we cache the flight dataset so subsequent queries will be faster => store in memory
fd.createOrReplaceTempView("fdTable")

### Display the number of partitions created by default

In [9]:
print(fd.rdd.getNumPartitions())

59


Schema inference: we ask Spark to take a best guess at what the schema of our DataFrame should be. We also specify, as an option, that the first row of the file is the header.

To infer the schema, Spark reads the data and attempts to parse the values in each column according to the types available in Spark. For CSV input this costs an extra pass over the file. You also have the option of strictly specifying a schema when you read in data, which is recommended in production scenarios.

The resulting DataFrame has a set of columns with an unspecified number of rows. The number of rows is unspecified because reading data is a transformation and is therefore a lazy operation: at this point Spark has looked at the data only to work out the column types, not to count or materialise the rows.
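As a rough sketch of the explicit-schema option mentioned above: `DataFrameReader.schema()` accepts a DDL-formatted string, so column types can be written out by hand instead of being inferred. The snippet below only builds such a string for a few columns of this dataset, with types taken from the inferred schema. The subset of columns and the names `explicit_columns`, `schema_ddl` and `path_to_csv` are illustrative assumptions, and the read call is left as a comment because it needs the running session.

```python
# Column names and types taken from the inferred schema of this dataset.
# Only a subset is listed for brevity; a real schema must list every column
# in the same order as the file.
explicit_columns = {
    "YEAR": "INT",
    "MONTH": "INT",
    "OP_UNIQUE_CARRIER": "STRING",
    "DEP_DELAY": "DOUBLE",
    "ARR_DELAY": "DOUBLE",
}

# Join the pairs into a DDL-formatted schema string: "YEAR INT, MONTH INT, ..."
schema_ddl = ", ".join(f"{name} {dtype}" for name, dtype in explicit_columns.items())
print(schema_ddl)

# Hypothetical usage with the session created earlier (not executed here):
# fd = spark.read.schema(schema_ddl).option("header", "true").csv(path_to_csv)
```

With an explicit schema Spark skips the inference pass, and a column that silently changes type in a new file shows up as a parsing problem rather than as a different inferred type.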

In [10]:
#view the dataframe schema
fd.printSchema() 

root
 |-- _c0: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- FLIGHTS: 

In [11]:
pwd #find directory where the notebook is stored

'C:\\Users\\User\\MIT805\\Assignment 2'

In [12]:
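# recompute the DELAY flag from ARR_DELAY: 0 = on time or early, 1 = late
# rows with a missing ARR_DELAY match neither condition and get '' from otherwise();
# mixing integer and string branches makes DELAY a string column
fd = fd.withColumn('DELAY',
                   F.when(fd.ARR_DELAY <= 0, 0)
                   .when(fd.ARR_DELAY > 0, 1)
                   .otherwise(''))
fd.limit(5).toPandas()  # convert the first 5 rows to a pandas DataFrame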

Unnamed: 0,_c0,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN,DEST_AIRPORT_ID,DEST,...,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,DIV_REACHED_DEST,DIV_ACTUAL_ELAPSED_TIME,DIV_ARR_DELAY,DELAY,AIRLINE_NAME,ORIG_AIRPORT_NAME,DEST_AIRPORT_NAME
0,0,2018,1,22,1,UA,14771,SFO,10693,BNA,...,0.0,0.0,0.0,NOT_CANCELLED,,,1,United Air Lines Inc.,"San Francisco, CA: San Francisco International","Nashville, TN: Nashville International"
1,1,2018,1,22,1,UA,12892,LAX,14679,SAN,...,0.0,0.0,0.0,NOT_CANCELLED,,,0,United Air Lines Inc.,"Los Angeles, CA: Los Angeles International","San Diego, CA: San Diego International"
2,2,2018,1,22,1,UA,11292,DEN,12266,IAH,...,0.0,0.0,0.0,NOT_CANCELLED,,,0,United Air Lines Inc.,"Denver, CO: Denver International","Houston, TX: George Bush Intercontinental/Houston"
3,3,2018,1,22,1,UA,11618,EWR,14679,SAN,...,0.0,0.0,0.0,NOT_CANCELLED,,,0,United Air Lines Inc.,"Newark, NJ: Newark Liberty International","San Diego, CA: San Diego International"
4,4,2018,1,22,1,UA,11298,DFW,11292,DEN,...,0.0,0.0,0.0,NOT_CANCELLED,,,0,United Air Lines Inc.,"Dallas/Fort Worth, TX: Dallas/Fort Worth International","Denver, CO: Denver International"
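The three-way rule used for the DELAY column can be restated in plain Python to make the branches explicit. This is only an illustrative sketch: the function name `delay_flag` is hypothetical, and it returns `None` for a missing arrival delay rather than the empty string the notebook uses. In PySpark, leaving out `.otherwise(...)` would similarly yield a null for rows that match no condition.

```python
from typing import Optional


def delay_flag(arr_delay: Optional[float]) -> Optional[int]:
    """Return 1 for a late arrival, 0 for on time or early, None if the delay is missing."""
    if arr_delay is None:
        return None
    return 1 if arr_delay > 0 else 0


# One example per branch: late, early, exactly on time, missing
for value in (12.0, -5.0, 0.0, None):
    print(value, "->", delay_flag(value))
```

Keeping the missing case as a null would keep the column numeric, whereas the empty string forces Spark to treat DELAY as a string.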


In [13]:
fd.createOrReplaceTempView("fdTable")

In [14]:
#show the first row, one field per line
fd.show(1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------
 _c0                     | 0                                              
 YEAR                    | 2018                                           
 MONTH                   | 1                                              
 DAY_OF_MONTH            | 22                                             
 DAY_OF_WEEK             | 1                                              
 OP_UNIQUE_CARRIER       | UA                                             
 ORIGIN_AIRPORT_ID       | 14771                                          
 ORIGIN                  | SFO                                            
 DEST_AIRPORT_ID         | 10693                                          
 DEST                    | BNA                                            
 CRS_DEP_TIME            | 1110                                           
 DEP_TIME                | 1120.0                                         
 DEP_DELAY               

In [15]:
#number of columns
len(fd.columns) 

37

In [16]:
#list the tables created
print(spark.catalog.listTables())

[Table(name='fdtable', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


### Overall percentage of delayed flights

In [18]:

spark.sql(
    """
    SELECT DELAY,
           COUNT(*) AS count,
           COUNT(*) / (SELECT COUNT(*) FROM fdTable) AS ratio
          
    FROM fdTable
    GROUP BY DELAY
    """
).show()

+-----+--------+--------------------+
|DELAY|   count|               ratio|
+-----+--------+--------------------+
|    0|18296443|  0.6536250034518467|
|    1| 8879967|  0.3172293358346912|
|     |  815853|0.029145660713462144|
+-----+--------+--------------------+



In [19]:
spark.sql(
    """
    SELECT DELAY,
           COUNT(*) AS count,
           COUNT(*) / (SELECT COUNT(*) FROM fdTable WHERE DELAY <> '') AS ratio
           
    FROM fdTable
    GROUP BY DELAY HAVING DELAY <> ''
    """
).show()

+-----+--------+-------------------+
|DELAY|   count|              ratio|
+-----+--------+-------------------+
|    0|18296443| 0.6732472390576975|
|    1| 8879967|0.32675276094230254|
+-----+--------+-------------------+
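The two ratio columns above differ only in their denominator. As a quick check, the arithmetic can be reproduced from the counts printed in the first table. The variable names are illustrative.

```python
# Counts copied from the GROUP BY DELAY output above
on_time = 18_296_443   # DELAY = 0
delayed = 8_879_967    # DELAY = 1
missing = 815_853      # DELAY = '' (no ARR_DELAY recorded)

# Share of delayed flights among all rows, including those without an arrival delay
share_all = delayed / (on_time + delayed + missing)

# Share of delayed flights among rows that actually have an arrival delay
share_known = delayed / (on_time + delayed)

print(f"delayed share of all flights:            {share_all:.4f}")
print(f"delayed share of flights with a value:   {share_known:.4f}")
```

Excluding the rows without an arrival delay raises the delayed share by roughly one percentage point, so it is worth stating which denominator a reported percentage uses.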



### Summary statistics of some features

In [22]:
fd.describe('ARR_DELAY','DEP_DELAY','AIR_TIME','DISTANCE').show() #summary statistics for delays, air time and distance

+-------+------------------+-----------------+------------------+-----------------+
|summary|         ARR_DELAY|        DEP_DELAY|          AIR_TIME|         DISTANCE|
+-------+------------------+-----------------+------------------+-----------------+
|  count|          27176410|         27255215|          27170032|         27992263|
|   mean|3.4599673393211243|9.204037135645416|110.06370816935365|788.0306251766783|
| stddev| 48.84417998169425|46.82668415236928| 69.31017374810538|582.9018961437572|
|    min|           -1290.0|          -1280.0|           -1244.0|             16.0|
|    max|            3864.0|           3890.0|            1557.0|           5812.0|
+-------+------------------+-----------------+------------------+-----------------+



### Number of departure airports and airlines

In [23]:
spark.sql(
    """
    SELECT COUNT(distinct(ORIGIN_AIRPORT_ID)) AIRPORTS,
           COUNT(distinct(OP_UNIQUE_CARRIER)) AS AIRLINES
    FROM fdTable
    """
).show()

+--------+--------+
|AIRPORTS|AIRLINES|
+--------+--------+
|     388|      28|
+--------+--------+



### Which route has the most flights?

In [25]:
query = "SELECT ORIG_AIRPORT_NAME, DEST_AIRPORT_NAME, COUNT(*) as N FROM fdTable GROUP BY ORIG_AIRPORT_NAME, DEST_AIRPORT_NAME order by COUNT(*) desc"
flight_counts = spark.sql(query)  # keep the DataFrame; show() itself returns None
flight_counts.show(5, truncate=False)

+----------------------------------------------+----------------------------------------------+-----+
|ORIG_AIRPORT_NAME                             |DEST_AIRPORT_NAME                             |N    |
+----------------------------------------------+----------------------------------------------+-----+
|San Francisco, CA: San Francisco International|Los Angeles, CA: Los Angeles International    |55141|
|Los Angeles, CA: Los Angeles International    |San Francisco, CA: San Francisco International|55132|
|Los Angeles, CA: Los Angeles International    |Las Vegas, NV: McCarran International         |43677|
|Las Vegas, NV: McCarran International         |Los Angeles, CA: Los Angeles International    |43583|
|Chicago, IL: Chicago O'Hare International     |New York, NY: LaGuardia                       |43371|
+----------------------------------------------+----------------------------------------------+-----+
only showing top 5 rows



### What departure airports tend to have delays?

In [27]:
query1 = "SELECT ORIG_AIRPORT_NAME, SUM(DELAY) as DELAYS, COUNT(*) As FLIGHTS, SUM(DELAY)/COUNT(*) As DELAY_PCT FROM fdTable GROUP BY ORIG_AIRPORT_NAME order by SUM(DELAY)/COUNT(*) desc"
delays_per_departure_airport = spark.sql(query1)  # keep the DataFrame; show() itself returns None
delays_per_departure_airport.show(5, truncate=False)

+-------------------------------------------------+------+-------+------------------+
|ORIG_AIRPORT_NAME                                |DELAYS|FLIGHTS|DELAY_PCT         |
+-------------------------------------------------+------+-------+------------------+
|Youngstown/Warren, OH: Youngstown-Warren Regional|2.0   |2      |1.0               |
|Cold Bay, AK: Cold Bay Airport                   |165.0 |262    |0.6297709923664122|
|Pago Pago, TT: Pago Pago International           |174.0 |291    |0.5979381443298969|
|Wilmington, DE: New Castle                       |94.0  |165    |0.5696969696969697|
|Ogden, UT: Ogden-Hinckley                        |250.0 |460    |0.5434782608695652|
+-------------------------------------------------+------+-------+------------------+
only showing top 5 rows
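The top of this ranking is an airport with only 2 flights, so a very small sample can dominate the list. One possible refinement, sketched here in plain Python on the five rows shown above, is to require a minimum number of flights before ranking. The cutoff `MIN_FLIGHTS = 100` is a hypothetical value chosen only for illustration.

```python
# (airport, delayed flights, total flights) copied from the top-5 output above
rows = [
    ("Youngstown/Warren, OH: Youngstown-Warren Regional", 2, 2),
    ("Cold Bay, AK: Cold Bay Airport", 165, 262),
    ("Pago Pago, TT: Pago Pago International", 174, 291),
    ("Wilmington, DE: New Castle", 94, 165),
    ("Ogden, UT: Ogden-Hinckley", 250, 460),
]

MIN_FLIGHTS = 100  # hypothetical cutoff, not taken from the notebook

# Keep only airports with enough flights, then rank by delay percentage
ranked = sorted(
    ((name, delays / flights) for name, delays, flights in rows if flights >= MIN_FLIGHTS),
    key=lambda pair: pair[1],
    reverse=True,
)

for name, pct in ranked:
    print(f"{pct:.3f}  {name}")
```

In Spark SQL the same filter could be expressed by adding a `HAVING COUNT(*) >= 100` clause after the `GROUP BY`.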



### What airlines tend to have delays?

In [29]:
query2 = "SELECT AIRLINE_NAME, SUM(DELAY) as DELAYS, COUNT(*) As FLIGHTS, SUM(DELAY)/COUNT(*) As DELAY_PCT FROM fdTable GROUP BY AIRLINE_NAME order by SUM(DELAY)/COUNT(*) desc"
delays_per_airline = spark.sql(query2)  # keep the DataFrame; show() itself returns None
delays_per_airline.show(5, truncate=False)

+-----------------------------------------+--------+-------+-------------------+
|AIRLINE_NAME                             |DELAYS  |FLIGHTS|DELAY_PCT          |
+-----------------------------------------+--------+-------+-------------------+
|Peninsula Airways Inc.                   |1351.0  |2783   |0.4854473589651455 |
|Commutair Aka Champlain Enterprises, Inc.|53847.0 |124937 |0.4309932205831739 |
|Allegiant Air                            |188193.0|476967 |0.3945618879293536 |
|JetBlue Airways                          |414425.0|1083212|0.38258900381458105|
|Trans States Airlines                    |61387.0 |161590 |0.379893557769664  |
+-----------------------------------------+--------+-------+-------------------+
only showing top 5 rows



### What year and month has the highest delay percentage?

In [30]:
query3 = "SELECT YEAR, MONTH, SUM(DELAY) as DELAYS, COUNT(*) As FLIGHTS, SUM(DELAY)/COUNT(*) As DELAY_PCT FROM fdTable GROUP BY YEAR, MONTH order by SUM(DELAY)/COUNT(*) desc"
delays_per_yrmonth = (spark.sql(query3)).toPandas()

In [31]:
!ls -lh

total 90M
-rw-r--r-- 1 User None 46K Oct 25 23:47 MIT 805 Assignment 2-Copy1.ipynb
-rw-r--r-- 1 User None 45M Oct 29 12:37 MIT 805 Assignment 2-Copy2.ipynb
-rw-r--r-- 1 User None 31K Oct 30 19:54 MIT 805 Assignment 2-Final.ipynb
-rw-r--r-- 1 User None 45M Oct 29 10:53 MIT 805 Assignment 2.ipynb
-rw-r--r-- 1 User None  72 Oct 29 10:51 Untitled.ipynb
