Federated learning with heterogeneous clients on DNS traffic datasets, using PyTorch, PySyft and Trusted FedAvg.

Trusted FedAvg paper: https://arxiv.org/pdf/2104.07853.pdf

Heterogeneous clients paper: https://arxiv.org/pdf/2010.01264.pdf

## Import libraries, define FL clients

In [None]:
!pip install syft==0.2.9

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as sched
from torch.nn import BCELoss
import torch.utils.data as tud
import numpy as np
import pandas as pd
from statistics import median
import syft as sy

# hook PyTorch to PySyft, i.e. add extra functionalities to support Federated Learning and other private AI tools
# (this is what provides the .send()/.get()/.location calls on tensors and models used below;
# the hook is also passed to every sy.VirtualWorker)
hook = sy.TorchHook(torch)

In [None]:
# create clients: one PySyft virtual worker per client, with ids client1 ... client21
clients = [sy.VirtualWorker(hook, id='client' + str(k)) for k in range(1, 22)]

## Load data

In [None]:
# load clients' datasets (client1.csv ... client21.csv) and the three test sets
# (test7.csv, test14.csv, test21.csv), all cast to float32
clients_datasets = [
    pd.read_csv('client' + str(k) + '.csv').astype('float32')
    for k in range(1, 22)
]
testsets = [
    pd.read_csv('test' + str(size) + '.csv').astype('float32')
    for size in (7, 14, 21)
]

In [None]:
# transform to tensors
def _to_tensors(frames):
    """Split each DataFrame into a feature tensor (all columns but the last)
    and a target tensor (the 'target' column)."""
    features = [torch.tensor(frame.iloc[:, :-1].to_numpy()) for frame in frames]
    targets = [torch.tensor(frame['target'].to_numpy()) for frame in frames]
    return features, targets


features_train, target_train = _to_tensors(clients_datasets)
features_test, target_test = _to_tensors(testsets)

## Define training parameters and models, send data to clients

In [None]:
# training hyper-parameters
args = dict(
    use_cuda=True,
    batch_size=128,
    test_batch_size=1000,
    lr=0.001,
    log_interval=200,
    epochs=7,
)

# use the GPU only when it is both requested and actually available
use_cuda = torch.cuda.is_available() if args['use_cuda'] else False
device = torch.device('cuda' if use_cuda else 'cpu')

In [None]:
# create a simple feedforward network
# n features as input, 2*n+1 hidden layer neurons, 1 output for binary classification
class MLP(nn.Module):
    """Single-hidden-layer perceptron producing a probability per sample.

    Args:
        n: number of input features. The hidden layer has ``2*n + 1`` ReLU units.

    Attributes:
        n: the number of input features (used to size client sub-models).
        layers: ``Sequential(Linear, ReLU, Linear, Sigmoid)``. Index 0 is the hidden
            linear layer and index 2 the output linear layer.
    """

    def __init__(self, n):
        super().__init__()
        self.n = n
        width = 2 * n + 1
        self.layers = nn.Sequential(
            nn.Linear(n, width),
            nn.ReLU(),
            nn.Linear(width, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Map a batch of feature rows to sigmoid outputs of shape (batch, 1)."""
        return self.layers(x)

In [None]:
# distribute data across workers
# normally there is no need to distribute data, since it is already at the clients
# this is more of a simulation of federated learning
train_datasets = [
    sy.BaseDataset(features_train[k].send(worker), target_train[k].send(worker))
    for k, worker in enumerate(clients)
]
# nested federations built from the first 7, the first 14 and all 21 clients
federated_datasets = [sy.FederatedDataset(train_datasets[:size]) for size in (7, 14, 21)]
federated_train_loaders = [
    sy.FederatedDataLoader(dataset, batch_size=args['batch_size'], shuffle=True)
    for dataset in federated_datasets
]

# test data remains at the central entity
test_datasets = [tud.TensorDataset(x, y) for x, y in zip(features_test, target_test)]
test_loaders = [
    tud.DataLoader(dataset, batch_size=args['test_batch_size'], shuffle=True)
    for dataset in test_datasets
]

## Training, testing, aggregation and trust computation functions

In [None]:
# classic torch code for training except for the federated part
def train_federated(args, models, device, train_loader, optimizers, epoch):
    """Run one epoch of local training for every client on its own data.

    Each model is sent to the worker that owns its data and trained on the batches
    located at that worker. It is then fetched back so the caller can aggregate it.

    Args:
        args: dict with at least 'batch_size' and 'log_interval'.
        models: dict mapping PySyft worker -> that worker's local model.
        device: torch device the batches are moved to.
        train_loader: federated loader whose batches are pointer tensors; a batch's
            ``.location`` is the worker holding it.
        optimizers: dict mapping worker -> optimizer of that worker's model.
        epoch: epoch number, used only in the log messages.
    """
    for c, m in models.items():
        m.train()
        # send models to workers
        m.send(c)

    # one loss module is enough; applied to pointer tensors, the loss is computed remotely
    criterion = BCELoss()

    # iterate over federated data client by client
    # of course, in reality all clients would train their models at the same time
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        # the worker holding this batch selects the model/optimizer to use
        worker = data.location

        optimizers[worker].zero_grad()
        output = models[worker](data)

        # loss is a ptr to the tensor loss at the remote location
        loss = criterion(output, torch.reshape(target, [len(target), 1]))
        # call backward() on the loss ptr, that will send the command to call
        # backward on the actual loss tensor present on the remote machine
        loss.backward()
        optimizers[worker].step()

        if batch_idx % args['log_interval'] == 0:
            # get back loss, that was created at remote worker
            loss = loss.get()
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tWorker: {}'.format(
                    epoch,
                    batch_idx * args['batch_size'], # number of packets done
                    len(train_loader) * args['batch_size'], # total packets (approx.)
                    100. * batch_idx / len(train_loader), # percentage of batches done
                    loss,
                    worker.id
                )
            )

    # get back models for aggregation. The module is fetched back in place: rebinding the
    # loop variable (as before) never touched the dict, yet the models are read locally
    # from `models` afterwards. So calling get() for its side effect is all that is needed.
    for m in models.values():
        m.get()

In [None]:
# classic torch code for testing
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)

            # add losses together
            test_loss += BCELoss(reduction='sum')(output, torch.reshape(target, [len(target),1])).item()

            # get the index of the max probability class
            pred = pred = torch.round(output)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [None]:
def _central_feature_indices(i):
    """Pair each local feature column of client ``i`` with its column in the central model.

    Client ``i``'s features are the columns of ``clients_datasets[i]`` except the last.
    The central model's feature order is that of ``testsets[0]`` (also minus its last column).
    Returns a list of ``(local_index, central_index)`` tuples.
    """
    central_columns = testsets[0].columns[:-1]
    return [(j, central_columns.get_loc(feature))
            for j, feature in enumerate(clients_datasets[i].columns[:-1])]


def aggregate(central_model, models, weights, trust):
    """Trust- and data-size-weighted FedAvg over heterogeneous client models.

    Each weight of the central model becomes the average of the matching weights of
    the clients that have it. Each client is weighted by
    ``len(its dataset) * trust[client]``; weights no client has stay 0. The new values
    are copied back into every client model (only the features/rows that client has)
    and into ``central_model``.

    Args:
        central_model: MLP over every feature column of ``testsets[0]``.
        models: dict worker -> local MLP. Client ``i`` of ``used_clients`` is assumed
            to own ``clients_datasets[i]``.
        weights: dict of accumulator tensors shaped like the central model's parameters
            ('hidden_mean_weight', 'hidden_mean_bias', 'output_mean_weight',
            'output_mean_bias'). They are reset at the start of every call.
        trust: dict worker -> positive trust value.

    Relies on the notebook globals ``used_clients``, ``clients_datasets`` and ``testsets``.
    """
    with torch.no_grad():
        # the caller reuses `weights` every round: clear last round's mean so it does not
        # leak into this round's weighted sum
        for accumulator in weights.values():
            accumulator.zero_()

        # dataXtrust values needed for normalization later; kept as tensors on the same
        # device as the accumulators (numpy normalisers cannot divide CUDA tensors in place)
        dataXtrust_hidden_weight = torch.zeros_like(weights['hidden_mean_weight'])
        dataXtrust_hidden_bias = torch.zeros_like(weights['hidden_mean_bias'])
        dataXtrust_output_weight = torch.zeros_like(weights['output_mean_weight'])
        dataXtrust_output_bias = 0.0

        # firstly sum the weights of all clients, each scaled by data size * trust
        for i, c in enumerate(used_clients):
            contribution = len(clients_datasets[i]) * trust[c]
            hidden, output = models[c].layers[0], models[c].layers[2]
            # the client's hidden layer has 2*(number_of_features_of_client)+1 neurons,
            # which map onto the first `rows` rows of the central model's weights
            rows = 2*models[c].n+1
            # each client only contributes to its chosen features (columns)
            for j, index in _central_feature_indices(i):
                weights['hidden_mean_weight'][:rows, index] += hidden.weight.data[:, j] * contribution
                dataXtrust_hidden_weight[:rows, index] += contribution
            # the rest of the weights don't have to be calculated feature-wise
            weights['hidden_mean_bias'][:rows] += hidden.bias.data * contribution
            dataXtrust_hidden_bias[:rows] += contribution
            weights['output_mean_weight'][0, :rows] += output.weight.data[0, :] * contribution
            dataXtrust_output_weight[0, :rows] += contribution
            weights['output_mean_bias'] += output.bias.data * contribution
            dataXtrust_output_bias += contribution

        # then normalise each weight only by the clients that contributed to it;
        # positions nobody contributed to keep a denominator of 1 (and a value of 0)
        for denominator in (dataXtrust_hidden_weight, dataXtrust_hidden_bias, dataXtrust_output_weight):
            denominator[denominator == 0] = 1
        if dataXtrust_output_bias == 0:
            dataXtrust_output_bias = 1.0
        weights['hidden_mean_weight'] /= dataXtrust_hidden_weight
        weights['hidden_mean_bias'] /= dataXtrust_hidden_bias
        weights['output_mean_weight'] /= dataXtrust_output_weight
        weights['output_mean_bias'] /= dataXtrust_output_bias

        # secondly copy new weight values to the local models of all clients
        for i, c in enumerate(used_clients):
            rows = 2*models[c].n+1
            for j, index in _central_feature_indices(i):
                models[c].layers[0].weight.data[:, j] = weights['hidden_mean_weight'][:rows, index].clone()
            # the rest of the weights don't have to be copied feature-wise
            models[c].layers[0].bias.data = weights['hidden_mean_bias'][:rows].clone()
            models[c].layers[2].weight.data[0, :] = weights['output_mean_weight'][0, :rows].clone()
            models[c].layers[2].bias.data = weights['output_mean_bias'].clone()

        # and finally copy to the central model for the test set
        central_model.layers[0].weight.data = weights['hidden_mean_weight'].clone()
        central_model.layers[0].bias.data = weights['hidden_mean_bias'].clone()
        central_model.layers[2].weight.data = weights['output_mean_weight'].clone()
        central_model.layers[2].bias.data = weights['output_mean_bias'].clone()

In [None]:
def computeTrust(models, trust, r, s, num_of_clients_in_weights):
    """Update per-client trust values in place (Trusted FedAvg).

    For each client, ``dev`` sums the squared differences between its weights and
    every other client's weights. Only features both clients chose and the first
    ``min`` hidden rows are compared. Each squared difference is divided by the
    number of clients owning that weight. A client is considered normal when its
    deviation is at most 1.3 times the median deviation. ``r``/``s`` (positive/negative
    evidence, indexed like ``used_clients``) are decayed and updated, and
    ``trust[c] = (r+1)/(r+s+2)``.

    Args:
        models: dict worker -> local MLP.
        trust: dict worker -> trust value; overwritten.
        r, s: lists of evidence counters, one per used client; mutated in place.
        num_of_clients_in_weights: dict with 'hidden' and 'output' arrays counting,
            per central-model weight, how many clients own that weight.

    Relies on the notebook globals ``used_clients``, ``clients_datasets`` and ``testsets``.
    """
    if not used_clients:
        # no clients, nothing to score (median() of an empty list would raise)
        return

    central_columns = testsets[0].columns[:-1]

    # dev[i] shows how much the weights of model of client i differ from the models of all other clients
    # it is calculated in accordance with the relevant paper, but also taking into account the heterogeneity of models
    dev = [0 for _ in used_clients]
    for i, c in enumerate(used_clients):
        features_i = clients_datasets[i].columns[:-1]
        for j, cc in enumerate(used_clients):
            features_j = clients_datasets[j].columns[:-1]
            # the smallest model defines the number of weights of rows (neurons) that will be compared
            rows = min(2*models[c].n+1, 2*models[cc].n+1)
            # between 2 clients, only weights of features that both have chosen are compared
            for indexi, feature in enumerate(features_i):
                try:
                    # find the index of the column of feature in cc client, provided that cc has chosen this feature
                    indexj = features_j.get_loc(feature)
                except KeyError:
                    # go to the next feature, if current feature not chosen by cc
                    continue
                # for hidden layer, add to dev the sum of squared differences of weights of models divided by the number of clients which have each weight
                to_divide = num_of_clients_in_weights['hidden'][:rows, central_columns.get_loc(feature)]
                difference = (models[cc].layers[0].weight.data[:rows, indexj].cpu()
                              - models[c].layers[0].weight.data[:rows, indexi].cpu())
                dev[i] += np.sum(difference.numpy()**2 / to_divide)
            # output layer weights don't have to be compared feature-wise
            difference = (models[cc].layers[2].weight.data[0, :rows].cpu()
                          - models[c].layers[2].weight.data[0, :rows].cpu())
            dev[i] += np.sum(difference.numpy()**2 / num_of_clients_in_weights['output'][0, :rows])

    # computed once rather than once per client
    threshold = 1.3 * median(dev)
    # 1 if the client acts normally, 0 if malicious or malfunctioning
    normal = [1 if d <= threshold else 0 for d in dev]
    print("dev: ", dev) # testing
    print("median*1.3: ", threshold) # testing
    print("I: ", normal) # testing

    # forgetting factors for the positive (r) and negative (s) evidence
    r_decay = 0.5
    s_decay = 0.8
    for i in range(len(used_clients)):
        r[i] = r_decay*r[i] + normal[i]
        s[i] = s_decay*s[i] + 1 - normal[i]

    # compute new trust value of every client
    for i, c in enumerate(used_clients):
        trust[c] = (r[i]+1)/(r[i]+s[i]+2)

## FL training with 7 clients

In [None]:
# take appropriate number of clients
used_clients = clients[:7]

# central model, sized for every feature of the test set
central_model = MLP(len(testsets[0].columns[:-1])).to(device)
# zero every weight of the central model,
# so that features which are dropped by all clients do not affect testing
for layer in (central_model.layers[0], central_model.layers[2]):
    layer.weight.data.fill_(0)
    layer.bias.data.fill_(0)

# one local model per client, sized for the features that client kept
# (the central entity knows the chosen features of each client from the preprocessing procedure)
models = {}
for idx, client in enumerate(used_clients):
    models[client] = MLP(len(clients_datasets[idx].columns[:-1])).to(device)
optimizers = {client: optim.SGD(model.parameters(), lr=args['lr'])
              for client, model in models.items()}


# decreasing learning rate: factor 1 while the scheduler's epoch counter is below 3, then 0.5
def lamda(epoch):
    return 1 if epoch < 3 else 0.5


schedulers = {client: sched.LambdaLR(opt, lr_lambda=lamda)
              for client, opt in optimizers.items()}

# accumulators for model aggregation, shaped like the central model's parameters
weights = {
    'hidden_mean_weight': torch.zeros(central_model.layers[0].weight.shape).to(device),
    'hidden_mean_bias': torch.zeros(central_model.layers[0].bias.shape).to(device),
    'output_mean_weight': torch.zeros(central_model.layers[2].weight.shape).to(device),
    'output_mean_bias': torch.zeros(central_model.layers[2].bias.shape).to(device),
}

# trust state: trust value per client plus positive/negative evidence counters
trust = dict.fromkeys(used_clients, 0)
r = [0] * len(used_clients)
s = [0] * len(used_clients)

# for each weight of central_model, count the number of clients which contain this weight in their models
# (needed to compute the trust value of each client)
num_of_clients_in_weights = {
    'hidden': np.zeros(central_model.layers[0].weight.shape),
    'output': np.zeros(central_model.layers[2].weight.shape),
}
central_features = testsets[0].columns[:-1]
for idx, client in enumerate(used_clients):
    n_rows = 2 * models[client].n + 1
    num_of_clients_in_weights['output'][0, :n_rows] += 1
    for feature in clients_datasets[idx].columns[:-1]:
        num_of_clients_in_weights['hidden'][:n_rows, central_features.get_loc(feature)] += 1

for epoch in range(1, args['epochs'] + 1):
    train_federated(args, models, device, federated_train_loaders[0], optimizers, epoch)
    for sch in schedulers.values():
        sch.step()
    computeTrust(models, trust, r, s, num_of_clients_in_weights)
    aggregate(central_model, models, weights, trust)
    test(central_model, device, test_loaders[0])

dev:  [11.937261551306479, 13.101656778337404, 10.436748481400839, 10.301939393553068, 13.255829285578896, 10.324052673415911, 12.82865822033509]
median*1.3:  15.518440016698424
I:  [1, 1, 1, 1, 1, 1, 1]

Test set: Average loss: 0.5069, Accuracy: 342685/387902 (88%)

dev:  [0.16585561326104625, 0.17980549314526437, 0.16179204629390426, 0.18786363223504154, 0.26976724305282257, 0.1321316018695104, 0.7189396872945761]
median*1.3:  0.23374714108884367
I:  [1, 1, 1, 1, 0, 1, 0]

Test set: Average loss: 0.3668, Accuracy: 346976/387902 (89%)

dev:  [0.0794230192353506, 0.08900408387428455, 0.07075150615368425, 0.0971061189109085, 0.1700279687323348, 0.06299712152486678, 0.2647891024498614]
median*1.3:  0.1157053090365699
I:  [1, 1, 1, 1, 0, 1, 0]

Test set: Average loss: 0.3421, Accuracy: 347956/387902 (90%)

dev:  [0.018255273151173784, 0.020319157088546, 0.01527230775428249, 0.021618007772387236, 0.04491545102590407, 0.014925336571474036, 0.052635614853107957]
median*1.3:  0.02641490421510

## Non-federated training results

In [None]:
def train(models, device, train_loader, optimizers):
    """Run one epoch of purely local training (no aggregation) for every client.

    Same as ``train_federated`` without logging. Each model is sent to its worker,
    trained on the batches held by that worker, and fetched back.

    Args:
        models: dict mapping PySyft worker -> that worker's local model.
        device: torch device the batches are moved to.
        train_loader: federated loader whose batches' ``.location`` is their worker.
        optimizers: dict mapping worker -> optimizer of that worker's model.
    """
    for c, m in models.items():
        m.train()
        m.send(c)

    criterion = BCELoss()
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        worker = data.location
        optimizers[worker].zero_grad()
        output = models[worker](data)
        loss = criterion(output, torch.reshape(target, [len(target), 1]))
        loss.backward()
        optimizers[worker].step()

    # fetch the models back; get() works in place (rebinding the loop variable, as
    # before, never updated the dict), so it is called only for its side effect
    for m in models.values():
        m.get()

In [None]:
def test_all(test_model, models, device, test_loader):
    """Evaluate each used client's local model separately on ``test_loader``.

    A client model only covers the features its client kept and has ``2*n+1`` hidden
    units. Its weights are therefore embedded into ``test_model`` (an MLP over all
    features of ``testsets[0]``) at the matching feature columns and leading hidden
    rows. Every weight of ``test_model`` is zeroed first, so inputs and neurons the
    client does not have contribute nothing.

    Args:
        test_model: full-size MLP used as the evaluation container; overwritten.
        models: dict worker -> local MLP. Client ``i`` of ``used_clients`` is assumed
            to own ``clients_datasets[i]``.
        device: torch device the test batches are moved to.
        test_loader: DataLoader yielding ``(features, target)`` batches over all features.

    Relies on the notebook globals ``used_clients``, ``clients_datasets`` and ``testsets``.
    """
    test_model.eval()
    central_columns = testsets[0].columns[:-1]
    num_samples = len(test_loader.dataset)
    criterion = BCELoss(reduction='sum')
    with torch.no_grad():
        for i, c in enumerate(used_clients):
            # start each client from an all-zero model: otherwise weights written for an
            # earlier client (features or hidden neurons this client lacks) would leak in
            for param in test_model.parameters():
                param.data.zero_()

            # embed the client's weights: its neurons are the first `rows` hidden rows,
            # its features go to their columns in the full feature set
            rows = 2*models[c].n+1
            for j, feature in enumerate(clients_datasets[i].columns[:-1]):
                index = central_columns.get_loc(feature)
                test_model.layers[0].weight.data[:rows, index] = models[c].layers[0].weight.data[:, j].clone()
            test_model.layers[0].bias.data[:rows] = models[c].layers[0].bias.data.clone()
            test_model.layers[2].weight.data[0, :rows] = models[c].layers[2].weight.data[0, :].clone()
            test_model.layers[2].bias.data = models[c].layers[2].bias.data.clone()

            test_loss = 0
            correct = 0
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = test_model(data)
                target = torch.reshape(target, [len(target), 1])
                test_loss += criterion(output, target).item()
                # binary prediction: round the output probability to 0/1
                pred = torch.round(output)
                correct += pred.eq(target).sum().item()
            test_loss /= num_samples

            print('\tClient' + str(i+1) + ': Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
                test_loss, correct, num_samples,
                100. * correct / num_samples))

In [None]:
used_clients = clients[:7]

# full-size evaluation model, zero-initialised; client weights are embedded into it by test_all
test_model = MLP(len(testsets[0].columns[:-1])).to(device)
for layer in (test_model.layers[0], test_model.layers[2]):
    layer.weight.data.fill_(0)
    layer.bias.data.fill_(0)

# each client trains its own local model only, without any aggregation
models = {}
for idx, client in enumerate(used_clients):
    models[client] = MLP(len(clients_datasets[idx].columns[:-1])).to(device)
optimizers = {client: optim.SGD(model.parameters(), lr=args['lr'])
              for client, model in models.items()}


# decreasing learning rate: factor 1 while the scheduler's epoch counter is below 3, then 0.5
def lamda(epoch):
    return 1 if epoch < 3 else 0.5


schedulers = {client: sched.LambdaLR(opt, lr_lambda=lamda)
              for client, opt in optimizers.items()}

for epoch in range(1, args['epochs'] + 1):
    train(models, device, federated_train_loaders[0], optimizers)
    for sch in schedulers.values():
        sch.step()
    print('Train Epoch ' + str(epoch) + ':')
    test_all(test_model, models, device, test_loaders[0])
    print()

Train Epoch 1:
	Client1: Average loss: 0.5419, Accuracy: 253311/387902 (65%)
	Client2: Average loss: 0.4953, Accuracy: 312430/387902 (81%)
	Client3: Average loss: 0.5538, Accuracy: 344678/387902 (89%)
	Client4: Average loss: 0.6030, Accuracy: 276110/387902 (71%)
	Client5: Average loss: 0.6955, Accuracy: 181814/387902 (47%)
	Client6: Average loss: 0.5911, Accuracy: 304388/387902 (78%)
	Client7: Average loss: 0.3941, Accuracy: 348026/387902 (90%)

Train Epoch 2:
	Client1: Average loss: 0.4920, Accuracy: 238917/387902 (62%)
	Client2: Average loss: 0.4601, Accuracy: 277588/387902 (72%)
	Client3: Average loss: 0.4703, Accuracy: 342672/387902 (88%)
	Client4: Average loss: 0.5662, Accuracy: 260011/387902 (67%)
	Client5: Average loss: 0.6160, Accuracy: 332187/387902 (86%)
	Client6: Average loss: 0.4201, Accuracy: 352529/387902 (91%)
	Client7: Average loss: 0.4154, Accuracy: 348877/387902 (90%)

Train Epoch 3:
	Client1: Average loss: 0.4712, Accuracy: 250316/387902 (65%)
	Client2: Average loss: