In [1]:
import findspark
findspark.init()

import pyspark
import random

# Reuse an already-running SparkContext when there is one, so that re-executing
# this cell in a live kernel does not fail because a context already exists.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName("NeuralNetwork")
)

In [262]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import random
import math
from operator import add
import datetime

In [327]:
def load_training_set(X, y, num_classes=10):
    """Pair every feature row with a one-hot encoded label column.

    Args:
        X: indexable collection of feature rows; each ``X[i]`` must support
            ``reshape`` (e.g. a 2-D ``numpy`` array).
        y: integer ``numpy`` array holding one 1-based class label per row,
            shaped ``(n,)`` or ``(n, 1)``.
        num_classes: length of the one-hot label vector (defaults to 10).

    Returns:
        A list of ``(x_vector, y_vector)`` tuples in which ``x_vector`` has
        shape ``(n_features, 1)`` and ``y_vector`` has shape
        ``(num_classes, 1)``.
    """
    # Build the identity matrix once: row k is the one-hot vector of label k+1.
    one_hot_rows = np.eye(num_classes)
    dataset = []
    for i in range(y.shape[0]):
        x_vector = X[i].reshape((-1, 1))
        # Labels are 1-based, hence the -1. A label of 0 therefore selects
        # row -1, i.e. the last class. The copy keeps every sample's label
        # vector independent of the shared identity matrix.
        y_vector = one_hot_rows[y[i] - 1].reshape((-1, 1)).copy()
        dataset.append((x_vector, y_vector))

    return dataset

In [331]:
def parse_data(data, labels, weights=(0.8, 0.2), seed=8888):
    """Turn raw arrays into randomly split training and validation RDDs.

    Args:
        data: feature rows, forwarded to ``load_training_set``.
        labels: class labels, forwarded to ``load_training_set``.
        weights: two relative weights for the train and validation splits
            (defaults to an 80/20 split).
        seed: seed handed to ``randomSplit`` so the split is repeatable.

    Returns:
        A ``(train_set, validate_set)`` tuple of RDDs whose elements are the
        ``(x_vector, y_vector)`` pairs built by ``load_training_set``.
    """
    samples = load_training_set(data, labels)
    # Relies on the notebook-level SparkContext ``sc``.
    distributed = sc.parallelize(samples)
    train_set, validate_set = distributed.randomSplit(list(weights), seed)

    return (train_set, validate_set)

In [265]:
def nn_predict(ann, inputs):
    """Run one forward pass and return the output-layer activations.

    Args:
        ann: object exposing the weight matrices ``w1`` and ``w2``.
        inputs: array holding a single sample; it is reshaped into a column.

    Returns:
        The activations of the output layer as a column vector.
    """
    bias = [1.0]

    # input layer: the sample as a column with the bias unit stacked on top
    layer_in = np.vstack((bias, inputs.reshape((-1, 1))))

    # hidden layer: activate, then stack the bias unit on top again
    hidden = np.vstack((bias, sigmoid(np.dot(ann.w1, layer_in))))

    # output layer
    return sigmoid(np.dot(ann.w2, hidden))

In [266]:
class NN:
    """Weights and gradient accumulators of a network with one hidden layer."""

    def __init__(self, ni, nh, no):
        """Create a randomly initialised network.

        Args:
            ni: number of input features (the bias node is added internally).
            nh: number of hidden nodes.
            no: number of output nodes.
        """
        # layer sizes; the input layer size includes its bias node
        self.n1, self.n2, self.n3 = ni + 1, nh, no

        # weight matrices (the theta of the model); each has one extra column
        # for the bias unit of the layer feeding into it
        self.w1 = self.weights_init(ni, nh)
        self.w2 = self.weights_init(nh, no)

        # zero-filled gradient accumulators shaped like w1 and w2
        self.Delta1 = np.zeros((self.n2, self.n1))
        self.Delta2 = np.zeros((self.n3, self.n2 + 1))

    def weights_init(self, l_in, l_out):
        """Return an ``(l_out, l_in + 1)`` matrix of values in [-0.12, 0.12)."""
        eps_init = 0.12
        uniform = np.random.rand(l_out, 1 + l_in)
        return uniform * 2 * eps_init - eps_init

In [267]:
def dsigmoid(y):
    """Derivative of the logistic function, evaluated at pre-activation ``y``."""
    activation = sigmoid(y)
    complement = 1.0 - activation
    return activation * complement

def sigmoid(x):
    """Numerically stable logistic function ``1 / (1 + exp(-x))``.

    Works element-wise on numpy arrays and on scalars.

    The naive formula evaluates ``exp(-x)``, which overflows (and emits a
    RuntimeWarning) for large negative ``x``. Here only ``exp(-|x|)`` is
    computed, whose argument is never positive, so it cannot overflow.

    Args:
        x: scalar or numpy array of pre-activations.

    Returns:
        Values in [0, 1] with the same shape as ``x``.
    """
    x = np.asarray(x)
    decay = np.exp(-np.abs(x))
    # x >= 0:  1 / (1 + exp(-x))        (same expression as the naive form)
    # x <  0:  exp(x) / (1 + exp(x))    (algebraically equal, overflow-free)
    out = np.where(x >= 0, 1.0, decay) / (1.0 + decay)
    # Indexing with () turns a 0-d result back into a scalar and leaves
    # arrays untouched, so scalar input still yields a scalar.
    return out[()]

In [268]:
def ann_train_eval(w, sample):
    """Forward- and back-propagate a single training sample.

    Args:
        w: pair ``(w1, w2)`` of weight matrices; column 0 of each multiplies
            the bias unit.
        sample: pair ``(x, y)`` of input and target column vectors (stacking
            the bias unit on top of ``x`` requires ``x`` to have one column).

    Returns:
        A length-3 object array ``[Delta1, Delta2, mse]``: this sample's
        gradient contributions for ``w1`` and ``w2`` and its mean squared
        output error. Results of several samples can be summed element-wise
        with ``+``.
    """
    w1, w2 = w
    x = sample[0]
    y = sample[1]
    a1 = np.vstack(([1.0], x))

    # hidden activations
    z2 = np.dot(w1, a1)
    a2 = np.vstack(([1.0], sigmoid(z2)))

    # output activations
    z3 = np.dot(w2, a2)
    a3 = sigmoid(z3)

    ## start back propagation
    # error term of the output layer
    delta3 = a3 - y
    # Row 0 of w2.T . delta3 belongs to the hidden bias unit, which has no
    # incoming weights, so it is dropped before applying the derivative.
    delta2 = (np.dot(w2.T, delta3))[1:] * dsigmoid(z2)

    # per-sample gradient contributions, to be accumulated by the caller
    Delta1 = np.dot(delta2, a1.T)
    Delta2 = np.dot(delta3, a2.T)

    # The three results have different shapes. Handing such a ragged list to
    # np.array() raises ValueError on NumPy >= 1.24, so the object array is
    # filled slot by slot instead.
    result = np.empty(3, dtype=object)
    result[0] = Delta1
    result[1] = Delta2
    # mean squared error is only used to monitor training progress
    result[2] = np.mean((a3 - y) ** 2)
    return result

In [269]:
def nn_train(ann, train_set, max_iter, reg_lambda=10.01):
    """Train ``ann`` in place with full-batch gradient descent.

    Each iteration maps ``ann_train_eval`` over ``train_set``, averages the
    per-sample results and takes one gradient step with a learning rate of 1.0.

    Args:
        ann: network object exposing numpy weight matrices ``w1`` and ``w2``;
            both are updated in place.
        train_set: RDD-like object providing ``count()``, ``map()`` and
            ``reduce()`` over ``(x, y)`` samples.
        max_iter: number of gradient steps to perform.
        reg_lambda: regularization strength; every non-bias weight is shrunk
            by ``reg_lambda / m`` per step, ``m`` being the sample count.
            NOTE(review): the default of 10.01 is kept from the original code;
            confirm that such a strong penalty is intended.

    Returns:
        The same ``ann`` object, after training.
    """
    # Counting is a full pass over the data, so do it exactly once.
    m = train_set.count()
    for iteration in range(max_iter):
        # Ship only the weight matrices to the workers, not the whole object.
        weights = (ann.w1, ann.w2)
        eval_res = train_set.map(lambda sample: ann_train_eval(weights, sample))

        # average the back-propagation results over all samples
        average_eval = eval_res.reduce(add) / m
        dw1 = average_eval[0]
        dw2 = average_eval[1]

        # mean error solely for display
        mean_err = average_eval[2]

        decay = reg_lambda / m
        for w, dw in ((ann.w1, dw1), (ann.w2, dw2)):
            # column 0 holds the bias weights, which are not regularized
            w[:, 0] -= dw[:, 0]
            # all other weights: gradient step plus weight decay
            w[:, 1:] -= dw[:, 1:] + decay * w[:, 1:]

        if 0 == iteration % 50:
            print("mean error p", mean_err)
    return ann

In [310]:
def convert(x):
    """Cast a parsed record in place: every field but the last to ``float``,
    the last one to ``int``.

    Args:
        x: mutable list of field values (e.g. the strings of one CSV row).

    Returns:
        The same list object ``x`` after conversion.
    """
    for position, raw in enumerate(x[:-1]):
        x[position] = float(raw)
    x[-1] = int(x[-1])
    return x

In [320]:
# Read the CSV, skip blank lines (indexing an empty string would raise) and
# lines that begin with a comma (presumably the header row), drop the first
# field of every remaining line (presumably a row index), then cast the
# fields with convert().
trainRDD = (
    sc.textFile("C:/Users/saika/Desktop/files/TestFiles/neural.csv")
    .filter(lambda line: bool(line) and not line.startswith(","))
    .map(lambda line: line.split(",")[1:])
    .map(convert)
)

In [321]:
# Peek at a single parsed record instead of pulling the entire RDD into the
# notebook output.
trainRDD.take(1)

[[0.0,
  241.0,
  1.0,
  5.0,
  149.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  31.0,
  0.0,
  14.0,
  8.0,
  11.0,
  13.0,
  15.0,
  14.0,
  32.0,
  36.0,
  71.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  4.0,
  70.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  159.0,
  44.0,
  9.0,
  10.0,
  11.0,
  12.0,
  13.0,
  19.0,
  42.0,
  68.0,
  105.0,
  167.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  78.0,
  56.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  38.0,
  10.0,
  13.0,
  25.0,
  18.0,
  23.0,
  26.0,
  48.0,
  97.0,
  118.0,
  146.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0

In [313]:
# Load the saved feature/label arrays, then print the label array's shape
# followed by the length of its first row.
X = np.load("C:/Users/saika/Desktop/X.npy")
y = np.load("C:/Users/saika/Desktop/y.npy")
print(np.shape(y), len(y[0]), sep="\n")

(5000, 1)
1


In [323]:
# Every record is [feature, ..., feature, label]: split it into a feature
# matrix and a label column.
data = np.array(trainRDD.map(lambda row: row[:-1]).collect())
# Reshape with -1 so the column adapts to however many rows were read instead
# of assuming exactly 50.
labels = np.array(trainRDD.map(lambda row: row[-1]).collect()).reshape((-1, 1))

In [332]:
# Shapes of the parsed CSV data/labels and of the loaded X/y arrays, one per line.
print(*map(np.shape, (data, labels, X, y)), sep="\n")

(50, 3012)
(50, 1)
(5000, 400)
(5000, 1)


In [343]:
# Split the samples into training and validation RDDs.
train_set, validation_set = parse_data(data, labels)
# Size the input layer from the data instead of hard-coding its width;
# 500 hidden nodes and 10 output nodes.
n = NN(data.shape[1], 500, 10)

In [344]:
# Ten gradient steps; nn_train returns the network object it was given.
nt = nn_train(ann=n, train_set=train_set, max_iter=10)

mean error p 0.279620827303


In [346]:
# Compare predicted and actual class indices (1-based) on the validation split.
n_set = validation_set.count()
val_res = validation_set.map(lambda sample: 1 + np.argmax(nn_predict(nt, sample[0]))).collect()
actual_res = validation_set.map(lambda sample: 1 + np.argmax(sample[1])).collect()

accurate = sum(1 for predicted, actual in zip(val_res, actual_res) if predicted == actual)
if n_set:
    print("validation set accuracy: {0} %".format(100.0 * accurate / n_set))
else:
    # A random split of a small dataset can leave the validation part empty;
    # report that instead of dividing by zero.
    print("validation set is empty; accuracy is undefined")

validation set accuracy: 8.333333333333334 %
