First a utility script to download mnist and save it into a pickle context for later usage.

In [1]:
import numpy
from urllib import request
import gzip
import pickle
import os
import time
import sys
import hashlib
import random

filename = [
    ["training_images","train-images-idx3-ubyte.gz"],
    ["test_images","t10k-images-idx3-ubyte.gz"],
    ["training_labels","train-labels-idx1-ubyte.gz"],
    ["test_labels","t10k-labels-idx1-ubyte.gz"]
]

def download_mnist():
    base_url = "http://yann.lecun.com/exdb/mnist/"
    for name in filename:
        print("Downloading "+name[1]+"...")
        request.urlretrieve(base_url+name[1], name[1])
    print("Download complete.")

def save_mnist():
    mnist = {}
    for name in filename[:2]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = numpy.frombuffer(f.read(), numpy.uint8, offset=16).reshape(-1,28*28)
    for name in filename[-2:]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = numpy.frombuffer(f.read(), numpy.uint8, offset=8)
    with open("mnist.pkl", 'wb') as f:
        pickle.dump(mnist,f)
    print("Save complete.")

def init():
    
    if os.path.exists("./mnist.pkl"):
        print("Pkl detected. Skipping download.")
        return
    
    download_mnist()
    save_mnist()

def load():
    with open("mnist.pkl",'rb') as f:
        mnist = pickle.load(f)
    return mnist["training_images"], mnist["training_labels"], mnist["test_images"], mnist["test_labels"]

init()

Pkl detected. Skipping download.


Define some parameters for the model

In [2]:
learning_rate = 0.05
seed = 402
epochs = 1000

training_images_count = 60000
testing_images_count = 10000
training_features_count = 784
testing_features_count = 784

numpy.random.seed(seed)

train_X, train_labels, test_X, test_labels = load()

# convert values to one-hot-encoded-vectors
train_Y = numpy.zeros((train_labels.size, train_labels.max() + 1))
train_Y[numpy.arange(train_labels.size), train_labels] = 1
test_Y = numpy.zeros((test_labels.size, test_labels.max() + 1))
test_Y[numpy.arange(test_labels.size), test_labels] = 1

scheduler_count = 7

w1 = numpy.random.randn(784, 128) * 0.01
b1 = numpy.zeros(128)
z1 = numpy.zeros(128)
a1 = numpy.zeros(128)
w2 = numpy.random.randn(128, 10) * 0.01
b2 = numpy.zeros(10)
z2 = numpy.zeros(10)
a2 = numpy.zeros(10)

In [3]:
def create_exposure_matrix(operation, a, b = None):
    
    print(operation, a.shape, b.shape if b is not None else 0)
    
    start = time.time()
    
    # dot uniquely uses different shaped a, b matrices
    # - exposures stores which scheduler is allocated to which "input pixel" position
    # - enshuffles stores the index ordering of b to shuffled prior to being passed to the applicible scheduler (exposures)
    # - unshuffles stores the index ordering of b to unshuffle/normalize the result into an aggregatable format
    
    if operation == 'dot':
        
        exposures = numpy.random.choice(scheduler_count, a.shape[1])
        numpy.random.shuffle(exposures)

        enshuffles = numpy.zeros(b.shape)
        for row in range(b.shape[0]):
            enshuffles[row] = numpy.arange(b.shape[1])
            random.shuffle(enshuffles[row])

        unshuffles = numpy.zeros(b.shape)
        for row in range(b.shape[0]):
            for col in range(b.shape[1]):
                unshuffles[row][int(enshuffles[row][col])] = col
        
        print(time.time() - start)
        
        return {
            'exposures': exposures,
            'enshuffles': enshuffles,
            'unshuffles': unshuffles
        }
    
    # all other operations use equal shaped a, b matrices
    # - exposures stores which scheduler is allocated to which position
    # - enshuffles stores the index ordering of b to shuffled prior to being passed to the applicible scheduler (exposures)
    # - unshuffles stores the index ordering of b to unshuffle/normalize the result into an aggregatable format
    
    exposures = numpy.random.choice(scheduler_count, a.shape[1])
    numpy.random.shuffle(exposures)

    enshuffles = numpy.zeros(a.shape)
    for row in range(a.shape[0]):
        enshuffles[row] = numpy.arange(a.shape[1])
        random.shuffle(enshuffles[row])

    unshuffles = numpy.zeros(a.shape)
    for row in range(a.shape[0]):
        for col in range(a.shape[1]):
            unshuffles[row][int(enshuffles[row][col])] = col

    print(time.time() - start)

    return {
        'exposures': exposures,
        'enshuffles': enshuffles,
        'unshuffles': unshuffles
    }

In [4]:
previous_weights_hash = '!'

for epoch in range(epochs):
    
    start = time.time()
    
    # layer 1 forward
    z1 = numpy.dot(train_X, w1) + b1
    a1 = 1 / (1 + numpy.exp(-z1))

    # layer 2 forward
    z2 = numpy.dot(a1, w2) + b2
    a2 = 1 / (1 + numpy.exp(-z2))

    # computing cost
    m = training_images_count
    cost_delta = a2 - train_Y
    cost_delta_squared = numpy.square(cost_delta)

    # cost is computed locally
    cost = (1 / (2 * m)) * numpy.sum(cost_delta_squared)
    cost = numpy.squeeze(cost)
    dY_hat = (-1 / m) * cost_delta

    # layer 2 backward
    a2_dz = dY_hat * a2 * (1 - a2)
    z2_dw = numpy.dot(a2_dz.T, a1)
    z2_da = numpy.dot(w2, a2_dz.T)
    # z2_db is calculated locally
    z2_db = numpy.sum(a2_dz.T)

    # layer 1 backward
    a1_dz = z2_da.T * a1 * (1 - a1)
    z1_dw = numpy.dot(a1_dz.T, train_X)
    # z1_da = numpy.dot(a1_dz, w1)
    # z1_db is calculated locally
    z1_db = numpy.sum(a1_dz.T)

    # computing training accuracy
    training_correct = 0
    for index in range(training_images_count):
        predicted = numpy.argmax(a2[index])
        actual = train_labels[index]
        if predicted == actual:
            training_correct += 1
    training_accuracy = training_correct / training_images_count

    # computing validation accuracy
    z1 = numpy.dot(test_X, w1)
    a1 = 1 / (1 + numpy.exp(-z1))
    z2 = numpy.dot(a1, w2)
    a2 = 1 / (1 + numpy.exp(-z2))
    validation_correct = 0
    for index in range(testing_images_count):
        predicted = numpy.argmax(a2[index])
        actual = test_labels[index]
        if predicted == actual:
            validation_correct += 1
    validation_accuracy = validation_correct / testing_images_count

    # save model
    # model_state = {
    #     'epoch': epoch,
    #     'seed': seed,
    #     'w1': w1,
    #     'w2': w2,
    #     'cost': cost,
    #     'training_accuracy': training_accuracy,
    #     'validation_accuracy': validation_accuracy,
    #     'dw1': z1_dw,
    #     'dw2': z2_dw,
    #     'prev': previous_weights_hash
    # }
    # weights_hash = hashlib.md5()
    # weights_hash.update(w1.tobytes())
    # weights_hash.update(w2.tobytes())
    # weights_hash = weights_hash.hexdigest()
    # model_file_path = './models/{}.pkl'.format(weights_hash)
    # with open(model_file_path, 'wb') as model_file:
    #     pickle.dump(model_state, model_file)
    # previous_weights_hash = weights_hash

    # update weights and bias
    w2 += (learning_rate * z2_dw.T)
    b2 += (learning_rate * z2_db.T)
    w1 += (learning_rate * z1_dw.T)
    b1 += (learning_rate * z1_db.T)
    
    print(time.time() - start, cost, training_accuracy, validation_accuracy)

1.7544622421264648 1.2512294217166522 0.10436666666666666 0.1082
1.6957628726959229 1.0071909087840292 0.1206 0.1262
1.6131434440612793 0.8257812796332974 0.13896666666666666 0.1437
1.623542308807373 0.6928938240955658 0.1711 0.1726
1.631908893585205 0.6085361355050166 0.20296666666666666 0.2083
1.6023461818695068 0.5569681743777688 0.22186666666666666 0.2248
1.5887415409088135 0.5248399478268465 0.22888333333333333 0.2298
1.5517597198486328 0.5040375524830192 0.2276 0.2284
1.5730583667755127 0.4900065308169594 0.22436666666666666 0.2248
1.5931992530822754 0.48020630965304084 0.22028333333333333 0.2204
1.607269525527954 0.47313005704218924 0.21701666666666666 0.2169
1.5832734107971191 0.46786485956061014 0.21591666666666667 0.2183
1.5826234817504883 0.4638461290631344 0.21635 0.2201
1.59440016746521 0.46071303728106344 0.21701666666666666 0.2227
1.5853769779205322 0.4582242046774408 0.2179 0.2245
1.5908448696136475 0.4562109619392329 0.21978333333333333 0.2247
1.5755677223205566 0.4545

In [5]:
z1_meta = create_exposure_matrix('dot', train_X, w1)
a1_meta = create_exposure_matrix('sigmoid', z1)
z2_meta = create_exposure_matrix('dot', a1, w2)
a2_meta = create_exposure_matrix('sigmoid', z2)
cost_delta_meta = create_exposure_matrix('minus', a2, train_Y)
cost_delta_squared_meta = create_exposure_matrix('square', cost_delta)    
a2_dz_meta = create_exposure_matrix('multiply', dY_hat, a2)
z2_dw_neta = create_exposure_matrix('dot', a2_dz.T, a1)
z2_da_meta = create_exposure_matrix('dot', w2, a2_dz.T)
a1_dz_meta = create_exposure_matrix('multiply', z2_da.T, a1)
z1_dw_neta = create_exposure_matrix('dot', a1_dz.T, train_X)
# z1_da_meta = create_exposure_matrix('dot', a1_dz, w1)

dot (60000, 784) (784, 128)
0.17656922340393066
sigmoid (10000, 128) 0
2.1751625537872314
dot (10000, 128) (128, 10)
0.003541707992553711
sigmoid (10000, 10) 0
0.21239757537841797
minus (10000, 10) (60000, 10)
0.21523404121398926
square (60000, 10) 0
1.3058509826660156
multiply (60000, 10) (10000, 10)
1.272111177444458
dot (10, 60000) (10000, 128)
2.198803424835205
dot (128, 10) (10, 60000)
1.0619251728057861
multiply (60000, 128) (10000, 128)
13.205965042114258
dot (128, 60000) (60000, 784)
82.69490909576416


Saving all meta inf into a compressed numpy dealeo.

In [6]:
numpy.savez_compressed('./meta-data-compressed', z1_meta = z1_meta, a1_meta = a1_meta, z2_meta = z2_meta, a2_meta = a2_meta, cost_delta_meta = cost_delta_meta, cost_delta_squared_meta = cost_delta_squared_meta, a2_dz_meta = a2_dz_meta, z2_dw_neta = z2_dw_neta, z2_da_meta = z2_da_meta, a1_dz_meta = a1_dz_meta, z1_dw_neta = z1_dw_neta)

Testing loading in saved meta data

In [7]:
# Left for later usage, commented for performance
# loaded = numpy.load('./meta-data-compressed.npz', allow_pickle=True)

Outputting exposures per scheduler

In [8]:
print(numpy.bincount(z1_meta['exposures']))
print(numpy.bincount(a1_meta['exposures']))
print(numpy.bincount(z2_meta['exposures']))
print(numpy.bincount(a2_meta['exposures']))
print(numpy.bincount(cost_delta_meta['exposures']))
print(numpy.bincount(cost_delta_squared_meta['exposures']))
print(numpy.bincount(a2_dz_meta['exposures']))
print(numpy.bincount(z2_dw_neta['exposures']))
print(numpy.bincount(z2_da_meta['exposures']))
print(numpy.bincount(a1_dz_meta['exposures']))
print(numpy.bincount(z1_dw_neta['exposures']))

[112 109 150 106  93  99 115]
[16 22 17 17 19 21 16]
[28 18 18 17 18 15 14]
[0 1 1 1 2 4 1]
[1 2 1 1 2 3]
[1 1 1 2 2 3]
[1 2 2 1 3 0 1]
[8575 8693 8569 8562 8456 8552 8593]
[2 0 3 3 0 2]
[21 12 18 26 16 21 14]
[8600 8561 8547 8584 8628 8665 8415]


Code to generate computations for all schedulers for i * w subcalculation of the z1 calculation

In [9]:
computations = [[] for i in range(scheduler_count)]

for row in range(train_X.shape[0]):
    for col in range(train_X.shape[1]):
        
        applicable_input = train_X[row][col]
        
        # skip computations with 0 input value
        if applicable_input == 0:
            continue
        
        applicable_scheduler = z1_meta['exposures'][col]
        applicable_weights = w1[col]
        applicable_enshuffle = z1_meta['enshuffles'][col]
        applicable_unshuffle = z1_meta['unshuffles'][col]
        applicable_shuffled_weights = applicable_weights[applicable_enshuffle.astype(int)]
        
        computation = {}
        computation['row'] = row
        computation['input_value'] = applicable_input
        computation['weights'] = applicable_shuffled_weights
        computation['unshuffle'] = applicable_unshuffle
        computations[applicable_scheduler].append(computation)

Calculating the computed result by unshuffling calculated results

In [10]:
computation_result = numpy.zeros(z1.shape)

for scheduler_index in range(scheduler_count):
    for computation_index in range(len(computations[scheduler_index])):

        computation = computations[scheduler_index][computation_index]
        
        input_value = computation['input_value']
        weights = numpy.array(computation['weights'])
        result = input_value * weights

        # this part is hidden from the client
        unshuffle = computation['unshuffle']
        unshuffled_result = result[unshuffle.astype(int)]
        row = computation['row']
        
        computation_result[row] += unshuffled_result

IndexError: index 10000 is out of bounds for axis 0 with size 10000

Now we can compare the results between centralized and distributed computations

In [None]:
centralized_result = numpy.dot(train_X, w1) + b1
distributed_result = computation_result + b1

print(centralized_result)
print(distributed_result)
print(centralized_result == distributed_result)
print(centralized_result - distributed_result)
print(numpy.all(numpy.isclose(centralized_result, distributed_result)))