In [0]:
pip install pyspark

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, max, min, sum, month

In [0]:
# Initialize Spark session
spark = SparkSession.builder.appName("FlightDataAnalysis").getOrCreate()

In [0]:
#File uploaded to /FileStore/tables/airports-1.csv
#File uploaded to /FileStore/tables/flights-1.csv
#File uploaded to /FileStore/tables/raw_flight_data-1.csv

In [0]:
# Load CSV files
airports_df = spark.read.csv("/FileStore/tables/airports-1.csv", header=True, inferSchema=True)
flights_df = spark.read.csv("/FileStore/tables/flights-1.csv", header=True, inferSchema=True)
raw_flight_data_df = spark.read.csv("/FileStore/tables/raw_flight_data-1.csv", header=True, inferSchema=True)

In [0]:
# **Data Cleaning**
# Remove duplicates
airports_df = airports_df.dropDuplicates()
flights_df = flights_df.dropDuplicates()
raw_flight_data_df = raw_flight_data_df.dropDuplicates()

In [0]:
# Drop missing values
airports_df = airports_df.na.drop()
flights_df = flights_df.na.drop()
raw_flight_data_df = raw_flight_data_df.na.drop()

In [0]:
flights_df = flights_df.withColumn("delay", col("ArrDelay") + col("DepDelay"))
flights_df.show(5)


+----------+---------+-------+---------------+-------------+--------+--------+-----+
|DayofMonth|DayOfWeek|airline|OriginAirportID|DestAirportID|DepDelay|ArrDelay|delay|
+----------+---------+-------+---------------+-------------+--------+--------+-----+
|        19|        5|     dl|          14869|        12478|       0|      -8|   -8|
|        19|        5|     dl|          11193|        12892|      -6|     -11|  -17|
|        19|        5|     dl|          14057|        14869|      -4|     -15|  -19|
|        19|        5|     dl|          15016|        11433|      28|      24|   52|
|        19|        5|     dl|          11433|        13303|      -3|       1|   -2|
+----------+---------+-------+---------------+-------------+--------+--------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import avg

flights_df.groupBy("airline").agg(avg("delay").alias("avg_delay")).show()

+-------+------------------+
|airline|         avg_delay|
+-------+------------------+
|     us| 8.913458236021432|
|     b6|22.260432906116154|
|     mq|29.074327881432307|
|     ha|3.0685993111366248|
|     yv| 17.96360843510582|
|     fl|17.397470621877392|
|     9e|14.355000564468584|
|     ev|24.455205272063406|
|     f9|24.973556077904632|
|     wn|21.187773275127398|
|     ua|17.735460206789725|
|     vx| 24.05662279312232|
|     as|0.3872078433661113|
|     dl|10.244658520030713|
|     oo| 14.21580187201624|
|     aa| 19.20978568288267|
+-------+------------------+



In [0]:
airports_df.show(5)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10304|      Aniak|   AK|       Aniak Airport|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10165|Adak Island|   AK|                Adak|
+----------+-----------+-----+--------------------+
only showing top 5 rows



In [0]:
raw_flight_data_df.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11433|        13303|      -3|       1|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 5 rows



In [0]:
# **Joining all three datasets**
# First join flights with airport data (based on origin airport code)
flights_df = flights_df.withColumnRenamed("Carrier", "airline")
from pyspark.sql.functions import trim, lower, col

flights_df = flights_df.withColumn("airline", trim(lower(col("airline"))))
flights_enriched_df = flights_df.join(airports_df, flights_df["OriginAirportID"] == airports_df["airport_id"], "left")

In [0]:
# Now join with raw flight data (assuming 'flight_id' is the key)
final_df = flights_enriched_df.join(raw_flight_data_df, "OriginAirportID", "left")

In [0]:
final_df.show(5)

+---------------+----------+---------+-------+-------------+--------+--------+-----+----------+-------+-----+--------------------+----------+---------+-------+-------------+--------+--------+
|OriginAirportID|DayofMonth|DayOfWeek|airline|DestAirportID|DepDelay|ArrDelay|delay|airport_id|   city|state|                name|DayofMonth|DayOfWeek|Carrier|DestAirportID|DepDelay|ArrDelay|
+---------------+----------+---------+-------+-------------+--------+--------+-----+----------+-------+-----+--------------------+----------+---------+-------+-------------+--------+--------+
|          10397|        19|        5|     dl|        15016|      -1|     -19|  -20|     10397|Atlanta|   GA|Hartsfield-Jackso...|        18|        4|     DL|        11298|      -3|      44|
|          10397|        19|        5|     dl|        15016|      -1|     -19|  -20|     10397|Atlanta|   GA|Hartsfield-Jackso...|        18|        4|     DL|        12451|       1|      -7|
|          10397|        19|        5|  

In [0]:
# 1. Total number of flights
total_flights = final_df.count()

In [0]:
# 2. Most frequent airline
most_frequent_airline = final_df.groupBy("airline").count().orderBy(col("count").desc()).first()

In [0]:
# 3. Busiest airport (most departures)
busiest_airport = final_df.groupBy("OriginAirportID").count().orderBy(col("count").desc()).first()

In [0]:
# 4. Average flight delay
avg_delay = final_df.select(avg("delay")).first()[0]

In [0]:
# 5. Maximum and minimum flight delay recorded
max_delay = final_df.select(max("delay")).first()[0]
min_delay = final_df.select(min("delay")).first()[0]

In [0]:
# 6. Airlines with the most delays
airline_delays = final_df.groupBy("airline").agg(avg("delay").alias("avg_delay")).orderBy(col("avg_delay").desc())

In [0]:
# 8. Airports with the most delays
airport_delays = final_df.groupBy("OriginAirportID").agg(avg("delay").alias("avg_delay")).orderBy(col("avg_delay").desc())

In [0]:
# 9. Percentage of delayed vs. on-time flights
total_delayed_flights = final_df.filter(col("delay") > 0).count()
percentage_delayed = (total_delayed_flights / total_flights) * 100 if total_flights > 0 else 0

In [0]:
flights_df = flights_df.withColumnRenamed("DestAirportID", "destination")

In [0]:
from pyspark.sql.functions import count

destination_counts_df = flights_df.groupBy("destination").agg(count("*").alias("flight_count"))
destination_counts_df.show()

+-----------+------------+
|destination|flight_count|
+-----------+------------+
|      14570|       10351|
|      12264|       37574|
|      14771|       83629|
|      11057|       76441|
|      13830|       11254|
|      12191|       28844|
|      10529|       13063|
|      11259|       18993|
|      10800|       14137|
|      14831|       24268|
|      12889|       77762|
|      14730|       10939|
|      13303|       42014|
|      13342|       20964|
|      10140|       17545|
|      11278|       42829|
|      15304|       35126|
|      10792|       14791|
|      14524|       11482|
|      10423|       26987|
+-----------+------------+
only showing top 20 rows



In [0]:
final_df.show(3)

+---------------+----------+---------+-------+-------------+--------+--------+-----+----------+--------+-----+--------------------+----------+---------+-------+-------------+--------+--------+
|OriginAirportID|DayofMonth|DayOfWeek|airline|DestAirportID|DepDelay|ArrDelay|delay|airport_id|    city|state|                name|DayofMonth|DayOfWeek|Carrier|DestAirportID|DepDelay|ArrDelay|
+---------------+----------+---------+-------+-------------+--------+--------+-----+----------+--------+-----+--------------------+----------+---------+-------+-------------+--------+--------+
|          14057|        19|        5|     dl|        14869|      -4|     -15|  -19|     14057|Portland|   OR|Portland Internat...|        21|        7|     DL|        14869|      -7|     -13|
|          14057|        19|        5|     dl|        14869|      -4|     -15|  -19|     14057|Portland|   OR|Portland Internat...|        19|        5|     AA|        11298|      -8|     -22|
|          14057|        19|       

In [0]:
flights_enriched_df = flights_df.join(
    airports_df, 
    flights_df["destination"] == airports_df["airport_id"], 
    "left"
)

In [0]:
flights_enriched_df.show(2)

+----------+---------+-------+---------------+-----------+--------+--------+-----+----------+--------------+-----+--------------------+
|DayofMonth|DayOfWeek|airline|OriginAirportID|destination|DepDelay|ArrDelay|delay|airport_id|          city|state|                name|
+----------+---------+-------+---------------+-----------+--------+--------+-----+----------+--------------+-----+--------------------+
|        19|        5|     dl|          14869|      12478|       0|      -8|   -8|     12478|      New York|   NY|John F. Kennedy I...|
|        19|        5|     dl|          14057|      14869|      -4|     -15|  -19|     14869|Salt Lake City|   UT|Salt Lake City In...|
+----------+---------+-------+---------------+-----------+--------+--------+-----+----------+--------------+-----+--------------------+
only showing top 2 rows



In [0]:
raw_flight_data_df.show(2)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows



In [0]:
full_flights_df = flights_enriched_df.join(
    raw_flight_data_df, 
    ["OriginAirportID"],  # Modify based on actual common columns
    "left"
)

In [0]:
full_flights_df.show(2)

+---------------+----------+---------+-------+-----------+--------+--------+-----+----------+--------------+-----+--------------------+----------+---------+-------+-------------+--------+--------+
|OriginAirportID|DayofMonth|DayOfWeek|airline|destination|DepDelay|ArrDelay|delay|airport_id|          city|state|                name|DayofMonth|DayOfWeek|Carrier|DestAirportID|DepDelay|ArrDelay|
+---------------+----------+---------+-------+-----------+--------+--------+-----+----------+--------------+-----+--------------------+----------+---------+-------+-------------+--------+--------+
|          14057|        19|        5|     dl|      14869|      -4|     -15|  -19|     14869|Salt Lake City|   UT|Salt Lake City In...|        21|        7|     DL|        14869|      -7|     -13|
|          14057|        19|        5|     dl|      14869|      -4|     -15|  -19|     14869|Salt Lake City|   UT|Salt Lake City In...|        19|        5|     AA|        11298|      -8|     -22|
+--------------

In [0]:
# 18. Top 5 destination airports by traffic
top_destinations = full_flights_df.groupBy("destination").count().orderBy(col("count").desc()).limit(5)

In [0]:
top_destinations.show(3)