# Data parallelism: Exercise

For this exercise we will be build upon last week's vanilla gradient descent example. Included in the next codebox are functions to perform feedforward and backprop on a single minibatch. The computeMinibatchGradientsTuple() function is the same as the computeMinibatchGradients() function, but its inputs in a single tuple will make using Python's ThreadPool easier later on.

You don’t need to do modify this first block of code. 

If you do not have scikit-learn then you can get it here: https://scikit-learn.org/stable/install.html

In [1]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
import time

# In order to run this in class, we're going to reduce the dataset by a factor of 5
X, y = fetch_openml('mnist_784', version=1, return_X_y=True)
X = X[::5]
y = y.astype(int)[::5]
X, X_test, y, y_test = train_test_split(X, y)

# Here we specify the size of our neural network.
# We are mapping from 784 to 10 with 256 hiden layer nodes.

m = len(X)
n_0 = 784
n_1 = 256
N = 10


# Function to convert categorical labels into one-hot matrix.
def convert_to_one_hot(y, n_classes):
    T = np.zeros((y.shape[0], n_classes))
    for t, yy in zip(T, y):
        t[yy] = 1
    return T


# Convert the data to one hot notation
one_hot_y_actual = convert_to_one_hot(y, N)
one_hot_y_test = convert_to_one_hot(y_test, N)


# Sigmoid function (activation)
def sigmoid(a):
    return 1. / (1 + np.exp(-a))


# Softmax function (final layer for classification)
def softmax(A):
    numerator = np.exp(A)
    denominator = numerator.sum(axis=1)
    return numerator / denominator[:, np.newaxis]


# Categorical cross-entropy
def L(T, S, W1, W2, alpha_1=1e-2, alpha_2=1e-5):
    return -1. / len(T) * np.sum(T * np.log(S)) + np.sum(0.5 * alpha_1 * W1 ** 2) + np.sum(0.5 * alpha_2 * W2 ** 2)


# Run the neural network forward, given some weights and biases
def feedforward(X, W1, W2, b1, b2):
    # Feedforward
    A1 = X @ W1 + b1
    Z1 = sigmoid(A1)
    A2 = Z1 @ W2 + b2
    y_pred = softmax(A2)
    return y_pred, Z1


# Compute the neural network gradients using backpropagation
def backpropogate(y_pred, Z1, X, y_obs, alpha_1=1e-2, alpha_2=1e-5):
    # Backpropogate
    delta_2 = (1. / len(y_pred)) * (y_pred - y_obs)
    grad_W2 = Z1.T @ delta_2 + alpha_2 * W2
    grad_b2 = delta_2.sum(axis=0)

    delta_1 = delta_2 @ W2.T * Z1 * (1 - Z1)
    grad_W1 = X.T @ delta_1 + alpha_1 * W1
    grad_b1 = delta_1.sum(axis=0)
    return grad_W1, grad_W2, grad_b1, grad_b2


def mini_batch(x_sample, y_sample, start_batch_size):
    """
    Takes a copy of x_sample and y_sample and returns mini batch matrices of both and number of batches
    """

    # Batches must divide evenly into total number of samples for numpy arrays to be happy.
    # Gets number of bathes by finding next smallest number that evenly divides
    num_batches = start_batch_size
    while len(x_sample) % num_batches != 0:
        num_batches -= 1

    # randomly shuffle indices
    np.random.seed(42)
    random_indices = np.random.choice(range(len(x_sample)), len(x_sample), replace=False)

    # instantiate lists to hold batches
    x_list = [[] for i in range(num_batches)]
    y_list = [[] for i in range(num_batches)]

    # populate batches matrix with random mini batch indices
    for i in range(len(x_sample)):

        x_list[i // 105].append(x_sample[random_indices[i]])
        y_list[i // 105].append(y_sample[random_indices[i]])
    
    # Convert to numpy arrays
    x_batch = np.array(x_list)
    y_batch = np.array(y_list)

    return x_batch, y_batch, num_batches, num_batches


#computes the gradients of a single minibatch
def computeMinibatchGradients(W1, W2, b1, b2, x_batch, y_batch):
    y_pred, Z1 = feedforward(x_batch, W1, W2, b1, b2)
    """
    These are your gradients with respect to weight matrices W1 and W2 
    as well as your biases b1 and b2
    """
    grad_W1, grad_W2, grad_b1, grad_b2 = backpropogate(y_pred, Z1, x_batch, y_batch)
    
    return grad_W1, grad_W2, grad_b1, grad_b2

#computes the gradients of a single minibatch
def computeMinibatchGradientsTuple(inputTuple):
    W1, W2, b1, b2, x_batch, y_batch = inputTuple
    y_pred, Z1 = feedforward(x_batch, W1, W2, b1, b2)
    """
    These are your gradients with respect to weight matrices W1 and W2 
    as well as your biases b1 and b2
    """
    grad_W1, grad_W2, grad_b1, grad_b2 = backpropogate(y_pred, Z1, x_batch, y_batch)
    
    return grad_W1, grad_W2, grad_b1, grad_b2


# Vanilla Gradient Descent

This next codebox should look familiar; it performs vanilla gradient descent. You don't need to change this codebox, either. Run this, and notice that it now also prints out the time taken to evaluate each epoch. We'll use these times to evaluate how much of a speedup data parallelism will give us in a simple multithreading environment.

In [2]:
"""
Vanilla Gradient Descent
"""
import os
os.environ['MKL_NUM_THREADS'] = '1'

# Hyper Parameters
eta = 1e-3
initial_batch_size = 104
epochs = 250

# Initialize random parameter matrices
np.random.seed(42)
W1 = 0.001 * np.random.randn(n_0, n_1)
W2 = 0.001 * np.random.randn(n_1, N)

b1 = 0.1 * np.random.randn(1, n_1)
b2 = 0.1 * np.random.randn(1, N)

# data for analysis
vanilla_loss = []


# Perform gradient descent
for i in range(epochs):
    epochStartTime = time.time()
    
    # generate mini batches
    x_batches, y_batches, num_batches, actual_batch_size = mini_batch(X, one_hot_y_actual, initial_batch_size)

    # perform gradient descent on mini batches
    for j in range(0, num_batches):
        
        grad_W1, grad_W2, grad_b1, grad_b2 = computeMinibatchGradients(W1, W2, b1, b2, x_batches[j], y_batches[j])
        '''
        use the gradients to update weights and biases
        '''
        W1 -= eta * grad_W1
        W2 -= eta * grad_W2
        b1 -= eta * grad_b1
        b2 -= eta * grad_b2


    # calc loss at end of each epoch
    y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
    vanilla_loss.append(L(one_hot_y_actual, y_entire_pred, W1, W2))
    
    #find the time taken to compute the epoch
    epochTimeTaken = (time.time() - epochStartTime)*1000

    # Print some summary statistics every ten iterations
    if i % 10 == 0:
        y_pred_test, Z1_test = feedforward(X_test, W1, W2, b1, b2)
        acc = sum(y_test == np.argmax(y_pred_test, axis=1)) / len(y_test)
        y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
        print("Epoch %d Loss %f Accuracy %f time taken %d ms" % (i, L(one_hot_y_actual, y_entire_pred, W1, W2), acc, epochTimeTaken))



Epoch 0 Loss 2.165225 Accuracy 0.646000 time taken 576 ms
Epoch 10 Loss 0.777892 Accuracy 0.880286 time taken 512 ms
Epoch 20 Loss 0.462656 Accuracy 0.908000 time taken 505 ms
Epoch 30 Loss 0.340586 Accuracy 0.914857 time taken 503 ms
Epoch 40 Loss 0.274967 Accuracy 0.920286 time taken 497 ms
Epoch 50 Loss 0.233327 Accuracy 0.922286 time taken 502 ms
Epoch 60 Loss 0.204176 Accuracy 0.924286 time taken 499 ms
Epoch 70 Loss 0.182608 Accuracy 0.925714 time taken 503 ms
Epoch 80 Loss 0.165866 Accuracy 0.927429 time taken 518 ms
Epoch 90 Loss 0.152414 Accuracy 0.928286 time taken 517 ms
Epoch 100 Loss 0.141393 Accuracy 0.928857 time taken 528 ms
Epoch 110 Loss 0.132171 Accuracy 0.929714 time taken 521 ms
Epoch 120 Loss 0.124389 Accuracy 0.930857 time taken 524 ms
Epoch 130 Loss 0.117737 Accuracy 0.930571 time taken 494 ms
Epoch 140 Loss 0.111933 Accuracy 0.931143 time taken 490 ms
Epoch 150 Loss 0.106800 Accuracy 0.931429 time taken 513 ms
Epoch 160 Loss 0.102224 Accuracy 0.931429 time take

# Updating Vanilla Gradient Descent with Data Parallelism

Now that we have some baseline timings, we're going to be updating this example to employ data parallelism. The Ben-Nun et.al. paper mainly focuses on parallelism in a distributed computing environment, but using a library like MPI for distributed parallelism would be well outside the scope of these assignments, so we're going to using Python's multiprocessing package to perform data parallelism with a ThreadPool

First, read the documentation on python's Pool class, located here:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing

We're going to be using the Pool's faster (and less documented) cousin, the ThreadPool. The Pool and ThreadPool have the same interface, but while Pool uses a single thread, trading it between the pool's workers, the ThreadPool actually spins up multiple instances of the Python interpreter in different threads to perform true parallel computation.

The next codeblock uses the ThreadPool's map function to give each process a different minibatch in parallel. 


1.
On line 60, use the ThreadPool's map function to parallelize the gradient calculation for each of the parallel batches.

2.
We will need to average the gradients returned from each parallel batch in order to perform gradient descent, but the thread pool returns a list of the list of each batch's gradients. To make averaging the gradients easier, line 58 uses the zip function to make a new list such that the first element in the list contains all the W1 gradients, the second element contains all the W2 gradients, etc. On lines 59-62, use the np.mean function to average all W1, W2, b1, and b2 gradients, and use those averages to update W1, W2, b1, and b2.


In [6]:
"""
Vanilla Gradient Descent with Data Parallelism
"""

#import the ThreadPool
from multiprocessing.pool import ThreadPool


# Hyper Parameters
eta = 1e-3
initial_batch_size = 104
epochs = 250

#add additional hyperparameters related to the data parallelism
threads_in_pool = 5
parallel_batches = 25

#create the thread pool
pool = ThreadPool(processes=threads_in_pool) 


# Initialize random parameter matrices
np.random.seed(42)
W1 = 0.001 * np.random.randn(n_0, n_1)
W2 = 0.001 * np.random.randn(n_1, N)

b1 = 0.1 * np.random.randn(1, n_1)
b2 = 0.1 * np.random.randn(1, N)

# data for analysis
vanilla_loss = []

# Perform gradient descent
for i in range(epochs):
    epochStartTime = time.time()
    
    # generate mini batches
    x_batches, y_batches, num_batches, actual_batch_size = mini_batch(X, one_hot_y_actual, initial_batch_size)
    
    
    # perform gradient descent on mini batches
    for j in range(0, num_batches, parallel_batches):
        
        #create the list of inputs for the pool threads
        #this might look weird, but by creating a list of tuples, the input data can be easily given to
        #each worker thread in the ThreadPool
        minibatchGradientInputLists = []
        for k in range(parallel_batches):
            minibatchGradientInputLists.append((W1, W2, b1, b2, x_batches[j+k], y_batches[j+k]))
        
        #TODO: use the ThreadPool's map function to compute minibatch gradients in parallel.
        gradientOutputs = pool.map(computeMinibatchGradientsTuple, minibatchGradientInputLists)
        
        
        '''
        use the gradients to update weights and biases
        '''
        gradients = list(zip(*gradientOutputs))
        W1 -= eta * np.mean(gradients[0], axis=0) 
        W2 -= eta * np.mean(gradients[1], axis=0) 
        b1 -= eta * np.mean(gradients[2], axis=0) 
        b2 -= eta * np.mean(gradients[3], axis=0) 

    # calc loss at end of each epoch
    y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
    vanilla_loss.append(L(one_hot_y_actual, y_entire_pred, W1, W2))
    
    #find the time taken to compute the epoch
    epochTimeTaken = (time.time() - epochStartTime) * 1000

    # Print some summary statistics every ten iterations
    if i % 10 == 0:
        y_pred_test, Z1_test = feedforward(X_test, W1, W2, b1, b2)
        acc = sum(y_test == np.argmax(y_pred_test, axis=1)) / len(y_test)
        y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
        print("Epoch %d Loss %f Accuracy %f time taken %d ms" % (i, L(one_hot_y_actual, y_entire_pred, W1, W2), acc, epochTimeTaken))

#kill the pool so it doesn't hang around without getting garbage collected
pool.terminate()

Epoch 0 Loss 2.303491 Accuracy 0.103429 time taken 374 ms
Epoch 10 Loss 2.261419 Accuracy 0.244571 time taken 368 ms
Epoch 20 Loss 2.197020 Accuracy 0.611429 time taken 398 ms
Epoch 30 Loss 2.113055 Accuracy 0.652000 time taken 381 ms
Epoch 40 Loss 2.019295 Accuracy 0.649143 time taken 393 ms
Epoch 50 Loss 1.923187 Accuracy 0.650286 time taken 406 ms
Epoch 60 Loss 1.828900 Accuracy 0.662000 time taken 413 ms
Epoch 70 Loss 1.738727 Accuracy 0.676571 time taken 402 ms
Epoch 80 Loss 1.653625 Accuracy 0.689143 time taken 392 ms
Epoch 90 Loss 1.573866 Accuracy 0.707429 time taken 417 ms
Epoch 100 Loss 1.499376 Accuracy 0.722857 time taken 393 ms
Epoch 110 Loss 1.429932 Accuracy 0.736286 time taken 420 ms
Epoch 120 Loss 1.365248 Accuracy 0.752857 time taken 372 ms
Epoch 130 Loss 1.305017 Accuracy 0.766571 time taken 381 ms
Epoch 140 Loss 1.248922 Accuracy 0.783429 time taken 399 ms
Epoch 150 Loss 1.196653 Accuracy 0.797429 time taken 361 ms
Epoch 160 Loss 1.147921 Accuracy 0.812286 time take

# Performance assessment and questions

Now that your data parallel implementation is finished, play around with the threads_in_pool and parallel_batches hyperparameters, and answer the following questions.

1. How does the speed of the data parallel implementation compare to the non-parallelized version.


2. Adjusting the threads_in_pool and parallel_batches hyperparameters, where do you see the most improvement in speed? When does increasing these hyperparameters stop making the computation faster?


3. Section 3 of the paper discusses Generalization in the context of statistical accuracy. How does the generalization issue relate to the parallel_batches hyperparameter?


4. Using a library like mpi4py, we could take the local, thread-parallel approach and do it in a true distributed environment. If the computeMinibatchGradients function was being run on different processors in a distributed system, what data would you have to send to the processors for each minibatch? What information would these distributed processors need to send back?


5. As we discussed in class on Tuesday, model parallelism involves splitting up a network between processors such that different portions of the same layer might be computed on different processors. Knowing that the example network is comprised of two full-connected layers, what changes would you have to make to the code to be able to employ model parallelism. (Note, actually doing this would be an enormous amount of work, but think critically about which parts of the network would need to be rewritten to achieve model parallelism.) 


6. Pipeline parallelism involves splitting up a network between processors such that each processor is responsible for one or more contiguous operators. How might you change the example to perform pipeline-parallelism? Would this be easier to implement than model parallelism, or harder?


7. If a pipeline-parallel network such as the one from the previous question was implemented, how would data quantization help improve performance in a distributed environment?

# Answers
1. Now that the numpy threads are restricted, it's a teensy bit faster. 


2. Threads_in_pool is best at a small number (4 or 5) while increasing parallel_batches to a large multiple of that number gives a performance increase - 20 or 25 seems to be fastest. (However, the accuracy drops slightly)


3. A larger number of parallel batches helps performance (speed), but because the weights/biases are not updating as much, it incurs a cost in accuracy. (This bears out in my results above.) 


4. It requires sending essentially all the data - weights, biases, & x and y batches. (Hopefully in one package!) After using this data to compute the feed forward and backpropogation gradients, then should be sent out again to whatever part of the distributed environment will compute the new weights and biases from that collection of data.


5. Because model parallelism means splitting the layers between processors, the multiplication steps (in feed forward and backpropogation) would need to be modified to accept and compute only sections of that computation. 


6. Pipeline parallelism would be easier, because it can be directly parallelized - you don't have to change the code to parallelize it, the feed forward and backpropogation computations would just be done on separate processors. i.e., compute A1, send to next processor, compute Z1, send forward, etc all the way through the feed forward function. 


7. Data quantization could reduce (compress) the amount of data being sent from processor to processor as full, large outputs are pushed down the "line-up" of processors. 