<h1>Distributed Federated Learning with Boston Housing Data</h1>

<b>Importing the libraries</b>

<ul>
<li> Pytorch (to train the model)
<li> Pysyft (to train using federated approach)
</ul>

In [1]:
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import time
import copy
import numpy as np
import syft as sy
from syft.frameworks.torch.fl import utils
from syft.workers.websocket_client import WebsocketClientWorker
import pandas as pd

In [2]:
class Parser:
    def __init__(self):
        self.epochs = 50
        self.lr = 0.001
        self.test_batch_size = 8
        self.batch_size = 8
        self.log_interval = 10
        self.seed = 1
        self.weight_scale = 0.03
    
args = Parser()
torch.manual_seed(args.seed)

<torch._C.Generator at 0x1c81e82d5d0>

<b>Data Preprocessing</b>

In [3]:
with open('./data/boston_housing.pickle','rb') as f:
    ((x, y), (x_test, y_test)) = pickle.load(f)

x = torch.from_numpy(x).float()
y = torch.from_numpy(y).float()
x_test = torch.from_numpy(x_test).float()
y_test = torch.from_numpy(y_test).float()

In [4]:
mean = x.mean(0, keepdim=True)
dev = x.std(0, keepdim=True)
mean[:, 3] = 0.
dev[:, 3] = 1.
x = (x - mean) / dev
x_test = (x_test - mean) / dev
train = TensorDataset(x, y)
test = TensorDataset(x_test, y_test)
train_loader = DataLoader(train, batch_size=args.batch_size, shuffle=True)
test_loader = DataLoader(test, batch_size=args.test_batch_size, shuffle=True)

<b>Creating architecture of the Neural Network model</b>

In [5]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(13, 32)
        self.fc2 = nn.Linear(32, 24)
        self.fc4 = nn.Linear(24, 16)
        self.fc3 = nn.Linear(16, 1)

    def forward(self, x):
        x = x.view(-1, 13)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc4(x))
        x = self.fc3(x)
        return x
    
    def federated_avg(self, neighbour_model_params=None, current_worker=None):
        #Average two models weights
        
        params1 = self.state_dict()
        params2 = neighbour_model_params.copy()
      
        with torch.no_grad():
            for name1 in params1:
                if name1 in params2:
                    neighbour_ptr = params2[name1].copy()
                    neighbour_ptr.move(current_worker)
                    #print(current_worker, neighbour_ptr)
                    params1[name1] = (params1[name1]  +  neighbour_ptr)/2
        
                    
        self.load_state_dict(params1, strict=False)
        
    

<b>Connect to the workers or the devices for training</b>

In [6]:
hook = sy.TorchHook(torch)
bob_worker = sy.VirtualWorker(hook, id="bob")
alice_worker = sy.VirtualWorker(hook, id="alice")
charlie_worker = sy.VirtualWorker(hook, id="charlie")
# kwargs_websocket = {"host": "localhost", "hook": hook}
# alice = WebsocketClientWorker(id='alice', port=8779, **kwargs_websocket)
# bob = WebsocketClientWorker(id='bob', port=8778, **kwargs_websocket)
compute_nodes = [bob_worker, alice_worker, charlie_worker]

<b>Though data will be available offline for federated learning with the workers but here we are sending the data over to the workers for training with ondevice capability</b>

In [7]:
remote_dataset = (list(), list(), list())
train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    remote_dataset[batch_idx % len(compute_nodes)].append((data, target))

In [8]:
len(remote_dataset)

3

In [9]:
bobs_model = Net()
alices_model = Net()
charlie_model = Net()
bobs_optimizer = optim.SGD(bobs_model.parameters(), lr=args.lr)
alices_optimizer = optim.SGD(alices_model.parameters(), lr=args.lr)
charlie_optimizer = optim.SGD(charlie_model.parameters(), lr=args.lr)

In [10]:
models = [bobs_model, alices_model, charlie_model]
optimizers = [bobs_optimizer, alices_optimizer, charlie_optimizer]

In [11]:
model = Net()
model

Net(
  (fc1): Linear(in_features=13, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=24, bias=True)
  (fc4): Linear(in_features=24, out_features=16, bias=True)
  (fc3): Linear(in_features=16, out_features=1, bias=True)
)

In [12]:
def read_events(eventsfile):
    eventsdf = pd.read_csv(eventsfile, index_col=0)
    return eventsdf

In [13]:
eventsdf = read_events("three_worker_events.csv")
eventsdf

Unnamed: 0_level_0,worker_id1,worker_id2
batchnum,Unnamed: 1_level_1,Unnamed: 2_level_1
0,1,2
1,2,3
2,1,3


In [14]:
def update(data, target, model, optimizer):
    model.send(data.location)
    #print(model, data)
    optimizer.zero_grad()
    prediction = model(data)
    loss = F.mse_loss(prediction.view(-1), target)
    loss.backward()
    optimizer.step()
    
    return model

def exchange_parameters(models, data_index):
    #Get the workers ID from event dataframe to exchange parameters between two worker models.
    event_idx = data_index % len(eventsdf)
    event_row = eventsdf.loc[[event_idx]]
    worker_id1 = event_row['worker_id1'].values[0]-1
    worker_id2 = event_row['worker_id2'].values[0]-1
    worker1_model = models[worker_id1]
    worker2_model = models[worker_id2]
    worker1_params = worker1_model.state_dict()
    worker2_params = worker2_model.state_dict()
    
    #Send worker2 params to worker1 for averaging at worker1
    worker1_model.federated_avg(worker2_params, compute_nodes[worker_id1])
    
    #Send worker1 params to worker2 for averaging at worker2
    worker2_model.federated_avg(worker1_params, compute_nodes[worker_id2])
    
    

def train():
    for data_index in range(len(remote_dataset[0])-1):
        #print(f"Trip {data_index}")
        for remote_index in range(len(compute_nodes)):
            data, target = remote_dataset[remote_index][data_index]
            models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])
           
        #Exchange of parameter between two remote models.
        exchange_parameters(models, data_index)
        
        for model in models:
            model.get()
            

In [15]:
def test(worker_node, federated_model):
    federated_model.eval()
    test_loss = 0
    for data, target in test_loader:
        output = federated_model(data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item()
        predection = output.data.max(1, keepdim=True)[1]
        
    test_loss /= len(test_loader.dataset)
    print('Test set for worker {}: Average loss: {:.4f}'.format(worker_node.id, test_loss))

In [16]:
for epoch in range(args.epochs):
    start_time = time.time()
    print(f"Epoch Number {epoch + 1}")
    train()
    
    for node, model in zip(compute_nodes,models):
        test(node, model)
    total_time = time.time() - start_time
    print('Communication time over the network', round(total_time, 2), 's\n')

Epoch Number 1
Test set for worker bob: Average loss: 584.5751
Test set for worker alice: Average loss: 584.5274
Test set for worker charlie: Average loss: 584.5947
Communication time over the network 1.59 s

Epoch Number 2
Test set for worker bob: Average loss: 536.6637
Test set for worker alice: Average loss: 536.5852
Test set for worker charlie: Average loss: 536.6685
Communication time over the network 1.64 s

Epoch Number 3
Test set for worker bob: Average loss: 421.7588
Test set for worker alice: Average loss: 421.5169
Test set for worker charlie: Average loss: 421.1362
Communication time over the network 1.69 s

Epoch Number 4
Test set for worker bob: Average loss: 44.6196
Test set for worker alice: Average loss: 45.4724
Test set for worker charlie: Average loss: 47.3344
Communication time over the network 1.6 s

Epoch Number 5
Test set for worker bob: Average loss: 21.3561
Test set for worker alice: Average loss: 22.1148
Test set for worker charlie: Average loss: 23.1130
Commun

Test set for worker bob: Average loss: 20.3979
Test set for worker alice: Average loss: 20.6635
Test set for worker charlie: Average loss: 20.8650
Communication time over the network 1.78 s

Epoch Number 42
Test set for worker bob: Average loss: 20.4586
Test set for worker alice: Average loss: 20.7691
Test set for worker charlie: Average loss: 20.8586
Communication time over the network 1.72 s

Epoch Number 43
Test set for worker bob: Average loss: 20.0111
Test set for worker alice: Average loss: 19.9907
Test set for worker charlie: Average loss: 20.9072
Communication time over the network 1.66 s

Epoch Number 44
Test set for worker bob: Average loss: 20.2678
Test set for worker alice: Average loss: 20.5576
Test set for worker charlie: Average loss: 20.5964
Communication time over the network 1.65 s

Epoch Number 45
Test set for worker bob: Average loss: 20.1979
Test set for worker alice: Average loss: 20.3918
Test set for worker charlie: Average loss: 20.7355
Communication time over t