In [2]:
from pyspark.context import SparkContext
#from pyspark.ml.classification import LogisticRegression
'''Random Forest'''
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [3]:
sc = SparkContext.getOrCreate()

## Preprocessing the data using OneHotEncoder

In [4]:
data_path = "HR_comma_sep.csv"
dataset = spark.read.options(header="true", parserLib="univocity", inferSchema="true").csv(data_path)
cols = dataset.columns
print dataset.dtypes

[('satisfaction_level', 'double'), ('last_evaluation', 'double'), ('number_project', 'int'), ('average_montly_hours', 'int'), ('time_spend_company', 'int'), ('Work_accident', 'int'), ('left', 'int'), ('promotion_last_5years', 'int'), ('sales', 'string'), ('salary', 'string')]


In [5]:
categoricalColumns = ["sales", "salary"]
stages = []

for categoricalCol in categoricalColumns: 
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
    stages += [stringIndexer, encoder]

In [6]:
label_string_indexer = StringIndexer(inputCol = "left", outputCol = "label")
stages += [label_string_indexer]

In [7]:
numericColumns = ['number_project',
                  'average_montly_hours',
                  'time_spend_company',
                  'Work_accident', 
                  'promotion_last_5years']

assemblerInputs = map(lambda c: c + "classVec", categoricalColumns) + numericColumns
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler]

In [13]:
print dataset.columns

['satisfaction_level', 'last_evaluation', 'number_project', 'average_montly_hours', 'time_spend_company', 'Work_accident', 'left', 'promotion_last_5years', 'sales', 'salary']


In [14]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
dataset.show()

+-----+--------------------+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|label|            features|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales|salary|
+-----+--------------------+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|  1.0|(16,[0,9,11,12,13...|              0.38|           0.53|             2|                 157|                 3|            0|   1|                    0|sales|   low|
|  1.0|(16,[0,10,11,12,1...|               0.8|           0.86|             5|                 262|                 6|            0|   1|                    0|sales|medium|
|  1.0|(16,[0,10,11,12,1...|              0.11|           0.88|             7|                 272|                 4|            0|   

In [15]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print trainingData.count()
print testData.count()

10567
4432


## Validation sample with random forest algorithm

In [39]:
"""lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=1000)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)
predictions.printSchema()"""
'''featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(trainingData)'''
from pyspark.ml.classification import RandomForestClassifier as RF
rf = RF(labelCol="label", featuresCol="features",numTrees=3)
fit = rf.fit(trainingData)
transformed = fit.transform(testData)
'''pipeline = Pipeline(stages=[featureIndexer, rf])

modelRF = pipeline.fit(trainingData)

predictions = modelRF.transform(testData)
predictions.printSchema()'''

'pipeline = Pipeline(stages=[featureIndexer, rf])\n\nmodelRF = pipeline.fit(trainingData)\n\npredictions = modelRF.transform(testData)\npredictions.printSchema()'

In [41]:
selected = transformed.select("label", "prediction",  "probability")
selected.show()
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = transformed.select(['probability', 'label'])
 
## prepare score-label set
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)
 
metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=3): ", metrics.areaUnderROC)
#print("The PR score is (@numTrees=3): ", metrics.areaUnderPR)
#print trainingData.schema

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       1.0|[0.44941686609916...|
|  0.0|       1.0|[0.31152456882009...|
|  0.0|       1.0|[0.31152456882009...|
|  0.0|       1.0|[0.44941686609916...|
|  0.0|       1.0|[0.44941686609916...|
|  0.0|       1.0|[0.31152456882009...|
|  0.0|       1.0|[0.44941686609916...|
|  0.0|       1.0|[0.44941686609916...|
|  0.0|       0.0|[0.54081240572470...|
|  0.0|       0.0|[0.54081240572470...|
|  0.0|       0.0|[0.54081240572470...|
|  0.0|       0.0|[0.67870470300377...|
|  0.0|       0.0|[0.54081240572470...|
|  0.0|       0.0|[0.67870470300377...|
|  0.0|       0.0|[0.54081240572470...|
|  0.0|       0.0|[0.54081240572470...|
|  0.0|       0.0|[0.67870470300377...|
|  0.0|       0.0|[0.67870470300377...|
|  0.0|       0.0|[0.54081240572470...|
|  0.0|       0.0|[0.54081240572470...|
+-----+----------+--------------------+
only showing top 20 rows

('The ROC scor

In [42]:
from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
 
RATIO_ADJUST = 2.0 ## ratio of pos to neg in the df_subsample
 
counts = trainingData.select('label').groupBy('label').count().collect()
higherBound = counts[0][1]
TRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * higherBound)
 
randGen = lambda x: randint(0, higherBound) if x == 'Positive' else -1
 
udfRandGen = udf(randGen, IntegerType())
trainingData = trainingData.withColumn("randIndex", udfRandGen("label"))
df_subsample = trainingData.filter(trainingData['randIndex'] < TRESHOLD_TO_FILTER)
df_subsample = df_subsample.drop('randIndex')
 
print("Distribution of Pos and Neg cases of the down-sampled training data are: \n", df_subsample.groupBy("label").count().take(3))

('Distribution of Pos and Neg cases of the down-sampled training data are: \n', [Row(label=0.0, count=8054), Row(label=1.0, count=2513)])


## Evaluating Random Forest Algorithm with cross validation

In [43]:
from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
import numpy as np
RATIO_ADJUST = 3.0 ## ratio of pos to neg in the df_subsample
TOTAL_MODELS = 10
total_results = None
final_result = None
 
#counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
highestBound = counts[0][1]
TRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * highestBound)
## UDF
randGen = lambda x: randint(0, highestBound) if x == 'Positive' else -1
udfRandGen = udf(randGen, IntegerType())
 
## ensembling
for N in range(TOTAL_MODELS):
    print("Round: ", N)
    trainingDataIndexed = trainingData.withColumn("randIndex", udfRandGen("label"))
    df_subsample = trainingDataIndexed.filter(trainingDataIndexed['randIndex'] < TRESHOLD_TO_FILTER).drop('randIndex')
    ## training and prediction
    rf = RF(labelCol='label', featuresCol='features',numTrees=3)
    fit = rf.fit(df_subsample)
    transformed = fit.transform(testData)
    result_pair = transformed.select(['probability', 'label'])
    result_pair = result_pair.collect()
    this_result = np.array([float(i[0][1]) for i in result_pair])
    this_result = list(this_result.argsort().argsort() / (float(len(this_result) + 1)))
 
    ## sum up all the predictions, and average to get final_result
    if total_results is None:
       total_results = this_result
    else:
       total_results = [i+j for i, j in zip(this_result, total_results)]
    final_result = [i/(N+1) for i in total_results]
 
    results_list = [(float(i), float(j[1])) for i, j in zip(final_result, result_pair)]
    scoreAndLabels = sc.parallelize(results_list)
 
    metrics = metric(scoreAndLabels)
    print("The ROC score is (@numTrees=3): ", metrics.areaUnderROC)

('Round: ', 0)
('The ROC score is (@numTrees=3): ', 0.906965362838021)
('Round: ', 1)
('The ROC score is (@numTrees=3): ', 0.906965362838025)
('Round: ', 2)
('The ROC score is (@numTrees=3): ', 0.9069653628380288)
('Round: ', 3)
('The ROC score is (@numTrees=3): ', 0.906965362838027)
('Round: ', 4)
('The ROC score is (@numTrees=3): ', 0.9069653628380272)
('Round: ', 5)
('The ROC score is (@numTrees=3): ', 0.906965362838025)
('Round: ', 6)
('The ROC score is (@numTrees=3): ', 0.9069653628380273)
('Round: ', 7)
('The ROC score is (@numTrees=3): ', 0.9069653628380294)
('Round: ', 8)
('The ROC score is (@numTrees=3): ', 0.9069653628380289)
('Round: ', 9)
('The ROC score is (@numTrees=3): ', 0.906965362838025)
