In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=0d70184d81fb2d912094cdc466fb5a1f6467a110d654d2c123b383930950de70
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
# Disabled: sparkxgb is not imported anywhere in this notebook, and installing it
# pulls in pyspark==3.1.1 (see this cell's install log), which would replace the
# pyspark installed in the previous cell. Re-enable only when XGBoost is actually
# used, after checking it is compatible with the pinned pyspark version.
# !pip install sparkxgb

Collecting sparkxgb
  Downloading sparkxgb-0.1.tar.gz (3.6 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pyspark==3.1.1 (from sparkxgb)
  Downloading pyspark-3.1.1.tar.gz (212.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.3/212.3 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.1.1->sparkxgb)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m23.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: sparkxgb, pyspark
  Building wheel for sparkxgb (setup.py) ... [?25l[?25hdone
  Created wheel for sparkxgb: filename=sparkxgb-0.1-py3-none-any.whl size=5629 sha256=8c46aa23d8595d137ad4b0b6c0d7f37a54f92c141afe55c1759e3ac672366dd4
  Stored in directory: /root/.cache/pip/wheels/b7/0c/a1/786408e13056fabeb8a72134e101b1e142fc95905c7b0e2

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, isnan
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, dayofyear
from pyspark.sql.functions import concat_ws, col, dayofyear

#-------
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml import PipelineModel



def save_model_to_drive(model, model_name, base_dir="/content/drive/MyDrive/SENG550/models"):
    """Save a fitted Spark ML model to ``<base_dir>/<model_name>``, replacing any previous copy.

    Args:
        model: Object exposing Spark ML's ``write().overwrite().save(path)`` chain,
            e.g. a fitted ``PipelineModel``.
        model_name: Sub-folder name for this model. Must be non-empty.
        base_dir: Folder that holds all saved models. Defaults to the course folder
            on the mounted Google Drive; a trailing "/" is tolerated.

    Returns:
        The path the model was written to.

    Raises:
        ValueError: If ``model_name`` is empty or blank. Saving with overwrite would
            then target ``base_dir`` itself, i.e. the folder holding every other
            saved model.
    """
    if not model_name or not model_name.strip():
        raise ValueError("model_name must be a non-empty string")
    model_path = f"{base_dir.rstrip('/')}/{model_name}"
    model.write().overwrite().save(model_path)
    return model_path



# --- Spark session and raw inputs ---------------------------------------------
# getOrCreate() hands back the already-running session when there is one.
spark = SparkSession.builder.appName("FlightDelayPred").getOrCreate()

# Both CSVs have a header row; column types are inferred from the data.
csv_opts = dict(header=True, inferSchema=True)

data_path = "/content/drive/MyDrive/SENG550/2007.csv"
df1 = spark.read.csv(data_path, **csv_opts)  # flight records
# Airport lookup table, joined below on its `iata` code to get lat/long.
airportDf = spark.read.csv("/content/drive/MyDrive/SENG550/airports.csv", **csv_opts)


def _attach_airport_coords(flights, code_col, prefix):
    """Left-join airport coordinates onto ``flights`` by IATA code.

    ``code_col`` in ``flights`` is matched against ``airportDf.iata``. The lookup's
    ``lat``/``long`` columns arrive renamed to ``<prefix>Lat``/``<prefix>Long``.
    Its remaining columns are dropped. Unmatched codes keep null coordinates
    (left join).
    """
    matched = flights.join(airportDf, flights[code_col] == airportDf["iata"], "left")
    renamed = (
        matched
        .withColumnRenamed("lat", prefix + "Lat")
        .withColumnRenamed("long", prefix + "Long")
    )
    return renamed.drop("iata", "airport", "city", "state", "country", "lat", "long")


flight_cols = ["Year", "Month", "DayofMonth", "DayOfWeek", "DepTime", "CRSDepTime", "ArrTime",
               "ArrDelay", "DepDelay", "CRSArrTime", "Distance", "Cancelled", "Origin", "Dest"]
df2 = df1.select(*flight_cols)

df2 = _attach_airport_coords(df2, "Origin", "Origin")
df2 = _attach_airport_coords(df2, "Dest", "Dest")
# Once the coordinates are attached, the raw IATA codes are no longer needed.
df2 = df2.drop("Origin", "Dest")

# Cast the numeric columns used downstream to float.
# Casting happens BEFORE the label filter: any non-numeric placeholder in the raw
# CSV becomes null after the cast and is removed by isNotNull(), whatever its text.
# Comparing the raw value to one specific token (such as "NULL") would miss others.
float_cols = ["ArrTime", "DepDelay", "DepTime", "ArrDelay", "Distance"]
df = df2
for col_name in float_cols:
    df = df.withColumn(col_name, col(col_name).cast(FloatType()))

# Keep only non-cancelled flights with a usable (non-null, non-NaN) target.
df = (
    df.filter(col("ArrDelay").isNotNull() & ~isnan(col("ArrDelay")))
      .filter(col("Cancelled") == 0)
      .drop("Cancelled")
)

# Day of year derived from the Year/Month/DayofMonth parts ("yyyy-m-d" string).
# NOTE(review): DayOfYear is not in the `features` list, so no model uses it.
df = df.withColumn("DayOfYear", dayofyear(concat_ws("-", col("Year"), col("Month"), col("DayofMonth"))))

# Split the data into training (75%) and test (25%) sets; the fixed seed keeps the
# split reproducible between runs.
train_df, test_df = df.randomSplit([0.75, 0.25], seed=42)

# Feature columns fed to every model, and the regression target.
# NOTE(review): ArrTime is the actual arrival clock time. Presumably
# ArrDelay ~= ArrTime - CRSArrTime (both hhmm), which would make the target almost
# a function of the features: the metrics would then measure reconstruction,
# not prediction. Confirm; if the goal is forecasting, drop ArrTime (and decide
# whether DepTime/DepDelay are available at prediction time).
features = ["Year", "Month", "DayofMonth", "DayOfWeek", "DepTime", "CRSDepTime", "ArrTime",
            "DepDelay", "CRSArrTime", "Distance", "OriginLat", "OriginLong", "DestLat", "DestLong"]
target = 'ArrDelay'

# Missing feature values are NOT imputed. Rows with a null in any feature column
# are dropped by the VectorAssembler (handleInvalid="skip") used in the pipelines.


"""
#------ EDA: Basic Statistical Summaries
# Display the schema to understand the data types
df.printSchema()

# Show summary statistics for your dataframe
df.describe().show()
#------

#------ EDA: Distribution of Numeric Features
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Convert Spark DataFrame to Pandas DataFrame for visualization
pdf = df.select('DepTime', 'Distance', 'ArrDelay').toPandas()

# Plotting histograms
plt.figure(figsize=(12, 5))
for i, col in enumerate(pdf.columns):
    plt.subplot(1, 3, i+1)
    sns.histplot(pdf[col], kde=True)
    plt.title(f'Distribution of {col}')
plt.tight_layout()
plt.show()
#------


#------ EDA: Correlation Matrix
# Compute correlation matrix
corr_matrix = pdf.corr()

# Plotting heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
plt.title("Correlation Matrix")
plt.show()
#------
"""


# Combine the feature columns into one vector; rows with a null/NaN feature are skipped.
assembler = VectorAssembler(inputCols=features, outputCol="features", handleInvalid="skip")

# Standardise each feature to zero mean and unit standard deviation, using
# statistics from the training data. Values are not clipped to any fixed range.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

# Baseline models, all trained on the scaled feature vector.
rf = RandomForestRegressor(featuresCol="scaledFeatures", labelCol=target)
lr = LinearRegression(featuresCol="scaledFeatures", labelCol=target)
# GBT with reduced complexity: 10 boosting iterations, trees of depth 5.
gbt = GBTRegressor(featuresCol="scaledFeatures", labelCol=target, maxIter=10, maxDepth=5)

# Polynomial regression: first keep the 10 scaled features with the strongest
# univariate F-test relationship to the continuous target, then expand only those
# to degree 3. The expansion must read the selector's output column
# ("selectedFeatures"); otherwise the selection is computed and then ignored.
# ChiSqSelector is not used because it assumes categorical features and labels.
selector = (
    UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures",
                              labelCol=target, selectionMode="numTopFeatures")
    .setFeatureType("continuous")
    .setLabelType("continuous")
    .setSelectionThreshold(10)
)
polyExpansion = PolynomialExpansion(degree=3, inputCol="selectedFeatures", outputCol="polyFeatures")
poly_lr = LinearRegression(featuresCol="polyFeatures", labelCol=target)

# Each pipeline assembles and scales the raw columns before its model stage.
rf_pipeline = Pipeline(stages=[assembler, scaler, rf])
lr_pipeline = Pipeline(stages=[assembler, scaler, lr])
gbt_pipeline = Pipeline(stages=[assembler, scaler, gbt])
poly_pipeline = Pipeline(stages=[assembler, scaler, selector, polyExpansion, poly_lr])


# Pipelines to train and evaluate, in reporting order.
models = [rf_pipeline, lr_pipeline, gbt_pipeline, poly_pipeline]

# The evaluators do not depend on the model, so build them once.
rmse_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

# train_df/test_df are lazy and reused by all four pipelines. Cache them so the
# CSV read, joins and casts are not recomputed for every fit and evaluation.
train_df.cache()
test_df.cache()

for model in models:
    # The poly pipeline's last stage is also a LinearRegression. It needs its own
    # name, or it would be reported as, and saved over, the plain LR model.
    if model is poly_pipeline:
        model_name = "Polynomial Linear Regression"
    else:
        model_name = type(model.getStages()[-1]).__name__

    # Train on the training split and persist the fitted pipeline to Google Drive.
    trained_model = model.fit(train_df)
    save_model_to_drive(trained_model, model_name)

    # Predictions are scored twice (RMSE, R2); cache them so they are computed once.
    predictions = trained_model.transform(test_df).cache()
    rmse = rmse_evaluator.evaluate(predictions)
    r2 = r2_evaluator.evaluate(predictions)
    predictions.unpersist()

    print(f"{model_name} - Root Mean Squared Error (RMSE): {rmse}, R2: {r2}")

train_df.unpersist()
test_df.unpersist()

# Stop the Spark session
spark.stop()


RandomForestRegressor - Root Mean Squared Error (RMSE): 17.955249125537758, R2: 0.790520508179235
LinearRegression - Root Mean Squared Error (RMSE): 14.245497626168719, R2: 0.8681397512953777
GBTRegressor - Root Mean Squared Error (RMSE): 16.77583648018136, R2: 0.8171365076661519
Polynomial Linear Regression - Root Mean Squared Error (RMSE): 10.069900164650715, R2: 0.934111598570097
