In [0]:
# All Imports
from pyspark.sql.functions import to_timestamp, unix_timestamp, col, mean, max, hour, dayofweek, when
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import DecisionTreeClassifier
import os
from pyspark.ml.regression import RandomForestRegressor

# Task 1 - Classification

In [0]:
# Part 1: Data Preparation

# 1-a) Load Dataset
df = spark.table("default.yellow_tripdata_2015_01").limit(100000)

df = df.withColumn("passenger_count", col("passenger_count").cast("int"))
df = df.withColumn("trip_distance", col("trip_distance").cast("double"))
df = df.withColumn("fare_amount", col("fare_amount").cast("double"))
df = df.withColumn("tip_amount", col("tip_amount").cast("double"))
df = df.withColumn("mta_tax", col("mta_tax").cast("double"))
df = df.withColumn("extra", col("extra").cast("double"))
df = df.withColumn("total_amount", col("total_amount").cast("double"))
df = df.withColumn("tolls_amount", col("tolls_amount").cast("double"))

df = df.withColumn(
    "tpep_pickup_datetime",
    to_timestamp(col("tpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss")
)
df = df.withColumn(
    "tpep_dropoff_datetime",
    to_timestamp(col("tpep_dropoff_datetime"), "yyyy-MM-dd HH:mm:ss")
)

df = df.withColumn("pickup_latitude", col("pickup_latitude").cast("double"))
df = df.withColumn("pickup_longitude", col("pickup_longitude").cast("double"))
df = df.withColumn("dropoff_latitude", col("dropoff_latitude").cast("double"))
df = df.withColumn("dropoff_longitude", col("dropoff_longitude").cast("double"))
df = df.withColumn("payment_type", col("payment_type").cast("int"))

# 1-b) Data Exploration & Cleaning
df.printSchema()
df.select("fare_amount", "trip_distance", "passenger_count").describe().show()

df = df.dropna(subset=["fare_amount", "trip_distance", "passenger_count"])
df = df.filter((df.fare_amount > 0) & (df.trip_distance != 0))

# 1-c) Feature Engineering
df = df.withColumn(
    "trip_duration",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0
)  # minutes
df = df.filter(df.trip_duration > 0)

df = df.withColumn("trip_speed", col("trip_distance") / (col("trip_duration") / 60.0))  # distance/hour
df = df.filter(df.trip_speed > 0)

# Time-based features explicitly (as suggested in assignment)
df = df.withColumn("pickup_hour", hour("tpep_pickup_datetime"))
df = df.withColumn("pickup_day_of_week", dayofweek("tpep_pickup_datetime"))

df.show(5)

# 1-d) Target Variable Creation
df = df.withColumn("high_fare", when(col("fare_amount") > 20, 1).otherwise(0))

# 1-e) Data Splitting (70/30)
train_df, test_df = df.randomSplit([0.7, 0.3], seed=777)
test_df.show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+-------+------------------+------------------+------------------+
|summary|       fare_amount|     trip_distance|   

In [0]:
# Pipeline 1: Decision Tree Classifier

# 2-a) Define pipeline stages
feature_cols = [
    "payment_type",
    "passenger_count",
    "trip_distance",
    "trip_duration",
    "trip_speed",
    "pickup_hour",
    "pickup_day_of_week"
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

dt = DecisionTreeClassifier(
    labelCol="high_fare",
    featuresCol="features"
)

pipeline = Pipeline(stages=[assembler, dt])

# 2-b) Hyperparameter tuning with CrossValidator 

# imp note for TA: I'm using an existing Volume path for Spark ML temp storage based on my structure in the workspace
dbutils.fs.mkdirs("/Volumes/workspace/default/data/cv_temp")
os.environ["SPARKML_TEMP_DFS_PATH"] = "/Volumes/workspace/default/data/cv_temp"

paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [3, 5, 10])             
    .addGrid(dt.minInstancesPerNode, [1, 5, 10])  
    .build()
)

cv_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_fare",
    predictionCol="prediction",
    metricName="f1"
)

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=cv_evaluator,
    numFolds=3,
    parallelism=2
)

# 2-c) Model training (on training data)
cvModel = cv.fit(train_df)

#best pipeline
best_pipeline_model = cvModel.bestModel

# 2-d) Model evaluation on test data
pred_test = best_pipeline_model.transform(test_df)

pred_test.select("high_fare", "prediction", "features").show(10, truncate=False)

#F1
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_fare",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = f1_evaluator.evaluate(pred_test)

#Precision
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_fare",
    predictionCol="prediction",
    metricName="precisionByLabel"
)
precision = precision_evaluator.evaluate(pred_test)

#recall
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_fare",
    predictionCol="prediction",
    metricName="recallByLabel"
)
recall = recall_evaluator.evaluate(pred_test)

print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# 2-e) Save the trained pipeline 
best_pipeline_model.write().overwrite().save("/Volumes/workspace/default/data/dt_high_fare_pipeline")

+---------+----------+------------------------------------------------------------+
|high_fare|prediction|features                                                    |
+---------+----------+------------------------------------------------------------+
|0        |0.0       |[1.0,1.0,1.5,9.15,9.836065573770492,0.0,5.0]                |
|0        |0.0       |[2.0,1.0,1.6,20.533333333333335,4.675324675324675,0.0,5.0]  |
|0        |0.0       |[2.0,4.0,0.5,3.9166666666666665,7.659574468085106,0.0,5.0]  |
|0        |0.0       |[2.0,1.0,1.4,7.7,10.909090909090908,0.0,5.0]                |
|0        |0.0       |[1.0,1.0,0.8,4.466666666666667,10.746268656716417,0.0,5.0]  |
|1        |1.0       |[2.0,1.0,16.9,25.966666666666665,39.050064184852374,0.0,5.0]|
|0        |0.0       |[2.0,1.0,0.3,1.95,9.23076923076923,0.0,5.0]                 |
|0        |0.0       |[1.0,1.0,1.6,7.516666666666667,12.771618625277164,0.0,5.0]  |
|0        |0.0       |[2.0,1.0,3.5,10.666666666666666,19.687500000000004,0.0

In [0]:
# Pipeline 2: Logistic Regression Pipeline

# 3-a) Define pipeline stages
feature_cols = [
    "payment_type",
    "passenger_count",
    "trip_distance",
    "trip_duration",
    "trip_speed",
    "pickup_hour",
    "pickup_day_of_week"
]

assembler_lr = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

logr = LogisticRegression(
    labelCol="high_fare",
    featuresCol="features"
)

pipeline_lr = Pipeline(stages=[assembler_lr, logr])

# 3-b) Hyperparameter tuning with CrossValidator
cv_tmp_path = "/Volumes/workspace/default/data/cv_temp"
dbutils.fs.mkdirs(cv_tmp_path)
os.environ["SPARKML_TEMP_DFS_PATH"] = cv_tmp_path

paramGrid_lr = (
    ParamGridBuilder()
    .addGrid(logr.regParam, [0.01, 0.1, 0.5])
    .addGrid(logr.maxIter, [10, 50, 100])
    .build()
)

#for CV we can optimize AUC
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="high_fare",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

cv_lr = CrossValidator(
    estimator=pipeline_lr,
    estimatorParamMaps=paramGrid_lr,
    evaluator=binary_evaluator,
    numFolds=5,
    parallelism=2
)

# 3-c) Train using cross-validation
cv_lr_model = cv_lr.fit(train_df)

# Best pipeline
best_lr_pipeline = cv_lr_model.bestModel

#inspect chosen hyperparameters
best_lr = best_lr_pipeline.stages[-1]  # last stage is LogisticRegression
print("Best regParam:", best_lr.getRegParam())
print("Best maxIter:", best_lr.getMaxIter())

# 3-d) Evaluate on the test set
predictions = best_lr_pipeline.transform(test_df)

predictions.select("high_fare", "prediction", "probability").show(10, truncate=False)

# F1. precision, recall
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_fare",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = f1_evaluator.evaluate(predictions)
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_fare",
    predictionCol="prediction",
    metricName="precisionByLabel"
)
precision = precision_evaluator.evaluate(predictions)
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_fare",
    predictionCol="prediction",
    metricName="recallByLabel"
)
recall = recall_evaluator.evaluate(predictions)
auc = binary_evaluator.evaluate(predictions)

print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"AUC: {auc:.4f}")

# 3-e) Save the trained Logistic Regression pipeline
best_lr_pipeline.write().overwrite().save("/Volumes/workspace/default/data/lr_high_fare_pipeline")

Best regParam: 0.01
Best maxIter: 10
+---------+----------+----------------------------------------+
|high_fare|prediction|probability                             |
+---------+----------+----------------------------------------+
|0        |0.0       |[0.8767614432379037,0.12323855676209627]|
|0        |0.0       |[0.8701200579501847,0.12987994204981534]|
|0        |0.0       |[0.9114608806128656,0.08853911938713444]|
|0        |0.0       |[0.9063710717992971,0.0936289282007029] |
|0        |0.0       |[0.890556815687598,0.10944318431240196] |
|1        |0.0       |[0.851470756616728,0.148529243383272]   |
|0        |0.0       |[0.9194599477482274,0.08054005225177263]|
|0        |0.0       |[0.8817347017197817,0.11826529828021826]|
|0        |0.0       |[0.8988961795479969,0.10110382045200306]|
|0        |0.0       |[0.910678044866618,0.08932195513338204] |
+---------+----------+----------------------------------------+
only showing top 10 rows
F1 Score: 0.8343
Precision: 0.8870
Recall:

# Task 2: Regression

In [0]:
# Part 1: Data Prep

# 1-a) Load Dataset (new DataFrame)
# Use the same table, but create a separate DataFrame for regression
df_reg = spark.table("default.yellow_tripdata_2015_01").limit(100000)
df_reg = df_reg.withColumn("passenger_count", col("passenger_count").cast("int"))
df_reg = df_reg.withColumn("trip_distance", col("trip_distance").cast("double"))
df_reg = df_reg.withColumn("fare_amount", col("fare_amount").cast("double"))
df_reg = df_reg.withColumn(
    "tpep_pickup_datetime",
    to_timestamp(col("tpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss")
)
df_reg = df_reg.withColumn(
    "tpep_dropoff_datetime",
    to_timestamp(col("tpep_dropoff_datetime"), "yyyy-MM-dd HH:mm:ss")
)

df_reg.printSchema()


# 1-b) Feature Engineering

# remove missing or clearly invalid values
df_reg = df_reg.dropna(subset=["fare_amount", "trip_distance", "passenger_count"])
df_reg = df_reg.filter((col("fare_amount") > 0) & (col("trip_distance") > 0))

# Trip duration in minutes
df_reg = df_reg.withColumn(
    "trip_duration",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0
)

df_reg = df_reg.filter(col("trip_duration") > 0)
#filter out some outliers
df_reg = df_reg.filter(col("fare_amount") < 300)
df_reg = df_reg.filter(col("trip_distance") < 100)
df_reg = df_reg.filter(col("trip_duration") < 300)

# Time-based features
df_reg = df_reg.withColumn("pickup_hour", hour("tpep_pickup_datetime"))
df_reg = df_reg.withColumn("pickup_day_of_week", dayofweek("tpep_pickup_datetime"))

# 1-c) Train/Test Split for Regression
train_reg_df, test_reg_df = df_reg.randomSplit([0.7, 0.3], seed=42)

print("Training rows (regression):", train_reg_df.count())
print("Test rows (regression):", test_reg_df.count())

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

Training rows (regression): 69336
Test rows (regression): 29866


In [0]:
# Pipeline 1: Linear Regression for fare_amount

# 2-a) Define Pipeline Stages

# Feature columns for reg
reg_feature_cols = [
    "trip_distance",
    "trip_duration",
    "pickup_hour",
    "pickup_day_of_week",
    "passenger_count"
]
assembler_reg = VectorAssembler(
    inputCols=reg_feature_cols,
    outputCol="features"
)
# Scale features
scaler_reg = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)

# Linear Regression model
lr_reg = LinearRegression(
    labelCol="fare_amount",
    featuresCol="scaled_features"
)

pipeline_reg = Pipeline(stages=[assembler_reg, scaler_reg, lr_reg])

# 2-b) Hyperparameter Tuning with CrossValidator

# same path as before
cv_tmp_path_reg = "/Volumes/workspace/default/data/cv_temp"
dbutils.fs.mkdirs(cv_tmp_path_reg)
os.environ["SPARKML_TEMP_DFS_PATH"] = cv_tmp_path_reg

paramGrid_reg = (
    ParamGridBuilder()
    .addGrid(lr_reg.regParam, [0.0, 0.01, 0.1])
    .addGrid(lr_reg.maxIter, [50, 100])
    .build()
)

rmse_evaluator_cv = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="rmse"
)

cv_reg = CrossValidator(
    estimator=pipeline_reg,
    estimatorParamMaps=paramGrid_reg,
    evaluator=rmse_evaluator_cv,
    numFolds=2,
    parallelism=2
)

# 2-c) Train the model on training data

cv_reg_model = cv_reg.fit(train_reg_df)

#best pipeline
best_reg_pipeline = cv_reg_model.bestModel
best_lr_reg = best_reg_pipeline.stages[-1]

print("Best Linear Regression hyperparameters:")
print("regParam:", best_lr_reg.getRegParam())
print("maxIter:", best_lr_reg.getMaxIter())

# 2-d) Evaluate on test data

pred_reg_test = best_reg_pipeline.transform(test_reg_df)

pred_reg_test.select(
    "fare_amount", "prediction",
    "trip_distance", "trip_duration", "pickup_hour", "pickup_day_of_week"
).show(10, truncate=False)

# RMSE, r2
rmse_evaluator = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = rmse_evaluator.evaluate(pred_reg_test)
r2_evaluator = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="r2"
)
r2 = r2_evaluator.evaluate(pred_reg_test)

print(f"RMSE: {rmse:.4f}")
print(f"R2 : {r2:.4f}")

# 2-e) Save the trained regression pipeline
best_reg_pipeline.write().overwrite().save("/Volumes/workspace/default/data/linreg_fare_pipeline")

Best Linear Regression hyperparameters:
regParam: 0.0
maxIter: 50
+-----------+------------------+-------------+------------------+-----------+------------------+
|fare_amount|prediction        |trip_distance|trip_duration     |pickup_hour|pickup_day_of_week|
+-----------+------------------+-------------+------------------+-----------+------------------+
|4.5        |4.6975675799699586|0.5          |3.9166666666666665|0          |5                 |
|10.5       |8.45206841005853  |0.7          |16.633333333333333|0          |5                 |
|10.0       |10.932618385284302|2.5          |10.816666666666666|0          |5                 |
|8.7        |8.16296098444009  |1.0          |12.816666666666666|0          |5                 |
|21.2       |21.073674714402898|5.3          |26.35             |0          |5                 |
|12.2       |13.330770556957185|3.6          |10.766666666666667|0          |5                 |
|5.0        |4.943365459334104 |0.4          |5.4666666666666

In [0]:
# Pipeline 2: Random Forest Regressor Pipeline

# 2-a) Define pipeline
rf_feature_cols = [
    "trip_distance",
    "trip_duration",
    "pickup_hour",
    "pickup_day_of_week",
    "passenger_count"
]
assembler_rf = VectorAssembler(
    inputCols=rf_feature_cols,
    outputCol="features"
)
rf_reg = RandomForestRegressor(
    labelCol="fare_amount",
    featuresCol="features",
    seed=42
)
rf_pipeline = Pipeline(stages=[assembler_rf, rf_reg])

# 2-b) Hyperparameter Tuning

dbutils.fs.mkdirs("/Volumes/workspace/default/data/cv_temp")
os.environ["SPARKML_TEMP_DFS_PATH"] = "/Volumes/workspace/default/data/cv_temp"

paramGrid_rf = (
    ParamGridBuilder()
    .addGrid(rf_reg.numTrees, [20, 50])
    .addGrid(rf_reg.maxDepth, [5, 10])
    .build()
)

rf_rmse_evaluator_cv = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="rmse"
)

cv_rf = CrossValidator(
    estimator=rf_pipeline,
    estimatorParamMaps=paramGrid_rf,
    evaluator=rf_rmse_evaluator_cv,
    numFolds=3,
    parallelism=2
)

# 2-c) Train on training data

cv_rf_model = cv_rf.fit(train_reg_df)

best_rf_pipeline = cv_rf_model.bestModel
best_rf_model = best_rf_pipeline.stages[-1]

print("Best Random Forest hyperparameters:")
print("numTrees:", best_rf_model.getNumTrees)
print("maxDepth:", best_rf_model.getMaxDepth())

# 2-d) Evaluate Random Forest on test data
pred_rf_test = best_rf_pipeline.transform(test_reg_df)
pred_rf_test.select(
    "fare_amount", "prediction",
    "trip_distance", "trip_duration",
    "pickup_hour", "pickup_day_of_week"
).show(10, truncate=False)
rf_rmse_evaluator = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="rmse"
)
rf_rmse = rf_rmse_evaluator.evaluate(pred_rf_test)
rf_r2_evaluator = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="r2"
)
rf_r2 = rf_r2_evaluator.evaluate(pred_rf_test)

print(f"RMSE: {rf_rmse:.4f}")
print(f"R^2 : {rf_r2:.4f}")

# 2-d) Save trained Random Forest pipeline
best_rf_pipeline.write().overwrite().save("/Volumes/workspace/default/data/rf_fare_pipeline")

Best Random Forest hyperparameters:
numTrees: 50
maxDepth: 10
+-----------+------------------+-------------+------------------+-----------+------------------+
|fare_amount|prediction        |trip_distance|trip_duration     |pickup_hour|pickup_day_of_week|
+-----------+------------------+-------------+------------------+-----------+------------------+
|4.5        |4.749704015071958 |0.5          |3.9166666666666665|0          |5                 |
|10.5       |10.278989160949859|0.7          |16.633333333333333|0          |5                 |
|10.0       |10.481617869131112|2.5          |10.816666666666666|0          |5                 |
|8.7        |9.475651170168826 |1.0          |12.816666666666666|0          |5                 |
|21.2       |21.51177412040871 |5.3          |26.35             |0          |5                 |
|12.2       |12.495581351502029|3.6          |10.766666666666667|0          |5                 |
|5.0        |5.961189831525769 |0.4          |5.466666666666667 |