In [1]:
import pyspark
import numpy as np
import pandas as pd
import time
import sys

sc = pyspark.SparkContext('local[*]')

In [2]:
sc.defaultParallelism

2

In [3]:
class DataUtil:
    def __init__(self, sc, inputPath='data/spam.data.txt', statsInputPath='data/mean_std.txt', standardize=True):
        self.inputPath = inputPath
        self.standardize = standardize
        self.mean_std = pd.read_csv(statsInputPath, delimiter=',', header=None)
        self.mean = sc.broadcast(self.mean_std.iloc[:,0].values.astype(float))
        self.std = sc.broadcast(self.mean_std.iloc[:,1].values.astype(float))
        
        
    def read(self, sc):
        if self.standardize:
            return sc.textFile(self.inputPath).map(lambda x: np.asarray(x.split(' ')).astype(float)) \
                    .map(lambda x: (x[:56], x[57])) \
                    .map(lambda x: ((x[0] - self.mean.value[:56])/self.std.value[:56], x[1]))
        else:
            return sc.textFile(self.inputPath).map(lambda x: np.asarray(x.split(' ')).astype(float)) \
                    .map(lambda x: (x[:56], x[57]))

In [4]:
dataUtil = DataUtil(sc, 'data/spam.data.txt', 'data/mean_std.txt', True)
rdd = dataUtil.read(sc)
rdd.count()

4601

In [5]:
def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def cost_function(y, h):
    eps = sys.float_info.epsilon
    return y * np.log(h + eps) + (1 - y) * np.log(1 - h + eps)

def stats(data):
    def f(y, pred):
        if y == pred:
            return 1,0,0,0 if y == 1 else 0,1,0,0
        elif pred == 1:
            return 0,0,1,0
        else:
            return 0,0,0,1
    tp, tn, fp, fn = data.map(lambda x: f(x[2], x[4])).reduce(lambda a, b: tuple(map(sum, zip(a, b))))
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    return precision, recall, f1, accuracy

In [9]:
class ParallelLogReg():
    
    def __init__(self, data, iterations, learning_rate, lambda_reg):
        self.data = data
        self.numberObservations = self.data.count()
        self.numberFeatures = len(self.data.first()[0])
        self.iterations = iterations
        self.lr = learning_rate
        self.lambda_reg = lambda_reg
        
    def __add_intercept(self, rdd):
        return rdd.map(lambda x: (1, x[0], x[1]))
        
    def train(self, train_rdd=None, SGD=False, SGD_pct=0.5, threshold=0.5):
        eps = sys.float_info.epsilon
        
        # initialize the weights
        # w[0]: bias weight
        # w[1]: rest of the weights
        w = (0, np.zeros(self.numberFeatures))
        
        if train_rdd == None:
            data = self.__add_intercept(self.data)
        else:
            data = self.__add_intercept(train_rdd)

        # initialize prediction to rdd
        # x[0]: bias/intercept
        # x[1]: rest features
        # x[2]: true y
        # adding x[3]: predicted y
        data = data.map(lambda x: (x[0], x[1], x[2], sigmoid(w[1].dot(x[1]) + w[0] * x[0])))
        numObs = data.count()
        
        train = data.cache()
        
        for i in range(self.iterations):
            start = time.time()
            
            # sample for SGD
            if SGD:
                train = data.sample(False, SGD_pct).repartition(sc.defaultParallelism).cache()
                if i == 0:
                    numObs = train.count()
            
            # compute derivatives
            temp = train.map(lambda x: ((x[3] - x[2]) * x[0], (x[3] - x[2]) * x[1])) \
                       .reduce(lambda a,b: (a[0] + b[0], a[1] + b[1]))
            dw = (temp[0]/numObs, (temp[1]/numObs) + (self.lambda_reg/numObs) * w[1])
            
            # update weights
            w = (w[0] - self.lr * dw[0], w[1] - self.lr * dw[1])
            
            # update prediction
            train = train.map(lambda x: (x[0], x[1], x[2], sigmoid(w[1].dot(x[1]) + w[0] * x[0]))).cache()
            
            if (i%10 == 0):
                end = time.time()
                
                model_stats = self.validate(train, w, add_intercept=False)

                print("Iteration: " + str(i) + ", Total Time: " + str(end - start))
                print('Precision: {}, Recall: {}, F1: {}, Accuracy: {}'.format(model_stats['precision'], model_stats['recall'], model_stats['f1'], model_stats['accuracy']))
                print('Current loss: {}\n'.format(model_stats['loss']))
                
        model_stats = self.validate(data, w, add_intercept=False)
        
        print('Training stats:')
        print('Precision: {}, Recall: {}, F1: {}, Accuracy: {}'.format(model_stats['precision'], model_stats['recall'], model_stats['f1'], model_stats['accuracy']))
        print('Final training loss: {}\n'.format(model_stats['loss']))
            
        return {**model_stats, 'w': w}
    
    def validate(self, val_rdd, w, threshold=0.5, add_intercept=True):
        if add_intercept:
            val_rdd = self.__add_intercept(val_rdd)
        
        numObs = val_rdd.count()
        
        # update prediction
        val_rdd = val_rdd.map(lambda x: (x[0], x[1], x[2], sigmoid(w[1].dot(x[1]) + w[0] * x[0])))
        
        loss = val_rdd.map(lambda x: cost_function(x[2], x[3])) \
                    .reduce(lambda a,b: a + b)
        loss = -(1/numObs) * loss + (self.lambda_reg/(2*numObs)) * np.sum(w[1]**2)
        
        # calculate stats
        val_rdd = val_rdd.map(lambda x: (x[0], x[1], x[2], x[3], 1 if x[3] >= threshold else 0)).cache()
        precision, recall, f1, accuracy = stats(val_rdd)
        return {'loss': loss, 'precision': precision, 'recall': recall, 'f1': f1, 'accuracy': accuracy}
    
    def cross_validate(self, k, SGD=False, SGD_pct = 0.5):
        
        if k == 1:
            print("Please choose a k > 1. Or use regular train function")
            return
        
        dataWithIndex = self.data.zipWithIndex();
        
        model_stats = []
        models = []
        
        for i in range(k):
            print("Fold: " + str(i))
            
            fold = dataWithIndex.filter(lambda x: x[1] % k != i).repartition(sc.defaultParallelism) \
                                .map(lambda x: x[0])
            
            model = self.train(fold, SGD, SGD_pct)
            models.append(model)
            
            val = dataWithIndex.filter(lambda x: x[1] % k == i).repartition(sc.defaultParallelism) \
                                .map(lambda x: x[0])
            
            val_stats = self.validate(val, model['w'])
            model_stats.append(val_stats)
            print('\nValidation stats:')
            print('Precision: {}, Recall: {}, F1: {}, Accuracy: {}'.format(val_stats['precision'], val_stats['recall'], val_stats['f1'], val_stats['accuracy']))
            print('Loss: {}\n'.format(val_stats['loss']))

        avg_model_stats = {}
        for key in model_stats[0].keys():
            avg_model_stats['avg_' + key] = sum(stat[key] for stat in model_stats) / len(model_stats)

        print('Averaged validation stats:')
        print('Precision: {}, Recall: {}, F1: {}, Accuracy: {}'.format(avg_model_stats['avg_precision'], avg_model_stats['avg_recall'], avg_model_stats['avg_f1'], avg_model_stats['avg_accuracy']))
        print('Loss: {}'.format(avg_model_stats['avg_loss']))

In [10]:
logReg = ParallelLogReg(data=rdd, iterations=20, learning_rate=0.1, lambda_reg=0.1)

In [11]:
logReg.train()

Iteration: 0, Total Time: 0.6018731594085693
Precision: 0.9370725034199726, Recall: 0.9502890173410404, F1: 0.9436344851337389, Accuracy: 0.8932840686807216
Current loss: 0.6503220242271294

Iteration: 10, Total Time: 0.2959308624267578
Precision: 0.9489772466099747, Recall: 0.9429093400319708, F1: 0.9459335624284078, Accuracy: 0.8974136057378831
Current loss: 0.4581797205838391

Training stats:
Precision: 0.9527104959630911, Recall: 0.9394904458598726, F1: 0.9460542893139389, Accuracy: 0.8976309497935231
Final training loss: 0.3978590235323393



{'loss': 0.3978590235323393,
 'precision': 0.9527104959630911,
 'recall': 0.9394904458598726,
 'f1': 0.9460542893139389,
 'accuracy': 0.8976309497935231,
 'w': (-0.16846390920849122,
  array([ 0.04161319, -0.0235643 ,  0.09330902,  0.03924626,  0.14085123,
          0.12141658,  0.20443337,  0.11736477,  0.11807889,  0.06668334,
          0.10965589, -0.01883552,  0.05827087,  0.02834613,  0.09633879,
          0.15976749,  0.14054955,  0.11042104,  0.12661014,  0.10544192,
          0.19799772,  0.07004972,  0.18937721,  0.12163123, -0.12437616,
         -0.10593532, -0.10477538, -0.05383362, -0.05292923, -0.06760501,
         -0.03932439, -0.02692089, -0.06758337, -0.02612387, -0.0566243 ,
         -0.03716163, -0.08392888, -0.02151699, -0.06108481, -0.00341057,
         -0.05012921, -0.07976456, -0.06309622, -0.05535688, -0.08693929,
         -0.08836406, -0.03047346, -0.04882305, -0.03888846, -0.0344738 ,
         -0.0294441 ,  0.14340113,  0.18873553,  0.04297038,  0.05706421,
   

In [None]:
logReg.cross_validate(2)

Fold: 0
