In [None]:
import pandas as pd
from pyspark.sql.functions import *
import pyspark.sql.functions as F



In [None]:
storage_account_name = "utsbdeat1"
storage_account_access_key = [ACCESS KEY]
blob_container_name = "assignment2"

In [None]:
#mounting azure container to databricks
dbutils.fs.mount(
  source = f'wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net',
  mount_point = f'/mnt/{blob_container_name}/',
  extra_configs = {'fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net': storage_account_access_key}
)

Out[113]: True

In [None]:
#listing files
dbutils.fs.ls("/mnt/assignment2/")

Out[115]: [FileInfo(path='dbfs:/mnt/assignment2/green/', name='green/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/assignment2/yellow/', name='yellow/', size=0, modificationTime=0)]

In [None]:
#reading parquet for yellow taxis in April 2022
yellow_apr22 = spark.read.parquet("/mnt/assignment2/yellow/double/yellow_tripdata_2022-04.parquet")

spark.conf.set(
  "fs.azure.account.key.%s.blob.core.windows.net" % storage_account_name,
  storage_account_access_key)

output_container_path = "wasbs://%s@%s.blob.core.windows.net" % (blob_container_name, storage_account_name)
output_blob_folder = "%s/csv" % output_container_path

# write the dataframe as a single file to blob storage
(yellow_apr22
 .coalesce(1)
 .write
 .mode("overwrite")
 .option("header", "true")
 .format("com.databricks.spark.csv")
 .save(output_blob_folder))


In [None]:
#reading parquet
green_cabs = spark.read.parquet("/mnt/assignment2/green/")
green_cabs.cache()

Out[2]: DataFrame[VendorID: bigint, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: double, improvement_surcharge: double, total_amount: double, payment_type: double, trip_type: double, congestion_surcharge: double]

In [None]:
#reading parquet
yellow_double = spark.read.parquet("/mnt/assignment2/yellow/double/")
yellow_double.cache()


Out[3]: DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

In [None]:
#reading parquet
yellow_int = spark.read.parquet("/mnt/assignment2/yellow/int/")
yellow_int.cache()

#changing datatype of airport_fee column to double
yellow_int = yellow_int.withColumn("airport_fee",yellow_int.airport_fee.cast('double'))


In [None]:
#combining yellow_double and yellow_int dataframes
yellow_cabs = yellow_double.union(yellow_int)

In [None]:
yellow_cabs.count()

Out[6]: 152823008

In [None]:
green_cabs.count()

Out[7]: 9390483

In [None]:
#new columns with duration (in seconds, hours, minutes), distance (in km), speed (in mph and kph) for both datasets

green_cabs = green_cabs.withColumn("DurationInSeconds", col("lpep_dropoff_datetime").cast("long") - col("lpep_pickup_datetime").cast("long"))
green_cabs = green_cabs.withColumn("DurationInHours",F.round(col("DurationInSeconds")/3600.0, 3))
green_cabs = green_cabs.withColumn("DurationInMinutes",F.round(col("DurationInSeconds")/60.0, 3))
green_cabs = green_cabs.withColumn("trip_distance_km",green_cabs.trip_distance * 1.60934)
green_cabs = green_cabs.withColumn("speed",F.round(green_cabs.trip_distance/green_cabs.DurationInHours, 3))
green_cabs = green_cabs.withColumn("speed_km_hr",F.round(green_cabs.trip_distance_km/green_cabs.DurationInHours, 3))


yellow_cabs = yellow_cabs.withColumn("DurationInSeconds", col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long"))
yellow_cabs = yellow_cabs.withColumn("DurationInHours",F.round(col("DurationInSeconds")/3600.0, 3))
yellow_cabs = yellow_cabs.withColumn("DurationInMinutes",F.round(col("DurationInSeconds")/60.0, 3))
yellow_cabs = yellow_cabs.withColumn("trip_distance_km",yellow_cabs.trip_distance * 1.60934)
yellow_cabs = yellow_cabs.withColumn("speed",F.round(yellow_cabs.trip_distance/yellow_cabs.DurationInHours, 3))
yellow_cabs = yellow_cabs.withColumn("speed_km_hr",F.round(yellow_cabs.trip_distance_km/yellow_cabs.DurationInHours, 3))


In [None]:
#Removing trips finishing before the starting time
green_cabs = green_cabs.filter(green_cabs.lpep_dropoff_datetime > green_cabs.lpep_pickup_datetime)
yellow_cabs = yellow_cabs.filter(yellow_cabs.tpep_dropoff_datetime > yellow_cabs.tpep_pickup_datetime)

#Removing trips with negative speed
green_cabs = green_cabs.filter(green_cabs.speed > 0)
yellow_cabs = yellow_cabs.filter(yellow_cabs.speed > 0)

#Trips within date range (between 2019-01-01 and 2022-05-01)
green_cabs = green_cabs.filter(col("lpep_pickup_datetime") > F.lit("2019-01-01"))
green_cabs = green_cabs.filter(col("lpep_pickup_datetime") < F.lit("2022-05-01"))
yellow_cabs = yellow_cabs.filter(col("tpep_pickup_datetime") > F.lit("2019-01-01"))
yellow_cabs = yellow_cabs.filter(col("tpep_pickup_datetime") < F.lit("2022-05-01"))


In [None]:
#checking percentiles for speed, distance and duration for yellow and green cabs to see distribution
green_cabs.selectExpr('percentile_approx(speed, 0.99)').show()
yellow_cabs.selectExpr('percentile_approx(speed, 0.99)').show()

green_cabs.selectExpr('percentile_approx(speed, 0.05)').show()
yellow_cabs.selectExpr('percentile_approx(speed, 0.05)').show()

green_cabs.selectExpr('percentile_approx(trip_distance, 0.99)').show()
yellow_cabs.selectExpr('percentile_approx(trip_distance, 0.99)').show()

green_cabs.selectExpr('percentile_approx(trip_distance, 0.05)').show()
yellow_cabs.selectExpr('percentile_approx(trip_distance, 0.05)').show()

green_cabs.selectExpr('percentile_approx(DurationInSeconds, 0.99)').show()
yellow_cabs.selectExpr('percentile_approx(DurationInSeconds, 0.99)').show()

green_cabs.selectExpr('percentile_approx(DurationInSeconds, 0.01)').show()
yellow_cabs.selectExpr('percentile_approx(DurationInSeconds, 0.01)').show()

+-------------------------------------+
|percentile_approx(speed, 0.99, 10000)|
+-------------------------------------+
|                               34.821|
+-------------------------------------+

+-------------------------------------+
|percentile_approx(speed, 0.99, 10000)|
+-------------------------------------+
|                               36.058|
+-------------------------------------+

+-------------------------------------+
|percentile_approx(speed, 0.05, 10000)|
+-------------------------------------+
|                                5.816|
+-------------------------------------+

+-------------------------------------+
|percentile_approx(speed, 0.05, 10000)|
+-------------------------------------+
|                                4.773|
+-------------------------------------+

+---------------------------------------------+
|percentile_approx(trip_distance, 0.99, 10000)|
+---------------------------------------------+
|                                        21.19|
+---

In [None]:
#Removing trips with very high speed (look for NYC and outside of NYC speed limit 
green_cabs = green_cabs.filter(green_cabs.speed <= 36)
yellow_cabs = yellow_cabs.filter(yellow_cabs.speed <= 36)

#Removing trips that are travelling too short or too long (duration wise)
green_cabs = green_cabs.filter(green_cabs.DurationInSeconds >= 60)
green_cabs = green_cabs.filter(green_cabs.DurationInSeconds <= 4500)
yellow_cabs = yellow_cabs.filter(yellow_cabs.DurationInSeconds >= 60)
yellow_cabs = yellow_cabs.filter(yellow_cabs.DurationInSeconds <= 4500)

#Removing trips that are travelling too short or too long (distance wise)
green_cabs = green_cabs.filter(green_cabs.trip_distance >= 0.4)
green_cabs = green_cabs.filter(green_cabs.trip_distance <= 22)
yellow_cabs = yellow_cabs.filter(yellow_cabs.trip_distance >= 0.4)
yellow_cabs = yellow_cabs.filter(yellow_cabs.trip_distance <= 22)


In [None]:
#adding new column with cab colour 
green_cabs = green_cabs.withColumn("cab_colour", lit("green"))
yellow_cabs = yellow_cabs.withColumn("cab_colour", lit("yellow"))

#renaming pickup and dropoff datetime columns to match
green_cabs = green_cabs.withColumnRenamed("lpep_pickup_datetime","pep_pickup_datetime")
green_cabs = green_cabs.withColumnRenamed("lpep_dropoff_datetime","pep_dropoff_datetime")

yellow_cabs = yellow_cabs.withColumnRenamed("tpep_pickup_datetime","pep_pickup_datetime")
yellow_cabs = yellow_cabs.withColumnRenamed("tpep_dropoff_datetime","pep_dropoff_datetime")

#merging green and yellow taxi datasets
df = yellow_cabs.unionByName(green_cabs, allowMissingColumns=True)

In [None]:
#writing to parquet in dbfs
df.write.parquet("/cleaned/all_cabs", mode='overwrite')

In [None]:
#reading parquet
df = spark.read.parquet("/cleaned/all_cabs")

In [None]:
#creating temporary view for SQL analysis
df.createTempView("all_cabs") 

In [None]:
#run line if delta exception error for 'hr' table in sql script
dbutils.fs.rm("dbfs:/user/hive/warehouse/hr/", True)

Out[11]: True

In [None]:
#run line if delta exception error for 'cab_colour_stats' table in sql script
dbutils.fs.rm("dbfs:/user/hive/warehouse/cab_colour_stats/", True)

Out[13]: True

In [None]:
%sql
CREATE OR REPLACE TABLE month_year AS
SELECT DATE_FORMAT(pep_pickup_datetime,'yyyy-MM') AS month_year
FROM all_cabs;


CREATE OR REPLACE TABLE trips AS
SELECT DATE_FORMAT(pep_pickup_datetime,'yyyy-MM') AS month_year, count(*) as total_trips
FROM all_cabs
GROUP BY month_year;


CREATE OR REPLACE TABLE day AS
SELECT month_year, dayofweek
FROM(SELECT *, ROW_NUMBER()OVER(PARTITION BY month_year ORDER BY trips DESC) AS rownum
     FROM (SELECT DATE_FORMAT(pep_pickup_datetime,'yyyy-MM') AS month_year, DAYOFWEEK(pep_pickup_datetime) AS dayofweek, COUNT(*) AS trips
           FROM all_cabs
           GROUP BY month_year, dayofweek))
WHERE rownum = 1;


-- modified from https://stackoverflow.com/questions/20515656/most-active-time-of-day-based-on-start-and-end-time
CREATE OR REPLACE TABLE hr ( hr int not null);
INSERT INTO hr(hr)
VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)
     , (11), (12), (13), (14), (15), (16), (17), (18), (19), (20)
     , (21), (22), (23);


CREATE OR REPLACE TABLE hour AS
SELECT * FROM
(SELECT *, ROW_NUMBER()OVER(PARTITION BY month_year ORDER BY activity DESC) AS rownum
FROM(SELECT DATE_FORMAT(pep_pickup_datetime,'yyyy-MM') AS month_year, hr, count(*) AS activity
     FROM all_cabs, hr
     WHERE ( hr BETWEEN hour(pep_pickup_datetime) AND hour(pep_dropoff_datetime)
        OR hour(pep_pickup_datetime) BETWEEN hour(pep_dropoff_datetime) AND hr
        OR hour(pep_dropoff_datetime) BETWEEN hr AND hour(pep_pickup_datetime) )
     GROUP BY month_year, hr))
WHERE rownum = 1;


CREATE OR REPLACE TABLE avg_passengers AS
SELECT DATE_FORMAT(pep_pickup_datetime,'yyyy-MM') AS month_year, ROUND(AVG(passenger_count),2) AS avg_passenger_count
FROM all_cabs
GROUP BY month_year;


CREATE OR REPLACE TABLE avg_amt AS
SELECT DATE_FORMAT(pep_pickup_datetime,'yyyy-MM') AS month_year, ROUND(AVG(total_amount),2) AS avg_total_amount
FROM all_cabs
GROUP BY month_year;


CREATE OR REPLACE TABLE avg_passenger_amt AS
SELECT DATE_FORMAT(pep_pickup_datetime,'yyyy-MM') AS month_year, ROUND(AVG(total_amount/passenger_count),2) AS avg_amt_per_passenger
FROM all_cabs
GROUP BY month_year;


SELECT month_year.month_year, trips.total_trips, day.dayofweek, hour.hr, avg_passengers.avg_passenger_count, avg_amt.avg_total_amount, avg_passenger_amt.avg_amt_per_passenger
FROM (SELECT month_year FROM trips UNION
      SELECT month_year FROM day UNION 
      SELECT month_year FROM hour UNION
      SELECT month_year FROM avg_passengers UNION
      SELECT month_year FROM avg_amt UNION
      SELECT month_year FROM avg_passenger_amt
     ) month_year 
LEFT OUTER JOIN trips ON month_year.month_year = trips.month_year 
LEFT OUTER JOIN day ON month_year.month_year = day.month_year 
LEFT OUTER JOIN hour ON month_year.month_year = hour.month_year 
LEFT OUTER JOIN avg_passengers ON month_year.month_year = avg_passengers.month_year 
LEFT OUTER JOIN avg_amt ON month_year.month_year = avg_amt.month_year
LEFT OUTER JOIN avg_passenger_amt ON month_year.month_year = avg_passenger_amt.month_year
ORDER BY month_year ASC;



month_year,total_trips,dayofweek,hr,avg_passenger_count,avg_total_amount,avg_amt_per_passenger
2019-01,7969544,5,18,1.55,15.44,12.67
2019-02,7316958,6,18,1.55,18.3,15.0
2019-03,8128986,6,18,1.56,18.69,15.29
2019-04,7685471,3,18,1.56,18.81,15.26
2019-05,7761065,5,18,1.56,18.99,15.49
2019-06,7111000,7,18,1.55,19.02,15.55
2019-07,6441573,4,18,1.56,18.86,15.34
2019-08,6196916,5,18,1.56,18.98,15.37
2019-09,6653629,5,18,1.54,19.18,15.67
2019-10,7316092,5,18,1.53,19.15,15.63


In [None]:
%sql
--q2
CREATE OR REPLACE TABLE cab_colour_stats(cab_colour varchar(6) not null, feature varchar(25) not null, average float not null, median float not null, minimum float not null, maximum float not null);  --(cab_colour)
INSERT INTO cab_colour_stats SELECT * FROM
(SELECT cab_colour, "Trip Duration (mins)" AS feature, ROUND(AVG(DurationInMinutes),2) AS average, ROUND(PERCENTILE(DurationInMinutes,0.5),2) AS median, ROUND(MIN(DurationInMinutes),2) AS minimum, ROUND(MAX(DurationInMinutes),2) AS maximum
FROM all_cabs
GROUP BY cab_colour);

INSERT INTO cab_colour_stats SELECT * FROM
(SELECT cab_colour, "Trip distance (km)" AS feature, ROUND(AVG(trip_distance_km),2) AS average, ROUND(PERCENTILE(trip_distance_km,0.5),2) AS median, ROUND(MIN(trip_distance_km),2) AS minimum, ROUND(MAX(trip_distance_km),2) AS maximum
FROM all_cabs
GROUP BY cab_colour);

INSERT INTO cab_colour_stats SELECT * FROM
(SELECT cab_colour, "Trip speed (km/hr)" AS feature, ROUND(AVG(speed_km_hr),2) AS average, ROUND(PERCENTILE(speed_km_hr,0.5),2) AS median, ROUND(MIN(speed_km_hr),2) AS minimum, ROUND(MAX(speed_km_hr),2) AS maximum
FROM all_cabs
GROUP BY cab_colour);

SELECT * FROM cab_colour_stats;

cab_colour,feature,average,median,minimum,maximum
yellow,Trip Duration (mins),14.16,11.22,1.0,75.0
green,Trip Duration (mins),16.38,12.68,1.0,75.0
yellow,Trip distance (km),4.72,2.75,0.64,35.41
green,Trip distance (km),5.86,3.6,0.64,35.41
yellow,Trip speed (km/hr),18.58,16.48,0.52,57.94
green,Trip speed (km/hr),20.15,17.88,0.57,57.94


In [None]:
%sql
--q3
SELECT ROUND((COUNT(CASE WHEN tip_amount > 0 THEN 1 ELSE NULL END)/COUNT(*) * 100), 2) AS percentage_trips_with_tips
FROM all_cabs;


percentage_trips_with_tips
69.67


In [None]:
%sql
--q4
SELECT ROUND((COUNT(CASE WHEN tip_amount >= 10 THEN 1 ELSE NULL END)/COUNT(CASE WHEN tip_amount > 0 THEN 1 ELSE NULL END) * 100), 2) AS percentage_tips_greater_10
FROM all_cabs;

percentage_tips_greater_10
2.85


In [None]:
%sql
--q5
SELECT DurationBins, ROUND(AVG(speed_km_hr),2) AS avg_speed, ROUND(AVG(trip_distance_km/total_amount),2) AS distance_per_dollar
FROM (SELECT
        CASE WHEN DurationInMinutes > 0 AND DurationInMinutes < 5    THEN 'Under 5 Mins'
             WHEN DurationInMinutes >= 5 AND DurationInMinutes < 10    THEN '5-10 Mins'
             WHEN DurationInMinutes >= 10 AND DurationInMinutes < 20    THEN '10-20 mins'
             WHEN DurationInMinutes >= 20 AND DurationInMinutes < 30    THEN '20-30 mins'
             WHEN DurationInMinutes >= 30 AND DurationInMinutes < 60    THEN '30-60 mins'
             WHEN DurationInMinutes >= 60 THEN '60 or more mins'
        END DurationBins,
        *
      FROM all_cabs)
GROUP BY DurationBins
ORDER BY distance_per_dollar ASC;
    

DurationBins,avg_speed,distance_per_dollar
Under 5 Mins,20.17,0.13
5-10 Mins,16.88,0.17
10-20 mins,17.41,0.23
20-30 mins,20.43,0.28
30-60 mins,26.09,0.36
60 or more mins,22.67,0.4


In [None]:
#breaking pickup and dropoff datetimes into month, year, hour and day of week

df = df.withColumn("PUmonth", month(df.pep_pickup_datetime))
df = df.withColumn("DOmonth", month(df.pep_dropoff_datetime))
df = df.withColumn("PUyear", year(df.pep_pickup_datetime))
df = df.withColumn("DOyear", year(df.pep_dropoff_datetime))
df = df.withColumn("PUhour", hour(df.pep_pickup_datetime))
df = df.withColumn("DOhour", hour(df.pep_dropoff_datetime))
df = df.withColumn("PUdayofweek", dayofweek(df.pep_pickup_datetime))
df = df.withColumn("DOdayofweek", dayofweek(df.pep_dropoff_datetime))

#filtering for only positive values of fare_amount
df = df.filter(df.fare_amount > 0)


In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

#column names that will be used in ML model
cols_list = ['VendorID', 'pep_pickup_datetime', 'pep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'cab_colour', 'PUmonth', 'PUyear', 'PUhour', 'PUdayofweek', 'DOmonth', 'DOyear', 'DOhour', 'DOdayofweek', 'trip_distance', 'DurationInSeconds', 'fare_amount']

#defining categorical columns
cat_cols = ['VendorID', 'PULocationID', 'DOLocationID', 'cab_colour'] 

stages = []

#string indexer and one hot encoder for categorical columns
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind").setHandleInvalid("keep")
    col_encoder = OneHotEncoder(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]

cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

#defining numerical columns
num_cols = ['PUmonth', 'PUyear', 'PUhour', 'PUdayofweek', 'DOmonth', 'DOyear', 'DOhour', 'DOdayofweek', 'trip_distance', 'DurationInSeconds']

#vector assembler
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")

stages += [assembler]

In [None]:
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType

#subset of data frame with columns to be used by ML model
df_ml = df.select(cols_list)

#creating a pipeline of all string indexing, onehot encoding and vector assembler stages
pipeline = Pipeline(stages=stages)

#fitting pipeline on data
pipeline_model = pipeline.fit(df_ml)

df_ml = pipeline_model.transform(df_ml)

#dataframe with vector assembled column and all other columns to be used by ML model
df_ml = df_ml.select(['features'] + cols_list)
df_ml.show()

+--------------------+--------+-------------------+--------------------+------------+------------+----------+-------+------+------+-----------+-------+------+------+-----------+-------------+-----------------+-----------+
|            features|VendorID|pep_pickup_datetime|pep_dropoff_datetime|PULocationID|DOLocationID|cab_colour|PUmonth|PUyear|PUhour|PUdayofweek|DOmonth|DOyear|DOhour|DOdayofweek|trip_distance|DurationInSeconds|fare_amount|
+--------------------+--------+-------------------+--------------------+------------+------------+----------+-------+------+------+-----------+-------+------+------+-----------+-------------+-----------------+-----------+
|(544,[1,30,332,53...|       1|2022-03-01 00:13:08| 2022-03-01 00:24:35|          90|         209|    yellow|      3|  2022|     0|          3|      3|  2022|     0|          3|          2.4|              687|       10.0|
|(544,[1,41,278,53...|       1|2022-03-01 00:47:52| 2022-03-01 01:00:08|         148|         234|    yellow|   

In [None]:
df_ml.write.parquet("/ml_df/", mode='overwrite')

In [None]:
df_ml = spark.read.parquet("/ml_df/")

In [None]:
#separating April 2022 data into testing set
test_data = df_ml.filter(col("pep_pickup_datetime") > F.lit("2022-04-01"))
df_ml = df_ml.filter(col("pep_pickup_datetime") < F.lit("2022-04-01"))

#splitting remaining data into training and validation set
train_data, valid_data = df_ml.randomSplit([0.8, 0.2], seed=8)

In [None]:
#took 2.41 hours to run

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

#defining features and target variable for linear regression
lr = LinearRegression(featuresCol="features", labelCol='fare_amount')

#fitting linear regression model on training data
lr_model = lr.fit(train_data)

#predicting on validation data
lr_predictions = lr_model.transform(valid_data)
lr_predictions.select("prediction", "fare_amount", "features").show(5)

#evaluating predictions using rmse metric
evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(lr_predictions)
print("Root Mean Squared Error (RMSE) on validation data = %g" % rmse)

+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|5.990135355102641|        5.5|(544,[0,5,268,532...|
|7.906220977460777|        7.5|(544,[0,5,268,532...|
|7.139663962607301|        7.0|(544,[0,5,268,532...|
| 5.29055486273062|        5.0|(544,[0,5,268,532...|
|4.979343169111063|        5.0|(544,[0,5,268,532...|
+-----------------+-----------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on validation data = 219.41


In [None]:
#took 4.33 hours to run

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="features", labelCol='fare_amount')

# Training model
dt_model = dt.fit(train_data)


In [None]:
#Make predictions on validation data
dt_predictions = dt_model.transform(valid_data)

dt_predictions.select("prediction", "fare_amount", "features").show(5)

+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|4.843929651250149|        5.5|(544,[0,5,268,532...|
|4.843929651250149|        4.5|(544,[0,5,268,532...|
|6.175920954003805|        6.0|(544,[0,5,268,532...|
|4.843929651250149|        3.5|(544,[0,5,268,532...|
|4.843929651250149|        4.0|(544,[0,5,268,532...|
+-----------------+-----------+--------------------+
only showing top 5 rows



In [None]:
# Get error rmse for decision tree predictions on validation data
evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on validation data = %g" % rmse)

treeModel = dt_model.stages[1]
# summary only
print(treeModel)

Root Mean Squared Error (RMSE) on validation data = 160.689


[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-2027284577216766>[0m in [0;36m<cell line: 6>[0;34m()[0m
[1;32m      4[0m [0mprint[0m[0;34m([0m[0;34m"Root Mean Squared Error (RMSE) on validation data = %g"[0m [0;34m%[0m [0mrmse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0;34m[0m[0m
[0;32m----> 6[0;31m [0mtreeModel[0m [0;34m=[0m [0mdt_model[0m[0;34m.[0m[0mstages[0m[0;34m[[0m[0;36m1[0m[0;34m][0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      7[0m [0;31m# summary only[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      8[0m [0mprint[0m[0;34m([0m[0mtreeModel[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mAttributeError[0m: 'DecisionTreeRegressionModel' object has no attribute 'stages'

In [None]:
# Make predictions on test data with better model.
test_predictions = dt_model.transform(test_data)

# Select example rows to display.
test_predictions.select("prediction", "fare_amount", "features").show(5)

#get rmse of predictions on test data
rmse = evaluator.evaluate(test_predictions)
print("Root Mean Squared Error (RMSE) on test data - Yellow taxis April 22 = %g" % rmse)


+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|9.471475163006883|        9.5|(544,[0,14,289,53...|
|7.669028261384619|        7.5|(544,[0,32,281,53...|
|9.471475163006883|       10.5|(544,[0,7,299,532...|
|9.471475163006883|       10.5|(544,[0,15,272,53...|
|9.471475163006883|        8.5|(544,[0,31,270,53...|
+-----------------+-----------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data - Yellow taxis April 22 = 4.50032
