Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". You can run all the tests with the validate button. If the validate command takes too long, you can also confirm that you pass all the tests if you can run through the whole notebook without getting validation errors.

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## DataFrame Exercises
In this notebook your job is to implement multiple small methods that process and analyze airtraffic data with DataFrames. DataFrames can be queried with SQL language and through SparkSQL API. Both of them can be used to implement methods in these exercises. The links below may be helpful:

- http://spark.apache.org/docs/latest/sql-programming-guide.html
- https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html

We will use a sample of "2008.csv.bz2" which contains airtraffic data from https://dataverse.harvard.edu/api/access/datafile/1374917?gbrecs=true.

There are already two Spark SQL tables available from the start:

- table "carriers" inlcudes information of airlines
- table "airports" includes information of airports


In [1]:
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import pandas as pd
import numpy as np
%matplotlib inline

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("main")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()\

#names of tables
airTraffic = "airtraffic"
carriers = "carriers"
airports = "airports"

carriersTable = spark.read.csv("carriers.csv", inferSchema="true", header="true")
carriersTable.createOrReplaceTempView(carriers)

airportsTable = spark.read.csv("airports.csv", inferSchema="true", header="true")
airportsTable.createOrReplaceTempView(airports)



22/10/10 17:52:45 WARN Utils: Your hostname, bigdata2022-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/10/10 17:52:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/10 17:52:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Methods and variables that will be used in more than one tests

# Test if arrays that contain Row are equal
def correctRows(testArray, correctArray):
    for i in range(0, len(correctArray)):
        assert testArray[i].asDict() == correctArray[i].asDict(), "the row was expected to be %s but it was %s" % (correctArray[i].asDict(), testArray[i].asDict())

# Path of smaller airtraffic data set
sampleFile = "2008_sample.csv"
testFile = "2008_testsample.csv"
testFile2 = "2008_testsample2.csv"

## Load Data and Register 
`loadDataAndRegister` loads airtraffic data and registers it as a table so that we can use it later for Spark SQL. 

param `path`: path of file that should be loaded and registered.

`return`: DataFrame containing airtraffic information.

The schema of returned DataFrame should be:

Name | Type
------| :-----
Year  | integer (nullable = true)
Month | integer (nullable = true)
DayofMonth | integer (nullable = true)
DayOfWeek | integer (nullable = true)
DepTime | integer (nullable = true)
CRSDepTime | integer (nullable = true)
ArrTime | integer (nullable = true)
CRSArrTime | integer (nullable = true)
UniqueCarrier | string (nullable = true)
FlightNum | integer (nullable = true)
TailNum | string (nullable = true)
ActualElapsedTime | integer (nullable = true)
CRSElapsedTime | integer (nullable = true)
AirTime | integer (nullable = true)
ArrDelay | integer (nullable = true)
DepDelay | integer (nullable = true)
Origin | string (nullable = true)
Dest | string (nullable = true)
Distance | integer (nullable = true)
TaxiIn | integer (nullable = true)
TaxiOut | integer (nullable = true)
Cancelled | integer (nullable = true)
CancellationCode | string (nullable = true)
Diverted | integer (nullable = true)
CarrierDelay | integer (nullable = true)
WeatherDelay | integer (nullable = true)
NASDelay | integer (nullable = true)
SecurityDelay | integer (nullable = true)
LateAircraftDelay | integer (nullable = true)

Hints:
- How to load csv data: https://spark.apache.org/docs/latest/api/python//reference/api/pyspark.sql.DataFrameReader.csv.html
- If you just load data using `inferSchema="true"`, some of the fields which shoud be Integers are casted to Strings because null values are represented as "NA" strings in the data. E.g. 2008,7,2,3,733,735,858,852,DL,1551,N957DL,85,77,42,6,-2,CAE, ATL,191,15,28,0,,0,NA,NA,NA,NA,NA. Therefore you need to replace all "NA" strings with null. Option "nullValue" is helpful.
- Please use the variable `airTraffic` as table name.

In [3]:
def loadDataAndRegister(path):
    df = spark.read.csv(path, inferSchema="true", header=True, nullValue="NA")
    df = df.replace("NA", None)
    df.createOrReplaceTempView(airTraffic)
    
    return(df)

In [4]:
# example print

data = loadDataAndRegister(testFile)
data.show(5)
data.schema

22/10/10 17:52:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         4|        7|    632|       615|    756|       735|          02Q|     4794

StructType(List(StructField(Year,IntegerType,true),StructField(Month,IntegerType,true),StructField(DayofMonth,IntegerType,true),StructField(DayOfWeek,IntegerType,true),StructField(DepTime,IntegerType,true),StructField(CRSDepTime,IntegerType,true),StructField(ArrTime,IntegerType,true),StructField(CRSArrTime,IntegerType,true),StructField(UniqueCarrier,StringType,true),StructField(FlightNum,IntegerType,true),StructField(TailNum,StringType,true),StructField(ActualElapsedTime,IntegerType,true),StructField(CRSElapsedTime,IntegerType,true),StructField(AirTime,IntegerType,true),StructField(ArrDelay,IntegerType,true),StructField(DepDelay,IntegerType,true),StructField(Origin,StringType,true),StructField(Dest,StringType,true),StructField(Distance,IntegerType,true),StructField(TaxiIn,IntegerType,true),StructField(TaxiOut,IntegerType,true),StructField(Cancelled,IntegerType,true),StructField(CancellationCode,StringType,true),StructField(Diverted,IntegerType,true),StructField(CarrierDelay,IntegerType

In [5]:
'''loadDataAndRegister tests'''

df = loadDataAndRegister(testFile)

# Table "airtraffic" should exists
assert spark.sql("SHOW TABLES Like 'airtraffic'").count() == 1, "there was expected to be a table called 'airtraffic'"

# Columns should have correct values
third = df.collect()[2]
correctRow = Row(Year=2008, Month=5, DayofMonth=6, DayOfWeek=2, DepTime=611,
                             CRSDepTime=615, ArrTime=729, CRSArrTime=735, UniqueCarrier='EV',
                             FlightNum=4794, TailNum='N916EV', ActualElapsedTime=78,
                             CRSElapsedTime=80, AirTime=58, ArrDelay=-6, DepDelay=-4,
                             Origin='ROA', Dest='ATL', Distance=357, TaxiIn=9, TaxiOut=11,
                             Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=None,
                             WeatherDelay=None, NASDelay=None, SecurityDelay=None,
                             LateAircraftDelay=None).asDict()

assert third.asDict() == correctRow, "the row was expected to be %s but it was %s" % (correctRow, third.asDict())


## Flight Count
`flightCount` gets the number of flights for each airplane. The "TailNum" column is unique for each airplane so it can be used.

param `df`: Airtraffic DataFrame created using `loadDataAndRegister`.

`return`: DataFrame containing number of flights per TailNum. DataFrame should include columns "TailNum" and "count" (the number of flights for an airplane) . Airplanes whose TailNum is null should not be included in the returned DataFrame. **The returned DataFrame should be sorted by count in descending order.** 

Hint: use dataframe methods instead of sql

Example output:

TailNum|count
-------:|-----
N693BR| 1526|
N646BR| 1505|
N476HA| 1490|
N485HA| 1441|
N486HA| 1439|


In [6]:
def flightCount(df):
    df = df.groupBy("TailNum").count()
    df = df.na.drop()
    df = df.orderBy(("count"), ascending=False)
    return(df)

In [7]:
# example print
data = loadDataAndRegister(sampleFile)
flightCount(data).show(5)

+-------+-----+
|TailNum|count|
+-------+-----+
| N515MJ|    2|
| N317AE|    2|
| N479HA|    1|
| N909FJ|    1|
| N464AA|    1|
+-------+-----+
only showing top 5 rows



In [8]:
'''flightCount tests'''

data = loadDataAndRegister(testFile2)
        
correct = [Row(TailNum='N881AS', count=5),
           Row(TailNum='N886AS', count=3),
           Row(TailNum='N824AS', count=2)]

#print(flightCount(data).take(3))

correctRows(flightCount(data).take(3), correct)


### You can either use Spark SQL or normal DataFrame (given as parameter) transformations to implement the methods below.

## Cancelled Due to Security
`cancelledDueToSecurity` finds which flights were cancelled due to security reasons. 

`return`: DataFrame containing flights which were cancelled due to security reasons (CancellationCode = "D"). Columns "FlightNum" and "Dest" should be included.

Example output:

FlightNum|Dest|
----:|-------
4285| DHN|
4790| ATL|
3631| LEX|
3632| DFW|

In [9]:
def cancelledDueToSecurity(df):
    df = df.where(df.CancellationCode == "D").select("FlightNum", "Dest")
    return(df)

In [10]:
# example print

data = loadDataAndRegister(sampleFile)
cancelledDueToSecurity(data).show(5)

+---------+----+
|FlightNum|Dest|
+---------+----+
|     1642| LAS|
|      585| MSP|
+---------+----+



In [11]:
'''cancelledDueToSecurity tests'''

data = loadDataAndRegister(testFile)
correct = [Row(FlightNum=4794, Dest='JFK'), Row(FlightNum=4794, Dest='ATL')]
correctRows(cancelledDueToSecurity(data).collect(), correct)


## Longest Weather Delay
`longestWeatherDelay` finds the longest weather delay between January and March (1.1-31.3).

`return`: DataFrame containing the longest weather delay.

Example output:

|_c0|
|-------:|
|1148|

In [12]:
def longestWeatherDelay(df):
    df = df.select("WeatherDelay").where(df.Month.between(1,3)) # select weatherdelay values to df, where month is between January and March
    df = df.na.drop() # drop na values
    df = df.orderBy(("WeatherDelay"), ascending=False).limit(1) # oreder descending, limit to top item
    df = df.withColumn("WeatherDelay", df.WeatherDelay.cast('int')) # cast to integer
    
    return(df)
    raise NotImplementedError()

In [13]:
# example print

data = loadDataAndRegister(sampleFile)
longestWeatherDelay(data).show()

+------------+
|WeatherDelay|
+------------+
|          40|
+------------+



In [14]:
'''longestWeatherDelay tests'''

data = loadDataAndRegister(testFile)
test = longestWeatherDelay(data).first()[0]

assert test == 7, "the longest weather delay was expected to be 7 but it was %s" % test



## Did Not Fly
`didNotFly` finds which airlines didn't have flights. 

`return`: DataFrame containig descriptions (names) of airlines that didn't have flights.

Example output:

|         Description|
|--------------------|
|Aero Transcolombiana|
|Transmeridian Air...|
|Luftransport-Unte...|
|Euro Atlantic Air...|
|    Pearson Aircraft|


Hints:
- Schema "UniqueCarrier" (the code of airline) of table "airtraffic" can be used when implementing this method.
- Table "carriers" containing airlines' names is already loaded to "carriersTable" object at the beginning.
- Cancelled flights are not counted as "did not fly".

In [15]:
def didNotFly(df):
    df = spark.sql(
    "SELECT carriers.Description "+
    "FROM carriers "+
    "ANTI JOIN airtraffic "+
    "ON airtraffic.UniqueCarrier = carriers.Code"
    )
    return df

In [16]:
# example print

data = loadDataAndRegister(sampleFile)
didNotFly(data).show(5)

+--------------------+
|         Description|
+--------------------+
|       Titan Airways|
|  Tradewind Aviation|
|     Comlux Aviation|
|Master Top Linhas...|
| Flair Airlines Ltd.|
+--------------------+
only showing top 5 rows



In [17]:
'''didNotFly tests'''

data = loadDataAndRegister(testFile)
test = didNotFly(data).count()

assert test == 1489, "the amount of airlines that didn't fly was expected to be 1489 but it was %s" % test


## Flights from Vegas to JFK
`flightsFromVegasToJFK` finds airlines that fly from Vegas to JFK.

`return`: DataFrame containing columns "Descriptions" (names of airlines) and "Num" (number of flights). **The DataFrame should be sorted by Num in descending order.**

Example output:

|         Description|Num|
|--------------------|---|
|     JetBlue Airways|566|
|Delta Air Lines Inc.|441|
|US Airways Inc. (...|344|
|American Airlines...|121|

Hints:
- Vegas iasa code: LAS. JFK iasa code: JFK

In [18]:
def flightsFromVegasToJFK(df):
    df = df.join(carriersTable, df.UniqueCarrier==carriersTable.Code, "outer")
    df = df.select("Description").where((df.Origin == "LAS") & (df.Dest == "JFK"))
    df = df.groupBy("Description").count()
    df = df.withColumnRenamed("count", "Num")
    
    
    df = df.orderBy(("count"), ascending=False)
    df = df.orderBy(("Description"), ascending=False)
    return df

In [19]:
# example print

data = loadDataAndRegister(sampleFile)
flightsFromVegasToJFK(data).show(5)

+--------------------+---+
|         Description|Num|
+--------------------+---+
|Pinnacle Airlines...|  1|
|Northwest Airline...|  1|
+--------------------+---+



In [20]:
'''flightsFromVegasToJFK tests'''

data = loadDataAndRegister(testFile)
correct = [Row(Description='Titan Airways', Num=1),
           Row(Description='Atlantic Southeast Airlines', Num=1)]
correctRows(flightsFromVegasToJFK(data).collect(), correct)


## Time Spent in Taxiing
`timeSpentTaxiing` finds how much time airplanes spent in moving from gate to the runway and vise versa at an airport on average. 

`return`: DataFrame contains the average time spent in taxiing per airport. The DataFrame should contain columns "airport" (iata codes of airports) and "taxi" (the average time spent in taxiing). **The DataFrame should be sorted by "taxi" in ascending order.**

Example output:

|airport|             taxi|
|-------|-----------------|
|    DLG|              4.0|
|    BRW|5.051010191310567|
|    OME|6.034800675790983|
|    AKN|             6.75|
|    SCC|6.842553191489362|

Hints:
- Columns "TaxiIn" and "TaxiOut" tells time spend in taxiing. "TaxiIn" means time spent in taxiing in departure ("Origin") airport and "TaxiOut" spent in taxiing in arrival ("Dest") airport. The wanted average is (average taxiing at origin for a given destination + average taxiing at destination for a given matching origin) / 2.
- Try the "inner join".

In [40]:
def timeSpentTaxiing(df):
    df = df.select("Origin", "Dest", "TaxiIn", "TaxiOut")
    t1 = df.groupBy("Origin").agg({"TaxiIn": "avg"})
    t2 = df.groupBy("Dest").agg({"TaxiOut": "avg"})
    df = t1.join(t2,t1.Origin==t2.Dest)
    df = df.withColumnRenamed("avg(TaxiIn)", "taxin")
    df = df.withColumnRenamed("avg(TaxiOut)", "taxio")
    
    df = df.withColumn("taxi", (df.taxin + df.taxio)/2)
    df = df.drop("taxin","dest","taxio")    
    df = df.select("Origin", "taxi").withColumnRenamed("Origin", "airport")
    return df

In [41]:
# example print

data = loadDataAndRegister(sampleFile)
timeSpentTaxiing(data).show(5)

+-------+----+
|airport|taxi|
+-------+----+
|    GEG|7.75|
|    OAK|13.0|
|    DCA|10.0|
|    IAH|11.0|
|    HNL| 6.0|
+-------+----+
only showing top 5 rows



In [42]:
'''timeSpentTaxiing tests'''

data = loadDataAndRegister(testFile)
correct = [Row(airport='LAS', taxi=11.0), Row(airport='JFK', taxi=13.25)]
correctRows(timeSpentTaxiing(data).collect(), correct)


## Distance Median
`distanceMedian` finds the median travel distance.

`return`: DataFrame containing the median travel distance.

Example output:

|_ c0|
|---|
|583.0|

Hints:
- Schema "Distance" of table "airtraffic" contains distance information.
- You should use exact percentile functions like Spark SQL build-in [percentile function](https://spark.apache.org/docs/latest/api/sql/index.html#percentile).  
- What does percentile mean? Please check: https://en.wikipedia.org/wiki/Percentile#Third_variant and http://onlinestatbook.com/2/introduction/percentiles.html

In [24]:
def distanceMedian(df):
    df = spark.sql("""
    select percentile(Distance,0.5) from airtraffic
    """)
    return df
    raise NotImplementedError()

In [25]:
# example print

data = loadDataAndRegister(sampleFile)
distanceMedian(data).show()

+----------------------------+
|percentile(Distance, 0.5, 1)|
+----------------------------+
|                       507.5|
+----------------------------+



In [26]:
'''distanceMedian tests'''

data = loadDataAndRegister(testFile)
test = distanceMedian(data).first()[0]
assert test == 357.0, "the distance median was expected to be 357.0 but it was %s" % test


## Score95
`score95` finds the percentile, below which 95% of the delay (CarrierDelay) observations may be found. 

return: DataFrame containing the 95th percentile of carrier delay. 

Example output:

|_ c0|
|----|
|77.0|

Hints:
- You should use exact percentile functions like Spark SQL build-in [percentile function](https://spark.apache.org/docs/latest/api/sql/index.html#percentile). 

In [27]:
def score95(df):
    df = spark.sql("""
    select percentile(Carrierdelay,0.95) from airtraffic
    """)
    return df

In [28]:
# example print

data = loadDataAndRegister(sampleFile)
score95(data).show()

+---------------------------------+
|percentile(Carrierdelay, 0.95, 1)|
+---------------------------------+
|                33.85000000000002|
+---------------------------------+



In [29]:
'''score95 tests'''

data = loadDataAndRegister(testFile)
test = score95(data).first()[0]
assert test == 17.0, "the score95 was expected to be 17.0 but it was %s" % test


## Cancelled Flights
`cancelledFlights` finds airports where flights were cancelled. 

return: DataFrame containing columns "airport", "city" and "percentage". 
- Columns "airport" and "city" can be found from table "airports". Column "percentage" is the cancellation percentage of each airport (number of cancelled flights/total of flights).
- **The returned DataFrame should be sorted by "percentage" and secondly by "airport" both in descending order.**

Example output:

|             airport|       city|         percentage|
|--------------------|-----------|-------------------|
|Pellston Regional...|   Pellston| 0.3157894736842105|
|  Waterloo Municipal|   Waterloo|               0.25|
|  Telluride Regional|  Telluride|0.21084337349397592|
|Houghton County M...|    Hancock|0.19834710743801653|
|Rhinelander-Oneid...|Rhinelander|            0.15625|

In [30]:
def cancelledFlights(df):
    df = df.join(airportsTable, df.Origin ==  airportsTable.iata , "inner")
    df = df.select("airport", "city", "cancelled")
    
    t1 = df.select("airport", "cancelled").where(df.cancelled==1)
    t1 = t1.withColumnRenamed("airport", "ap")
    
    t2 = df.groupBy("airport", "city").count()
    t2 = t2.withColumnRenamed("count", "total")
    
    t2 = t2.join(t1, t1.ap ==  t2.airport , "inner")
    t2 = t2.withColumn("percentage", (t2.cancelled/t2.total))
    
    t2 = t2.select("airport","city","percentage")
    t2 = t2.sort((t2.percentage).desc(),(t2.airport).asc())
    return t2

In [31]:
# example print

data = loadDataAndRegister(sampleFile)
cancelledFlights(data).show(5)

+--------------------+-----------------+-------------------+
|             airport|             city|         percentage|
+--------------------+-----------------+-------------------+
|Orlando Internati...|          Orlando|                1.0|
| Salt Lake City Intl|   Salt Lake City| 0.3333333333333333|
|Dallas-Fort Worth...|Dallas-Fort Worth|0.14285714285714285|
+--------------------+-----------------+-------------------+



In [32]:
'''cancelledFlights tests'''

data = loadDataAndRegister(testFile)
correct = [Row(airport='McCarran International', city='Las Vegas', percentage=0.5),
           Row(airport='Roanoke Regional/ Woodrum ', city='Roanoke', percentage=0.25)]
correctRows(cancelledFlights(data).collect(), correct)


## Least Squares
`leastSquares` calculates the [linear least squares](https://en.wikipedia.org/wiki/Linear_least_squares) approximation for relationship between DepDelay and WeatherDelay (y=bx+c, where x represents DepDelay and y WeatherDelay, b is the slope and c constant term). We want to predict WeatherDelay.

`return`: tuple that has the constant term first and the slope second. If least squares can not be calculated, return 0.0 as terms.

Hints:
- Filter out entries where DepDelay<0 before calculating the linear least squares.
- There are definitely multiple datapoints for a single DepDelay value so calculate the average WeatherDelay per DepDelay.
- These links may be helpful:
    - https://en.wikipedia.org/wiki/Simple_linear_regression#Fitting_the_regression_line
    - http://www.neoprogrammics.com/linear_least_squares_regression
    - https://www.youtube.com/watch?v=JvS2triCgOY

In [33]:
def leastSquares(df):
    df = df.filter(df.DepDelay<0)
    return rmse

In [34]:
# example print

data = loadDataAndRegister(sampleFile)
leastSquares(data)

NameError: name 'rmse' is not defined

In [594]:
data = loadDataAndRegister(testFile)
test = leastSquares(data)
assert test == (952.0, -56.0), "the answer was expected to be (952.0, -56.0) but it was %s" % test


+----+
|rmse|
+----+
| 9.0|
+----+



NotImplementedError: 

In [None]:
spark.stop()