In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, BooleanType, DateType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import dayofweek, hour

# Reuse the kernel's active Spark session, or create one when this cell runs in a
# fresh kernel that does not predefine `spark` (keeps Restart & Run All working).
spark = SparkSession.builder.getOrCreate()

# Input (cleaned data) and output (feature-engineered data read by the training,
# tuning and scoring cells).
CLEANED_PATH = "gs://project-bucket-kl/cleaned/cleaned_data.parquet"
FEATURES_PATH = "gs://project-bucket-kl/trusted/feature_engineered_data.parquet"

# Schema applied when reading the cleaned data.
schema = StructType([
    StructField("flightDate", DateType()),
    StructField("startingAirport", StringType()),
    StructField("destinationAirport", StringType()),
    StructField("isBasicEconomy", BooleanType()),
    StructField("isNonStop", BooleanType()),
    StructField("segmentsAirlineName", StringType()),
    StructField("baseFare", DoubleType()),
    StructField("totalFare", DoubleType()),
    StructField("totalTravelDistance", DoubleType()),
    StructField("travelDurationMinutes", IntegerType())
])

df = spark.read.schema(schema).parquet(CLEANED_PATH)

# Feature Engineering
# Day of week derived from the flight date; it is one-hot encoded below so the
# model actually uses it.
# An hour-of-day feature is intentionally not derived: flightDate is a DateType,
# so hour() on it is 0 for every row and carries no information.
df = df.withColumn("dayOfWeek", dayofweek(df.flightDate))

# The boolean flags are cast to strings before indexing (presumably because
# StringIndexer does not take BooleanType input).
df = df.withColumn("isBasicEconomy", df.isBasicEconomy.cast(StringType()))
df = df.withColumn("isNonStop", df.isNonStop.cast(StringType()))

# Categorical features: index each column, then one-hot encode all indices at once.
categoricalColumns = [
    "startingAirport",
    "destinationAirport",
    "isBasicEconomy",
    "isNonStop",
    "segmentsAirlineName",
    "dayOfWeek",
]
indexers = [StringIndexer(inputCol=column, outputCol=column + "_indexed") for column in categoricalColumns]
encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=[column + "_encoded" for column in categoricalColumns],
)

# Numeric features: wrap each column in a one-element vector (MinMaxScaler takes a
# vector input column), then rescale each feature independently.
# NOTE(review): baseFare is presumably a component of totalFare, which the training
# cells use as the label; using it as a feature likely leaks the target and makes
# the RMSE look better than a real price prediction would — confirm this is intended.
# NOTE(review): the indexers and scalers are fitted on the full dataset, before the
# train/test split done in the training cells, so test rows influence them.
numeric_features = ["baseFare", "totalTravelDistance", "travelDurationMinutes"]
vector_assemblers = [VectorAssembler(inputCols=[feature], outputCol=feature + "Vec") for feature in numeric_features]
scalers = [MinMaxScaler(inputCol=feature + "Vec", outputCol=feature + "_scaled") for feature in numeric_features]

# Final model input: one-hot categoricals followed by the scaled numerics.
assembler = VectorAssembler(
    inputCols=list(encoder.getOutputCols()) + [scaler.getOutputCol() for scaler in scalers],
    outputCol="features",
)

# Pipeline
pipeline = Pipeline(stages=indexers + [encoder] + vector_assemblers + scalers + [assembler])
pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)

# Save the transformed data (including the `features` vector) for the downstream cells.
df_transformed.write.mode('overwrite').parquet(FEATURES_PATH)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, BooleanType, DateType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import dayofweek, hour

# Reuse the kernel's active Spark session, or create one in a fresh kernel.
spark = SparkSession.builder.getOrCreate()

# Read the feature-engineered output of the previous cell. No explicit schema is
# given: the parquet files carry their own, including the `features` vector column
# the model needs and the string-cast isBasicEconomy / isNonStop columns. A
# hand-written schema would omit the former and declare the latter as boolean.
df = spark.read.parquet("gs://project-bucket-kl/trusted/feature_engineered_data.parquet")

print("Defining and training the model...")
# Model definition: elastic-net regularised linear regression on the assembled features.
lr = LinearRegression(featuresCol='features', labelCol='totalFare', regParam=0.1, elasticNetParam=0.5, maxIter=50)

# Split the data read above (not an in-memory frame left over from another cell),
# so this cell works after a kernel restart. The fixed seed makes the split repeatable.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = lr.fit(train_data)

print("Making predictions and evaluating the model...")
# Make predictions on the held-out test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

print("Saving the model...")
# Save the model (overwriting any previous run)
model.write().overwrite().save("gs://project-bucket-kl/models/flight_price_model")

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import round

# Reuse the kernel's active Spark session, or create one in a fresh kernel.
spark = SparkSession.builder.getOrCreate()

data_path = "gs://project-bucket-kl/trusted/feature_engineered_data.parquet"
df = spark.read.parquet(data_path)

# Linear regression and the hyperparameter grid searched by cross-validation
# (3 x 3 x 3 = 27 candidate settings, each fitted once per fold).
lr = LinearRegression(featuresCol='features', labelCol='totalFare')
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .addGrid(lr.maxIter, [10, 50, 100])
    .build()
)

# Evaluator used both to rank grid candidates and to score the final model.
evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")

# 5-fold cross-validation over the grid.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)

# Same 80/20 split and seed as the training cell.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Run cross-validation on the training portion only.
cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel

# Score the selected model on the held-out test data.
predictions = bestModel.transform(test_data)
final_rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {final_rmse}")

# Report the winning hyperparameters through the public API instead of the private
# `_java_obj` handle. avgMetrics[i] is the mean cross-validation metric for
# paramGrid[i]. The best entry is picked with the evaluator's own ordering, so for
# RMSE the lowest value wins.
avg_metrics = cvModel.avgMetrics
choose = max if evaluator.isLargerBetter() else min
best_params = paramGrid[choose(range(len(avg_metrics)), key=avg_metrics.__getitem__)]
print("Best Model Hyperparameters:")
print(f" - regParam: {best_params[lr.regParam]}")
print(f" - elasticNetParam: {best_params[lr.elasticNetParam]}")
print(f" - maxIter: {best_params[lr.maxIter]}")

# Save the best model (overwriting any previous run)
final_model_path = "gs://project-bucket-kl/models/flight_price_best_model"
bestModel.write().overwrite().save(final_model_path)
print(f"Model saved successfully at {final_model_path}")


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import dayofweek, round

# Reuse the kernel's active Spark session, or create one in a fresh kernel.
spark = SparkSession.builder.getOrCreate()

data = spark.read.parquet("gs://project-bucket-kl/trusted/feature_engineered_data.parquet")
# NOTE(review): this loads the fixed-hyperparameter model from the training cell.
# The tuning cell saves its cross-validated model to
# gs://project-bucket-kl/models/flight_price_best_model instead — confirm which
# one is meant to be used here.
model_path = "gs://project-bucket-kl/models/flight_price_model"
model = LinearRegressionModel.load(model_path)

predictions = model.transform(data)

# Rounded copy of the prediction (nearest hundredth), used for display only.
predictions = predictions.withColumn("roundedPrediction", round(predictions.prediction, 2))

# Display a sample of predictions next to the actual fare and key inputs.
predictions.select(
    "totalFare",
    "roundedPrediction",
    "startingAirport",
    "destinationAirport",
    "totalTravelDistance",
    "travelDurationMinutes",
    "flightDate",
    "segmentsAirlineName",
    "dayOfWeek"
).show(10)

# Evaluate on the raw, unrounded predictions. Rounding is a presentation step and
# would add its own error to the metric.
# NOTE(review): `data` is the full dataset, including the rows the model was
# trained on, so this RMSE is optimistic compared with the held-out test RMSE.
evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on the data = {rmse}")