# Federated learning

Ezen a gyakorlaton egy egyszerű *federated learning* modellt fogunk építeni (szimulálni). Ez annyiban tér el a szokásos felügyelt tanulási sémánktól, hogy a tanítóadatok most közvetlenül nem állnak rendelkezésünkre, hanem elosztott módon, több kliensen helyezkednek el. Kézenfekvő példa lehet egy mobil operációs rendszer autocorrect/autocomplete szolgáltatása, ahol az adatok csak lokálisan, az eszközön érhetők el (pl. korábban begépelt vagy kiválasztott szavak), a predikcióhoz mégis egy közös, kollektív modellt szeretnénk tanulni. A módszer alapötlete, hogy magát a tanulást is a klienseken végezzük; az adatokat nem kell centralizálni, így a kommunikációs overhead elkerülhető, a biztonsági aggályokról nem is beszélve.

A gyakorlatban rendszerint a következő történik:
- A kliens megkapja az aktuális globális modellt (pl. mély neurális hálózat),
- Ezt frissíti a lokális adatok alapján (pl. backpropagation),
- A frissítést (pl. új súlyokat) visszaküldi a szervernek megfelelő titkosítás mellett (lásd pl. differential privacy, SecureNN/SPDZ protokollok)
- A szerver aggregálja (pl. átlagolja) a beérkezett adatokat és frissíti a globális modellt, amit akár rögtön vissza is küldhet a klienseknek.

Ezen a gyakorlaton a biztonsági aspektusokkal nem foglalkozunk, ám más problémák is felmerülnek:
- Az egyes klienseken található adatok más-más eloszlást követnek (nem IID)
- Az egyes klienseken eltérhet az adatok mennyisége (kiegyensúlyozatlanság)
- Technikai problémák: pl. kliensek nagy száma, kommunikációs overhead/sávszélességi korlátok

Az általunk használt algoritmus részletei megtalálhatók itt: https://arxiv.org/pdf/1602.05629.pdf (1. algoritmus). Az egyszerűség kedvéért eltekintünk a véletlenszerűen kiválasztott kliensektől, azaz minden frissítésben minden kliens részt fog venni. Szintén az egyszerűség érdekében egy elosztott képklasszfikáló modellt tanulunk a MNIST dataset alapján.

In [4]:
%pylab inline

import torch
from torch import nn
from torchvision import datasets, transforms

from tqdm import tqdm

Populating the interactive namespace from numpy and matplotlib


`%matplotlib` prevents importing * from pylab and numpy
  "\n`%matplotlib` prevents importing * from pylab and numpy"


**1. feladat.** Hozzon létre egy képklasszifikáló modellt a MNIST adathalmazra! Értékelje ki a prediktív teljesítményt ($>95\%$)!

Példa: https://github.com/yunjey/pytorch-tutorial/blob/master/tutorials/02-intermediate/convolutional_neural_network/main.py

In [5]:
# Hyper parameters
num_epochs = 5
num_classes = 10
batch_size = 100
learning_rate = 0.001

num_clients = 10

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [6]:
class Model(torch.nn.Module):
  def __init__(self, num_classes=10):
    super(Model, self).__init__()
    self.layer1 = nn.Sequential(
        nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
        nn.BatchNorm2d(16),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2))
    self.layer2 = nn.Sequential(
        nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
        nn.BatchNorm2d(32),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2))
    self.fc = nn.Linear(7*7*32, num_classes)
        
  def forward(self, x):
    out = self.layer1(x)
    out = self.layer2(out)
    out = out.reshape(out.size(0), -1)
    out = self.fc(out)
    return out

In [9]:
train_data = datasets.MNIST('../data', train=True, download=True,
                            transform=transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Normalize((0.1307,), (0.3081,))
                            ]))
test_data = datasets.MNIST('../data', train=False, download=True,
                            transform=transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Normalize((0.1307,), (0.3081,))
                            ]))

In [10]:
# Data loader
train_loader = torch.utils.data.DataLoader(dataset=train_data,
                                           batch_size=batch_size,
                                           num_workers=1,
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_data,
                                          batch_size=batch_size,
                                          num_workers=1,
                                          shuffle=False)

In [14]:
# Loss and optimizer
model = Model().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [15]:
# Train the model
total_step = len(train_loader)
for epoch in tqdm(range(num_epochs)):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))


  0%|          | 0/5 [00:00<?, ?it/s][A

Epoch [1/5], Step [100/600], Loss: 0.2881
Epoch [1/5], Step [200/600], Loss: 0.1064
Epoch [1/5], Step [300/600], Loss: 0.0402
Epoch [1/5], Step [400/600], Loss: 0.0491
Epoch [1/5], Step [500/600], Loss: 0.0615



 20%|██        | 1/5 [00:05<00:20,  5.19s/it][A

Epoch [1/5], Step [600/600], Loss: 0.0795
Epoch [2/5], Step [100/600], Loss: 0.0539
Epoch [2/5], Step [200/600], Loss: 0.0402
Epoch [2/5], Step [300/600], Loss: 0.0334
Epoch [2/5], Step [400/600], Loss: 0.0532
Epoch [2/5], Step [500/600], Loss: 0.0652



 40%|████      | 2/5 [00:10<00:15,  5.11s/it][A

Epoch [2/5], Step [600/600], Loss: 0.0626
Epoch [3/5], Step [100/600], Loss: 0.0372
Epoch [3/5], Step [200/600], Loss: 0.0176
Epoch [3/5], Step [300/600], Loss: 0.0535
Epoch [3/5], Step [400/600], Loss: 0.1548
Epoch [3/5], Step [500/600], Loss: 0.0256



 60%|██████    | 3/5 [00:15<00:10,  5.14s/it][A

Epoch [3/5], Step [600/600], Loss: 0.0618
Epoch [4/5], Step [100/600], Loss: 0.0171
Epoch [4/5], Step [200/600], Loss: 0.0113
Epoch [4/5], Step [300/600], Loss: 0.0103
Epoch [4/5], Step [400/600], Loss: 0.0468
Epoch [4/5], Step [500/600], Loss: 0.0330



 80%|████████  | 4/5 [00:20<00:05,  5.11s/it][A

Epoch [4/5], Step [600/600], Loss: 0.0052
Epoch [5/5], Step [100/600], Loss: 0.0221
Epoch [5/5], Step [200/600], Loss: 0.0121
Epoch [5/5], Step [300/600], Loss: 0.0089
Epoch [5/5], Step [400/600], Loss: 0.0079
Epoch [5/5], Step [500/600], Loss: 0.0199



100%|██████████| 5/5 [00:25<00:00,  5.10s/it][A

Epoch [5/5], Step [600/600], Loss: 0.0434





In [16]:
# Test the model
model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total))

Test Accuracy of the model on the 10000 test images: 98.89 %


**2. feladat.** Hozzon létre egy `TorchClient` osztályt, amely az előbbi architektúrával megegyező neurális hálózatot tartalmaz! Az `__init__()` függvényben valósítson meg egy IID ill. nem IID adatfelbontási sémát (véletlenszerűen szétosztott adatok vs. minden kliens csak egyetlen számjegyet lát). A `train()` függvényben implementálja a tanítást.

**3. feladat.** Hozzon létre egy `Server` osztályt, ami elvégzi a federált tanítást! Ehhez implementálja az `avg_models` függvényt, valamint a bevezetésben bemutatott algoritmust!

In [7]:
class Server():
  def __init__(self):
    self.model = Model().cuda()
    self.loader = torch.utils.data.DataLoader(dataset=test_data)

  # Kliens modellek paramétereinek átlagolása, saját modell frissítése
  def avg_models(self,clients):
    state_dict = {}

    parameters = [client.model.state_dict() for client in clients]
    with torch.no_grad():
        for parameter_name in parameters[0].keys():
          state_dict[parameter_name] = torch.mean(
              torch.stack(
                  [
                      model_parameters[parameter_name].float()
                      for model_parameters in parameters
                  ]
              ),
              dim=0,
          )
        self.model.load_state_dict(state_dict)

  def distribute_model(self, clients):
    for client in clients:
      client.send(self.model)

  # Tanítás
  def train(self,clients,num_epoch=10):
    for i in tqdm(range(num_epoch)):
      for client in clients:
        client.train(1)
      self.avg_models(clients)
      self.distribute_model(clients)
      self.eval()

  # Kiértékelés
  def eval(self):
    self.model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = self.model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total))

**4. feladat.** Értékelje ki a prediktív teljesítményt az IID és nem IID felállásban, vonjon le következtetéseket!

In [21]:
# original
clients = [TorchClient(i, iid=True) for i in range(num_clients)]
server = Server()
server.train(clients, 5)


  0%|          | 0/5 [00:00<?, ?it/s][A
 20%|██        | 1/5 [00:56<03:45, 56.45s/it][A

Test Accuracy of the model on the 10000 test images: 21.71 %



 40%|████      | 2/5 [01:52<02:49, 56.41s/it][A

Test Accuracy of the model on the 10000 test images: 89.42 %



 60%|██████    | 3/5 [02:49<01:52, 56.38s/it][A

Test Accuracy of the model on the 10000 test images: 92.58 %



 80%|████████  | 4/5 [03:45<00:56, 56.40s/it][A

Test Accuracy of the model on the 10000 test images: 93.69 %



100%|██████████| 5/5 [04:41<00:00, 56.37s/it][A

Test Accuracy of the model on the 10000 test images: 94.5 %





In [22]:
# original
clients = [TorchClient(i, iid=False) for i in range(num_clients)]
server = Server()
server.train(clients)


  0%|          | 0/10 [00:00<?, ?it/s][A
 10%|█         | 1/10 [00:58<08:47, 58.57s/it][A

Test Accuracy of the model on the 10000 test images: 11.48 %



 20%|██        | 2/10 [01:57<07:48, 58.53s/it][A

Test Accuracy of the model on the 10000 test images: 27.37 %



 30%|███       | 3/10 [02:55<06:49, 58.50s/it][A

Test Accuracy of the model on the 10000 test images: 41.83 %



 40%|████      | 4/10 [03:53<05:50, 58.48s/it][A

Test Accuracy of the model on the 10000 test images: 45.8 %



 50%|█████     | 5/10 [04:52<04:52, 58.50s/it][A

Test Accuracy of the model on the 10000 test images: 47.4 %



 60%|██████    | 6/10 [05:50<03:53, 58.50s/it][A

Test Accuracy of the model on the 10000 test images: 48.79 %



 70%|███████   | 7/10 [06:49<02:55, 58.46s/it][A

Test Accuracy of the model on the 10000 test images: 50.86 %



 80%|████████  | 8/10 [07:47<01:56, 58.45s/it][A

Test Accuracy of the model on the 10000 test images: 53.98 %



 90%|█████████ | 9/10 [08:46<00:58, 58.44s/it][A

Test Accuracy of the model on the 10000 test images: 57.09 %



100%|██████████| 10/10 [09:44<00:00, 58.47s/it][A

Test Accuracy of the model on the 10000 test images: 60.29 %





In [19]:
device = "cpu"

In [20]:
import time
from torch.multiprocessing import Pool, Process, set_start_method
try:
     set_start_method('spawn')
except RuntimeError:
    pass
pool = Pool(processes=4)

class MultiServer():
  def __init__(self):
    self.model = Model()#.cuda()
    self.loader = torch.utils.data.DataLoader(dataset=test_data)

  # Kliens modellek paramétereinek átlagolása, saját modell frissítése
  def avg_models(self,clients):
    state_dict = {}

    parameters = [client.model.state_dict() for client in clients]
    with torch.no_grad():
        for parameter_name in parameters[0].keys():
          state_dict[parameter_name] = torch.mean(
              torch.stack(
                  [
                      model_parameters[parameter_name].float()
                      for model_parameters in parameters
                  ]
              ),
              dim=0,
          )
        self.model.load_state_dict(state_dict)

  def distribute_model(self, clients):
    for client in clients:
      client.send(self.model)

  # Tanítás
  def train(self,clients,num_epoch=10):
#     f = lambda c: c.train(1)
    self.model.share_memory()
    for i in tqdm(range(num_epoch)):
      clients = pool.map(f, zip(clients, [self.model] * len(clients)))
      self.avg_models(clients)
#       self.distribute_model(clients)
      self.eval()

  # Kiértékelés
  def eval(self):
    self.model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = self.model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total))

def f(c, m):
    c.send(m)
    return c.train(1)

In [21]:
class TorchClient():
  def __init__(self,id,iid=True):
    self.id = id
#     self.model = Model().cuda()

    num_samples_per_client = len(train_data) // num_clients
    if iid:
      all_idx = np.arange(len(train_data.targets))
      # np.random.shuffle(all_idx)
      pass
    else:
      all_idx = np.argsort(train_data.targets)
    idx = all_idx[self.id*num_samples_per_client:(self.id + 1)*num_samples_per_client]
      
    sampler = torch.utils.data.sampler.SubsetRandomSampler(idx)
    self.loader = torch.utils.data.DataLoader(dataset=train_data,batch_size=256,sampler=sampler)
    self.loss   = torch.nn.CrossEntropyLoss()

  def train(self,num_epoch):
    self.opt = torch.optim.SGD(self.model.parameters(),lr=0.001)
    total_step = len(self.loader)
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(self.loader):
            images = images.to(device)
            labels = labels.to(device)
            
            # Forward pass
            outputs = self.model(images)
            loss = criterion(outputs, labels)
            
            # Backward and optimize
            self.opt.zero_grad()
            loss.backward()
            self.opt.step()
            
            if (i+1) % 100 == 0:
                print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                      .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
    return self

  # Kapott modell átmásolása
  def send(self,model):
    self.model = Model().cuda()
    self.model.load_state_dict(model.state_dict())
#     self.model.share_memory()

In [None]:
# original
clients = [TorchClient(i, iid=False) for i in range(num_clients)]
server = MultiServer()
server.train(clients)




  0%|          | 0/10 [00:00<?, ?it/s][A[A[A

Fat proxy, Bayesian. MIT/en van valami FL-es ucc.