In [1]:
! pip install pyspark -q

In [2]:
from pyspark import __version__

# Record which PySpark release the notebook is running against.
print("PySpark version:", __version__)

PySpark version: 3.5.3


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Create the SparkSession first and take its SparkContext from it. A bare
# SparkContext created beforehand would be reused by the builder, and static
# settings such as appName would then be ignored (only runtime SQL configs apply).
spark = (
    SparkSession.builder
    .appName("BDA_Project")
    # Use the pre-Spark-3.0 datetime parsing behaviour.
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Handle on the underlying SparkContext, kept for code that expects `sc`.
sc = spark.sparkContext

In [4]:
# Loading the data: the first CSV row holds the column names. No schema is given
# or inferred, so every column is read as a string.
data_path = "/content/itineraries_random_2M.csv"

df_raw = (
    spark.read
    .option("header", "true")
    .csv(data_path)
)
df_raw.show(5)

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDi

In [5]:
# Randomly sample 5% of the rows (without replacement, fixed seed for reproducibility).
# The sample is cached because later cells run several Spark jobs on it (one collect
# per split segment column, the cabin-code scan, the final write). Without caching,
# each job would re-read and re-sample the full CSV.
df_sampled = df_raw.sample(withReplacement=False, fraction=0.05, seed=42).cache()
df_sampled.show()

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+------------------+--------------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|  s

In [6]:
# Inspect the schema: the CSV was read without a schema, so every column is still a string.
df_sampled.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: string (nullable = true)
 |-- flightDate: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: string (nullable = true)
 |-- isBasicEconomy: string (nullable = true)
 |-- isRefundable: string (nullable = true)
 |-- isNonStop: string (nullable = true)
 |-- baseFare: string (nullable = true)
 |-- totalFare: string (nullable = true)
 |-- seatsRemaining: string (nullable = true)
 |-- totalTravelDistance: string (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: str

In [7]:
from pyspark.sql.functions import col

def drop_epoch_columns(df_sampled):
    """Remove every column whose name contains the substring "Epoch".

    Prints the names that were removed, or a notice when none matched.

    Args:
        df_sampled: DataFrame-like object exposing ``columns`` and ``drop(*names)``.

    Returns:
        The DataFrame without the matching columns, or the input object itself
        when no column name contains "Epoch".
    """
    epoch_columns = [name for name in df_sampled.columns if "Epoch" in name]
    if not epoch_columns:
        print("No columns with 'Epoch' found.")
        return df_sampled

    trimmed = df_sampled.drop(*epoch_columns)
    print(f"Dropped columns: {epoch_columns}")
    return trimmed

In [8]:
# Remove the epoch-second timestamp columns (any column with "Epoch" in its name).
df_sampled = drop_epoch_columns(df_sampled)

Dropped columns: ['segmentsDepartureTimeEpochSeconds', 'segmentsArrivalTimeEpochSeconds']


In [9]:
# Dropping unnecessary columns: identifiers and per-segment details that the
# features built below do not use.
columns_to_drop = [
    "segmentsAirlineCode",
    "segmentsDepartureTimeRaw",
    "segmentsArrivalTimeRaw",
    "legId",
    "fareBasisCode",
    "segmentsEquipmentDescription",
    "segmentsDurationInSeconds",
    "segmentsDistance",
]
df_sampled = df_sampled.drop(*columns_to_drop)

In [10]:
from pyspark.sql.functions import col, to_date, when, regexp_replace, unix_timestamp, from_unixtime, to_timestamp

def _iso_duration_to_minutes(duration_col):
    """Convert a duration string Column such as "PT2H29M" or "P1DT3H" to whole minutes.

    The day (D), hour (H) and minute (M) components are read separately. Absent
    components count as 0, and seconds are ignored. A value that is already a plain
    digit string is cast directly. NULL input stays NULL.
    """
    from pyspark.sql.functions import regexp_extract

    def component(unit):
        # regexp_extract returns "" when the unit is absent; map that to 0 instead of
        # casting "" (NULL, or an error under ANSI mode).
        digits = regexp_extract(duration_col, rf"(\d+){unit}", 1)
        return when(digits != "", digits.cast("int")).otherwise(0)

    total_minutes = component("D") * 1440 + component("H") * 60 + component("M")
    return (
        when(duration_col.rlike(r"^\d+$"), duration_col.cast("int"))
        .when(duration_col.isNotNull(), total_minutes)
    )


def _string_to_bool(flag_col):
    """Return True when the string Column equals "true", ignoring case and surrounding spaces.

    Any other value, including NULL, gives False.
    """
    from pyspark.sql.functions import trim, upper

    return when(upper(trim(flag_col)) == "TRUE", True).otherwise(False)


def convert_data_types(df_sampled):
    """Converts columns to appropriate data types.

    * searchDate, flightDate -> date
    * baseFare, totalFare -> double
    * elapsedDays, seatsRemaining, totalTravelDistance -> int
    * travelDuration -> int, total minutes
    * isBasicEconomy, isRefundable, isNonStop -> boolean

    Args:
        df_sampled: DataFrame whose columns are still raw strings.

    Returns:
        The DataFrame with the columns above re-typed in place.
    """
    return (
        df_sampled
        .withColumn("searchDate", to_date(col("searchDate")))
        .withColumn("flightDate", to_date(col("flightDate")))
        .withColumn("baseFare", col("baseFare").cast("double"))
        .withColumn("totalFare", col("totalFare").cast("double"))
        .withColumn("elapsedDays", col("elapsedDays").cast("int"))
        .withColumn("seatsRemaining", col("seatsRemaining").cast("int"))
        .withColumn("totalTravelDistance", col("totalTravelDistance").cast("int"))
        # NOTE(review): values are presumably ISO-8601 durations like "PT2H29M" (the
        # earlier code stripped letters from them) -- verify against the data.
        # Deleting all non-digits would turn "PT2H29M" into 229 instead of 149 minutes,
        # so each component is parsed separately.
        .withColumn("travelDuration", _iso_duration_to_minutes(col("travelDuration")))
        # Compared case-insensitively, so "TRUE", "True" and "true" all map to True.
        .withColumn("isBasicEconomy", _string_to_bool(col("isBasicEconomy")))
        .withColumn("isRefundable", _string_to_bool(col("isRefundable")))
        .withColumn("isNonStop", _string_to_bool(col("isNonStop")))
    )

In [11]:
# Cast the raw string columns to dates, doubles, ints and booleans.
df_sampled = convert_data_types(df_sampled)

In [12]:
def find_segment_columns(df_sampled):
    """Return the names of all columns containing "segments", in schema order.

    Args:
        df_sampled: DataFrame-like object exposing a ``columns`` list.

    Returns:
        A (possibly empty) list of matching column names.
    """
    return list(filter(lambda name: "segments" in name, df_sampled.columns))

In [13]:
segment_cols = find_segment_columns(df_sampled)

# Report which '||'-separated segment columns will be split below.
if not segment_cols:
    print("No columns containing 'segments' found.")
else:
    print("Columns containing 'segments':")
    print("\n".join(f"- {name}" for name in segment_cols))

Columns containing 'segments':
- segmentsArrivalAirportCode
- segmentsDepartureAirportCode
- segmentsAirlineName
- segmentsCabinCode


In [14]:
from pyspark.sql.functions import col, split, size, max as spark_max

def split_columns(df, col_name, new_col_prefix, delimiter=r"\|\|"):
    """Splits a column containing values separated by '||' into multiple columns.

    ``col_name`` is split on ``delimiter`` and replaced by the columns
    ``<new_col_prefix>_1 ... <new_col_prefix>_N``. N is the largest number of parts
    found in any row. Rows with fewer parts get NULL in the extra columns.

    Args:
        df: Input DataFrame.
        col_name: Name of the string column to split.
        new_col_prefix: Prefix for the generated column names.
        delimiter: Regular expression passed to ``split``. The default matches a
            literal ``||``.

    Returns:
        The transformed DataFrame. If ``col_name`` is missing, a message is printed
        and ``df`` is returned unchanged.
    """
    if col_name not in df.columns:
        print(f"Column '{col_name}' not found in the DataFrame.")
        return df

    df = df.withColumn(col_name, split(col(col_name), delimiter))
    # Runs a Spark job to find the widest row, which fixes how many columns to create.
    max_length = df.agg(spark_max(size(col(col_name)))).collect()[0][0]
    # An empty DataFrame aggregates to None, and size() of a NULL array can be -1.
    # Treat both as "no parts" instead of crashing in range().
    max_length = max(max_length or 0, 0)
    new_cols = [col(col_name).getItem(i).alias(f"{new_col_prefix}_{i + 1}") for i in range(max_length)]
    return df.select("*", *new_cols).drop(col_name)


# Split each segment column into one column per flight segment,
# e.g. segmentsCabinCode -> cabincode_1, cabincode_2, ...
for col_name in segment_cols:
    new_col_prefix = col_name.replace("segments", "").lower()
    df_sampled = split_columns(df_sampled, col_name, new_col_prefix)

#df_sampled.show(5)

In [15]:
from pyspark.sql.functions import col, dayofweek, month, weekofyear, datediff, to_date, when, lit, year

# Meteorological seasons and the calendar months (1-12) they cover, in the order
# the season flag columns are added.
_SEASON_MONTHS = (
    ("winter", (12, 1, 2)),
    ("summer", (6, 7, 8)),
    ("spring", (3, 4, 5)),
    ("fall", (9, 10, 11)),
)

# Prefix for the derived columns, paired with the source date column.
_DATE_COLUMNS = (("search", "searchDate"), ("flight", "flightDate"))


def _add_calendar_parts(df, prefix, date_col):
    """Add <prefix>_day_of_week, _month, _week_of_year and _year columns derived from `date_col`."""
    source = col(date_col)
    return (
        df.withColumn(f"{prefix}_day_of_week", dayofweek(source))
          .withColumn(f"{prefix}_month", month(source))
          .withColumn(f"{prefix}_week_of_year", weekofyear(source))
          .withColumn(f"{prefix}_year", year(source))
    )


def _add_season_flags(df, prefix):
    """Add a 0/1 <prefix>_is_<season> flag for each season, based on the <prefix>_month column."""
    month_col = col(f"{prefix}_month")
    for season, months in _SEASON_MONTHS:
        df = df.withColumn(f"{prefix}_is_{season}", when(month_col.isin(*months), lit(1)).otherwise(lit(0)))
    return df


def feature_engineering(df):
    """Engineers features from the DataFrame.

    Features added for both searchDate (``search_*``) and flightDate (``flight_*``):

    * day of week, month, week of year and year;
    * 0/1 winter / summer / spring / fall flags;
    * a 0/1 weekend flag.

    Two further columns are added:

    * ``days_until_departure``: flightDate minus searchDate, in days;
    * ``day_part``: a Morning/Afternoon/Evening/Night bucket (see the NOTE below).

    Returns:
        The DataFrame with the new columns appended.
    """
    for prefix, date_col in _DATE_COLUMNS:
        df = _add_calendar_parts(df, prefix, date_col)
    for prefix, _ in _DATE_COLUMNS:
        df = _add_season_flags(df, prefix)
    for prefix, _ in _DATE_COLUMNS:
        # Spark's dayofweek numbers Sunday as 1 and Saturday as 7.
        is_weekend = col(f"{prefix}_day_of_week").isin(1, 7)
        df = df.withColumn(f"{prefix}_is_weekend", when(is_weekend, lit(1)).otherwise(lit(0)))

    df = df.withColumn("days_until_departure", datediff(col("flightDate"), col("searchDate")))

    # NOTE(review): this bucket is NOT a departure time. It bins travelDuration (trip
    # length, assumed to be minutes) converted to whole hours, so the labels describe
    # trip length rather than time of day. The real per-segment departure times
    # (segmentsDepartureTimeRaw) are dropped earlier in this notebook; use them if a
    # genuine time-of-day feature is wanted.
    duration_hours = (col("travelDuration") / 60).cast("int")
    df = df.withColumn(
        "day_part",
        when((duration_hours >= 6) & (duration_hours < 12), "Morning")
        .when((duration_hours >= 12) & (duration_hours < 18), "Afternoon")
        .when((duration_hours >= 18) & (duration_hours < 20), "Evening")
        .otherwise("Night"),
    )
    return df

In [16]:
# Add calendar, seasonality, weekend, days-until-departure and day_part features.
df_sampled = feature_engineering(df_sampled)
# df_sampled.printSchema()
# df_sampled.select("flight_month").distinct().show()

In [17]:
# The raw date columns are no longer needed once the date-derived features exist.
columns_to_drop = ["searchDate", "flightDate"]
df_sampled = df_sampled.drop(*columns_to_drop)
# df_sampled.printSchema() # Verify the columns are dropped

In [18]:
from pyspark.sql.functions import when

# Bucket itineraries by total travel distance. Itineraries with no recorded distance
# get a NULL bin. Without the explicit check, `NULL < 500` is NULL, so they would
# fall through every branch and be labelled "Long Haul".
SHORT_HAUL_MAX_DISTANCE = 500
MEDIUM_HAUL_MAX_DISTANCE = 2000

travel_distance = col("totalTravelDistance")
df_sampled = df_sampled.withColumn(
    "distance_bin",
    when(travel_distance.isNull(), lit(None).cast("string"))
    .when(travel_distance < SHORT_HAUL_MAX_DISTANCE, "Short Haul")
    .when(travel_distance < MEDIUM_HAUL_MAX_DISTANCE, "Medium Haul")
    .otherwise("Long Haul"),
)

In [19]:
from pyspark.sql.functions import col, when, lit

def calculate_layovers(df):
    """Calculates the number of layovers based on segment data.

    Segment i counts as present when its ``arrivalairportcode_<i>`` column is
    non-NULL. ``num_layovers`` is that count minus one, or 0 when no segment is
    present.

    Args:
        df: DataFrame holding the split ``arrivalairportcode_<i>`` columns.

    Returns:
        The DataFrame with an added ``num_layovers`` column.
    """
    # Highest segment index among the split arrival columns. Defaults to 0 so that a
    # frame without such columns does not raise ValueError from max() on an empty list.
    max_segment = max(
        (int(c.split("_")[-1]) for c in df.columns if "arrivalairportcode_" in c),
        default=0,
    )

    # Build the segment count as one expression rather than re-projecting a temporary
    # column once per segment.
    num_segments = lit(0)
    for i in range(1, max_segment + 1):
        num_segments = num_segments + when(col(f"arrivalairportcode_{i}").isNotNull(), 1).otherwise(0)

    return df.withColumn("num_layovers", when(num_segments > 0, num_segments - 1).otherwise(0))

In [20]:
# Add num_layovers: the number of non-NULL arrival-airport segments minus one.
df_sampled = calculate_layovers(df_sampled)
# df_sampled.printSchema()
# df_sampled.show(5)

In [21]:
from pyspark.sql.functions import col, when, lit, log

def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a Column.

    The result is 0 when the denominator is 0, and NULL when the denominator is NULL.
    """
    # NULL != 0 is NULL and would fall through to otherwise(0), turning missing data
    # into a genuine-looking 0. Handle it explicitly.
    return (
        when(denominator.isNull(), lit(None))
        .when(denominator != 0, numerator / denominator)
        .otherwise(lit(0))
    )


def price_feature_engineering(df):
    """Add fare-derived features.

    Columns added:

    * ``fare_difference``: totalFare - baseFare;
    * ``fare_ratio``: totalFare / baseFare (0 if baseFare is 0, NULL if it is NULL);
    * ``price_per_mile``: totalFare / totalTravelDistance (same 0/NULL rules);
    * interactions of totalFare with seatsRemaining, days_until_departure and
      flight_is_weekend.

    Returns:
        The DataFrame with the new columns appended.
    """
    total_fare = col("totalFare")
    return (
        df.withColumn("fare_difference", total_fare - col("baseFare"))
          .withColumn("fare_ratio", _safe_ratio(total_fare, col("baseFare")))
          .withColumn("price_per_mile", _safe_ratio(total_fare, col("totalTravelDistance")))
          .withColumn("seats_remaining_x_total_fare", col("seatsRemaining") * total_fare)
          .withColumn("days_until_departure_x_total_fare", col("days_until_departure") * total_fare)
          # Capitalised name kept as-is so existing consumers of the output keep working.
          .withColumn("Flight_is_weekend_x_total_fare", col("flight_is_weekend") * total_fare)
    )



In [22]:
# Add fare difference/ratio, price per distance and fare interaction features.
df_sampled = price_feature_engineering(df_sampled)
# df_sampled.printSchema()
# df_sampled.show(5)

In [23]:
from pyspark.sql.functions import concat_ws

def create_leg_routes(df):
    """Add a ``leg_route_<i>`` column "<departure>-<arrival>" for each flight segment i.

    The segment count comes from the ``arrivalairportcode_<i>`` columns. A leg whose
    departure or arrival code is NULL gets a NULL route. concat_ws skips NULLs, so
    without the guard a missing leg would become "" (or a lone airport code).

    Args:
        df: DataFrame with ``departureairportcode_<i>`` / ``arrivalairportcode_<i>`` columns.

    Returns:
        The DataFrame with the leg route columns added.
    """
    # Defaults to 0 so that a frame with no arrival columns does not raise in max().
    max_segment = max(
        (int(c.split("_")[-1]) for c in df.columns if "arrivalairportcode_" in c),
        default=0,
    )
    for i in range(1, max_segment + 1):
        departure = col(f"departureairportcode_{i}")
        arrival = col(f"arrivalairportcode_{i}")
        df = df.withColumn(
            f"leg_route_{i}",
            when(departure.isNotNull() & arrival.isNotNull(), concat_ws("-", departure, arrival)),
        )
    return df

df_sampled = create_leg_routes(df_sampled)
# df_sampled.show(5)

In [24]:
from pyspark.sql.functions import concat_ws, collect_list, array_sort, array

def create_airline_combinations(df):
    """Add ``airline_combination``: the segment airline names sorted and joined with "-".

    Uses the ``airlinename_<i>`` columns for i = 1 up to the highest index present.
    concat_ws skips NULL entries. Raises ValueError (from max) if no such columns exist.
    """
    segment_indices = [int(name.split("_")[-1]) for name in df.columns if "airlinename_" in name]
    airline_columns = [col(f"airlinename_{n}") for n in range(1, max(segment_indices) + 1)]
    sorted_airlines = array_sort(array(*airline_columns))
    return df.withColumn("airline_combination", concat_ws("-", sorted_airlines))

df_sampled = create_airline_combinations(df_sampled)
# df_sampled.show(5)

In [25]:
# Drop the per-segment airport and airline columns; they are now summarised by
# num_layovers, leg_route_<i> and airline_combination. The list is derived from the
# schema because split_columns creates as many <prefix>_<i> columns as the widest
# itinerary in the sample, which is not necessarily 4.
segment_prefixes = ("arrivalairportcode_", "departureairportcode_", "airlinename_")
columns_to_drop = [c for c in df_sampled.columns if c.startswith(segment_prefixes)]
df_sampled = df_sampled.drop(*columns_to_drop)
# df_sampled.printSchema()

In [26]:
from pyspark.sql.functions import col, collect_list, array_distinct, explode

def get_unique_cabin_codes(df):
    """Return every distinct value found across the ``cabincode_<i>`` columns.

    NULL values appear as ``None`` in the returned list. When there are no
    cabin-code columns, a message is printed and ``[]`` is returned. Runs a Spark
    job (collect).
    """
    cabin_code_cols = [name for name in df.columns if "cabincode_" in name]
    if len(cabin_code_cols) == 0:
        print("No cabin code columns found.")
        return []

    codes_per_row = array_distinct(array(*map(col, cabin_code_cols)))
    distinct_codes = df.select(explode(codes_per_row)).distinct()
    return [record[0] for record in distinct_codes.collect()]

unique_cabin_codes = get_unique_cabin_codes(df_sampled)
print(f"Unique Cabin Codes: {unique_cabin_codes}")

Unique Cabin Codes: ['coach', 'premium coach', 'business', 'first', None]


In [27]:
from pyspark.sql.functions import when, lit, greatest, least

def _case_when(branches):
    """Build a CASE WHEN Column from (condition, value) pairs; NULL when no condition matches."""
    expr = None
    for condition, value in branches:
        expr = when(condition, value) if expr is None else expr.when(condition, value)
    return lit(None) if expr is None else expr


def create_cabin_class_features(df):
    """Creates cabin class features (highest, lowest, changes).

    Cabins are ranked coach (1) < premium coach (2) < business (3) < first (4).
    Three columns are derived from the ``cabincode_<i>`` segment columns:

    * ``highest_cabin_class``: highest-ranked cabin code on any segment;
    * ``lowest_cabin_class``: lowest-ranked cabin code on any segment;
    * ``cabin_class_changes``: number of consecutive segment pairs whose cabin differs.

    NULL or unrecognised cabin codes, such as segments an itinerary does not have,
    are ignored. Ranking them 0 instead would make ``lowest_cabin_class`` NULL, and
    would count a spurious change, for every itinerary shorter than the widest one.
    Either class column is NULL when no segment has a recognised code.

    Returns:
        The DataFrame with the three columns appended.
    """
    unique_cabin_codes = ['coach', 'premium coach', 'business', 'first', None]
    max_segment = max((int(c.split("_")[-1]) for c in df.columns if "cabincode_" in c), default=0)

    # Ranks 1..4 in ascending cabin order; None has no rank.
    cabin_ranks = [(code, i + 1) for i, code in enumerate(unique_cabin_codes) if code is not None]

    def rank_of(code_col):
        # NULL for missing/unknown codes, so greatest/least skip them.
        return _case_when((code_col == code, rank) for code, rank in cabin_ranks)

    def code_of(rank_col):
        # Inverse mapping; NULL when the rank belongs to no cabin code.
        return _case_when((rank_col == rank, code) for code, rank in cabin_ranks)

    segment_ranks = [rank_of(col(f"cabincode_{i}")) for i in range(1, max_segment + 1)]

    # greatest/least skip NULLs. The sentinels supply the second argument they require.
    # When every segment is NULL, the sentinel wins and maps to no cabin code (NULL).
    no_cabin_low = lit(0)
    no_cabin_high = lit(len(unique_cabin_codes))
    highest_rank = greatest(no_cabin_low, *segment_ranks) if segment_ranks else no_cabin_low
    lowest_rank = least(no_cabin_high, *segment_ranks) if segment_ranks else no_cabin_high

    # NULL != x is NULL, which falls to otherwise(0), so pairs involving a missing
    # segment are not counted as changes.
    cabin_class_changes = lit(0)
    for current, following in zip(segment_ranks, segment_ranks[1:]):
        cabin_class_changes = cabin_class_changes + when(current != following, 1).otherwise(0)

    return (
        df.withColumn("highest_cabin_class", code_of(highest_rank))
          .withColumn("lowest_cabin_class", code_of(lowest_rank))
          .withColumn("cabin_class_changes", cabin_class_changes)
    )

df_sampled = create_cabin_class_features(df_sampled)

# df_sampled.show(5)
# df_sampled.printSchema()

In [28]:
# Drop the per-segment cabin code columns now that they are summarised by the cabin
# class features. The list is derived from the schema because split_columns creates
# one cabincode_<i> column per segment of the widest itinerary, not necessarily 4.
columns_to_drop = [c for c in df_sampled.columns if c.startswith("cabincode_")]
df_sampled = df_sampled.drop(*columns_to_drop)

In [29]:
output_path = "/content/de_output"

# Persist the engineered features as Parquet. coalesce(1) collapses the data to a
# single partition so one part file is written; "overwrite" replaces earlier output.
(
    df_sampled
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(output_path)
)