# Random Forest with PySpark Introduction

## Introduction

In this lesson, you will walk through how to use PySpark to classify Iris flowers with a random forest classifier. The dataset, `IRIS.csv`, is located in the `data` folder.

## Objectives  

* Read a dataset into a PySpark DataFrame
* Implement a random forest classifier with PySpark

> Before continuing, check which version of PySpark is installed on your machine. It should be 3.1 or later.
> 
> You will run this notebook in a `pyspark-env` environment, following [these setup instructions (without Docker)](https://github.com/learn-co-curriculum/dsc-spark-docker-installation).
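When you check the version, compare the numbers rather than the raw text. As strings, `"3.10"` sorts before `"3.9"`. The helper below is a minimal sketch of a numeric comparison. `meets_minimum` is a hypothetical name and is not part of PySpark.

```python
def meets_minimum(version: str, minimum: tuple = (3, 1)) -> bool:
    """Return True if a version string such as "3.4.1" is at least `minimum`."""
    numbers = []
    for piece in version.split("."):
        # Keep the leading digits of each piece ("0rc1" -> 0); stop at a non-numeric piece like "dev0"
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        numbers.append(int(digits))
    # Tuples compare element by element, so (3, 10) > (3, 9) as intended
    return tuple(numbers) >= tuple(minimum)
```

In the notebook, you could call `meets_minimum(pyspark.__version__)` after `import pyspark`. `pyspark.__version__` holds the installed version string.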

```python
from pyspark.sql import SparkSession  # entry point for PySpark

# instantiate a Spark session
spark = (
    SparkSession.builder.appName("Random Forest Iris").master("local[*]").getOrCreate()
)
```

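Since Spark 2.0, `SparkSession` has been the main entry point for Spark. `SparkSession.builder` configures the session, and `getOrCreate()` either returns the session that is already active or creates a new one. The name you pass to `appName()` can be anything; it labels the application so you can tell which job is running. `master("local[*]")` runs Spark on your local machine with one worker thread per logical CPU core. Once the session is running locally, you can open the Spark UI at `localhost:4040` to view jobs. If port 4040 is already taken, Spark tries 4041, and so on.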
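The builder above calls `.master("local[*]")`. Spark's master-URL conventions work like this:

- `local` runs a single worker thread.
- `local[K]` runs K threads.
- `local[*]` runs one thread per logical core.

The function below is a hypothetical plain-Python helper that mirrors those rules. Spark does not expose it, and the helper ignores variants such as `local[K,F]`.

```python
import os
import re


def local_thread_count(master: str) -> int:
    """Resolve a local master URL to the number of worker threads it requests."""
    if master == "local":
        return 1  # a single worker thread, no parallelism
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master URL: {master!r}")
    value = match.group(1)
    # "*" means one thread per logical core reported by the operating system
    return (os.cpu_count() or 1) if value == "*" else int(value)
```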

```python
df = spark.read.csv("./data/IRIS.csv", header=True, inferSchema=True)
df.printSchema()  # print the column names and inferred types
```

```
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)
```
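Because `inferSchema=True` was passed, Spark made an extra pass over the data to choose a type for each column. Without it, every CSV column would be read as `string`.

The sketch below is a simplified, plain-Python approximation of that idea for this file: a column becomes `double` if every value parses as a number, otherwise `string`. Spark's real inference distinguishes more types (integers, booleans, timestamps, and others). `infer_column_types` is a hypothetical name, used only for illustration.

```python
import csv
import io


def infer_column_types(csv_text: str) -> dict:
    """Map each column to "double" if all its values parse as numbers, else "string"."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)

    def is_number(value: str) -> bool:
        try:
            float(value)
            return True
        except ValueError:
            return False

    return {
        column: "double" if all(is_number(row[column]) for row in rows) else "string"
        for column in reader.fieldnames
    }


# Two rows copied from the dataset, with its header
sample = """sepal_length,sepal_width,petal_length,petal_width,species
5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
"""
print(infer_column_types(sample))
```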



```python
# show() prints the first 20 rows; use df.show(truncate=False) to see full cell contents
df.show()
```

```
+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4|
...
```

Check the type of the DataFrame you have loaded.

```python
type(df)
```

```
pyspark.sql.dataframe.DataFrame
```

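Go ahead and run some exploratory data analysis on the dataset. For a small dataset like this one, it is convenient to turn the PySpark DataFrame into a pandas DataFrame.

Note that `df.take(100)` collects only the first 100 rows, so the summary below describes those rows rather than all 150. If the file follows the common Iris layout of 50 rows per species in order, those rows contain no `Iris-virginica` at all. Use `df.toPandas()` to convert the entire DataFrame, keeping in mind that it loads every row into the driver's memory.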

```python
import pandas as pd

# take(100) collects only the first 100 rows to the driver as a list of Row objects
pandas_df = pd.DataFrame(df.take(100), columns=df.columns)
pandas_df.describe()
```

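|       | sepal_length | sepal_width | petal_length | petal_width |
|-------|--------------|-------------|--------------|-------------|
| count | 100.0        | 100.0       | 100.0        | 100.0       |
| mean  | 5.471        | 3.094       | 2.862        | 0.785       |
| std   | 0.641698     | 0.476057    | 1.448565     | 0.566288    |
| min   | 4.3          | 2.0         | 1.0          | 0.1         |
| 25%   | 5.0          | 2.8         | 1.5          | 0.2         |
| 50%   | 5.4          | 3.05        | 2.45         | 0.8         |
| 75%   | 5.9          | 3.4         | 4.325        | 1.3         |
| max   | 7.0          | 4.4         | 5.1          | 1.8         |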
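Taking the first N rows of data that is sorted by class can silently drop a class. The sketch below assumes, hypothetically, a label column with 50 rows of each species in sorted order, and counts what the first 100 rows contain.

```python
from collections import Counter

# Hypothetical class-sorted labels: 50 of each species, in order
labels = ["Iris-setosa"] * 50 + ["Iris-versicolor"] * 50 + ["Iris-virginica"] * 50

first_100 = Counter(labels[:100])  # what a "take the first 100 rows" approach sees
all_rows = Counter(labels)

print(first_100)  # Iris-virginica is missing entirely
print(all_rows)
```

A random sample, for example `df.sample(fraction=0.5, seed=1)`, does not depend on the file's row order in this way.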


```python
pandas_df.dtypes
```

```
sepal_length    float64
sepal_width     float64
petal_length    float64
petal_width     float64
species          object
dtype: object
```

Once the exploratory data analysis is done, you can transform the features into the form Spark ML expects. Feature transformation covers steps such as scaling, encoding, and combining columns. Spark ML estimators expect all features in a single vector column, so you will use `VectorAssembler` to combine the four numeric columns into one.
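Conceptually, `VectorAssembler` concatenates the values of its `inputCols`, in the order given, into one vector per row. The plain-Python sketch below shows that row-wise behavior on two rows from this dataset. The `assemble` function is hypothetical, and it uses a plain list where Spark would produce a `DenseVector` or `SparseVector`.

```python
def assemble(row: dict, input_cols: list) -> list:
    """Concatenate the values of input_cols, in order, into one feature list."""
    return [float(row[col]) for col in input_cols]


numeric_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
rows = [
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2, "species": "Iris-setosa"},
    {"sepal_length": 4.9, "sepal_width": 3.0, "petal_length": 1.4, "petal_width": 0.2, "species": "Iris-setosa"},
]

for row in rows:
    # Keep the original columns and add the assembled vector, like outputCol="features"
    row["features"] = assemble(row, numeric_cols)
    print(row["features"])
```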

```python
from pyspark.ml.feature import VectorAssembler

numeric_cols = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
]  # the four numeric feature columns
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df = assembler.transform(df)  # reassign df so it now includes the "features" column
df.show()
```

```
+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|
+------------+-----------+------------+-----------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|[4.4,2.9,1.4,0.2]|
|         4.9|
...
```

This created a new column in your DataFrame called `features`, the name you passed as `outputCol`. Next, use `StringIndexer` to encode the string `species` column as a numeric label index.

By default (`stringOrderType="frequencyDesc"`), labels are indexed by frequency: the most frequent species gets index 0, the next most frequent gets 1, and so on. When labels are equally frequent, as in a balanced dataset, ties are broken alphabetically, not by order of appearance.
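The default ordering can be sketched in plain Python:

1. Count each label.
2. Sort by descending count, breaking ties alphabetically.
3. Number the labels from 0.

The `string_index` function is a hypothetical illustration, not Spark's implementation.

```python
from collections import Counter


def string_index(labels: list) -> dict:
    """Map each distinct label to a float index, most frequent first."""
    counts = Counter(labels)
    # Sort by descending frequency; equal frequencies fall back to alphabetical order
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


# Balanced labels, deliberately listed with Iris-virginica first
balanced = ["Iris-virginica"] * 50 + ["Iris-setosa"] * 50 + ["Iris-versicolor"] * 50
print(string_index(balanced))

imbalanced = ["b"] * 2 + ["a"] * 1 + ["c"] * 5
print(string_index(imbalanced))
```

With balanced classes, `Iris-setosa` comes first alphabetically and receives 0.0, which matches the `encoded` column shown next.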

```python
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(inputCol="species", outputCol="encoded")
df = labeler.fit(df).transform(df)
df.show()
```

```
+------------+-----------+------------+-----------+-----------+-----------------+-------+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|encoded|
+------------+-----------+------------+-----------+-----------+-----------------+-------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|    0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|    0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|    0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|    0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|    0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|    0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|    0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|    0.0|
...
```

The DataFrame now has a new column named `encoded`. You can confirm that both new columns were added to the PySpark DataFrame by converting its first 10 rows into a pandas DataFrame.

```python
pd.DataFrame(df.take(10), columns=df.columns)
```

|   | sepal_length | sepal_width | petal_length | petal_width | species     | features             | encoded |
|---|--------------|-------------|--------------|-------------|-------------|----------------------|---------|
| 0 | 5.1          | 3.5         | 1.4          | 0.2         | Iris-setosa | [5.1, 3.5, 1.4, 0.2] | 0.0     |
| 1 | 4.9          | 3.0         | 1.4          | 0.2         | Iris-setosa | [4.9, 3.0, 1.4, 0.2] | 0.0     |
| 2 | 4.7          | 3.2         | 1.3          | 0.2         | Iris-setosa | [4.7, 3.2, 1.3, 0.2] | 0.0     |
| 3 | 4.6          | 3.1         | 1.5          | 0.2         | Iris-setosa | [4.6, 3.1, 1.5, 0.2] | 0.0     |
| 4 | 5.0          | 3.6         | 1.4          | 0.2         | Iris-setosa | [5.0, 3.6, 1.4, 0.2] | 0.0     |
| 5 | 5.4          | 3.9         | 1.7          | 0.4         | Iris-setosa | [5.4, 3.9, 1.7, 0.4] | 0.0     |
| 6 | 4.6          | 3.4         | 1.4          | 0.3         | Iris-setosa | [4.6, 3.4, 1.4, 0.3] | 0.0     |
| 7 | 5.0          | 3.4         | 1.5          | 0.2         | Iris-setosa | [5.0, 3.4, 1.5, 0.2] | 0.0     |
| 8 | 4.4          | 2.9         | 1.4          | 0.2         | Iris-setosa | [4.4, 2.9, 1.4, 0.2] | 0.0     |
| 9 | 4.9          | 3.1         | 1.5          | 0.1         | Iris-setosa | [4.9, 3.1, 1.5, 0.1] | 0.0     |


Now the data is transformed as needed. Before building your model, split the data into training and test sets.

```python
# 70% of rows go to training and 30% to testing; seed=42 makes the split repeatable
train, test = df.randomSplit([0.7, 0.3], seed=42)
print(f"Train dataset count: {train.count()}")
print(f"Test dataset count: {test.count()}")
```

```
Train dataset count: 104
Test dataset count: 46
```
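The counts, 104 and 46, are close to but not exactly 70% and 30% of the 150 rows. That is expected: `randomSplit` normalizes the weights and assigns rows to splits at random rather than cutting at a fixed row count.

The sketch below imitates that behavior with a simplified per-row random draw using Python's `random` module. Spark's actual sampling works per partition, so this will not reproduce Spark's exact split. `random_split` is a hypothetical name.

```python
import random


def random_split(rows: list, weights: list, seed: int) -> list:
    """Assign each row to one split at random, in proportion to the weights."""
    total = sum(weights)
    # Normalize the weights into cumulative upper bounds, e.g. [0.7, 0.3] -> [0.7, 1.0]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    rng = random.Random(seed)
    for row in rows:
        draw = rng.random()  # uniform in [0.0, 1.0)
        for i, bound in enumerate(bounds):
            if draw < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding at the top end
    return splits


train_rows, test_rows = random_split(list(range(150)), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # close to 105 and 45, but not guaranteed
```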


Next, instantiate the `RandomForestClassifier` and train the model. Before you run the next cell, open the Spark UI by entering `localhost:4040` in your browser, then navigate to the Executors tab.
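While the job runs, it helps to know what is being trained. A random forest fits many decision trees; Spark's default `numTrees` is 20. Randomness enters each tree in two ways:

- By default, each tree sees a bootstrap sample of the training rows.
- For classification, the default `featureSubsetStrategy="auto"` considers about the square root of the number of features, chosen at random, at each split. This is rounded up, as we understand it.

The sketch below shows only those two sources of randomness for one hypothetical tree and does not build a tree. Spark approximates bootstrap sampling internally, using per-row Poisson weights as we understand it; this sketch uses plain sampling with replacement instead.

```python
import math
import random

feature_names = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
n_rows = 104  # size of the training split printed above
rng = random.Random(0)

# Bootstrap sample: draw n_rows row indices with replacement, so some rows repeat
bootstrap_rows = [rng.randrange(n_rows) for _ in range(n_rows)]

# "sqrt" feature subsetting: ceil(sqrt(4)) = 2 candidate features per split
subset_size = math.ceil(math.sqrt(len(feature_names)))
split_candidates = rng.sample(feature_names, subset_size)

print(f"distinct rows in bootstrap sample: {len(set(bootstrap_rows))} of {n_rows}")
print(f"features considered at one split: {split_candidates}")
```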

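```python
from pyspark.ml.classification import RandomForestClassifier

# featuresCol names the single vector column; labelCol names the numeric label column
rf = RandomForestClassifier(featuresCol="features", labelCol="encoded")
model = rf.fit(train)  # train the forest on the training split
predictions = model.transform(test)  # add prediction columns to the test split
```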

`featuresCol` is the name of the single vector column that holds all the features (here, the `features` column built by `VectorAssembler`), not a list of columns. To include more features, add them to the assembler's `inputCols`.

You create the model by fitting it on the training set, then evaluate it by making predictions on the test set. `model.transform(test)` adds new columns, including `rawPrediction`, `probability`, and `prediction`.
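Here is how those three columns relate, as we understand Spark's random forest classifier:

- Each tree outputs class probabilities for a row.
- `rawPrediction` is the sum of those per-tree vectors.
- `probability` is that sum normalized to 1.
- `prediction` is the index of the largest value.

The sketch below reproduces that soft-voting aggregation in plain Python for three hypothetical trees and three classes. `combine_trees` is an illustrative name.

```python
def combine_trees(tree_probabilities: list) -> tuple:
    """Aggregate per-tree class probabilities the way a soft-voting forest does."""
    n_classes = len(tree_probabilities[0])
    # rawPrediction-like vector: sum each class's probability over all trees
    raw = [sum(probs[k] for probs in tree_probabilities) for k in range(n_classes)]
    total = sum(raw)
    probability = [value / total for value in raw]  # normalized to sum to 1
    prediction = float(max(range(n_classes), key=lambda k: raw[k]))  # argmax, as a double
    return raw, probability, prediction


# Three hypothetical trees scoring one flower (classes 0, 1, 2)
trees = [[1.0, 0.0, 0.0], [0.8, 0.2, 0.0], [0.0, 0.6, 0.4]]
raw, probability, prediction = combine_trees(trees)
print(raw, probability, prediction)
```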

```python
# If the column names differ, run predictions.printSchema() to see the actual names.
# Without .show(), the notebook displays only the column types; append .show() to print rows.
predictions.select(
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "encoded",
    "rawPrediction",
    "prediction",
    "probability",
)
```

```
DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, encoded: double, rawPrediction: vector, prediction: double, probability: vector]
```

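Now that you have a trained model, evaluate it with `MulticlassClassificationEvaluator`. Its `metricName` parameter chooses the metric. The default is `"f1"` (weighted F1); other options include `"accuracy"`, `"weightedPrecision"`, and `"weightedRecall"`.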

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# metricName defaults to "f1" (weighted F1), so the score below is F1, not accuracy.
# Pass metricName="accuracy" instead to compute accuracy (and 1 - accuracy as the test error).
evaluator = MulticlassClassificationEvaluator(
    labelCol="encoded", predictionCol="prediction", metricName="f1"
)
f1 = evaluator.evaluate(predictions)
print(f"Weighted F1: {f1}")
```

```
Weighted F1: 0.9571428571428571
```


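The model performs well on the test set. The value of about 0.957 printed above is a weighted F1 score, because F1 is the evaluator's default metric; it is not an accuracy percentage. To report accuracy, create the evaluator with `metricName="accuracy"`; the test error is then `1.0 - accuracy`.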
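Accuracy and weighted F1 can differ on the same predictions. The plain-Python sketch below computes both on a small hypothetical set of labels and predictions. It assumes, as in Spark's weighted F1, that each class's F1 score is weighted by that class's share of the true labels. The function names are illustrative only.

```python
from collections import Counter


def accuracy(y_true: list, y_pred: list) -> float:
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def weighted_f1(y_true: list, y_pred: list) -> float:
    """Per-class F1, weighted by how often each class appears in y_true."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for label, n in support.items():
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        predicted = sum(p == label for p in y_pred)
        precision = tp / predicted if predicted else 0.0
        recall = tp / n
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * n / total
    return score


# Hypothetical encoded labels and predictions for eight flowers
y_true = [0, 0, 0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 0, 1, 1, 1, 2, 0]

acc = accuracy(y_true, y_pred)
f1 = weighted_f1(y_true, y_pred)
test_error = 1.0 - acc
print(f"Accuracy: {acc:.1%}, weighted F1: {f1:.3f}, test error: {test_error:.3f}")
```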