# **Task 1**  
Install Java and Spark, load the required libraries, set environment variables, start a Spark session, and upload the data file

In [24]:
# install Java (OpenJDK 8, headless); JAVA_HOME in the next cell points at this install
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download Spark (change the version number if needed; if you do, also update
# the archive name below and SPARK_HOME in the next cell)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# extract the Spark archive into the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# install findspark using pip
# NOTE(review): the version is unpinned; consider pinning it for reproducibility
!pip install -q findspark

In [25]:
import os

# Tell the process where the JDK and the extracted Spark distribution live.
# NOTE(review): presumably findspark.init() reads SPARK_HOME set here so that
# the pyspark imports that follow can be resolved.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})

import findspark
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder


In [26]:
# Start (or reuse, via getOrCreate) a Spark session on the "local[*]" master.
# Note: despite the name, `sc` holds a SparkSession, not a SparkContext.
sc = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Optional: uncomment to upload data.csv from your machine into the Colab
# working directory (uses `files` from google.colab, imported above).
# files.upload()

# **Task 2**  
Describe and clean the dataset

In [28]:
# Load the car dataset: the first row holds column names, and Spark infers
# each column's type from the data.
data = (
    sc.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('data.csv')
)

In [29]:
# Print each column's name, inferred type and nullability.
data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [30]:
# Summary statistics (count/mean/stddev/min/max), transposed so that each
# column of `data` becomes one row of the displayed pandas table.
data.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [31]:
# (rows, columns) of the raw dataset.
n_rows, n_cols = data.count(), len(data.columns)
print((n_rows, n_cols))

(11914, 16)


In [32]:
def replace(column, value):
    """Return `column` with every entry equal to `value` replaced by null.

    Entries that are already null stay null: the comparison evaluates to null,
    so `when` falls through to `otherwise`.
    """
    return when(column != value, column).otherwise(lit(None))


# withColumn returns a NEW DataFrame; without assigning it back, the "N/A"
# placeholders are never converted to nulls.
data = data.withColumn("Market Category", replace(col("Market Category"), "N/A"))
data

DataFrame[Make: string, Model: string, Year: int, Engine Fuel Type: string, Engine HP: int, Engine Cylinders: int, Transmission Type: string, Driven_Wheels: string, Number of Doors: int, Market Category: string, Vehicle Size: string, Vehicle Style: string, highway MPG: int, city mpg: int, Popularity: int, MSRP: int]

In [33]:
# Recompute the per-column summary to compare with the earlier one.
summary_pdf = data.describe().toPandas()
summary_pdf.T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [34]:
# Missing values per column: count rows where the value is NaN or null.
# The alias goes on the aggregate (outside count()) so each output column is
# named after its input column instead of the full CASE WHEN ... expression.
data.select([
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in data.columns
]).show()

+------------------------------------------------------------------------+----------------------------------------------------------------------------+------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------

In [35]:
# Remove the Market Category column entirely, then drop every row that
# still contains a null in any remaining column.
data = data.drop('Market Category').na.drop()
print((data.count(), len(data.columns)))

(11812, 15)


# **Task 3**  
Create a Random Forest pipeline to predict car prices

In [36]:
# Integer-typed columns used as model features, packed into a single
# vector column named 'Attributes'.
feature_cols = [
    'Year',
    'Engine HP',
    'Engine Cylinders',
    'Number of Doors',
    'highway MPG',
    'city mpg',
    'Popularity',
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='Attributes')

In [37]:
# Random forest that reads the assembled 'Attributes' vector and predicts MSRP.
# An explicit seed makes the forest reproducible across runs; it uses the same
# value as the train/test split.
regressor = RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP', seed=123)

In [38]:
# Chain feature assembly and regression into one estimator.
pipeline = Pipeline().setStages([assembler, regressor])

In [39]:
# Persist the (unfitted) pipeline definition to the relative path "pipeline",
# replacing any earlier copy.
pipeline_writer = pipeline.write().overwrite()
pipeline_writer.save("pipeline")

# **Task 4**  
Create a cross-validator for hyperparameter tuning

In [40]:
# Reload the saved pipeline. Despite its name, `pipelineModel` is an unfitted
# Pipeline estimator, not a fitted PipelineModel.
pipelineModel = Pipeline.load("pipeline")

# Grid over forest size: 100 vs 500 trees.
# NOTE(review): the grid refers to the in-memory `regressor`'s param, not the
# reloaded stage; this relies on the loaded stage keeping the same uid.
paramGrid = (
    ParamGridBuilder()
    .addGrid(regressor.numTrees, [100, 500])
    .build()
)

In [41]:
# 3-fold cross-validation over the numTrees grid. Models are scored with the
# evaluator's default metric (RMSE, as used in the metrics cell below).
# A fixed seed makes the fold assignment reproducible.
crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP'),
                          numFolds=3,
                          seed=123)

# **Task 5**  
Train the model and predict car prices on the test set

In [None]:
# Hold out 20% of the rows for testing (seeded split), then run the
# cross-validated search on the remaining 80%.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=123)
cvModel = crossval.fit(train_data)

In [None]:
# Show every stage of the best pipeline chosen by cross-validation.
bestModel = cvModel.bestModel
for stage in bestModel.stages:
    print(stage)

In [None]:
# Score the held-out rows with the best model; show actual vs predicted MSRP.
pred = cvModel.transform(test_data)
pred.select(['MSRP', 'prediction']).show()

# **Task 6**  
Evaluate the model’s performance using several metrics

In [None]:
# Score the test-set predictions with several regression metrics.
# Named `evaluator` (not `eval`) so the builtin eval() is not shadowed.
evaluator = RegressionEvaluator(labelCol='MSRP')
rmse = evaluator.evaluate(pred)  # default metric is RMSE
mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred, {evaluator.metricName: "r2"})

# "%.3f" prints three decimals. "%3.f" would mean width 3 with zero decimals,
# which rounds R2 (typically between 0 and 1) to 0 or 1.
print("RMSE: %.3f" % rmse)
print("MSE : %.3f" % mse)
print("MAE : %.3f" % mae)
print("R2  : %.3f" % r2)