In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.4-py2.py3-none-any.whl size=317849766 sha256=0c97a3891ec7301ae6ed51956047662bab19e34488e5d4c00733eb75821c9949
  Stored in directory: /root/.cache/pip/wheels/8d/28/22/5dbae8a8714ef046cebd320d0ef7c92f5383903cf854c15c0c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully in

In [2]:
import pandas as pd
import numpy as np

Dataset download

In [3]:
# import os
# import kagglehub
# from google.colab import drive

# # Mount Google Drive
# drive.mount('/content/drive')

# # Define a permanent save location inside Google Drive
# save_path = "/content/drive/MyDrive/datasets/flight_prices"
# os.makedirs(save_path, exist_ok=True)

# # Download dataset using kagglehub
# file_path = kagglehub.dataset_download("dilwong/flightprices")

# # Move dataset to Google Drive
# os.system(f"mv {file_path}/* {save_path}/")

# print(f"Dataset saved permanently at: {save_path}/itineraries.csv")


## Code to Read CSV Headers

In [4]:
from google.colab import drive
drive.mount('/content/drive')

file_path = "/content/drive/MyDrive/datasets/flight_prices/itineraries.csv"


Mounted at /content/drive


In [5]:
# Read only the first rows to get column names
try:
    df_preview = pd.read_csv(file_path, nrows=5)
    column_names = df_preview.columns.tolist()

    print("Columns in the dataset:")
    print(column_names)

except Exception as e:
    print(f"Error reading file: {e}")


Columns in the dataset:
['legId', 'searchDate', 'flightDate', 'startingAirport', 'destinationAirport', 'fareBasisCode', 'travelDuration', 'elapsedDays', 'isBasicEconomy', 'isRefundable', 'isNonStop', 'baseFare', 'totalFare', 'seatsRemaining', 'totalTravelDistance', 'segmentsDepartureTimeEpochSeconds', 'segmentsDepartureTimeRaw', 'segmentsArrivalTimeEpochSeconds', 'segmentsArrivalTimeRaw', 'segmentsArrivalAirportCode', 'segmentsDepartureAirportCode', 'segmentsAirlineName', 'segmentsAirlineCode', 'segmentsEquipmentDescription', 'segmentsDurationInSeconds', 'segmentsDistance', 'segmentsCabinCode']


## Data Dictionary

In [6]:
# Convert columns to appropriate data types
conversion_dict = {
    "searchDate": "datetime64[ns]",
    "flightDate": "datetime64[ns]",
    "segmentsDepartureTimeRaw": "datetime64[ns]",
    "segmentsArrivalTimeRaw": "datetime64[ns]",
    "elapsedDays": "Int64",
    "isBasicEconomy": "boolean",
    "isRefundable": "boolean",
    "isNonStop": "boolean",
    "baseFare": "float64",
    "totalFare": "float64",
    "seatsRemaining": "Int64",
    "totalTravelDistance": "float64",
    "segmentsDepartureTimeEpochSeconds": "Int64",
    "segmentsArrivalTimeEpochSeconds": "Int64",
    "segmentsDurationInSeconds": "Int64",
    "segmentsDistance": "float64"
}

# Convert categorical columns to category type
categorical_columns = [
    "startingAirport", "destinationAirport", "fareBasisCode",
    "segmentsArrivalAirportCode", "segmentsDepartureAirportCode",
    "segmentsAirlineName", "segmentsAirlineCode", "segmentsEquipmentDescription",
    "segmentsCabinCode"
]

# Apply type conversions
for col, dtype in conversion_dict.items():
    if col in df_preview.columns:
        try:
            df_preview[col] = pd.to_datetime(df_preview[col]) if "datetime" in dtype else df_preview[col].astype(dtype)
        except Exception as e:
            print(f"Warning: Could not convert column '{col}' to {dtype}. Error: {e}")

# Convert categorical columns
for col in categorical_columns:
    if col in df_preview.columns:
        df_preview[col] = df_preview[col].astype("category")

# Function to get example values (first non-null value)
def get_example_value(df, column_name):
    return df[column_name].dropna().iloc[0] if column_name in df.columns else "N/A"

# Descriptions from Kaggle
column_descriptions = {
    "legId": "An identifier for the flight.",
    "searchDate": "Date when this entry was recorded from Expedia.",
    "flightDate": "Date of the flight.",
    "startingAirport": "Three-character IATA code for the departure airport.",
    "destinationAirport": "Three-character IATA code for the arrival airport.",
    "fareBasisCode": "The fare basis code.",
    "travelDuration": "Total travel duration in hours and minutes.",
    "elapsedDays": "Number of elapsed days (usually 0).",
    "isBasicEconomy": "Indicates whether the ticket is for basic economy.",
    "isRefundable": "Indicates whether the ticket is refundable.",
    "isNonStop": "Indicates whether the flight is non-stop.",
    "baseFare": "Base price of the ticket (in USD).",
    "totalFare": "Total price of the ticket including taxes and fees.",
    "seatsRemaining": "Number of seats remaining.",
    "totalTravelDistance": "Total travel distance. This data is sometimes missing.",
    "segmentsDepartureTimeEpochSeconds": "Unix time for departure of each segment. Entries are separated by '||'.",
    "segmentsDepartureTimeRaw": "ISO 8601 formatted departure time for each segment. Entries are separated by '||'.",
    "segmentsArrivalTimeEpochSeconds": "Unix time for arrival of each segment. Entries are separated by '||'.",
    "segmentsArrivalTimeRaw": "ISO 8601 formatted arrival time for each segment. Entries are separated by '||'.",
    "segmentsArrivalAirportCode": "IATA code for arrival airport of each segment. Entries are separated by '||'.",
    "segmentsDepartureAirportCode": "IATA code for departure airport of each segment. Entries are separated by '||'.",
    "segmentsAirlineName": "Name of the airline for each segment. Entries are separated by '||'.",
    "segmentsAirlineCode": "Two-letter airline code for each segment. Entries are separated by '||'.",
    "segmentsEquipmentDescription": "Type of airplane used for each segment. Entries are separated by '||'.",
    "segmentsDurationInSeconds": "Duration of the flight (in seconds) for each segment. Entries are separated by '||'.",
    "segmentsDistance": "Distance traveled (in miles) for each segment. Entries are separated by '||'.",
    "segmentsCabinCode": "Cabin code for each segment (e.g., coach). Entries are separated by '||'."
}

# Create the Data Dictionary dynamically
data_dict = [
    {
        "Column Name": col,
        "Data Type": str(df_preview[col].dtype),  # Now retrieves ACTUAL type from DataFrame
        "Description": column_descriptions.get(col, "N/A"),  # Use predefined description
        "Example Value": get_example_value(df_preview, col)
    }
    for col in df_preview.columns
]

# Convert to DataFrame
df_dict = pd.DataFrame(data_dict)

# Display the Data Dictionary
display(df_dict)

Unnamed: 0,Column Name,Data Type,Description,Example Value
0,legId,object,An identifier for the flight.,9ca0e81111c683bec1012473feefd28f
1,searchDate,datetime64[ns],Date when this entry was recorded from Expedia.,2022-04-16 00:00:00
2,flightDate,datetime64[ns],Date of the flight.,2022-04-17 00:00:00
3,startingAirport,category,Three-character IATA code for the departure ai...,ATL
4,destinationAirport,category,Three-character IATA code for the arrival airp...,BOS
5,fareBasisCode,category,The fare basis code.,LA0NX0MC
6,travelDuration,object,Total travel duration in hours and minutes.,PT2H29M
7,elapsedDays,Int64,Number of elapsed days (usually 0).,0
8,isBasicEconomy,boolean,Indicates whether the ticket is for basic econ...,False
9,isRefundable,boolean,Indicates whether the ticket is refundable.,False


## Data Ingestion

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, split, trim, countDistinct, avg
from pyspark.sql.types import *

# Increase memory allocation to prevent crashes
spark = SparkSession.builder.appName("FlightDataIngestion") \
    .config("spark.driver.memory", "100g") \
    .getOrCreate()

# Define Schema
schema = StructType([
    StructField("legId", StringType(), True),
    StructField("searchDate", DateType(), True),
    StructField("flightDate", DateType(), True),
    StructField("startingAirport", StringType(), True),
    StructField("destinationAirport", StringType(), True),
    StructField("fareBasisCode", StringType(), True),
    StructField("travelDuration", StringType(), True),
    StructField("elapsedDays", IntegerType(), True),
    StructField("isBasicEconomy", BooleanType(), True),
    StructField("isRefundable", BooleanType(), True),
    StructField("isNonStop", BooleanType(), True),
    StructField("baseFare", DoubleType(), True),
    StructField("totalFare", DoubleType(), True),
    StructField("seatsRemaining", IntegerType(), True),
    StructField("totalTravelDistance", DoubleType(), True),
    StructField("segmentsDepartureTimeRaw", StringType(), True),
    StructField("segmentsArrivalTimeRaw", StringType(), True),
    StructField("segmentsArrivalAirportCode", StringType(), True),
    StructField("segmentsDepartureAirportCode", StringType(), True),
    StructField("segmentsAirlineName", StringType(), True),
    StructField("segmentsAirlineCode", StringType(), True),
    StructField("segmentsEquipmentDescription", StringType(), True),
    StructField("segmentsCabinCode", StringType(), True),
    StructField("segmentsDepartureTimeEpochSeconds", StringType(), True),
    StructField("segmentsArrivalTimeEpochSeconds", StringType(), True),
    StructField("segmentsDurationInSeconds", StringType(), True),
    StructField("segmentsDistance", StringType(), True)
])

# Read CSV file into Spark DataFrame
print("Loading dataset using PySpark...")
df = spark.read.csv(file_path, schema=schema, header=True)

# Trim whitespace from all string columns
df = df.select([trim(col(c)).alias(c) if t == "string" else col(c) for c, t in df.dtypes])

# Compute distinct counts for all columns efficiently
print("Computing distinct counts for all columns...")
distinct_counts = df.agg(*[countDistinct(col(c)).alias(c) for c in df.columns])

# Display distinct counts in small groups to avoid memory overload
num_columns = len(df.columns)
batch_size = 10  # Show results in groups of 10 columns
for i in range(0, num_columns, batch_size):
    cols_to_show = df.columns[i:i + batch_size]
    print(f"Distinct counts for columns {i + 1} to {i + batch_size}:")
    distinct_counts.select(cols_to_show).show()

# Identify truly empty columns (only 1 unique value, typically NULL)
empty_cols = [c for c in df.columns if distinct_counts.collect()[0][c] == 1]
if empty_cols:
    print(f"WARNING: These columns appear empty: {empty_cols}")
    df.select(empty_cols).show()

# Convert Date & Boolean Columns
df = df.withColumn("searchDate", col("searchDate").cast(DateType())) \
       .withColumn("flightDate", col("flightDate").cast(DateType())) \
       .withColumn("isBasicEconomy", col("isBasicEconomy").cast(BooleanType())) \
       .withColumn("isRefundable", col("isRefundable").cast(BooleanType())) \
       .withColumn("isNonStop", col("isNonStop").cast(BooleanType()))

# Convert Numeric Columns
numeric_cols = ["elapsedDays", "baseFare", "totalFare", "seatsRemaining", "totalTravelDistance"]
for col_name in numeric_cols:
    df = df.withColumn(col_name, col(col_name).cast(DoubleType()))

# Process multi-value columns (split `||` separator & extract first value)
multi_value_columns = [
    "segmentsDepartureTimeEpochSeconds",
    "segmentsArrivalTimeEpochSeconds",
    "segmentsDurationInSeconds",
    "segmentsDistance"
]
for col_name in multi_value_columns:
    df = df.withColumn(col_name, split(col(col_name), r"\|\|")[0].cast(DoubleType()))

# Compute average totalTravelDistance per (startingAirport, destinationAirport)
avg_distance_df = df.groupBy("startingAirport", "destinationAirport") \
                    .agg(avg("totalTravelDistance").alias("avg_distance"))

# Join the average distance back to the original DataFrame
df = df.join(avg_distance_df, ["startingAirport", "destinationAirport"], "left")

# Fill missing totalTravelDistance with the computed average
df = df.withColumn("totalTravelDistance",
                   when(col("totalTravelDistance").isNull(), col("avg_distance"))
                   .otherwise(col("totalTravelDistance")))

# Drop the temporary avg_distance column
df = df.drop("avg_distance")

# Handle Missing Values
df = df.fillna({"segmentsEquipmentDescription": "Unknown"})

# Check final missing values summary
df.summary().show()

# Save cleaned dataset
df.write.csv("/content/drive/MyDrive/datasets/flight_prices/itineraries_cleaned.csv", header=True, mode="overwrite")

print("Full dataset ingestion completed with PySpark. Cleaned dataset saved.")


Loading dataset using PySpark...
Computing distinct counts for all columns...
Distinct counts for columns 1 to 10:
+-------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+
|  legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|
+-------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+
|5999739|       171|       217|             16|                16|        21062|          2110|          3|             2|           2|
+-------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+

Distinct counts for columns 11 to 20:
+---------+--------+---------+--------------+-------------------+------------------------+----------------------+--------------------------+--------------------------

## Issues Identified
### Columns Containing NULL or Infinity Values

Some columns (legId, fareBasisCode, travelDuration, etc.) have "Infinity" or "NULL" values, meaning their conversion did not work correctly.


The legId column contains "Infinity", which is likely an issue with its data type handling.


### Multi-Value Columns Still Have Delimiters (||)

segmentsDepartureTimeEpochSeconds, segmentsArrivalTimeEpochSeconds, segmentsDurationInSeconds, and segmentsDistance may still contain ||-separated values.


### Columns with 100% Missing Values

The summary shows some columns have all values as NULL (e.g., segmentsDepartureTimeEpochSeconds and segmentsDistance).
These columns might need to be dropped if they contain no usable data.

### Date and Time Handling Issues

segmentsDepartureTimeRaw and segmentsArrivalTimeRaw still contain raw timestamp strings.
Some datetime fields remain as NULL, meaning conversion failed in certain rows.