# Random Forest Classifier in PySpark - Lab

## Introduction  

In this lab, you will build a Random Forest Classifier model to study the e-commerce behavior of consumers from a multi-category store. First, you will download the data to your local machine, then you will load it from your local machine into a PySpark DataFrame.

## Objectives  

* Use the Kaggle eCommerce dataset in PySpark
* Build and train a random forest classifier in PySpark

## Instructions
* Accept the Kaggle policy and download the data from [Kaggle](https://www.kaggle.com/code/tshephisho/ecommerce-behaviour-using-xgboost/data)
* For the first model, you will only use the 2019-Nov CSV data (which is still ~2 GB zipped)
* You will run this notebook in a new `pyspark-env` environment following [these setup instructions without docker](https://github.com/learn-co-curriculum/dsc-spark-docker-installation)

In [1]:
!pip install pandas



In [2]:
# import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as dates
from datetime import datetime

In [5]:
from pyspark.sql import SparkSession  # entry point for pyspark

# instantiate spark instance
spark = (
    SparkSession.builder.appName("Random Forest eCommerce")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .master("local[*]")
    .getOrCreate()
)

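PySpark drives a Java Virtual Machine, so the error `Java gateway process exited before sending its port number` typically means the JVM could not be launched: Java may be missing, not on your `PATH`, or `JAVA_HOME` may point to the wrong place. The snippet below is a small stdlib-only diagnostic, assuming your setup locates Java through the `java` executable or `JAVA_HOME`; `java_diagnostics` is a hypothetical helper, and the Java versions your Spark release supports are listed in the Spark documentation.

```python
import os
import shutil
import subprocess


def java_diagnostics():
    """Report where (if anywhere) Java is found, which PySpark needs to start."""
    java_path = shutil.which("java")  # None when no `java` executable is on PATH
    info = {
        "java_on_path": java_path,
        "JAVA_HOME": os.environ.get("JAVA_HOME"),  # None when the variable is unset
        "version": None,
    }
    if java_path:
        try:
            # `java -version` writes its version banner to stderr, not stdout
            result = subprocess.run(
                [java_path, "-version"], capture_output=True, text=True, timeout=10
            )
            banner = result.stderr.strip().splitlines()
            info["version"] = banner[0] if banner else None
        except (OSError, subprocess.TimeoutExpired):
            pass  # leave version as None if Java cannot be run
    return info


print(java_diagnostics())
```

If `java_on_path` and `JAVA_HOME` are both `None`, install a supported Java version and restart the kernel before creating the `SparkSession` again.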

In [6]:
path = "../archive/2019-Nov.csv"  # change to wherever you saved the Kaggle file
df = spark.read.csv(path, header=True, inferSchema=True)
df.printSchema()  # to see the schema


If you want to explore the dataset with Pandas instead of PySpark, you have to use an action (such as `take()` or `collect()`), which pulls rows from the executors back to the driver. For a small dataset such as Iris, which is only a few kilobytes, this is no problem. The current dataset is far larger (~2 GB even when zipped) and may throw an out-of-memory error if you try to load all of it into a Pandas DataFrame. If you are more comfortable with Pandas, take only a few rows for exploratory analysis; otherwise, stick with native PySpark functions.
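If you only want to eyeball a handful of raw rows, you don't even need Spark or a full load into memory. The sketch below (the helper `peek_rows` is hypothetical, not part of any library) streams the CSV with Python's built-in `csv` module and stops after `n` rows, so memory use does not grow with the file size. Every value comes back as a string, since no schema is inferred.

```python
import csv
from itertools import islice


def peek_rows(lines, n=10):
    """Return the first n data rows of CSV text as dicts keyed by the header.

    `lines` can be an open file or any iterable of CSV lines; only the
    header plus n rows are ever read.
    """
    reader = csv.DictReader(lines)
    return [dict(row) for row in islice(reader, n)]


# Usage against the Kaggle file (adjust the path to wherever you saved it):
# with open("../archive/2019-Nov.csv", newline="") as f:
#     print(peek_rows(f, 3))
```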

In [7]:
pd.DataFrame(df.take(10), columns=df.columns).transpose()


### Know your Customers

How many unique customers visit the site?

In [8]:
# using native pyspark
from pyspark.sql.functions import countDistinct

df.select(countDistinct("user_id")).show()


Did you notice the Spark progress bar when you triggered the action? `show()` is an action, so it triggered Spark's lazy evaluation and ran a job to completion. `read.csv` should have run another job, since inferring the schema requires reading the data. If you go to `localhost:4040`, you should see the completed `csv` and `showString` jobs under the `Jobs` tab. While a heavy job is running, you can open the `Executors` tab to watch the executors complete tasks in parallel. On a local machine you may not see much there, because everything runs in a single process, but this behavior is clearly visible on a cluster such as Amazon EMR.
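If lazy evaluation is new to you, Python generators make a rough analogy. Building the generator is like a transformation: nothing runs yet. Consuming it with `list()` is like an action: only then is the work done. This is only an analogy; Spark also plans and optimizes the whole chain before running it, and recomputes it from the source on every action unless you `cache()` the DataFrame.

```python
calls = []


def parse(line):
    calls.append(line)  # record every time work is actually done
    return line.upper()


lines = ["view", "cart", "purchase"]

# "Transformation": building the pipeline does no work yet
pipeline = (parse(x) for x in lines if x != "view")
assert calls == []  # nothing has run

# "Action": consuming the pipeline triggers the work
result = list(pipeline)
print(result)  # ['CART', 'PURCHASE']
print(calls)   # ['cart', 'purchase']
```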

### (Optional) Visitors Daily Trend

Does traffic fluctuate by date? Try using `event_time` to look at traffic over time, and plot it.

In [None]:
# for event_time you should use a window and groupby a time period
from pyspark.sql.functions import window
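In PySpark, one way to get a daily trend is to group by `window("event_time", "1 day")` and aggregate with `countDistinct("user_id")`; depending on how the schema was inferred, you may first need to convert `event_time` to a timestamp (for example with `to_timestamp`). The plain-Python sketch below shows the same computation on a few in-memory events, using a hypothetical helper `daily_unique_visitors`. It assumes `event_time` strings look like `2019-11-01 00:00:00 UTC`; check a few rows of your file and adjust `EVENT_TIME_FORMAT` if they differ.

```python
from collections import defaultdict
from datetime import datetime

# ASSUMPTION: event_time strings look like "2019-11-01 00:00:00 UTC"
EVENT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S UTC"


def daily_unique_visitors(events):
    """Count distinct user_ids per calendar day.

    `events` is an iterable of (event_time_string, user_id) pairs.
    Returns a dict {date: count} ordered by date.
    """
    users_by_day = defaultdict(set)
    for event_time, user_id in events:
        day = datetime.strptime(event_time, EVENT_TIME_FORMAT).date()
        users_by_day[day].add(user_id)  # a set keeps each visitor once per day
    return {day: len(users) for day, users in sorted(users_by_day.items())}
```

If `counts` holds the returned dict, `plt.plot(list(counts), list(counts.values()))` draws the trend.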

Question: You would also like to understand the cart abandonment rate in this dataset. Which features are relevant for modeling it?

In [None]:
# your answer
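As a starting point for the question above, here is one way to measure the cart abandonment rate itself. The definition is an assumption, not the only possible one: a session counts as abandoned if it has at least one `cart` event and no `purchase` event, and the rate is abandoned sessions divided by sessions with a `cart` event. The sketch assumes `(user_session, event_type)` pairs and uses a hypothetical helper `cart_abandonment_rate`; in PySpark the same logic would be a `groupBy("user_session")` over `event_type`.

```python
from collections import defaultdict


def cart_abandonment_rate(events):
    """Share of carting sessions that never purchase.

    ASSUMPTION: `events` is an iterable of (user_session, event_type) pairs,
    with event_type values such as "view", "cart" and "purchase".
    Returns None if no session added anything to a cart.
    """
    types_by_session = defaultdict(set)
    for session, event_type in events:
        types_by_session[session].add(event_type)

    carted = [types for types in types_by_session.values() if "cart" in types]
    if not carted:
        return None
    abandoned = sum(1 for types in carted if "purchase" not in types)
    return abandoned / len(carted)
```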

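Now you will start building the model. Add the columns you want to use as predictor features to the `feature_cols` list. `VectorAssembler` only accepts numeric, boolean, or vector columns, so string columns need to be encoded before they can be used here.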

In [None]:
from pyspark.ml.feature import VectorAssembler

feature_cols = []  # columns you'd like to use
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)
df.show()
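Conceptually, `VectorAssembler` takes the columns listed in `inputCols` and concatenates their values, in that order, into a single vector column. The hypothetical `assemble` helper below mimics that for one row of scalar numeric values; the column names in the example are illustrative only. By default (`handleInvalid="error"`), `VectorAssembler` fails on null values, which the sketch imitates by raising `ValueError`; the real transformer also accepts vector columns, which this sketch does not handle.

```python
def assemble(row, feature_cols):
    """Mimic VectorAssembler for one row of scalar numeric columns.

    `row` is a dict; the values of the listed columns are concatenated,
    in the order given, into a single list of floats.
    """
    vector = []
    for col in feature_cols:
        value = row.get(col)
        if value is None:
            # similar in spirit to VectorAssembler's default handleInvalid="error"
            raise ValueError(f"null value in column {col!r}")
        vector.append(float(value))
    return vector


# Illustrative row: "brand" is a string column and is simply not selected
print(assemble({"price": 9.99, "category_id": 3, "brand": "x"}, ["price", "category_id"]))
```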

To use a string column, encode it with `StringIndexer`, which maps each distinct string to a numeric index. Update the `inputCol` keyword argument below so that it encodes the target (label) column.

In [None]:
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(
    inputCol="", outputCol="encoded"
)  # what should we use for the inputCol here?
df = labeler.fit(df).transform(df)
df.show()
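By default (`stringOrderType="frequencyDesc"`), `StringIndexer` gives index `0.0` to the most frequent string, `1.0` to the next, and so on; recent Spark versions document that ties are broken alphabetically. The hypothetical `string_index` function below reproduces that ordering in plain Python, so you can predict which label ends up as `0.0`.

```python
from collections import Counter


def string_index(values):
    """Map each distinct label to a float index, most frequent label first.

    Ties in frequency are broken alphabetically.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


print(string_index(["view"] * 5 + ["cart"] * 2 + ["purchase"] * 2))
```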

Now build the train/test datasets with a 70/30 `randomSplit` and a random seed of 42.

In [None]:
train, test = df.randomSplit([0.7, 0.3], seed=42)  # 70% train, 30% test
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
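`randomSplit` does not cut the data at exactly 70%. Each row is assigned independently at random, with the weights normalized to sum to 1, so the split sizes are only approximately 70/30, and the seed makes the result reproducible as long as the data and its partitioning stay the same. The sketch below (`random_split` is a hypothetical name) illustrates that per-row sampling idea in plain Python; Spark's actual implementation samples partition by partition.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, in proportion to `weights`.

    Weights are normalised to sum to 1, so [7, 3] and [70, 30] behave the same.
    Split sizes are only approximately proportional to the weights.
    """
    total = float(sum(weights))
    bounds = []  # cumulative upper bound of each split in [0, 1]
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)

    rng = random.Random(seed)  # seeded, so the same input gives the same split
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            # the last split also catches any floating-point rounding gap
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(range(10000), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # close to, but usually not exactly, 7000 / 3000
```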

Next, add the name of the features column and the name of the label column (`labelCol`) you encoded earlier, then train the model.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="", labelCol="")
model = rf.fit(train)
predictions = model.transform(test)
# what goes in the select() function?
predictions.select().show(25)
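By default, each prediction row gets `rawPrediction`, `probability`, and `prediction` columns. As we understand Spark's `RandomForestClassificationModel`, `rawPrediction` is the sum over trees of each tree's leaf class distribution, `probability` is that sum normalized, and `prediction` is its argmax (a soft vote rather than a count of hard votes). The plain-Python sketch below (`forest_predict` is hypothetical) shows why this can differ from a simple majority: two of the three trees lean toward class 1, yet class 0 wins on the summed probabilities.

```python
def forest_predict(tree_leaf_distributions):
    """Combine per-tree class distributions by summing them (soft voting).

    Each element is one tree's class-probability vector for the same input
    (the class proportions in the leaf the input lands in).
    Returns (raw_prediction, probability, prediction).
    """
    num_classes = len(tree_leaf_distributions[0])
    raw = [0.0] * num_classes
    for dist in tree_leaf_distributions:
        for k, p in enumerate(dist):
            raw[k] += p  # accumulate each tree's probability for class k
    total = sum(raw)
    probability = [r / total for r in raw]
    prediction = max(range(num_classes), key=lambda k: raw[k])  # first argmax
    return raw, probability, float(prediction)


raw, prob, pred = forest_predict([[0.9, 0.1], [0.4, 0.6], [0.4, 0.6]])
print(pred)  # 0.0, even though two of the three trees favour class 1
```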

Once the job finishes, evaluate the model's performance. Fill in the `labelCol` below.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# metricName defaults to "f1", so request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(
    labelCol="", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))
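With `metricName="accuracy"`, the evaluator reports the share of test rows whose `prediction` equals the label, and the test error printed above is its complement. Other supported metric names include `"f1"`, `"weightedPrecision"`, and `"weightedRecall"`. A minimal plain-Python version of the accuracy calculation, with `accuracy_and_error` as a hypothetical helper:

```python
def accuracy_and_error(predictions, labels):
    """Fraction of exact matches, and its complement (the test error)."""
    if len(predictions) != len(labels) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    accuracy = correct / len(labels)
    return accuracy, 1.0 - accuracy


acc, err = accuracy_and_error([0.0, 1.0, 1.0, 0.0], [0.0, 1.0, 0.0, 0.0])
print("Accuracy = %s" % acc)    # Accuracy = 0.75
print("Test Error = %s" % err)  # Test Error = 0.25
```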

### Extra: Use the confusion matrix to see the other metrics

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F

# MulticlassMetrics (RDD-based API) expects (prediction, label) pairs of doubles
preds_and_labels = predictions.select(
    "prediction", F.col("encoded").cast(DoubleType()).alias("encoded")
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())
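According to the `MulticlassMetrics` documentation, the confusion matrix has true labels as rows and predicted labels as columns, both in ascending label order. From it, precision for a class is the diagonal entry divided by its column total, and recall is the diagonal entry divided by its row total (Spark exposes these per class as `metrics.precision(label)` and `metrics.recall(label)`). The hypothetical helpers below compute both in plain Python from `(prediction, label)` pairs; this sketch builds its class list from every value seen in either position.

```python
def confusion_matrix(pairs):
    """Build a confusion matrix from (prediction, label) pairs.

    Rows are true labels and columns are predicted labels, both sorted ascending.
    """
    classes = sorted({y for _, y in pairs} | {p for p, _ in pairs})
    index = {c: i for i, c in enumerate(classes)}
    matrix = [[0] * len(classes) for _ in classes]
    for prediction, label in pairs:
        matrix[index[label]][index[prediction]] += 1
    return classes, matrix


def precision_recall(classes, matrix):
    """Per-class (precision, recall): column-wise and row-wise ratios."""
    result = {}
    for i, c in enumerate(classes):
        tp = matrix[i][i]
        predicted = sum(row[i] for row in matrix)  # column total
        actual = sum(matrix[i])                    # row total
        result[c] = (tp / predicted if predicted else 0.0,
                     tp / actual if actual else 0.0)
    return result


pairs = [(0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
classes, matrix = confusion_matrix(pairs)
print(matrix)  # [[2, 1], [1, 1]]
print(precision_recall(classes, matrix))
```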