In [1]:
%matplotlib inline
import numpy as np
import random, math
from sklearn.datasets import load_digits
import matplotlib.pyplot as plt
from matplotlib.pylab import cm
from IPython.html.widgets import interact



The following is the code I've developed so far. There's some issue with the backpropagation step, and I have no idea what it is. I'm making some progress, though it's slow and arduous. The test at the bottom is indicative of what the issue is. For a more detailed description of this issue, see [the question I asked on StackOverflow](http://stackoverflow.com/questions/30491307/why-does-this-backpropagation-implementation-).

In the meantime, I've littered my code with a smattering of hopefully helpful comments. It's a little messy, and a little bit out of sensible order, but I've made an effort to de-spagghetify it for submission. O, for want of a refactor tool...

I should also note up front that much of my work here has been based on [this online text](http://neuralnetworksanddeeplearning.com/chap1.html) on neural networks and backpropagation. The longer I've worked on this, and the more I've referred to the text, the more similar my code has become to the example text online. I apologize for this, and intend to retool this the way I would write it with my understanding once it is functional.

In [2]:
# Sigmoid and cost function derivatives
def sigmoid_transfer(x):
    return 1.0/(1.0+np.exp(-x))
sigmoid_transfer = np.vectorize(sigmoid_transfer)

def sigmoid_transfer_deriv(x):
    return sigmoid_transfer(x)*(1-sigmoid_transfer(x))
sigmoid_transfer_deriv = np.vectorize(sigmoid_transfer_deriv)

def linear_transfer(x):
    return x;
linear_transfer = np.vectorize(linear_transfer)

def linear_transfer_deriv(x):
    return 1;
linear_transfer_deriv = np.vectorize(linear_transfer_deriv)

def cost_MSE_deriv(outputs, cost):
    return outputs-cost
cost_MSE_deriv = np.vectorize(cost_MSE_deriv)

transfer = sigmoid_transfer; transfer_deriv = sigmoid_transfer_deriv; cost_deriv = cost_MSE_deriv;
def nnet_setup(node_layout, transferp=sigmoid_transfer, 
               transfer_derivp=sigmoid_transfer_deriv, cost_derivp=cost_MSE_deriv):
    global transfer; global transfer_deriv; global cost_deriv;
    transfer = transferp; transfer_deriv = transfer_derivp; cost_deriv = cost_derivp;
    
    weights = []; biases = []
    for i in range(1, len(node_layout)):
        weights.append(np.random.randn(node_layout[i], node_layout[i-1]))
        biases.append(np.random.randn(node_layout[i], 1))
    return np.array(weights), np.array(biases)

# Basic forward propagation
def nnet_prop(weights, biases, inputs):
    assert(len(np.array(inputs).shape) == 2)
    for w, b in zip(weights, biases):
        inputs = transfer(np.dot(w, inputs) + b)
    return inputs

In [51]:
# The following functions split data into randomized subsets

# Splits the data set into a randomized training and test set
# with format [([image_matrix], exp_value), ([image_matrix], exp_value)...]
def split_set(dataset, point):
    training_temp = list(zip(dataset.images[:point], dataset.target[:point]))
    test_temp = list(zip(dataset.images[point:], dataset.target[point:]))
    random.shuffle(training_temp)
    random.shuffle(test_temp)
    training_set, training_sols = zip(*training_temp)
    test_set, test_sols = zip(*test_temp)
    return list(zip(training_set, training_sols)), list(zip(test_set, test_sols))

# Splits training data into batches with format
# [[([image_mat], exp_value), ([image_mat], exp_value), ...], ...]
def split_to_batch(trainset, size):
    return [trainset[n*size:(n+1)*size] for n in range(0, math.floor(len(trainset)/size))]

# Converts an image matrix into a column vector
def conv_to_col(vec, scale=1):
    vec = np.array(vec)
    assert(len(vec.shape) == 2)
    return np.rot90([vec.reshape((vec.shape[0] * vec.shape[1]))])/scale

# Rotates a list into a numpy column vector
def rotate_list(vec):
    return np.rot90([np.array(vec).reshape((len(vec)))])

# Creates a target training vector 
# [0, 0, 0, 0, 0, 0....] + e_pos (e being basis vector)
def create_tgt_vec(pos, length=10):
    tmp = np.zeros(length)
    tmp[pos] = 1
    return np.rot90([tmp])

def read_outp_vec(vec):
    maxInd = 0;
    for i in range(1, len(vec)+1):
        if vec[-i][0] > vec[-maxInd][0]: maxInd = i
    if maxInd == 0: maxInd = len(vec)
    return maxInd - 1

digits = load_digits()
# For debugging purposes, this loads the training set
# with one batch and one test image in that batch
train_set, test_set = split_set(digits, 400)
train_set = split_to_batch(train_set, 4)

In [52]:
def backpropagate(inp, outp, wts, bias, outp_length=10):
    del_w = [np.zeros(shape=wt.shape) for wt in wts]
    del_b = [np.zeros(shape=bt.shape) for bt in bias]
    
    next_input = conv_to_col(inp)
    outp = create_tgt_vec(outp, length=outp_length)
    
    pre_trans = []; post_trans = []
    for w, b in zip(wts, bias):
        next_input = np.dot(w, next_input) + b
        pre_trans.append(next_input)
        next_input = transfer(next_input)
        post_trans.append(next_input)
    
    #print("O: ", outp)
    delta = cost_deriv(post_trans[-1], outp) * transfer_deriv(pre_trans[-1])
    del_b[-1] = delta
    del_w[-1] = np.dot(delta, post_trans[-2].transpose())
    
    for i in range(2, len(wts)):
        pre_tr_vec = pre_trans[-i]
        tr_deriv = transfer_deriv(pre_tr_vec)
        delta = np.dot(wts[-i+1].transpose(), delta) * tr_deriv
        del_b[-i] = delta
        del_w[-i] = np.dot(delta, post_trans[-i-1].transpose())
    
    return del_w, del_b

def SGD(train_set, wts, bias, eta, backprop_fn=backpropagate, outp_length=10, decay_rate=0.0):
    training_size = 0
    if len(train_set) == 0: training_size = 1
    else: training_size = len(train_set[0])

    learning_coef = eta / training_size
    
    for next_set in train_set:
        sum_del_w = [np.zeros(w.shape) for w in wts]
        sum_del_b = [np.zeros(b.shape) for b in bias]
        
        for inp, outp in next_set:
            next_del_w, next_del_b = backprop_fn(inp, outp, wts, bias, outp_length=outp_length);
            sum_del_w = [nw + dw for nw, dw in zip(next_del_w, sum_del_w)];
            sum_del_b = [nb + db for nb, db in zip(next_del_b, sum_del_b)];
        
        wts  = [wt - learning_coef * (dw + decay_rate * wt) for wt, dw in zip(wts, sum_del_w)]
        bias = [bt - learning_coef * (db + decay_rate * bt) for bt, db in zip(bias, sum_del_b)]
    return wts, bias

In [None]:
def evaluate(wts, bias, test_set):
    correct = 0; total = 0;
    for i in test_set:
        out = read_outp_vec(nnet_prop(wts, bias, conv_to_col(i[0])))
        total = total + 1
        if out == i[1]: correct = correct + 1
    return correct, total, float(correct)/float(total)

def average_network(wts1, bias1, wts2, bias2):
    return 0.5 * (wts1 + wts2), 0.5 * (bias1 + bias2)

def evaluateN(networks, test_set):
    correct = 0; total = 0;
    for i in test_set:
        out = np.zeros(shape=create_tgt_vec(0).shape)
        sumEff = 0
        for wts, bias, efA in networks:
            out = out + efA * nnet_prop(wts, bias, conv_to_col(i[0]))
            sumEff += efA
        out = out / sumEff
        
        if read_outp_vec(out) == i[1]: correct = correct + 1
        total = total + 1
    return correct, total, float(correct)/float(total)

networks = []
train_set, test_set = split_set(digits, 1250)
for i in range(0, 10):
    wts, bias = nnet_setup([64, 30, 20, 10])

    wts_maxA = wts; bias_maxA = bias; effectivenessA = evaluate(wts, bias, test_set)[2]
    for j in range(0, 20):
        random.shuffle(train_set)
        train_set_split = split_to_batch(train_set, 2)
        wts, bias = SGD(train_set_split, wts, bias, 1.3, decay_rate=0.0003)

        new_effect = evaluate(wts, bias, test_set)[2]
        if new_effect > effectivenessA:
            effectivenessA = new_effect
            wts_maxA = wts; bias_maxA = bias
    
    print(i, evaluate(wts_maxA, bias_maxA, test_set))
    networks.append((wts_maxA, bias_maxA, effectivenessA))

print("Final average eval:")
print(evaluateN(networks, test_set))
print("Done")

0 (364, 546, 0.6666666666666666)
1 (424, 546, 0.7765567765567766)
2 (394, 546, 0.7216117216117216)
3 (372, 546, 0.6813186813186813)
4

In [77]:
# Unit Tests

# Sigmoid transfer
assert(np.allclose(sigmoid_transfer([-0.5, 0, 1]), [0.3775, 0.5, 0.7311], 0.01))
assert(sigmoid_transfer(-100) >= 0 and sigmoid_transfer(100) <= 1)
assert(np.allclose(sigmoid_transfer_deriv([-0.5, 0, 1]), [0.235, 0.25, 0.1966], 0.001))

# Linear transfer
assert(np.allclose(linear_transfer([1, 100, -1000]), [1, 100, -1000], 0))
assert(np.allclose(linear_transfer_deriv([1, 100, -1000]), [1, 1, 1], 0))

# MSE cost function
assert(np.allclose(cost_MSE_deriv([1, 0], [0, -1]), [1, 1], 0))

# NNet setup shaping
assert(nnet_setup([1, 2, 1])[0][0].shape == (2, 1))
assert(nnet_setup([1, 2, 1])[0][1].shape == (1, 2))
assert(nnet_setup([1, 2, 1])[1][0].shape == (2, 1))
assert(nnet_setup([1, 2, 1])[1][1].shape == (1, 1))

# Basic feedforward testing
wts, bias = nnet_setup([1, 2, 1], linear_transfer, linear_transfer_deriv)
for i in wts: 
    for j in i: j.fill(1)
for i in bias: 
    for j in i: j.fill(0)
out = nnet_prop(wts, bias, [[1]])
assert(len(out.shape) == 2)
assert(out[0][0] == 2)

for i in wts:
    for j in i: j.fill(1)
for i in bias:
    for j in i: j.fill(-1)

assert(nnet_prop(wts, bias, [[1]]) == -1)

# Batch creation testing
digits = load_digits()
train_set1, test_set1 = split_set(digits, 10)
train_set2, test_set2 = split_set(digits, 10)

assert(len(train_set1) == 10)
assert(len(test_set1) == len(digits.images) - len(train_set1))
assert(type(test_set1[0]) == tuple)
assert(not(np.allclose(test_set1[0][0], test_set2[0][0], 0)))

batch_1 = split_to_batch(train_set1, 2)
batch_2 = split_to_batch(train_set2, 2)
assert(len(batch_1) == 5)
assert(len(batch_1[0]) == 2)
assert(len(batch_1[0][0]) == 2)
assert(batch_1[0][0][0].shape == (8, 8))

image_1 = batch_1[0][0][0]
assert(np.allclose(conv_to_col([[1, 2], [3, 4]]), [[4],[3],[2],[1]], 0))
assert(conv_to_col(image_1).shape == (64, 1))

# Output reads correctly
assert(read_outp_vec(create_tgt_vec(5)) == 5)
assert(read_outp_vec(create_tgt_vec(0)) == 0)
assert(read_outp_vec(create_tgt_vec(9)) == 9)

# SGD testing
def mock_backprop(inp, outp, wts, bias, outp_length=10):
    return wts, bias

wts, bias = nnet_setup([1, 2, 1])
for i in wts:
    for j in i:
        j.fill(1)
for i in bias:
    i.fill(1)

for i in range(0, 1000):
    wts, bias = SGD([[(1, 1)]], wts, bias, 0.1, backprop_fn=mock_backprop)
for i in wts:
    for j in i:
        assert(np.allclose(j, 0, 0.001))
for i in bias:
    assert(np.allclose(i, 0, 0.001))