# Random Forest with PySpark Introduction

## Introduction

In this lesson, you will walk through how to use PySpark for the classification of Iris flowers with a Random Forest Classifier. The dataset is located under the `data` folder.

## Objectives  

* Read a dataset into a PySpark DataFrame
* Implement a random forest classifier with PySpark

> Before continuing, check the version of PySpark installed on the machine. It should be above 3.1.
> 
> You will run this notebook in a `pyspark-env` environment following [these setup instructions without Docker](https://github.com/learn-co-curriculum/dsc-spark-docker-installation).
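
One way to check the installed version without starting Spark is sketched below. It relies only on the standard library (`importlib.metadata`, available from Python 3.8) and treats 3.1 as the minimum acceptable version, which is an assumption about what "above 3.1" means here. The helper names `installed_version` and `major_minor` are hypothetical, introduced only for this illustration.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package):
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def major_minor(version_string):
    """Turn a version such as '3.3.1' into a comparable tuple such as (3, 3)."""
    major, minor = version_string.split(".")[:2]
    return int(major), int(minor)


pyspark_version = installed_version("pyspark")
if pyspark_version is None:
    print("PySpark is not installed in this environment.")
elif major_minor(pyspark_version) < (3, 1):
    print(f"PySpark {pyspark_version} is older than 3.1; consider upgrading.")
else:
    print(f"PySpark {pyspark_version} is installed.")
```

If the first message is printed, activate the `pyspark-env` environment before running the cells that follow.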

In [4]:
from pyspark.sql import SparkSession  # entry point for pyspark

# instantiate spark instance
spark = (
    SparkSession.builder.appName("Random Forest Iris").master("local[*]").getOrCreate()
)

Since Spark 2.0, `SparkSession` has been the main entry point for Spark. `SparkSession.builder` creates a Spark session, and `getOrCreate()` returns an existing session if one is already running. The string passed to `appName()` can be anything; it is the name under which your application appears in the Spark UI. Once the session is instantiated on your local machine, you can open the Spark UI at `localhost:4040` to view jobs.

In [None]:
df = spark.read.csv("./data/IRIS.csv", header=True, inferSchema=True)
df.printSchema()  # to see the schema

In [None]:
df.show()  # or df.show(truncate=False) if you'd like to see the full cell contents

Check to see what the type is for the DataFrame you have loaded.

In [None]:
type(df)

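Go ahead and run some exploratory data analysis on the dataset. You can easily turn a PySpark DataFrame into a Pandas DataFrame. The cell below copies only the first 100 rows with `take(100)`; for a dataset this small you could also call `df.toPandas()` to convert all of it.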

In [None]:
import pandas as pd

pandas_df = pd.DataFrame(df.take(100), columns=df.columns)
pandas_df.describe()

In [None]:
pandas_df.dtypes

Once the exploratory data analysis is done, you can start transforming the features to prepare them for modeling. Feature transformation covers steps such as scaling, converting, and otherwise reshaping features so they can be used for training and testing. Spark ML estimators expect all input features in a single vector column, so here you will use the `VectorAssembler` in PySpark to combine the four numeric columns into one.

In [None]:
from pyspark.ml.feature import VectorAssembler

numeric_cols = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
]  # insert numeric cols
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df = assembler.transform(df)  # just use the same dataframe
df.show()
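
To make the effect of this step concrete, here is a plain-Python illustration of what the assembler conceptually does to each row. It does not use Spark at all: the `assemble` helper and the two sample rows are made up for this sketch, and a Python list stands in for Spark's vector type.

```python
# Plain-Python illustration only; the real VectorAssembler works on Spark DataFrames.
numeric_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

# Two made-up rows, shaped like the Iris columns used in this lesson
rows = [
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4,
     "petal_width": 0.2, "species": "setosa"},
    {"sepal_length": 6.3, "sepal_width": 3.3, "petal_length": 6.0,
     "petal_width": 2.5, "species": "virginica"},
]


def assemble(row, input_cols, output_col="features"):
    """Return a copy of `row` with the input columns packed into one list."""
    assembled = dict(row)  # keep the original columns, as the transform does
    assembled[output_col] = [float(row[col]) for col in input_cols]
    return assembled


assembled_rows = [assemble(row, numeric_cols) for row in rows]
for row in assembled_rows:
    print(row["species"], row["features"])
```

The original columns stay in place and one extra column holds the packed values, which is the shape the classifier later reads through `featuresCol`.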

The `VectorAssembler` step should have added a column named `features` to your DataFrame, as specified by `outputCol`. Next, use the `StringIndexer` to encode the string column `species` as a numeric label index. By default (`stringOrderType="frequencyDesc"`), labels are ordered by descending frequency, so the most frequent species gets index 0. When frequencies are tied, as in a balanced dataset, the strings are further sorted alphabetically, so the alphabetically first species gets 0, the next gets 1, and so on.

In [None]:
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(inputCol="species", outputCol="encoded")
df = labeler.fit(df).transform(df)
df.show()

The DataFrame now has a new column named `encoded` holding the label indices. You can confirm that both new columns have been added to the PySpark DataFrame by creating a new Pandas DataFrame.

In [None]:
pd.DataFrame(df.take(10), columns=df.columns)
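
The index assignment used for the `encoded` column can be sketched in plain Python. This is a conceptual model of the default `frequencyDesc` ordering (descending count, ties broken alphabetically), not Spark's implementation; the function name `frequency_desc_index` and the sample labels are hypothetical.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Map each label to an index: most frequent first, ties sorted alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    # Indices are floats here because StringIndexer writes a double column
    return {label: float(index) for index, label in enumerate(ordered)}


# Balanced labels: every species appears twice, so alphabetical order decides
balanced = ["virginica", "setosa", "versicolor"] * 2
print(frequency_desc_index(balanced))
# {'setosa': 0.0, 'versicolor': 1.0, 'virginica': 2.0}

# Imbalanced labels: the most frequent species gets index 0
imbalanced = ["virginica"] * 3 + ["setosa"] * 2 + ["versicolor"]
print(frequency_desc_index(imbalanced))
# {'virginica': 0.0, 'setosa': 1.0, 'versicolor': 2.0}
```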

Now you have transformed the data as needed. To begin building your model, you need to split the data into a train/test dataset.

In [None]:
train, test = df.randomSplit([0.7, 0.3], seed=42)  # roughly 70% train, 30% test
print(f"Train dataset count: {train.count()}")
print(f"Test dataset count: {test.count()}")
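
The printed counts will usually not be an exact 70/30 split, because the weights act as per-row probabilities rather than fixed quotas. The plain-Python sketch below illustrates that idea only: the `random_split` helper is hypothetical, it uses Python's `random` module rather than Spark's sampler, and its counts will not match what Spark produces for the same seed.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split using one random draw per row."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.7, 1.0]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0.0, 1.0)
        for index, upper in enumerate(bounds):
            if draw < upper:
                splits[index].append(row)
                break
    return splits


rows = list(range(150))  # stand-in for 150 rows
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 105 and 45, but not guaranteed exact
```

Fixing the seed makes the split reproducible from run to run, which is why the cell above passes `seed=42`.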

Next, instantiate the `RandomForestClassifier` and train the model. Before you run the next cell, open the Spark UI at `localhost:4040` in your browser and navigate to the Executors tab so you can watch the job as it runs.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="encoded")
model = rf.fit(train)
predictions = model.transform(test)

`featuresCol` is the name of the single vector column that holds the features, not a list of column names. If you have more features you'd like to include, add them to the `inputCols` of the `VectorAssembler` so they end up in that vector. You create the model by fitting on the training dataset, then validate it by making predictions on the test dataset. `model.transform(test)` adds new columns such as `rawPrediction`, `probability`, and `prediction`.

In [None]:
# if the column names here are different, call `predictions.printSchema()` to see the correct names
predictions.select(
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "encoded",
    "rawPrediction",
    "prediction",
    "probability",
).show()  # without .show(), only the DataFrame's schema summary is displayed

Now that you have a trained model, go ahead and evaluate it with the `MulticlassClassificationEvaluator`.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# metricName defaults to "f1", so request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(
    labelCol="encoded", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)  # a fraction between 0 and 1
print(f"Accuracy: {accuracy:.1%}")
test_error = 1.0 - accuracy
print(f"Test Error = {test_error}")
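
To make the relationship between accuracy and test error concrete, here is a small plain-Python sketch that computes both from label and prediction pairs. The eight values are made up for illustration and are not taken from the Iris results.

```python
# Made-up encoded labels and model predictions for eight test rows
labels = [0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 1.0]
predictions = [0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0]

# Accuracy is the fraction of rows where the prediction equals the label
correct = sum(1 for label, pred in zip(labels, predictions) if label == pred)
accuracy = correct / len(labels)
test_error = 1.0 - accuracy

print(f"Accuracy: {accuracy:.1%}")      # Accuracy: 87.5%
print(f"Test error: {test_error:.3f}")  # Test error: 0.125
```

Because accuracy is a fraction between 0 and 1, it needs the `%` format specifier (or a multiplication by 100) before it can be read as a percentage.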

As you can see, the model performs with about 97.8% accuracy, which corresponds to a test error of roughly 0.02.