In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Spark configuration: application name plus a local master that uses every available core.
# NOTE(review): the data paths used below (/FileStore, /mnt) look like Databricks, where the
# cluster already provides the master; confirm the hard-coded "local[*]" is intended there.
conf = (
    SparkConf()
    .setAppName("Airline Delay Analysis")
    .setMaster("local[*]")
)

# getOrCreate() hands back an already-active session if there is one, otherwise it builds a
# new session from `conf`.
spark = SparkSession.builder.config(conf=conf).getOrCreate()


In [0]:
# Read the raw flights CSV. The first line supplies the column names, and Spark infers each
# column's type (inferSchema costs an extra pass over the file).
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/FileStore/tables/Flights_Delay.csv')
)

# Make the DataFrame queryable from Spark SQL under the name "flights".
# The view captures this DataFrame's plan as it is right now; later reassignments of `df`
# are not reflected unless the view is registered again.
df.createOrReplaceTempView("flights")

In [0]:
# Persist the loaded data as the catalog table "flights_table", replacing it if it already
# exists. The storage format is whatever the session's catalog defaults to (the original
# note called this a Hive table).
df.write.saveAsTable("flights_table", mode="overwrite")

In [0]:
# Inspect what was loaded: first the inferred column names/types, then a 10-row preview.
df.printSchema()
df.show(n=10)

root
 |-- ID: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true

In [0]:
# Hash-partition into 10 partitions keyed on AIRLINE, so every row of a given airline lands
# in the same partition. This rebinds `df`; the "flights" SQL view still refers to the
# earlier, un-repartitioned plan.
df = df.repartition(10, df["AIRLINE"])

In [0]:
# Write the repartitioned data as Parquet, replacing anything already at this path.
# This action runs before the cache() cell below, so it computes `df` without a cache.
df.write.parquet("/mnt/data/Flights_Delay_Parquet", mode="overwrite")

In [0]:
# Mark `df` for caching and display it as the cell's result.
# cache() is lazy: nothing is stored until the next action computes `df`.
# NOTE(review): the Parquet write in the previous cell already ran, so it did not benefit
# from (or populate) this cache. Only later actions that go through this same `df` plan
# will use it; the "flights" view registered at load time points at a different plan.
df.cache()

Out[7]: DataFrame[ID: int, YEAR: int, MONTH: int, DAY: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: int, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: int, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: int, SECURITY_DELAY: int, AIRLINE_DELAY: int, LATE_AIRCRAFT_DELAY: int, WEATHER_DELAY: int]

In [0]:
# Keep only rows with a strictly positive ARRIVAL_DELAY (presumably late arrivals — confirm
# units/sign convention). Rows where ARRIVAL_DELAY is NULL are dropped as well, because the
# comparison evaluates to NULL for them.
# NOTE(review): not used by the cells shown here; the per-airline average below runs on all rows.
filtered_df = df.where(df["ARRIVAL_DELAY"] > 0)

In [0]:
# Re-point the "flights" view at the current `df`. The view registered right after the CSV
# load still references the original plan (before repartition/cache). Querying it would
# therefore re-scan the CSV instead of reading the cached, repartitioned DataFrame.
# Re-registering is idempotent, so re-running this cell is safe.
df.createOrReplaceTempView("flights")

# Mean ARRIVAL_DELAY per airline, largest first.
# AVG ignores NULL delays, and negative delays (presumably early arrivals) are included,
# which is why an airline's average can come out below zero.
avg_arrival_delay = spark.sql("""
SELECT AIRLINE, AVG(ARRIVAL_DELAY) as avg_delay
FROM flights
GROUP BY AIRLINE
ORDER BY avg_delay DESC
""")
avg_arrival_delay.show()

+-------+------------------+
|AIRLINE|         avg_delay|
+-------+------------------+
|     F9|24.103448275862068|
|     MQ|19.231592604605904|
|     NK|14.206426484907498|
|     B6| 13.95852534562212|
|     EV|10.884270870655678|
|     OO|10.154792043399638|
|     AA| 8.386631979187513|
|     UA| 6.697221614526362|
|     US| 5.977315185481719|
|     VX| 5.128571428571429|
|     HA| 4.072423398328691|
|     WN| 3.697840458351697|
|     DL|2.8144726712856043|
|     AS|-1.531766200762389|
+-------+------------------+

