# Random forest classification using Spark

This notebook tries to address the following problem:

*(Bonus) Train your learning algorithm for one of the above questions in a distributed fashion, such as using Spark. Here, you can assume either the data or the model is too large/inefficient to be processed on a single computer.*


Although I tried several classifiers and an ensemble in the original solution, in this Spark-based one I am using only RandomForest for simplicity. I don't have much exposure to Spark's machine learning libraries, so this is quite a bit of an exploration for me.

I had intended to generate all features (text and non-text), but because of limited time the classification ends up using only the text features.

## Necessary imports

In [65]:
import datetime
import calendar
import numpy as np
    
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors

from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelEncoder

**Initiate Spark session**

In [25]:
spark = SparkSession.builder.getOrCreate()

**Read in the data as DataFrames and create temp views**

In [26]:
df_train = spark.read.format('csv').options(header='true', inferschema='true').load('training_data_example.csv')

df_val = spark.read.format('csv').options(header='true', inferschema='true').load('validation_data_example.csv')

df_employee = spark.read.format('csv').options(header='true', inferschema='true').load('employee.csv')
df_employee = df_employee.withColumnRenamed('employee id', 'employee_id')
df_employee.createOrReplaceTempView('employee')

In [27]:
lencoder = LabelEncoder()
lencoder.fit(df_train.select('category').rdd.map(lambda x: x[0]).collect())
names = set(df_val.select('category').rdd.map(lambda x: x[0]).collect()) # label names to be used later
y_train = lencoder.transform(df_train.select('category').rdd.map(lambda x: x[0]).collect())
y_val = lencoder.transform(df_val.select('category').rdd.map(lambda x: x[0]).collect())
val_categoroes = []
for clazz in lencoder.classes_:
    if clazz in names:
        val_categoroes.append(clazz)
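
The cell above relies on `LabelEncoder` assigning integer codes in sorted order of the class names, and then keeps only the names that occur in the validation set. A minimal plain-Python sketch of that behaviour, using the category names that appear in this notebook (the `encode` helper and the sample label lists are hypothetical, for illustration only):

```python
# Category names as they appear elsewhere in this notebook; the lists are made up.
train_labels = [
    "Travel", "Meals and Entertainment", "Computer - Hardware",
    "Office Supplies", "Computer - Software", "Travel",
]
val_labels = ["Travel", "Meals and Entertainment", "Computer - Hardware"]

# LabelEncoder-style mapping: sorted unique class names -> 0..n-1.
classes = sorted(set(train_labels))
class_to_index = {name: i for i, name in enumerate(classes)}

def encode(labels):
    """Map label names to integer codes (hypothetical helper)."""
    return [class_to_index[name] for name in labels]

# Class names that actually occur in the validation set, kept in encoder order.
val_names = set(val_labels)
val_categories = [name for name in classes if name in val_names]

print(class_to_index)
print(encode(val_labels))
print(val_categories)
```

With these five names, the sorted order gives the same codes as the `case when` expression used later in `pre_process`.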

**Define some UDFs to use in spark SQL**

In [28]:
def get_weekday(date):
    month, day, year = (int(x) for x in date.split('/'))    
    weekday = datetime.date(year, month, day)
    return calendar.day_name[weekday.weekday()]

def get_month(date):
    month, day, year = (int(x) for x in date.split('/'))    
    return month

spark.udf.register('get_weekday', get_weekday)
spark.udf.register('get_month', get_month)


<function __main__.get_month(date)>
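
The two UDFs split the date string by hand. As a sketch, the same weekday and month can be derived with `datetime.strptime`, assuming the dates are formatted as month/day/four-digit year (the order implied by the `split('/')` unpacking above). The `_alt` function names and the sample date are made up for illustration.

```python
import calendar
import datetime

DATE_FORMAT = "%m/%d/%Y"  # assumed layout: month/day/four-digit year

def get_weekday_alt(date):
    """Return the English weekday name for a month/day/year string."""
    parsed = datetime.datetime.strptime(date, DATE_FORMAT)
    return calendar.day_name[parsed.weekday()]

def get_month_alt(date):
    """Return the month number (1-12) for a month/day/year string."""
    return datetime.datetime.strptime(date, DATE_FORMAT).month

print(get_weekday_alt("11/1/2016"), get_month_alt("11/1/2016"))
```

Because `spark.udf.register` is called without a return type, the registered functions return strings, which is why the SQL below casts `get_month(date)` to `int`.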

**Next, define a tokenizer and a hashing vectorizer for the TF-IDF vectorization**

In [29]:
tokenizer = Tokenizer(inputCol="expense_description", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
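
`HashingTF` maps each token to one of `numFeatures` buckets by hashing it, so with only 20 buckets distinct words can end up in the same slot. The sketch below illustrates the idea in plain Python. It uses MD5 as a stand-in hash purely for determinism, so the bucket indices will not match Spark's; `bucket`, `hashed_term_frequencies`, and the sample sentence are hypothetical.

```python
import hashlib

def bucket(token, num_features):
    """Deterministically map a token to a bucket index (MD5 is a stand-in hash)."""
    digest = hashlib.md5(token.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_features

def hashed_term_frequencies(text, num_features=20):
    """Lowercase, split on whitespace, and count tokens per hash bucket."""
    vector = [0] * num_features
    for token in text.lower().split():
        vector[bucket(token, num_features)] += 1
    return vector

vec = hashed_term_frequencies("Taxi ride to the airport taxi")
print(vec)
```

The vector always has 20 entries regardless of vocabulary size; raising `numFeatures` trades memory for fewer collisions.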

**Define two helper functions: one to format the features and one to vectorize them**

In [30]:
def pre_process(df, table):
    df = df.withColumnRenamed('employee id', 'employee_id') \
         .withColumnRenamed('expense description', 'expense_description') \
         .withColumnRenamed('pre-tax amount', 'pre_tax_amount') \
         .withColumnRenamed('tax amount', 'tax_amount') \
         .withColumnRenamed('tax name', 'tax_name')
            
    df.createOrReplaceTempView(table)
    
    df = spark.sql("""
        select employee_id,
           get_weekday(date) as weekday,
           cast(get_month(date) as int) as month,
           pre_tax_amount,
           role,
           expense_description,
           case when category = 'Computer - Hardware' then 0
               when category = 'Computer - Software'  then 1
               when category = 'Meals and Entertainment' then 2
               when category = 'Office Supplies' then 3
               else 4
           end as category
        from 
        {table} 
        inner join employee using(employee_id)
    
    """.format(table=table))
    
    return df

def vectorize(df):
    wordsData = tokenizer.transform(df)
    
    featurizedData = hashingTF.transform(wordsData)

    idf = IDF(inputCol="rawFeatures", outputCol="features")
    idfModel = idf.fit(featurizedData)
    rescaledData = idfModel.transform(featurizedData)

    rescaledData = rescaledData.select("features", "category")
    return rescaledData
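
Note that `vectorize` fits a new `IDF` model on every DataFrame it receives, so the validation data ends up weighted by its own document frequencies rather than the training set's. A common alternative is to fit the IDF weights once on the training data and reuse them. The plain-Python sketch below shows that pattern with the smoothed formula log((m + 1) / (df + 1)) that Spark documents for `IDF`; the function names and toy count vectors are assumptions for illustration.

```python
import math

def fit_idf(count_vectors):
    """Compute one IDF weight per feature from a list of term-count vectors."""
    num_docs = len(count_vectors)
    num_features = len(count_vectors[0])
    weights = []
    for j in range(num_features):
        doc_freq = sum(1 for vec in count_vectors if vec[j] > 0)
        weights.append(math.log((num_docs + 1) / (doc_freq + 1)))
    return weights

def apply_idf(count_vectors, weights):
    """Scale each term count by the previously fitted IDF weight."""
    return [[tf * w for tf, w in zip(vec, weights)] for vec in count_vectors]

train_counts = [[1, 0, 2], [1, 1, 0], [1, 0, 0]]  # toy hashed term counts
val_counts = [[0, 1, 1]]

idf_weights = fit_idf(train_counts)              # fitted on training data only
train_tfidf = apply_idf(train_counts, idf_weights)
val_tfidf = apply_idf(val_counts, idf_weights)   # same weights reused
print(idf_weights)
```

In the notebook itself this would amount to keeping the fitted `idfModel` from the training call and using it to transform the validation DataFrame.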

**Get the features with pre-processing and vectorization done**

In [31]:
df_train = pre_process(df_train, 'train')
data_train = vectorize(df_train)
df_val = pre_process(df_val, 'validation')
data_val = vectorize(df_val)

**Transform the data so that it can be fed to Spark's RandomForest classifier**

In [32]:
data_train_rdd = data_train.rdd.map(lambda x: LabeledPoint(x.category, MLLibVectors.fromML(x.features)))
data_val_rdd = data_val.rdd.map(lambda x: LabeledPoint(x.category, MLLibVectors.fromML(x.features)))

**Train the model and make predictions**

In [33]:
model = RandomForest.trainClassifier(data_train_rdd, numClasses=5, categoricalFeaturesInfo={},
                                     numTrees=50, featureSubsetStrategy="sqrt",
                                     impurity='gini', maxDepth=3, maxBins=32, seed=1)
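
For classification, a random forest combines its trees by majority vote: each of the 50 trees predicts a class and the most frequent one wins. A tiny plain-Python sketch of that aggregation step (the `majority_vote` helper and the vote list are hypothetical, and this is not how Spark implements it internally):

```python
from collections import Counter

def majority_vote(tree_predictions):
    """Return the class predicted by the most trees (ties: first seen wins)."""
    return Counter(tree_predictions).most_common(1)[0][0]

# Hypothetical votes from five trees for a single expense record.
votes = [2, 2, 4, 2, 0]
print(majority_vote(votes))
```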

In [34]:
train_predictions = model.predict(data_train_rdd.map(lambda x: x.features))

In [35]:
val_predictions = model.predict(data_val_rdd.map(lambda x: x.features))

In [36]:
train_labelsAndPredictions = data_train_rdd.map(lambda lp: lp.label).zip(train_predictions)
val_labelsAndPredictions = data_val_rdd.map(lambda lp: lp.label).zip(val_predictions)

In [64]:
actual = np.array(data_train_rdd.map(lambda lp: lp.label).collect())
predictions = np.array(train_predictions.collect())
train_accuracy = sum(actual == predictions) / float(len(actual))

actual = np.array(data_val_rdd.map(lambda lp: lp.label).collect())
predictions = np.array(val_predictions.collect())
val_accuracy = sum(actual == predictions) / float(len(actual))

print('Training accuracy = ' + str(train_accuracy))
print('Validation accuracy = ' + str(val_accuracy))
# print('Learned classification forest model:')
# print(model.toDebugString())

Training accuracy = 0.875
Validation accuracy = 0.833333333333


**So it turns out the training accuracy is 87.5% and validation accuracy is 83.33%**

**Let's also print the classification report with precision, recall and f1-score**

In [38]:
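# Note: classification_report expects (y_true, y_pred). The predictions are passed first here,
# so the precision and recall columns in the output below are swapped relative to that convention.
print(classification_report(val_predictions.collect(), df_val.select('category').rdd.map(lambda x:x[0]).collect(), target_names=val_categoroes))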

                         precision    recall  f1-score   support

    Computer - Hardware       1.00      1.00      1.00         1
Meals and Entertainment       1.00      0.78      0.88         9
        Office Supplies       0.00      0.00      0.00         0
                 Travel       1.00      1.00      1.00         2

            avg / total       1.00      0.83      0.91        12
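
For reference, per-class precision and recall depend on which list is treated as the ground truth. A small plain-Python sketch with made-up labels (the helper name and data are hypothetical) shows that swapping the two lists swaps the two metrics:

```python
def precision_recall(y_true, y_pred, label):
    """Return (precision, recall) for one class; 0.0 when undefined."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
    predicted = sum(1 for p in y_pred if p == label)
    actual = sum(1 for t in y_true if t == label)
    precision = tp / predicted if predicted else 0.0
    recall = tp / actual if actual else 0.0
    return precision, recall

# Made-up example: four true items of class 2, one of them predicted as class 3.
y_true = [2, 2, 2, 2, 4]
y_pred = [2, 2, 2, 3, 4]

print(precision_recall(y_true, y_pred, 2))  # precision 1.0, recall 0.75
print(precision_recall(y_pred, y_true, 2))  # swapped arguments: 0.75, 1.0
```

A class that never appears in the list treated as ground truth has zero support, and its recall is undefined; the sketch reports 0.0 in that case.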



