In [7]:
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.functions import year as pyspark_yr
from pyspark.sql.types import *
from pyspark.sql.window import *
from operator import itemgetter

In [8]:
spark = SparkSession.builder.appName("top_3").getOrCreate()

In [9]:
# Read files
aircrafts = spark.read.csv("ontimeperformance_aircrafts.csv", inferSchema=True,header=True)
aircrafts.show(3)
aircrafts.groupBy("manufacturer").agg(collect_list("tailnum")).show()
aircrafts.dtypes

+-------+----+------------+----------+-----+------+-------------+-----------+----+
|tailnum|type|manufacturer|issue_date|model|status|aircraft_type|engine_type|year|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
| N050AA|null|        null|      null| null|  null|         null|       null|null|
| N051AA|null|        null|      null| null|  null|         null|       null|null|
| N052AA|null|        null|      null| null|  null|         null|       null|null|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
only showing top 3 rows

+--------------------+---------------------+
|        manufacturer|collect_list(tailnum)|
+--------------------+---------------------+
|      FRIEDEMANN JON|             [N544AA]|
|               HELIO|             [N550AA]|
|    AIRBUS INDUSTRIE| [N102UW, N103US, ...|
|   FREDERICK CHRIS K|             [N509AA]|
|MCDONNELL DOUGLAS...| [N900DE, N901DE, ...|
|             AERONCA|            

[('tailnum', 'string'),
 ('type', 'string'),
 ('manufacturer', 'string'),
 ('issue_date', 'string'),
 ('model', 'string'),
 ('status', 'string'),
 ('aircraft_type', 'string'),
 ('engine_type', 'string'),
 ('year', 'string')]

In [10]:
airlines = spark.read.csv("ontimeperformance_airlines.csv", inferSchema=True,header=True)
print(airlines.count())
airlines.dtypes

545


[('carrier_code', 'string'), ('name', 'string'), ('country', 'string')]

In [11]:
airports = spark.read.csv("ontimeperformance_airports.csv", inferSchema=True, header=True)
airports.groupBy("city").count().show()
airports.dtypes

+--------------------+-----+
|                city|count|
+--------------------+-----+
|          Prattville|    1|
|            Bluffton|    1|
|         Middlefield|    1|
|              Aitkin|    1|
|           Birchwood|    1|
|      Douglas Bisbee|    1|
|           Fairbanks|    1|
|Indianapolis/Gree...|    1|
|               Kiana|    1|
|           Worcester|    1|
|         Santa Paula|    1|
|               Tyler|    1|
|        Belle Plaine|    1|
|          Charleston|    4|
|       Bowling Green|    3|
|          Deer Lodge|    1|
|         Springfield|    8|
|              Corona|    1|
|          Harrisburg|    3|
|             Cordell|    1|
+--------------------+-----+
only showing top 20 rows



[('airport_code', 'string'),
 ('airport_name', 'string'),
 ('city', 'string'),
 ('state', 'string'),
 ('country', 'string'),
 ('lat', 'double'),
 ('long', 'double')]

In [12]:
flights = spark.read.csv("ontimeperformance_flights_medium.csv", inferSchema=True, header=True)
print(flights.count())
flights.dtypes

[Stage 24:>                                                       (0 + 16) / 16]

3272629


                                                                                

[('flight_id', 'int'),
 ('carrier_code', 'string'),
 ('flight_number', 'int'),
 ('flight_date', 'timestamp'),
 ('origin', 'string'),
 ('destination', 'string'),
 ('tail_number', 'string'),
 ('scheduled_depature_time', 'string'),
 ('scheduled_arrival_time', 'string'),
 ('actual_departure_time', 'string'),
 ('actual_arrival_time', 'string'),
 ('distance', 'int')]

### Filter Nulls

In [13]:
aircrafts = aircrafts.filter((col("model").isNotNull()) & (~isnan(col("model")))).cache()

In [14]:
airlines = airlines.filter((col("carrier_code").isNotNull()) & (~isnan(col("carrier_code"))))
airlines.cache()

DataFrame[carrier_code: string, name: string, country: string]

## Q1

In [15]:
cess_models = aircrafts.filter(aircrafts.manufacturer == "CESSNA").select("tailnum", "model", "manufacturer")
cess_models = cess_models.withColumn("model_name", regexp_extract(cess_models["model"], '\d{3}', 0))
cess_models.cache()
cess_models.show()

+-------+----------+------------+----------+
|tailnum|     model|manufacturer|model_name|
+-------+----------+------------+----------+
| N201AA|       150|      CESSNA|       150|
| N202AA|      421C|      CESSNA|       421|
|   N293|     T337G|      CESSNA|       337|
| N3744D|      182A|      CESSNA|       182|
| N378AA|      172E|      CESSNA|       172|
| N421AA|      421C|      CESSNA|       421|
| N444AA|      182P|      CESSNA|       182|
| N474AA|      172M|      CESSNA|       172|
| N510AA|     T210N|      CESSNA|       210|
| N519AA|       550|      CESSNA|       550|
| N575AA|210-5(205)|      CESSNA|       210|
| N621AA|      172M|      CESSNA|       172|
+-------+----------+------------+----------+



In [16]:
flights_by_tailnum = flights.select("tail_number").groupBy("tail_number").count()
flights_by_tailnum.show(5)



+-----------+-----+
|tail_number|count|
+-----------+-----+
|     N240AU|  103|
|     N407AA| 1078|
|     N385US|  138|
|     N411US|   89|
|     N502AU|   16|
+-----------+-----+
only showing top 5 rows



                                                                                

In [17]:
cess_flights = flights_by_tailnum.join(broadcast(cess_models), flights_by_tailnum.tail_number == cess_models.tailnum, "right")

In [18]:
cess_flights.show(5)

22/11/05 14:39:11 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.


[Stage 31:==>             (2 + 14) / 16][Stage 32:>                 (0 + 1) / 1]

22/11/05 14:39:12 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.




22/11/05 14:39:12 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.
+-----------+-----+-------+-----+------------+----------+
|tail_number|count|tailnum|model|manufacturer|model_name|
+-----------+-----+-------+-----+------------+----------+
|     N421AA| 1084| N421AA| 421C|      CESSNA|       421|
|     N3744D|  298| N3744D| 182A|      CESSNA|       182|
|     N378AA|  225| N378AA| 172E|      CESSNA|       172|
|     N202AA| 1032| N202AA| 421C|      CESSNA|       421|
|     N201AA|  913| N201AA|  150|      CESSNA|       150|
+-----------+-----+-------+-----+------------+----------+
only showing top 5 rows



                                                                                

In [19]:
cess_flights = cess_flights.groupBy("model_name").agg(sum("count").alias("num_flights"))

In [20]:
top_3_df = cess_flights.orderBy(col("num_flights").desc()).limit(3)

In [21]:
top_3_df.show()

22/11/05 14:39:12 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.


[Stage 37:==>             (2 + 14) / 16][Stage 38:>                 (0 + 1) / 1]

22/11/05 14:39:13 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.
22/11/05 14:39:13 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.


                                                                                

+----------+-----------+
|model_name|num_flights|
+----------+-----------+
|       210|       2184|
|       421|       2116|
|       172|       1689|
+----------+-----------+



In [22]:
def output_top_3(li):
    for i in li:
        yield f"CESSNA {i[0]} \t {i[1]}"

In [23]:
for i in output_top_3(top_3_df.collect()):
    print(i)

22/11/05 14:39:14 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.




22/11/05 14:39:15 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.
22/11/05 14:39:15 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for right outer join.
CESSNA 210 	 2184
CESSNA 421 	 2116
CESSNA 172 	 1689


                                                                                

## Q2 average flight delay

In [24]:
provided_year = 1994

In [25]:
#flights.select("flight_number").show()
flights.dtypes

[('flight_id', 'int'),
 ('carrier_code', 'string'),
 ('flight_number', 'int'),
 ('flight_date', 'timestamp'),
 ('origin', 'string'),
 ('destination', 'string'),
 ('tail_number', 'string'),
 ('scheduled_depature_time', 'string'),
 ('scheduled_arrival_time', 'string'),
 ('actual_departure_time', 'string'),
 ('actual_arrival_time', 'string'),
 ('distance', 'int')]

In [26]:
# Filter null flights
flights = flights.filter((flights.scheduled_depature_time.isNotNull()) & (~isnan(flights.scheduled_depature_time)))

In [27]:
flights = flights.filter((flights.actual_departure_time.isNotNull()) & (~isnan(flights.actual_departure_time)))

In [28]:
# cast
q2_flights = flights.withColumn("flight_date", col("flight_date").cast(DateType()))

In [29]:
# filter by year
q2_flights = q2_flights.withColumn("flight_yr", pyspark_yr("flight_date"))

In [30]:
q2_flights.dtypes

[('flight_id', 'int'),
 ('carrier_code', 'string'),
 ('flight_number', 'int'),
 ('flight_date', 'date'),
 ('origin', 'string'),
 ('destination', 'string'),
 ('tail_number', 'string'),
 ('scheduled_depature_time', 'string'),
 ('scheduled_arrival_time', 'string'),
 ('actual_departure_time', 'string'),
 ('actual_arrival_time', 'string'),
 ('distance', 'int'),
 ('flight_yr', 'int')]

In [31]:
q2_flights = q2_flights.filter(col("flight_yr") == provided_year)

In [32]:
q2_flights = flights.select("carrier_code", "flight_date", "scheduled_depature_time", "actual_departure_time")
q2_flights

DataFrame[carrier_code: string, flight_date: timestamp, scheduled_depature_time: string, actual_departure_time: string]

In [33]:
q2_flights.show(5)

+------------+-------------------+-----------------------+---------------------+
|carrier_code|        flight_date|scheduled_depature_time|actual_departure_time|
+------------+-------------------+-----------------------+---------------------+
|          UA|1994-05-10 00:00:00|               20:15:00|             20:14:00|
|          UA|1994-05-12 00:00:00|               20:15:00|             20:14:00|
|          UA|1994-05-16 00:00:00|               20:15:00|             20:13:00|
|          UA|1994-05-17 00:00:00|               20:15:00|             20:14:00|
|          UA|1994-05-24 00:00:00|               20:15:00|             20:19:00|
+------------+-------------------+-----------------------+---------------------+
only showing top 5 rows



In [34]:
q2_flights = q2_flights.withColumn("flight_date", q2_flights.flight_date.cast(DateType()))
q2_flights = q2_flights.select("carrier_code", "flight_date", \
                               to_timestamp(when(q2_flights.scheduled_depature_time == "24:00:00", "00:00:00")\
                                            .otherwise(q2_flights.scheduled_depature_time), "HH:mm:ss")\
                                               .alias("scheduled_depature_time"),\
                               to_timestamp(when(q2_flights.actual_departure_time == "24:00:00", "00:00:00")\
                                            .otherwise(q2_flights.actual_departure_time), "HH:mm:ss")\
                                               .alias("actual_departure_time")\
                              )
q2_flights.printSchema()

root
 |-- carrier_code: string (nullable = true)
 |-- flight_date: date (nullable = true)
 |-- scheduled_depature_time: timestamp (nullable = true)
 |-- actual_departure_time: timestamp (nullable = true)



In [35]:
q2_flights.show(5)
q2_flights.dtypes

+------------+-----------+-----------------------+---------------------+
|carrier_code|flight_date|scheduled_depature_time|actual_departure_time|
+------------+-----------+-----------------------+---------------------+
|          UA| 1994-05-10|    1970-01-01 20:15:00|  1970-01-01 20:14:00|
|          UA| 1994-05-12|    1970-01-01 20:15:00|  1970-01-01 20:14:00|
|          UA| 1994-05-16|    1970-01-01 20:15:00|  1970-01-01 20:13:00|
|          UA| 1994-05-17|    1970-01-01 20:15:00|  1970-01-01 20:14:00|
|          UA| 1994-05-24|    1970-01-01 20:15:00|  1970-01-01 20:19:00|
+------------+-----------+-----------------------+---------------------+
only showing top 5 rows



[('carrier_code', 'string'),
 ('flight_date', 'date'),
 ('scheduled_depature_time', 'timestamp'),
 ('actual_departure_time', 'timestamp')]

In [36]:
# delay in seconds
q2_flights = q2_flights.select("carrier_code", (q2_flights["actual_departure_time"].cast("long") - q2_flights["scheduled_depature_time"].cast("long")).alias("delay_mins"))

In [37]:
q2_flights.show()

+------------+----------+
|carrier_code|delay_mins|
+------------+----------+
|          UA|       -60|
|          UA|       -60|
|          UA|      -120|
|          UA|       -60|
|          UA|       240|
|          UA|      1500|
|          UA|       -60|
|          UA|       300|
|          UA|       120|
|          UA|      3300|
|          UA|         0|
|          UA|         0|
|          UA|      -180|
|          UA|        60|
|          UA|       120|
|          UA|        60|
|          UA|      -120|
|          UA|         0|
|          UA|         0|
|          UA|       -60|
+------------+----------+
only showing top 20 rows



In [38]:
q2_flights = q2_flights.filter(col("delay_mins") != 0)\
                                .withColumn("final_delay", when(col("delay_mins")/60 <= 12, col("delay_mins"))\
                                   .when(col("delay_mins")/60 < -12, ((24*60) + col("delay_mins")))\
                                   .otherwise(-2))

In [39]:
q2_flights = q2_flights.filter(col("final_delay") > 0).select("carrier_code", "final_delay")

In [40]:
q2_flights = q2_flights.filter(col("carrier_code").isNotNull() & (~isnan(col("carrier_code"))))

In [41]:
airlines_filtered = airlines.filter(col("carrier_code").isNotNull() & (~isnan(col("carrier_code"))))\
                                                        .withColumnRenamed("carrier_code", "al_carrier_code")

In [42]:
# left join because airlines_df will have carrier codes that are not used in flights
flights_w_airlines = q2_flights.join(broadcast(airlines_filtered), q2_flights.carrier_code == airlines_filtered.al_carrier_code, "left")

In [43]:
flights_w_airlines = flights_w_airlines.select("carrier_code", "final_delay", "name")

In [44]:
flights_w_airlines = flights_w_airlines.groupBy("name").agg(count("final_delay").alias("num_delays"),\
                                              round(mean("final_delay"),2).alias("avg_delay"))

In [45]:
flights_w_airlines.show()



+--------------------+----------+---------+
|                name|num_delays|avg_delay|
+--------------------+----------+---------+
|            Tway Air|      5239|   335.78|
|     United Airlines|    162142|   278.34|
|    Sparrow Aviation|     24470|   331.43|
|Southwest Airline...|     39818|    377.6|
|Continental Air L...|     77139|   280.22|
|Northwest Airline...|     47378|   305.73|
|          US Airways|     27254|   279.57|
|Alaska Airlines Inc.|      5209|   365.43|
|Delta Air Lines Inc.|    159436|   274.32|
|American Airlines...|    123019|   275.14|
|    Independence Air|      1532|   373.08|
|        JetSuite Air|     19996|   304.01|
|Skywest Airlines ...|     22974|   320.33|
|American Eagle Ai...|     23952|   339.51|
|        Air Tanzania|       487|   321.19|
|     JetBlue Airways|      7924|   313.83|
|             AirTran|     11677|   343.87|
|Atlantic Southeas...|     14645|   386.51|
|Hawaiian Airlines...|       244|   259.18|
|              Comair|      1696

                                                                                

In [46]:
def q2_output(li):
    # needs to be sorted by airline name (ascending order (a-z))
    for i in li:
        yield f"{i[0]} \t {i[1]} \t {i[2]}"

In [47]:
q2_row_li = flights_w_airlines.collect()
q2_row_li.sort(key=itemgetter(0))
for i in q2_output(q2_row_li):
    print(i)



Air Tanzania 	 487 	 321.19
AirTran 	 11677 	 343.87
Alaska Airlines Inc. 	 5209 	 365.43
American Airlines Inc. 	 123019 	 275.14
American Eagle Airlines Inc. 	 23952 	 339.51
Atlantic Southeast Airlines 	 14645 	 386.51
Comair 	 1696 	 484.42
Continental Air Lines Inc. 	 77139 	 280.22
Delta Air Lines Inc. 	 159436 	 274.32
Frontier Airlines Inc. 	 7453 	 323.57
Hawaiian Airlines Inc. 	 244 	 259.18
Independence Air 	 1532 	 373.08
JetBlue Airways 	 7924 	 313.83
JetSuite Air 	 19996 	 304.01
Mesa Airlines Inc. 	 3231 	 378.92
Northwest Airlines Inc. 	 47378 	 305.73
PSA Airlines Inc. 	 1696 	 484.42
Pinnacle Airlines Inc. 	 2074 	 307.03
Skywest Airlines Inc. 	 22974 	 320.33
Southwest Airlines Co. 	 39818 	 377.6
Sparrow Aviation 	 24470 	 331.43
Tway Air 	 5239 	 335.78
US Airways 	 27254 	 279.57
United Airlines 	 162142 	 278.34


                                                                                

## Q3

In [48]:
flights = spark.read.csv("ontimeperformance_flights_tiny.csv", inferSchema=True, header=True)
flights.dtypes

                                                                                

[('flight_id', 'int'),
 ('carrier_code', 'string'),
 ('flight_number', 'int'),
 ('flight_date', 'timestamp'),
 ('origin', 'string'),
 ('destination', 'string'),
 ('tail_number', 'string'),
 ('scheduled_depature_time', 'timestamp'),
 ('scheduled_arrival_time', 'string'),
 ('actual_departure_time', 'string'),
 ('actual_arrival_time', 'string'),
 ('distance', 'int')]

In [49]:
q3_flights = flights.filter(col("actual_departure_time").isNotNull() & ~isnan(col("actual_departure_time")))

In [50]:
q3_flights = q3_flights.groupBy("tail_number", "carrier_code").count()

In [51]:
q3_flights.show(5)

+-----------+------------+-----+
|tail_number|carrier_code|count|
+-----------+------------+-----+
|     N17345|          CO|  224|
|       N365|          WN|   94|
|     N507DA|          DL|   90|
|     N526UA|          UA|  232|
|     N7441U|          UA|  145|
+-----------+------------+-----+
only showing top 5 rows



                                                                                

In [52]:
q3_aircrafts = aircrafts.select("tailnum", "manufacturer", "model")

In [53]:
q3_airlines = airlines.select(col("carrier_code").alias("al_carrier_code"), "name")

In [54]:
q3_aircrafts = q3_aircrafts.select("tailnum", concat(col("manufacturer"), lit(" "), col("model")).alias("manu_model"))

In [55]:
q3_aircrafts.filter(col("manu_model").isNull()).show()

+-------+----------+
|tailnum|manu_model|
+-------+----------+
+-------+----------+



In [56]:
q3_flights = q3_flights.filter(col("tail_number").isNotNull() & ~isnan(col("tail_number")))

In [57]:
# join flights with airlines
q3_f_al = q3_flights.join(q3_airlines, q3_flights.carrier_code == q3_airlines.al_carrier_code, "left")

In [58]:
q3_f_al = q3_f_al.select("tail_number", "count", "name")
q3_f_al.show(5)



+-----------+-----+--------------------+
|tail_number|count|                name|
+-----------+-----+--------------------+
|     N17345|  224|Continental Air L...|
|       N365|   94|Southwest Airline...|
|     N507DA|   90|Delta Air Lines Inc.|
|     N526UA|  232|     United Airlines|
|     N7441U|  145|     United Airlines|
+-----------+-----+--------------------+
only showing top 5 rows



                                                                                

In [59]:
q3_final_join = q3_f_al.join(broadcast(q3_aircrafts), q3_f_al.tail_number == q3_aircrafts.tailnum, "left")

In [60]:
q3_final_join = q3_final_join.groupBy("name", "manu_model")\
                                .agg(sum(col("count")).alias("num_flights"))

In [61]:
q3_final_join = q3_final_join.filter(col("manu_model").isNotNull())

In [62]:
q3_final_join.show(5)



+--------------------+--------------------+-----------+
|                name|          manu_model|num_flights|
+--------------------+--------------------+-----------+
|          US Airways|      BOEING 737-3B7|        933|
|Northwest Airline...|AIRBUS INDUSTRIE ...|       3756|
|Skywest Airlines ...|     EMBRAER EMB-120|        914|
|Alaska Airlines Inc.|      BOEING 737-8FH|         30|
|Frontier Airlines...|     AIRBUS A319-111|       4141|
+--------------------+--------------------+-----------+
only showing top 5 rows



                                                                                

In [63]:
q3_final_join = q3_final_join.select("name", "num_flights", "manu_model", \
                                     rank().over(Window.partitionBy("name")\
                                            .orderBy(col("num_flights").desc()))\
                                            .alias("rank")).filter(col("rank") <= 5)

In [64]:
q3_final_join = q3_final_join.select("name", "manu_model").groupBy("name").agg(collect_list("manu_model"))

In [65]:
q3_final_join.show(5, truncate=False)



+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------+
|name                        |collect_list(manu_model)                                                                                                               |
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------+
|AirTran                     |[BOEING 717-200, BOEING 737-76N, BOEING 737-7BD]                                                                                       |
|Alaska Airlines Inc.        |[BOEING 737-4Q8, BOEING 737-790, BOEING 737-490, MCDONNELL DOUGLAS DC-9-83(MD-83), BOEING 737-990]                                     |
|American Airlines Inc.      |[MCDONNELL DOUGLAS DC-9-82(MD-82), MCDONNELL DOUGLAS DC-9-83(MD-83), BOEING 757-223, BOEING 767-223, BOEING 767-323]                   

                                                                                

In [66]:
def q3_parse(li):
    for i in li:
        yield f"{i[0]} \t [{', '.join(i[1])}]"

In [67]:
q3_final_li = q3_final_join.collect()
q3_final_li.sort(key=itemgetter(0))
for i in q3_parse(q3_final_li):
    print(i)

AirTran 	 [BOEING 717-200, BOEING 737-76N, BOEING 737-7BD]
Alaska Airlines Inc. 	 [BOEING 737-4Q8, BOEING 737-790, BOEING 737-490, MCDONNELL DOUGLAS DC-9-83(MD-83), BOEING 737-990]
American Airlines Inc. 	 [MCDONNELL DOUGLAS DC-9-82(MD-82), MCDONNELL DOUGLAS DC-9-83(MD-83), BOEING 757-223, BOEING 767-223, BOEING 767-323]
American Eagle Airlines Inc. 	 [EMBRAER EMB-145LR, EMBRAER EMB-135KL, BOMBARDIER INC CL-600-2C10, SAAB-SCANIA SAAB 340B, EMBRAER EMB-135LR]
Atlantic Southeast Airlines 	 [BOMBARDIER INC CL-600-2B19, BOMBARDIER INC CL-600-2C10, CANADAIR CL-600-2B19, AEROSPATIALE/ALENIA ATR-72-212, AEROSPATIALE ATR 72-212]
Comair 	 [BOMBARDIER INC CL-600-2B19, CANADAIR CL-600-2B19, BOMBARDIER INC CL-600-2C10, BOMBARDIER INC CL600-2D24, PIPER PA-28-180]
Continental Air Lines Inc. 	 [BOEING 737-524, BOEING 737-3TO, BOEING 737-824, BOEING 757-224, BOEING 737-724]
Delta Air Lines Inc. 	 [MCDONNELL DOUGLAS AIRCRAFT CO MD-88, BOEING 757-232, BOEING 767-332, BOEING 737-832, MCDONNELL DOUGLAS CO