In [1]:
# Standard imports
import pandas as pd
import numpy as np


In [2]:
# Load CSV files
booking_df = pd.read_csv("customer_booking.csv", encoding="ISO-8859-1")
reviews_df = pd.read_csv("airlines_reviews.csv", encoding="ISO-8859-1")
flights_df = pd.read_csv("flights_sample_3m.csv")


In [3]:
# Shape and sample rows
print("📦 Booking Dataset:", booking_df.shape)
display(booking_df.head())

print("🧑 Reviews Dataset:", reviews_df.shape)
display(reviews_df.head())

print("✈️ Flights Dataset:", flights_df.shape)
display(flights_df.head())


📦 Booking Dataset: (50000, 14)


Unnamed: 0,num_passengers,sales_channel,trip_type,purchase_lead,length_of_stay,flight_hour,flight_day,route,booking_origin,wants_extra_baggage,wants_preferred_seat,wants_in_flight_meals,flight_duration,booking_complete
0,2,Internet,RoundTrip,262,19,7,Sat,AKLDEL,New Zealand,1,0,0,5.52,0
1,1,Internet,RoundTrip,112,20,3,Sat,AKLDEL,New Zealand,0,0,0,5.52,0
2,2,Internet,RoundTrip,243,22,17,Wed,AKLDEL,India,1,1,0,5.52,0
3,1,Internet,RoundTrip,96,31,4,Sat,AKLDEL,New Zealand,0,0,1,5.52,0
4,2,Internet,RoundTrip,68,22,15,Wed,AKLDEL,India,1,0,1,5.52,0


🧑 Reviews Dataset: (8100, 17)


Unnamed: 0,Title,Name,Review Date,Airline,Verified,Reviews,Type of Traveller,Month Flown,Route,Class,Seat Comfort,Staff Service,Food & Beverages,Inflight Entertainment,Value For Money,Overall Rating,Recommended
0,Flight was amazing,Alison Soetantyo,2024-03-01,Singapore Airlines,True,Flight was amazing. The crew onboard this fl...,Solo Leisure,December 2023,Jakarta to Singapore,Business Class,4,4,4,4,4,9,yes
1,seats on this aircraft are dreadful,Robert Watson,2024-02-21,Singapore Airlines,True,Â Â Booking an emergency exit seat still meant...,Solo Leisure,February 2024,Phuket to Singapore,Economy Class,5,3,4,4,1,3,no
2,Food was plentiful and tasty,S Han,2024-02-20,Singapore Airlines,True,Excellent performance on all fronts. I would...,Family Leisure,February 2024,Siem Reap to Singapore,Economy Class,1,5,2,1,5,10,yes
3,âhow much food was available,D Laynes,2024-02-19,Singapore Airlines,True,Pretty comfortable flight considering I was f...,Solo Leisure,February 2024,Singapore to London Heathrow,Economy Class,5,5,5,5,5,10,yes
4,âservice was consistently goodâ,A Othman,2024-02-19,Singapore Airlines,True,The service was consistently good from start ...,Family Leisure,February 2024,Singapore to Phnom Penh,Economy Class,5,5,5,5,5,10,yes


✈️ Flights Dataset: (1044886, 32)


Unnamed: 0,FL_DATE,AIRLINE,AIRLINE_DOT,AIRLINE_CODE,DOT_CODE,FL_NUMBER,ORIGIN,ORIGIN_CITY,DEST,DEST_CITY,...,DIVERTED,CRS_ELAPSED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,DELAY_DUE_CARRIER,DELAY_DUE_WEATHER,DELAY_DUE_NAS,DELAY_DUE_SECURITY,DELAY_DUE_LATE_AIRCRAFT
0,2019-01-09,United Air Lines Inc.,United Air Lines Inc.: UA,UA,19977,1562,FLL,"Fort Lauderdale, FL",EWR,"Newark, NJ",...,0.0,186.0,176.0,153.0,1065.0,,,,,
1,2022-11-19,Delta Air Lines Inc.,Delta Air Lines Inc.: DL,DL,19790,1149,MSP,"Minneapolis, MN",SEA,"Seattle, WA",...,0.0,235.0,236.0,189.0,1399.0,,,,,
2,2022-07-22,United Air Lines Inc.,United Air Lines Inc.: UA,UA,19977,459,DEN,"Denver, CO",MSP,"Minneapolis, MN",...,0.0,118.0,112.0,87.0,680.0,,,,,
3,2023-03-06,Delta Air Lines Inc.,Delta Air Lines Inc.: DL,DL,19790,2295,MSP,"Minneapolis, MN",SFO,"San Francisco, CA",...,0.0,260.0,285.0,249.0,1589.0,0.0,0.0,24.0,0.0,0.0
4,2020-02-23,Spirit Air Lines,Spirit Air Lines: NK,NK,20416,407,MCO,"Orlando, FL",DFW,"Dallas/Fort Worth, TX",...,0.0,181.0,182.0,153.0,985.0,,,,,


In [4]:
# Null values
print("🔍 NULLS in Booking:")
print(booking_df.isnull().sum())

print("\n🔍 NULLS in Reviews:")
print(reviews_df.isnull().sum())

print("\n🔍 NULLS in Flights:")
print(flights_df.isnull().sum())

# Duplicates
print("\n🧾 DUPLICATES:")
print("Bookings:", booking_df.duplicated().sum())
print("Reviews:", reviews_df.duplicated().sum())
print("Flights:", flights_df.duplicated().sum())


🔍 NULLS in Booking:
num_passengers           0
sales_channel            0
trip_type                0
purchase_lead            0
length_of_stay           0
flight_hour              0
flight_day               0
route                    0
booking_origin           0
wants_extra_baggage      0
wants_preferred_seat     0
wants_in_flight_meals    0
flight_duration          0
booking_complete         0
dtype: int64

🔍 NULLS in Reviews:
Title                     0
Name                      0
Review Date               0
Airline                   0
Verified                  0
Reviews                   0
Type of Traveller         0
Month Flown               0
Route                     0
Class                     0
Seat Comfort              0
Staff Service             0
Food & Beverages          0
Inflight Entertainment    0
Value For Money           0
Overall Rating            0
Recommended               0
dtype: int64

🔍 NULLS in Flights:
FL_DATE                          0
AIRLINE                

In [5]:
# Check data types
print("📘 Booking Types:")
print(booking_df.dtypes)

print("\n📘 Reviews Types:")
print(reviews_df.dtypes)

print("\n📘 Flights Types:")
print(flights_df.dtypes)

# Remove duplicates
booking_df.drop_duplicates(inplace=True)
reviews_df.drop_duplicates(inplace=True)
flights_df.drop_duplicates(inplace=True)


📘 Booking Types:
num_passengers             int64
sales_channel             object
trip_type                 object
purchase_lead              int64
length_of_stay             int64
flight_hour                int64
flight_day                object
route                     object
booking_origin            object
wants_extra_baggage        int64
wants_preferred_seat       int64
wants_in_flight_meals      int64
flight_duration          float64
booking_complete           int64
dtype: object

📘 Reviews Types:
Title                     object
Name                      object
Review Date               object
Airline                   object
Verified                  object
Reviews                   object
Type of Traveller         object
Month Flown               object
Route                     object
Class                     object
Seat Comfort               int64
Staff Service              int64
Food & Beverages           int64
Inflight Entertainment     int64
Value For Money            

In [6]:
# Install Java and Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark


In [7]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"


In [8]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FlightDataQuality").getOrCreate()


In [9]:
booking_sdf = spark.read.csv("customer_booking.csv", header=True, inferSchema=True)
reviews_sdf = spark.read.csv("airlines_reviews.csv", header=True, inferSchema=True)
flights_sdf = spark.read.csv("flights_sample_3m.csv", header=True, inferSchema=True)


In [10]:
booking_sdf.printSchema()
flights_sdf.printSchema()
reviews_sdf.printSchema()


root
 |-- num_passengers: integer (nullable = true)
 |-- sales_channel: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- purchase_lead: integer (nullable = true)
 |-- length_of_stay: integer (nullable = true)
 |-- flight_hour: integer (nullable = true)
 |-- flight_day: string (nullable = true)
 |-- route: string (nullable = true)
 |-- booking_origin: string (nullable = true)
 |-- wants_extra_baggage: integer (nullable = true)
 |-- wants_preferred_seat: integer (nullable = true)
 |-- wants_in_flight_meals: integer (nullable = true)
 |-- flight_duration: double (nullable = true)
 |-- booking_complete: integer (nullable = true)

root
 |-- FL_DATE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST:

In [11]:
booking_sdf.createOrReplaceTempView("bookings")
flights_sdf.createOrReplaceTempView("flights")
reviews_sdf.createOrReplaceTempView("reviews")


In [13]:
booking_sdf.columns, flights_sdf.columns


(['num_passengers',
  'sales_channel',
  'trip_type',
  'purchase_lead',
  'length_of_stay',
  'flight_hour',
  'flight_day',
  'route',
  'booking_origin',
  'wants_extra_baggage',
  'wants_preferred_seat',
  'wants_in_flight_meals',
  'flight_duration',
  'booking_complete'],
 ['FL_DATE',
  'AIRLINE',
  'AIRLINE_DOT',
  'AIRLINE_CODE',
  'DOT_CODE',
  'FL_NUMBER',
  'ORIGIN',
  'ORIGIN_CITY',
  'DEST',
  'DEST_CITY',
  'CRS_DEP_TIME',
  'DEP_TIME',
  'DEP_DELAY',
  'TAXI_OUT',
  'WHEELS_OFF',
  'WHEELS_ON',
  'TAXI_IN',
  'CRS_ARR_TIME',
  'ARR_TIME',
  'ARR_DELAY',
  'CANCELLED',
  'CANCELLATION_CODE',
  'DIVERTED',
  'CRS_ELAPSED_TIME',
  'ELAPSED_TIME',
  'AIR_TIME',
  'DISTANCE',
  'DELAY_DUE_CARRIER',
  'DELAY_DUE_WEATHER',
  'DELAY_DUE_NAS',
  'DELAY_DUE_SECURITY',
  'DELAY_DUE_LATE_AIRCRAFT'])

In [14]:
from pyspark.sql.functions import split

# Split the 'route' into 'origin' and 'destination'
booking_sdf = booking_sdf.withColumn("origin", split(booking_sdf["route"], " to ").getItem(0))
booking_sdf = booking_sdf.withColumn("destination", split(booking_sdf["route"], " to ").getItem(1))

# Re-register the updated view
booking_sdf.createOrReplaceTempView("bookings")


In [15]:
occupancy_query = """
SELECT
    f.origin, f.destination,
    COUNT(*) AS total_bookings,
    SUM(b.num_passengers) AS passengers_booked,
    AVG(f.seats_available) AS avg_seats,
    ROUND(SUM(b.num_passengers)/AVG(f.seats_available), 2) AS occupancy_rate
FROM bookings b
JOIN flights f ON b.origin = f.origin AND b.destination = f.destination
GROUP BY f.origin, f.destination
ORDER BY occupancy_rate DESC
"""

occupancy_df = spark.sql(occupancy_query)
occupancy_df.show(10)


AnalysisException: cannot resolve '`f.destination`' given input columns: [f.AIRLINE, f.AIRLINE_CODE, f.AIRLINE_DOT, f.AIR_TIME, f.ARR_DELAY, f.ARR_TIME, f.CANCELLATION_CODE, f.CANCELLED, f.CRS_ARR_TIME, f.CRS_DEP_TIME, f.CRS_ELAPSED_TIME, f.DELAY_DUE_CARRIER, f.DELAY_DUE_LATE_AIRCRAFT, f.DELAY_DUE_NAS, f.DELAY_DUE_SECURITY, f.DELAY_DUE_WEATHER, f.DEP_DELAY, f.DEP_TIME, f.DEST, f.DEST_CITY, f.DISTANCE, f.DIVERTED, f.DOT_CODE, f.ELAPSED_TIME, f.FL_DATE, f.FL_NUMBER, f.ORIGIN, f.ORIGIN_CITY, f.TAXI_IN, f.TAXI_OUT, f.WHEELS_OFF, f.WHEELS_ON, b.booking_complete, b.booking_origin, b.destination, b.flight_day, b.flight_duration, b.flight_hour, b.length_of_stay, b.num_passengers, b.origin, b.purchase_lead, b.route, b.sales_channel, b.trip_type, b.wants_extra_baggage, b.wants_in_flight_meals, b.wants_preferred_seat]; line 9 pos 58;
'Sort ['occupancy_rate DESC NULLS LAST], true
+- 'Aggregate ['f.origin, 'f.destination], ['f.origin, 'f.destination, count(1) AS total_bookings#225L, 'SUM('b.num_passengers) AS passengers_booked#226, 'AVG('f.seats_available) AS avg_seats#227, 'ROUND(('SUM('b.num_passengers) / 'AVG('f.seats_available)), 2) AS occupancy_rate#228]
   +- 'Join Inner, ((origin#178 = origin#116) AND (destination#194 = 'f.destination))
      :- SubqueryAlias b
      :  +- SubqueryAlias bookings
      :     +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, origin#178, split(route#23,  to , -1)[1] AS destination#194]
      :        +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, split(route#23,  to , -1)[0] AS origin#178]
      :           +- Relation[num_passengers#16,sales_channel#17,trip_type#18,purchase_lead#19,length_of_stay#20,flight_hour#21,flight_day#22,route#23,booking_origin#24,wants_extra_baggage#25,wants_preferred_seat#26,wants_in_flight_meals#27,flight_duration#28,booking_complete#29] csv
      +- SubqueryAlias f
         +- SubqueryAlias flights
            +- Relation[FL_DATE#110,AIRLINE#111,AIRLINE_DOT#112,AIRLINE_CODE#113,DOT_CODE#114,FL_NUMBER#115,ORIGIN#116,ORIGIN_CITY#117,DEST#118,DEST_CITY#119,CRS_DEP_TIME#120,DEP_TIME#121,DEP_DELAY#122,TAXI_OUT#123,WHEELS_OFF#124,WHEELS_ON#125,TAXI_IN#126,CRS_ARR_TIME#127,ARR_TIME#128,ARR_DELAY#129,CANCELLED#130,CANCELLATION_CODE#131,DIVERTED#132,CRS_ELAPSED_TIME#133,... 8 more fields] csv


In [16]:
print("✅ Booking Columns:", booking_sdf.columns)
print("✅ Flights Columns:", flights_sdf.columns)


✅ Booking Columns: ['num_passengers', 'sales_channel', 'trip_type', 'purchase_lead', 'length_of_stay', 'flight_hour', 'flight_day', 'route', 'booking_origin', 'wants_extra_baggage', 'wants_preferred_seat', 'wants_in_flight_meals', 'flight_duration', 'booking_complete', 'origin', 'destination']
✅ Flights Columns: ['FL_DATE', 'AIRLINE', 'AIRLINE_DOT', 'AIRLINE_CODE', 'DOT_CODE', 'FL_NUMBER', 'ORIGIN', 'ORIGIN_CITY', 'DEST', 'DEST_CITY', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED', 'CANCELLATION_CODE', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'DELAY_DUE_CARRIER', 'DELAY_DUE_WEATHER', 'DELAY_DUE_NAS', 'DELAY_DUE_SECURITY', 'DELAY_DUE_LATE_AIRCRAFT']


In [17]:
# Rename columns if needed
flights_sdf = flights_sdf.withColumnRenamed("Source", "origin").withColumnRenamed("Destination", "destination")

# Register again
flights_sdf.createOrReplaceTempView("flights")


In [18]:
occupancy_query = """
SELECT
    f.origin, f.destination,
    COUNT(*) AS total_bookings,
    SUM(b.num_passengers) AS passengers_booked,
    AVG(f.seats_available) AS avg_seats,
    ROUND(SUM(b.num_passengers)/AVG(f.seats_available), 2) AS occupancy_rate
FROM bookings b
JOIN flights f ON b.origin = f.origin AND b.destination = f.destination
GROUP BY f.origin, f.destination
ORDER BY occupancy_rate DESC
"""

occupancy_df = spark.sql(occupancy_query)
occupancy_df.show(10)


AnalysisException: cannot resolve '`f.destination`' given input columns: [f.AIRLINE, f.AIRLINE_CODE, f.AIRLINE_DOT, f.AIR_TIME, f.ARR_DELAY, f.ARR_TIME, f.CANCELLATION_CODE, f.CANCELLED, f.CRS_ARR_TIME, f.CRS_DEP_TIME, f.CRS_ELAPSED_TIME, f.DELAY_DUE_CARRIER, f.DELAY_DUE_LATE_AIRCRAFT, f.DELAY_DUE_NAS, f.DELAY_DUE_SECURITY, f.DELAY_DUE_WEATHER, f.DEP_DELAY, f.DEP_TIME, f.DEST, f.DEST_CITY, f.DISTANCE, f.DIVERTED, f.DOT_CODE, f.ELAPSED_TIME, f.FL_DATE, f.FL_NUMBER, f.ORIGIN, f.ORIGIN_CITY, f.TAXI_IN, f.TAXI_OUT, f.WHEELS_OFF, f.WHEELS_ON, b.booking_complete, b.booking_origin, b.destination, b.flight_day, b.flight_duration, b.flight_hour, b.length_of_stay, b.num_passengers, b.origin, b.purchase_lead, b.route, b.sales_channel, b.trip_type, b.wants_extra_baggage, b.wants_in_flight_meals, b.wants_preferred_seat]; line 9 pos 58;
'Sort ['occupancy_rate DESC NULLS LAST], true
+- 'Aggregate ['f.origin, 'f.destination], ['f.origin, 'f.destination, count(1) AS total_bookings#294L, 'SUM('b.num_passengers) AS passengers_booked#295, 'AVG('f.seats_available) AS avg_seats#296, 'ROUND(('SUM('b.num_passengers) / 'AVG('f.seats_available)), 2) AS occupancy_rate#297]
   +- 'Join Inner, ((origin#178 = origin#116) AND (destination#194 = 'f.destination))
      :- SubqueryAlias b
      :  +- SubqueryAlias bookings
      :     +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, origin#178, split(route#23,  to , -1)[1] AS destination#194]
      :        +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, split(route#23,  to , -1)[0] AS origin#178]
      :           +- Relation[num_passengers#16,sales_channel#17,trip_type#18,purchase_lead#19,length_of_stay#20,flight_hour#21,flight_day#22,route#23,booking_origin#24,wants_extra_baggage#25,wants_preferred_seat#26,wants_in_flight_meals#27,flight_duration#28,booking_complete#29] csv
      +- SubqueryAlias f
         +- SubqueryAlias flights
            +- Relation[FL_DATE#110,AIRLINE#111,AIRLINE_DOT#112,AIRLINE_CODE#113,DOT_CODE#114,FL_NUMBER#115,ORIGIN#116,ORIGIN_CITY#117,DEST#118,DEST_CITY#119,CRS_DEP_TIME#120,DEP_TIME#121,DEP_DELAY#122,TAXI_OUT#123,WHEELS_OFF#124,WHEELS_ON#125,TAXI_IN#126,CRS_ARR_TIME#127,ARR_TIME#128,ARR_DELAY#129,CANCELLED#130,CANCELLATION_CODE#131,DIVERTED#132,CRS_ELAPSED_TIME#133,... 8 more fields] csv


In [19]:
flights_sdf.columns


['FL_DATE',
 'AIRLINE',
 'AIRLINE_DOT',
 'AIRLINE_CODE',
 'DOT_CODE',
 'FL_NUMBER',
 'ORIGIN',
 'ORIGIN_CITY',
 'DEST',
 'DEST_CITY',
 'CRS_DEP_TIME',
 'DEP_TIME',
 'DEP_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'ARR_TIME',
 'ARR_DELAY',
 'CANCELLED',
 'CANCELLATION_CODE',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'DELAY_DUE_CARRIER',
 'DELAY_DUE_WEATHER',
 'DELAY_DUE_NAS',
 'DELAY_DUE_SECURITY',
 'DELAY_DUE_LATE_AIRCRAFT']

In [20]:
from pyspark.sql.functions import rand, round

# Simulate available seats between 100 and 300
flights_sdf = flights_sdf.withColumn("seats_available", round(rand() * 200 + 100))

# Re-register the table
flights_sdf.createOrReplaceTempView("flights")


In [21]:
occupancy_df = spark.sql(occupancy_query)
occupancy_df.show(10)


AnalysisException: cannot resolve '`f.destination`' given input columns: [f.AIRLINE, f.AIRLINE_CODE, f.AIRLINE_DOT, f.AIR_TIME, f.ARR_DELAY, f.ARR_TIME, f.CANCELLATION_CODE, f.CANCELLED, f.CRS_ARR_TIME, f.CRS_DEP_TIME, f.CRS_ELAPSED_TIME, f.DELAY_DUE_CARRIER, f.DELAY_DUE_LATE_AIRCRAFT, f.DELAY_DUE_NAS, f.DELAY_DUE_SECURITY, f.DELAY_DUE_WEATHER, f.DEP_DELAY, f.DEP_TIME, f.DEST, f.DEST_CITY, f.DISTANCE, f.DIVERTED, f.DOT_CODE, f.ELAPSED_TIME, f.FL_DATE, f.FL_NUMBER, f.ORIGIN, f.ORIGIN_CITY, f.TAXI_IN, f.TAXI_OUT, f.WHEELS_OFF, f.WHEELS_ON, b.booking_complete, b.booking_origin, b.destination, b.flight_day, b.flight_duration, b.flight_hour, b.length_of_stay, b.num_passengers, b.origin, b.purchase_lead, b.route, b.sales_channel, f.seats_available, b.trip_type, b.wants_extra_baggage, b.wants_in_flight_meals, b.wants_preferred_seat]; line 9 pos 58;
'Sort ['occupancy_rate DESC NULLS LAST], true
+- 'Aggregate ['f.origin, 'f.destination], ['f.origin, 'f.destination, count(1) AS total_bookings#365L, 'SUM('b.num_passengers) AS passengers_booked#366, 'AVG('f.seats_available) AS avg_seats#367, 'ROUND(('SUM('b.num_passengers) / 'AVG('f.seats_available)), 2) AS occupancy_rate#368]
   +- 'Join Inner, ((origin#178 = origin#116) AND (destination#194 = 'f.destination))
      :- SubqueryAlias b
      :  +- SubqueryAlias bookings
      :     +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, origin#178, split(route#23,  to , -1)[1] AS destination#194]
      :        +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, split(route#23,  to , -1)[0] AS origin#178]
      :           +- Relation[num_passengers#16,sales_channel#17,trip_type#18,purchase_lead#19,length_of_stay#20,flight_hour#21,flight_day#22,route#23,booking_origin#24,wants_extra_baggage#25,wants_preferred_seat#26,wants_in_flight_meals#27,flight_duration#28,booking_complete#29] csv
      +- SubqueryAlias f
         +- SubqueryAlias flights
            +- Project [FL_DATE#110, AIRLINE#111, AIRLINE_DOT#112, AIRLINE_CODE#113, DOT_CODE#114, FL_NUMBER#115, ORIGIN#116, ORIGIN_CITY#117, DEST#118, DEST_CITY#119, CRS_DEP_TIME#120, DEP_TIME#121, DEP_DELAY#122, TAXI_OUT#123, WHEELS_OFF#124, WHEELS_ON#125, TAXI_IN#126, CRS_ARR_TIME#127, ARR_TIME#128, ARR_DELAY#129, CANCELLED#130, CANCELLATION_CODE#131, DIVERTED#132, CRS_ELAPSED_TIME#133, ... 9 more fields]
               +- Relation[FL_DATE#110,AIRLINE#111,AIRLINE_DOT#112,AIRLINE_CODE#113,DOT_CODE#114,FL_NUMBER#115,ORIGIN#116,ORIGIN_CITY#117,DEST#118,DEST_CITY#119,CRS_DEP_TIME#120,DEP_TIME#121,DEP_DELAY#122,TAXI_OUT#123,WHEELS_OFF#124,WHEELS_ON#125,TAXI_IN#126,CRS_ARR_TIME#127,ARR_TIME#128,ARR_DELAY#129,CANCELLED#130,CANCELLATION_CODE#131,DIVERTED#132,CRS_ELAPSED_TIME#133,... 8 more fields] csv


In [22]:
print("✅ Booking Columns:\n", booking_sdf.columns)
print("\n✅ Flights Columns:\n", flights_sdf.columns)


✅ Booking Columns:
 ['num_passengers', 'sales_channel', 'trip_type', 'purchase_lead', 'length_of_stay', 'flight_hour', 'flight_day', 'route', 'booking_origin', 'wants_extra_baggage', 'wants_preferred_seat', 'wants_in_flight_meals', 'flight_duration', 'booking_complete', 'origin', 'destination']

✅ Flights Columns:
 ['FL_DATE', 'AIRLINE', 'AIRLINE_DOT', 'AIRLINE_CODE', 'DOT_CODE', 'FL_NUMBER', 'ORIGIN', 'ORIGIN_CITY', 'DEST', 'DEST_CITY', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED', 'CANCELLATION_CODE', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'DELAY_DUE_CARRIER', 'DELAY_DUE_WEATHER', 'DELAY_DUE_NAS', 'DELAY_DUE_SECURITY', 'DELAY_DUE_LATE_AIRCRAFT', 'seats_available']


In [23]:
occupancy_df = spark.sql("""
SELECT
    f.origin,
    f.destination,
    COUNT(*) AS total_bookings
FROM bookings b
JOIN flights f
ON b.origin = f.origin AND b.destination = f.destination
GROUP BY f.origin, f.destination
""")

occupancy_df.show(10)


AnalysisException: cannot resolve '`f.destination`' given input columns: [f.AIRLINE, f.AIRLINE_CODE, f.AIRLINE_DOT, f.AIR_TIME, f.ARR_DELAY, f.ARR_TIME, f.CANCELLATION_CODE, f.CANCELLED, f.CRS_ARR_TIME, f.CRS_DEP_TIME, f.CRS_ELAPSED_TIME, f.DELAY_DUE_CARRIER, f.DELAY_DUE_LATE_AIRCRAFT, f.DELAY_DUE_NAS, f.DELAY_DUE_SECURITY, f.DELAY_DUE_WEATHER, f.DEP_DELAY, f.DEP_TIME, f.DEST, f.DEST_CITY, f.DISTANCE, f.DIVERTED, f.DOT_CODE, f.ELAPSED_TIME, f.FL_DATE, f.FL_NUMBER, f.ORIGIN, f.ORIGIN_CITY, f.TAXI_IN, f.TAXI_OUT, f.WHEELS_OFF, f.WHEELS_ON, b.booking_complete, b.booking_origin, b.destination, b.flight_day, b.flight_duration, b.flight_hour, b.length_of_stay, b.num_passengers, b.origin, b.purchase_lead, b.route, b.sales_channel, f.seats_available, b.trip_type, b.wants_extra_baggage, b.wants_in_flight_meals, b.wants_preferred_seat]; line 8 pos 43;
'Aggregate ['f.origin, 'f.destination], ['f.origin, 'f.destination, count(1) AS total_bookings#370L]
+- 'Join Inner, ((origin#178 = origin#116) AND (destination#194 = 'f.destination))
   :- SubqueryAlias b
   :  +- SubqueryAlias bookings
   :     +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, origin#178, split(route#23,  to , -1)[1] AS destination#194]
   :        +- Project [num_passengers#16, sales_channel#17, trip_type#18, purchase_lead#19, length_of_stay#20, flight_hour#21, flight_day#22, route#23, booking_origin#24, wants_extra_baggage#25, wants_preferred_seat#26, wants_in_flight_meals#27, flight_duration#28, booking_complete#29, split(route#23,  to , -1)[0] AS origin#178]
   :           +- Relation[num_passengers#16,sales_channel#17,trip_type#18,purchase_lead#19,length_of_stay#20,flight_hour#21,flight_day#22,route#23,booking_origin#24,wants_extra_baggage#25,wants_preferred_seat#26,wants_in_flight_meals#27,flight_duration#28,booking_complete#29] csv
   +- SubqueryAlias f
      +- SubqueryAlias flights
         +- Project [FL_DATE#110, AIRLINE#111, AIRLINE_DOT#112, AIRLINE_CODE#113, DOT_CODE#114, FL_NUMBER#115, ORIGIN#116, ORIGIN_CITY#117, DEST#118, DEST_CITY#119, CRS_DEP_TIME#120, DEP_TIME#121, DEP_DELAY#122, TAXI_OUT#123, WHEELS_OFF#124, WHEELS_ON#125, TAXI_IN#126, CRS_ARR_TIME#127, ARR_TIME#128, ARR_DELAY#129, CANCELLED#130, CANCELLATION_CODE#131, DIVERTED#132, CRS_ELAPSED_TIME#133, ... 9 more fields]
            +- Relation[FL_DATE#110,AIRLINE#111,AIRLINE_DOT#112,AIRLINE_CODE#113,DOT_CODE#114,FL_NUMBER#115,ORIGIN#116,ORIGIN_CITY#117,DEST#118,DEST_CITY#119,CRS_DEP_TIME#120,DEP_TIME#121,DEP_DELAY#122,TAXI_OUT#123,WHEELS_OFF#124,WHEELS_ON#125,TAXI_IN#126,CRS_ARR_TIME#127,ARR_TIME#128,ARR_DELAY#129,CANCELLED#130,CANCELLATION_CODE#131,DIVERTED#132,CRS_ELAPSED_TIME#133,... 8 more fields] csv


In [24]:
print("✅ Booking Schema:")
booking_sdf.printSchema()

print("\n✅ Flights Schema:")
flights_sdf.printSchema()


✅ Booking Schema:
root
 |-- num_passengers: integer (nullable = true)
 |-- sales_channel: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- purchase_lead: integer (nullable = true)
 |-- length_of_stay: integer (nullable = true)
 |-- flight_hour: integer (nullable = true)
 |-- flight_day: string (nullable = true)
 |-- route: string (nullable = true)
 |-- booking_origin: string (nullable = true)
 |-- wants_extra_baggage: integer (nullable = true)
 |-- wants_preferred_seat: integer (nullable = true)
 |-- wants_in_flight_meals: integer (nullable = true)
 |-- flight_duration: double (nullable = true)
 |-- booking_complete: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)


✅ Flights Schema:
root
 |-- FL_DATE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER:

In [25]:
flights_sdf = flights_sdf \
    .withColumnRenamed("ORIGIN", "origin") \
    .withColumnRenamed("DEST", "destination")

flights_sdf.createOrReplaceTempView("flights")


In [26]:
occupancy_df = spark.sql("""
SELECT
    f.origin, f.destination,
    COUNT(*) AS total_bookings,
    SUM(b.num_passengers) AS passengers_booked,
    AVG(f.seats_available) AS avg_seats,
    ROUND(SUM(b.num_passengers)/AVG(f.seats_available), 2) AS occupancy_rate
FROM bookings b
JOIN flights f
ON b.origin = f.origin AND b.destination = f.destination
GROUP BY f.origin, f.destination
ORDER BY occupancy_rate DESC
""")

occupancy_df.show(10)


+------+-----------+--------------+-----------------+---------+--------------+
|origin|destination|total_bookings|passengers_booked|avg_seats|occupancy_rate|
+------+-----------+--------------+-----------------+---------+--------------+
+------+-----------+--------------+-----------------+---------+--------------+



In [27]:
cancellation_df = spark.sql("""
SELECT
    origin,
    destination,
    COUNT(*) AS total_flights,
    SUM(CAST(CANCELLED AS INT)) AS cancelled_flights,
    ROUND(SUM(CAST(CANCELLED AS INT)) / COUNT(*) * 100, 2) AS cancellation_rate
FROM flights
GROUP BY origin, destination
ORDER BY cancellation_rate DESC
""")

cancellation_df.show(10)


+------+-----------+-------------+-----------------+-----------------+
|origin|destination|total_flights|cancelled_flights|cancellation_rate|
+------+-----------+-------------+-----------------+-----------------+
|   MEM|        DSM|            2|                2|            100.0|
|   RNO|        JAC|            1|                1|            100.0|
|   GRR|        RAP|            1|                1|            100.0|
|   PIT|        MEM|            1|                1|            100.0|
|   PWM|        FLL|            1|                1|            100.0|
|   COS|        HOU|            1|                1|            100.0|
|   JAC|        SJC|            1|                1|            100.0|
|   AUS|        ACY|            1|                1|            100.0|
|   LAS|        JAC|            1|                1|            100.0|
|   EGE|        SLC|            1|                1|            100.0|
+------+-----------+-------------+-----------------+-----------------+
only s

In [28]:
from pyspark.sql import functions as F


In [29]:
# Null Check
for df, name in zip([booking_sdf, flights_sdf, reviews_sdf], ['Booking', 'Flights', 'Reviews']):
    print(f"\n🔎 Null Count in {name} Dataset:")
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Duplicate Check
for df, name in zip([booking_sdf, flights_sdf, reviews_sdf], ['Booking', 'Flights', 'Reviews']):
    print(f"\n🧹 Duplicate Rows in {name} Dataset:")
    print(df.count(), "rows total,", df.dropDuplicates().count(), "after removing duplicates")

# Data Type Check
for df, name in zip([booking_sdf, flights_sdf, reviews_sdf], ['Booking', 'Flights', 'Reviews']):
    print(f"\n📋 Schema Check for {name} Dataset:")
    df.printSchema()



🔎 Null Count in Booking Dataset:
+--------------+-------------+---------+-------------+--------------+-----------+----------+-----+--------------+-------------------+--------------------+---------------------+---------------+----------------+------+-----------+
|num_passengers|sales_channel|trip_type|purchase_lead|length_of_stay|flight_hour|flight_day|route|booking_origin|wants_extra_baggage|wants_preferred_seat|wants_in_flight_meals|flight_duration|booking_complete|origin|destination|
+--------------+-------------+---------+-------------+--------------+-----------+----------+-----+--------------+-------------------+--------------------+---------------------+---------------+----------------+------+-----------+
|             0|            0|        0|            0|             0|          0|         0|    0|             0|                  0|                   0|                    0|              0|               0|     0|      50000|
+--------------+-------------+---------+----------

In [30]:
review_query = """
SELECT
  Airline,
  COUNT(*) AS total_reviews,
  AVG(CAST(`Overall Rating` AS DOUBLE)) AS avg_rating,
  ROUND(100.0 * SUM(CASE WHEN Recommended = 'yes' THEN 1 ELSE 0 END) / COUNT(*), 2) AS recommend_percent
FROM reviews
GROUP BY Airline
ORDER BY avg_rating DESC
"""
review_df = spark.sql(review_query)
review_df.show()


+--------------------+-------------+------------------+-----------------+
|             Airline|total_reviews|        avg_rating|recommend_percent|
+--------------------+-------------+------------------+-----------------+
| friendly cabin c...|            1|              10.0|           100.00|
|     the best lounge|            1|              10.0|           100.00|
| Cabin crew was kind|            1|              10.0|           100.00|
| it was time to s...|            1|              10.0|           100.00|
| which hasn't hap...|            1|               9.0|           100.00|
| I bought connect...|            1|               8.0|           100.00|
|  All Nippon Airways|          258| 7.737704918032787|            73.64|
|             EVA Air|          279| 7.357414448669202|            70.25|
|       Qatar Airways|         1621|7.1513409961685825|            68.41|
|           the meals|            1|               7.0|           100.00|
| although did imp...|            1|  

In [31]:
review_df.coalesce(1).write.option("header", True).mode("overwrite").csv("review_kpis_output")


In [32]:
# Check unique airline codes (MDM)
spark.sql("SELECT DISTINCT Airline FROM reviews").show()


+--------------------+
|             Airline|
+--------------------+
|    transfers at DXB|
|             EVA Air|
|      Couple Leisure|
| even though they...|
|      Family Leisure|
|        January 2019|
| with average cat...|
| every time with ...|
| Cabin crew was kind|
|                food|
|         August 2017|
| the crew on both...|
|    Turkish Airlines|
|      September 2023|
| referencing thei...|
|       Qatar Airways|
| and of a good qu...|
| it was time to s...|
|        Solo Leisure|
| check in is fast...|
+--------------------+
only showing top 20 rows



In [35]:
delay_query = """
SELECT origin, destination,
       COUNT(*) AS total_flights,
       ROUND(AVG(dep_delay), 2) AS avg_departure_delay
FROM flights
GROUP BY origin, destination
ORDER BY avg_departure_delay DESC
"""

delay_df = spark.sql(delay_query)
delay_df.show(10)


+------+-----------+-------------+-------------------+
|origin|destination|total_flights|avg_departure_delay|
+------+-----------+-------------+-------------------+
|   DEN|        ABE|            1|             1089.0|
|   MIA|        HSV|            1|              681.0|
|   FCA|        LGA|            4|              394.5|
|   SFB|        GFK|            6|             297.33|
|   COS|        SAN|            3|              203.0|
|   CHA|        MSP|            1|              201.0|
|   PSC|        SAN|            8|             199.71|
|   VPS|        STC|            2|              195.0|
|   KOA|        BLI|            1|              185.0|
|   FOD|        DEN|            8|              164.5|
+------+-----------+-------------+-------------------+
only showing top 10 rows



In [36]:
ontime_query = """
SELECT AIRLINE,
       COUNT(*) AS total_flights,
       SUM(CASE WHEN ARR_DELAY <= 0 THEN 1 ELSE 0 END) AS ontime_flights,
       ROUND(SUM(CASE WHEN ARR_DELAY <= 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS ontime_percent
FROM flights
GROUP BY AIRLINE
ORDER BY ontime_percent DESC
"""
ontime_df = spark.sql(ontime_query)
ontime_df.show(10)


+--------------------+-------------+--------------+--------------+
|             AIRLINE|total_flights|ontime_flights|ontime_percent|
+--------------------+-------------+--------------+--------------+
|   Endeavor Air Inc.|       112463|         84211|         74.88|
|Delta Air Lines Inc.|       395239|        276841|         70.04|
|    Republic Airline|       143107|         98437|         68.79|
|SkyWest Airlines ...|       343737|        235436|         68.49|
|   PSA Airlines Inc.|       107050|         69527|         64.95|
|United Air Lines ...|       254504|        163778|         64.35|
|           Envoy Air|       121256|         78017|         64.34|
|  Mesa Airlines Inc.|        65012|         41744|         64.21|
|         Horizon Air|        20634|         13163|         63.79|
|American Airlines...|       383106|        239741|         62.58|
+--------------------+-------------+--------------+--------------+
only showing top 10 rows



In [37]:
review_kpi_query = """
SELECT Airline,
       COUNT(*) AS total_reviews,
       ROUND(AVG(CAST(`Overall Rating` AS DOUBLE)), 2) AS avg_rating
FROM reviews
GROUP BY Airline
ORDER BY avg_rating DESC
"""
review_kpi_df = spark.sql(review_kpi_query)
review_kpi_df.show(10)


+--------------------+-------------+----------+
|             Airline|total_reviews|avg_rating|
+--------------------+-------------+----------+
| it was time to s...|            1|      10.0|
|     the best lounge|            1|      10.0|
| Cabin crew was kind|            1|      10.0|
| friendly cabin c...|            1|      10.0|
| which hasn't hap...|            1|       9.0|
| I bought connect...|            1|       8.0|
|  All Nippon Airways|          258|      7.74|
|             EVA Air|          279|      7.36|
|       Qatar Airways|         1621|      7.15|
|           the meals|            1|       7.0|
+--------------------+-------------+----------+
only showing top 10 rows



In [38]:
# Example: Null Check for key fields
from pyspark.sql.functions import col

null_check_df = flights_sdf.select(
    [col(c).isNull().alias(f"{c}_is_null") for c in ['FL_DATE', 'ORIGIN', 'DESTINATION', 'AIRLINE']]
)
null_check_df.show(5)


+---------------+--------------+-------------------+---------------+
|FL_DATE_is_null|ORIGIN_is_null|DESTINATION_is_null|AIRLINE_is_null|
+---------------+--------------+-------------------+---------------+
|          false|         false|              false|          false|
|          false|         false|              false|          false|
|          false|         false|              false|          false|
|          false|         false|              false|          false|
|          false|         false|              false|          false|
+---------------+--------------+-------------------+---------------+
only showing top 5 rows



In [39]:
for c in ['FL_DATE', 'ORIGIN', 'DESTINATION', 'AIRLINE']:
    print(f"{c}: {flights_sdf.filter(col(c).isNull()).count()} nulls")


FL_DATE: 0 nulls
ORIGIN: 0 nulls
DESTINATION: 0 nulls
AIRLINE: 0 nulls


In [41]:
max_rating = review_kpi_df.select("avg_rating").toPandas()["avg_rating"].max()
assert max_rating <= 10, "Ratings should be ≤ 10"


In [42]:
review_kpi_df.toPandas().to_csv("/content/review_kpis_output/review_kpis.csv", index=False)
ontime_df.toPandas().to_csv("/content/review_kpis_output/on_time_kpi.csv", index=False)
