Flight Delays Analysis with Spark SQL (PySpark)

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=e0d76b131d660668991d8e323d5d10d7f607e97cfb7853fd94fc6b173d9fea03
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count

In [None]:
# Start a SparkSession for this notebook, or reuse the one that is already running.
spark = (
    SparkSession.builder
    .appName("Flights Analysis")
    .getOrCreate()
)


In [None]:
# Display the active SparkSession (last expression in the cell, so Jupyter renders its repr).
spark

In [None]:
# Load the flights dataset.
# NOTE: `date` is assumed to be a zero-padded MMddHHmm timestamp (e.g. 01010005 =
# Jan 01, 00:05), as in the Learning Spark departuredelays.csv -- confirm. With
# inferSchema it is read as an integer, which silently drops the leading zero for
# January-September (01010005 -> 1010005) and breaks position-based slicing of
# month/day/hour. Re-pad it to an 8-character string right after loading.
from pyspark.sql.functions import col, lpad

flights = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/content/departuredelays.csv")
    .withColumn("date", lpad(col("date").cast("string"), 8, "0"))
)


In [None]:
# Register the DataFrame as a temporary view so Spark SQL can query it as `flights`.
flights.createOrReplaceTempView(name="flights")

In [None]:
# Spark SQL analysis.
# Example 1: the 10 origin airports with the highest average delay.
_example1_sql = """
    SELECT origin,Avg(delay)as avg_delay
    FROM flights
    GROUP BY origin
    ORDER BY avg_delay DESC
    LIMIT 10
"""
avg_delay_by_origin_mithil = spark.sql(_example1_sql)

In [None]:
# Print Example 1 (show()'s defaults written out: up to 20 rows, long cells truncated).
avg_delay_by_origin_mithil.show(n=20, truncate=True)

+------+------------------+
|origin|         avg_delay|
+------+------------------+
|   GUM| 33.87777777777778|
|   LSE|26.532467532467532|
|   MQT| 23.87012987012987|
|   EGE| 20.57012542759407|
|   ROA|19.885106382978723|
|   MDW|19.657658556043078|
|   BTV|  18.7246192893401|
|   ORD|18.588917606028524|
|   IAD| 18.40343803056027|
|   SCE| 17.91616766467066|
+------+------------------+



In [None]:
# Example 2: total flights and average delay per calendar day (first 7 days).
# The original grouped by the raw `date` value, which is minute-level (MMddHHmm), so
# each "day" row was really a single departure minute. Group by the MMdd prefix
# instead. `date` is re-padded to 8 characters because inferSchema may have read it
# as an integer without its leading zero (1010005 instead of 01010005); padding is
# a no-op if it is already an 8-character string. The column has no year, so a true
# day-of-week cannot be derived here.
flights_by_day_mithil = spark.sql("""
    SELECT
        SUBSTRING(LPAD(CAST(date AS STRING), 8, '0'), 1, 4) AS day,
        COUNT(*) AS total_flights,
        AVG(delay) AS avg_delay
    FROM flights
    GROUP BY SUBSTRING(LPAD(CAST(date AS STRING), 8, '0'), 1, 4)
    ORDER BY day
    LIMIT 7
""")

In [None]:
# Pull the (at most 7, per the query's LIMIT) result rows back to the driver as a list of Row objects.
flights_by_day_mithil.collect()

[Row(date=1010005, total_flights=1, avg_delay=-8.0),
 Row(date=1010010, total_flights=1, avg_delay=-6.0),
 Row(date=1010020, total_flights=2, avg_delay=-1.0),
 Row(date=1010023, total_flights=1, avg_delay=14.0),
 Row(date=1010025, total_flights=2, avg_delay=15.0),
 Row(date=1010029, total_flights=1, avg_delay=49.0),
 Row(date=1010030, total_flights=3, avg_delay=-5.666666666666667)]

In [None]:
# Example 3: the 5 origin->destination routes with the largest summed delay,
# together with how many flights each route had.
_example3_sql = """
    SELECT
        origin,
        destination,
        SUM(delay) as total_delay,
        COUNT(*) as flight_count
    FROM flights
    GROUP BY origin, destination
    ORDER BY total_delay DESC
    LIMIT 5
"""
top_delayed_routes_mithil = spark.sql(_example3_sql)


In [None]:
# Print Example 3 (show()'s defaults written out: up to 20 rows, long cells truncated).
top_delayed_routes_mithil.show(n=20, truncate=True)


+------+-----------+-----------+------------+
|origin|destination|total_delay|flight_count|
+------+-----------+-----------+------------+
|   LAX|        SFO|      51844|        3198|
|   ORD|        SFO|      41653|        1731|
|   SFO|        LAX|      40798|        3232|
|   LGA|        ATL|      35761|        2500|
|   JFK|        LAX|      35755|        2720|
+------+-----------+-----------+------------+



In [None]:
# Print every result set from Examples 1-3, each preceded by its caption.
for _caption, _result_df in (
    ("Top 10 origins by average delay:", avg_delay_by_origin_mithil),
    ("\nflights and average delay by day (first week):", flights_by_day_mithil),
    ("\nTop 5 routes with highest total delay:", top_delayed_routes_mithil),
):
    print(_caption)
    _result_df.show()

Top 10 origins by average delay:
+------+------------------+
|origin|         avg_delay|
+------+------------------+
|   GUM| 33.87777777777778|
|   LSE|26.532467532467532|
|   MQT| 23.87012987012987|
|   EGE| 20.57012542759407|
|   ROA|19.885106382978723|
|   MDW|19.657658556043078|
|   BTV|  18.7246192893401|
|   ORD|18.588917606028524|
|   IAD| 18.40343803056027|
|   SCE| 17.91616766467066|
+------+------------------+


flights and average delay by day (first week):
+-------+-------------+------------------+
|   date|total_flights|         avg_delay|
+-------+-------------+------------------+
|1010005|            1|              -8.0|
|1010010|            1|              -6.0|
|1010020|            2|              -1.0|
|1010023|            1|              14.0|
|1010025|            2|              15.0|
|1010029|            1|              49.0|
|1010030|            3|-5.666666666666667|
+-------+-------------+------------------+


Top 5 routes with highest total delay:
+------+----

In [None]:
# Stop the SparkSession and release its resources.
# NOTE(review): the following cells re-import, restart Spark and reload the same CSV;
# keeping a single session for the whole notebook would avoid the duplicate load.
spark.stop()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum,avg,count,hour,month

In [None]:
# Start a fresh SparkSession for the extended analysis (or reuse one if already active).
spark = (
    SparkSession.builder
    .appName("extended flights analysis")
    .getOrCreate()
)

In [None]:
# Load the flights dataset.
# NOTE: `date` is assumed to be a zero-padded MMddHHmm timestamp (e.g. 01010005 =
# Jan 01, 00:05), as in the Learning Spark departuredelays.csv -- confirm. With
# inferSchema it is read as an integer, which silently drops the leading zero for
# January-September (01010005 -> 1010005) and breaks position-based slicing of
# month/day/hour. Re-pad it to an 8-character string right after loading.
from pyspark.sql.functions import col, lpad

flights = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/content/departuredelays.csv")
    .withColumn("date", lpad(col("date").cast("string"), 8, "0"))
)

In [None]:
# Expose the freshly loaded DataFrame to Spark SQL under the view name `flights`.
flights.createOrReplaceTempView(name="flights")

In [None]:
# Example 4: the 10 origin airports with the highest average delay.
# NOTE(review): this recomputes the same result as Example 1.
_example4_sql = """
    SELECT origin, AVG(delay) as avg_delay
    FROM flights
    GROUP BY origin
    ORDER BY avg_delay DESC
    LIMIT 10
"""
avg_delay_by_origin_mithil = spark.sql(_example4_sql)

In [None]:
# Print Example 4 (show()'s defaults written out: up to 20 rows, long cells truncated).
avg_delay_by_origin_mithil.show(n=20, truncate=True)

+------+------------------+
|origin|         avg_delay|
+------+------------------+
|   GUM| 33.87777777777778|
|   LSE|26.532467532467532|
|   MQT| 23.87012987012987|
|   EGE| 20.57012542759407|
|   ROA|19.885106382978723|
|   MDW|19.657658556043078|
|   BTV|  18.7246192893401|
|   ORD|18.588917606028524|
|   IAD| 18.40343803056027|
|   SCE| 17.91616766467066|
+------+------------------+



In [None]:
# Example 5: the 5 origin->destination routes with the most flights.
_example5_sql = """
    SELECT
    origin,
    destination,
    COUNT(*) as flight_count
    FROM flights
    GROUP BY origin, destination
    ORDER BY flight_count DESC
    LIMIT 5
"""
busiest_routes_mithil = spark.sql(_example5_sql)

In [None]:
# Print Example 5 (show()'s defaults written out: up to 20 rows, long cells truncated).
busiest_routes_mithil.show(n=20, truncate=True)

+------+-----------+------------+
|origin|destination|flight_count|
+------+-----------+------------+
|   SFO|        LAX|        3232|
|   LAX|        SFO|        3198|
|   LAS|        LAX|        3016|
|   LAX|        LAS|        2964|
|   JFK|        LAX|        2720|
+------+-----------+------------+



In [None]:
# Example 6: Monthly flight trends (flight count and average delay per month).
# `date` is MMddHHmm, so the month is characters 1-2. The original sliced
# characters 5-6 of the unpadded integer string, which picked up hour/minute
# digits (hence "months" such as 00 and 31). Re-pad to 8 characters first because
# inferSchema may have dropped the leading zero; padding is a no-op for values that
# are already 8 characters long.
monthly_trends_mithil = spark.sql("""
    SELECT
        SUBSTRING(LPAD(CAST(date AS STRING), 8, '0'), 1, 2) AS month,
        COUNT(*) AS total_flights,
        AVG(delay) AS avg_delay
    FROM flights
    GROUP BY SUBSTRING(LPAD(CAST(date AS STRING), 8, '0'), 1, 2)
    ORDER BY month
""")


In [None]:
# Print Example 6 (show()'s defaults written out: up to 20 rows, long cells truncated).
monthly_trends_mithil.show(n=20, truncate=True)

+-----+-------------+------------------+
|month|total_flights|         avg_delay|
+-----+-------------+------------------+
|   00|        30649|11.788965382231067|
|   01|        25780|11.812063615205586|
|   02|        22895|13.687311640096091|
|   03|        25564|13.048623063683305|
|   04|        21493|13.328013771925743|
|   05|        23497|13.285057666936204|
|   10|        26978| 13.16543109200089|
|   11|        23701|12.136070208008102|
|   12|        20706|11.687192118226601|
|   13|        19877|11.790461337223928|
|   14|        19595|12.156723653993366|
|   15|        22111|12.806883451675636|
|   20|        21945| 11.77717019822283|
|   21|        18080|11.789546460176991|
|   22|        15428|  13.0869198859217|
|   23|        17676|11.292939579090293|
|   24|        15752|11.935627221940072|
|   25|        16961|11.581451565355817|
|   30|        18012|11.682378414390406|
|   31|        17022|12.303900834214545|
+-----+-------------+------------------+
only showing top 20 rows

In [None]:
# Example 7: per origin airport, the share of flights with a positive delay,
# keeping the 10 airports with the highest share.
_example7_sql = """
    SELECT
    origin,
    COUNT(*) as total_flights,
    SUM(CASE WHEN delay > 0 THEN 1 ELSE 0 END) as delayed_flights,
      (SUM(CASE WHEN delay > 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) as delayed_percentage
    FROM flights
    GROUP BY origin
    ORDER BY delayed_percentage DESC
    LIMIT 10
"""
delayed_percentage = spark.sql(_example7_sql)

In [None]:
# Print Example 7 (show()'s defaults written out: up to 20 rows, long cells truncated).
delayed_percentage.show(n=20, truncate=True)

+------+-------------+---------------+------------------+
|origin|total_flights|delayed_flights|delayed_percentage|
+------+-------------+---------------+------------------+
|   DAL|        11272|           7153| 63.45812633073101|
|   MDW|        20056|          12671| 63.17810131631432|
|   HOU|        14740|           8639| 58.60922659430122|
|   DEN|        53148|          30760| 57.87611951531572|
|   BWI|        21558|          12448| 57.74190555710177|
|   OAK|        10026|           5423| 54.08936764412527|
|   LSE|          154|             83| 53.89610389610390|
|   ORD|        64228|          33812| 52.64370679454444|
|   ISP|         1370|            720| 52.55474452554745|
|   STL|        12142|           6250| 51.47422170976775|
+------+-------------+---------------+------------------+



In [None]:
# Example 8: Average delay by hour of day.
# `date` is MMddHHmm, so the hour is characters 5-6. The original read characters
# 10-11, which lie past the end of a 7-8 character value. That gave an empty string,
# CAST to NULL, so every row collapsed into a single NULL hour. Re-pad to 8
# characters first because inferSchema may have dropped the leading zero.
delay_by_hour_mithil = spark.sql("""
    SELECT
        CAST(SUBSTRING(LPAD(CAST(date AS STRING), 8, '0'), 5, 2) AS INT) AS hour,
        AVG(delay) AS avg_delay
    FROM flights
    GROUP BY CAST(SUBSTRING(LPAD(CAST(date AS STRING), 8, '0'), 5, 2) AS INT)
    ORDER BY hour
""")


In [None]:
# Print Example 8 (show()'s defaults written out: up to 20 rows, long cells truncated).
delay_by_hour_mithil.show(n=20, truncate=True)

+----+------------------+
|hour|         avg_delay|
+----+------------------+
|NULL|12.079802928761449|
+----+------------------+



In [None]:
# Stop the SparkSession at the end of the analysis; DataFrames created above cannot be queried afterwards.
spark.stop()
