In [0]:
# Import Libraries
import pyspark.sql.functions as F
from pyspark.sql import Window

# Loading Data
delays = spark.read.table("hive_metastore.default.tilinajay2")
delays

DataFrame[FL_DATE: date, OP_CARRIER: string, OP_CARRIER_FL_NUM: string, ORIGIN: string, DEST: string, DEP_DELAY: int, TAXI_OUT: int, TAXI_IN: int, ARR_DELAY: int, CANCELLED: boolean, DIVERTED: boolean, CRS_ELAPSED_TIME: int, ACTUAL_ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, CARRIER_DELAY: int, WEATHER_DELAY: int, NAS_DELAY: int, SECURITY_DELAY: int, LATE_AIRCRAFT_DELAY: int, CRS_DEP_HOUR: int, CRS_DEP_MIN: int, DEP_HOUR: int, DEP_MIN: int, WHEELS_OFF_HOUR: int, WHEELS_OFF_MIN: int, WHEELS_ON_HOUR: int, WHEELS_ON_MIN: int, CRS_ARR_HOUR: int, CRS_ARR_MIN: int, ARR_HOUR: int, ARR_MIN: int]

**Adding the day of the week (Monday through Sunday)**

In [0]:
# Add a new column "DAY_OF_WEEK" to the 'delays' DataFrame
# The new column is created by formatting the "FL_DATE" column to represent the day of the week
delays = delays \
    .withColumn("DAY_OF_WEEK", F.date_format("FL_DATE", "EEEE"))

delays.select("FL_DATE", "DAY_OF_WEEK").show()

+----------+-----------+
|   FL_DATE|DAY_OF_WEEK|
+----------+-----------+
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
|2009-01-01|   Thursday|
+----------+-----------+
only showing top 20 rows



**Rate of weather delays at the departure airport in the previous hour**

In [0]:

# Create a new column "DEP_TIME" in the 'delays' DataFrame
# This column is calculated based on the sum of "DEP_HOUR" multiplied by 60 and "DEP_MIN"
delays = delays.withColumn("DEP_TIME", F.col("DEP_HOUR") * 60 + F.col("DEP_MIN"))

# Define a window specification for aggregating weather delay information over a specific time range
window_spec = Window \
    .partitionBy("ORIGIN", "FL_DATE") \
    .orderBy("DEP_TIME") \
    .rangeBetween(-60, -1)

# Create a new DataFrame 'delays_departing' with additional columns for weather delay analysis
delays_departing = delays \
    .withColumn("WEATHER_DELAY_BOOL", F.when(F.col("WEATHER_DELAY") >= 15, 1).otherwise(0)) \
    .withColumn("WEATHER_DELAY_SUM", F.when(F.isnull(F.sum("WEATHER_DELAY_BOOL").over(window_spec)), 0).otherwise(F.sum("WEATHER_DELAY_BOOL").over(window_spec))) \
    .withColumn("TOTAL_FLIGHTS", F.count("*").over(window_spec)) \
    .withColumn("RATE", (F.col("WEATHER_DELAY_SUM") / F.col("TOTAL_FLIGHTS"))) \
    .withColumn("RATE", F.when(F.isnull(F.col("RATE")), 0).otherwise(F.col("RATE"))) \
    .drop_duplicates(["ORIGIN", "FL_DATE", "DEP_HOUR", "DEP_MIN"])

# Select and display specific columns from the 'delays_departing' DataFrame
delays_departing.select("ORIGIN", "FL_DATE", "DEP_HOUR", "DEP_MIN", "WEATHER_DELAY_SUM", "TOTAL_FLIGHTS", "RATE")
delays_departing.show()

+------+----------+--------+-------+-----------------+-------------+----+
|ORIGIN|   FL_DATE|DEP_HOUR|DEP_MIN|WEATHER_DELAY_SUM|TOTAL_FLIGHTS|RATE|
+------+----------+--------+-------+-----------------+-------------+----+
|   ABE|2009-01-24|      12|     10|                0|            0| 0.0|
|   ABE|2009-01-24|      12|     30|                0|            1| 0.0|
|   ABE|2009-01-24|      16|      5|                0|            0| 0.0|
|   ABE|2009-01-24|      16|     53|                0|            1| 0.0|
|   ABE|2009-01-24|      17|     29|                0|            1| 0.0|
|   ABE|2009-01-25|      12|     12|                0|            0| 0.0|
|   ABE|2009-01-25|      12|     33|                0|            1| 0.0|
|   ABE|2009-01-25|      16|      6|                0|            0| 0.0|
|   ABE|2009-01-25|      17|      3|                0|            1| 0.0|
|   ABE|2009-01-25|      17|     25|                0|            1| 0.0|
|   ABE|2009-01-25|      17|     43|  

**Rate of weather delays from the arrival airport in the previous hour**

In [0]:
# Create a new column "ARR_TIME" in the 'delays' DataFrame
# This column is calculated based on the sum of "ARR_HOUR" multiplied by 60 and "ARR_MIN"
delays = delays.withColumn("ARR_TIME", F.col("ARR_HOUR") * 60 + F.col("ARR_MIN"))

# Define a window specification for aggregating weather delay information over a specific time range
window_spec = Window \
    .partitionBy("ORIGIN", "FL_DATE") \
    .orderBy("ARR_TIME") \
    .rangeBetween(-60, -1)

# Create a new DataFrame 'delays_arriving' with additional columns for weather delay analysis
delays_arriving = delays \
    .withColumn("WEATHER_DELAY_BOOL", F.when(F.col("WEATHER_DELAY") >= 15, 1).otherwise(0)) \
    .withColumn("WEATHER_DELAY_SUM", F.when(F.isnull(F.sum("WEATHER_DELAY_BOOL").over(window_spec)), 0).otherwise(F.sum("WEATHER_DELAY_BOOL").over(window_spec))) \
    .withColumn("TOTAL_FLIGHTS", F.count("*").over(window_spec)) \
    .withColumn("RATE", (F.col("WEATHER_DELAY_SUM") / F.col("TOTAL_FLIGHTS"))) \
    .withColumn("RATE", F.when(F.isnull(F.col("RATE")), 0).otherwise(F.col("RATE"))) \
    .drop_duplicates(["ORIGIN", "FL_DATE", "ARR_HOUR", "ARR_MIN"])

# Select and display specific columns from the 'delays_arriving' DataFrame
delays_arriving.select("ORIGIN", "FL_DATE", "ARR_HOUR", "ARR_MIN", "WEATHER_DELAY_SUM", "TOTAL_FLIGHTS", "RATE")
delays_arriving.show()

+------+----------+--------+-------+-----------------+-------------+----+
|ORIGIN|   FL_DATE|ARR_HOUR|ARR_MIN|WEATHER_DELAY_SUM|TOTAL_FLIGHTS|RATE|
+------+----------+--------+-------+-----------------+-------------+----+
|   ABE|2009-01-24|      13|     49|                0|            0| 0.0|
|   ABE|2009-01-24|      14|      1|                0|            1| 0.0|
|   ABE|2009-01-24|      17|     47|                0|            0| 0.0|
|   ABE|2009-01-24|      18|     46|                0|            1| 0.0|
|   ABE|2009-01-24|      19|     24|                0|            1| 0.0|
|   ABE|2009-01-25|      14|      8|                0|            0| 0.0|
|   ABE|2009-01-25|      14|     12|                0|            1| 0.0|
|   ABE|2009-01-25|      17|     45|                0|            0| 0.0|
|   ABE|2009-01-25|      18|     44|                0|            1| 0.0|
|   ABE|2009-01-25|      19|      2|                0|            1| 0.0|
|   ABE|2009-01-25|      19|     16|  

**Number of flights departing from the departure airport in the previous hour, compare the average number during this hour on the same day of the week, as a z score**

In [0]:
# Define a window specification for aggregating flight count over a specific time range
window_spec = Window \
    .partitionBy("ORIGIN", "FL_DATE") \
    .orderBy("DEP_TIME") \
    .rangeBetween(-60, -1)

# Define another window specification for aggregating flight count over a specific time range and by day of the week
window_spec_day_of_week = Window \
    .partitionBy("ORIGIN", "DAY_OF_WEEK") \
    .orderBy("DEP_TIME") \
    .rangeBetween(-60, -1)

# Create a new DataFrame 'delays_with_flight_count' with an additional column for flight count in the previous hour
delays_with_flight_count = delays \
    .withColumn("FLIGHT_COUNT_PREV_HR", F.count("*").over(window_spec)) \
    .withColumn("FLIGHT_COUNT_PREV_HR", F.when(F.isnull("FLIGHT_COUNT_PREV_HR"), 0).otherwise(F.col("FLIGHT_COUNT_PREV_HR")))

# Calculate the average flight count in the previous hour by day of the week
avg_flight_count_by_day_of_week = delays_with_flight_count \
    .withColumn("AVG_FLIGHT_COUNT_PREV_HR_DAY_OF_WEEK", F.avg("FLIGHT_COUNT_PREV_HR").over(window_spec_day_of_week)) \
    .withColumn("AVG_FLIGHT_COUNT_PREV_HR_DAY_OF_WEEK", F.when(F.isnull("AVG_FLIGHT_COUNT_PREV_HR_DAY_OF_WEEK"), 0).otherwise(F.col("AVG_FLIGHT_COUNT_PREV_HR_DAY_OF_WEEK")))

# Calculate the standard deviation of flight count in the previous hour by day of the week
sd_flight_count_by_day_of_week = avg_flight_count_by_day_of_week \
    .withColumn("sd", F.stddev("FLIGHT_COUNT_PREV_HR").over(window_spec_day_of_week)) \
    .fillna(0)

# Calculate the Z-score for flight count in the previous hour by day of the week
delays_zscore = sd_flight_count_by_day_of_week \
    .withColumn("z_score", (F.col("FLIGHT_COUNT_PREV_HR") - F.col("AVG_FLIGHT_COUNT_PREV_HR_DAY_OF_WEEK")) / F.col("sd")) \
    .fillna(0)

# Select and display specific columns from the 'delays_zscore' DataFrame
delays_zscore.select("ORIGIN", "FL_DATE", "DEP_HOUR", "DEP_MIN", "DAY_OF_WEEK", "z_score").show()

+------+----------+--------+-------+-----------+-------+
|ORIGIN|   FL_DATE|DEP_HOUR|DEP_MIN|DAY_OF_WEEK|z_score|
+------+----------+--------+-------+-----------+-------+
|   ABI|2009-03-05|      10|      1|   Thursday|    0.0|
|   ABI|2015-12-24|      10|      3|   Thursday|    0.0|
|   ABI|2015-08-06|      10|      3|   Thursday|    0.0|
|   ABI|2015-04-09|      10|      3|   Thursday|    0.0|
|   ABI|2009-02-12|      10|      4|   Thursday|    0.0|
|   ABI|2015-10-08|      10|      4|   Thursday|    0.0|
|   ABI|2015-11-12|      10|      4|   Thursday|    0.0|
|   ABI|2015-05-28|      10|      5|   Thursday|    0.0|
|   ABI|2009-02-26|      10|      5|   Thursday|    0.0|
|   ABI|2015-09-17|      10|      5|   Thursday|    0.0|
|   ABI|2015-07-02|      10|      6|   Thursday|    0.0|
|   ABI|2015-11-05|      10|      6|   Thursday|    0.0|
|   ABI|2015-09-10|      10|      6|   Thursday|    0.0|
|   ABI|2015-04-02|      10|      6|   Thursday|    0.0|
|   ABI|2009-03-12|      10|   

**Calculating the average speed of the flight by dividing the distance of the flight by the elapsed time**

In [0]:
# Calculate the average speed by creating a new column "AVERAGE_SPEED"
# This is computed by dividing the "DISTANCE" column by the "ACTUAL_ELAPSED_TIME" column
delays = delays \
    .withColumn("AVERAGE_SPEED", F.col("DISTANCE") / F.col("ACTUAL_ELAPSED_TIME"))

# Select specific columns including "FL_DATE", "ORIGIN", "AVERAGE_SPEED", "DISTANCE", and "ACTUAL_ELAPSED_TIME"
delays.select("FL_DATE", "ORIGIN", "AVERAGE_SPEED", "DISTANCE", "ACTUAL_ELAPSED_TIME").show()

+----------+------+------------------+--------+-------------------+
|   FL_DATE|ORIGIN|     AVERAGE_SPEED|DISTANCE|ACTUAL_ELAPSED_TIME|
+----------+------+------------------+--------+-------------------+
|2009-01-01|   DCA| 2.926470588235294|     199|                 68|
|2009-01-01|   EWR|              2.84|     213|                 75|
|2009-01-01|   EWR|3.2096774193548385|     199|                 62|
|2009-01-01|   DCA|3.5535714285714284|     199|                 56|
|2009-01-01|   IAD|2.7662337662337664|     213|                 77|
|2009-01-01|   ATL| 5.730769230769231|     745|                130|
|2009-01-01|   CLE|  5.12962962962963|     554|                108|
|2009-01-01|   DCA| 2.518987341772152|     199|                 79|
|2009-01-01|   EWR|2.7260273972602738|     199|                 73|
|2009-01-01|   EWR| 3.262295081967213|     199|                 61|
|2009-01-01|   DCA|3.2096774193548385|     199|                 62|
|2009-01-01|   EWR|          3.109375|     199| 

**Number of courrier flights delayed by the seasons (so we know how the courrier respond to the temperature by seasons)**

In [0]:
# Create a new column "MONTH" in the 'delays' DataFrame, extracting the month from "FL_DATE"
# Create another column "SZN" indicating the season based on the month
delays = delays.withColumn("MONTH", F.month("FL_DATE")) \
    .withColumn("SZN", 
                F.when((F.col("MONTH") == 12) | (F.col("MONTH") == 1) | (F.col("MONTH") == 2), "winter")
                .when((F.col("MONTH") == 3) | (F.col("MONTH") == 4) | (F.col("MONTH") == 5), "spring")
                .when((F.col("MONTH") == 6) | (F.col("MONTH") == 7) | (F.col("MONTH") == 8), "summer")
                .otherwise("fall"))

# Filter the 'delays' DataFrame to include only flights with carrier delays of 15 minutes or more
delayed_flights = delays.filter(F.col("CARRIER_DELAY") >= 15)

# Group the delayed flights by season and carrier, and calculate the total count of delayed flights
delayed_flights_count = delayed_flights.groupby("SZN", "OP_CARRIER") \
    .agg(F.count("*").alias("total_carrier_delayed_flights")) \
    .orderBy("SZN", "OP_CARRIER")

# Display the resulting DataFrame showing the total count of carrier delayed flights for each season and carrier
delayed_flights_count.show()

+----+----------+-----------------------------+
| SZN|OP_CARRIER|total_carrier_delayed_flights|
+----+----------+-----------------------------+
|fall|        9E|                         6838|
|fall|        AA|                        72513|
|fall|        AS|                         9051|
|fall|        B6|                        28244|
|fall|        CO|                         4995|
|fall|        DL|                        51890|
|fall|        EV|                        49283|
|fall|        F9|                         7509|
|fall|        FL|                         5718|
|fall|        G4|                          897|
|fall|        HA|                         5274|
|fall|        MQ|                        23909|
|fall|        NK|                         3101|
|fall|        NW|                         2022|
|fall|        OH|                         5631|
|fall|        OO|                        39981|
|fall|        UA|                        41572|
|fall|        US|                       