# Middleware Project: Big Data

## Initialization

In [1]:
DATASET_DIR = './dataset/'

In [2]:
# imports
import operator
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from time import time

In [3]:
# initialize spark
sc = pyspark.SparkContext("local[4]", "MiddlewareProject")  # 4 threads
spark = SparkSession.builder.appName("MiddlewareProject").getOrCreate()

## test vari

In [4]:
# come aprire un file
df = spark.read.csv(DATASET_DIR + 'short.csv', header=True)

In [5]:
# roba con dataframe
df.first()

Row(Year='1994', Month='1', DayofMonth='7', DayOfWeek='5', DepTime='858', CRSDepTime='900', ArrTime='954', CRSArrTime='1003', UniqueCarrier='US', FlightNum='227', TailNum='NA', ActualElapsedTime='56', CRSElapsedTime='63', AirTime='NA', ArrDelay='-9', DepDelay='-2', Origin='CLT', Dest='ORF', Distance='290', TaxiIn='NA', TaxiOut='NA', Cancelled='0', CancellationCode='NA', Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA')

In [6]:
df.count()

100000

In [7]:
# replace 'NA' with null values
df = df.replace('NA', None)

In [8]:
# find columns with null values
for col in df.columns:
    num = df.filter(df[col].isNull()).count()
    if num > 0:
        print("{:17} -> {}".format(col, num))

DepTime           -> 5754
ArrTime           -> 6135
TailNum           -> 100000
ActualElapsedTime -> 6135
AirTime           -> 100000
ArrDelay          -> 6135
DepDelay          -> 5754
TaxiIn            -> 100000
TaxiOut           -> 100000
CancellationCode  -> 100000
CarrierDelay      -> 100000
WeatherDelay      -> 100000
NASDelay          -> 100000
SecurityDelay     -> 100000
LateAircraftDelay -> 100000


In [9]:
# cast columns to correct type
intcolumns = ['Year', 'Month', 'DayofMonth', 'DayofWeek', 'DepTime', 'CRSDEPTime', 'ArrTime', 'CRSArrTime',
              'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Distance', 'CarrierDelay',
              'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'Cancelled', 'Diverted']

for c in intcolumns:
    df = df.withColumn(c, df[c].cast('int'))

In [10]:
df.first()

Row(Year=1994, Month=1, DayofMonth=7, DayofWeek=5, DepTime=858, CRSDEPTime=900, ArrTime=954, CRSArrTime=1003, UniqueCarrier='US', FlightNum='227', TailNum=None, ActualElapsedTime=56, CRSElapsedTime=63, AirTime=None, ArrDelay=-9, DepDelay=-2, Origin='CLT', Dest='ORF', Distance=290, TaxiIn=None, TaxiOut=None, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=None, WeatherDelay=None, NASDelay=None, SecurityDelay=None, LateAircraftDelay=None)

In [11]:
## to pandas dataframe
# dfpd = df.toPandas()

In [12]:
# dfpd.head(5)

In [13]:
# dfpd.describe()

## Actual assignment

In [18]:
# https://spark.apache.org/docs/latest/rdd-programming-guide.html

### Initialization

In [4]:
# years = range(1994, 2009)
years = range(2004, 2005)
path = DATASET_DIR + '{' + ','.join(str(y) for y in years) + '}.csv'
print('Loading from:', path)

# load entire file
df = spark.read.csv(path, header=True)
print('Number of entries:', df.count())

Loading from: ./dataset/{2004}.csv
Number of entries: 7129270


In [5]:
# replace 'NA' with null values
df = df.replace('NA', None)

In [6]:
# cast columns to correct type
intcolumns = ['Year', 'Month', 'DayofMonth', 'DayofWeek', 'DepTime', 'CRSDEPTime', 'ArrTime', 'CRSArrTime',
              'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Distance', 'CarrierDelay',
              'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'Cancelled', 'Diverted']

for c in intcolumns:
    df = df.withColumn(c, df[c].cast('int'))
df.first()

Row(Year=2004, Month=1, DayofMonth=12, DayofWeek=1, DepTime=623, CRSDEPTime=630, ArrTime=901, CRSArrTime=915, UniqueCarrier='UA', FlightNum='462', TailNum='N805UA', ActualElapsedTime=98, CRSElapsedTime=105, AirTime=80, ArrDelay=-14, DepDelay=-7, Origin='ORD', Dest='CLT', Distance=599, TaxiIn='7', TaxiOut='11', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0)

### Percentage of canceled flights per day

In [17]:
# the percentage of canceled flights per day

### Weekly percentages of delays due to weather

In [14]:
from datetime import datetime

# get RDD
raw_data = df.rdd

# filter out flights with no arrival delay
delayed_flights = raw_data.filter(lambda r: r['ArrDelay'] is not None and r['ArrDelay'] > 0)

# add the week as key
def find_week(row):
    '''Returns the week as a tuple (year, week in the year)'''
    d = datetime(year=row['Year'], month=row['Month'], day=row['DayofMonth'])
    ic = d.isocalendar()
    return (ic[0], ic[1])

indexed_by_week = delayed_flights.map(lambda r: (find_week(r), r))

# check for each flight if it had any weather delay
# also map to 1 to count delayed flights
flight_delays = indexed_by_week.map(lambda t: (t[0], (1, 1 if t[1]['WeatherDelay'] else 0)))

# count number flights with delay and number of flights with weather delay, for each week
num_delays = flight_delays.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# compute percentage of weather delays over all delayed flights, for each week
weather_delay_ratios = num_delays.map(lambda t: (t[0], 100 * t[1][1] / t[1][0]))

In [15]:
# execute query
starttime = time()
res = weather_delay_ratios.collect()
endtime = time() - starttime
print('Time taken: {:.5} seconds'.format(endtime))

# order in cronological order
res = sorted(res, key=lambda x: x[0][0]*53 + x[0][1])

Time taken: 57.835 seconds


In [16]:
print(res[:10])

[((2004, 1), 5.62224301563962), ((2004, 2), 4.541806567848661), ((2004, 3), 4.810908625637464), ((2004, 4), 3.5868844789270535), ((2004, 5), 6.811728191243478), ((2004, 6), 5.502487404040243), ((2004, 7), 3.408539543910723), ((2004, 8), 1.7978157378885466), ((2004, 9), 4.413975082668092), ((2004, 10), 3.3621790324868774)]


In [22]:
# TODO visualizzazione
# plot per settimana

In [23]:
## "a week" = tutti i voli con orario di partenza in quella settimana
## "delay" interpreto come arrival delay. 
## Testato, non ci sono weather delay che vengono "recuperati" all'arrivo

## versione 1: (accettata dal prof) <---
# numero di voli con any weather delay / numero di voli con ritardo

## versione 2: (versione "pesata")
# per ogni volo, se ha ritardo, fai weather delay / ritardo volo
# fai media

## versione 3: (aggregata)
# totale weather delay / totale ritardo

### Delay reduced per distance group

In [24]:
# the percentage of flights belonging to a given "distance group"
# that were able to halve their departure delays by the time they 
# arrived at their destinations. 

# Distance groups assort flights by their total distance in miles.
# Flights with distances that are less than 200 miles belong in group 1,
# flights with distances that are between 200 and 399 miles belong 
# in group 2, flights with distances that are between 400 and 599 miles
# belong in group 3, and so on. The last group contains flights whose
# distances are between 2400 and 2599 miles.

In [22]:
# get RDD
raw_data = df.rdd

# take only flights with a departure delay
late_departures = raw_data.filter(lambda r: r['DepDelay'] is not None and r['DepDelay'] > 0)

# add distance group as key
def distance_group(row):
    distance = row['Distance']
    return (distance // 200) + 1
    
distance_groups = late_departures.map(lambda r: (distance_group(r), r))

# check for each flight if it managed to halve its departure delay
# also map to 1 to count flights
def was_delay_halved(row):
    departure_delay = row['DepDelay']
    arrival_delay = row['ArrDelay']
    if arrival_delay is None:
        # missing data
        return False
    gain = departure_delay - arrival_delay
    return gain >= departure_delay / 2

halved_delays = distance_groups.map(lambda t: (t[0], (1, was_delay_halved(t[1]))))

# count number of delayed flights and number of flights with halved delay
totals = halved_delays.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# compute percentage per distance group
halving_percentages = totals.map(lambda t: (t[0], 100 * t[1][1] / t[1][0]))

In [23]:
# execute query
starttime = time()
res = halving_percentages.collect()
endtime = time() - starttime
print('Time taken: {:.5} seconds'.format(endtime))

# sort by distance group
res = sorted(res, key=lambda x: x[0])

Time taken: 52.432 seconds


TypeError: 'int' object is not subscriptable

In [31]:
# results
for t in res:
    print("{:4} < d < {:4} miles --> {:.2f}%".format((t[0]-1) * 200, t[0] * 200, t[1]))

   0 < d <  200 miles --> 24.87%
 200 < d <  400 miles --> 25.79%
 400 < d <  600 miles --> 27.23%
 600 < d <  800 miles --> 29.64%
 800 < d < 1000 miles --> 33.53%
1000 < d < 1200 miles --> 34.90%
1200 < d < 1400 miles --> 37.50%
1400 < d < 1600 miles --> 37.94%
1600 < d < 1800 miles --> 39.88%
1800 < d < 2000 miles --> 40.51%
2000 < d < 2200 miles --> 42.10%
2200 < d < 2400 miles --> 44.19%
2400 < d < 2600 miles --> 42.82%
2600 < d < 2800 miles --> 45.51%
2800 < d < 3000 miles --> 44.89%
3000 < d < 3200 miles --> 30.00%
3200 < d < 3400 miles --> 47.90%
3400 < d < 3600 miles --> 44.17%
3600 < d < 3800 miles --> 44.89%
3800 < d < 4000 miles --> 33.92%
4000 < d < 4200 miles --> 43.88%
4200 < d < 4400 miles --> 41.72%
4400 < d < 4600 miles --> 39.44%
4800 < d < 5000 miles --> 37.98%


In [None]:
# TODO visualization and plots

### Penalty scores for each airport

In [25]:
# a weekly "penalty" score for each airport that depends on both 
# its incoming and outgoing flights. 

# The score adds 0.5 for each incoming flight that is more than 15 minutes
# late, and 1 for each outgoing flight that is more than 15 minutes late

### Our group's data analysis

In [26]:
# an additional data analysis defined by your group

# IDEE
# qualcosa con gli altri delays? security, nas, carrier
# qualche confronto tra carrier
# 

In [27]:
# prima fare con un solo file come debug, poi mettere tutti