In [1]:
import pyspark
# Print the installed PySpark version string so the run environment is on record.
print(pyspark.__version__)

4.0.0


In [2]:
from pyspark.sql import SparkSession

# Obtain the SparkSession used by every later cell. getOrCreate() hands back
# an existing session when one is available instead of starting another.
# NOTE(review): the application name spells "Ammount"; it is kept unchanged
# here because it is a runtime string.
spark = (
    SparkSession.builder
    .appName("TaxiTotalAmmountPrediction")
    .getOrCreate()
)

# Smoke test: display a five-row DataFrame (ids 0-4) to confirm the session works.
spark.range(5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [3]:
# Read the trip file: the first line provides the column names and Spark
# infers each column's type from the data.
df = spark.read.options(header=True, inferSchema=True).csv("2019-04.csv")

# Keep only the three columns later used as model features plus the
# total_amount column used as the label, then preview ten rows.
selected_cols = ["passenger_count", "PULocationID", "DOLocationID", "total_amount"]
df_selected = df.select(*selected_cols)
df_selected.show(n=10)

+---------------+------------+------------+------------+
|passenger_count|PULocationID|DOLocationID|total_amount|
+---------------+------------+------------+------------+
|            1.0|       239.0|       239.0|         8.8|
|            1.0|       230.0|       100.0|         8.3|
|            1.0|        68.0|       127.0|       47.75|
|            1.0|        68.0|        68.0|         7.3|
|            1.0|        50.0|        42.0|       23.15|
|            1.0|        95.0|       196.0|         9.8|
|            1.0|       211.0|       211.0|         6.8|
|            1.0|       237.0|       162.0|         7.8|
|            1.0|       148.0|        37.0|        20.3|
|            1.0|       265.0|       265.0|        0.31|
+---------------+------------+------------+------------+
only showing top 10 rows


In [4]:
# Randomly split the selected columns with weights 0.8 / 0.2 (train / test);
# the seed is fixed at 42 so the assignment can be repeated.
trainDF, testDF = df_selected.randomSplit(weights=[0.8, 0.2], seed=42)

In [5]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Pack the three input columns into the single vector column "features".
# NOTE(review): PULocationID and DOLocationID are identifiers, yet they are
# passed through as plain numeric features — confirm this is intended.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["passenger_count", "PULocationID", "DOLocationID"],
)

# Decision-tree regressor reading "features" and predicting total_amount;
# all other hyperparameters are left at the library defaults.
dt = DecisionTreeRegressor(labelCol="total_amount", featuresCol="features")

In [6]:
# Chain feature assembly and the regressor so both run as a single estimator.
pipeline = Pipeline().setStages([assembler, dt])

In [7]:
# Fit every pipeline stage on the training split; the result is the fitted
# pipeline model used for scoring below.
model = pipeline.fit(dataset=trainDF)

In [8]:
# Score the held-out split with the fitted pipeline.
predictions = model.transform(testDF)

# Preview the input columns next to the model output for ten rows.
preview_cols = ["passenger_count", "PULocationID", "DOLocationID", "prediction"]
predictions.select(*preview_cols).show(n=10)

+---------------+------------+------------+------------------+
|passenger_count|PULocationID|DOLocationID|        prediction|
+---------------+------------+------------+------------------+
|            0.0|         1.0|         1.0|20.381364073006495|
|            0.0|         4.0|         4.0|20.381364073006495|
|            0.0|         4.0|         4.0|20.381364073006495|
|            0.0|         4.0|        79.0| 15.74033835258631|
|            0.0|         4.0|        88.0| 15.74033835258631|
|            0.0|         4.0|       114.0| 15.74033835258631|
|            0.0|         4.0|       186.0|17.986261707235286|
|            0.0|         4.0|       257.0|17.986261707235286|
|            0.0|         7.0|       146.0|17.986261707235286|
|            0.0|         7.0|       146.0|17.986261707235286|
+---------------+------------+------------+------------------+
only showing top 10 rows


In [9]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator that compares the "prediction" column with the total_amount label
# using root mean squared error.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="total_amount",
    metricName="rmse",
)

# Compute the metric over the scored test split and report it.
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 14.518795974849455
