Task 1 - Install Spark, load required libraries, set environment variables, initiate Spark, load file

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://dlcdn.apache.org/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
# Tell Spark where the JDK and the unpacked Spark distribution live.
# NOTE(review): the SPARK_HOME path assumes the tarball was extracted under
# /content (the usual Colab working directory) — confirm when running elsewhere.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
import findspark
# Must run after the environment variables are set and before any pyspark
# import: it is what puts the SPARK_HOME distribution on the Python path.
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder


In [None]:
# Start (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

In [None]:
# Display the session object to confirm that Spark started.
spark

In [None]:
# Open the Colab upload dialog; the notebook later reads 'data.csv' from the working directory.
files.upload()

In [None]:
# List the working directory to check that the upload and the Spark download are present.
!ls

data.csv  sample_data  spark-3.0.3-bin-hadoop2.7  spark-3.0.3-bin-hadoop2.7.tgz


In [None]:
# Read the uploaded CSV: column names come from the header row and Spark
# infers each column's type from the data.
df = spark.read.csv(
    'data.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Show the inferred column names and types.
df.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [None]:
# Summary statistics, flipped so each source column becomes a row (easier to read in pandas).
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


#### Missing Values

In [None]:
def replace(column, value):
    """Return a column expression that nulls out entries equal to `value`.

    Entries of `column` that differ from `value` are kept unchanged; every
    other entry becomes null.
    """
    keep_original = column != value
    return when(keep_original, column).otherwise(lit(None))

In [None]:
# 'N/A' is a placeholder string in this column; convert it to a real null so
# Spark treats it as a missing value.
df = df.withColumn(
    "Market Category",
    replace(col('Market Category'), 'N/A'),
)

In [None]:
# Count the missing entries (NaN or null) in every column.
# alias() is applied to the count itself, so each result column is named after
# its source column instead of after the text of the whole CASE WHEN expression
# (which made the printed table unreadably wide).
df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
).show()

+------------------------------------------------------------------------+----------------------------------------------------------------------------+------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------

In [None]:
# Discard the Market Category column entirely, then remove every row that
# still contains a null.
df = df.drop('Market Category').na.drop()
# Row and column counts after cleaning.
print(df.count(), len(df.columns))

11812 15


#### Build an ML Pipeline using VectorAssembler and a Random Forest Regressor

In [None]:
# Numeric attributes used to predict MSRP, packed into one feature vector.
assembler = VectorAssembler(
    inputCols=[
        'Year', 'Engine HP', 'Engine Cylinders',
        'Number of Doors', 'highway MPG', 'city mpg',
        'Popularity',
    ],
    outputCol='Attributes',
)
# Random forest on the assembled vector. The seed is fixed (same value as the
# train/test split) so that training is repeatable across kernel restarts.
regressor = RandomForestRegressor(
    featuresCol='Attributes',
    labelCol='MSRP',
    seed=123,
)

In [None]:
# Chain feature assembly and the regressor into a single estimator.
pipeline = Pipeline(stages = [assembler, regressor])
# Save the (still unfitted) pipeline definition, replacing any earlier copy at this path.
pipeline.write().overwrite().save('RF_pipeline_mtcars')

In [None]:
!ls #pipeline is saved

data.csv	    sample_data		       spark-3.0.3-bin-hadoop2.7.tgz
RF_pipeline_mtcars  spark-3.0.3-bin-hadoop2.7


### Hyperparameter tuning

In [None]:
# Reload the saved pipeline definition from disk. Despite the variable name,
# this is the unfitted Pipeline (it was saved before any fit), not a fitted model.
pipelineModel = Pipeline.load('RF_pipeline_mtcars')

In [None]:
# Candidate forest sizes for cross-validation to compare.
paramGrid = (
    ParamGridBuilder()
    .addGrid(regressor.numTrees, [100, 500])
    .build()
)

In [None]:
# 5-fold cross-validation over the parameter grid, scored with the
# RegressionEvaluator default metric on MSRP.
# `pipelineModel` is the unfitted Pipeline reloaded from disk, so it is a valid estimator here.
# The seed fixes how rows are assigned to folds (same value as the train/test
# split) so the selected model is repeatable between runs.
crossval = CrossValidator(
    estimator=pipelineModel,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(labelCol='MSRP'),
    numFolds=5,
    seed=123,
)

### Train and Test

In [None]:
 # Split the cleaned rows roughly 80/20 into train and test; the seed makes the split repeatable.
 train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)
 # Run cross-validation on the training portion only; the test rows stay unseen until evaluation.
 cvModel = crossval.fit(train_data)

In [None]:
# Take the winning pipeline from cross-validation and list its fitted stages.
bestModel = cvModel.bestModel
for stage in bestModel.stages:
    print(stage)

VectorAssembler_9ca8a46aaf5f
RandomForestRegressionModel: uid=RandomForestRegressor_459467eccbcf, numTrees=100, numFeatures=7


In [None]:
# Score the held-out test rows and compare actual MSRP with the prediction.
pred = cvModel.transform(test_data) #cvModel object is the best model to use
pred.select('MSRP', 'prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|28030|31189.125385063355|
|30550| 38462.27463001096|
|29350|27805.451507798723|
|27900|27144.066901466143|
|34890|27144.066901466143|
|32990|27144.066901466143|
| 2827| 5327.292289817532|
| 3000| 5327.292289817532|
| 3086| 5360.867401502229|
| 3130| 5360.867401502229|
| 3012| 5364.078575935064|
| 3622| 6412.001784104226|
|22300| 24806.57631196871|
|19400|23444.403593274612|
| 2042|  5966.80906577496|
| 2144|  6733.65112335129|
|49440| 39429.82823384768|
|52640| 39429.82823384768|
|47440| 39564.84796164898|
|58400|  39492.5828065963|
+-----+------------------+
only showing top 20 rows



### Evaluation

In [None]:
# Score the test-set predictions on four regression metrics.
# The evaluator is bound to `evaluator` rather than `eval` so the Python
# builtin `eval` is not shadowed for the rest of the session.
evaluator = RegressionEvaluator(labelCol='MSRP')
# Each metric is requested by name, so RMSE no longer depends on the evaluator's default metric.
rmse = evaluator.evaluate(pred, {evaluator.metricName: 'rmse'})
mse = evaluator.evaluate(pred, {evaluator.metricName: 'mse'})
mae = evaluator.evaluate(pred, {evaluator.metricName: 'mae'})
r2 = evaluator.evaluate(pred, {evaluator.metricName: 'r2'})

In [None]:
# Report every metric to three decimal places, one per line.
for template, metric_value in (
    ('RMSE: %.3f', rmse),
    ('MSE: %.3f', mse),
    ('MAE: %.3f', mae),
    ('R2: %.3f', r2),
):
    print(template % metric_value)

RMSE: 35598.588
MSE: 1267259466.410
MAE: 9758.658
R2: 0.787
