# Bonus1.1: Federated Averaging

### Name: Vincent Lee


## 0. You will do the following:

1. Read the lecture note: [click here](https://github.com/wangshusen/DeepLearning/blob/master/LectureNotes/Parallel/Parallel.pdf)

2. Implement **federated averaging** or decentralized optimization.

3. Plot the convergence curve. (The x-axis can be ```number of epochs``` or ```number of communication```. You must make sure the label is correct.)

4. Convert the .IPYNB file to .HTML file.

    * The HTML file must contain **the code** and **the output after execution**.
    
5. Upload this .HTML file to your Google Drive, Dropbox, or your Github repo. (If it is submitted to Google Drive or Dropbox, you must make the file open-access.)

6. Submit the link to this .HTML file to Canvas.

    * Example: https://github.com/wangshusen/CS583-2020S/blob/master/homework/Bonus1/Bonus1.html



# 1. Data processing

- Download the Diabetes dataset from https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/diabetes
- Load the data using sklearn.
- Preprocess the data.

## 1.1. Load the data

In [1]:
from sklearn import datasets
import numpy

x_sparse, y = datasets.load_svmlight_file('diabetes')
# Densify to a plain ndarray instead of numpy.matrix (what todense() returns):
# numpy no longer recommends the matrix class, and on it `*` means matrix
# product rather than the elementwise product the later cells rely on.
x = x_sparse.toarray()

print('Shape of x: ' + str(x.shape))
print('Shape of y: ' + str(y.shape))

Shape of x: (768, 8)
Shape of y: (768,)


## 1.2. Partition to training and test sets

In [2]:
# Randomly split the samples: the first n_train entries of a random
# permutation form the training set, the remaining ones the test set.
n = x.shape[0]
n_train = 640
n_test = n - n_train

rand_indices = numpy.random.permutation(n)
train_indices, test_indices = rand_indices[:n_train], rand_indices[n_train:n]

x_train, x_test = x[train_indices, :], x[test_indices, :]
# labels are stored as column vectors (one row per sample)
y_train = y[train_indices].reshape(n_train, 1)
y_test = y[test_indices].reshape(n_test, 1)

for label, arr in (('Shape of x_train: ', x_train),
                   ('Shape of x_test: ', x_test),
                   ('Shape of y_train: ', y_train),
                   ('Shape of y_test: ', y_test)):
    print(label + str(arr.shape))

Shape of x_train: (640, 8)
Shape of x_test: (128, 8)
Shape of y_train: (640, 1)
Shape of y_test: (128, 1)


## 1.3. Feature scaling

Use standardization to transform both the training and the test features (the mean and standard deviation are computed on the training set only).

In [3]:
# Standardization
import numpy

# Standardize every feature with statistics estimated on the training set
# only, so nothing about the test set leaks into preprocessing.
d = x_train.shape[1]
mu = numpy.mean(x_train, axis=0).reshape(1, d)
sig = numpy.std(x_train, axis=0).reshape(1, d)


def _standardize(features):
    """Center by mu and scale by sig; the 1E-6 guards against zero variance."""
    return (features - mu) / (sig + 1E-6)


x_train, x_test = _standardize(x_train), _standardize(x_test)

for label, statistic in (('test mean = ', numpy.mean), ('test std = ', numpy.std)):
    print(label)
    print(statistic(x_test, axis=0))

test mean = 
[[ 0.08893865  0.00625612 -0.06925102  0.00367375 -0.11008717 -0.0751612
  -0.03220125 -0.1473439 ]]
test std = 
[[1.01775645 0.94924062 1.13605568 0.91523984 0.73376482 0.87115693
  1.04727104 0.91999902]]


## 1.4. Add a dimension of all ones

In [4]:
# Append a constant column of ones so the last weight acts as the intercept.
n_train, d = x_train.shape
n_test, d = x_test.shape  # d keeps the feature count before augmentation
x_train, x_test = (
    numpy.concatenate((feats, numpy.ones((rows, 1))), axis=1)
    for feats, rows in ((x_train, n_train), (x_test, n_test))
)

for label, arr in (('Shape of x_train: ', x_train), ('Shape of x_test: ', x_test)):
    print(label + str(arr.shape))

Shape of x_train: (640, 9)
Shape of x_test: (128, 9)


# 2. Federated Averaging with Minibatch Gradient Descent

In [5]:
# Calculate the objective Q_I and the gradient of Q_I, where Q_I is the
# L2-regularized logistic loss averaged over the minibatch I.
# Inputs:
#     w: d-by-1 matrix
#     xi: b-by-d matrix
#     yi: b-by-1 matrix
#     lam: scalar, the regularization parameter
#     b: integer, the batch size (unused: the size is taken from xi.shape)
# Return:
#     obj: scalar, the objective Q_I
#     g: d-by-1 matrix, gradient of Q_I
def mb_stochastic_objective_gradient(w, xi, yi, lam, b):
    d = xi.shape[1]
    yx = numpy.multiply(yi, xi)  # b-by-d, row j is y_j * x_j
    yxw = numpy.dot(yx, w)  # b-by-1 margins y_j * x_j^T w

    # log(1 + exp(-z)) == logaddexp(0, -z); the naive form overflows to inf
    # when a margin is large and negative.
    loss = numpy.mean(numpy.logaddexp(0, -yxw))
    reg = lam / 2 * numpy.sum(numpy.multiply(w, w))
    obj = loss + reg

    # 1 / (1 + exp(z)) evaluated as exp(-logaddexp(0, z)) to avoid overflow.
    coef = numpy.exp(-numpy.logaddexp(0, yxw))  # b-by-1
    # numpy.multiply keeps this elementwise even if inputs are numpy.matrix
    g_vec = -numpy.multiply(yx, coef)  # b-by-d
    g = numpy.mean(g_vec, axis=0).reshape(d, 1)
    g = g + lam * w

    return obj, g

In [6]:
import ray

ray.init()

@ray.remote
class Server:
    """Parameter server for federated averaging.

    Holds the global weights, hands them (with the current step size) to the
    workers, and replaces them with the average of the workers' locally
    updated weights after every communication round.
    """

    def __init__(self, m, d, stepsize, decay=.9):
        """
        Args:
            m: number of workers.
            d: number of parameters.
            stepsize: initial step size broadcast to the workers.
            decay: factor applied to the step size after every aggregation
                (default .9, the previously hard-coded value).
        """
        self.m = m  # number of workers
        self.d = d  # number of parameters
        self.weight = numpy.zeros((d, 1))
        self.stepsize = stepsize
        self.decay = decay
        self.objvals = []

    def broadcast(self):
        """Return the current global weights and step size."""
        return self.weight, self.stepsize

    def aggregate(self, objvals, weights):
        """Average the workers' weights and objective values.

        Averages over every result received rather than the first ``self.m``
        entries, so a mismatch between ``m`` and the actual number of workers
        can neither drop a worker nor mis-scale the average.

        Args:
            objvals: per-worker mean minibatch objective values.
            weights: per-worker d-by-1 locally updated weights.

        Raises:
            ValueError: if no worker results are supplied.
        """
        if len(weights) == 0:
            raise ValueError('aggregate() needs at least one worker result')
        self.weight = numpy.mean(weights, axis=0).reshape(self.d, 1)
        self.objvals.append(float(numpy.mean(objvals)))
        self.stepsize *= self.decay

    def get_objvals(self):
        """Return the averaged objective recorded at each aggregation."""
        return self.objvals

2020-03-20 14:55:11,954	INFO resource_spec.py:212 -- Starting Ray with 8.74 GiB memory available for workers and up to 4.39 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-03-20 14:55:12,240	INFO services.py:1078 -- View the Ray dashboard at localhost:8265


In [7]:
@ray.remote
class Worker:
    """Federated-averaging client that owns one shard of the training data.

    Each ``update_params`` call starts from the server's current weights, runs
    local minibatch SGD over the next quarter of the shard, and returns the
    mean minibatch objective together with the locally updated weights. The
    shard is reshuffled whenever a full pass over it has been completed.
    """

    def __init__(self, x, y, b, lam=1E-6):
        """
        Args:
            x: n-by-d feature matrix of this worker's shard.
            y: n-by-1 label matrix of this worker's shard.
            b: minibatch size.
            lam: L2 regularization parameter of the objective; defaults to
                the notebook's fixed value 1E-6.
        """
        self.n = x.shape[0]
        self.d = x.shape[1]
        self.b = b
        self.lam = lam
        # number of full minibatches in the shard (at least 1, so a shard
        # smaller than b still produces an update)
        self.total_batches = max(1, self.n // b)
        # how many batches per update: one quarter of the shard (at least 1,
        # otherwise the objective average below divides by zero)
        self.batches = max(1, self.total_batches // 4)
        # index of the next minibatch to process
        self.crnt_batch = 0
        self.x, self.y = self._shuffled(x, y)

    def _shuffled(self, x, y):
        """Return x and y with their rows permuted by one random permutation."""
        rand_indices = numpy.random.permutation(self.n)
        return x[rand_indices, :], y[rand_indices, :]

    def update_params(self, weight, stepsize):
        """Run local SGD from ``weight`` over the next ``self.batches`` minibatches.

        Args:
            weight: d-by-1 global weights broadcast by the server.
            stepsize: SGD step size for this round.

        Returns:
            (mean objective over the processed minibatches, updated weights)
        """
        # start a fresh, reshuffled pass once the remaining minibatches can
        # no longer fill a whole update
        if self.crnt_batch + self.batches > self.total_batches:
            self.crnt_batch = 0
            self.x, self.y = self._shuffled(self.x, self.y)

        self.weight = weight
        crnt_obj = 0
        end_batch = self.crnt_batch + self.batches
        for i in range(self.crnt_batch, end_batch):
            xi = self.x[i*self.b:(i+1)*self.b, :]
            yi = self.y[i*self.b:(i+1)*self.b, :]
            # the 4th argument is the regularization parameter, not the step size
            obj, g = mb_stochastic_objective_gradient(self.weight, xi, yi, self.lam, self.b)
            self.weight = self.weight - stepsize * g
            crnt_obj += obj
        # advance so the next update continues with the following minibatches
        self.crnt_batch = end_batch

        crnt_obj /= self.batches
        return crnt_obj, self.weight

In [None]:
# Federated averaging: MB-SGD with batch size b=8 on every worker
lam = 1E-6 # do not change
b = 8 # do not change
stepsize = 0.4 # you must tune this parameter
m = 5 # number of workers
maxepochs = 100
d = x_train.shape[1]
# each worker update processes a quarter of its shard, so one pass over
# the training data (one epoch) takes 4 communication rounds
updates_per_epoch = 4

# the server must average over all m workers, not a hard-coded count
server = Server.remote(m, d, stepsize)

# give each worker a contiguous, equally sized shard of the training set;
# distinct names so the full dataset x / y is not overwritten
chunksize = n_train // m
workers = [
    Worker.remote(x_train[k*chunksize:(k+1)*chunksize, :],
                  y_train[k*chunksize:(k+1)*chunksize, :], b)
    for k in range(m)
]

for epoch in range(maxepochs):
    for _ in range(updates_per_epoch):
        w, stepsize = ray.get(server.broadcast.remote())
        # submit every local update before waiting, so workers run in parallel
        futures = [worker.update_params.remote(w, stepsize) for worker in workers]
        results = ray.get(futures)
        objvals = [obj for obj, _ in results]
        weights = [weight for _, weight in results]
        # methods on one actor run in call order, so the next broadcast
        # observes this aggregation
        server.aggregate.remote(objvals, weights)

objvals = ray.get(server.get_objvals.remote())

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

# Plot the averaged objective after each communication round
# (broadcast -> local updates -> aggregation).
communications = range(len(objvals))
# matplotlib property names are lowercase; the capitalized MATLAB-style
# spellings (LineWidth, FontSize) were deprecated and later removed.
line0, = plt.plot(communications, objvals, linewidth=4)
plt.xlabel('Communications', fontsize=20)
plt.ylabel('Objective Value', fontsize=20)
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.show()