In [35]:
import math
import random
import pyspark
from itertools import chain
import datetime
import os
import time
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

global k_fold
global n_row
global y_h_m_y
global n_columns
global w
global OUTPUT
global ITERATIONS
global THRESHOLD
global averages 
global col_sums
global sigmas
global b

DEBUG = True
TRAIN_TEST = 0.8
k_fold=3
ITERATIONS=40
THRESHOLD = 0.5

DATASET="dataset/spam.data"
n_columns=56 #total number of columns (58) - the one deleted (57) - label (56)

OUTPUT="out/"

def touch(path):
    with open(path, 'w+'):
        os.utime(path, None)
        
def initializeAccumulators():
    global averages
    global col_sums
    global sigmas
    global maxFinder
    global minFinder
    i=0
    averages=[]
    col_sums=[]
    sigmas=[]
    while(i<n_columns):
        averages.append(sc.accumulator(0))
        col_sums.append(sc.accumulator(0))
        sigmas.append(sc.accumulator(0))
        maxFinder.append(sc.accumulator(0)) 
        minFinder.append(sc.accumulator(0)) 
        i+=1

def addToAccumulators(row):
    global col_sums
    if(len(row)!=len(col_sums)):
        raise Exception("Number of columns in the row doesn't mach the number of accumulators initiated. Len row: "+str(len(row))+" n_accomulators: "+str(len(col_sums)))
    i=0
    while(i<n_columns):
        col_sums[i].add(row[i])
        i+=1
        
def maxMinFinder(row):
    global maxFinder
    global minFinder
    if(len(row)!=len(col_sums)):
        raise Exception("Number of columns in the row doesn't mach the number of accumulators initiated. Len row: "+str(len(row))+" n_accomulators: "+str(len(col_sums)))
    i=0
    while(i<n_columns):
        if(row[i]>maxFinder[i].value):
            maxFinder[i]=sc.accumulator(row[i])
        if(row[i]<minFinder[i].value):
            minFinder[i]=sc.accumulator(row[i])
        i+=1

    
def doNormalize(x):
    i=0
    while(i<n_columns):
        row[2][i]=(row[2][i]-minFinder[i])/(maxFinder[i]-minFinder[i])
        i+=1
    return row


def preprocessing(row):
    split=row.split(" ")       #splitting
    label = int(split[57])
    del split[57] #label
    del split[56] #col 57th
    split=[float(col) for col in split]
    #addToAccumulators(split)   
    maxMinFinder(split)
    #assign random key to the datapoint
    key=random.getrandbits(64)
    #assign train/test 
    if(random.random()<TRAIN_TEST):
        train=1
    else:
        train=0    
    return (key,train,split,label)

def calcAvg(n_row):
    global averages
    i=0
    while(i<n_columns):
        averages[i]=col_sums[i].value/n_row

        i+=1
        
def calcResiduals(row):
    global sigmas
    i=0
    while(i<n_columns):       
        sigmas[i].add(math.pow(row[2][i]-averages[i],2))
        i+=1
    return row

def calcSigmas(n_row):
    global sigmas
    i=0
    while(i<n_columns):
        #sigmas[i]=math.sqrt(sigmas[i].value/float(n_row-1))
        sigmas[i]=sigmas[i].value/float(n_row)
        i+=1

def normalize(row):
    i=0
    while(i<n_columns):
        row[2][i]=(row[2][i]-averages[i])/sigmas[i]
        i+=1
    return row

def initializeWeights(random_init=False):
    if(random_init):
        #return sc.parallelize([(i, random.random()) for i in range(0,n_columns)])
        return [random.random() for i in range(0,n_columns)]
    else:
        #return sc.parallelize([(i, 0.0) for i in range(0,n_columns)])
        return [0.0 for i in range(0,n_columns)]

def initializeBias():
    return sc.parallelize(0.0)

def sigmoid(z):
    return 1/(1+math.exp(-z))

def predict(w,b,X):
    return sigmoid(sum([X[i]*w[i] for i in range(len(w))])+b)

def classify_prediction(pred_probability):
    if (pred_probability >= THRESHOLD):
        return 1
    return 0

def predict_parallel(w,b,X):
    X = sc.parallelize(X).zipWithIndex().map(lambda x: (x[1],x[0]))     #Change X representation
    wX_plus_b=X.join(w).map(lambda x: (x[1][0]*x[1][1])+b).sum()
    return sigmoid(wX_plus_b)

def compute_cost(dataset,w,b,lambda_reg, print_stats=False):
    cost = (-1/dataset.count())*dataset.map(lambda x: x[3]*math.log(predict(w,b,x[2])) \
                            +(1-x[3])*math.log(1-predict(w,b,x[2]))).sum() \
                            + lambda_reg/(2*dataset.count())*sum([i*i for i in w]);
    if (print_stats):
        stats = {"TP":0, "TN":0, "FP":0, "FN":0}
        statsMapping = dataset.map(lambda x: 2*(classify_prediction(predict(w, b, x[2])) - x[3]) + x[3])  #this function maps all 4 possible variations to an integer, from -1 to 3
        stats["TP"] = statsMapping.filter(lambda x: x == 1).count()
        stats["TN"] = statsMapping.filter(lambda x: x == 2).count()
        stats["FP"] = statsMapping.filter(lambda x: x == -1).count()
        stats["FN"] = statsMapping.filter(lambda x: x == 0).count()
        precision = stats["TP"]/(stats["TP"] + stats["FP"])
        recall = stats["TP"]/(stats["TP"] + stats["FN"])
        accuracy = (stats["TP"] + stats["TN"])/(stats["TP"] + stats["TN"] + stats["FP"] + stats["FN"])
        print("Precision: " + str(precision))
        print("Recall: " + str(recall))
        print("F1 score: " + str(2*(precision*recall)/(precision + recall)))
        print("Accuracy: " + str(accuracy))
    return cost

def gradientDescent(iterations,train,w,number_samples,lambda_reg,learning_rate,b,logCosts=False):
    if(logCosts):
        gd_cost_log = open(OUTPUT+"gradient_descent.csv","w+")
        gd_cost_log.write("iteration,cost\r\n")
        gd_cost_log.close()
    costs=[]
    for iteration in range(iterations):
        w,b = gradientDescentIteration(train,w,number_samples,lambda_reg,learning_rate,b)
        cost=compute_cost(train,w,b,lambda_reg)
        costs.append(cost)
        if(logCosts):
            gd_cost_log = open(OUTPUT+"gradient_descent.csv","a+")            
            gd_cost_log.write(str(iteration+1)+","+str(cost)+"\r\n")
            gd_cost_log.close()
        print("-> Iteration done: "+str(iteration+1)+" of "+str(iterations)+". Cost: "+str(cost))
    return w,b,costs
    
def gradientDescentIteration(train,w,number_samples,lambda_reg,learning_rate,b):
    dw=[i for i in range(0,n_columns)]
    j = 0
    for j in range(n_columns):
        X_j=train.map(lambda x: (predict(w,b,x[2])-x[3])*x[2][j]).sum()
        dw[j]=(1/number_samples)*X_j+(lambda_reg/number_samples)+w[j]
        w[j]-=learning_rate*dw[j]
    b-=learning_rate*(1/number_samples)*train.map(lambda x: predict(w,b,x[2])-x[3]).sum()
    return w,b

def kFoldsCV(k_fold,iterations,train,lambda_reg,learning_rate):
    fold_length=train.count()/k_fold
    train_errors_fold=[]
    test_errors_fold = []
    for i_fold in range(k_fold):
        w=initializeWeights()
        starting_fold=fold_length*i_fold
        end_fold=starting_fold+fold_length
        test_fold=train.zipWithIndex().filter(lambda t: (t[1]>=starting_fold and t[1]<end_fold)).map(lambda t: t[0]) #the map get rid of the index again
        train_fold=train.zipWithIndex().filter(lambda t: t[1]<starting_fold or t[1]>=end_fold).map(lambda t: t[0])
        train_fold_size=train_fold.count()
        b=0
        #Gradient descent
        w, b, train_errors = gradientDescent(iterations,train_fold,w,train_fold_size,lambda_reg,learning_rate,b)
        train_errors_fold.append(train_errors)
        train_errors_flattened =list(chain.from_iterable(train_errors_fold))
        test_error = compute_cost(test_fold, w, b, lambda_reg)
        test_errors_fold.append(test_error)
        if(DEBUG):
            print("--> Fold #"+str(i_fold+1)+" of "+str(k_fold)+" is done. Train error: " \
                  +str(sum(train_errors_flattened)/((i_fold + 1) * iterations)) \
                  + " Test error: " + str(sum(test_errors_fold)/(i_fold+ 1)))
    return w,b,train_errors_fold, test_errors_fold

def trainTestSplit(dataset):
    #dataset=dataset.map(normalize)
    dataset=dataset.map(doNormalize)
    train=dataset.filter(lambda x: x[1]==1)
    train_size=train.count()
    test=dataset.filter(lambda x: x[1]==0)
    return train, test

def train(filename, iterations, learning_rate, lambda_reg, cv=False):
    global w
    global b
    w = []
    initializeAccumulators()
    w = initializeWeights()
    dataset=sc.textFile(filename).map(preprocessing).take(2)#.sortBy(lambda x: x[0])
    print(dataset)
    
    

    n_row=dataset.count()
    calcAvg(n_row)
    gg = dataset.map(calcResiduals).collect()
    calcSigmas(n_row)
    
    train, test = trainTestSplit(dataset)
    print("Split train/test done. Train contains "+str(train.count())+" elements, Test contains "+str(test.count())+" elements")
    if (cv):
        w, b, train_errors_fold, test_errors_fold = kFoldsCV(k_fold,iterations,train,lambda_reg,learning_rate)
        train_errors_fold =list(chain.from_iterable(train_errors_fold))
        average_train_error = sum(train_errors_fold)/(iterations*k_fold)
        average_test_error = sum(test_errors_fold)/k_fold
        print("---> "+str(k_fold)+"-fold validation done.  Train error: " \
            +str(average_train_error) \
            + " Test error: " + str((average_test_error)))
        return average_train_error, average_test_error
    else:                                                  #Testing the parameters and weights on the actual test set
        w,b, train_errors = gradientDescent(iterations, train, w, train.count(), lambda_reg, learning_rate, b,True)
        wBest = open(OUTPUT+"w_bestModel.csv","w+")
        wBest.write(str(w))
        wBest.close()
        average_train_error = sum(train_errors)/iterations
        average_test_error = compute_cost(test, w, b, lambda_reg, True)
        print("---> Model done. Train error: " \
              +str(average_train_error) \
              + " Test error: " + str((average_test_error)))
        return average_train_error, average_test_error
                    
workers = range(8,9) #8 cores
working_time = open(OUTPUT+"working_times.csv","w+")
working_time.write("workers,grid_time,best_model_time,total\r\n")
working_time.close()
grid_log = open(OUTPUT+"grid.csv","w+")
grid_log.write("learning_rate,lambda_red,train_error,test_error\r\n")
grid_log.close()
for worker in workers:
    global b
    working_time = open(OUTPUT+"working_times.csv","a+")
    print("****Worker: "+str(worker)+"****\n")
    sc = pyspark.SparkContext.getOrCreate()
    #sc.stop()
    #conf = (SparkConf().set("spark.cores.max", str(worker)))
    #sc = SparkContext(conf=conf)
    b = 0
    col_sums=[]
    averages=[]
    sigmas=[]
    minFinder=[]
    maxFinder=[]
    w=[]
    workers_executioTime_grid=time.time()
    if (False):
        grid=[]
        for i in range(1,5):
                for j in range(1,14,3):
                    grid+=[(i*0.09,j*0.028)]

        #grid=[(0.01,)] #(learning_rate,lambda_reg)
        #grid=list(chain.from_iterable(grid))
        grid_results = []
        for par in grid:
            print("|---- Starting training for parameters (learning_rate,lambda_reg) = "+str(par)+" ----|")
            result=(par, train(DATASET,ITERATIONS,par[0],par[1], True))
            grid_results.append(result)
            grid_log = open(OUTPUT+"grid.csv","a+")
            grid_log.write(str(result[0][0])+","+str(result[0][1])+","+str(result[1][0])+","+str(result[1][1])+"\r\n")
            grid_log.close()
        workers_executioTime_grid=time.time()-workers_executioTime_grid
        workers_executioTime_bestTraining= time.time()
        best_par = sorted(grid_results, key=lambda x: x[1])[0]
    best_par = (0.1, 0.1)
    print("\n--------------------------------------------------")
    print("Best parameters: " + str(best_par))
    smallest_error = train(DATASET,ITERATIONS,best_par[0],best_par[1])
    print("Best performance: " + str(smallest_error))
    print("--------------------------------------------------")
    
    #workers_executioTime_bestTraining=time.time()-workers_executioTime_bestTraining
    #working_time.write(str(worker)+","+str(workers_executioTime_grid)+","+str(workers_executioTime_bestTraining)+","+str(workers_executioTime_grid+workers_executioTime_bestTraining)+"\r\n")
    working_time.close()
    print("++++Worker "+str(worker)+" has terminated with running time: "+str(workers_executioTime_grid+workers_executioTime_bestTraining)+"++++\n")

    

****Worker: 8****

|---- Starting training for parameters (learning_rate,lambda_reg) = (0.04, 0.028) ----|


Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.3.2/libexec/python/pyspark/cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "/anaconda3/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/anaconda3/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/apache-spark/2.3.2/libexec/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/Cellar/apache-spark/2.3.2/libexec/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

In [6]:
grid = []
for i in range(1,5):
    for j in range(1,14,4):
        grid+=[(i*0.02,j*0.002)]
grid

[(0.02, 0.002),
 (0.02, 0.01),
 (0.02, 0.018000000000000002),
 (0.02, 0.026000000000000002),
 (0.04, 0.002),
 (0.04, 0.01),
 (0.04, 0.018000000000000002),
 (0.04, 0.026000000000000002),
 (0.06, 0.002),
 (0.06, 0.01),
 (0.06, 0.018000000000000002),
 (0.06, 0.026000000000000002),
 (0.08, 0.002),
 (0.08, 0.01),
 (0.08, 0.018000000000000002),
 (0.08, 0.026000000000000002)]