In [9]:
# Part 1

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the active Spark session if one exists, otherwise create one.
spark = SparkSession.builder.appName("NYC Taxi Fare Prediction").getOrCreate()

# Load the CSV. header=True takes column names from the first row;
# inferSchema=True makes Spark do an extra pass over the file to pick types.
df = spark.read.csv("2019-04.csv", header=True, inferSchema=True)

# Model inputs followed by the label (total_amount must stay last: later
# cells treat selected_cols[:-1] as features and selected_cols[-1] as label).
# These are the columns found at positions 3, 7, 8 and 16 of this file;
# selecting them by name keeps the notebook from silently training on the
# wrong columns if the file layout ever changes.
selected_cols = ["passenger_count", "pulocationid", "dolocationid", "total_amount"]

# Fail fast with a readable message instead of an opaque analysis error.
missing_cols = [name for name in selected_cols if name not in df.columns]
if missing_cols:
    raise ValueError(
        f"Expected columns {missing_cols} not found in 2019-04.csv; "
        f"available columns: {df.columns}"
    )

df_selected = df.select(*selected_cols)

# Show the first 10 rows
df_selected.show(10)


[Stage 24:>                                                         (0 + 8) / 8]

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       239.0|       239.0|         8.8|
|            1.0|       230.0|       100.0|         8.3|
|            1.0|        68.0|       127.0|       47.75|
|            1.0|        68.0|        68.0|         7.3|
|            1.0|        50.0|        42.0|       23.15|
|            1.0|        95.0|       196.0|         9.8|
|            1.0|       211.0|       211.0|         6.8|
|            1.0|       237.0|       162.0|         7.8|
|            1.0|       148.0|        37.0|        20.3|
|            1.0|       265.0|       265.0|        0.31|
+---------------+------------+------------+------------+
only showing top 10 rows


                                                                                

In [10]:
# Part 2
# Randomly split the selected columns into roughly 80% training and 20% test
# rows; the fixed seed keeps the split repeatable between runs.
splits = df_selected.randomSplit(weights=[0.8, 0.2], seed=42)
trainDF, testDF = splits

In [11]:
# Part 3
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

# Every entry of selected_cols except the last one is a model input;
# the last entry (total_amount) is the value being predicted.
*feature_cols, label_col = selected_cols

# The assembler packs the input columns into one vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Decision tree regressor that reads the assembled vector and learns to
# predict the label column.
dt = DecisionTreeRegressor(
    labelCol=label_col,
    featuresCol=assembler.getOutputCol(),
)

In [12]:
# Part 4
from pyspark.ml import Pipeline

# Chain feature assembly and the regressor so they are fitted and applied together.
pipeline = Pipeline().setStages([assembler, dt])

In [13]:
# Part 5

# Fit both pipeline stages on the training split; the result is a fitted
# PipelineModel that can score new rows.
model = pipeline.fit(dataset=trainDF)

25/07/26 18:27:51 WARN MemoryStore: Not enough space to cache rdd_106_1 in memory! (computed 28.4 MiB so far)
25/07/26 18:27:51 WARN BlockManager: Persisting block rdd_106_1 to disk instead.
25/07/26 18:27:51 WARN MemoryStore: Not enough space to cache rdd_106_6 in memory! (computed 28.4 MiB so far)
25/07/26 18:27:51 WARN BlockManager: Persisting block rdd_106_6 to disk instead.
25/07/26 18:27:51 WARN MemoryStore: Not enough space to cache rdd_106_2 in memory! (computed 28.4 MiB so far)
25/07/26 18:27:51 WARN BlockManager: Persisting block rdd_106_2 to disk instead.
25/07/26 18:27:51 WARN MemoryStore: Not enough space to cache rdd_106_3 in memory! (computed 42.9 MiB so far)
25/07/26 18:27:51 WARN BlockManager: Persisting block rdd_106_3 to disk instead.
25/07/26 18:27:51 WARN MemoryStore: Not enough space to cache rdd_106_0 in memory! (computed 28.4 MiB so far)
25/07/26 18:27:51 WARN BlockManager: Persisting block rdd_106_0 to disk instead.
25/07/26 18:27:51 WARN MemoryStore: Not enoug

In [14]:
# Part 6

# Score the held-out test split; transform() appends a "prediction" column.
predict = model.transform(testDF)

# Preview 10 rows: the input features, the actual total_amount, and the prediction.
preview_cols = [*feature_cols, selected_cols[-1], "prediction"]
predict.select(preview_cols).show(n=10)

[Stage 40:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            0.0|         1.0|         1.0|       103.3|27.714119597463643|
|            0.0|         4.0|         4.0|         6.8|27.714119597463643|
|            0.0|         4.0|        33.0|       31.55|21.079365103486097|
|            0.0|         4.0|        79.0|         7.8|21.079365103486097|
|            0.0|         4.0|       107.0|        11.8|21.079365103486097|
|            0.0|         4.0|       144.0|        11.3|21.079365103486097|
|            0.0|         4.0|       234.0|        11.0|21.079365103486097|
|            0.0|         7.0|       121.0|        28.8|21.079365103486097|
|            0.0|         7.0|       223.0|         6.8|21.079365103486097|
|            0.0|         7.0|       223.0|         8.3|21.079365103486097|
+-----------

                                                                                

In [15]:
# Part 7

from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator that compares the label column (last entry of selected_cols)
# with the model's "prediction" column using root-mean-squared error.
evaluator = (
    RegressionEvaluator()
    .setLabelCol(selected_cols[-1])
    .setPredictionCol("prediction")
    .setMetricName("rmse")
)

# Compute RMSE over the scored test rows and report it.
rmse = evaluator.evaluate(predict)
print(f"RMSE = {rmse}")

[Stage 41:>                                                         (0 + 8) / 8]

RMSE = 12.576215578903543


                                                                                