In [103]:
# Import necessary modules, numpy will be used for matrix and array with better performance

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import random
import math
from operator import add
import datetime
from pyspark.mllib.linalg import Vectors
#from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg.distributed import BlockMatrix,IndexedRow,IndexedRowMatrix

In [2]:
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext()
spark = SparkSession(sc)

In [3]:
def load_training_set():
    X = np.load("/home/rameshragala/Documents/a-simple-neural-network-on-spark-master/Implementation of Neural Network using Pyspark/X.npy")
    y = np.load("/home/rameshragala/Documents/a-simple-neural-network-on-spark-master/Implementation of Neural Network using Pyspark/y.npy")
    dataset = []
    for i in range(y.shape[0]):
        x_vector = X[i].reshape((-1, 1))
        y_vector = np.eye(10)[y[i]-1].reshape((-1, 1))
        dataset.append((x_vector, y_vector))
        
    return dataset

In [85]:
# parallelize data into spark nodes.
# spark context (sc) was created in another place, which comes with the virtual machine os image
def parse_data():
    dataset = load_training_set()
    data_set = sc.parallelize(dataset)
    #split it into training, validation and test sets. Use the randomSplit method
    weights = [.8, .1, .1]
    seed = 8888
    train_set, validate_set, test_set = data_set.randomSplit(weights, seed)
    
    return (train_set, validate_set, test_set)

In [86]:
# derivative of our sigmoid function, in terms of the output (i.e. y)
def dsigmoid(y):
    gz = sigmoid(y)
    return gz * (1.0 - gz)

def sigmoid(x):
    # exp function provided by numpy can support vector operation by default
    return 1.0 / (1.0 + np.exp(-x)) 


In [87]:
# the class defined here will be used to represent a aritificial neural network model, 
# different with another implementation, related opreation fuctions except wiehts initialization will not be included in class
class NN:
    def __init__(self, ni, nh, no):
        # number of input, hidden, and output nodes
        self.n1 = ni + 1 # +1 for bias node
        self.n2 = nh
        self.n3 = no
        # create weights variables (the theta in model)
        self.w1 = self.weights_init(ni, nh)
        
        
    def weights_init(self, l_in, l_out):
            eps_init = 0.12
            ret = np.random.rand(l_out, 1+l_in) * 2 * eps_init - eps_init
            return ret
        

In [88]:
# this function realize both forward and back propagation and return all Deltas 
# as well as prediction error for display.
def ann_train_eval(w, sample):
    w1  = w
    x = sample[0]
    y = sample[1]
    a1 = np.vstack(([1.0], x))

    # hidden activations
    z2 = np.dot(w1, a1)
    a2 = np.vstack(([1.0], sigmoid(z2)))
    return a2
    # output activations   
    

In [130]:
def RowToBlockMatrix(matrix):
    matrix = IndexedRowMatrix(matrix.rows.zipWithIndex().map(lambda x:IndexedRow(x[1],x[0]))).toBlockMatrix()
    return matrix

def DenseMatrixToBlockMatrix(matrix):
    matrix = matrix.toArray()
    matrix = IndexedRowMatrix(sc.parallelize([IndexedRow(i,matrix[i]) for i in range(len(matrix))])).toBlockMatrix()
    return matrix

def ShapeToDiagonalMatrix(denseVector):
    shapeMatrix = []
    n = len(denseVector)
    for i in range(n):
        l = [0]*n
        l[i] = 1/(denseVector[i])
        shapeMatrix.append(IndexedRow(i,l))
    return IndexedRowMatrix(sc.parallelize(shapeMatrix)).toBlockMatrix()


In [131]:
def MoorePenrose(h):
    #rows = sc.parallelize(h)
    mat = RowMatrix(h)
    n=mat.numCols()
    print("Num of Columns::",n)
    # Compute the top 5 singular values and corresponding singular vectors.
    svd = mat.computeSVD(n, computeU=True)
    U = svd.U       # The U factor is a RowMatrix.
    s = svd.s       # The singular values are stored in a local dense vector.
    V = svd.V       # The V factor is a local dense matrix.
    
    V = DenseMatrixToBlockMatrix(V)  # Converting Dense Matrix to Block Matrix
    U = RowToBlockMatrix(U)  # Converting RowMatrix to BlockMatrix
    Ut = U.transpose() 
    SD = ShapeToDiagonalMatrix(s)  # Shaping S to Diagonal nxn matrix  where diagonal elements are inverse of singular value
    output = V.multiply(SD).multiply(Ut)  #V*SD*Ut
    #L1 = output.toLocalMatrix().toArray()
    return(output)

In [132]:
def nn_train(ann,train_set):
    h = train_set.map(lambda x: ann_train_eval(ann.w1, x))
    #print(h.collect())
    #outputFile = "hdfs://localhost:9000/ANNSPARK/h.txt"
    #h.saveAsTextFile(outputFile)
    #a = np.array(h.collect())
    #a11 = np.squeeze(a)
    #print("TTT::",a11.shape)
    #print("A::is::",a)
    #np.savetxt('/home/rameshragala/Documents/a-simple-neural-network-on-spark-master/Implementation of Neural Network using Pyspark/h.out', a11, delimiter=',') 
    #print(a.shape)
    #B10 = np.linalg.pinv(h)
    #print(B10)
    '''
    mat = RowMatrix(h)
    svd = mat.computeSVD(5, computeU=True)
    U=svd.U
    S = svd.s
    V = svd.V
    #print("Rows:",U.numRows())
    #print("U data",U.rows.collect())
    S = 1/np.sqrt(S)
    Σ = DenseMatrix(len(S), len(S), np.diag(S).ravel("F"))
    MoorePenrose = np.dot()
    '''
    h1 = MoorePenrose(h)
    #print("Check::")
    #print(h1.toLocalMatrix().toArray())
    return(h1)

In [135]:
# load and parse data, randomly devide dataset to three parts for train, validate and test separately 
train_set, validate_set, test_set = parse_data()
#print train_set.count(), validate_set.count(), test_set.count()
#print(len(train_set).collect())

n = NN(len(train_set.take(1)[0][0]), 50, len(train_set.take(1)[0][1]))
#-----------------------------------------------------------------------------------------
print("This is the shape of vertical weight matrix ")
#print("Dimension of weight matrix is ",len(nn_train(n,train_set).collect()),"x",len(nn_train(n,train_set).collect()[0]))
nt= nn_train(n,train_set)
print(type(nt)) # error here
ntLocal = nt.toLocalMatrix().toArray()
print(ntLocal)
#print(nt.numCols())
#print(nt.numRows())
print("All::")

This is the shape of vertical weight matrix 
Num of Columns:: 51


Py4JJavaError: An error occurred while calling o3520.toLocalMatrix.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 453.0 failed 1 times, most recent failure: Lost task 0.0 in stage 453.0 (TID 906, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.mllib.linalg.distributed.BlockMatrix.toLocalMatrix(BlockMatrix.scala:313)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [134]:
# test trained neural network model on train dataset
n_set = train_set.count()
val_res = train_set.map(lambda x : 1 + np.argmax(nn_predict(nt, x[0]))).collect()
actual_res = train_set.map(lambda x : 1 + np.argmax(x[1])).collect()

accurate = 0
for idx in range(n_set):
    if val_res[idx] == actual_res[idx]:
        accurate += 1
print("train set accuracy: {0} %".format(100.0 * accurate / n_set))

Traceback (most recent call last):
  File "/opt/spark-build/spark-2.2.0/python/pyspark/cloudpickle.py", line 148, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib64/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/usr/lib64/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/usr/lib64/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark-build/spark-2.2.0/python/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/opt/spark-build/spark-2.2.0/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib64/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.6/pickle.py", line 736, in save_

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

In [None]:
# test trained neural network model on validation data set
n_val_set = validate_set.count()
val_res = validate_set.map(lambda x : 1 + np.argmax(nn_predict(nt, x[0]))).collect()
actual_res = validate_set.map(lambda x : 1 + np.argmax(x[1])).collect()

accurate = 0
for idx in range(n_val_set):
    if val_res[idx] == actual_res[idx]:
        accurate += 1
print("validation set accuracy: {0} %".format(100.0 * accurate / n_val_set))

In [None]:
# test trained neural network model on validation data set
n_val_set = test_set.count()

val_res = test_set.map(lambda x : 1 + np.argmax(nn_predict(nt, x[0]))).collect()
actual_res = test_set.map(lambda x : 1 + np.argmax(x[1])).collect()

accurate = 0
for idx in range(n_val_set):
    if val_res[idx] == actual_res[idx]:
        accurate += 1
print("test set accuracy: {0} %".format(100.0 * accurate / n_val_set))