# Random Forest Classifier in PySpark - Lab

## Introduction  

In this lab, you will build a Random Forest Classifier model to study the ecommerce behavior of consumers from a multi-category store. First, you will download the data to your local machine, then you will load it from the local machine into a PySpark DataFrame.

## Objectives  

* Use the Kaggle eCommerce dataset in PySpark
* Build and train a random forest classifier in PySpark

## Instructions
* Accept the Kaggle policy and download the data from [Kaggle](https://www.kaggle.com/code/tshephisho/ecommerce-behaviour-using-xgboost/data)
* For the first model you will only use the 2019-Nov csv data (which is still about 2 GB zipped)
* You will run this notebook in a new `pyspark-env` environment following [these setup instructions without docker](https://github.com/learn-co-curriculum/dsc-spark-docker-installation)

In [1]:
!pip install pandas

Collecting pandas
  Downloading pandas-2.0.3-cp38-cp38-win_amd64.whl.metadata (18 kB)
Collecting python-dateutil>=2.8.2 (from pandas)
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2023.3.post1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.1 (from pandas)
  Downloading tzdata-2023.4-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting numpy>=1.20.3 (from pandas)
  Downloading numpy-1.24.4-cp38-cp38-win_amd64.whl.metadata (5.6 kB)
Collecting six>=1.5 (from python-dateutil>=2.8.2->pandas)
  Downloading six-1.16.0-py2.py3

In [2]:
# import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as dates
from datetime import datetime

In [3]:
from pyspark.sql import SparkSession  # entry point for pyspark

# instantiate spark instance
spark = (
    SparkSession.builder.appName("Random Forest eCommerce")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .master("local[*]")
    .getOrCreate()
)

ModuleNotFoundError: No module named 'pyspark'

In [None]:
path = "../archive/2019-Nov.csv"  # path where you saved the Kaggle file
df = spark.read.csv(path, header=True, inferSchema=True)
df.printSchema()  # to see the schema

If you want to explore the dataset with Pandas instead of PySpark, you have to call an action such as `take()`, which makes Spark run a job and send the resulting rows to the driver. For a small dataset such as Iris, which is only a few kilobytes, this is no problem. The current dataset is much larger, and attempting to load all of it into a Pandas DataFrame may throw an `OutOfMemory` error. If you are more comfortable with Pandas, take only a few rows for exploratory analysis. Otherwise, stick with native PySpark functions.

In [4]:
pd.DataFrame(df.take(10), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
event_time,2019-11-01 00:00:00 UTC,2019-11-01 00:00:00 UTC,2019-11-01 00:00:01 UTC,2019-11-01 00:00:01 UTC,2019-11-01 00:00:01 UTC,2019-11-01 00:00:01 UTC,2019-11-01 00:00:01 UTC,2019-11-01 00:00:02 UTC,2019-11-01 00:00:02 UTC,2019-11-01 00:00:02 UTC
event_type,view,view,view,view,view,view,view,view,view,view
product_id,1003461,5000088,17302664,3601530,1004775,1306894,1306421,15900065,12708937,1004258
category_id,2053013555631882655,2053013566100866035,2053013553853497655,2053013563810775923,2053013555631882655,2053013558920217191,2053013558920217191,2053013558190408249,2053013553559896355,2053013555631882655
category_code,electronics.smartphone,appliances.sewing_machine,,appliances.kitchen.washer,electronics.smartphone,computers.notebook,computers.notebook,,,electronics.smartphone
brand,xiaomi,janome,creed,lg,xiaomi,hp,hp,rondell,michelin,apple
price,489.07,293.65,28.31,712.87,183.27,360.09,514.56,30.86,72.72,732.07
user_id,520088904,530496790,561587266,518085591,558856683,520772685,514028527,518574284,532364121,532647354
user_session,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33,8e5f4f83-366c-4f70-860e-ca7417414283,755422e7-9040-477b-9bd2-6a6e8fd97387,3bfb58cd-7892-48cc-8020-2f17e6de6e7f,313628f1-68b8-460d-84f6-cec7a8796ef2,816a59f3-f5ae-4ccd-9b23-82aa8c23d33c,df8184cc-3694-4549-8c8c-6b5171877376,5e6ef132-4d7c-4730-8c7f-85aa4082588f,0a899268-31eb-46de-898d-09b2da950b24,d2d3d2c6-631d-489e-9fb5-06f340b85be0


### Know your Customers

How many unique customers visit the site?

In [5]:
# using native pyspark
from pyspark.sql.functions import countDistinct

df.select(countDistinct("user_id")).show()

                                                                                

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                3696117|
+-----------------------+



Did you notice the Spark progress bar when you triggered the action? `show()` is an action, which means it ended Spark's lazy evaluation and ran a job. `read.csv` should have been another job, because `inferSchema=True` makes Spark scan the file. If you go to `localhost:4040`, you should be able to see 2 completed jobs under the `Jobs` tab, which are `csv` and `showString`. While a heavy job is running, you can open the `Executors` tab to watch the executors complete tasks in parallel. You may not see much of this on a local machine, but the behavior should be clearly visible on a cloud system such as EMR.

### (Optional) Visitors Daily Trend

Does traffic fluctuate by date? Try using the `event_time` column to measure traffic, and plot the results.

In [None]:
# for event_time you should use a window and groupby a time period
from pyspark.sql.functions import window

Question: Suppose you want to study the cart abandonment rate using this dataset. Which features are relevant for modeling it?

In [None]:
# your answer
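
As a reference point, one common definition of the cart abandonment rate is the share of sessions with a cart event that never reach a purchase. The sketch below works through that arithmetic in plain Python. The session ids are made up, and the event type names `cart` and `purchase` are an assumption about the dataset's `event_type` values.

```python
# Made-up sessions: each maps a session id to the event types seen in it.
sessions = {
    "s1": ["view", "cart", "purchase"],
    "s2": ["view", "cart"],
    "s3": ["view"],
    "s4": ["view", "cart", "cart"],
    "s5": ["cart", "purchase"],
}

# Sessions where something was added to the cart.
carted = [s for s, events in sessions.items() if "cart" in events]
# Of those, sessions that never reached a purchase.
abandoned = [s for s in carted if "purchase" not in sessions[s]]

abandonment_rate = len(abandoned) / len(carted)
print(f"{len(abandoned)} of {len(carted)} carts abandoned: {abandonment_rate:.2f}")
```

Under this definition, a per-session label (abandoned or not) is a natural target for a classifier.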

Now, you will start building the model. Add the columns you would like to use as predictor features in the model to the `feature_cols` list.

In [None]:
from pyspark.ml.feature import VectorAssembler

feature_cols = []  # columns you'd like to use
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)
df.show()

To use a string column, you can use the `StringIndexer` to encode the column. Update the `inputCol` keyword argument so that you can encode the target feature.

In [None]:
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(
    inputCol="", outputCol="encoded"
)  # what should we use for the inputCol here?
df = labeler.fit(df).transform(df)
df.show()

Now build the train/test dataset with a 70/30 `randomSplit` and a random seed set to 42.

In [None]:
train, test = df.randomSplit()  # fill in the split weights and the seed
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Next you need to add in the name of the feature column and the name of the `labelCol` you previously encoded for training the model.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="", labelCol="")
model = rf.fit(train)
predictions = model.transform(test)
# what goes in the select() function?
predictions.select().show(25)

Once the job execution is done, evaluate the model's performance. Add in the `labelCol` below.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="", predictionCol="prediction", metricName="accuracy"
)  # metricName defaults to "f1", so request accuracy explicitly
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

### Extra: Use the confusion matrix to see the other metrics

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F

# MulticlassMetrics expects an RDD of (prediction, label) pairs of doubles
preds_and_labels = predictions.select(
    "prediction", F.col("encoded").cast(DoubleType()).alias("encoded")
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
# rows are the true labels, columns are the predicted labels
print(metrics.confusionMatrix().toArray())
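
Once you have the confusion matrix, the other metrics follow from simple arithmetic. The sketch below uses a hypothetical 2x2 matrix (the counts are made up) with true labels in rows and predicted labels in columns.

```python
# Hypothetical confusion matrix: rows are true labels, columns are predictions.
confusion = [
    [50, 10],  # true class 0: 50 predicted as 0, 10 predicted as 1
    [5, 35],   # true class 1: 5 predicted as 0, 35 predicted as 1
]

n_classes = len(confusion)
total = sum(sum(row) for row in confusion)

# Accuracy: correct predictions (the diagonal) over all predictions.
accuracy = sum(confusion[k][k] for k in range(n_classes)) / total

precisions, recalls = [], []
for k in range(n_classes):
    true_positives = confusion[k][k]
    predicted_k = sum(confusion[i][k] for i in range(n_classes))  # column sum
    actual_k = sum(confusion[k])  # row sum
    precisions.append(true_positives / predicted_k)
    recalls.append(true_positives / actual_k)
    print(f"class {k}: precision={precisions[k]:.3f} recall={recalls[k]:.3f}")

print(f"accuracy={accuracy:.3f}")
```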