In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("a4 Assignment").getOrCreate()

path = "gs://dataproc-staging-us-central1-265934923888-wnqace9g/datasets/2019-01-h1.csv"

df = spark.read.csv(path, header=True, inferSchema=True)

df_selected = df.select("passenger_count", "pulocationid", "dolocationid", "total_amount")

df_selected.show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [4]:
df_selected.printSchema()

trainDF = df_selected.sample(withReplacement=False, fraction=.8, seed=42)

testDF = df_selected.subtract(trainDF)

root
 |-- passenger_count: double (nullable = true)
 |-- pulocationid: double (nullable = true)
 |-- dolocationid: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [5]:
from pyspark.ml.regression import DecisionTreeRegressor

regressor = DecisionTreeRegressor(featuresCol="features",
                                 labelCol="total_amount")

In [7]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["passenger_count", "pulocationid","dolocationid"], outputCol="features")

vecTrain = assembler.transform(trainDF)

vecTrain.select("passenger_count", "pulocationid", "dolocationid", "features", "total_amount").show(10)

+---------------+------------+------------+-----------------+------------+
|passenger_count|pulocationid|dolocationid|         features|total_amount|
+---------------+------------+------------+-----------------+------------+
|            1.0|       151.0|       239.0|[1.0,151.0,239.0]|        9.95|
|            1.0|       239.0|       246.0|[1.0,239.0,246.0]|        16.3|
|            5.0|       193.0|       193.0|[5.0,193.0,193.0]|        7.55|
|            5.0|       193.0|       193.0|[5.0,193.0,193.0]|       55.55|
|            5.0|       193.0|       193.0|[5.0,193.0,193.0]|       13.31|
|            1.0|       163.0|       229.0|[1.0,163.0,229.0]|        9.05|
|            2.0|       141.0|       234.0|[2.0,141.0,234.0]|        13.0|
|            2.0|       246.0|       162.0|[2.0,246.0,162.0]|       19.55|
|            1.0|       238.0|       151.0|[1.0,238.0,151.0]|         8.5|
|            1.0|       163.0|        25.0| [1.0,163.0,25.0]|       42.95|
+---------------+--------

[Stage 3:>                                                          (0 + 1) / 1]                                                                                

In [9]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, regressor])

In [10]:
model = pipeline.fit(trainDF)

pred = model.transform(testDF)

pred.select("passenger_count", "pulocationid", "dolocationid", "total_amount", "prediction").show(10)

[Stage 22:>                                                         (0 + 4) / 4]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            0.0|       143.0|       141.0|        11.3|12.229978454915257|
|            0.0|       249.0|       231.0|        9.35|12.229978454915257|
|            1.0|        45.0|       186.0|        21.8|13.394432423171768|
|            1.0|        47.0|       140.0|       32.74|13.394432423171768|
|            1.0|        67.0|        22.0|         7.8|21.066133373723233|
|            1.0|        75.0|       142.0|       19.56|13.394432423171768|
|            1.0|        76.0|        17.0|        23.3|21.066133373723233|
|            1.0|        79.0|       236.0|       20.38|13.394432423171768|
|            1.0|       132.0|        79.0|       70.01|44.123787285390975|
|            1.0|       132.0|       152.0|       67.34| 48.95072481432126|
+-----------

                                                                                

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

regEval = RegressionEvaluator(predictionCol="prediction", labelCol="total_amount", metricName="rmse")

rmse = regEval.evaluate(pred)

                                                                                

In [13]:
print("RMSE is:", rmse)

RMSE is: 138.4884854025505
