In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

In [2]:
# Obtain the SparkSession for this notebook under the application name "AirlinesData"
# (getOrCreate hands back an already-running session when there is one)
session_builder = SparkSession.builder.appName("AirlinesData")
spark = session_builder.getOrCreate()

24/12/26 07:01:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Reader options shared by both CSV inputs: the first row is the header,
# column types are inferred from the data, and the parse mode is PERMISSIVE
csv_options = {"header": True, "inferschema": True, "mode": "PERMISSIVE"}

# Passengers data, read from a csv file in a gcs bucket
passengers_df = (
    spark.read.format("csv")
    .options(**csv_options)
    .load("gs://bucket_path/passengers_dirty.csv")
)

# Flight bookings data, read from a csv file in a gcs bucket
airlines_df = (
    spark.read.format("csv")
    .options(**csv_options)
    .load("gs://bucket_path/flight_bookings_dirty.csv")
)

                                                                                

In [4]:
# Preview the first 5 passenger rows
passengers_df.show(n=5)

+------------+--------------+---------------+
|Passenger_ID|Passenger_Name|          Class|
+------------+--------------+---------------+
|           1|    Odika Rama|Premium Economy|
|           2|      Ryan Jha|       Business|
|           3|    Megha Keer|        Economy|
|           4| Zaitra Minhas|Premium Economy|
|           5|     Aarav Ram|       Business|
+------------+--------------+---------------+
only showing top 5 rows



In [5]:
# Preview the first 5 flight-booking rows
airlines_df.show(n=5)

+----------+-------------------+---------+-------------------+-------------------+---------+-----------+-----+------------+
|Booking_ID|       Booking_Date|  Airline|        Travel_Date|       Arrival_Date|   Source|Destination|Price|Passenger_ID|
+----------+-------------------+---------+-------------------+-------------------+---------+-----------+-----+------------+
|         1|2021-01-09 00:00:00| SpiceJet|2021-01-29 14:49:00|2021-01-29 19:47:00|     Pune|      Delhi| 7996|           1|
|         2|2021-04-27 00:00:00|Akasa Air|2021-04-28 09:08:00|2021-04-28 13:16:00|  Lucknow|      Delhi| 9011|           1|
|         3|2022-10-18 00:00:00|Akasa Air|2022-11-16 11:19:00|2022-11-16 14:16:00|Hyderabad|      Patna|10179|           1|
|         4|2020-12-07 00:00:00|   TruJet|2020-12-14 19:19:00|2020-12-15 01:54:00|    Delhi|    Chennai| 7452|           2|
|         5|2021-04-02 00:00:00| SpiceJet|2021-04-26 21:44:00|2021-04-27 03:21:00|    Patna|     Indore| 9741|           2|
+-------

In [6]:
# Print the schema of the passengers data first, then of the flight bookings data
for frame in (passengers_df, airlines_df):
    frame.printSchema()

root
 |-- Passenger_ID: integer (nullable = true)
 |-- Passenger_Name: string (nullable = true)
 |-- Class: string (nullable = true)

root
 |-- Booking_ID: integer (nullable = true)
 |-- Booking_Date: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Travel_Date: timestamp (nullable = true)
 |-- Arrival_Date: timestamp (nullable = true)
 |-- Source: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Passenger_ID: integer (nullable = true)



## Checking for null values in both dataframes and fixing them

In [7]:
# Null count per column of passengers_df: each isNull() flag is cast to 0/1 and
# summed. `sum` here is the Spark SQL aggregate brought in by the wildcard import,
# not Python's builtin.
passenger_null_exprs = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in passengers_df.columns
]
passengers_null_count = passengers_df.select(passenger_null_exprs)
passengers_null_count.show()

+------------+--------------+-----+
|Passenger_ID|Passenger_Name|Class|
+------------+--------------+-----+
|           0|             0|    0|
+------------+--------------+-----+



In [8]:
# Null count per column of airlines_df: each isNull() flag is cast to 0/1 and
# summed. `sum` here is the Spark SQL aggregate brought in by the wildcard import,
# not Python's builtin.
airline_null_exprs = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in airlines_df.columns
]
airlines_null_count = airlines_df.select(airline_null_exprs)
airlines_null_count.show()

+----------+------------+-------+-----------+------------+------+-----------+-----+------------+
|Booking_ID|Booking_Date|Airline|Travel_Date|Arrival_Date|Source|Destination|Price|Passenger_ID|
+----------+------------+-------+-----------+------------+------+-----------+-----+------------+
|         0|          28|      0|         33|          33|     0|          0|    0|           0|
+----------+------------+-------+-----------+------------+------+-----------+-----+------------+



In [9]:
# Discard every flight-booking row that has a null in at least one column
airlines_df = airlines_df.na.drop(how="any")

# Recompute the per-column null counts to confirm nothing is left
remaining_null_exprs = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in airlines_df.columns
]
airlines_null_count = airlines_df.select(remaining_null_exprs)
airlines_null_count.show()


[Stage 12:>                                                         (0 + 1) / 1]

+----------+------------+-------+-----------+------------+------+-----------+-----+------------+
|Booking_ID|Booking_Date|Airline|Travel_Date|Arrival_Date|Source|Destination|Price|Passenger_ID|
+----------+------------+-------+-----------+------------+------+-----------+-----+------------+
|         0|           0|      0|          0|           0|     0|          0|    0|           0|
+----------+------------+-------+-----------+------------+------+-----------+-----+------------+



                                                                                

## Fixing the format of Booking_Date column

In [10]:
# Normalising Booking_Date in two passes:
#   1. when the value contains a space, keep only the text before the first space
#   2. parse what remains as yyyy-MM-dd, falling back to dd/MM/yyyy
booking_raw = col("Booking_Date")
date_text_only = when(
    booking_raw.contains(" "), split(booking_raw, " ")[0]
).otherwise(booking_raw)
parsed_booking_date = coalesce(
    to_date("Booking_Date", "yyyy-MM-dd"),
    to_date("Booking_Date", "dd/MM/yyyy"),
)

flight_booking_date = (
    airlines_df
    .withColumn("Booking_Date", date_text_only)
    .withColumn("Booking_Date", parsed_booking_date)
)

flight_booking_date.select("Booking_Date").show(2)

+------------+
|Booking_Date|
+------------+
|  2021-01-09|
|  2021-04-27|
+------------+
only showing top 2 rows



## Extracting travel and arrival times from the Travel_Date and Arrival_Date

In [11]:
# Creating separate time and date columns from the Travel_Date and Arrival_Date timestamps.
# Arriving_Date is derived from Arrival_Date (not Travel_Date): a flight that departs
# in the evening and lands after midnight arrives on the following calendar day.
flight_travel_arrival_time_df = (
    flight_booking_date
    .withColumn("Take_Off_Time", date_format("Travel_Date", "HH:mm:ss"))
    .withColumn("Traveling_Date", to_date("Travel_Date", "yyyy-MM-dd"))
    # Spelled Landing_Time so the name matches the one selected in the duration step
    .withColumn("Landing_Time", date_format("Arrival_Date", "HH:mm:ss"))
    .withColumn("Arriving_Date", to_date("Arrival_Date", "yyyy-MM-dd"))
)
flight_travel_arrival_time_df.show(1)

+----------+------------+--------+-------------------+-------------------+------+-----------+-----+------------+-------------+--------------+------------+-------------+
|Booking_ID|Booking_Date| Airline|        Travel_Date|       Arrival_Date|Source|Destination|Price|Passenger_ID|Take_Off_Time|Traveling_Date|Landing_time|Arriving_Date|
+----------+------------+--------+-------------------+-------------------+------+-----------+-----+------------+-------------+--------------+------------+-------------+
|         1|  2021-01-09|SpiceJet|2021-01-29 14:49:00|2021-01-29 19:47:00|  Pune|      Delhi| 7996|           1|     14:49:00|    2021-01-29|    19:47:00|   2021-01-29|
+----------+------------+--------+-------------------+-------------------+------+-----------+-----+------------+-------------+--------------+------------+-------------+
only showing top 1 row



## Calculating the flight duration

In [12]:
# Flight duration: arrival minus departure in seconds, converted to hours and
# rounded (Spark SQL `round`, from the wildcard import). Afterwards only the
# columns listed below are kept, in this order.
elapsed_seconds = unix_timestamp("Arrival_Date") - unix_timestamp("Travel_Date")
output_columns = [
    "Booking_ID", "Booking_Date", "Airline", "Source", "Destination",
    "Traveling_Date", "Take_Off_Time", "Arriving_Date", "Landing_Time",
    "Duration_in_hours", "Price", "Passenger_ID",
]

flight_duration_df = (
    flight_travel_arrival_time_df
    .withColumn("Duration_in_hours", round(elapsed_seconds / 3600))
    .selectExpr(*output_columns)
)
flight_duration_df.show(1)

+----------+------------+--------+------+-----------+--------------+-------------+-------------+------------+-----------------+-----+------------+
|Booking_ID|Booking_Date| Airline|Source|Destination|Traveling_Date|Take_Off_Time|Arriving_Date|Landing_Time|Duration_in_hours|Price|Passenger_ID|
+----------+------------+--------+------+-----------+--------------+-------------+-------------+------------+-----------------+-----+------------+
|         1|  2021-01-09|SpiceJet|  Pune|      Delhi|    2021-01-29|     14:49:00|   2021-01-29|    19:47:00|              5.0| 7996|           1|
+----------+------------+--------+------+-----------+--------------+-------------+-------------+------------+-----------------+-----+------------+
only showing top 1 row



## Fixing the airline column which has corrupted/bad values

In [13]:
# Keep only the bookings whose airline name is longer than 3 characters
airline_name_length = length(col("Airline"))
airline_names_df = flight_duration_df.where(airline_name_length > 3)

In [14]:
# Airline names accepted as valid; any other distinct value is collected as a bad record
valid_airlines = [
    "Air India", "IndiGo", "SpiceJet", "Go First", "Vistara",
    "AirAsia India", "Alliance Air", "Star Air", "TruJet", "Akasa Air",
]

wrong_unique_airlines = (
    airline_names_df
    .filter(~col("Airline").isin(valid_airlines))
    .select("Airline")
    .distinct()
)

# Pull the distinct bad names back to the driver as a plain Python list
wrong_unique_airlines_list = [record["Airline"] for record in wrong_unique_airlines.collect()]
print(f"Bad records in Airline column : {wrong_unique_airlines_list}")

[Stage 18:>                                                         (0 + 1) / 1]

Bad records in Airline column : ['Spic', 'Vistar', 'AirAsia Indi', 'Star Ai', 'Go Fir', 'Akasa ', 'AirAsia ', 'Akasa Ai', 'Star A']


                                                                                

In [15]:
# Mapping from each truncated/corrupted airline spelling to its valid name.
# Keys are matched exactly, so the trailing spaces in some keys are significant.
airline_mapping = {
    "AirAsia ": "AirAsia India",
    "Spic": "SpiceJet",
    "Vistar": "Vistara",
    "Star Ai": "Star Air",
    "Akasa Ai": "Akasa Air",
    "Star A": "Star Air",
    "Go Fir": "Go First",
    "Akasa ": "Akasa Air",
    "AirAsia Indi": "AirAsia India"
}

# DataFrame.replace applies the mapping natively, restricted to the Airline column;
# values with no entry in the mapping are left unchanged. Being a built-in, it
# needs no Python UDF or broadcast variable for this exact-value lookup.
cleaned_airline_df = airline_names_df.replace(airline_mapping, subset=["Airline"])
cleaned_airline_df.show(5)

[Stage 21:>                                                         (0 + 1) / 1]

+----------+------------+---------+---------+-----------+--------------+-------------+-------------+------------+-----------------+-----+------------+
|Booking_ID|Booking_Date|  Airline|   Source|Destination|Traveling_Date|Take_Off_Time|Arriving_Date|Landing_Time|Duration_in_hours|Price|Passenger_ID|
+----------+------------+---------+---------+-----------+--------------+-------------+-------------+------------+-----------------+-----+------------+
|         1|  2021-01-09| SpiceJet|     Pune|      Delhi|    2021-01-29|     14:49:00|   2021-01-29|    19:47:00|              5.0| 7996|           1|
|         2|  2021-04-27|Akasa Air|  Lucknow|      Delhi|    2021-04-28|     09:08:00|   2021-04-28|    13:16:00|              4.0| 9011|           1|
|         3|  2022-10-18|Akasa Air|Hyderabad|      Patna|    2022-11-16|     11:19:00|   2022-11-16|    14:16:00|              3.0|10179|           1|
|         4|  2020-12-07|   TruJet|    Delhi|    Chennai|    2020-12-14|     19:19:00|   2020-

                                                                                

## Correct negative price values to positive

In [16]:
# Turn negative prices positive by replacing Price with its absolute value
# (`abs` is the Spark SQL function from the wildcard import, not Python's builtin)
absolute_price = abs(col("Price"))
price_df = cleaned_airline_df.withColumn("Price", absolute_price)
price_df.show(n=5)

+----------+------------+---------+---------+-----------+--------------+-------------+-------------+------------+-----------------+-----+------------+
|Booking_ID|Booking_Date|  Airline|   Source|Destination|Traveling_Date|Take_Off_Time|Arriving_Date|Landing_Time|Duration_in_hours|Price|Passenger_ID|
+----------+------------+---------+---------+-----------+--------------+-------------+-------------+------------+-----------------+-----+------------+
|         1|  2021-01-09| SpiceJet|     Pune|      Delhi|    2021-01-29|     14:49:00|   2021-01-29|    19:47:00|              5.0| 7996|           1|
|         2|  2021-04-27|Akasa Air|  Lucknow|      Delhi|    2021-04-28|     09:08:00|   2021-04-28|    13:16:00|              4.0| 9011|           1|
|         3|  2022-10-18|Akasa Air|Hyderabad|      Patna|    2022-11-16|     11:19:00|   2022-11-16|    14:16:00|              3.0|10179|           1|
|         4|  2020-12-07|   TruJet|    Delhi|    Chennai|    2020-12-14|     19:19:00|   2020-

                                                                                

## Joining the passengers_df and price_df

In [17]:
# Inner join of passengers with their cleaned bookings on the shared Passenger_ID key
final_joined_df = passengers_df.join(price_df, on="Passenger_ID", how="inner")
final_joined_df.show(n=2)

+------------+--------------+---------------+----------+------------+---------+-------+-----------+--------------+-------------+-------------+------------+-----------------+-----+
|Passenger_ID|Passenger_Name|          Class|Booking_ID|Booking_Date|  Airline| Source|Destination|Traveling_Date|Take_Off_Time|Arriving_Date|Landing_Time|Duration_in_hours|Price|
+------------+--------------+---------------+----------+------------+---------+-------+-----------+--------------+-------------+-------------+------------+-----------------+-----+
|           1|    Odika Rama|Premium Economy|         1|  2021-01-09| SpiceJet|   Pune|      Delhi|    2021-01-29|     14:49:00|   2021-01-29|    19:47:00|              5.0| 7996|
|           1|    Odika Rama|Premium Economy|         2|  2021-04-27|Akasa Air|Lucknow|      Delhi|    2021-04-28|     09:08:00|   2021-04-28|    13:16:00|              4.0| 9011|
+------------+--------------+---------------+----------+------------+---------+-------+-----------+-

                                                                                

## Setting up the BigQuery details

In [18]:
# Define the BigQuery table name
table_name = "your-project-id.dataset-id.airlines_data"

# Define the GCS bucket for temporary storage
temp_bucket = "gs://path_to_temp_folder/bq_temp_folder"

# Writing the DataFrame to BigQuery in append mode
try:
    (
        final_joined_df.write
        .format("bigquery")
        .option("table", table_name)
        .option("writeDisposition", "WRITE_APPEND")
        .option("temporaryGcsBucket", temp_bucket)
        .mode("append")
        .save()
    )
    print(f"Successfully loaded the data into the BigQuery table {table_name}")
except Exception as e:
    print(f"Error while loading the dataframe : {str(e)}")
    # Re-raise after logging: a failed load must stop the notebook instead of
    # letting the remaining cells run as if the data had been written.
    raise

                                                                                

Successfully loaded the data into the BigQuery table synthetic-nova-438808-k6.Airlines_dataset.airlines_data


## Stopping the Spark Session

In [19]:
# Shut the Spark session down so its resources are released
if spark:
    spark.stop()
    print("Spark session stopped successfully.")

Spark session stopped successfully.
