# Decision Tree

In [1]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np
from pyspark.sql.types import DoubleType, IntegerType, LongType

In [2]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("YellowTaxiTripPrediction")
    .getOrCreate()
)

# Only messages at ERROR level should reach the cell output from here on.
spark.sparkContext.setLogLevel("ERROR")

25/05/19 16:14:33 WARN Utils: Your hostname, inigo-pena-HP-Victus resolves to a loopback address: 127.0.1.1; using 10.166.50.10 instead (on interface wlo1)
25/05/19 16:14:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/19 16:14:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Prepare Data

In [4]:
# Load the processed Parquet dataset and list its columns with their types.
processed_path = "../data/processed.parquet"
df = spark.read.parquet(processed_path)
df.printSchema()

                                                                                

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- extras: double (nullable = true)
 |-- PU_Borough: string (nullable = true)
 |-- PU_Zone: string (nullable = true)
 |-- DO_Borough: string (nullable = true)
 |-- DO_Zone: string (nullable = true)



In [5]:
# Columns kept for the total_amount model: the label itself plus the
# columns that are later used as (raw or one-hot encoded) features.
selected_cols = [
    "trip_distance",
    "passenger_count",
    "PULocationID",
    "DOLocationID",
    "VendorID",
    "total_amount",
    "payment_type",
    "hour",
    "day_of_week",
]
df_model = df.select(selected_cols)

In [6]:
# Split into 80% training rows and 20% test rows, using a fixed seed.
split_weights = [0.8, 0.2]
train_data, test_data = df_model.randomSplit(split_weights, seed=42)

In [7]:
# Remove every row that has a missing value in any of the modelling columns.
# The same column list is applied to both halves of the split.
required_cols = [
    "trip_distance", "passenger_count", "PULocationID", "DOLocationID",
    "VendorID", "total_amount", "payment_type", "hour", "day_of_week",
]

train_data = train_data.dropna(subset=required_cols)
test_data = test_data.dropna(subset=required_cols)

In [None]:
# One-hot encode the integer-coded categorical columns; each encoded column
# is named after its input column with an "_ohe" suffix.
categorical_cols = ["PULocationID", "DOLocationID", "VendorID", "payment_type"]

amount_encoder = OneHotEncoder(
    inputCols=categorical_cols,
    outputCols=[f"{name}_ohe" for name in categorical_cols],
)


## Building The Model

In [None]:
# Feature vector layout: the raw numeric columns first, followed by the
# one-hot vectors produced by amount_encoder (order matters for feature indices).
numeric_inputs = ["trip_distance", "passenger_count", "hour", "day_of_week"]
encoded_inputs = ["PULocationID_ohe", "DOLocationID_ohe", "VendorID_ohe", "payment_type_ohe"]
feature_cols = numeric_inputs + encoded_inputs

# Concatenate all inputs into the single "features" vector column.
amount_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Depth-limited regression tree that predicts total_amount.
total_amount_dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
    maxDepth=5,
)

# Encoder -> assembler -> tree, fitted on the training split only.
pipeline_stages = [amount_encoder, amount_assembler, total_amount_dt]
pipeline_total = Pipeline(stages=pipeline_stages)
model_total = pipeline_total.fit(train_data)

                                                                                

In [11]:
# Score the held-out split with the fitted pipeline.
predictions = model_total.transform(test_data)

# One evaluator, re-pointed at each metric in turn.
evaluator_total = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction")
rmse, mae, r2 = (
    evaluator_total.setMetricName(metric).evaluate(predictions)
    for metric in ("rmse", "mae", "r2")
)

print(f"RMSE: {rmse}")
print(f"MAE: {mae}")
print(f"R2: {r2}")




RMSE: 11.346018981746182
MAE: 4.76599297524254
R2: 0.750139962903208


                                                                                

In [17]:
# The fitted tree is the last stage of the pipeline model; display its
# per-feature importances (indices refer to positions in the "features" vector).
fitted_tree_total = model_total.stages[-1]
fitted_tree_total.featureImportances

SparseVector(540, {0: 0.9152, 136: 0.0082, 142: 0.0029, 270: 0.0072, 535: 0.0026, 537: 0.0346, 538: 0.0293})

In [15]:
# Print only the first 15 lines of the tree's textual dump.
lines = model_total.stages[-1].toDebugString.split('\n')
print(*lines[:15], sep='\n')

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_f40687f9ce65, depth=5, numNodes=63, numFeatures=540
  If (feature 0 <= 6.285)
   If (feature 0 <= 2.205)
    If (feature 0 <= 1.3250000000000002)
     If (feature 537 in {0.0})
      If (feature 538 in {0.0})
       Predict: 2.485227072758029
      Else (feature 538 not in {0.0})
       Predict: 12.843589924503966
     Else (feature 537 not in {0.0})
      If (feature 136 in {0.0})
       Predict: 16.36519229037444
      Else (feature 136 not in {0.0})
       Predict: 94.90811581676705
    Else (feature 0 > 1.3250000000000002)


# Predicting Trip Distance

In [5]:
# Keep only the columns whose Spark type is integer, long or double;
# timestamp and string columns are left out.
numeric_types = (IntegerType, LongType, DoubleType)
numerical_cols = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, numeric_types)
]

distance_model_df = df.select(numerical_cols)

In [6]:
# Split the numerical-only frame into 80% training rows and 20% test rows,
# using a fixed seed. This rebinds train_data / test_data.
split_weights = [0.8, 0.2]
train_data, test_data = distance_model_df.randomSplit(split_weights, seed=42)

In [7]:
# Cleaning data
# Drop rows with a null/NaN in ANY numerical column, including fare_amount,
# tip_amount and extras. The distance model assembles every numerical column
# except trip_distance into its feature vector, and VectorAssembler's default
# handleInvalid="error" aborts on a null or NaN input, so all feature columns
# (and the label) must be complete in both halves of the split.
train_data = train_data.na.drop(subset=numerical_cols)

test_data = test_data.na.drop(subset=numerical_cols)

In [8]:
# One-hot encoder for the integer-coded categorical columns; each encoded
# column is named after its input column with an "_ohe" suffix.
# NOTE(review): pipeline_distance is built from the assembler and the tree
# only, so this encoder is currently not part of the distance model.
distance_categorical_cols = ["PULocationID", "DOLocationID", "VendorID", "payment_type"]

distance_encoder = OneHotEncoder(
    inputCols=distance_categorical_cols,
    outputCols=[f"{name}_ohe" for name in distance_categorical_cols],
)

In [12]:
# Every numerical column except the label becomes a feature.
# NOTE(review): this includes fare_amount, tip_amount, total_amount and extras,
# and feeds the location/vendor/payment IDs to the tree as plain numbers -
# confirm that both are intended.
label_col = "trip_distance"
feature_cols = [name for name in numerical_cols if name != label_col]

# Concatenate the feature columns into the single "features" vector column.
distance_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Deeper tree than the total_amount model, with a minimum of 50 rows per node.
dt_distance = DecisionTreeRegressor(
    featuresCol="features",
    labelCol=label_col,
    maxDepth=10,
    minInstancesPerNode=50,
)

# Assembler -> tree, fitted on the training split only.
distance_stages = [distance_assembler, dt_distance]
pipeline_distance = Pipeline(stages=distance_stages)
model_distance = pipeline_distance.fit(train_data)

                                                                                

In [14]:
# Score the held-out split with the fitted distance pipeline.
predictions_dist = model_distance.transform(test_data)

# One evaluator, re-pointed at each metric in turn.
evaluator_dist = RegressionEvaluator(labelCol="trip_distance", predictionCol="prediction")
rmse_dist, mae_dist, r2_dist = (
    evaluator_dist.setMetricName(metric).evaluate(predictions_dist)
    for metric in ("rmse", "mae", "r2")
)

print(f"RMSE: {rmse_dist}")
print(f"MAE: {mae_dist}")
print(f"R2: {r2_dist}")



RMSE: 112.64211126079385
MAE: 0.8963620960315555
R2: -0.01863136516395758


                                                                                

In [15]:
# The fitted tree is the last stage of the pipeline model; print its full
# textual dump (feature indices refer to positions in the "features" vector).
distance_fitted_stages = model_distance.stages
tree_model_dist = distance_fitted_stages[-1]
print(tree_model_dist.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_4987509b8e96, depth=10, numNodes=1485, numFeatures=11
  If (feature 5 <= 32.8)
   If (feature 5 <= 22.2)
    If (feature 5 <= 13.15)
     If (feature 5 <= 9.65)
      If (feature 10 <= 1.75)
       If (feature 1 <= 138.5)
        If (feature 1 <= 131.5)
         If (feature 4 <= 3.5)
          If (feature 5 <= 4.75)
           If (feature 0 <= 113.5)
            Predict: 0.8961319742489272
           Else (feature 0 > 113.5)
            Predict: 2.0771030993042374
          Else (feature 5 > 4.75)
           If (feature 5 <= 7.550000000000001)
            Predict: 0.6875579497224945
           Else (feature 5 > 7.550000000000001)
            Predict: 1.075191902150991
         Else (feature 4 > 3.5)
          If (feature 0 <= 250.5)
           If (feature 0 <= 113.5)
            Predict: 1.6004107505070997
           Else (feature 0 > 113.5)
            Predict: 2.461791723510687
          Else (feature 0 > 250.5)
           If (fe