<a href="https://colab.research.google.com/github/vvmnnnkv/private-ai/blob/master/Section%203%20-%20Encrypted%20Federated%20Learning%20Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Federated Learning with Encrypted Gradients Aggregation Project

This project is improvement of naive Federated Learning protocol implemented previuosly. Now the gradients are securely aggregated using additive secret sharing method.

Aggregation is done among workers so we don't need separate dedicated aggregator worker.

In [3]:
# install dependency
!pip install syft



In [4]:
import torch
import syft as sy
import math
import pandas as pd

hook = sy.TorchHook(torch)

W0706 07:15:57.749050 139834759829376 secure_random.py:26] Falling back to insecure randomness since the required custom op could not be found for the installed version of TensorFlow. Fix this by compiling custom ops. Missing file was '/usr/local/lib/python3.6/dist-packages/tf_encrypted/operations/secure_random/secure_random_module_tf_1.14.0.so'
W0706 07:15:57.772230 139834759829376 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/tf_encrypted/session.py:26: The name tf.Session is deprecated. Please use tf.compat.v1.Session instead.



In [5]:
# load MNIST included with Colab
def mnist_to_torch(df, train=True):
  y = pd.get_dummies(df[0])
  X = df.drop(0, axis=1)
  X, y = torch.tensor(X.values).type(torch.float), torch.tensor(y.values).type(torch.float)
  return X, y

# Train & test datasets
X_train, y_train = mnist_to_torch(pd.read_csv("sample_data/mnist_train_small.csv", header=None))
X_test, y_test = mnist_to_torch(pd.read_csv("sample_data/mnist_test.csv", header=None))

num_train = X_train.size(0)
num_features = X_train.size(1)

print("Train size %d, test size: %d" % (num_train, y_test.size(0)))

Train size 20000, test size: 10000


In [0]:
# number of workers
num_workers = 3
# Create workers
workers = []
for i in range(num_workers):
  worker = sy.VirtualWorker(hook, id="worker_%d" % i)
  workers.append(worker)

In [0]:
# Split data and send chunk to each worker
fed_dataset = []
chunk_size = num_train // num_workers
for i in range(num_workers):
  start = i * chunk_size
  if i + 1 < num_workers:
    end = (i + 1) * chunk_size
  else:
    end = num_train
  fed_dataset.append((
      X_train[start:end].send(workers[i]), 
      y_train[start:end].send(workers[i])
  ))


In [8]:
print(workers)
print(fed_dataset)

[<VirtualWorker id:worker_0 #objects:2>, <VirtualWorker id:worker_1 #objects:2>, <VirtualWorker id:worker_2 #objects:2>]
[((Wrapper)>[PointerTensor | me:84902066393 -> worker_0:96416324762], (Wrapper)>[PointerTensor | me:66644451691 -> worker_0:73625216394]), ((Wrapper)>[PointerTensor | me:16287878638 -> worker_1:71673147673], (Wrapper)>[PointerTensor | me:8234497527 -> worker_1:36528660801]), ((Wrapper)>[PointerTensor | me:18413793037 -> worker_2:3112532217], (Wrapper)>[PointerTensor | me:50245312515 -> worker_2:23713311158])]


In [0]:
# Calculate mean for all parameters of list of models and set to target_model
def avg_params(source_models, target_model):
  # get params and emptify
  avg_dict = target_model.state_dict()
  for param, _ in avg_dict.items():
    avg_dict[param].zero_()
  
  # sum up params
  for _, m in source_models.items():
    m_dict = m[0].state_dict()
    for param, _ in m_dict.items():
      avg_dict[param] += m_dict[param]
  
  # calc avg
  for param, _ in avg_dict.items():
    avg_dict[param] /= len(source_models)
  
  # set
  target_model.load_state_dict(avg_dict)

def share_model_grads(model, workers):
  out = {}
  for name, param in model.named_parameters():
    if not param.requires_grad: continue
    out[name] = param.grad.fix_prec().share(*workers)
  return out

  
# Federated training procedure
def fed_train(model, criteria, fed_dataset, test_dataset, opt, avg_epochs = 10, worker_epochs = 30, lr=0.001):
  for global_epoch in range(avg_epochs):
    # copy latest model to workers
    fed_models = {}
    for X, y in fed_dataset:
      fed_model = model.copy().send(X.location)
      optimizer = opt(params=fed_model.parameters(), lr=lr)
      fed_models[fed_model.location.id] = (fed_model, optimizer)
    
    # train in parallel on workers
    for local_epoch in range(worker_epochs):
      losses = []
      for X, y in fed_dataset:
        fed_model, optimizer = fed_models[X.location.id]
        pred = fed_model(X)
        loss = criteria(pred, y)
        loss.backward()
        optimizer.step()
        loss = loss.get()
        losses.append(loss)
      print('Avg loss (%d/%d): %f' % (global_epoch, local_epoch, sum(losses) / len(losses)))
      
    # aggregate worker's models
    # share each model to each worker
    all_grads = {}
    for w, fm in fed_models.items():
      print('sharing %s model' % w)
      all_grads[w] = share_model_grads(fm[0], workers)
      # move shared pointers to my machine
      all_grads[w] = { k:v.get() for k,v in all_grads[w].items() }
    print(all_grads)
      
    # prepare avg model placeholder
    with torch.no_grad():
      # calc grads sum
      grads_sum = None
      for w, grads in all_grads.items():
        if not grads_sum:
          print('grad sum = ', grads)
          grads_sum = grads
          continue
        for n, data in grads_sum.items():
          # print(grads_sum[n], grads[n])
          grads_sum[n] += grads[n]
      
      state = model.state_dict()
      # cal avg, retrieve and apply to local model
      for n, data in grads_sum.items():
        print('calculating %s' % n)
        grads_sum[n] /= len(workers)
        grads_sum[n] = grads_sum[n].get().float_prec()
        print(grads_sum[n])
        state[n] -= lr * grads_sum[n]
      model.load_state_dict(state)

      # calculate accuracy on test set
      X_test, y_test = test_dataset
      y_pred = torch.softmax(model(X_test), dim=1)
      valid = (torch.argmax(y_pred, dim=1) == torch.argmax(y_test, dim=1)).sum()
      print('Test Accuracy: %f' % (float(valid) / float(y_test.size(0))))

  return model

    

In [24]:
# Define a simple MLP model (softmax is included in loss)
model = torch.nn.Sequential(
  torch.nn.Linear(num_features, 50),
  torch.nn.ReLU(),
  torch.nn.Linear(50, 10)
)
loss = torch.nn.modules.loss.BCEWithLogitsLoss()

# Train!
fed_train(model, loss, fed_dataset, (X_test, y_test), torch.optim.SGD, 10, 20)

t = torch.tensor([1,2,3])
t = t.send(workers[1])
t = t.share(*workers)
t = t.get()
t

Avg loss (0/0): 8.216489
Avg loss (0/1): 3.180396
Avg loss (0/2): 2.158376
Avg loss (0/3): 2.191385
Avg loss (0/4): 2.217928
Avg loss (0/5): 2.153948
Avg loss (0/6): 1.974271
Avg loss (0/7): 1.708811
Avg loss (0/8): 1.441572
Avg loss (0/9): 1.182407
Avg loss (0/10): 0.929071
Avg loss (0/11): 0.765021
Avg loss (0/12): 0.851567
Avg loss (0/13): 1.048102
Avg loss (0/14): 1.109802
Avg loss (0/15): 1.018232
Avg loss (0/16): 0.896424
Avg loss (0/17): 0.805563
Avg loss (0/18): 0.754212
Avg loss (0/19): 0.734603
sharing worker_0 model
sharing worker_1 model
sharing worker_2 model
{'worker_0': {'0.weight': (Wrapper)>FixedPrecisionTensor>[AdditiveSharingTensor]
	-> (Wrapper)>[PointerTensor | me:88620217308 -> worker_0:36439259640]
	-> (Wrapper)>[PointerTensor | me:99039981213 -> worker_1:84745112870]
	-> (Wrapper)>[PointerTensor | me:4072167083 -> worker_2:33391834009]
	*crypto provider: me*, '0.bias': (Wrapper)>FixedPrecisionTensor>[AdditiveSharingTensor]
	-> (Wrapper)>[PointerTensor | me:31554

(Wrapper)>[AdditiveSharingTensor]
	-> (Wrapper)>[PointerTensor | me:29034010016 -> worker_0:51334403573]
	-> (Wrapper)>[PointerTensor | me:82552229219 -> worker_1:45373275418]
	-> (Wrapper)>[PointerTensor | me:77865090764 -> worker_2:54095967242]
	*crypto provider: me*