# PySpark Case Demo: Product Recommendation Using a Random Forest

# Question 
## What is the probability that a given customer will like a certain product?

<br>


# The Data 
### 1) The data set includes both product and customer information
### 2) The `feedback` column is the customer's response to a given product
### 3) All data are mock-ups, used only to highlight what is possible with PySpark scripts
<br>

![caption](data_head.png)

<br>
<br>


# Steps covered in this notebook: 
### 1) Data pre-processing
### 2) Binary classification with the `pyspark.ml` random forest classifier
### 3) Measuring performance with the ROC AUC score
### 4) Different strategies for handling an imbalanced dataset
<br>
<br>


# PySpark, start!

### Load libraries

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd
import numpy as np
import functools
from pyspark.ml.feature import OneHotEncoder

### Read in data

In [None]:
# Load the source table. The table name is a placeholder, and `sqlContext` must
# already exist in the session (set up by whoever provisions the cluster).
tableData = sqlContext.read.table('db.your_table_containing_products_feedback_information')

In [3]:
# Columns used for modelling: product and customer attributes plus the raw
# `feedback` response. Exact duplicate rows are removed.
cols_select = [
    'prod_price', 'prod_feat_1', 'prod_feat_2', 'cust_age', 'prod_feat_3',
    'cust_region', 'prod_type', 'cust_sex', 'cust_title', 'feedback',
]
df = tableData.select(*cols_select).dropDuplicates()

### Data preprocessing - Merge 'Neutral' responses into 'Negative'

In [None]:
from matplotlib import pyplot as plt
%matplotlib inline
print("Histograms below: ")

responses = df.groupBy('feedback').count().collect() # list of Rows
categories = [i[0] for i in responses]
counts = [i[1] for i in responses]

ind = np.array(range(len(categories)))
width = 0.35
plt.bar(ind, counts, width=width, color='r')

plt.ylabel('counts')
plt.title('Response distribution')
plt.xticks(ind + width/2., categories)

Histograms below: 

![caption](files/hist.png)

In [None]:
from pyspark.sql import functions as F

# Collapse the three-way response into two classes: 'Neutral' is merged into
# 'Negative', and every other value (including null) passes through unchanged.
# A built-in column expression is used instead of a Python UDF, so rows are not
# serialised to Python workers.
df = df.withColumn(
    "binary_response",
    F.when(F.col("feedback") == 'Neutral', 'Negative').otherwise(F.col("feedback")),
)

### Filling NA values and casting data types

In [None]:
# Column set after binarisation: the ten selected columns plus 'binary_response'.
cols_select = ['prod_price',
               'prod_feat_1',
               'prod_feat_2',
               'cust_age',
               'prod_feat_3',
               'cust_region',
               'prod_type',
               'cust_sex',
               'cust_title',
               'feedback',
               'binary_response']

# Cast the four numeric inputs explicitly. Column.cast keeps the original column
# name, and every column from 'prod_feat_3' onward is passed through as-is.
numeric_casts = [('prod_price', 'float'),
                 ('prod_feat_1', 'float'),
                 ('prod_feat_2', 'float'),
                 ('cust_age', 'int')]
df = df.select(*([df[name].cast(dtype) for name, dtype in numeric_casts] + cols_select[4:]))

# Nulls in these string columns become the literal 'NA', so they form their own category.
# NOTE(review): prod_feat_3, cust_sex and the numeric columns are not filled here.
# Nulls there could reach StringIndexer/VectorAssembler. Confirm the source has none.
na_fill = dict.fromkeys(['cust_region', 'cust_title', 'prod_type'], 'NA')
df = df.fillna(na_fill)

### Collapse rare levels of the high-cardinality categorical column 'prod_feat_3'

In [None]:
# Print the number of distinct values of each categorical column, to spot
# columns with too many levels to one-hot encode directly. The slice skips the
# four numeric columns at the front and the two response columns at the end.
categorical_cols = df.columns[4:-2]
for name in categorical_cols:
    n_distinct = df.select(name).distinct().count()
    print(name, n_distinct)

prod_feat_3 553
cust_region 12
prod_type 35
cust_sex 2
cust_title 12

In [None]:
from pyspark.sql import functions as F

COUNT_THRESHOLD = 150  # levels with <= this many rows are merged into 'MinorityCategory'

# Attach a temporary "count" column: for each row, the number of rows sharing its
# prod_feat_3 value.
# NOTE(review): this is an inner join on prod_feat_3. Null join keys never match,
# so rows whose prod_feat_3 is null are dropped here. Confirm that is intended.
prodFeat3Count = df.groupBy("prod_feat_3").count()
df = df.join(prodFeat3Count, "prod_feat_3", "inner")

# Keep frequent levels (as strings) and collapse rare ones into a single bucket.
# A built-in CASE WHEN expression is used rather than a Python UDF, so rows are
# not serialised to Python workers.
df = df.withColumn(
    'prod_feat_3_reduced',
    F.when(F.col('count') > COUNT_THRESHOLD, F.col('prod_feat_3').cast('string'))
     .otherwise('MinorityCategory'),
)
df = df.drop('prod_feat_3')
df = df.drop('count')

### One-hot encodings

In [None]:
column_vec_in = ['prod_feat_3_reduced', 'cust_region', 'prod_type', 'cust_sex', 'cust_title']
# Each output name is the input name with a '_catVec' suffix.
column_vec_out = [name + '_catVec' for name in column_vec_in]

# Per categorical column: a StringIndexer (string -> index in "<col>_tmp") feeding
# a OneHotEncoder (index -> vector in "<col>_catVec"). dropLast=False keeps a slot
# for every level.
indexers = [StringIndexer(inputCol=name, outputCol=name + '_tmp') for name in column_vec_in]
encoders = [
    OneHotEncoder(dropLast=False, inputCol=index_col, outputCol=vec_col)
    for index_col, vec_col in zip([name + '_tmp' for name in column_vec_in], column_vec_out)
]

# Interleave as [indexer_1, encoder_1, indexer_2, encoder_2, ...], so each encoder
# runs after the indexer that produces its input column.
tmp = [stage for pair in zip(indexers, encoders) for stage in pair]

### Prepare the labeled set - 'features' and 'label'

In [None]:
# prepare labeled sets
cols_now = ['prod_price',
            'prod_feat_1',
            'prod_feat_2',
            'cust_age',
            'prod_feat_3_reduced_catVec',
            'cust_region_catVec',
            'prod_type_catVec',
            'cust_sex_catVec',
            'cust_title_catVec']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
labelIndexer = StringIndexer(inputCol='binary_response', outputCol="label")
tmp += [assembler_features, labelIndexer]
pipeline = Pipeline(stages=tmp)

### Split into training and test sets - remember to set the seed

In [None]:
# Fit the feature pipeline and cache the transformed rows, so the split and every
# later model read the same materialised data.
# NOTE(review): the indexers are fitted on all rows, test rows included. This only
# affects the category vocabularies, but consider fitting on the training split alone.
allData = pipeline.fit(df).transform(df)
allData.cache()

# 80/20 split. The fixed seed keeps the split stable across re-runs, given the same
# input partitioning.
trainingData, testData = allData.randomSplit([0.8, 0.2], seed=0)
label_distribution = trainingData.groupBy("label").count().take(3)
print("Distribution of Pos and Neg in trainingData is: ", label_distribution)

Distribution of Pos and Neg in trainingData is:  [Row(label=1.0, count=144014), Row(label=0.0, count=520771)]

### Training and prediction

In [None]:
# Baseline: a 200-tree random forest trained on the full (imbalanced) training
# split, then used to score the held-out test split.
rf = RF(numTrees=200, labelCol='label', featuresCol='features')
fit = rf.fit(trainingData)
transformed = fit.transform(testData)

### AUC

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

results = transformed.select(['probability', 'label'])
results_collect = results.collect()

## prepare score-label set
# The score is probability[0], the model's probability for label 0.0. The label is
# flipped (1.0 - label) so that class 0.0 counts as the "positive" class for the AUC.
results_list = []
for prob_vec, label in results_collect:
    results_list.append((float(prob_vec[0]), 1.0 - float(label)))
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

The ROC score is (@numTrees=200): 0.6425143766095695

### Visualize AUC

In [None]:
from matplotlib import pyplot as plt
from sklearn.metrics import roc_curve, auc

# Reuse the (score, label) pairs built for the AUC computation above.
y_test = [pair[1] for pair in results_list]
y_score = [pair[0] for pair in results_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

fig, ax = plt.subplots()
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance-level diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver operating characteristic example')
ax.legend(loc="lower right")
plt.show()

![caption](files/auc.png)

### Plot distribution of predictions

In [None]:
from matplotlib import pyplot as plt

# `probability` is a per-row vector: [P(label 0.0), P(label 1.0)].
all_probs = transformed.select("probability").collect()
pos_probs = [row[0][0] for row in all_probs]
neg_probs = [row[0][1] for row in all_probs]
# NOTE(review): with two classes, neg_probs == 1 - pos_probs, so the second histogram
# mirrors the first. Splitting P(label 0.0) by the true `label` would show class
# separation more directly. Consider that instead.


def _plot_prob_hist(values, title):
    """Draw a normalised 50-bin histogram of predicted probabilities."""
    fig, ax = plt.subplots()
    # `normed` was removed from hist() in matplotlib 3.1; `density=True` replaces it.
    ax.hist(values, 50, density=True, facecolor='green', alpha=0.75)
    ax.set_xlabel('predicted_values')
    ax.set_ylabel('Density')  # normalised histogram, not raw counts
    ax.set_title(title)
    ax.grid(True)
    plt.show()


# pos
_plot_prob_hist(pos_probs, 'Probabilities for positive cases')
# neg
_plot_prob_hist(neg_probs, 'Probabilities for negative cases')

![caption](files/neg.png)
![caption](files/pos.png)

### Down-sampling: reduce the number of majority-class ('Positive') cases

In [None]:
from pyspark.sql import functions as F

RATIO_ADJUST = 2.0 ## ratio of pos to neg in the df_subsample
SAMPLE_SEED = 0  # fixed seed so the down-sample is reproducible across re-runs

# Per-class row counts of the training split. Counts are looked up by class name,
# because groupBy().collect() does not guarantee row order; positional indexing
# (counts[0], counts[1]) could silently swap the classes.
counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
class_counts = {row['binary_response']: row['count'] for row in counts}
n_pos = class_counts['Positive']
n_neg = class_counts['Negative']

# Keep every non-'Positive' row, and each 'Positive' row with probability
# pos_fraction, so roughly RATIO_ADJUST positives remain per negative.
# F.rand is seeded, unlike numpy's RNG inside a UDF on the executors.
# The filter is applied to a new DataFrame; trainingData itself is left unchanged.
pos_fraction = min(1.0, RATIO_ADJUST * n_neg / n_pos)
is_not_positive = F.col('binary_response').isNull() | (F.col('binary_response') != 'Positive')
df_subsample = trainingData.filter(is_not_positive | (F.rand(seed=SAMPLE_SEED) < pos_fraction))

print("Distribution of Pos and Neg cases of the down-sampled training data are: \n", df_subsample.groupBy("label").count().take(3))

Distribution of Pos and Neg cases of the down-sampled training data are: 
 [Row(label=1.0, count=144014), Row(label=0.0, count=287482)]

### Evaluate the down-sampled model - AUC improves slightly

In [None]:
## training and prediction
# Same 200-tree forest as the baseline, but trained on the down-sampled set and
# scored on the untouched test split.
rf = RF(numTrees=200, featuresCol='features', labelCol='label')
fit = rf.fit(df_subsample)
transformed = fit.transform(testData)

In [None]:
## results and evaluation
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

results = transformed.select(['probability', 'label'])
results_collect = results.collect()


def _score_and_flipped_label(row):
    """Return (P(label 0.0), 1.0 - label), treating class 0.0 as the positive class."""
    prob_vec, label = row
    return float(prob_vec[0]), 1.0 - float(label)


results_list = list(map(_score_and_flipped_label, results_collect))
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

The ROC score is (@numTrees=200):  0.6463328674547113

### Ensemble of down-samplings: 
### 1) down-sample N times
### 2) each time, train the model and predict on the test data
### 3) average the N (rank-normalized) predictions on the test set

In [None]:
from pyspark.sql import functions as F
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

RATIO_ADJUST = 3.0 ## ratio of pos to neg in the df_subsample
TOTAL_MODELS = 10
total_results = None
final_result = None

# Class counts, looked up by name because groupBy().collect() row order is not
# guaranteed. They are computed here so the cell does not depend on `counts` from
# an earlier cell.
class_counts = {row['binary_response']: row['count']
                for row in trainingData.groupBy('binary_response').count().collect()}
# Probability of keeping each 'Positive' row: about RATIO_ADJUST positives per negative.
pos_fraction = min(1.0, RATIO_ADJUST * class_counts['Negative'] / class_counts['Positive'])
is_not_positive = F.col('binary_response').isNull() | (F.col('binary_response') != 'Positive')

## ensembling
for N in range(TOTAL_MODELS):
    print("Round: ", N)
    # Each round uses its own fixed seed, so every model sees a different but
    # reproducible down-sample.
    df_subsample = trainingData.filter(is_not_positive | (F.rand(seed=N) < pos_fraction))
    ## training and prediction
    rf = RF(labelCol='label', featuresCol='features', numTrees=200)
    fit = rf.fit(df_subsample)
    transformed = fit.transform(testData)
    result_pair = transformed.select(['probability', 'label']).collect()

    # Rank-normalise this model's P(label 1.0) into [0, 1), so differently
    # calibrated models contribute equally to the average.
    scores = np.array([float(row[0][1]) for row in result_pair])
    this_result = scores.argsort().argsort() / float(len(scores) + 1)

    ## sum up all the predictions, and average to get final_result
    total_results = this_result if total_results is None else total_results + this_result
    final_result = total_results / (N + 1)

    # NOTE(review): pairing by position assumes the test rows come back in the same
    # order every round. This is plausible because allData is cached; confirm it.
    results_list = [(float(score), float(row[1])) for score, row in zip(final_result, result_pair)]
    scoreAndLabels = sc.parallelize(results_list)

    metrics = metric(scoreAndLabels)
    print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

Round:  0
The ROC score is (@numTrees=200):  0.6456296366007628
Round:  1
The ROC score is (@numTrees=200):  0.6475210701955153
Round:  2
The ROC score is (@numTrees=200):  0.6488169677072237
Round:  3
The ROC score is (@numTrees=200):  0.6490333812262444
Round:  4
The ROC score is (@numTrees=200):  0.6490997896881725
Round:  5
The ROC score is (@numTrees=200):  0.648347665785477
Round:  6
The ROC score is (@numTrees=200):  0.6486544723987375
Round:  7
The ROC score is (@numTrees=200):  0.6492410064530146
Round:  8
The ROC score is (@numTrees=200):  0.6493154941849306
Round:  9
The ROC score is (@numTrees=200):  0.6483560027574977