In [1]:
# Set constants.
CSV_PATH = "/vagrant/data/creditcard.csv"
APP_NAME = "Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 13579
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 3
RF_MAX_DEPTH = 4
RF_MAX_BINS = 32

In [2]:
# Load the data set
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName(APP_NAME) \
    .master(SPARK_URL) \
    .getOrCreate()

df = spark.read \
    .options(header = "true", inferschema = "true") \
    .csv(CSV_PATH)
    
#df.printSchema()
print("Total number of rows: %d" % df.count())

Total number of rows: 284807


In [3]:
# Convert the data frame to a dense vector
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# The last column contains the classification outcome. Turn this into an RDD
# of Labeled Points.
transformed_df = df.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[0:-1])))

# Split the data into a training set and a test set.
splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data, test_data = transformed_df.randomSplit(splits, RANDOM_SEED)
print("Number of training set rows: %d" % training_data.count())
print("Number of test set rows: %d" % test_data.count())

Number of training set rows: 199690
Number of test set rows: 85117


In [4]:
# Train the random forest model
from pyspark.mllib.tree import RandomForest
from time import *

start_time = time()

# Let's make sure we have the correct types.
#print("%s should be an RDD" % type(training_data))
#print("%s should be a LabeledPoint" % type(training_data.first()))

# Train our random forest model.
model = RandomForest.trainClassifier(training_data, numClasses=2, categoricalFeaturesInfo={}, \
    numTrees=RF_NUM_TREES, featureSubsetStrategy="auto", impurity="gini", \
    maxDepth=RF_MAX_DEPTH, maxBins=RF_MAX_BINS, seed=RANDOM_SEED)

#print("Learned classification forest model:")
#print(model.toDebugString())

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 11.066 seconds


In [5]:
# Make predictions and compute accuracy
predictions = model.predict(test_data.map(lambda x: x.features))
labels_and_predictions = test_data.map(lambda x: x.label).zip(predictions)
model_accuracy = labels_and_predictions.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
print("Model accuracy: %.3f%%" % (model_accuracy * 100))

Model accuracy: 99.941%


In [6]:
# Model evaluation
from pyspark.mllib.evaluation import BinaryClassificationMetrics

start_time = time()

metrics = BinaryClassificationMetrics(labels_and_predictions)
print("Area under Precision/Recall (PR) curve: %.f" % (metrics.areaUnderPR * 100))
print("Area under Receiver Operating Characteristic (ROC) curve: %.3f" % (metrics.areaUnderROC * 100))

end_time = time()
elapsed_time = end_time - start_time
print("Time to evaluate model: %.3f seconds" % elapsed_time)

Area under Precision/Recall (PR) curve: 79
Area under Receiver Operating Characteristic (ROC) curve: 91.267
Time to evaluate model: 11.056 seconds
