In [55]:
# Milestone 2: Core Spark Analysis

# Calculating Metrics & Trends.

# Import required libraries.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max, min, corr, month

# Create a Spark session
spark = SparkSession.builder \
    .appName("FlightAnalysis") \
    .getOrCreate()

In [56]:
# Load the flight data into a DataFrame.
flights_df = spark.read.parquet("/Users/sonushah/Desktop/final_assignments/Flights 1m.parquet")

In [57]:
# Displaying the dataframe.
flights_df.show(5)

+----------+---------+---------+--------+--------+---------+---------+
|   FL_DATE|DEP_DELAY|ARR_DELAY|AIR_TIME|DISTANCE| DEP_TIME| ARR_TIME|
+----------+---------+---------+--------+--------+---------+---------+
|2006-01-01|        5|       19|     350|    2475| 9.083333|12.483334|
|2006-01-02|      167|      216|     343|    2475|11.783334|15.766666|
|2006-01-03|       -7|       -2|     344|    2475| 8.883333|12.133333|
|2006-01-04|       -5|      -13|     331|    2475| 8.916667|    11.95|
|2006-01-05|       -3|      -17|     321|    2475|     8.95|11.883333|
+----------+---------+---------+--------+--------+---------+---------+
only showing top 5 rows



In [58]:
# Calculate average departure and arrival delay for all flights.
avg_delays = flights_df.select(avg(col("DEP_DELAY")).alias("AvgDepartureDelay"),
                               avg(col("ARR_DELAY")).alias("AvgArrivalDelay")).collect()[0]
print("Average Departure Delay:", avg_delays["AvgDepartureDelay"])
print("Average Arrival Delay:", avg_delays["AvgArrivalDelay"])

Average Departure Delay: 8.651135
Average Arrival Delay: 6.400658


In [59]:
# Identify the longest flight by air time, including its distance and delays.
longest_flight = flights_df.select(max(col("AIR_TIME")).alias("LongestAirTime")).collect()[0]
longest_flight_info = flights_df.filter(col("AIR_TIME") == longest_flight["LongestAirTime"]).first()
print("Longest Flight Air Time:", longest_flight_info["AIR_TIME"])
print("Longest Flight Distance:", longest_flight_info["DISTANCE"])
print("Longest Flight Departure Delay:", longest_flight_info["DEP_DELAY"])
print("Longest Flight Arrival Delay:", longest_flight_info["ARR_DELAY"])

Longest Flight Air Time: 877
Longest Flight Distance: 1069
Longest Flight Departure Delay: 51
Longest Flight Arrival Delay: 770


In [60]:
# Determine the time of day with the least average departure delay.
min_dep_delay_time = flights_df.groupBy("DEP_TIME").agg(avg("DEP_DELAY").alias("AvgDepDelay")).orderBy("AvgDepDelay").first()
print("Time of Day with Least Average Departure Delay:", min_dep_delay_time["DEP_TIME"])



Time of Day with Least Average Departure Delay: 4.050000190734863


                                                                                

In [61]:
# Evaluate the correlation between departure delay and arrival delay, alongside air time and distance impact.
correlation = flights_df.select(corr("DEP_DELAY", "ARR_DELAY").alias("DepArrDelayCorrelation"),
                                corr("AIR_TIME", "DISTANCE").alias("AirTimeDistanceCorrelation")).collect()[0]
print("Departure Delay vs Arrival Delay Correlation:", correlation["DepArrDelayCorrelation"])
print("Air Time vs Distance Correlation:", correlation["AirTimeDistanceCorrelation"])

Departure Delay vs Arrival Delay Correlation: 0.9052460873068467
Air Time vs Distance Correlation: 0.9759585972042383


In [62]:
# Examine how average departure and arrival delays vary by month.
avg_delays_by_month = flights_df.groupBy(month("FL_DATE").alias("Month")) \
    .agg(avg("DEP_DELAY").alias("AvgDepartureDelay"), avg("ARR_DELAY").alias("AvgArrivalDelay")) \
    .orderBy("Month").collect()

print("Average Departure and Arrival Delays by Month:")
for row in avg_delays_by_month:
    print("Month:", row["Month"], "| Avg Departure Delay:", row["AvgDepartureDelay"], "| Avg Arrival Delay:", row["AvgArrivalDelay"])


Average Departure and Arrival Delays by Month:
Month: 1 | Avg Departure Delay: 8.05607493027906 | Avg Arrival Delay: 5.628670654061354
Month: 2 | Avg Departure Delay: 9.440354060529927 | Avg Arrival Delay: 7.4245329983483375


In [63]:
# Categorization & Data Preparation.

# Categorize flights based on delay severity with defined thresholds.

from pyspark.sql.functions import when

# Define delay severity thresholds.
thresholds = {'Minor': 15, 'Moderate': 30, 'Major': 60}

# Categorize flights based on delay severity
categorized_df = flights_df.withColumn("DelaySeverity",
                               when(flights_df["DEP_DELAY"] <= thresholds['Minor'], "Minor")
                               .when(flights_df["DEP_DELAY"] <= thresholds['Moderate'], "Moderate")
                               .otherwise("Major"))
categorized_df.show()

+----------+---------+---------+--------+--------+---------+---------+-------------+
|   FL_DATE|DEP_DELAY|ARR_DELAY|AIR_TIME|DISTANCE| DEP_TIME| ARR_TIME|DelaySeverity|
+----------+---------+---------+--------+--------+---------+---------+-------------+
|2006-01-01|        5|       19|     350|    2475| 9.083333|12.483334|        Minor|
|2006-01-02|      167|      216|     343|    2475|11.783334|15.766666|        Major|
|2006-01-03|       -7|       -2|     344|    2475| 8.883333|12.133333|        Minor|
|2006-01-04|       -5|      -13|     331|    2475| 8.916667|    11.95|        Minor|
|2006-01-05|       -3|      -17|     321|    2475|     8.95|11.883333|        Minor|
|2006-01-06|       -4|      -32|     320|    2475| 8.933333|11.633333|        Minor|
|2006-01-08|       -3|       -2|     346|    2475|     8.95|12.133333|        Minor|
|2006-01-09|        3|        0|     334|    2475|     9.05|12.166667|        Minor|
|2006-01-10|       -7|      -21|     334|    2475| 8.883333|11.81

In [64]:
# Analyze if longer flights are more prone to longer delays than shorter ones.

# Calculate average delay for shorter and longer flights.
average_delay_by_distance = flights_df.groupBy("DISTANCE") \
    .agg(avg("DEP_DELAY").alias("AvgDepartureDelay")) \
    .orderBy("DISTANCE")

average_delay_by_distance.show()

+--------+-------------------+
|DISTANCE|  AvgDepartureDelay|
+--------+-------------------+
|      30|               39.5|
|      31|  9.544642857142858|
|      36| 1.4561933534743203|
|      47|  5.445061043285238|
|      49| 2.2894248608534324|
|      50|              120.0|
|      56| 2.6363636363636362|
|      64|  2.252450980392157|
|      67| 13.672546857772877|
|      68| 1.7257142857142858|
|      70| 1.6409090909090909|
|      71|               34.0|
|      73| 7.9511111111111115|
|      74| 1.7270114942528736|
|      75|  16.08695652173913|
|      76|-1.0555555555555556|
|      77|  9.522670025188917|
|      78| 11.541147132169575|
|      79| 10.633484162895927|
|      81|  9.329787234042554|
+--------+-------------------+
only showing top 20 rows



In [66]:
# Observe how departures and arrivals vary hourly, identifying peak hours.

# Extract hour from departure and arrival timestamps
flights_df = flights_df.withColumn("DepartureHour", col("DEP_TIME").cast("integer"))
flights_df = flights_df.withColumn("ArrivalHour", col("ARR_TIME").cast("integer"))

# Calculate average departures and arrivals by hour
avg_departures_by_hour = flights_df.groupBy("DepartureHour").avg("DEP_DELAY").orderBy("DepartureHour")
avg_arrivals_by_hour = flights_df.groupBy("ArrivalHour").avg("ARR_DELAY").orderBy("ArrivalHour")

# Identify peak hours based on the highest average delay
peak_departure_hour = avg_departures_by_hour.orderBy(col("avg(DEP_DELAY)").desc()).first()["DepartureHour"]
peak_arrival_hour = avg_arrivals_by_hour.orderBy(col("avg(ARR_DELAY)").desc()).first()["ArrivalHour"]

print("Peak Departure Hour:", peak_departure_hour)
print("Peak Arrival Hour:", peak_arrival_hour)

Peak Departure Hour: 3
Peak Arrival Hour: 3


In [68]:
# Prepare a dataset for a machine learning model to predict arrival delay.

# Select relevant features and target variable
ml_dataset = flights_df.select("DISTANCE", "AIR_TIME", "DEP_DELAY", "ARR_DELAY").show()

+--------+--------+---------+---------+
|DISTANCE|AIR_TIME|DEP_DELAY|ARR_DELAY|
+--------+--------+---------+---------+
|    2475|     350|        5|       19|
|    2475|     343|      167|      216|
|    2475|     344|       -7|       -2|
|    2475|     331|       -5|      -13|
|    2475|     321|       -3|      -17|
|    2475|     320|       -4|      -32|
|    2475|     346|       -3|       -2|
|    2475|     334|        3|        0|
|    2475|     334|       -7|      -21|
|    2475|     321|        8|      -10|
|    2475|     321|       -5|      -27|
|    2475|     327|       -7|       -6|
|    2475|     333|       47|       73|
|    2475|     329|       -4|      -14|
|    2475|     337|       -4|      -11|
|    2475|     342|       -1|        4|
|    2475|     343|       -2|        2|
|    2475|     343|       -3|       -8|
|    2475|     342|       -8|       -9|
|    2475|     338|       -4|       -1|
+--------+--------+---------+---------+
only showing top 20 rows



In [70]:
# Identify the extremes in flight operations for both departure and arrival delays and write these analyses
#into an Excel file with different sheets.

import pandas as pd

extreme_departure_delay = flights_df.select(max("DEP_DELAY").alias("MaxDepartureDelay"), min("DEP_DELAY").alias("MinDepartureDelay"))
extreme_arrival_delay = flights_df.select(max("ARR_DELAY").alias("MaxArrivalDelay"), min("ARR_DELAY").alias("MinArrivalDelay"))

# Write analyses into an Excel file with different sheets
with pd.ExcelWriter('flight_analysis.xlsx') as writer:
    extreme_departure_delay.toPandas().to_excel(writer, sheet_name='Extreme Departure Delay', index=False)
    extreme_arrival_delay.toPandas().to_excel(writer, sheet_name='Extreme Arrival Delay', index=False)

24/03/04 19:30:06 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1007253 ms exceeds timeout 120000 ms
24/03/04 19:30:06 WARN SparkContext: Killing executors is not supported by current scheduler.
