In [45]:
# Load the 2019-04 taxi CSV into taxiDF. The header row supplies the column
# names; inferSchema makes Spark derive column types from the data (this
# costs an extra pass over the file). Then preview four columns of 10 rows.
filePath = "gs://class_demo_bucket-1/2019-04.csv"
taxiDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(filePath)
)
taxiDF.select(
    ["passenger_count", "pulocationid", "dolocationid", "total_amount"]
).show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       239.0|       239.0|         8.8|
|            1.0|       230.0|       100.0|         8.3|
|            1.0|        68.0|       127.0|       47.75|
|            1.0|        68.0|        68.0|         7.3|
|            1.0|        50.0|        42.0|       23.15|
|            1.0|        95.0|       196.0|         9.8|
|            1.0|       211.0|       211.0|         6.8|
|            1.0|       237.0|       162.0|         7.8|
|            1.0|       148.0|        37.0|        20.3|
|            1.0|       265.0|       265.0|        0.31|
+---------------+------------+------------+------------+
only showing top 10 rows



In [46]:
# Randomly divide taxiDF into roughly 80% training rows and 20% test rows.
# The fixed seed makes the split repeatable for the same input data and
# partitioning; the fractions are approximate, not exact.
trainDF, testDF = taxiDF.randomSplit(weights=[0.8, 0.2], seed=42)

# Each count() is a separate Spark action over its split.
print(
    f"There are {trainDF.count()} rows in the training set,\n"
    f"and {testDF.count()} in the test set."
)



There are 5946608 rows in the training set,
and 1486531 in the test set.


                                                                                

In [47]:
# Assemble the predictor columns into the single vector column ("features")
# that Spark ML estimators read. This stage is applied inside the Pipeline
# cell, so no standalone transform is needed here.
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
    outputCol="features",
)
# VectorAssembler's default handleInvalid="error" means a null/NaN in any
# input column makes the pipeline fail rather than silently dropping rows.
# NOTE(review): pulocationid/dolocationid look like zone identifier codes, yet
# they are assembled as plain numbers, so the tree splits on numeric
# thresholds of the IDs. If they are categorical, consider marking them as
# categorical features (e.g. VectorIndexer, with the tree's maxBins raised to
# at least the number of zones) — TODO confirm and compare RMSE.

In [48]:
# Define the decision tree regressor. It is trained as the final stage of the
# Pipeline in the next cell, not fit directly here.
# Hyperparameters are left at Spark's defaults (e.g. maxDepth=5, maxBins=32).
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(
    featuresCol="features",  # vector column produced by vecAssembler
    labelCol="total_amount",  # target variable we are predicting
)

In [49]:
# Chain the feature assembler and the decision tree into one Pipeline, so the
# same feature preparation runs at training time and at prediction time.
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages([vecAssembler, dt])

# Fitting runs the stages in order on trainDF and yields a PipelineModel.
pipelineModel = pipeline.fit(trainDF)

# Apply the fitted stages to the held-out rows; the result carries a
# "prediction" column next to the original columns.
predDF = pipelineModel.transform(testDF)
predDF.select(
    ["passenger_count", "pulocationid", "dolocationid", "total_amount", "prediction"]
).show(n=10)

[Stage 199:>                                                        (0 + 1) / 1]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            1.0|       246.0|        68.0|         8.3| 17.52601267249345|
|            1.0|       186.0|       261.0|       23.75|19.279406904832715|
|            3.0|       144.0|       170.0|       17.15|  15.8201607683849|
|            1.0|       100.0|       170.0|         9.8|17.900044761106553|
|            1.0|       161.0|       107.0|        10.8|16.356256622231676|
|            1.0|       239.0|        24.0|         9.8| 25.23215811655272|
|            1.0|       148.0|         4.0|        10.3| 30.57349933215938|
|            0.0|       186.0|        13.0|       19.58| 30.57349933215938|
|            1.0|       249.0|       144.0|        13.3|  15.8201607683849|
|            1.0|       249.0|        87.0|        13.8| 17.52601267249345|
+---------------+------------+------------+------------+------------------+

                                                                                

In [50]:
# Score the test-set predictions with root mean squared error. RMSE is in the
# same units as the label (total_amount).
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    labelCol="total_amount",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = regressionEvaluator.evaluate(predDF)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")



Root Mean Squared Error (RMSE) on test data = 12.3755849351859


                                                                                