In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import os

In [3]:
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

# Create a Spark session
spark = SparkSession.builder \
    .appName("ML Pipeline") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "4") \
    .config("spark.sql.shuffle.partitions", "100") \
    .getOrCreate()

# Google Cloud Storage path
gcs_path = 'gs://my-bigdatatech-project-jl/cleaned/Data_Cleaned.parquet'

# Read the Parquet file from GCS
df_spark = spark.read.parquet(gcs_path)

# Show the DataFrame schema and the first few rows
df_spark.printSchema()
df_spark.show()

NameError: name 'spark' is not defined

In [None]:


# Filter the Spark DataFrame (example condition)
filtered_df = df_spark.filter(df_spark.totalFare.isNotNull() & df_spark.segmentsEquipmentDescription.isNotNull())

# Limit to 10 rows
limited_df = filtered_df.limit(50)

# Convert to Pandas DataFrame
pandas_df = limited_df.toPandas()

In [None]:


df_spark.dtypes

In [None]:


from pyspark.sql.functions import to_date

# convert date columns to actual date data type
df_spark = df_spark.withColumn('searchDate', to_date(df_spark.searchDate, 'yyyy-MM-dd'))
df_spark = df_spark.withColumn('flightDate', to_date(df_spark.flightDate, 'yyyy-MM-dd'))

In [None]:


from pyspark.sql.functions import date_format
from pyspark.sql.functions import col

# get year-month variable 
df_spark = df_spark.withColumn("searchdate_yearmonth", date_format(col("searchDate"), "yyyy-MM"))
df_spark = df_spark.withColumn('flightdate_yearmonth', date_format(col("flightDate"), "yyyy-MM"))

In [None]:


from pyspark.sql.functions import dayofweek

# get day of week as a number 
df_spark = df_spark.withColumn("searchDate_dayofweek", dayofweek(col("searchDate")) )
df_spark = df_spark.withColumn("flightDate_dayofweek", dayofweek(col("flightDate")) )

In [None]:


# get day of week as name e.g. Monday
df_spark = df_spark.withColumn("searchDate_dayofweekname", date_format(col("searchDate"), "E"))
df_spark = df_spark.withColumn("flightDate_dayofweekname", date_format(col("flightDate"), "E"))

In [None]:


from pyspark.sql.functions import when

# check if search or flight date falls on a weekend
df_spark = df_spark.withColumn("searchDate_weekend", when(df_spark.searchDate_dayofweek == 1, 1.0).when(df_spark.searchDate_dayofweek == 7, 1.0).otherwise(0))
# check if search or flight date falls on a weekend
df_spark = df_spark.withColumn("flightDate_weekend", when(df_spark.flightDate_dayofweek == 1, 1.0).when(df_spark.flightDate_dayofweek == 7, 1.0).otherwise(0))

In [None]:


from pyspark.sql.functions import col, when

# Define a list of holiday dates
holidays = [
    '2024-01-01',  # New Year's Day
    '2024-07-04',  # Independence Day
    '2024-12-25',  # Christmas Day
    '2024-11-28' # Thanksgiving Day
]

# Add a column to indicate if the flightDate is a holiday
df_spark = df_spark.withColumn(
    "isHoliday", 
    when(col("flightDate").cast("string").isin(holidays), 1).otherwise(0)
)

In [None]:


# Holiday Indicator
df_spark = df_spark.withColumn("isHoliday", when(col("flightDate").cast("string").isin(holidays), 1).otherwise(0))

In [None]:


from pyspark.sql.functions import month

# extract the month from flightDate and create a new column flightMonth
df_spark = df_spark.withColumn("flightMonth", month(col("flightDate")))

# create the season column based on the month
df_spark = df_spark.withColumn(
    "season",
    when(col("flightMonth").isin(6, 7, 8), "Summer")
    .when(col("flightMonth").isin(12, 1, 2), "Winter")
    .when(col("flightMonth").isin(3, 4, 5), "Spring")
    .otherwise("Fall")
)

In [None]:


from pyspark.sql.functions import lit, datediff

# 5. Flight Timing
df_spark = df_spark.withColumn("flightTiming", when((col("segmentsDepartureTimeRaw") >= lit("18:00:00")) &
                                            (col("segmentsArrivalTimeRaw") <= lit("06:00:00")), "Overnight")
                                       .otherwise("Daytime"))

# 6. Proximity of Booking
df_spark = df_spark.withColumn("daysUntilFlight", datediff("flightDate", "searchDate")) \
           .withColumn("bookingProximity", when(col("daysUntilFlight") <= 1, "Last Minute")
                                           .when((col("daysUntilFlight") > 1) & (col("daysUntilFlight") <= 7), "Within a Week")
                                           .otherwise("Planned in Advance"))

In [None]:


# check transformations
df_spark.select("flightTiming", "daysUntilFlight", "bookingProximity").show(10)

In [None]:


from pyspark.sql.functions import col

# Converting boolean columns to integer - 0 or 1 - in PySpark
df_spark = df_spark.withColumn('isBasicEconomy', col('isBasicEconomy').cast('int'))
df_spark = df_spark.withColumn('isRefundable', col('isRefundable').cast('int'))
df_spark = df_spark.withColumn('isNonStop', col('isNonStop').cast('int'))

In [None]:


df_spark.select("isNonStop").show(10)

In [None]:


# define seat availability volumes
df_spark = df_spark.withColumn(
    'seatAvailabilityCategory',
    when(col('seatsRemaining') <= 20, 'low')
    .when((col('seatsRemaining') > 20) & (col('seatsRemaining') <= 100), 'medium')
    .otherwise('high')
)

df_spark.select('seatsRemaining', 'seatAvailabilityCategory').show()

In [None]:


from pyspark.sql.functions import col

# correct columns with right data types

df_spark = df_spark.withColumn("travelDuration", col("travelDuration").cast("float")) \
                   .withColumn("segmentsDurationInSeconds", col("segmentsDurationInSeconds").cast("int")) \
                   .withColumn("segmentsDistance", col("segmentsDistance").cast("int"))

In [None]:


from pyspark.sql.functions import col

# ensure columns are numeric
df_spark = df_spark.withColumn("elapsedDays", col("elapsedDays").cast("int")) \
           .withColumn("isBasicEconomy", col("isBasicEconomy").cast("int")) \
           .withColumn("isRefundable", col("isRefundable").cast("int")) \
           .withColumn("isNonStop", col("isNonStop").cast("int")) \
           .withColumn("baseFare", col("baseFare").cast("float")) \
           .withColumn("seatsRemaining", col("seatsRemaining").cast("int")) \
           .withColumn("totalTravelDistance", col("totalTravelDistance").cast("int")) \
           .withColumn("segmentsDurationInSeconds", col("segmentsDurationInSeconds").cast("int")) \
           .withColumn("segmentsDistance", col("segmentsDistance").cast("int"))

In [None]:
# Outlier detection for 'totalFare' using IQR
quantiles = df_spark.approxQuantile("totalFare", [0.25, 0.75], 0.01)
Q1, Q3 = quantiles
IQR = Q3 - Q1

# Define bounds for outliers
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Filter out outliers
df_spark = df_spark.filter((col("totalFare") >= lower_bound) & (col("totalFare") <= upper_bound))

# Display summary statistics for verification
df_spark.select("totalFare").describe().show()

In [None]:


from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, FloatType, LongType

In [None]:


schema = StructType([
    StructField("legId", StringType(), True),
    StructField("searchDate", DateType(), True),
    StructField("flightDate", DateType(), True),
    StructField("startingAirport", StringType(), True),
    StructField("destinationAirport", StringType(), True),
    StructField("fareBasisCode", StringType(), True),
    StructField("travelDuration", StringType(), True),
    StructField("elapsedDays", IntegerType(), True),
    StructField("isBasicEconomy", IntegerType(), True),
    StructField("isRefundable", IntegerType(), True),
    StructField("isNonStop", IntegerType(), True),
    StructField("baseFare", FloatType(), True),
    StructField("totalFare", FloatType(), True),
    StructField("seatsRemaining", IntegerType(), True),
    StructField("totalTravelDistance", IntegerType(), True),
    StructField("segmentsDepartureTimeEpochSeconds", LongType(), True),
    StructField("segmentsArrivalTimeEpochSeconds", LongType(), True),
    StructField("segmentsDurationInSeconds", IntegerType(), True),
    StructField("segmentsDistance", IntegerType(), True),
    StructField("segmentsCabinCode", StringType(), True)
])

In [None]:


from pyspark.sql.functions import isnan

sampled_df = df_spark.sample(fraction=0.01, seed=42)

# split data into training and testing sets
train_data, test_data = sampled_df.randomSplit([0.7, 0.3], seed=42)

# remove nan values
train_data = train_data.filter(~isnan("totalFare"))
test_data = test_data.filter(~isnan("totalFare"))

# features for the model
feature_columns = ['isRefundable', 'isBasicEconomy', 'isNonStop', 'searchDate_weekend', 
                   'flightDate_weekend', 'isHoliday', 'season', 'flightTiming', 'bookingProximity']

feature_columns = [
    col for col in feature_columns if train_data.select(col).distinct().count() > 1
]

numeric_features = [col for col in feature_columns if train_data.schema[col].dataType.typeName() != 'string']

# index and encode categorical features
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "Index") for col in feature_columns if col not in numeric_features
]
encoders = [
    OneHotEncoder(inputCol=col + "Index", outputCol=col + "Vector") for col in feature_columns if col not in numeric_features
]

# assembling features into a vector
assembler = VectorAssembler(
    inputCols=[col + "Vector" for col in feature_columns if col not in numeric_features] + numeric_features,
    outputCol="features"
)

# linear Regression model
lr = LinearRegression(labelCol="totalFare")

# create Pipeline 
pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) 
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  
    .addGrid(lr.maxIter, [10, 50, 100])  
    .build()
)

evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")

crossval = CrossValidator(
    estimator=pipeline,  
    estimatorParamMaps=paramGrid,  
    evaluator=evaluator, 
    numFolds=3, 
    parallelism=2 
)

cv_model = crossval.fit(train_data)

# make predictions
predictions = cv_model.transform(test_data)

predictions = predictions.filter(~isnan("prediction") & ~isnan("totalFare"))

# evaluate the model
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})

print(f"RMSE: {rmse}")
print(f"R-squared: {r2}")


predictions.select("flightDate", "totalFare", "prediction").show(10, truncate=False)

In [None]:


# transform the training data
transformed_train_data = model.transform(train_data)

# save the transformed data to the /trusted folder
transformed_train_data.write.parquet("gs://my-bigdatatech-project-jl/trusted/processed_data", mode='overwrite')

print('transformed training data saved')

# save the LR model to the /models folder
model.save("gs://my-bigdatatech-project-jl/models/linear_regression_model")

print('LR model saved')

In [None]:


# Stop Spark session
spark.stop()