### Problem Statement

We use the Cars dataset, which describes each car by its make, model, year, engine, and other properties, to predict its price (the MSRP column).

[Data Source](https://www.kaggle.com/CooperUnion/cardataset)

### Notebook Setup

Install Spark, set environment variables, load the required libraries, start a Spark session, and load the data file.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
# Point Java and Spark at the locations created by the install cell above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
import findspark
findspark.init()  # make the pyspark package under SPARK_HOME importable

from google.colab import files  # only needed for files.upload()
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [3]:
# Start a local Spark session on all cores (note: sc holds a SparkSession, not a SparkContext)
sc = SparkSession.builder.master("local[*]").getOrCreate()

In [17]:
#files.upload()

In [5]:
!ls

'data (1).csv'	 spark-2.4.6-bin-hadoop2.7
'data (2).csv'	 spark-2.4.6-bin-hadoop2.7.tgz
'data (3).csv'	 spark-2.4.6-bin-hadoop2.7.tgz.1
 data.csv	 spark-2.4.6-bin-hadoop2.7.tgz.2
 pipeline	 spark-2.4.6-bin-hadoop2.7.tgz.3
 sample_data


In [6]:
data = sc.read.csv('data.csv', inferSchema=True, header=True)

### Describe & Clean Dataset

In [7]:
data.printSchema()
data.describe().toPandas().transpose()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



| Column | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| Make | 11914 | | | Acura | Volvo |
| Model | 11914 | 745.5822222222222 | 1490.8280590623795 | 1 Series | xD |
| Year | 11914 | 2010.384337753903 | 7.5797398875957995 | 1990 | 2017 |
| Engine Fuel Type | 11911 | | | diesel | regular unleaded |
| Engine HP | 11845 | 249.38607007176023 | 109.19187025917194 | 55 | 1001 |
| Engine Cylinders | 11884 | 5.628828677213059 | 1.78055934824622 | 0 | 16 |
| Transmission Type | 11914 | | | AUTOMATED_MANUAL | UNKNOWN |
| Driven_Wheels | 11914 | | | all wheel drive | rear wheel drive |
| Number of Doors | 11908 | 3.4360933825999327 | 0.8813153865835529 | 2 | 4 |


As the output above shows, the dataset has 16 columns and 11,914 rows. Some columns, such as Engine HP and Number of Doors, have missing values. Others, such as Market Category, contain the string "N/A", which Spark does not recognize as a null value, so it must be recoded.
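Spark's `describe()` reports a `count` that skips nulls, which is why columns such as Engine HP show fewer than 11,914 values, while a placeholder string such as "N/A" still counts as a present value. The plain-Python sketch below mimics only that counting rule; the helper name and the toy values are made up for illustration and are not taken from the dataset.

```python
def non_null_count(values):
    """Mimic the `count` row of describe(): None (null) is skipped,
    but every string, including the placeholder "N/A", counts as present."""
    return sum(1 for v in values if v is not None)

# Hypothetical toy values, not rows from data.csv
market_category = ["Luxury", None, "N/A", "Performance", "N/A"]
engine_hp = [335, None, 300, 230, None]

print(non_null_count(market_category))  # 4: both "N/A" strings are counted
print(non_null_count(engine_hp))        # 3: the two None values are skipped
```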

In [8]:
# Recode the literal string "N/A" as a real null so Spark treats it as missing
def replace(column, value):
  return when(column != value, column).otherwise(lit(None))

data = data.withColumn("Market Category", replace(col("Market Category"), "N/A"))
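The `replace` helper relies on SQL's three-valued logic: `column != "N/A"` is true for ordinary values, false for "N/A", and null when the value is already null. `when` takes its branch only when the condition is true, so both "N/A" and existing nulls come out as null. The plain-Python sketch below models that rule for single values; `sql_not_equal` and `recode` are hypothetical helpers, not Spark functions.

```python
def sql_not_equal(a, b):
    """SQL-style `!=`: any comparison involving NULL (None) yields NULL."""
    if a is None or b is None:
        return None
    return a != b

def recode(value, sentinel="N/A"):
    """Model of when(column != sentinel, column).otherwise(lit(None)) for one value.
    The `when` branch is taken only if the condition is exactly True;
    both False and NULL fall through to `otherwise`."""
    return value if sql_not_equal(value, sentinel) is True else None

print([recode(v) for v in ["Luxury", "N/A", None, "Performance"]])
# ['Luxury', None, None, 'Performance']
```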

In [9]:
#Count missing values
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
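The counting expression works because `when(cond, c)` with no `otherwise` yields null whenever the condition is false, and `count` ignores nulls, so the result is the number of rows where the condition holds. Note that `c` is a plain string here: PySpark's `when` passes non-`Column` values through as literals, so matching rows produce the non-null string `c` rather than the (null) column value, which is what lets them be counted. A plain-Python model of the idiom, with hypothetical helper names and toy rows:

```python
import math

def is_missing(value):
    # Plain-Python stand-in for isnan(c) | col(c).isNull()
    return value is None or (isinstance(value, float) and math.isnan(value))

def count_when(rows, column, predicate):
    """Model of count(when(predicate, literal)): non-matching rows become None,
    matching rows become the literal column name, and only non-None values are counted."""
    flags = [column if predicate(row[column]) else None for row in rows]
    return sum(1 for f in flags if f is not None)

# Hypothetical rows, for illustration only
rows = [
    {"Engine HP": 335.0},
    {"Engine HP": None},
    {"Engine HP": float("nan")},
    {"Engine HP": 300.0},
]
print(count_when(rows, "Engine HP", is_missing))  # 2
```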



Notice that the Market Category column has 3,742 missing values, about 31% of the 11,914 rows. Because nearly a third of the column is missing, we remove the column from the dataset instead of discarding those rows.

We will also drop observations with missing values for the other columns.

In [10]:
data = data.drop('Market Category')
data = data.na.drop()
print((data.count(), len(data.columns)))

(11812, 15)


There are now 11,812 cars (rows) and 15 columns: 14 candidate features plus the MSRP target.
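Both cleaning decisions can be checked with a little arithmetic from the counts above. Market Category is missing in 3,742 of 11,914 rows (about 31%). After that column is dropped, `na.drop()` with its default `how="any"` removes every row that still contains a null: 11,914 - 11,812 = 102 rows were removed, fewer than the 108 null cells counted in the remaining columns (3 + 69 + 30 + 6), so some rows must have been missing more than one value. The sketch below reproduces the arithmetic and models the `how="any"` rule on made-up rows; the helper names are hypothetical.

```python
def missing_fraction(missing, total):
    """Share of rows in which a column is null."""
    return missing / total

def drop_any_null(rows):
    """Model of DataFrame.na.drop() with its default how="any":
    keep only the rows in which no value is None."""
    return [row for row in rows if all(v is not None for v in row.values())]

print(f"{missing_fraction(3742, 11914):.1%}")  # 31.4%

rows_removed = 11914 - 11812
null_cells = 3 + 69 + 30 + 6  # Engine Fuel Type, Engine HP, Engine Cylinders, Number of Doors
print(rows_removed, null_cells)  # 102 108

# Hypothetical rows, for illustration only
toy = [
    {"Engine HP": 335, "Number of Doors": 2},
    {"Engine HP": None, "Number of Doors": 4},
    {"Engine HP": None, "Number of Doors": None},
    {"Engine HP": 300, "Number of Doors": 4},
]
print(len(drop_any_null(toy)))  # 2
```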

### Create Random Forest Pipeline

In [11]:
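# Combine the numeric predictors into a single vector column named 'Attributes'
assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders',
                                       'Number of Doors', 'highway MPG', 'city mpg', 'Popularity'],
                            outputCol='Attributes')
# Random forest that predicts MSRP from the assembled feature vector
regressor = RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP')
pipeline = Pipeline(stages=[assembler, regressor])
# Save the (unfitted) pipeline definition to the "pipeline" directory
pipeline.write().overwrite().save("pipeline")
!ls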

'data (1).csv'	 spark-2.4.6-bin-hadoop2.7
'data (2).csv'	 spark-2.4.6-bin-hadoop2.7.tgz
'data (3).csv'	 spark-2.4.6-bin-hadoop2.7.tgz.1
 data.csv	 spark-2.4.6-bin-hadoop2.7.tgz.2
 pipeline	 spark-2.4.6-bin-hadoop2.7.tgz.3
 sample_data
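`VectorAssembler` concatenates the listed input columns, in order, into a single vector column (`Attributes`) that the random forest reads as its features. Only the seven numeric columns are listed, so string columns such as Make, Model, and Vehicle Style do not reach the model in this pipeline; they would need an encoding step such as `StringIndexer` first. As of Spark 2.4, the assembler's `handleInvalid` parameter defaults to `"error"`, so null inputs make it fail, which is another reason the null rows were dropped. The function below is a hypothetical plain-Python model of that behavior, not Spark's implementation, and the row values are illustrative.

```python
FEATURE_COLS = ['Year', 'Engine HP', 'Engine Cylinders',
                'Number of Doors', 'highway MPG', 'city mpg', 'Popularity']

def assemble(row, input_cols):
    """Toy stand-in for VectorAssembler: pull the listed columns, in order,
    into one list of floats, and refuse nulls (like handleInvalid="error")."""
    vector = []
    for name in input_cols:
        value = row[name]
        if value is None:
            raise ValueError(f"null value in column {name!r}")
        vector.append(float(value))
    return vector

# Illustrative row; string columns such as Make are simply not used
row = {'Make': 'BMW', 'Year': 2011, 'Engine HP': 335, 'Engine Cylinders': 6,
       'Number of Doors': 2, 'highway MPG': 26, 'city mpg': 19, 'Popularity': 3916}
print(assemble(row, FEATURE_COLS))
# [2011.0, 335.0, 6.0, 2.0, 26.0, 19.0, 3916.0]
```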


### Hyperparameter Tuning

In [12]:
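# Reload the saved pipeline definition. This is an unfitted Pipeline, not a PipelineModel.
# The reloaded stages keep their original uids, so regressor.numTrees still refers
# to the random forest stage inside it.
loadedPipeline = Pipeline.load("pipeline")
paramGrid = ParamGridBuilder() \
  .addGrid(regressor.numTrees, [100, 500]) \
  .build()
crossval = CrossValidator(estimator=loadedPipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP'),  # default metric: RMSE
                          numFolds=3)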
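With two `numTrees` values and `numFolds=3`, `CrossValidator` fits 2 × 3 = 6 pipelines, averages the evaluator's metric over the folds for each setting (RMSE by default for `RegressionEvaluator`, where lower is better), and then refits the winning setting on the full training data, for 7 fits in total. The sketch below shows that loop in plain Python with a toy nearest-neighbour regressor standing in for the random forest; the fold assignment, the model, and all helper names are simplified assumptions, not Spark's code.

```python
import math
import random

def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]

def rmse(y_true, y_pred):
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

def fit_knn(xs, ys, n_neighbors):
    """Toy regressor: predict the mean label of the n nearest training points."""
    def predict(x):
        nearest = sorted(range(len(xs)), key=lambda i: abs(xs[i] - x))[:n_neighbors]
        return sum(ys[i] for i in nearest) / len(nearest)
    return predict

def cross_validate(xs, ys, param_grid, fit, k=3, seed=0):
    """Grid search with k-fold CV: lowest average RMSE wins and is refit on all rows."""
    folds = k_fold_indices(len(xs), k, seed)
    avg_metrics = []
    for params in param_grid:
        scores = []
        for held_out in folds:
            held = set(held_out)
            train = [i for i in range(len(xs)) if i not in held]
            model = fit([xs[i] for i in train], [ys[i] for i in train], **params)
            scores.append(rmse([ys[i] for i in held_out],
                               [model(xs[i]) for i in held_out]))
        avg_metrics.append(sum(scores) / k)
    best = min(range(len(param_grid)), key=lambda j: avg_metrics[j])
    best_model = fit(xs, ys, **param_grid[best])  # final refit on all training rows
    return best_model, param_grid[best], avg_metrics

xs = list(range(30))
ys = [2.0 * x for x in xs]
grid = [{"n_neighbors": 1}, {"n_neighbors": 50}]
model, best_params, metrics = cross_validate(xs, ys, grid, fit_knn)
print(best_params)  # {'n_neighbors': 1}
```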

### Train Model

In [13]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)
cvModel = crossval.fit(train_data)
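`randomSplit([0.8, 0.2], seed=123)` does not cut the data at exactly 80%: Spark assigns each row to a split at random (within each partition), so the split sizes are only approximately proportional to the weights, and the seed makes the split repeatable only for the same data and partitioning. The plain-Python model below shows the per-row assignment idea; `random_split` is a hypothetical helper, and Python's random generator will not reproduce Spark's actual split.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional
    to its weight, so split sizes are only approximately proportional."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound for each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits

train, test = random_split(range(11812), [0.8, 0.2], seed=123)
print(len(train), len(test))  # close to, but generally not exactly, 9450 / 2362
```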

In [14]:
bestModel = cvModel.bestModel
for stage in bestModel.stages:
  print(stage)

VectorAssembler_aa059d78ff65
RandomForestRegressionModel (uid=RandomForestRegressor_5c1504c6d843) with 100 trees


In [15]:
pred = cvModel.transform(test_data)
pred.select('MSRP','prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980|30938.520989171364|
|28030|31044.996623751053|
|30030|31044.996623751053|
|32700| 36143.01721391118|
|29350|24093.561057835173|
|31890|28529.330327273983|
|34980|28529.330327273983|
| 2799| 5111.214323991207|
| 2827|4980.2345328385845|
| 3381|5793.2328144324865|
|24450| 27004.04813030287|
|21050| 23061.62141201935|
| 2000| 5255.075659859029|
| 2181| 7637.455103651334|
| 2144| 5280.606139739243|
| 2265| 7667.741098810895|
|56780|39210.747445541776|
|49440|  39209.9225934399|
|50640|  39209.9225934399|
|52640|  39209.9225934399|
+-----+------------------+
only showing top 20 rows
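Aggregate metrics can hide how errors are distributed across price ranges. Using only the 20 rows printed above (far too few to support a general claim), relative errors can be compared by price: in this sample, cars under $5,000 are overpredicted by roughly 70% to 250%, while for cars above $20,000 the relative error is at most about 31%. The `relative_error` helper is hypothetical; the pairs are copied from the output.

```python
# (MSRP, prediction) pairs copied from the 20 rows shown above
shown = [
    (29980, 30938.520989171364), (28030, 31044.996623751053),
    (30030, 31044.996623751053), (32700, 36143.01721391118),
    (29350, 24093.561057835173), (31890, 28529.330327273983),
    (34980, 28529.330327273983), (2799, 5111.214323991207),
    (2827, 4980.2345328385845), (3381, 5793.2328144324865),
    (24450, 27004.04813030287), (21050, 23061.62141201935),
    (2000, 5255.075659859029), (2181, 7637.455103651334),
    (2144, 5280.606139739243), (2265, 7667.741098810895),
    (56780, 39210.747445541776), (49440, 39209.9225934399),
    (50640, 39209.9225934399), (52640, 39209.9225934399),
]

def relative_error(actual, predicted):
    """Absolute error as a fraction of the true price."""
    return abs(predicted - actual) / actual

cheap = [relative_error(a, p) for a, p in shown if a < 5000]
pricier = [relative_error(a, p) for a, p in shown if a >= 20000]
print(f"under $5k:  {len(cheap)} cars, mean relative error {sum(cheap) / len(cheap):.0%}")
print(f"$20k and up: {len(pricier)} cars, mean relative error {sum(pricier) / len(pricier):.0%}")
```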



### Evaluate Model Performance

In [16]:
# Named evaluator to avoid shadowing Python's built-in eval; default metric is RMSE
evaluator = RegressionEvaluator(labelCol='MSRP')
rmse = evaluator.evaluate(pred)
mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred, {evaluator.metricName: "r2"})

print("RMSE: %.3f" % rmse)
print("MSE: %.3f" % mse)
print("MAE: %.3f" % mae)
print("r2: %.3f" % r2)

RMSE: 16607.248
MSE: 275800697.566
MAE: 8282.264
r2: 0.879
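`RegressionEvaluator` computes these metrics from the residuals (label minus prediction): MSE is their mean square, RMSE is the square root of MSE, MAE is the mean absolute residual, and R² is 1 − SS_res / SS_tot, where SS_tot is the summed squared deviation of the labels from their mean. A plain-Python version, checked on a toy example and against the reported numbers (the reported RMSE should equal the square root of the reported MSE):

```python
import math

def regression_metrics(y_true, y_pred):
    """Plain-Python versions of the four RegressionEvaluator metrics used above."""
    n = len(y_true)
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(r * r for r in residuals)
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    mse = ss_res / n
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": sum(abs(r) for r in residuals) / n,
        "r2": 1 - ss_res / ss_tot,
    }

print(regression_metrics([1.0, 2.0, 3.0], [1.0, 2.0, 4.0]))
# r2 is 0.5 here: SS_res = 1 and SS_tot = 2

# The reported RMSE is the square root of the reported MSE
print(round(math.sqrt(275800697.566), 3))  # 16607.248
```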


### Conclusion

From the evaluation metrics, we can see that the model does a reasonable job of predicting car prices from these features: on the held-out test set it explains about 87.9% of the variance in MSRP (R² = 0.879), with a mean absolute error of about $8,300.
