In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Obtain the Spark entry point for this notebook. getOrCreate() hands back the
# session that is already running when there is one, otherwise it starts one.
spark = (
    SparkSession.builder
    .appName('a4-DecisionTreeRegressor')
    .getOrCreate()
)

25/04/28 19:41:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load the four-column CSV from the gs:// path. The first row supplies the
# column names and Spark infers each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('gs://dataproc-staging-us-central1-920508693171-4om4tbcd/artificial_data_files_ws78/4columns.csv')
)

                                                                                

In [4]:
# Preview the first ten rows to sanity-check the parsed columns.
df.show(n=10)

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [5]:
# Split the rows with 0.8 / 0.2 weights into a training set and a held-out
# test set; an explicit seed is passed to the sampler.
trainDF, testDF = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [6]:
# Pack the three input columns into the single vector column ('features')
# that the regressor reads.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=['passenger_count', 'pulocationid', 'dolocationid'],
)

In [7]:
# Decision-tree regressor that predicts total_amount from the assembled
# 'features' vector; maxBins is set explicitly to 500.
dtr = DecisionTreeRegressor(
    labelCol='total_amount',
    featuresCol='features',
    maxBins=500,
)

In [8]:
# Chain feature assembly and the regressor so they are fitted and applied
# together, in this order.
pipeline = Pipeline(
    stages=[
        assembler,
        dtr,
    ],
)

In [9]:
# Fit the pipeline stages on the training split.
model = pipeline.fit(dataset=trainDF)

                                                                                

In [10]:
# Apply the fitted pipeline to the held-out split; the result carries a
# 'prediction' column alongside the original columns.
prediction = model.transform(dataset=testDF)

In [11]:
# Show the three input columns next to the model output for ten test rows.
(
    prediction
    .select('passenger_count', 'pulocationid', 'dolocationid', 'prediction')
    .show(n=10)
)

[Stage 17:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|        prediction|
+---------------+------------+------------+------------------+
|            0.0|         1.0|         1.0| 89.51189368770764|
|            0.0|         4.0|         4.0|17.863974882487984|
|            0.0|         4.0|         4.0|17.863974882487984|
|            0.0|         4.0|        68.0|17.863974882487984|
|            0.0|         4.0|        79.0|17.863974882487984|
|            0.0|         4.0|        80.0|17.863974882487984|
|            0.0|         4.0|       107.0|17.863974882487984|
|            0.0|         4.0|       114.0|17.863974882487984|
|            0.0|         4.0|       161.0|17.863974882487984|
|            0.0|         4.0|       161.0|17.863974882487984|
+---------------+------------+------------+------------------+
only showing top 10 rows



                                                                                

In [12]:
# Score the test-set predictions against total_amount using the 'rmse'
# (root-mean-square error) metric, then report it to two decimals.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='total_amount',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(dataset=prediction)

print(f"RMSE on test data = {rmse:.2f}")



RMSE on test data = 60.93


                                                                                