# Tutorial 1A

The first lab tutorial presents the findings and uses part of the experimental methodology from the [original Federated Learning](https://arxiv.org/pdf/1602.05629.pdf) paper. In horizontal federated learning, all clients have access to the same complete model architecture, which they train on local data, sharing information about model updates but not their data.

Before starting, make sure to follow the overall setup for the labs.

<a href="https://blogs.nvidia.com/blog/what-is-federated-learning/" target="_blank">
    <img src="https://blogs.nvidia.com/wp-content/uploads/2019/10/federated_learning_animation_still_white.png" alt="FL Visualization" style="width:50%;">
</a>

---

Before anything else, we download, load, and preprocess the [MNIST dataset](https://archive.ics.uci.edu/dataset/683/mnist+database+of+handwritten+digits), which we will use for all experiments.

In [1]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

data_path = "./data"
ETA = "\N{GREEK SMALL LETTER ETA}"

if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

torch.backends.cudnn.deterministic = True

transform = transforms.Compose([
    transforms.ToTensor(),
    # normalize by training set mean and standard deviation
    # resulting data has mean=0 and std=1
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = datasets.MNIST(data_path, train=True, download=True, transform=transform)
test_loader = DataLoader(
    datasets.MNIST(data_path, train=False, download=False, transform=transform),
    # decrease batch size if running into memory issues when testing
    # a bespoke generator is passed to avoid reproducibility issues
    shuffle=False, drop_last=False, batch_size=10000, generator=torch.Generator())

We can then define a small convolutional neural network that will serve as our model.

In [2]:
import torch.nn as nn
import torch.nn.functional as F


class MnistCnn(nn.Module):
    def __init__(self):
        super(MnistCnn, self).__init__()

        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)

        return output
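The input size `9216` of `fc1` comes from the layer shapes. Each 3×3 convolution with stride 1 and no padding shrinks the 28×28 input by 2 pixels per side (28 → 26 → 24). The 2×2 max pooling then halves it (24 → 12). Because `conv2` outputs 64 channels, the flattened size is 64 · 12 · 12 = 9216. The sketch below checks this arithmetic in two ways. It uses a hypothetical `conv_output_size` helper, and it also passes a dummy batch through conv layers shaped like those in `MnistCnn`:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


def conv_output_size(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    # standard formula for the spatial size after a convolution (dilation 1)
    return (size + 2 * padding - kernel) // stride + 1


side = conv_output_size(28, kernel=3)    # conv1: 28 -> 26
side = conv_output_size(side, kernel=3)  # conv2: 26 -> 24
side = side // 2                         # 2x2 max pooling: 24 -> 12
flat_features = 64 * side * side         # conv2 produces 64 channels

# cross-check with layers shaped like those of MnistCnn on a dummy batch
conv1 = nn.Conv2d(1, 32, 3, 1)
conv2 = nn.Conv2d(32, 64, 3, 1)
with torch.no_grad():
    x = torch.zeros(1, 1, 28, 28)
    x = F.max_pool2d(F.relu(conv2(F.relu(conv1(x)))), 2)
    flattened = torch.flatten(x, 1)

print(side, flat_features, tuple(flattened.shape))  # 12 9216 (1, 9216)
```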

With that, we can define a helper function that trains a model for one epoch, meaning a single pass over all the available data. It takes a model, a loader for iterating over a dataset, and an optimizer for updating the model.

In [3]:
from torch.optim import Optimizer


def train_epoch(model: torch.nn.Module, loader: DataLoader, optimizer: Optimizer) -> None:
    model.train()  # training mode (enables dropout)
    running_loss = 0.0
    # iterate over the data in mini-batches
    for data, target in loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()  # clear gradients from the previous step
        output = model(data)  # log-probabilities (the model ends in log_softmax)
        loss = F.nll_loss(output, target)  # negative log-likelihood of the true labels
        loss.backward()  # backpropagate to compute gradients
        optimizer.step()  # update the model weights
        running_loss += loss.item()  # accumulate the per-batch loss

    avg_loss = running_loss / len(loader)
    print(f"Epoch loss: {avg_loss:.4f}")

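We also define a utility function that splits the training set into `nr_clients` chunks, one for each client.

The `iid` flag controls how samples are assigned. If it is set, samples go to chunks in an IID (independent and identically distributed) fashion. Otherwise, samples are sorted by label and cut into shards, so that most chunks contain examples of only two labels.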

In [22]:
from typing import cast

import numpy as np
import numpy.random as npr
from torch.utils.data import Subset


def split(nr_clients: int, iid: bool, seed: int) -> list[Subset]:
    # local generator, so the global NumPy random state is left untouched
    rng = npr.default_rng(seed)
    dataset_length = len(train_dataset)

    if iid:
        # IID: shuffle all indices and deal them out in (nearly) equal chunks
        chunks = np.array_split(rng.permutation(dataset_length), nr_clients)
    else:
        # non-IID, following the paper: sort indices by label, cut them into
        # 2 shards per client, and give every client two random shards,
        # so most clients only hold examples of two digits
        labels = cast(torch.Tensor, train_dataset.targets).numpy()
        shards = np.array_split(np.argsort(labels, kind="stable"), 2 * nr_clients)
        shard_order = rng.permutation(2 * nr_clients)
        chunks = [
            np.concatenate([shards[shard_order[2 * i]], shards[shard_order[2 * i + 1]]])
            for i in range(nr_clients)]

    # each client gets a view of the shared training set restricted to its indices
    return [Subset(train_dataset, chunk.tolist()) for chunk in chunks]
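One way to sanity-check a partition is to count how many distinct labels each client holds. The sketch below does this on a synthetic label array, which is a hypothetical stand-in for the MNIST targets with 600 samples per class and 10 clients. It compares an IID split against a split that sorts by label and gives each client two shards, in the style of the paper's non-IID setup. `distinct_labels_per_client` is a hypothetical helper:

```python
import numpy as np


def distinct_labels_per_client(labels: np.ndarray, chunks: list[np.ndarray]) -> list[int]:
    # number of different class labels held by each client
    return [len(np.unique(labels[chunk])) for chunk in chunks]


rng = np.random.default_rng(0)
# hypothetical stand-in for MNIST targets: 10 classes with 600 samples each
labels = np.repeat(np.arange(10), 600)
nr_clients = 10

# IID: shuffle indices, then deal out equal chunks
iid_chunks = np.array_split(rng.permutation(len(labels)), nr_clients)

# non-IID: sort by label, cut 2 shards per client, give each client two random shards
shards = np.array_split(np.argsort(labels, kind="stable"), 2 * nr_clients)
order = rng.permutation(2 * nr_clients)
non_iid_chunks = [np.concatenate([shards[order[2 * i]], shards[order[2 * i + 1]]])
                  for i in range(nr_clients)]

iid_counts = distinct_labels_per_client(labels, iid_chunks)
non_iid_counts = distinct_labels_per_client(labels, non_iid_chunks)
print("IID:", iid_counts)
print("non-IID:", non_iid_counts)
```

For the real split, the same helper could presumably be called as `distinct_labels_per_client(train_dataset.targets.numpy(), [np.asarray(s.indices) for s in sample_split])`.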

In [23]:
sample_split = split(100, True, 42)
len(sample_split)

100

We define a short class for holding the results of training runs and the parameters used.

In [24]:
from dataclasses import asdict, dataclass, field

from pandas import DataFrame


@dataclass
class RunResult:
    algorithm: str
    n: int      # number of clients
    c: float    # client_fraction
    b: int      # take -1 as inf
    e: int      # nr_local_epochs
    lr: float   # printed as lowercase eta
    seed: int
    wall_time: list[float] = field(default_factory=list)
    message_count: list[int] = field(default_factory=list)
    test_accuracy: list[float] = field(default_factory=list)

    def as_df(self, skip_wtime=True) -> DataFrame:
        self_dict = {
            k.capitalize().replace("_", " "): v
            for k, v in asdict(self).items()}

        if self_dict["B"] == -1:
            self_dict["B"] = "\N{INFINITY}"

        df = DataFrame({"Round": range(1, len(self.wall_time) + 1), **self_dict})
        df = df.rename(columns={"Lr": ETA})
        if skip_wtime:
            df = df.drop(columns=["Wall time"])
        return df

We create an abstract class as a template for all distributed learning clients, defining a method for outputting an update after training a given model on local data.

In [25]:
from abc import ABC, abstractmethod


class Client(ABC):
    def __init__(self, client_data: Subset, batch_size: int) -> None:
        self.model = MnistCnn().to(device)
        self.generator = torch.Generator()
        self.loader_train = DataLoader(
            client_data, batch_size=batch_size, shuffle=True,
            drop_last=False, generator=self.generator)

    @abstractmethod
    def update(self, weights: list[torch.Tensor], seed: int) -> list[torch.Tensor]:
        ...

On the flip side, a server needs to be able to run the (distributed) training process for a given number of rounds and test the current model it possesses.

In [26]:
class Server(ABC):
    def __init__(self, lr: float, batch_size: int, seed: int) -> None:
        self.clients: list[Client]
        self.lr = lr
        self.batch_size = batch_size
        self.seed = seed
        torch.manual_seed(seed)
        self.model = MnistCnn().to(device)

    @abstractmethod
    def run(self, nr_rounds: int) -> RunResult:
        ...

    def test(self) -> float:
        # evaluate the current global model on the held-out MNIST test set
        self.model.eval()  # evaluation mode (disables dropout)
        correct_predictions = 0
        total_samples = 0
        test_loss = 0.0

        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = self.model(data)  # log-probabilities from log_softmax
                # summed (not averaged) so we can divide by the sample count below
                test_loss += F.nll_loss(output, target, reduction="sum").item()
                prediction = output.argmax(dim=1)
                correct_predictions += (prediction == target).sum().item()
                total_samples += target.size(0)

        avg_loss = test_loss / total_samples
        accuracy = correct_predictions / total_samples
        print(f"Test loss: {avg_loss:.4f}, test accuracy: {accuracy:.4f}")
        return accuracy

We can also build a centralized variant on top of the server template. It involves no clients and serves as a baseline before the distributed versions.

In [None]:
from time import perf_counter

from torch.optim import SGD
from tqdm import tqdm


class CentralizedServer(Server):
    def __init__(self, lr: float, batch_size: int, seed: int) -> None:
        super().__init__(lr, batch_size, seed)
        self.optimizer = SGD(params=self.model.parameters(), lr=lr)
        self.generator = torch.Generator()
        self.loader_train = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True,
            drop_last=False, generator=self.generator)
        self.clients = []

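    def run(self, nr_rounds: int) -> RunResult:
        # a centralized "round" is one epoch over the full training set;
        # there are no clients, so n=1, C=1.0, E=1 and no messages are exchanged
        result = RunResult("Centralized", 1, 1.0, self.batch_size, 1, self.lr, self.seed)
        self.generator.manual_seed(self.seed)  # reproducible shuffling
        elapsed_time = 0.0

        for _ in tqdm(range(nr_rounds), desc="Rounds", leave=False):
            start = perf_counter()
            train_epoch(self.model, self.loader_train, self.optimizer)
            elapsed_time += perf_counter() - start

            result.wall_time.append(round(elapsed_time, 1))
            result.message_count.append(0)
            result.test_accuracy.append(self.test())

        print("Elapsed training time in seconds:", elapsed_time)
        return result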

In [None]:
centralized_server = CentralizedServer(0.5, 1024, 42)
result_centralized = centralized_server.run(5)
centralized_df = result_centralized.as_df()
centralized_df

We can extend the template with some setup steps common to all decentralized algorithms.

In [None]:
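class DecentralizedServer(Server):
    def __init__(
            self, lr: float, batch_size: int, client_subsets: list[Subset],
            client_fraction: float, seed: int) -> None:
        super().__init__(lr, batch_size, seed)  # sets lr, batch size, seed, and global model
        self.nr_clients = len(client_subsets)
        self.client_fraction = client_fraction
        # clients sampled per round: m = max(C * K, 1), as in the FedAvg paper
        self.nr_clients_per_round = max(1, round(client_fraction * self.nr_clients))
        # per-client sample counts n_k, used to weight client updates
        self.client_sample_counts = [len(subset) for subset in client_subsets]
        self.rng = np.random.default_rng(seed)  # for sampling clients each round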
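In the FedAvg paper, each round selects a random fraction C of the K clients, with at least one client chosen: m = max(C·K, 1). The sketch below shows that sampling step using a hypothetical `sample_clients` helper. Two details are assumptions: C·K is rounded to the nearest integer, and clients are drawn uniformly without replacement.

```python
import numpy as np


def sample_clients(nr_clients: int, client_fraction: float, rng: np.random.Generator) -> list[int]:
    # m = max(C * K, 1) clients per round, drawn uniformly without replacement
    nr_selected = max(1, round(client_fraction * nr_clients))
    return sorted(rng.choice(nr_clients, size=nr_selected, replace=False).tolist())


rng = np.random.default_rng(42)
for round_nr in range(3):
    print(round_nr, sample_clients(100, 0.2, rng))

print(sample_clients(100, 0.0, rng))  # C = 0 still selects a single client
```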

The two federated learning algorithms from the paper follow, alongside an overview of metric plotting.

---

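For FedSGD, the paper's baseline, we first define the client. We choose to have each client return gradients as its update. FedSGD treats each client's full local dataset as a single batch (B = ∞), so `GradientClient` sets its batch size to the size of its data.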

In [None]:
class GradientClient(Client):
    def __init__(self, client_data: Subset) -> None:
        super().__init__(client_data, len(client_data))

    def update(self, weights: list[torch.Tensor], seed: int) -> list[torch.Tensor]:
        # TODO
        return []
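The client side of FedSGD can be sketched as a hypothetical `client_gradients` function. It loads the server's weights, computes the loss over the whole local dataset as one batch, and returns copies of the parameter gradients. The sketch runs on a tiny `nn.Linear` stand-in model with random data. These are assumptions for illustration, not the lab's reference solution:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


def client_gradients(model: nn.Module, weights: list[torch.Tensor],
                     data: torch.Tensor, target: torch.Tensor) -> list[torch.Tensor]:
    # overwrite the local parameters with the server's current weights
    with torch.no_grad():
        for param, weight in zip(model.parameters(), weights):
            param.copy_(weight)
    model.train()
    model.zero_grad()
    # FedSGD uses the whole local dataset as one batch (B = infinity)
    loss = F.nll_loss(F.log_softmax(model(data), dim=1), target)
    loss.backward()
    # return copies so later local computations cannot modify what was sent
    return [param.grad.detach().clone() for param in model.parameters()]


torch.manual_seed(0)
global_model = nn.Linear(4, 3)  # hypothetical tiny stand-in for MnistCnn
local_model = nn.Linear(4, 3)
weights = [p.detach().clone() for p in global_model.parameters()]
data, target = torch.randn(8, 4), torch.randint(0, 3, (8,))

grads = client_gradients(local_model, weights, data, target)
print([tuple(g.shape) for g in grads])  # [(3, 4), (3,)]
```

In `GradientClient.update`, `self.model` and the single batch from `self.loader_train` would presumably take these roles. The tensors would also need to be on `device`.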

We then define the corresponding server.

In [None]:
class FedSgdGradientServer(DecentralizedServer):
    def __init__(
            self, lr: float,
            client_subsets: list[Subset], client_fraction: float, seed: int) -> None:
        super().__init__(lr, -1, client_subsets, client_fraction, seed)
        self.optimizer = SGD(params=self.model.parameters(), lr=lr)
        self.clients = [GradientClient(subset) for subset in client_subsets]

    def run(self, nr_rounds: int) -> RunResult:
        # TODO
        return RunResult("", 0, 0., 0, 0, 0., 0)
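On the server, the FedSGD step combines the client gradients as a sample-weighted average, `g = Σ_k (n_k / n) · g_k`, and then takes one SGD step. Here n_k is the number of samples on client k. The sketch below uses a hypothetical `apply_fedsgd_step` helper with made-up gradients. It normalizes by the total over the participating clients, which is an assumption for the C < 1 case:

```python
import torch
import torch.nn as nn
from torch.optim import SGD


def apply_fedsgd_step(model: nn.Module, optimizer: torch.optim.Optimizer,
                      client_grads: list[list[torch.Tensor]], client_sizes: list[int]) -> None:
    total = sum(client_sizes)
    optimizer.zero_grad()
    for i, param in enumerate(model.parameters()):
        # weighted average of the i-th gradient tensor: sum_k (n_k / n) * g_k
        param.grad = sum(size / total * grads[i] for grads, size in zip(client_grads, client_sizes))
    optimizer.step()  # plain SGD: w <- w - lr * g


model = nn.Linear(2, 1, bias=False)  # hypothetical model with a single weight tensor
with torch.no_grad():
    model.weight.fill_(1.0)
optimizer = SGD(model.parameters(), lr=0.1)

# two hypothetical clients holding 100 and 300 samples
client_grads = [[torch.tensor([[1.0, 0.0]])], [torch.tensor([[0.0, 4.0]])]]
apply_fedsgd_step(model, optimizer, client_grads, [100, 300])
# averaged gradient is [[0.25, 3.0]], so the weight becomes approximately [[0.975, 0.7]]
print(model.weight.tolist())
```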

In [None]:
fedsgd_gradient_server = FedSgdGradientServer(0.02, sample_split, 0.2, 42)
result_fedsgd_gradient = fedsgd_gradient_server.run(5)
fedsgd_gradient_df = result_fedsgd_gradient.as_df()
fedsgd_gradient_df

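FedAvg, the paper's main contribution, needs a client that returns updated weights instead of gradients. This is because each client performs several local SGD steps before communicating with the server: `nr_epochs` passes over its data in batches of `batch_size`.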

In [None]:
class WeightClient(Client):
    def __init__(self, client_data: Subset, lr: float, batch_size: int, nr_epochs: int) -> None:
        super().__init__(client_data, batch_size)
        self.optimizer = SGD(params=self.model.parameters(), lr=lr)
        self.nr_epochs = nr_epochs

    def update(self, weights: list[torch.Tensor], seed: int) -> list[torch.Tensor]:
        # TODO
        return []
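A FedAvg client starts from the global weights, trains locally for E epochs with minibatches of size B, and returns the resulting weights. The sketch below shows this as a hypothetical `local_update` function. The toy `TensorDataset` and the `nn.Linear` stand-in model are assumptions for illustration:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import SGD
from torch.utils.data import DataLoader, TensorDataset


def local_update(model: nn.Module, weights: list[torch.Tensor], loader: DataLoader,
                 lr: float, nr_epochs: int) -> list[torch.Tensor]:
    # start local training from the server's current global weights
    with torch.no_grad():
        for param, weight in zip(model.parameters(), weights):
            param.copy_(weight)
    optimizer = SGD(model.parameters(), lr=lr)
    model.train()
    for _ in range(nr_epochs):       # E local epochs
        for data, target in loader:  # minibatches of size B
            optimizer.zero_grad()
            loss = F.nll_loss(F.log_softmax(model(data), dim=1), target)
            loss.backward()
            optimizer.step()
    # FedAvg clients send back their updated weights, not gradients
    return [param.detach().clone() for param in model.parameters()]


torch.manual_seed(0)
dataset = TensorDataset(torch.randn(40, 4), torch.randint(0, 3, (40,)))  # toy local data
loader = DataLoader(dataset, batch_size=10, shuffle=True,
                    generator=torch.Generator().manual_seed(0))
model = nn.Linear(4, 3)  # hypothetical stand-in for MnistCnn
start_weights = [p.detach().clone() for p in model.parameters()]

new_weights = local_update(model, start_weights, loader, lr=0.1, nr_epochs=2)
print([tuple(w.shape) for w in new_weights])
```

The sketch creates a fresh `SGD` optimizer on each call. Plain SGD without momentum keeps no state, so this should behave the same as reusing one optimizer per client, which is what `WeightClient` does.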

Following that, we define the actual server code for the method.

In [None]:
class FedAvgServer(DecentralizedServer):
    def __init__(
            self, lr: float, batch_size: int, client_subsets: list[Subset],
            client_fraction: float, nr_local_epochs: int, seed: int) -> None:
        super().__init__(lr, batch_size, client_subsets, client_fraction, seed)
        self.name = "FedAvg"
        self.nr_local_epochs = nr_local_epochs
        self.clients = [
            WeightClient(subset, lr, batch_size, nr_local_epochs)
            for subset in client_subsets]

    def run(self, nr_rounds: int) -> RunResult:
        # TODO
        return RunResult("", 0, 0., 0, 0, 0., 0)
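After collecting the client weights, the FedAvg server replaces the global model with their sample-weighted average, `w = Σ_k (n_k / n) · w_k`. The sketch below shows this with a hypothetical `federated_average` helper and made-up weights:

```python
import torch


def federated_average(client_weights: list[list[torch.Tensor]],
                      client_sizes: list[int]) -> list[torch.Tensor]:
    # w = sum_k (n_k / n) * w_k, computed separately for every parameter tensor
    total = sum(client_sizes)
    return [
        sum(size / total * weights[i] for weights, size in zip(client_weights, client_sizes))
        for i in range(len(client_weights[0]))]


# two hypothetical clients, each with a weight matrix and a bias vector
client_a = [torch.tensor([[1.0, 2.0]]), torch.tensor([0.0])]
client_b = [torch.tensor([[3.0, 6.0]]), torch.tensor([4.0])]
averaged = federated_average([client_a, client_b], [100, 300])
print([w.tolist() for w in averaged])  # [[[2.5, 5.0]], [3.0]]
```

Averaging `model.parameters()` is enough for `MnistCnn`, because it has no buffers such as batch-norm statistics. A model with buffers would presumably need its `state_dict` averaged instead. The averaged tensors could then be written into the global model with `param.copy_(...)` under `torch.no_grad()`.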

In [None]:
fedavg_server = FedAvgServer(0.02, 200, sample_split, 0.2, 2, 42)
result_fedavg = fedavg_server.run(5)
fedavg_df = result_fedavg.as_df()
fedavg_df

Finally, we look at a quick example of plotting the accuracy per round of the two algorithms.

In [None]:
import pandas as pd
import seaborn as sns

df = pd.concat([fedavg_df, fedsgd_gradient_df], ignore_index=True)
ax = sns.lineplot(df, x="Round", y="Test accuracy", hue="Algorithm", seed=0)
_ = ax.set_xticks(df["Round"].unique())