<a href="https://colab.research.google.com/github/snknitin/pyspark-snippets/blob/main/ML_pipelines_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
!tar xf spark-3.4.2-bin-hadoop3.tgz
!pip install -q findspark # adds pyspark to sys path at runtime

In [12]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.2-bin-hadoop3"
import findspark
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder


In [13]:
sc = SparkSession.builder.master("local[*]").getOrCreate()

In [14]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [17]:
sc_path = "/content/drive/MyDrive/datasets/data.csv"
data = sc.read.csv(sc_path, inferSchema=True,header=True)

In [18]:
data.show()

+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

In [19]:
data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



Describe and Clean the Dataset

In [20]:
data.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [21]:
# Replace NA strings as null values since spark doesn't handle it properly
def replace(column,value):
    return when(column!=value,column).otherwise(lit(None))

data = data.withColumn("Market Category",replace(col("Market Category"),"N/A"))

In [24]:
# Which columns have the null values and how many
data.select([count(when(isnan(c)| col(c).isNull(),c)).alias(c) for c in data.columns]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [25]:
# Drop the bad columns
data = data.drop("Market Category")
data = data.na.drop()

# shape
print((data.count(),len(data.columns)))

(11812, 15)


Create a pipeline for ML models

In [40]:
# this will create a pipeline to make a feature vector of the chosen features and added as a column vector
assembler = VectorAssembler(inputCols=['Year','Engine HP','Engine Cylinders','Number of Doors','highway MPG','city mpg','Popularity'], outputCol = "Attributes")

In [41]:
regressor = RandomForestRegressor(featuresCol = 'Attributes', labelCol = "MSRP")

In [42]:
pipeline = Pipeline(stages = [assembler,regressor])
pipeline.write().overwrite().save("pipeline")
!ls

drive  pipeline  sample_data  spark-3.4.2-bin-hadoop3  spark-3.4.2-bin-hadoop3.tgz


Cross Validation

In [43]:
pipelineModel = Pipeline.load('pipeline')
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,500]).build()

In [44]:
crossval = CrossValidator(estimator = pipelineModel,
                          estimatorParamMaps= paramGrid,
                          evaluator = RegressionEvaluator(labelCol = 'MSRP'),
                          numFolds = 3)

Training and Prediction

In [45]:
train_data, test_data = data.randomSplit([0.8,0.2],seed=1337)
cvModel = crossval.fit(train_data)

In [46]:
bestModel = cvModel.bestModel
for x in range(len(bestModel.stages)):
  print(bestModel.stages[x])

VectorAssembler_632a79fd1f67
RandomForestRegressionModel: uid=RandomForestRegressor_049df6f7f4be, numTrees=500, numFeatures=7


In [47]:
pred = cvModel.transform(test_data)
pred.select('MSRP','prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|30550| 37830.30235614553|
|29200|26660.289843433697|
|29990|26660.289843433697|
|32990|26660.289843433697|
| 3000| 5342.684113977234|
| 3222| 5357.872197239239|
| 3652| 6789.889779772829|
|22600|24938.129351018062|
|20200|22955.897450738426|
| 2265| 7355.805630853488|
| 2506| 7355.805630853488|
|44565|  39698.1501235928|
|56780|  39698.1501235928|
|43015|40022.378804901375|
|44515|40022.378804901375|
|47440|40022.378804901375|
|45950| 39674.44227136011|
|50360| 39674.44227136011|
|36870| 40060.04126978006|
|41870| 40060.04126978006|
+-----+------------------+
only showing top 20 rows



Evaluate Metrics

In [50]:
eval = RegressionEvaluator(labelCol = "MSRP")
rmse = eval.evaluate(pred)
mse = eval.evaluate(pred,{eval.metricName:"mse"})
mae = eval.evaluate(pred,{eval.metricName:"mae"})
r2 = eval.evaluate(pred,{eval.metricName:"r2"})


print("RMSE: %.3f" %rmse)
print("MSE: %.3f" %mse)
print("MAE: %.3f" %mae)
print("R2: %.3f" %r2)


RMSE: 18414.564
MSE: 339096149.453
MAE: 9002.796
R2: 0.863
