# Learning to use pyspark

_2020-06-15_

* Jose RazGuzman

---

# Setup

In [1]:
# All imports for the notebook live at the top of this cell.
from operator import add

from pyspark import SparkContext
from pyspark.sql import SQLContext

# Reuse the running SparkContext if there is one, otherwise start a new one.
sc = SparkContext.getOrCreate()
# The SQL context is only used to render results as nicely formatted tables.
sqlContext = SQLContext(sc)

# Read the raw CSV as text and split every line on commas.
rows = sc.textFile("/air_transit_2007.csv")
data = rows.map(lambda raw_line: raw_line.split(","))
# NOTE(review): `data` is re-scanned by every question below; caching it
# (data.cache()) was left disabled by the author -- confirm whether memory allows it.

# The first row holds the column names; later cells filter it out by comparison.
header = data.first()

## Q1
Compute the total number of records.

In [3]:
# Drop the header row, then count the remaining rows to get the total
# number of records.
q1_records = data.filter(lambda line: line != header)
q1 = q1_records.count()
print("The total number of records is:\n{}".format(q1))

The total number of records is:
7453215


## Q2
Find total number of operated flights per month, sorted by the month.

In [4]:
# Flights per month:
#   1. drop the header row,
#   2. emit (month, 1) for every record,
#   3. sum the ones per month,
#   4. order the result by month and bring it back to the driver.
q2_month_counts = (
    data.filter(lambda line: line != header)
        .map(lambda row: (int(row[1]), 1))
        .reduceByKey(add)
        .sortByKey()
)
q2 = q2_month_counts.collect()

q2_columns = ['Month', 'Total number of operated flights']
sqlContext.createDataFrame(q2, q2_columns).show()

+-----+--------------------------------+
|Month|Total number of operated flights|
+-----+--------------------------------+
|    1|                          621559|
|    2|                          565604|
|    3|                          639209|
|    4|                          614648|
|    5|                          631609|
|    6|                          629280|
|    7|                          648560|
|    8|                          653279|
|    9|                          600187|
|   10|                          629992|
|   11|                          605149|
|   12|                          614139|
+-----+--------------------------------+



## Q3
Find the plane with the highest number of flights. Each plane has a unique TailNum.

In [6]:
# Plane with the most flights.
# Drop the header row and the tail numbers '0' and '000000', under the
# assumption that these are placeholders for missing values.
# Emit (tail number, 1) per record and sum the ones per tail number.
# Only the single largest count is needed, so use top() with a key instead of
# fully sorting every (tail number, count) pair and taking the first one.
# The result is still a one-element list of (tail number, number of flights).
q3 = data.filter(lambda line: (line != header and line[10] != '0' and line[10] != '000000'))\
         .map(lambda x: (x[10], 1))\
         .reduceByKey(add)\
         .top(1, key=lambda x: x[1])

In [7]:
# Render the winning plane as a one-row table.
q3_table = sqlContext.createDataFrame(q3, ['Tail Number', 'Number of flights'])
q3_table.show(n=1)

+-----------+-----------------+
|Tail Number|Number of flights|
+-----------+-----------------+
|     N655BR|             4483|
+-----------+-----------------+



## Q4
Compute the total flight time of each airplane, sorted by flight time in descending order.

In [8]:
# Total flight time per airplane, largest first:
#   1. drop the header row and records whose air time is 'NA',
#   2. emit (tail number, air time) per flight,
#   3. add up the air time per tail number,
#   4. sort by the summed air time in descending order and collect.
# NOTE(review): unlike Q3, the tail numbers '0' and '000000' are not excluded
# here -- confirm whether they should be.
q4_airtime_by_plane = (
    data.filter(lambda line: line != header and line[13] != 'NA')
        .map(lambda row: (row[10], int(row[13])))
        .reduceByKey(add)
)
q4 = q4_airtime_by_plane.sortBy(lambda pair: pair[1], ascending=False).collect()

In [23]:
# Show the 100 airplanes with the largest total flight time.
q4_table = sqlContext.createDataFrame(q4, ['Tail Number', 'Total flight time'])
q4_table.show(n=100)

+-----------+-----------------+
|Tail Number|Total flight time|
+-----------+-----------------+
|     N556AS|           532213|
|     N557UA|           259376|
|     N597UA|           254760|
|     N636JB|           254357|
|     N637JB|           253562|
|     N590NW|           253079|
|     N607JB|           252862|
|     N590UA|           252847|
|     N505UA|           252382|
|     N554UA|           252378|
|     N558AS|           251992|
|     N212UA|           251816|
|     N598UA|           250612|
|     N624JB|           250327|
|     N646JB|           249865|
|     N625JB|           249089|
|     N543UA|           248784|
|     N599JB|           248747|
|     N565AS|           248648|
|     N640JB|           248571|
|     N618JB|           248465|
|     N666UA|           248352|
|     N595UA|           248135|
|     N639JB|           247980|
|     N649JB|           247807|
|     N645JB|           247465|
|     N523JB|           246640|
|     N633JB|           246161|
|     N5

## Q5
Find the busiest airport (in terms of number of departures + arrivals of all operated flights) for each month

In [2]:
# Number of departures + arrivals per (month, airport).
# Drop the header row, then have every flight emit two pairs: one for its
# departure airport (column 16) and one for its arrival airport (column 17),
# each keyed by (month, airport) with a count of one.
# Summing per key adds departures and arrivals together in a single pass, so
# nothing has to be collected to the driver and re-parallelized, and the
# tuple key does not need to be packed into / parsed back out of a string.
# Finally flatten each row to (month, airport, number of flights); the month
# is kept as the string read from the file.
q5 = (
    data.filter(lambda line: line != header)
        .flatMap(lambda x: (((x[1], x[16]), 1), ((x[1], x[17]), 1)))
        .reduceByKey(add)
        .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
)

## Method 1

In [10]:
# We define two following functions for getting the max value

# seq_op is the within-partition step used with aggregateByKey
def seq_op(accumulator, element):
    """Keep whichever of the two pairs has the larger second field.

    Both arguments are indexable; only index 1 is compared. The accumulator
    is returned only when it is strictly larger, so `element` wins ties.
    """
    return accumulator if accumulator[1] > element[1] else element

# comb_op is the cross-partition step used with aggregateByKey: it merges two
# partial results by keeping the one with the larger second field
def comb_op(accumulator1, accumulator2):
    """Return the partial result whose second field is larger.

    `accumulator1` is kept only when it is strictly larger, so
    `accumulator2` wins ties.
    """
    if not accumulator1[1] > accumulator2[1]:
        return accumulator2
    return accumulator1

In [11]:
# Busiest airport per month (method 1: aggregateByKey).
# zero_val is the starting accumulator: an empty airport with zero flights,
# which any real (airport, count) pair replaces.
# Re-key every row as (month, (airport, number of flights)), keep the pair
# with the largest count per month using seq_op / comb_op, and sort by month
# so the rows come back in a deterministic order (the order after a shuffle
# is otherwise not guaranteed).
# Finally flatten to (month, airport, number of flights) and collect.
zero_val = ('', 0)
q5_max = q5.map(lambda x: (int(x[0]), (x[1], x[2])))\
           .aggregateByKey(zero_val, seq_op, comb_op)\
           .sortByKey()\
           .map(lambda x: (x[0], x[1][0], x[1][1]))\
           .collect()

In [12]:
# Tabulate the busiest airport found for each month.
q5_max_table = sqlContext.createDataFrame(q5_max, ['Month', 'Busiest Airport', 'Number of flights'])
q5_max_table.show()

+-----+---------------+-----------------+
|Month|Busiest Airport|Number of flights|
+-----+---------------+-----------------+
|    1|            ATL|            64597|
|    2|            ATL|            59558|
|    3|            ATL|            68073|
|    4|            ATL|            65773|
|    5|            ATL|            68350|
|    6|            ATL|            71968|
|    7|            ATL|            73969|
|    8|            ATL|            74569|
|    9|            ATL|            68529|
|   10|            ATL|            73883|
|   11|            ATL|            69358|
|   12|            ATL|            69029|
+-----+---------------+-----------------+



## Method 2

In [13]:
# Busiest airport per month (method 2: reduceByKey).
def _busier(left, right):
    """Return the row with the larger last field; `left` wins ties."""
    return right if right[-1] > left[-1] else left

# Key each (month, airport, count) row by its integer month, keep the row with
# the largest count per month, order by month and drop the key again.
max_month = (
    q5.map(lambda row: (int(row[0]), row))
      .reduceByKey(_busier)
      .sortByKey()
      .values()
)

In [14]:
# Tabulate the per-month winners produced by method 2.
max_month_table = sqlContext.createDataFrame(max_month, ['Month', 'Airport', 'Total_Flights'])
max_month_table.show()

+-----+-------+-------------+
|Month|Airport|Total_Flights|
+-----+-------+-------------+
|    1|    ATL|        64597|
|    2|    ATL|        59558|
|    3|    ATL|        68073|
|    4|    ATL|        65773|
|    5|    ATL|        68350|
|    6|    ATL|        71968|
|    7|    ATL|        73969|
|    8|    ATL|        74569|
|    9|    ATL|        68529|
|   10|    ATL|        73883|
|   11|    ATL|        69358|
|   12|    ATL|        69029|
+-----+-------+-------------+



## Q6
Find the airline with highest average delay of each type in March 2007. Note: do not write separate code for each error type. You should compute a single RDD where each row contains the delay type, the airline that is worst regarding that delay type, and its average delay of that type in minutes.

In [20]:
# Within-partition step used with aggregateByKey: folds one value into a
# running (sum, count) pair
def seq_op_2(accumulator, element):
    """Add `element` to the running sum and increase the running count by one."""
    running_sum = accumulator[0] + element
    running_count = accumulator[1] + 1
    return running_sum, running_count

# Cross-partition step used with aggregateByKey: merges two partial
# (sum, count) pairs so the average can be computed afterwards
def comb_op_2(accumulator1, accumulator2):
    """Add two (sum, count) pairs field by field."""
    merged_sum = accumulator1[0] + accumulator2[0]
    merged_count = accumulator1[1] + accumulator2[1]
    return merged_sum, merged_count

In [21]:
# Worst carrier per delay type in March, computed in one pass as a single RDD
# instead of one Spark job per delay type.
#   1. `counter` is the empty (sum, count) accumulator; `delays` lists the
#      delay types in the order of their columns, which start at column 24.
#   2. Drop the header row and keep only the records of month '3'.
#   3. Every record emits one ((delay position, carrier), minutes) pair per
#      delay type.
#   4. aggregateByKey builds (sum, count) per (delay position, carrier) using
#      seq_op_2 / comb_op_2, which is then turned into an average.
#   5. Re-key by delay position and keep the carrier with the highest average.
# `worst_per_delay` rows are (delay position, (carrier, average minutes)).
counter = (0, 0)
delays = ['Carrier', 'Weather', 'NAS', 'Security', 'Late Aircraft']
first_delay_col = 24

worst_per_delay = (
    data.filter(lambda line: line != header and line[1] == '3')
        .flatMap(lambda x: [((pos, x[8]), float(x[first_delay_col + pos]))
                            for pos in range(len(delays))])
        .aggregateByKey(counter, seq_op_2, comb_op_2)
        .map(lambda kv: (kv[0][0], (kv[0][1], kv[1][0] / kv[1][1])))
        .reduceByKey(lambda best, other: other if other[1] > best[1] else best)
)

# At most one row per delay type reaches the driver; order the rows like
# `delays` and label them as '<delay type> - <carrier>' for display.
L = [(delays[pos] + ' - ' + carrier, avg_delay)
     for pos, (carrier, avg_delay) in sorted(worst_per_delay.collect())]

In [22]:
# Tabulate the worst carrier and its average delay for each delay type.
q6_table = sqlContext.createDataFrame(L, ['Delay Type - Worst Carrier', 'Avg Delay in Minutes'])
q6_table.show(5)

+--------------------------+--------------------+
|Delay Type - Worst Carrier|Avg Delay in Minutes|
+--------------------------+--------------------+
|              Carrier - EV|   9.288187668673448|
|              Weather - OH|  2.9110093084458306|
|                  NAS - CO|   6.944122837980939|
|             Security - AS| 0.11413628672511922|
|        Late Aircraft - B6|   9.577584656877434|
+--------------------------+--------------------+



## Q9
Assume that a passenger wants to travel from Philadelphia International Airport (airport code: PHL) to Los Angeles International Airport (airport code: LAX), and then go back to Philadelphia (PHL).

He departs PHL no earlier than 5:59am (scheduled time), stays at least 3:01 hours in Los Angeles, and then arrives at PHL no later than 11pm.

Based on the "scheduled" times, find which carrier has the highest number of flights with these constraints. Limit your analysis to February 2007 and use scheduled times.

In [24]:
# Carriers offering flights that fit the PHL -> LAX -> PHL day trip.
# The time windows below assume that the flight between both cities takes
# about 5 hours:
#   * PHL -> LAX with a scheduled departure between 559 and 1000, otherwise
#     the passenger cannot get back in time;
#   * LAX -> PHL with a scheduled departure between 1100 and 1500 (LAX time).
def _fits_round_trip(line):
    """Tell whether a record is a February flight inside one of the two windows.

    The header row and records of other months are rejected before the
    scheduled departure time (column 5) is parsed as an integer.
    """
    if line == header or line[1] != '2':
        return False
    origin, destination = line[16], line[17]
    if origin == 'PHL' and destination == 'LAX':
        return 559 <= int(line[5]) <= 1000
    if origin == 'LAX' and destination == 'PHL':
        return 1100 <= int(line[5]) <= 1500
    return False

# Emit (carrier, 1) for every matching flight, add up the ones per carrier,
# and sort so the carrier with the most matching flights comes first.
q9_flights_per_carrier = (
    data.filter(_fits_round_trip)
        .map(lambda row: (row[8], 1))
        .reduceByKey(add)
)
q9 = q9_flights_per_carrier.sortBy(lambda pair: pair[1], ascending=False).collect()

q9_columns = ['Carrier', 'Number of flights with travel constraints']
sqlContext.createDataFrame(q9, q9_columns).show()

+-------+-----------------------------------------+
|Carrier|Number of flights with travel constraints|
+-------+-----------------------------------------+
|     US|                                       76|
|     WN|                                       56|
|     UA|                                       56|
+-------+-----------------------------------------+



In [25]:
# Report the winner straight from q9 (sorted with the highest count first)
# instead of hard-coding the carrier and the count, so the sentence cannot
# drift out of sync with the computed result.
if q9:
    best_carrier, best_count = q9[0]
    print("The carrier which has the highest number of flights for the given constraints is {} with {} available flights."
          .format(best_carrier, best_count))
else:
    # No flight matched the filters, so there is no winner to report.
    print("No carrier has flights matching the given constraints.")

The carrier which has the highest number of flights for the given constraints is US with 76 available flights.


## Q10
Generate the `departure flights` board of the Los Angeles Airport on 12 Jan 2007 at 13:00.

The board should contain flights with actual departure times between 12:00 and 14:00, sorted by scheduled departure time. 

The resulting table should at least contain scheduled departure time, actual departure time (if departed), airline code, and 
destination.

In [26]:
# Departure board of LAX for 12 January, as seen at 13:00.
# Keep the records of month '1', day '12', departing from 'LAX', whose
# scheduled departure time (column 5) lies between 1200 and 1400.
# NOTE(review): the task text asks for *actual* departure times between 12:00
# and 14:00, while this filter uses the scheduled time -- confirm the intent.
def _on_board(line):
    """Tell whether a record belongs on the board (header row excluded)."""
    return (line != header and line[1] == '1' and line[2] == '12'
            and line[16] == 'LAX' and 1200 <= int(line[5]) <= 1400)


def _board_row(line):
    """Build (scheduled time, actual time or status, airline, destination).

    The second field is the actual departure time (column 4) when it is
    before 1300, 'Boarding' when it is between 1300 and 1330, and 'On Time'
    when it is later than 1330.
    """
    scheduled, actual, airline, destination = line[5], line[4], line[8], line[17]
    actual_time = int(actual)
    if actual_time < 1300:
        shown = actual
    elif actual_time <= 1330:
        shown = 'Boarding'
    else:
        shown = 'On Time'
    return (scheduled, shown, airline, destination)

# Sort the board by scheduled departure time and collect it.
q10 = (
    data.filter(_on_board)
        .map(_board_row)
        .sortBy(lambda entry: entry[0])
        .collect()
)

In [27]:
# Display up to 100 rows of the departure board.
q10_columns = ['Scheduled Departure Time', 'Actual Departure Time', 'Airline', 'Destination']
sqlContext.createDataFrame(q10, q10_columns).show(n=100)

+------------------------+---------------------+-------+-----------+
|Scheduled Departure Time|Actual Departure Time|Airline|Destination|
+------------------------+---------------------+-------+-----------+
|                    1200|                 1256|     WN|        OAK|
|                    1200|                 1159|     AA|        JFK|
|                    1200|                 1213|     CO|        IAH|
|                    1205|                 1206|     AA|        MIA|
|                    1211|                 1254|     NW|        MSP|
|                    1214|                 1217|     OO|        SAN|
|                    1215|                 1213|     WN|        SJC|
|                    1215|                 1217|     DL|        ATL|
|                    1220|                 1216|     FL|        ATL|
|                    1220|                 1215|     MQ|        MRY|
|                    1224|                 1224|     OO|        SJC|
|                    1224|        