In [1]:
import csv
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report

In [2]:
# start Spark Session
from pyspark.sql import SparkSession
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel, LabeledPoint, RandomForest

app_name = "w261_final_training"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

In [3]:
rawRDD = spark.read.csv("data/sample.txt", header=False, sep="\t").rdd
dataRDD = rawRDD.map(lambda row: ([None if el is None else int(el) for el in row[1:14]] + list(row[14:]), int(row[0])))

In [4]:
# load the data we saved from the EDA. This helps us engineer the features and configure the model

frequent_feats = {}

with open("data/freq_category_counts.csv") as csvfile:
    for row in csv.DictReader(csvfile):
        total = int(row["total"])
        if total >= 10:
            key = "{}-{}".format(row["col_name"], row["category"])
            frequent_feats[key] = int(row["category_id"])

with open("data/num_significant_categories.csv") as csvfile:
    num_significant_categories = { row["field"]: int(row["count"]) for row in csv.DictReader(csvfile) }

In [5]:
# take a row of features and the label
# leave features 0-13 the same (already integers), covert and hash features 
def hashTrick(features, label, N):
    newFeatures = list(features[0:13])
    for i in range(14, len(features)):
        if features[i] == None:
            newFeatures += list([None])
        else:
            newFeatures += list([int(features[i], 16) % 2**N])
    yield (newFeatures, label)

In [6]:
dataRDD = dataRDD.flatMap(lambda r: hashTrick(r[0], r[1], 16)).cache()

# Ensembles of Trees

The numerical columns are pretty much used directly. Note that NULLs are encoded with the value -10 (also tried 0 and imputing with the medians). Categorical values are kept if they are in the common categories from the EDA. They were assigned an integer ID, which is in the frequent_feats dict. This is used to encode each value. All rare values are converted to a special ID. NULLs are converted to yet another special ID. Note that this adds 2 additional categories from the ones we picked from the EDA.

In [7]:
def to_labeled(pair):
    """transform input data into the features"""
    row, label = pair
    # collect the converted values here
    vector = []
    
    for i, val in enumerate(row):
        # if this is an numerical column
        if i < 13:
            if val is None:
                val = -10
        # if this is categorical
        else:
            if val is not None:
                key = "C{}-{}".format(i - 13, val)
                # if its one of our "common" values
                if key in frequent_feats:
                    # look up its ID
                    val = frequent_feats[key]
                else:
                    # give it the special value for RARE
                    val = num_significant_categories["C" + str(i - 13)]
            else:
                # give it the special value for NULL
                val = num_significant_categories["C" + str(i - 13)] + 1
        vector.append(val)
    return LabeledPoint(label, vector)

def resample(pair):
    """sample the positive examples twice to increase their importance"""
    if pair.label == 1:
        return [pair, pair]
    else:
        return [pair]

labeledRDD = dataRDD.map(to_labeled)

# set model params
categoricalFeaturesInfo = { int(feat[1:]) + 13: count + 2 for feat, count in num_significant_categories.items() }
maxBins = max(num_significant_categories.values()) + 2
trainingData, validationData = labeledRDD.randomSplit([0.9, 0.1])
# re-samples the positive class
#trainingData = trainingData.flatMap(resample)

labels = validationData.map(lambda lp: lp.label).collect()

In [20]:
def trainGBT(trainingData, maxBins, maxDepth, numIterations):

    model_gbdt = GradientBoostedTrees.trainClassifier(trainingData,
                                                      categoricalFeaturesInfo={},
                                                      maxBins=maxBins,
                                                      maxDepth=8,
                                                      numIterations=10) # how many trees

    # Evaluate model on test instances and compute validation accuracy
    predictions_gbdt = model_gbdt.predict(validationData.map(lambda x: x.features))
    #labelsAndPredictions = validationData.map(lambda lp: lp.label).zip(predictions)
    #testErr = labelsAndPredictions.filter(lambda lp: lp[0] != lp[1]).count() / float(validationData.count())

    preds_gbdt = predictions_gbdt.collect()
    accuracy_gbdt = np.mean(np.array(labels) == np.array(preds_gbdt))
    print('accuracy = ' + str(accuracy_gbdt))

    model_name = "models/gbdt-model"
    # save the model
    !rm -rf /media/notebooks/{model_name}
    model_gbdt.save(sc, model_name)

    # how to load the model again, tho not necessary in this file
    #sameModel = GradientBoostedTreesModel.load(sc, model_name)
    #print(sameModel.toDebugString())
    
    return (labels, preds_gbdt)

In [22]:
# loop through various combinations of tree depth, bins, and iterations
# this will help us hone in on which parameters to use for our final model
for td in range(3,12,3):
    for b in range(10, maxBins,5):
        for i in range(5,12,4):
            print("Depth = " + str(td) + ", Max Bins = " + str(b) + ", Iterations = " + str(i))
            (labels, preds_gbdt) = trainGBT(trainingData, b, td, i)
            print(classification_report(labels, preds_gbdt))
            print(confusion_matrix(labels, preds_gbdt))
            print("")

Depth = 3, Max Bins = 10, Iterations = 5
accuracy = 0.7691511387163561
             precision    recall  f1-score   support

        0.0       0.81      0.92      0.86       745
        1.0       0.49      0.26      0.34       221

avg / total       0.73      0.77      0.74       966

[[686  59]
 [164  57]]

Depth = 3, Max Bins = 10, Iterations = 9
accuracy = 0.7691511387163561
             precision    recall  f1-score   support

        0.0       0.81      0.92      0.86       745
        1.0       0.49      0.26      0.34       221

avg / total       0.73      0.77      0.74       966

[[686  59]
 [164  57]]

Depth = 3, Max Bins = 15, Iterations = 5
accuracy = 0.7484472049689441
             precision    recall  f1-score   support

        0.0       0.79      0.92      0.85       745
        1.0       0.38      0.15      0.22       221

avg / total       0.69      0.75      0.71       966

[[689  56]
 [187  34]]

Depth = 3, Max Bins = 15, Iterations = 9
accuracy = 0.7484472049689441

In [11]:
def trainRF(trainingData, maxBins, numClasses, maxDepth, numTrees):
    model_rf = RandomForest.trainClassifier(trainingData,
                                        categoricalFeaturesInfo={},
                                        maxBins=maxBins,
                                        numClasses=numClasses,
                                        maxDepth=maxDepth,
                                        numTrees=numTrees)

    # Evaluate model on test instances and compute validation accuracy
    predictions_rf = model_rf.predict(validationData.map(lambda x: x.features))

    preds_rf = predictions_rf.collect()
    accuracy_rf = np.mean(np.array(labels) == np.array(preds_rf))
    print('accuracy = ' + str(accuracy_rf))

    model_name = "models/rf-model"
    # save the model
    !rm -rf /media/notebooks/{model_name}
    model_rf.save(sc, model_name)
    
    return (labels, preds_rf)

#trainRF(trainingData, maxBins, 2, 15, 10)

In [None]:
for b in range(maxBins, maxBins+1):
    for d in range(5,26,3):
        for t in range(10,51,5):
            print("Max Bins = " + str(b) + ", Max Depth = " + str(d) + ", Num Trees = " + str(t))
            (labels, preds_rf) = trainRF(trainingData, b, 2, d, t)
            print(classification_report(labels, preds_rf))
            print(confusion_matrix(labels, preds_rf))
            print("")

Max Bins = 52, Max Depth = 5, Num Trees = 10
accuracy = 0.7727272727272727
             precision    recall  f1-score   support

        0.0       0.78      0.99      0.87       800
        1.0       0.47      0.03      0.06       234

avg / total       0.71      0.77      0.69      1034

[[792   8]
 [227   7]]

Max Bins = 52, Max Depth = 5, Num Trees = 15
accuracy = 0.7775628626692457
             precision    recall  f1-score   support

        0.0       0.78      0.99      0.87       800
        1.0       0.60      0.05      0.09       234

avg / total       0.74      0.78      0.70      1034

[[792   8]
 [222  12]]

Max Bins = 52, Max Depth = 5, Num Trees = 20
accuracy = 0.7727272727272727
             precision    recall  f1-score   support

        0.0       0.78      0.99      0.87       800
        1.0       0.47      0.03      0.06       234

avg / total       0.71      0.77      0.69      1034

[[791   9]
 [226   8]]

Max Bins = 52, Max Depth = 5, Num Trees = 25
accuracy = 0.

# Using Full Data

In [None]:
fullRDD = spark.read.csv("data/train.txt", header=False, sep="\t").rdd
fullDataRDD = rawRDD.map(lambda row: ([None if el is None else int(el) for el in row[1:14]] + list(row[14:]), int(row[0]))) \
                    .flatMap(lambda r: hashTrick(r[0], r[1], 16)) \
                    .cache()

In [None]:
fullLabeledRDD = fullDataRDD.map(to_labeled)

# set model params
# TODO: need this for full data
categoricalFeaturesInfo = { int(feat[1:]) + 13: count + 2 for feat, count in num_significant_categories.items() }
maxBins = max(num_significant_categories.values()) + 2
fullTrainingData, fullValidationData = labeledRDD.randomSplit([0.9, 0.1])
# re-samples the positive class
#trainingData = trainingData.flatMap(resample)

labels = fullValidationData.map(lambda lp: lp.label).collect()

# Test Against Test Dataset

# CONCEPTS NOTES

bias variance tradeoff / model complexity / regularization
One Hot Encoding / vector embeddings / feature selection
assumptions (for different algorithms - for example OLS vs Trees)
