## Connect to Spark

In [None]:
import findspark
findspark.init()  # run before the pyspark import below, as in the original cell order
from pyspark.sql import SparkSession

# Configure a builder for an application named "Random Forest", then obtain
# the session object that every later cell uses as `spark`.
session_builder = SparkSession.builder.appName("Random Forest")
spark = session_builder.getOrCreate()

## Read CSV into data frame

In [None]:
# Load the sample CSV: the first row supplies the column names and Spark is
# asked to infer column types instead of reading everything as strings.
csv_reader = (
    spark.read.format("csv")
    .option("inferschema", "true")
    .option("header", "true")
)
df = csv_reader.load("../sample10k.csv")

# Preview the first five rows.
df.show(5)

## Build the ML Pipeline

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Stage 1: index the raw "target" column into a numeric "label" column.
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

# Stage 2: pack the twelve numeric feature columns into one "features" vector.
numericCols = [
    'f1', 'f2', 'f3', 'f4', 'f5', 'f6',
    'f7', 'f8', 'f9', 'f10', 'f11', 'f12',
]
assemblerInputs = numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Ordered stages of the preprocessing Pipeline.
stages = [label_stringIdx, assembler]

In [None]:
# Fit the preprocessing stages on the full data frame, then apply them to
# add the "label" and "features" columns.
dataPipeline = Pipeline(stages=stages)
pipelineModel = dataPipeline.fit(df)
modelingDF = pipelineModel.transform(df)

# Preview the columns the classifier will consume (first ten rows).
preview_columns = ['id', 'label', 'features']
modelingDF.select(*preview_columns).show(10)

## Split data into train / test

In [None]:
# Random 60/40 train/test split; the fixed seed keeps the split reproducible.
trainingData, testData = modelingDF.randomSplit([0.6, 0.4], seed=100)

# Report the row count of each split: training first, then test.
for split_frame in (trainingData, testData):
    print(split_frame.count())


## Train the RF Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Initial random-forest configuration: 500 trees, each limited to depth 5,
# reading the indexed label and the assembled feature vector.
rf_settings = {
    "labelCol": "label",
    "featuresCol": "features",
    "numTrees": 500,
    "maxDepth": 5,
}
rf = RandomForestClassifier(**rf_settings)

# Fit on the training split only; the test split is held out for evaluation.
rfModel = rf.fit(trainingData)

## Feature Importances

In [None]:
# Print each feature column next to its relative importance in the fitted
# forest. The "features" vector was assembled from numericCols in order, so
# position i of the importance vector corresponds to numericCols[i].
# The importance vector is fetched once instead of re-reading the model
# attribute on every loop iteration.
importances = rfModel.featureImportances
print("Col","\t","Relative Importance")
for position, column in enumerate(numericCols):
    print(column,"\t",importances[position])


##  AUC on holdout data

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out rows with the fitted forest via Transformer.transform().
predictions = rfModel.transform(testData)

# Evaluate the scored test set with the evaluator's default settings.
# The bare expression on the last line is what the notebook cell displays.
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

## ROC

In [None]:
import sklearn.metrics as metrics

# Compute the ROC curve (fpr/tpr at every threshold) and its AUC on the
# scored test set.
#
# Ground truth comes from the indexed "label" column, not the raw "target":
# the forest was trained with labelCol="label", and its probability vector is
# ordered by label index. The raw target may be non-numeric or numbered
# differently, which would misalign (or break) the curve.
# Both columns are collected in a single toPandas() call so the rows of
# y_true and the scores are guaranteed to line up.
roc_frame = predictions.select("label", "probability").toPandas()
y_true = roc_frame["label"]
preds = roc_frame[["probability"]]

# Last entry of each probability vector = score for the highest label index
# (label 1 in the binary case).
positive_scores = preds["probability"].map(lambda vec: float(vec[-1]))

fpr, tpr, threshold = metrics.roc_curve(y_true, positive_scores)
roc_auc = metrics.auc(fpr, tpr)



In [None]:
import matplotlib.pyplot as plt

# ROC curve in blue, with the AUC (two decimals) shown in the legend.
plt.title('Receiver Operating Characteristic')
plt.plot(fpr, tpr, 'b', label='AUC = %0.2f' % roc_auc)
plt.legend(loc='lower right')

# Dashed red diagonal from (0, 0) to (1, 1) as a visual reference line.
unit_interval = [0, 1]
plt.plot(unit_interval, unit_interval, 'r--')

# Clamp both axes to [0, 1] and label them.
plt.xlim(unit_interval)
plt.ylim(unit_interval)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

## Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix

# Build the confusion matrix from the scored test set.
#
# "prediction" holds label indices, so it must be compared with the indexed
# "label" column rather than the raw "target" column; otherwise the two sides
# can use different encodings of the classes.
# Rows/columns of the matrix are therefore ordered by label index.
# A single toPandas() call collects both columns together, keeping them
# row-aligned and avoiding a second Spark job.
outcome_frame = predictions.select("label", "prediction").toPandas()
y_true = outcome_frame["label"]
y_pred = outcome_frame["prediction"]
cnf_matrix = confusion_matrix(y_true, y_pred)

import matplotlib.pyplot as plt
import numpy as np
import itertools

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Print a confusion matrix and draw it as an annotated heat map.

    The plot is drawn with the pyplot state machine (current figure/axes);
    this function neither creates a figure nor calls ``plt.show()``.

    Args:
        cm: 2-D array-like of counts, with true classes on the rows (y axis)
            and predicted classes on the columns (x axis).
        classes: Tick labels, one per class, in the same order as the
            rows/columns of ``cm``.
        normalize: When True, each row is divided by its row sum so cells
            show per-true-class fractions (formatted with two decimals)
            instead of raw counts.
        title: Title placed above the plot.
        cmap: Matplotlib colormap used for the heat map.
    """
    cm = np.asarray(cm)

    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # A class with no true samples has an all-zero row. Dividing by 1
        # leaves that row at 0.0 instead of producing NaN cells (which would
        # also turn the colour threshold below into NaN).
        row_totals[row_totals == 0] = 1
        cm = cm.astype('float') / row_totals
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Annotate every cell; use white text on cells above half the maximum
    # value and black text elsewhere, so the number stays readable.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Lay out only after the axis labels exist so they are included in the
    # spacing computation.
    plt.tight_layout()

In [None]:
# Draw the raw-count (non-normalized) confusion matrix on a fresh figure.
class_names = ['no', 'yes']
plt.figure()
plot_confusion_matrix(
    cnf_matrix,
    class_names,
    title='Confusion matrix, without normalization',
)
plt.show()