# PySpark with RandomForest Model 

![](https://miro.medium.com/max/600/1*5C4UQznqEiN3D6Xutlgwlg.png)

In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

!tar xf spark-3.0.3-bin-hadoop2.7.tgz

!pip install -q findspark

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
import findspark
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder


In [7]:
# to initiate a Spark session using all available cores
sc = SparkSession.builder.master('local[*]').getOrCreate()

In [None]:
files.upload()

In [9]:
!ls

'data (1).csv'	 sample_data		     spark-3.0.3-bin-hadoop2.7.tgz
 data.csv	 spark-3.0.3-bin-hadoop2.7


In [10]:
data = sc.read.csv('/content/data.csv',header=True, inferSchema=True)

In [11]:
data.show()

+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

In [12]:
data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [13]:
data.describe().show()

+-------+-----+------------------+------------------+----------------+------------------+-----------------+-----------------+----------------+------------------+------------------+------------+-------------+------------------+-----------------+------------------+------------------+
|summary| Make|             Model|              Year|Engine Fuel Type|         Engine HP| Engine Cylinders|Transmission Type|   Driven_Wheels|   Number of Doors|   Market Category|Vehicle Size|Vehicle Style|       highway MPG|         city mpg|        Popularity|              MSRP|
+-------+-----+------------------+------------------+----------------+------------------+-----------------+-----------------+----------------+------------------+------------------+------------+-------------+------------------+-----------------+------------------+------------------+
|  count|11914|             11914|             11914|           11911|             11845|            11884|            11914|           11914|         

In [15]:
data.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [18]:
def replace_nan (column, value):
  return when(column != value, column).otherwise(lit(None))

data = data.withColumn('Market Category', replace_nan(col('Market Category'),'N/A'))

In [20]:
data.select([count(when (isnan(c) | col (c).isNull(), c)). alias(c) for c in data.columns]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [21]:
print(f'Before dropping the missing values {data.count()}, {len(data.columns)}')

data = data.drop('Market Category')

data = data.na.drop()

print(f'After dropping the missing values {data.count()}, {len(data.columns)}')

Before dropping the missing values 11914, 16
After dropping the missing values 11812, 15


In [28]:
assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders', 
                                       'Number of Doors', 'highway MPG', 
                                       'city mpg', 'Popularity' ], outputCol='Attributes')

In [29]:
regressor = RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP')

In [30]:
pipeline= Pipeline(stages= [assembler, regressor])
pipeline.write().overwrite().save('pipeline')

In [31]:
!ls

'data (1).csv'	 pipeline      spark-3.0.3-bin-hadoop2.7
 data.csv	 sample_data   spark-3.0.3-bin-hadoop2.7.tgz


In [32]:
pipelineModel= Pipeline.load('pipeline')

paramGrid = ParamGridBuilder() \
.addGrid(regressor.numTrees, [100,500]) \
.build()

crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps = paramGrid,
                          evaluator = RegressionEvaluator(labelCol='MSRP'),
                          numFolds=3)

In [33]:
train, test = data.randomSplit([0.7, 0.3], seed=6)

cvModel = crossval.fit(train)

In [34]:
bestModel = cvModel.bestModel

for x in range(len(bestModel.stages)):
  print(bestModel.stages[x])

VectorAssembler_3263b6690539
RandomForestRegressionModel: uid=RandomForestRegressor_e839bc82f1d6, numTrees=500, numFeatures=7


In [35]:
preds = cvModel.transform(test)
preds.select('MSRP','prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980|   31627.009658729|
|30030|31829.253806594304|
|28200|31847.995119921612|
|30550|37305.112598291395|
|27050| 23265.88838009858|
|29200| 27992.19238576927|
|29290| 27992.19238576927|
|31980| 27992.19238576927|
|32990| 27992.19238576927|
|34600|22689.802576652553|
| 2827|4917.5536795962735|
| 3652| 5951.637329820936|
| 7398|7097.4536097597575|
|22300|24569.897357671758|
|20200|22269.379222260734|
| 2000| 5177.807730133248|
| 2000| 5140.642299987037|
| 2042| 4575.819198845447|
| 2356|5134.4254657107995|
| 2066| 5474.736453659591|
+-----+------------------+
only showing top 20 rows



In [37]:
eval = RegressionEvaluator(labelCol='MSRP')
rmse = eval.evaluate(preds)
mse = eval.evaluate(preds, {eval.metricName: 'mse'})
mae=  eval.evaluate(preds, {eval.metricName: 'mae'})
r2 = eval.evaluate(preds, {eval.metricName: 'r2'})

print(f'RMSE : {round(rmse),3}')

print(f'MSE : {round(mse),3}')

print(f'MAE : {round(mae),3}')

print(f'R2 : {r2}')






RMSE : (24480, 3)
MSE : (599251279, 3)
MAE : (9264, 3)
R2 : 0.8736507260214144
