In [1]:
import pyspark
from datetime import datetime
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf,lit
from collections import namedtuple

sc = pyspark.SparkContext.getOrCreate()

# Queries:
1. the percentage of canceled flights per day, throughout the entire data set
2. weekly percentages of delays that are due to weather, throughout the entire data set 
3. the percentage of flights belonging to a given "distance group" that were able to halve their departure delays by the time they arrived at their destinations. Distance groups assort flights by their total distance in miles. Flights with distances that are less than 200 miles belong in group 1, flights with distances that are between 200 and 399 miles belong in group 2, flights with distances that are between 400 and 599 miles belong in group 3, and so on. The last group contains flights whose distances are between 2400 and 2599 miles.
4. a weekly "penalty" score for each airport that depends on both the its incoming and outgoing flights. The score adds 0.5 for each incoming flight that is more than 15 minutes late, and 1 for each outgoing flight that is more than 15 minutes late.


# 1) "The percentage of canceled flights per day, throughout the entire data set"


In [2]:
def getPercentage(year):
    # load a single file
    ds = sc.textFile('../BDdata/'+year+'.csv')
    # split the csv rows
    dsplitted = ds.map(lambda line : line.split(","))
    # remove the head
    head = dsplitted.take(1)[0]
    dsfiltered = dsplitted.filter(lambda x : x != head)
    # map rows -> (('Year', 'Month', 'DayofMonth'), 'Cancelled')
    dmapped = dsfiltered.map(lambda x : (tuple([int(el) for el in x[0:3]]),int(x[21])))
    
    # count the number of cancelled flights for each day
    date_cancelled = dmapped.reduceByKey(lambda a, b : a+b)
    # count the total number of flights for each day
    total_per_date = dmapped.map(lambda x: (x[0],1)).reduceByKey(lambda a, b : a+b)
    # join the two datasets on the date and map them to (('Year', 'Month', 'DayofMonth'), percentage of cancelled flights)
    results=date_cancelled.join(total_per_date).map(lambda x : (x[0], x[1][0]/x[1][1]))
    return results


In [3]:
res1 = []
for i in range(1994,2009):
    res1 = res1+getPercentage(str(i)).collect()
res1[:10]

[((1994, 1, 5), 0.047656139357031585),
 ((1994, 2, 1), 0.006602768903088392),
 ((1994, 2, 2), 0.005858685677984047),
 ((1994, 2, 3), 0.007247906551263106),
 ((1994, 5, 18), 0.0035822540644805732),
 ((1994, 5, 19), 0.002082176568573015),
 ((1994, 5, 21), 0.003274942878903275),
 ((1994, 5, 16), 0.005455047645352852),
 ((1994, 6, 25), 0.009863996413092213),
 ((1994, 6, 26), 0.003399638336347197)]

# 2) "Weekly percentages of delays that are due to weather, throughout the entire data set"

In [4]:
def getWeeklyWeatherPercentageFixed(start, end):
    # load the first year's csv
    ds = sc.textFile('../BDdata/'+str(start)+'.csv')
    head = ds.take(1)[0]
    ds = ds.filter(lambda x : x != head) # remove the header of the file
    for i in range(start+1,end+1):
        # merge with all the following years, removing the header for each of them
        ds = ds.union(sc.textFile('../BDdata/'+str(i)+'.csv').filter(lambda x : x != head))
    # split the csv rows
    ds = ds.map(lambda line : line.split(","))
    d1 = (ds
        # x[14] -> ArrDelay ; check if not NA and if >0 
        # x[25] -> WeatherDelay ; check if not NA
        .filter(lambda x : x[14] != 'NA' and int(x[14])>0 and x[25]!='NA') 
        # map the row to (('Year', 'Month', 'DayofMonth'), ('ArrDelay', 'WeatherDelay'))
        .map(lambda x : (tuple(x[0:3]), (int(x[14]),int(x[25]))))
        # reduce by key summing up ArrDelay and WeatherDelay
        .reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1]))
        # map to (('Year', 'Week'), ('ArrDelay', 'WeatherDelay'))
        .map(lambda x : (tuple(datetime(*[int(el) for el in x[0]]).isocalendar()[0:2]),x[1])))
        
        # reduce by year and week summing up ArrDelay and WeatherDelay
    d2 = (d1.reduceByKey(lambda x, y : (x[0]+y[0],x[1]+y[1]))
        # map to (('Year', 'Week'), percentage of delays due to weather)
            .map(lambda x : (x[0][0],x[0][1],x[1][1]/x[1][0])))
    
    return d2

res2= getWeeklyWeatherPercentageFixed(1994,2008).collect()

In [5]:
res2[:10]

[(2004, 7, 0.05757752477390909),
 (2008, 38, 0.029586498837440335),
 (2005, 49, 0.07510440149303374),
 (2007, 37, 0.06574637723331199),
 (2007, 4, 0.043133930079378484),
 (2006, 37, 0.0645626368329082),
 (2004, 36, 0.05358771727918297),
 (2004, 5, 0.09106793076338827),
 (2003, 38, 0.036452499395229015),
 (2007, 43, 0.03725776364000761)]

# 3) "The percentage of flights belonging to a given "distance group" that were able to halve their departure delays by the time they arrived at their destinations. 
Distance groups assort flights by their total distance in miles. Flights with distances that are less than 200 miles belong in group 1, flights with distances that are between 200 and 399 miles belong in group 2, flights with distances that are between 400 and 599 miles belong in group 3, and so on. The last group contains flights whose distances are between 2400 and 2599 miles."

In [6]:
def getDistanceGroup(start,end):
    # load the first csv file
    ds = sc.textFile('../BDdata/'+str(start)+'.csv')
    head = ds.take(1)[0]
    # remove the header of the csv file
    ds = ds.filter(lambda x : x != head)
    # load all the following years, merging them all together
    for i in range(start+1,end+1):
        ds = ds.union(sc.textFile('../BDdata/'+str(i)+'.csv'))
        head = ds.take(1)[0]
        ds = ds.filter(lambda x : x != head)
    # split the csv rows
    dsplitted = ds.map(lambda line : line.split(","))
    d1 = (dsplitted
        # x[21] -> 'Cancelled' ; 0 if the flight was not cancelled
        # x[14] -> 'ArrDelay' ; must not be NA
        # x[15] -> 'DepDelay' ; must not be NA
        # x[18] -> 'Distance' ; must not be NA
        .filter(lambda x : x[21] == '0' and x[14] != 'NA' and x[15] != 'NA' and x[18] != 'NA')
        # map the rows to ('DepDelay', 'ArrDelay', 'Distance')
        .map(lambda x : (x[15], x[14], x[18])))
    # map to (DistangeGroup, (DepDelay,ArrDelay))
    d2 = d1.map(lambda x : (int(x[2])//200+1,(int(x[0]),int(x[1]))))
    # get only the flights that managed to halve their departure delays by the time they arrived at their destinations.
    d3 = (d2.filter(lambda x : x[1][0]>0 and x[1][0]>=2*x[1][1])
        # sum up the remaining flights per DistanceGroup
        .map(lambda x: (x[0],1))
        .reduceByKey(lambda x, y : x+y))
        
    # sum up all the flights per DistanceGroup
    d4 = (d2.map(lambda x : (x[0],1))
          .reduceByKey(lambda x, y : x+y))

    # join the two dataframes on the distance group obtaining (DistanceGroup, percentage requested)
    d5 = d4.join(d3).map(lambda x : (x[0], x[1][1]/x[1][0]))
    return d5
    
res3 = getDistanceGroup(1994,2008).collect()

In [7]:
sorted(res3)

[(1, 0.09125316163080364),
 (2, 0.10946089820757102),
 (3, 0.12638140216395788),
 (4, 0.1408275431352104),
 (5, 0.15142853155608366),
 (6, 0.15797740018682774),
 (7, 0.16858827528427442),
 (8, 0.17525566944085058),
 (9, 0.18937652171886157),
 (10, 0.21065770321207053),
 (11, 0.21405063168968547),
 (12, 0.2022257070387234),
 (13, 0.18882836905997447),
 (14, 0.18276265730172667),
 (15, 0.193161539010636),
 (16, 0.19127346524606798),
 (17, 0.16409549254503145),
 (18, 0.25801677355698077),
 (19, 0.1758383490971625),
 (20, 0.16885266777027888),
 (21, 0.17226231565735803),
 (22, 0.1784207353827607),
 (23, 0.23058218702237043),
 (25, 0.1846497568668682)]

# 4) A weekly "penalty" score for each airport that depends on both the its incoming and outgoing flights.
The score adds 0.5 for each incoming flight that is more than 15 minutes late, and 1 for each outgoing flight that is more than 15 minutes late.

In [8]:
def getWeeklyPenaltyFixed(start, end):
    ds = sc.textFile('../BDdata/'+str(start)+'.csv')
    head = ds.take(1)[0]
    ds = ds.filter(lambda x : x != head)
    for i in range(start+1,end+1):
        ds = ds.union(sc.textFile('../BDdata/'+str(i)+'.csv').filter(lambda x : x != head))
    ds = ds.map(lambda line : line.split(","))
    # head.index("Cancelled") # -> 21
    # head.index("DepDelay") # -> 15
    # head.index("ArrDelay") # -> 14 ; not NA and >15 minutes
    # 16 orig, 17 dest
    # head.index("Distance") # -> 18
    d1 = ds.filter(lambda x : x[14] != "NA" and x[15] != "NA" and (int(x[14])>15 or int(x[15])>15))
    # map each row to ((year, week, orig), 1), ((year, week, dest), 0,5)
    d2 = d1.flatMap(lambda x : [((*datetime(*[int(el) for el in x[0:3]]).isocalendar()[0:2], x[16]),1 if int(x[15])>15 else 0),
                                ((*datetime(*[int(el) for el in x[0:3]]).isocalendar()[0:2], x[17]),0.5 if int(x[14])>15 else 0)])
    # sum up the penalty score for each (year, week, airport)
    d3 = d2.reduceByKey(lambda x, y : x+y)
    return d3.collect()

res4 = getWeeklyPenaltyFixed(1994,2008)

In [9]:
sorted(res4)[:10]

[((1993, 52, 'ABE'), 6.0),
 ((1993, 52, 'ABQ'), 17.0),
 ((1993, 52, 'ACY'), 0),
 ((1993, 52, 'AGS'), 5.5),
 ((1993, 52, 'ALB'), 12.0),
 ((1993, 52, 'AMA'), 4.5),
 ((1993, 52, 'ANC'), 17.5),
 ((1993, 52, 'ATL'), 377.5),
 ((1993, 52, 'AUS'), 24.5),
 ((1993, 52, 'AVL'), 2)]

In [10]:
res1

[((1994, 1, 5), 0.047656139357031585),
 ((1994, 2, 1), 0.006602768903088392),
 ((1994, 2, 2), 0.005858685677984047),
 ((1994, 2, 3), 0.007247906551263106),
 ((1994, 5, 18), 0.0035822540644805732),
 ((1994, 5, 19), 0.002082176568573015),
 ((1994, 5, 21), 0.003274942878903275),
 ((1994, 5, 16), 0.005455047645352852),
 ((1994, 6, 25), 0.009863996413092213),
 ((1994, 6, 26), 0.003399638336347197),
 ((1994, 6, 27), 0.0069387194284144),
 ((1994, 6, 28), 0.003035738926452325),
 ((1994, 7, 12), 0.008371333287960253),
 ((1994, 7, 13), 0.007550506768247058),
 ((1994, 7, 14), 0.017633442265795208),
 ((1994, 8, 8), 0.0029016802753222214),
 ((1994, 8, 9), 0.0019498419955624286),
 ((1994, 8, 11), 0.003987025273685633),
 ((1994, 8, 14), 0.014527673446247608),
 ((1994, 11, 23), 0.0013551053594416966),
 ((1994, 12, 14), 0.009224716814759547),
 ((1994, 2, 4), 0.004666949512091642),
 ((1994, 2, 9), 0.14133333333333334),
 ((1994, 2, 10), 0.08757021460463776),
 ((1994, 2, 11), 0.2562140148855498),
 ((1994,

In [11]:
res2

[(2004, 7, 0.05757752477390909),
 (2008, 38, 0.029586498837440335),
 (2005, 49, 0.07510440149303374),
 (2007, 37, 0.06574637723331199),
 (2007, 4, 0.043133930079378484),
 (2006, 37, 0.0645626368329082),
 (2004, 36, 0.05358771727918297),
 (2004, 5, 0.09106793076338827),
 (2003, 38, 0.036452499395229015),
 (2007, 43, 0.03725776364000761),
 (2008, 3, 0.06113343084072353),
 (2006, 39, 0.035075886369932835),
 (2006, 26, 0.051833493318907145),
 (2004, 26, 0.08600485630390489),
 (2005, 16, 0.05604493024466552),
 (2003, 36, 0.03531433928850461),
 (2008, 1, 0.04869675058097636),
 (2008, 36, 0.02651448719493318),
 (2006, 4, 0.04044075121956593),
 (2005, 51, 0.03713580378832005),
 (2007, 10, 0.028782405858441766),
 (2005, 18, 0.04442350982424638),
 (2008, 30, 0.061128877180501306),
 (2008, 34, 0.036362854925279554),
 (2007, 41, 0.039163286708868754),
 (2007, 8, 0.07743945900562352),
 (2004, 24, 0.08337911786036085),
 (2006, 33, 0.043553109201347244),
 (2003, 34, 0.060240938882279056),
 (2007, 47,

In [12]:
res3

[(1, 0.09125316163080364),
 (2, 0.10946089820757102),
 (3, 0.12638140216395788),
 (4, 0.1408275431352104),
 (5, 0.15142853155608366),
 (6, 0.15797740018682774),
 (7, 0.16858827528427442),
 (8, 0.17525566944085058),
 (9, 0.18937652171886157),
 (10, 0.21065770321207053),
 (11, 0.21405063168968547),
 (12, 0.2022257070387234),
 (13, 0.18882836905997447),
 (14, 0.18276265730172667),
 (15, 0.193161539010636),
 (16, 0.19127346524606798),
 (17, 0.16409549254503145),
 (18, 0.25801677355698077),
 (19, 0.1758383490971625),
 (20, 0.16885266777027888),
 (21, 0.17226231565735803),
 (22, 0.1784207353827607),
 (23, 0.23058218702237043),
 (25, 0.1846497568668682)]

In [13]:
res4

[((1994, 1, 'IND'), 244.0),
 ((1994, 2, 'DAL'), 145.0),
 ((1994, 1, 'MAF'), 1.0),
 ((1993, 52, 'KOA'), 1.5),
 ((1994, 1, 'JAN'), 16.5),
 ((1994, 7, 'LEX'), 9.0),
 ((1994, 6, 'DSM'), 51.0),
 ((1994, 12, 'YAK'), 0),
 ((1994, 15, 'DTW'), 468.0),
 ((1994, 16, 'HPN'), 3.0),
 ((1994, 14, 'DAL'), 116.5),
 ((1994, 19, 'SFO'), 363.5),
 ((1994, 18, 'HRL'), 14.0),
 ((1994, 19, 'DSM'), 14.0),
 ((1994, 19, 'BIL'), 7.0),
 ((1994, 19, 'GNV'), 1.5),
 ((1994, 23, 'CAE'), 20.5),
 ((1994, 25, 'LBB'), 26.5),
 ((1994, 29, 'STL'), 578.0),
 ((1994, 30, 'OTZ'), 0.5),
 ((1994, 31, 'DLH'), 1.5),
 ((1994, 34, 'PIA'), 0.5),
 ((1994, 37, 'PHX'), 274.0),
 ((1994, 39, 'DTW'), 291.0),
 ((1994, 39, 'HPN'), 12.0),
 ((1994, 36, 'TRI'), 6.5),
 ((1994, 39, 'DLH'), 2.5),
 ((1994, 36, 'SBA'), 2.5),
 ((1994, 38, 'RAP'), 1.0),
 ((1994, 40, 'AUS'), 77.0),
 ((1994, 43, 'OAJ'), 0.5),
 ((1994, 40, 'SJC'), 81.5),
 ((1994, 42, 'SHV'), 25.5),
 ((1994, 43, 'GNV'), 2.0),
 ((1994, 45, 'BOS'), 296.0),
 ((1994, 45, 'FAY'), 8.5),
 ((1994,