In [46]:
# number of workers you have started
numPartitions = 4

# loading the text file and convert to RDD
rdd = sc.textFile("s3n://dist_computing/data/bank.csv", numPartitions)
type(rdd)

pyspark.rdd.RDD

In [47]:
# count the number of observations in the dataset
noObs = rdd.count()
print noObs

# get only 3 observations from the whole dataset
noObservations = 3
fewObs = rdd.take(noObservations)
print fewObs

for x in rdd.take(noObservations):
    print x

41188
[u'0,56,261,4.857,5191,0,1.1,93.994,-36.4', u'0,57,149,4.857,5191,0,1.1,93.994,-36.4', u'0,37,226,4.857,5191,0,1.1,93.994,-36.4']
0,56,261,4.857,5191,0,1.1,93.994,-36.4
0,57,149,4.857,5191,0,1.1,93.994,-36.4
0,37,226,4.857,5191,0,1.1,93.994,-36.4


In [48]:
# checking for correct result
assert noObs == 41188, "Something is wrong here!"
assert len(fewObs) == noObservations, "You did not extract correct number of observations"

In [49]:
import numpy as np
print np.__version__

1.9.2


In [50]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

# input: line, a single observation consisting of a string
# output: LabeledPoint instance, text converted to distinct variables, label and features

def parseTextLine(line):
    parts = line.split(',')
    label, features = parts[0], parts[1:]
    return LabeledPoint(label, features)


<type 'list'>
[57.0,149.0,4.857,5191.0,0.0,1.1,93.994,-36.4]
0.0


In [51]:
assert len(parsedLines[1].features) == 8, 'You should have 8 features!'

In [52]:
# use your parseTextLine function on every observation

parsedRDD = rdd.map(parseTextLine)
#parsedRDD.take(3)

In [53]:
# Normalizing features

# take a single row and compute the number of features in it
noFeatures = len(parsedRDD.first().features)

print noFeatures  # you should get 8

8


In [54]:
# get min and max for each feature 
featuresMin = []; featuresMax = []
for i in range(noFeatures):
    print i
    feature = parsedRDD.map(lambda lp: lp.features[i])
    featuresMin.append(feature.min())
    featuresMax.append(feature.max())

# print min and max values
print featuresMin
print featuresMax


0
1
2
3
4
5
6
7
[17.0, 0.0, 0.63400000000000001, 4963.6000000000004, 0.0, -3.3999999999999999, 92.200999999999993, -50.799999999999997]
[98.0, 4918.0, 5.0449999999999999, 5228.1000000000004, 7.0, 1.3999999999999999, 94.766999999999996, -26.899999999999999]


In [108]:
# normalizing each feature according to min and max information

# input: lp, LabeledPoint, a single observation
#        featuresMin, a list of minimum values for each feature
#        featureMax, a list of maximum values for each feature
# output: a LabeledPoint with original label, but new, transformed features


def normalize(lp, featuresMin, featuresMax):
    trnsFeatures = range(noFeatures)
    for i in range(noFeatures):
        trnsFeatures[i] = (lp.features[i] - featuresMin[i])/(featuresMax[i] - featuresMin[i])
        return LabeledPoint(lp.label, trnsFeatures)


normedRDD = parsedRDD.map(lambda lp: normalize(lp, featuresMin, featuresMax))

# should see 0 and 1 as a result here
print normedRDD.map(lambda lp: lp.features[0]).min()
print normedRDD.map(lambda lp: lp.features[0]).max()

0.0
1.0


In [56]:
# do not change the weights and the seed
weights = [.7, .15, .15]
seed = 1111

# use randomSplit with weights and seed defined above on the trainRDD
trainRDD, valRDD, testRDD = normedRDD.randomSplit(weights, seed)

In [57]:
# cache the data
trainRDD.cache()
valRDD.cache()
testRDD.cache()

# get the number of observation in each subset
noTrain = trainRDD.count()
noVal = valRDD.count()
noTest = testRDD.count()

print "Training: {0:.3f}, Validation: {1:.3f}, Test: {2:.3f}, All: {3:.3f}".format(noTrain, noVal, noTest, noTrain + noVal + noTest)


Training: 28940.000, Validation: 6121.000, Test: 6127.000, All: 41188.000


In [109]:
# compute mean over label part of LabeledPoints
import numpy
meanLabel = (trainRDD
             .map(lambda lp: lp.label)
             .mean())
print meanLabel

0.112370421562


In [110]:

# inputs: probability, float, that the observation is 1
#         threshold, float, for classifying predicted probability as 0 or 1
# output: float, predicted label, either 0 or 1

def classify(probability, threshold):
    if probability > threshold:
        return 1
    else:
        return 0

# compute the misclassification error for a single observation and its prediction,
# use classify function to convert probability into predicted labels

# input: probability, float between 0 and 1
#        predictedLabel, float, either 0 or 1, 
#        threshold, float, for classifying predicted probability as 0 or 1
# output: float, 0 if correct and 1 if incorrect

def misErrorSingle(probability, label, threshold):
    pred = classify(probability,threshold)
    if pred==label:
        return 0
    else:
        return 1
    
    
# compute mean misclassification error on RDD labPred (see example below)
# apply the function misErrorSingle on the whole RDD

# input: labPredRDD, a label prediction tuples
#        threshold, float, for classifying predicted probability as 0 or 1
# output: mean misclassification error

def misError(labPredRDD, threshold):
    misClsError = labPredRDD.map(lambda lp: misErrorSingle(lp[0], lp[1], threshold))
    return misClsError.mean()


# check on an easy example, 1 observation misclassified, 2 correct
labPredRDD_ex = sc.parallelize([(0., 1.), (0., 0.), (1., 1.)])
threshold = 0.5
misError_ex = misError(labPredRDD_ex, threshold)
print misError_ex

0.333333333333


In [62]:
assert np.allclose(misError_ex, 0.333333333333), 'incorrect value for misError_ex'

In [63]:
# set the threshold for evaluating the probabiliteis
threshold = 0.5

# for each dataset first create RDD's of tuples with meanLabel and label in each tuple   
# and then feed these RDD's into misError function
baseTrain = trainRDD.map(lambda x: (meanLabel, x.label))
baseTrain_misError = misError(baseTrain, threshold)

baseVal = valRDD.map(lambda x: (meanLabel, x.label))
baseVal_misError = misError(baseVal, threshold)

baseTest = testRDD.map(lambda x: (meanLabel, x.label))
baseTest_misError = misError(baseTest, threshold)

print 'Baseline model - Train misclassification error = {0:.3f}'.format(baseTrain_misError)
print 'Baseline model - Validation misclassification error = {0:.3f}'.format(baseVal_misError)
print 'Baseline model - Test misclassification error = {0:.3f}'.format(baseTest_misError)

Baseline model - Train misclassification error = 0.112
Baseline model - Validation misclassification error = 0.110
Baseline model - Test misclassification error = 0.117


In [64]:
from pyspark.mllib.linalg import DenseVector

In [100]:
from math import log

# input: prob, float, value between 0 and 1, predicted probability that label=1
#        lab, float, label of the observation, either 0 or 1
# output: float, negative log likelihood of a single observation

def logLossSingle(prob, lab):
    
    # if probability is too small/large add/subtract this epsilon value
    # this is because log(0) is not defined
    epsilon = 10e-12
    if np.isclose(prob, 0.0):
        prob = prob + epsilon
    elif np.isclose(prob, 1.0):
         prob = prob - epsilon
        
    # now return the log loss
    return -(lab*log(prob) + (1 - lab)*(1 - prob))

# if you get an error on any of these check your computations, especially epsilon
print logLossSingle(.5, 1)   # 0.69314718056
print logLossSingle(.99, 1)  # 0.0100503358535
print logLossSingle(.01, 1)  # 4.60517018599
print logLossSingle(0, 1)    # 25.3284360229
print logLossSingle(1, 1)    # 1.00000008275e-11


0.69314718056
0.0100503358535
4.60517018599
25.3284360229
1.00000008275e-11


In [111]:
# compute the mean log loss on the whole RDD probLab 
# input: probLab, RDD of probability label tuples
# output: float, mean log loss

def logLoss(probLab):
    # apply the function logLossSingle on the whole RDD and compute the mean
    meanLogLoss = probLab.map(lambda lp: logLossSingle(lp[0], lp[1])).mean()
    return meanLogLoss

# check it on an easy example
probLabRDD_ex = sc.parallelize([(0.5, 1.), (0.99, 0.), (0.01, 1.)])
logLoss_ex = logLoss(probLabRDD_ex)
print logLoss_ex

1.76277245552


In [112]:
import math

# define sigmoid function
# input: z, a float, result of a dot product between weights and feature values
# output: a float, dot product transformed to 0-1 range

def sigmoid(z):
    return (1 / (1 + math.exp(-z)))

print sigmoid(0)  # should produce 0.5
print sigmoid(100)  # should produce number close to 1
print sigmoid(-100)  # should produce number close to 0

0.5
1.0
3.72007597602e-44


In [87]:
# compute the gradient for a single observation
# inputs: weights, an array of regression coefficients; 
#         lp, a LabeledPoint of a single observation
# output: DenseVector, an array of values, same length as weights

def gradient(weights, lp):
    crsprod = weights.dot(lp.features)
    DenseVector = (lp.label - sigmoid(crsprod)) * lp.features
    return DenseVector

weights_ex = DenseVector([1, 2, 3, 4, 5])
lp_ex = LabeledPoint(1.0, [1, 1, 1, 1, 1])
gradient_ex = gradient(weights_ex, lp_ex)
print gradient_ex

# you should see a following vector: 
# [3.05902226994e-07,3.05902226994e-07,3.05902226994e-07,3.05902226994e-07,3.05902226994e-07]
# sigmoid should produce smth close to 1 so when it is subtracted from 1 we get smth close to 0

[3.05902226994e-07,3.05902226994e-07,3.05902226994e-07,3.05902226994e-07,3.05902226994e-07]


In [113]:

# input: weights, a numpy array, and lp, a LabeledPoint
# output: a tuple consisting of a predicted probability and a label

def probLabTuple(weights, lp):
    dotProd = weights.dot(lp.features)
    predProb = sigmoid(dotProd)
    return (predProb, lp.label)

# check it on an easy example
weights_ex = np.array([1, 2, 3, 4, 5])
data_ex = sc.parallelize([LabeledPoint(1, np.array([-1, -1, -1, 1, 1])),
                          LabeledPoint(0, np.array([-2, 2, -2, -2, 2]))])
probLab_ex = data_ex.map(lambda lp: probLabTuple(weights_ex, lp))

print probLab_ex.collect()
print logLoss(probLab_ex)

[(0.9525741268224334, 1.0), (0.11920292202211755, 0.0)]
-0.416104863202


In [107]:
# set the parameters, setting diagnostics to True will increase the time 
# with 14 workers this took about 5 min
alpha = 6
noIter = 500
diagnostics = False


# run the gradient descent on whole training set
weights_gd = trainRDD.map(lambda lp: gradientDescent(lp, noIter, alpha, diagnostics))
print "weights\n", weights_gd


weights
PythonRDD[389] at RDD at PythonRDD.scala:43


In [None]:
# threshold parameter for misclassification
threshold = 0.5

# compute the log loss and missclassification error on training 
probLabTrain = trainRDD.map(lambda lp: probLabTuple(weights_gd, lp)) 
logLossTrain_gd = logLoss(probLabTrain)
misErrorTrain_gd = misError(probLabTrain, threshold)

print logLossTrain_gd, misErrorTrain_gd

# and validation set
probLabVal = valRDD.map(lambda lp: probLabTuple(weights_gd, lp))
logLossVal_gd = logLoss(probLabVal)
misErrorVal_gd = misError(probLabVal, threshold)

print logLossVal_gd, misErrorVal_gd

In [118]:
# compare it with the logLoss of the baseline model
# training set
probLabTrain = trainRDD.map(lambda lp: (meanLabel, lp.label))                            
logLossTrain_base = logLoss(probLabTrain)

print logLossTrain_base, baseTrain_misError

# validation set
probLabVal = valRDD.map(lambda lp: (meanLabel, lp.label))
logLossVal_base = logLoss(probLabVal)

print logLossVal_base, baseVal_misError

-0.542249636521 0.112370421562
-0.549690989212 0.109949354681


In [119]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

# set parameters
noIter = 500
alpha = 60  # MLlib algorithm uses decaying learning rate, so we need to increase it
batchSize = 0.003  # defined as proportion of the whole dataset, ~100 observations
regLambda = 1e-6
regType = 'l2'
intercept = False

In [123]:
# training the model on trainRDD
model_sgd = LogisticRegressionWithSGD.train(data = trainRDD, 
                                           iterations = noIter, 
                                           step = alpha, 
                                           miniBatchFraction = batchSize, 
                                           initialWeights = None, 
                                           regParam = regLambda, 
                                           regType = regType, 
                                           intercept = intercept 
                                           )


In [124]:
# check the weights
weights_sgd = model_sgd.weights
print weights_sgd

[4.2985946267,-1.36479713136,-2.72959426271,-4.09439139407,-5.45918852543,-6.82398565679,-8.18878278814,-9.5535799195]


In [125]:
# evaluate the model

# create probLabRDD and compute the log loss and missclassification error on training, using the new SGD weights
probLabTrain = trainRDD.map(lambda lp: probLabTuple(weights_sgd, lp))
logLossTrain_sgd = logLoss(probLabTrain)
misErrorTrain_sgd = misError(probLabTrain, threshold)

print logLossTrain_sgd, misErrorTrain_sgd

# for validation set, using the new SGD weights
probLabVal = valRDD.map(lambda lp: probLabTuple(weights_sgd, lp))
logLossVal_sgd = logLoss(probLabVal)
misErrorVal_sgd = misError(probLabVal, threshold)

print logLossVal_sgd, misErrorVal_sgd

1.95853745497 0.112370421562
1.89479455048 0.109949354681


In [126]:
noIter = 500
batchSize = 0.003  # defined as proportion of the whole dataset, ~100 observations
regType = 'l2'
intercept = False
modelLoss = []  # storing losses of each model

for alpha in [0.1, 1, 10, 50, 100]:
    for regLambda in [1e-6, 1e-3, 1]:
        model = LogisticRegressionWithSGD.train(data = trainRDD, 
                                           iterations = noIter, 
                                           step = alpha, 
                                           miniBatchFraction = batchSize, 
                                           initialWeights = None, 
                                           regParam = regLambda, 
                                           regType = regType, 
                                           intercept = intercept 
                                           )
        
        # evaluate the model
        probLabVal = valRDD.map(lambda lp: probLabTuple(weights_sgd, lp))  
        logLossVal = logLoss(probLabVal)
        modelLoss.append(logLossVal)
        
        # some printout in each iteration
        print 'alpha = {0:.0e}, lambda = {1}, loss = {2:.3f}'.format(alpha, regLambda, logLossVal)


alpha = 1e-01, lambda = 1e-06, loss = 1.895
alpha = 1e-01, lambda = 0.001, loss = 1.895
alpha = 1e-01, lambda = 1, loss = 1.895
alpha = 1e+00, lambda = 1e-06, loss = 1.895
alpha = 1e+00, lambda = 0.001, loss = 1.895
alpha = 1e+00, lambda = 1, loss = 1.895
alpha = 1e+01, lambda = 1e-06, loss = 1.895
alpha = 1e+01, lambda = 0.001, loss = 1.895
alpha = 1e+01, lambda = 1, loss = 1.895
alpha = 5e+01, lambda = 1e-06, loss = 1.895
alpha = 5e+01, lambda = 0.001, loss = 1.895
alpha = 5e+01, lambda = 1, loss = 1.895
alpha = 1e+02, lambda = 1e-06, loss = 1.895
alpha = 1e+02, lambda = 0.001, loss = 1.895
alpha = 1e+02, lambda = 1, loss = 1.895


In [129]:
# set parameters
noIter = 500
alpha = 50  
batchSize = 0.003  
regLambda = 1
regType = 'l2'
intercept = False

# train it on trainRDD
theModel = LogisticRegressionWithSGD.train(data = trainRDD, 
                                           iterations = noIter, 
                                           step = alpha, 
                                           miniBatchFraction = batchSize, 
                                           initialWeights = None, 
                                           regParam = regLambda, 
                                           regType = regType, 
                                           intercept = intercept 
                                           )

In [130]:
# evaluate the final model

# create probLabRDD and compute the log loss and missclassification error on training, using the new SGD weights
probLabTrain = trainRDD.map(lambda lp: probLabTuple(theModel.weights, lp))
logLossTrain_sgd = logLoss(probLabTrain)
misErrorTrain_sgd = misError(probLabTrain, threshold)

print logLossTrain_sgd, misErrorTrain_sgd

# for validation set, using the new SGD weights
probLabVal = valRDD.map(lambda lp: probLabTuple(theModel.weights, lp))
logLossVal_sgd = logLoss(probLabVal)
misErrorVal_sgd = misError(probLabVal, threshold)

print logLossVal_sgd, misErrorVal_sgd

-7.75259221021e-12 0.887629578438
-7.80101355184e-12 0.890050645319


In [131]:
# finally, assesing our final chosen model on the test set
probLabTest = testRDD.map(lambda lp: probLabTuple(theModel.weights, lp))
logLossTest_sgd = logLoss(probLabTest)
misErrorTest_sgd = misError(probLabTest, threshold)

print logLossTest_sgd, misErrorTest_sgd

-7.66606885691e-12 0.883303411131
