We will analyze a dataset containing information about flights in the USA that stems from the [Bureau of Transportation Statistics](https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time). In the zip file provided, you will find a directory *airline_data* containing the files 2016_1.csv, ..., 2016_6.csv. Each line of a file (except the headers) contains the flight date, the airline ID, the flight number, the origin airport, the destination airport, the departure time, the departure delay in minutes, the arrival time, the arrival delay in minutes, the time in air in minutes, and the distance between both airports in miles. 

Preparation (if not done already):

1. Copy all the csv files to your virtual machine (e.g., via scp). Alternatively, you can also log in to the virtual machine and download the zip file directy from Absalon.
2. Create a directory 'airline_data' on your local Hadoop cluster. Afterwards, copy the csv files from your virtual machine to the Hadoop cluster via 'hadoop fs -put airline_data/*.csv airline_data/'

In [1]:
# generate an RDD based on all the csv files given in the airline_data directory.
# MAKE SURE THAT ONLY THE 2016_*.csv FILES ARE GIVEN IN THE DIRECTORY
airline_data = sc.textFile ("hdfs:///user/lsda/airline_data/*.csv")

In [2]:
# each csv file contains a header describing the data
header = airline_data.first()
print("Header information given in the first csv file:\n\n{}".format(header))

Header information given in the first csv file:

"FL_DATE","AIRLINE_ID","FL_NUM","ORIGIN","DEST","DEP_TIME","DEP_DELAY","ARR_TIME","ARR_DELAY","AIR_TIME","DISTANCE",


In [3]:
# filter the RDD to remove this header information (each csv file 
# contains such a line)
airline_data = airline_data.filter(lambda line: line != header)

# get the first 10 elements and print them
print("First 10 elements of the RDD:")
airline_data.take(10)

First 10 elements of the RDD:


['2016-01-01,19790,"1248","DTW","LAX","1935",0.00,"2120",-24.00,249.00,1979.00,',
 '2016-01-01,19790,"1251","ATL","GRR","2130",5.00,"2319",-2.00,92.00,640.00,',
 '2016-01-01,19790,"1254","LAX","ATL","2256",1.00,"0547",-13.00,207.00,1947.00,',
 '2016-01-01,19790,"1255","SLC","ATL","1700",4.00,"2213",-16.00,173.00,1590.00,',
 '2016-01-01,19790,"1256","BZN","MSP","1012",72.00,"1420",124.00,121.00,874.00,',
 '2016-01-01,19790,"1257","ATL","BNA","1356",83.00,"1402",83.00,38.00,214.00,',
 '2016-01-01,19790,"1257","BNA","ATL","1446",86.00,"1644",74.00,37.00,214.00,',
 '2016-01-01,19790,"1258","ATL","JAX","0946",1.00,"1053",3.00,45.00,270.00,',
 '2016-01-01,19790,"1258","JAX","ATL","1144",-1.00,"1247",-15.00,43.00,270.00,',
 '2016-01-01,19790,"1259","ATL","OKC","2107",-3.00,"2224",-12.00,116.00,761.00,']

In [4]:
def parse(line):
    
    line = line.split(',')
    
    try:
    
        airline_id = line[1]
        origin = line[3].strip('\"')
        dest = line[4].strip('\"')
        dep_delay = float(line[6])
        arr_delay = float(line[8])
        distance = float(line[10])
        
        return (airline_id, origin, dest, dep_delay, arr_delay, distance)
    
    except Exception as e:
        
        # in case of an error: simply return 'None'
        return None

In [5]:
# apply the parsing function to each element via the map
# transformation; afterwards, remove all elements that
# could not be parsed properly.
airlines = airline_data.map(parse)
airlines = airlines.filter(lambda line: line is not None)

In [6]:
# let's inspect the first ten elements
airlines.take(10)

[('19790', 'DTW', 'LAX', 0.0, -24.0, 1979.0),
 ('19790', 'ATL', 'GRR', 5.0, -2.0, 640.0),
 ('19790', 'LAX', 'ATL', 1.0, -13.0, 1947.0),
 ('19790', 'SLC', 'ATL', 4.0, -16.0, 1590.0),
 ('19790', 'BZN', 'MSP', 72.0, 124.0, 874.0),
 ('19790', 'ATL', 'BNA', 83.0, 83.0, 214.0),
 ('19790', 'BNA', 'ATL', 86.0, 74.0, 214.0),
 ('19790', 'ATL', 'JAX', 1.0, 3.0, 270.0),
 ('19790', 'JAX', 'ATL', -1.0, -15.0, 270.0),
 ('19790', 'ATL', 'OKC', -3.0, -12.0, 761.0)]

In [7]:
# (a) Shortest Flight Distance
flights = airlines.map(lambda line: (line[0], line[5]))
results_dist = flights.reduceByKey(lambda a, b: min(a,b))
print("ID\tDistance")
for result in results_dist.collect():
    print(result)

ID	Distance
('20366', 69.0)
('20409', 68.0)
('19393', 137.0)
('20304', 30.0)
('19690', 84.0)
('20416', 177.0)
('19790', 94.0)
('19805', 83.0)
('19930', 31.0)
('21171', 236.0)
('20436', 332.0)
('19977', 108.0)


In [8]:
# (b) Late Arrival Counts
delayed_flights = airlines.filter(lambda line: line[4] > 0)
delayed = delayed_flights.map(lambda line: (line[0],1))
results_delayed = delayed.reduceByKey(lambda a,b: a+b)

flights = airlines.map(lambda line: (line[0],1))
results = flights.reduceByKey(lambda a,b: a+b)

results_combined = results_delayed.join(results)
results = results_combined.map(lambda line: (line[0],(line[1][0],line[1][1],line[1][0]/line[1][1]*100))).collect()

print("ID\tDelayed\tTotal\tPercentage")
for result in results:
    print(result)


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 39420)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/srv/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/srv/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35833)
Traceback (most r

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35833)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 721, in __init__
    self.handle()
  File "/srv/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/srv/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/srv/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/srv/spark/python/pyspark/serializers.py", line 717, in read_int
    raise EOFErro

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35833)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 721, in __init__
    self.handle()
  File "/srv/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/srv/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/srv/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/srv/spark/python/pyspark/serializers.py", line 717, in read_int
    raise EOFErro

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35833)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 721, in __init__
    self.handle()
  File "/srv/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/srv/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/srv/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/srv/spark/python/pyspark/serializers.py", line 717, in read_int
    raise EOFErro

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35833)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 721, in __init__
    self.handle()
  File "/srv/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/srv/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/srv/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/srv/spark/python/pyspark/serializers.py", line 717, in read_int
    raise EOFErro

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35833)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 721, in __init__
    self.handle()
  File "/srv/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/srv/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/srv/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/srv/spark/python/pyspark/serializers.py", line 717, in read_int
    raise EOFErro

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35833)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 721, in __init__
    self.handle()
  File "/srv/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/srv/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/srv/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/srv/spark/python/pyspark/serializers.py", line 717, in read_int
    raise EOFErro

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:35833)

In [None]:
# (c) Mean and Standard Deviation for Arrival Delays
import math
means = airlines.map(lambda line: (line[0],(line[4],1)))
means = means.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).mapValues(lambda x: x[0]/x[1])

# calculate variance / get standard deviation from it
deviation = airlines.map(lambda line: (line[0],line[4]))
deviation = deviation.mapValues(lambda x: (1, x, x*x)).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
deviation = deviation.mapValues(lambda x: (math.sqrt(x[2]/x[0] - (x[1]/x[0])**2)))

results = means.join(deviation).collect()

print("ID\tMean\tDeviation")
for result in results:
    print(result)

In [None]:
# (d) Top-10 of Arrival Delays

sorted = airlines.map(lambda line: ((line[0],line[4]))).groupByKey() #.mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])) #.mapValues(lambda vs: sorted(vs))
print(sorted.take(2))