# Big Data Project

The goal of the project is to infer qualitative data regarding USA flights during the years 1994-2008. The data can be downloaded from [stat-computing.org](http://stat-computing.org/dataexpo/2009/the-data.html).

In this notebook we compute a weekly "**penalty**" score for each airport that depends on both its incoming and outgoing
flights. The score adds `0.5` for each incoming flight that arrives more than _15 minutes_ late, and `1` for
each outgoing flight that departs more than _15 minutes_ late.

### Initialize PySpark

In [1]:
# Locate the local Apache Spark installation so that the `pyspark`
# imports in the following cells can be resolved.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Spark SQL entry point for DataFrames: local mode with two worker threads.
master = 'local[2]'
appName = 'Airport Weekly Penalty'
spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .getOrCreate()
)

### Load Data

In [3]:
from pathlib import Path
full_data = '../dataset/*.csv.bz2'
penalty_data = '../dataset/penalty_dataset.parquet'

# Columns kept in the reduced dataset.
penalty_columns = [
    'Year', 'Month', 'DayofMonth', 'DayOfWeek',
    'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
    'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Cancelled',
]

path = Path(penalty_data)
if not path.is_dir():
    # No reduced dataset on disk yet: read every compressed CSV, keep only the
    # columns above, turn the 'NA' placeholder into real nulls and cache the
    # result as parquet. This takes a long time on the full dataset.
    df = spark.read.csv(full_data, sep=',', header=True, inferSchema=True)
    (
        df.select(*penalty_columns)
        .replace('NA', None)
        .write
        .save(penalty_data, format='parquet')
    )

# Always work from the reduced parquet copy.
df = spark.read.load(penalty_data)

In [4]:
# Inspect the column types, then preview the first ten rows
df.printSchema()
df.show(n=10)

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: integer (nullable = true)

+----+-----+----------+---------+-------+----------+-------+----------+--------+--------+------+----+---------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|ArrDelay|DepDelay|Origin|Dest|Cancelled|
+----+-----+----------+---------+-------+----------+-------+----------+--------+--------+------+----+---------+
|1995|    1|         6|        5|    657|       645|    952|       937|      15|      12|   ORD| PHL|        0|
|1995|    1|         7|   

In [5]:
# Parse dates to datetime format
import datetime
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType, IntegerType


def make_date(year, month, day):
    """Build a midnight timestamp column from year, month and day columns.

    Uses Spark built-ins instead of a Python UDF so rows are not serialized
    to a Python worker. The result is cast to a timestamp, matching the
    column type the previous UDF produced.
    """
    return F.to_date(F.concat_ws('-', year, month, day)).cast(TimestampType())


def week_year(date):
    """Return the ISO-8601 week number (1-53) of a date/timestamp column.

    `weekofyear` counts weeks from Monday and treats week 1 as the first week
    with more than three days, i.e. the same value as
    `datetime.date.isocalendar()[1]`.
    """
    return F.weekofyear(date)


# Drop entries with 'na' departure and arrival time, then keep only the
# columns needed for the penalty computation.
df = (
    df.dropna(subset=['DepTime', 'ArrTime'])
    .withColumn('Date', make_date(df['Year'], df['Month'], df['DayofMonth']))
    .select('Date', week_year('Date').alias('WeekYear'),
            'ArrDelay', 'DepDelay', 'Origin', 'Dest')
)
# df.show(10)

In [6]:
# Flights that are more than 15 minutes late
left_late = df.filter(df['DepDelay'] > 15)
arrived_late = df.filter(df['ArrDelay'] > 15)

# 'WeekYear' is an ISO week number, so it must be paired with the ISO
# week-numbering year rather than the calendar year. Otherwise the first days
# of January that still belong to week 52/53 of the previous year are merged
# with the last week of the *current* year, and the last days of December
# that already belong to week 1 are merged with the first week of January.
iso_year = (
    F.year('Date')
    + F.when((F.month('Date') == 12) & (F.col('WeekYear') == 1), 1).otherwise(0)
    - F.when((F.month('Date') == 1) & (F.col('WeekYear') >= 52), 1).otherwise(0)
).alias('Year')

# Number of times per week an airport had a departure or an arrival more than 15 minutes late.
# Output columns: 'Year' (ISO week-numbering year), 'WeekYear', 'Airport', 'count'.
incoming_late = arrived_late.groupBy([iso_year, 'WeekYear', arrived_late['Dest'].alias('Airport')]).count()
outgoing_late = left_late.groupBy([iso_year, 'WeekYear', left_late['Origin'].alias('Airport')]).count()

# incoming_late.show(10)
# outgoing_late.show(10)

In [7]:
# Weight of a single late flight, by direction
incoming_factor = 0.5
outgoing_factor = 1.0

# Replace the raw late-flight count with its weighted penalty
incoming_penalty = (
    incoming_late
    .withColumn('Penalty', incoming_late['count'] * incoming_factor)
    .drop('count')
)
outgoing_penalty = (
    outgoing_late
    .withColumn('Penalty', outgoing_late['count'] * outgoing_factor)
    .drop('count')
)

# incoming_penalty.show(10)
# outgoing_penalty.show(10)

In [8]:
# Stack arrival and departure penalties, then total them per airport and week
all_penalties = incoming_penalty.union(outgoing_penalty)
penalties = (
    all_penalties
    .groupBy('Year', 'WeekYear', 'Airport')
    .agg(F.sum('Penalty').alias('WeeklyPenalty'))
)

# penalties.show(10)

In [9]:
# Store the output DataFrame if it is not on disk yet, then always continue
# from the stored copy. Reloading after the write keeps the following cells
# from re-running the whole aggregation pipeline on every action, and mirrors
# how the reduced input dataset is cached in the "Load Data" cell.
# NOTE: the file name spelling is kept as-is so existing caches are still found.
final_dataset = '../dataset/penalty_analitics.parquet'

path = Path(final_dataset)
if not path.is_dir():
    penalties.write.mode('overwrite').save(final_dataset, format='parquet')

penalties = spark.read.load(final_dataset)

In [10]:
# Bring the result to the driver as plain tuples laid out as
# ('Year', 'WeekYear', 'Airport', 'WeeklyPenalty') and preview the first 100.
penalty_data = [tuple(row) for row in penalties.collect()]
print(penalty_data[:100])

[(1995, 4, 'OAK', 356.5), (1995, 1, 'OMA', 110.0), (1995, 4, 'OTZ', 4.0), (1995, 3, 'FCA', 2.0), (1995, 4, 'CDV', 2.0), (1995, 6, 'FCA', 2.5), (1995, 7, 'KSM', 0.5), (1995, 9, 'ABQ', 198.5), (1995, 9, 'BOS', 775.5), (1995, 9, 'BZN', 11.5), (1995, 6, 'CHS', 38.0), (1995, 11, 'SAT', 100.0), (1995, 13, 'MYR', 2.0), (1995, 13, 'ORD', 1249.0), (1995, 11, 'LAN', 9.0), (1995, 17, 'TPA', 165.5), (1995, 14, 'MBS', 16.0), (1995, 16, 'AUS', 222.5), (1995, 20, 'PNS', 25.0), (1995, 18, 'RDU', 64.5), (1995, 18, 'LBB', 31.5), (1995, 21, 'MFR', 8.5), (1995, 22, 'AGS', 12.0), (1995, 22, 'BOS', 240.5), (1995, 22, 'BRW', 8.0), (1995, 22, 'CAK', 2.5), (1995, 18, 'DAY', 32.0), (1995, 23, 'MFR', 8.0), (1995, 31, 'SLC', 482.0), (1995, 28, 'OME', 8.0), (1995, 29, 'ORF', 79.5), (1995, 28, 'DTW', 1047.0), (1995, 31, 'LAN', 20.5), (1995, 33, 'ORD', 2648.0), (1995, 32, 'ERI', 6.5), (1995, 34, 'JAC', 5.5), (1995, 35, 'LAN', 2.5), (1995, 35, 'BDL', 67.0), (1995, 37, 'TVC', 1.0), (1995, 39, 'GSO', 51.0), (1995, 37, 