In [1]:
!pip install pyspark



In [2]:
!pip install tabulate



In [3]:
from pyspark import SparkContext
from tabulate import tabulate
import math
import numpy as np

In [4]:
sc = SparkContext("local[*]", "First App")

In [17]:
text_file = sc.textFile("/home/alumno/ML-serialized-parallelized/Datasets/spam.data")
records_Xy = text_file.map(lambda x: x.split(" "))\
            .map(lambda x: [float(z) for z in x])\
            .map(lambda x: (np.array(x[:57]), x[57]))

In [18]:
def normalize(records_Xy):
  records_count = float(records_Xy.count())
  records_mean = records_Xy.map(lambda record: record[0]).reduce(lambda x, y: x+y)/records_count
  records_std = records_Xy.map(lambda record: np.power(record[0]-records_mean,2))\
                          .reduce(lambda record_1, record_2: record_1+record_2)
  return records_Xy.map(lambda record: ((record[0]-records_mean)/np.sqrt(records_std/records_count),record[1]))

In [19]:
def train_test_splitter(records_Xy, train_ratio):
  if train_ratio>0.9:
    raise Exception('''Sorry, you are setting very small ratio for test set and it's not acceptable
                    please choose a train_ratio less than or equal 0.9''')
  records_Xy = records_Xy.repartition(int(train_ratio*20))
  record_index = records_Xy.zipWithIndex()
  splitting_index = int(records_Xy.count() * train_ratio)
  train_set = record_index.filter(lambda record: record[1] <= splitting_index).map(lambda record: record[0])
  test_set = record_index.filter(lambda record: record[1] > splitting_index).map(lambda record: record[0])
  return train_set, test_set

In [20]:
def sigmoid(records_X1y, Wb):
    # returns records_X1y_yhat
    return records_X1y.map(lambda record: (record[0], record[1], record[0].dot(Wb)))\
                      .map(lambda record: (record[0], record[1], 1 / (1 + np.exp(-record[2]) )))

In [21]:
def cost_function(records_X1y_yhat, Wb, lambda_reg):
  epsilon = 1e-5
  records_count = records_X1y_yhat.count()
  cost = records_X1y_yhat.map(lambda record: record[1]*math.log(record[2] + epsilon) + (1-record[1])*math.log(1-record[2] + epsilon))\
                        .reduce(lambda record_1, record_2: record_1+record_2)
  return (-1/records_count)*cost + lambda_reg/(2*records_count)*np.sum(np.power(Wb,2))

In [22]:
def train(record_Xy, iterations, learning_rate, lambda_reg, print_cost_per_iteration = False):
  records_count = record_Xy.count()
  features_counts = len(record_Xy.take(1)[0][0])
  np.random.seed(123)
  Wb = np.random.random([features_counts+1,])
  loss = []
  records_X1y = record_Xy.map(lambda record: (np.append(record[0], 1), record[1]))
  for i in range(iterations):
    records_X1y_yhat = sigmoid(records_X1y, Wb)
    dw_init = records_X1y_yhat.map(lambda record: record[0]*(record[2]-record[1]))\
                         .reduce(lambda record_1, record_2 : np.subtract(record_1, record_2))
    dw = np.add(dw_init, lambda_reg*Wb)/records_count
    Wb = np.subtract(Wb, learning_rate*dw)
    cost = cost_function(records_X1y_yhat, Wb, lambda_reg)
    loss.append(cost)
    if print_cost_per_iteration:
      print(f'Iteratoin {i} : cost is : {cost}')
    
  return Wb

In [23]:
def predict(record_Xy, Wb):
  records_X1y = record_Xy.map(lambda record: (np.append(record[0], 1), record[1]))
  return sigmoid(records_X1y, Wb).map(lambda record :  (record[1], 1 if record[2] > 0.5 else 0))

In [31]:
def accuracy(record_Xy, Wb):
    def MapToBinaryState(record):
      if record[0] == 0 and record[1] == 0:
          return ('true_negative', 1)
      elif record[0] == 1 and record[1] == 0:
        return ('false_negative', 1)
      elif record[0] == 1 and record[1] == 1:
        return ('true_positive', 1)
      else:
        return ('false_positive', 1)
    records_count = record_Xy.count()
    record_y_yhat = predict(record_Xy, Wb)
    binary_states = record_y_yhat.map(MapToBinaryState).reduceByKey(lambda record_1, record_2: record_1+record_2).collect()
    binary_state_dictionary = dict(binary_states)
    precision = binary_state_dictionary.get('true_positive',0) / (binary_state_dictionary.get('true_positive',0) + binary_state_dictionary.get('false_positive',0)
    recall =  binary_state_dictionary.get('true_positive',0) / (binary_state_dictionary.get('false_negative',0) + binary_state_dictionary.get('true_positive',0))
    accuracy = (binary_state_dictionary.get('true_positive',0) + binary_state_dictionary.get('true_negative',0))/records_count
    table = [
              ["Precision", precision],
              ["Recall", recall],
              ["Accuracy", accuracy]
            ]
    print(tabulate(table))
    return accuracy


SyntaxError: invalid syntax (<ipython-input-31-d83e17137a36>, line 16)

In [25]:
import os
os.getcwd()

'/home/alumno/ML-serialized-parallelized'

In [29]:
records_Xy = normalize(records_Xy)
train_set, test_set = train_test_splitter(records_Xy, 0.8)
Wb = train(train_set, 50, 0.5, 10)
accuracy(test_set, Wb)

KeyboardInterrupt: 

In [27]:
records_Xy = normalize(records_Xy)
cross_validation_accuray = []

def BlockHandler(block):
  block_rdd = sc.parallelize(block)
  train_set, test_set = train_test_splitter(block_rdd, 0.8)
  Wb = train(train_set, 50, 0.5, 10)
  cross_validation_accuray.append(accuracy(test_set, Wb))


glomed_records_Xy = records_Xy.repartition(10).glom().collect()
for record in glomed_records_Xy:
  BlockHandler(record)
# acc.take(1)

# print(glomed_records_Xy.take(1))
# accuracy = records_Xy.mapPartitions(BlockHandler)

---------  --------
Precision  0.761905
Recall     0.8
Accuracy   0.898876
---------  --------
---------  --------
Precision  0.8
Recall     0.8
Accuracy   0.912088
---------  --------


KeyError: 'false_negative'