# Car Price Prediction Using Spark

## 1 - Install Spark, set environment variables, import libraries, start a Spark session, and load the data file

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Point to the Java 8 and Spark installs, then let findspark put Spark's
# Python packages on sys.path. This must run before any `pyspark` import.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"
import findspark
findspark.init()

from google.colab import files
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit


In [None]:
# Start (or reuse) a local Spark session on all available cores.
# Note: despite the conventional name, `sc` holds a SparkSession, not a SparkContext.
sc = (SparkSession.builder
      .master("local[*]")
      .getOrCreate())

In [None]:
# Upload data.csv from the local machine into the Colab working directory.
# files.upload() returns a dict of {filename: file bytes}; assigning it keeps
# the notebook from rendering the entire file contents as cell output.
uploaded = files.upload()

In [None]:
!ls

data.csv  sample_data  spark-2.4.8-bin-hadoop2.7  spark-2.4.8-bin-hadoop2.7.tgz


In [None]:
# Read the CSV, taking column names from the first row and letting Spark infer column types.
data = (sc.read
        .option("header", True)
        .option("inferSchema", True)
        .csv('data.csv'))

## 2 - Explore/Clean the data

In [None]:
# Show inferred column types, then summary statistics transposed so that
# each column of the dataset becomes one row of the table.
data.printSchema()
data.describe().toPandas().T

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [None]:
# List distinct Market Category values. truncate=False shows the full
# comma-separated category lists; the default cuts strings at 20 characters,
# which makes different categories look identical.
data.select('Market Category').distinct().show(50, truncate=False)

+--------------------+
|     Market Category|
+--------------------+
|Crossover,Flex Fu...|
|Crossover,Exotic,...|
|Hatchback,Factory...|
|Crossover,Hatchba...|
|Exotic,Flex Fuel,...|
|           Hatchback|
|Crossover,Hatchba...|
|Factory Tuner,Lux...|
|    Hatchback,Diesel|
|    Crossover,Hybrid|
|           Crossover|
|Crossover,Factory...|
|Hatchback,Luxury,...|
|       Diesel,Luxury|
|    Crossover,Diesel|
|              Diesel|
|    Flex Fuel,Diesel|
|Luxury,Performanc...|
|Exotic,Factory Tu...|
|    Hatchback,Hybrid|
|Crossover,Luxury,...|
|Crossover,Flex Fu...|
|  Exotic,Performance|
|Crossover,Luxury,...|
|    Flex Fuel,Hybrid|
|Exotic,Flex Fuel,...|
|Flex Fuel,Luxury,...|
|    Hatchback,Luxury|
|              Hybrid|
|  Performance,Hybrid|
|       Exotic,Luxury|
|    Flex Fuel,Luxury|
|       Luxury,Hybrid|
|           Flex Fuel|
|Luxury,High-Perfo...|
|Crossover,Factory...|
| Crossover,Hatchback|
|Crossover,Exotic,...|
|Crossover,Hatchba...|
|                 N/A|
|Factory Tu

In [None]:
def replace(column, value):
  """Return a Column equal to `column`, with entries equal to `value` set to null.

  Entries that are already null also come out null: comparing null with
  `value` yields null, which does not satisfy the `when` condition.
  """
  keep = column != value
  return when(keep, column).otherwise(lit(None))

# Treat the placeholder string "N/A" in Market Category as a real missing value.
data = data.withColumn('Market Category', replace(col("Market Category"), "N/A"))

In [None]:
# Per column, count the rows whose value is NaN or null.
missing_per_column = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(missing_per_column).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [None]:
# Market Category is missing for many rows (see the null counts above), so drop
# that column; then drop the remaining rows that contain a null in any column.
data = data.drop('Market Category').na.drop()
print((data.count(), len(data.columns)))

(11812, 15)


## 3 - Build the VectorAssembler and RandomForestRegressor pipeline

In [None]:
assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders',
                                       'Number of Doors', 'highway MPG', 'city mpg', 'Popularity'],
                                       outputCol='Attributes')
regressor = RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP')
pipeline = Pipeline(stages=[assembler, regressor])
pipeline.write().overwrite().save("pipeline")
!ls

data.csv  sample_data		     spark-2.4.8-bin-hadoop2.7.tgz
pipeline  spark-2.4.8-bin-hadoop2.7  spark-warehouse


## 4 - Perform hyperparameter search and cross-validation

In [None]:
# Reload the saved pipeline. Pipeline.load returns an unfitted Pipeline
# (an estimator), not a fitted PipelineModel.
loaded_pipeline = Pipeline.load("pipeline")
# Build the grid from the loaded regressor stage (the last stage), so this cell
# does not depend on the in-memory `regressor` object from another cell.
rf_stage = loaded_pipeline.getStages()[-1]
paramGrid = ParamGridBuilder() \
  .addGrid(rf_stage.numTrees, [100, 500]) \
  .build()
# Fixed seed so fold assignment is reproducible across runs.
crossval = CrossValidator(estimator=loaded_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP', metricName='rmse'),
                          numFolds=3,
                          seed=123)

In [None]:
# Hold out 20% of rows for final evaluation; the fixed seed keeps the split reproducible.
train_data, test_data = data.randomSplit(
    [0.8, 0.2],
    seed=123,
)
# Run the cross-validated grid search on the training portion only.
cvModel = crossval.fit(train_data)

## 5 - Select the best model and output evaluation metrics

In [None]:
# The best model is a fitted PipelineModel; print each of its fitted stages.
bestModel = cvModel.bestModel
for stage in bestModel.stages:
  print(stage)

VectorAssembler_02095512be72
RandomForestRegressionModel (uid=RandomForestRegressor_0262aa570834) with 100 trees


In [None]:
# Score the held-out test set (CrossValidatorModel.transform delegates to the best model)
# and show actual vs. predicted MSRP side by side.
pred = cvModel.transform(test_data)
pred.select(['MSRP', 'prediction']).show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980| 33520.50318815142|
|28030| 33764.56824182833|
|30030| 33764.56824182833|
|32700| 36962.77688336396|
|29350|23317.806801819188|
|31890|28343.501912763575|
|34980|28343.501912763575|
| 2799| 4853.701507592904|
| 2827| 5592.605869786248|
| 3381| 6871.760433015199|
|24450|27636.626726629707|
|21050| 23523.81375519454|
| 2000| 6084.278178374128|
| 2181| 8169.680770744279|
| 2144| 5955.590712228466|
| 2265|  8202.59832982066|
|56780| 39272.98522006589|
|49440| 39272.98522006589|
|50640| 39272.98522006589|
|52640| 39272.98522006589|
+-----+------------------+
only showing top 20 rows



In [None]:
# Evaluate test-set predictions with several regression metrics.
# Named `evaluator` rather than `eval` to avoid shadowing Python's built-in eval().
evaluator = RegressionEvaluator(labelCol='MSRP', metricName='rmse')
rmse = evaluator.evaluate(pred)
# Per-call param maps override metricName without mutating the evaluator.
mse = evaluator.evaluate(pred, {evaluator.metricName: 'mse'})
mae = evaluator.evaluate(pred, {evaluator.metricName: 'mae'})
r2 = evaluator.evaluate(pred, {evaluator.metricName: 'r2'})

print("RMSE: %.3f" %rmse)
print("MSE: %.3f" %mse)
print("MAE: %.3f" %mae)
print("r2: %.3f" %r2)

RMSE: 16866.995
MSE: 284495514.942
MAE: 8413.007
r2: 0.875


## 6 - Next steps
To further improve model performance, we can:
- Parse the comma-separated Market Category column and encode it as features for training
- Encode the other categorical columns (e.g. Make, Transmission Type, Vehicle Style), which the model currently ignores, with StringIndexer/OneHotEncoder
- Expand the grid search to more numTrees values
- Tune other hyperparameters in the grid search, such as maxDepth
- Try other algorithms (e.g. XGBoost, neural networks)
- Increase the number of cross-validation folds