In [0]:
%sql
-- Make nyc_taxi the current catalog and nyc_taxi_schema the current schema
-- for the SQL statements that follow in this session.
USE CATALOG nyc_taxi;
USE SCHEMA nyc_taxi_schema;

In [0]:
%sql
-- Create the sparkml_cache volume if it does not exist yet. Its /Volumes path
-- is the value assigned to SPARKML_TEMP_DFS_PATH in the following Python cell.
CREATE VOLUME IF NOT EXISTS nyc_taxi.nyc_taxi_schema.sparkml_cache;

In [0]:
import os

# Export the volume path through the process environment.
# NOTE(review): presumably read by Spark ML as its temporary storage location
# for models -- confirm against the platform documentation.
os.environ.update(
    SPARKML_TEMP_DFS_PATH="/Volumes/nyc_taxi/nyc_taxi_schema/sparkml_cache"
)

In [0]:
import gc

# Names of model/estimator objects that a previous run of this notebook may
# have left in the kernel. The list includes the random-forest cross-validation
# objects (rfCV, rfCvModel) created further down, so they are released as well.
# Names that do not currently exist are simply skipped.
staleModelNames = [
    "cvLr", "cvLrModel", "bestLrPipelineModel", "bestLrModel",
    "rfCV", "rfCvModel",
    "rfModel", "bestRfPipelineModel", "bestRfModel",
    "lrPipelineModel", "rfPipelineModel", "model",
]

# Drop the references so the objects become collectable.
for name in staleModelNames:
    if name in globals():
        del globals()[name]

# Force a collection pass now instead of waiting for the interpreter.
gc.collect()

In [0]:
# Read the yellow-taxi trips table into a DataFrame.
df = (
    spark.read
    .table("nyc_taxi.nyc_taxi_schema.yellow_trips_csv_v")
)

# Show the rows first, then the column names and types.
display(df)
df.printSchema()

2.1 Data Preparation

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import radians, sin, cos, atan2, sqrt

# Earth radius in miles; converts the haversine central angle into a distance.
R = 3959

# Build the regression dataset in one pass over df:
#   1. keep rows with a fare in (0, 500) and a positive trip distance,
#   2. derive pickup hour / day-of-week and the trip duration in minutes,
#   3. keep rows with a positive duration and passenger count,
#   4. add the haversine (great-circle) distance between pickup and drop-off,
#   5. drop the intermediate columns used only for that calculation.
dfReg = (
    df
    .filter(F.col("fare_amount") > 0)
    .filter(F.col("fare_amount") < 500)
    .filter(F.col("trip_distance") > 0)
    .withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
    .withColumn("pickup_dow", F.dayofweek("tpep_pickup_datetime"))
    .withColumn("pickup_ts", F.unix_timestamp("tpep_pickup_datetime"))
    .withColumn("dropoff_ts", F.unix_timestamp("tpep_dropoff_datetime"))
    # Epoch seconds -> minutes.
    .withColumn(
        "trip_duration_min",
        (F.col("dropoff_ts") - F.col("pickup_ts")) / 60.0,
    )
    .filter((F.col("trip_duration_min") > 0) & (F.col("passenger_count") > 0))
    # Coordinates in radians for the trigonometric functions.
    .withColumn("pick_lat", radians(F.col("pickup_latitude")))
    .withColumn("pick_long", radians(F.col("pickup_longitude")))
    .withColumn("drop_lat", radians(F.col("dropoff_latitude")))
    .withColumn("drop_long", radians(F.col("dropoff_longitude")))
    .withColumn("lat_diff", F.col("pick_lat") - F.col("drop_lat"))
    .withColumn("long_diff", F.col("pick_long") - F.col("drop_long"))
    # Haversine: a = sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlong/2)
    .withColumn(
        "a",
        sin(F.col("lat_diff") / 2) ** 2
        + cos(F.col("pick_lat")) * cos(F.col("drop_lat"))
        * sin(F.col("long_diff") / 2) ** 2,
    )
    # Central angle: c = 2 * atan2(sqrt(a), sqrt(1 - a))
    .withColumn("c", 2 * atan2(sqrt(F.col("a")), sqrt(1 - F.col("a"))))
    .withColumn("straight_line_distance", F.col("c") * F.lit(R))
    .drop(
        "pick_lat", "pick_long", "drop_lat", "drop_long",
        "lat_diff", "long_diff", "a", "c",
    )
)

# Model input columns; a row with a null in any of them is dropped below.
featureCols = [
    "passenger_count",
    "trip_distance",
    "trip_duration_min",
    "pickup_hour",
    "pickup_dow",
    "straight_line_distance"
]

# Regression target.
labelCol = "fare_amount"

# Remove rows that are missing any model input.
dfReg = dfReg.na.drop(subset=featureCols)

# Columns handed to the VectorAssembler. This is a plain copy of featureCols:
# straight_line_distance is already in that list, so appending it again would
# put the same column into the feature vector twice (perfectly collinear
# inputs for the linear model).
numericFeatures = list(featureCols)

print("Rows after cleaning:", dfReg.count())

# 70/30 train/test split with a fixed seed.
trainDf, testDf = dfReg.randomSplit([0.7, 0.3], seed=42)

# Count the training split once and reuse the value: every count() is a
# separate Spark job over the data.
train_count = trainDf.count()
print("Train rows:", train_count)
print("Test rows:", testDf.count())

# Cross-validation runs on a sample of roughly 100,000 training rows
# (instructor approved). The fraction is capped at 1.0 when the training split
# is smaller than that, and falls back to 1.0 for an empty split so the
# division cannot raise ZeroDivisionError.
fraction = min(1.0, 100000.0 / float(train_count)) if train_count > 0 else 1.0
trainDfCv = trainDf.sample(withReplacement=False,
                           fraction=fraction,
                           seed=42)
print("Rows used for CV:", trainDfCv.count())

display(dfReg)

Pipeline 1: Linear Regression

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Stage 1: pack the numeric input columns into one vector column.
assembler = VectorAssembler(outputCol="features_unscaled", inputCols=numericFeatures)

# Stage 2: standardize the assembled vector. Both centering (withMean) and
# unit-variance scaling (withStd) are enabled.
scaler = StandardScaler(
    withStd=True,
    withMean=True,
    outputCol="features",
    inputCol="features_unscaled",
)

# Stage 3: linear regression on the scaled features, predicting labelCol.
lr = LinearRegression(predictionCol="prediction", labelCol=labelCol, featuresCol="features")

# Assemble -> scale -> fit, chained as a single estimator.
lrPipeline = Pipeline(stages=[assembler, scaler, lr])

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Hyperparameter grid for the linear regression stage:
# 4 regParam x 3 elasticNetParam x 2 maxIter = 24 combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.0001, 0.001, 0.01])
    .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5])
    .addGrid(lr.maxIter, [50, 100])
    .build()
)

# RMSE on the prediction column is the model-selection metric.
evaluatorRmse = RegressionEvaluator(
    labelCol=labelCol,
    predictionCol="prediction",
    metricName="rmse"
)

# Cross-validator over the full pipeline.
cvLr = CrossValidator(
    estimator=lrPipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluatorRmse,
    numFolds=3,     # lower to shorten the run if needed
    parallelism=1,  # 1 = fit models one at a time; raise only if the cluster has spare capacity
    seed=42         # fixed seed so fold assignment is reproducible, like the split and sample seeds
)

In [0]:
# Run the linear-regression cross-validation on the CV sample of the training data.
cvLrModel = cvLr.fit(trainDfCv)

# bestModel is the fitted pipeline for the winning parameter combination;
# its last stage is the fitted linear regression model itself.
bestLrPipelineModel = cvLrModel.bestModel
bestLrModel = bestLrPipelineModel.stages[-1]

In [0]:
# R2 evaluator to complement the RMSE evaluator used for model selection.
evaluatorR2 = RegressionEvaluator(metricName="r2", labelCol=labelCol, predictionCol="prediction")

# Score the held-out test split with the best linear-regression pipeline.
testPredLr = bestLrPipelineModel.transform(testDf)

# Test-set metrics.
rmseTest = evaluatorRmse.evaluate(testPredLr)
r2Test = evaluatorR2.evaluate(testPredLr)

print("Linear Regression Evaluation Metrics:")
print("RMSE:", rmseTest)
print("R2:", r2Test)

# Sample of actual fares next to the predicted ones.
testPredLr.select("fare_amount", "prediction").show(10)

In [0]:
# Report the hyperparameter values carried by the best linear regression model
# (the three parameters that were varied in the grid search).
print("Best Linear Regression Hyperparameters")
print("regParam:", bestLrModel.getRegParam())
print("elasticNetParam:", bestLrModel.getElasticNetParam())
print("maxIter:", bestLrModel.getMaxIter())

Pipeline 2: Random Forest

In [0]:
from pyspark.ml.regression import RandomForestRegressor

# Random forest regressor on the same scaled feature column and label as the
# linear model; seeded so tree construction is repeatable.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol=labelCol,
    predictionCol="prediction",
    seed=42
)

# Same assembler and scaler stages as the linear-regression pipeline, with the
# random forest as the final stage.
rfPipeline = Pipeline(
    stages=[assembler, scaler, rf]
)

# 2 numTrees x 3 maxDepth = 6 combinations.
rfparamGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .build()
)

rfCV = CrossValidator(
    estimator=rfPipeline,
    estimatorParamMaps=rfparamGrid,
    evaluator=evaluatorRmse,  # same RMSE evaluator as the linear-regression search
    numFolds=3,               # lower to shorten the run if needed
    seed=42                   # fixed seed so fold assignment is reproducible
)

In [0]:
# Run the random-forest cross-validation on the CV sample of the training data.
rfCvModel = rfCV.fit(trainDfCv)

# bestModel is the fitted pipeline for the winning parameter combination;
# its last stage is the fitted random forest model itself.
bestRfPipelineModel = rfCvModel.bestModel
bestRfModel = bestRfPipelineModel.stages[-1]

In [0]:
# Score the held-out test split with the best random-forest pipeline.
testPredRf = bestRfPipelineModel.transform(testDf)

# Test-set metrics.
# NOTE: rmseTest, r2Test and evaluatorR2 reuse the names from the
# linear-regression evaluation cell, so those earlier values are overwritten.
rmseTest = evaluatorRmse.evaluate(testPredRf)
evaluatorR2 = RegressionEvaluator(
    labelCol=labelCol,
    predictionCol="prediction",
    metricName="r2"
)
r2Test = evaluatorR2.evaluate(testPredRf)

# Header names the model that was actually evaluated in this cell.
print("Random Forest Evaluation Metrics:")
print("RMSE:", rmseTest)
print("R2:", r2Test)

# Sample of actual fares next to the predicted ones.
testPredRf.select("fare_amount", "prediction").show(10)

In [0]:
# Report the hyperparameter values carried by the best random forest model
# (the two parameters that were varied in the grid search).
print("Best Random Forest Hyperparameters")
# numTrees is read through the Param API: on fitted tree-ensemble models
# `getNumTrees` is exposed as a property, so calling it as a method raises
# "TypeError: 'int' object is not callable".
print("numTrees:", bestRfModel.getOrDefault(bestRfModel.numTrees))
print("maxDepth:", bestRfModel.getMaxDepth())