In [None]:
!pip install -q flwr[simulation] flwr-datasets[vision] torch torchvision scipy

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.7/66.7 MB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m58.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m48.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m31.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
from collections import OrderedDict
from typing import List

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import flwr
from flwr.client import Client, ClientApp, NumPyClient
from flwr.common import Context
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset

# Device that the model and every batch are moved to by the training/evaluation helpers.
DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
# Print the device and library versions so recorded outputs can be tied to an environment.
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")

Training on cpu
Flower 1.15.2 / PyTorch 2.5.1+cu124


In [None]:
def load_datasets(partition_id: int, num_partitions: int):
    """Build the train/validation/test DataLoaders for one CIFAR-10 partition.

    The "train" split is partitioned into ``num_partitions`` pieces. Partition
    ``partition_id`` is then split 80/20 (seed 42) into local training and
    validation data. The test loader is built from the dataset's "test" split.

    Returns:
        ``(trainloader, valloader, testloader)``, all with batch size 32; only
        the training loader shuffles.
    """
    to_normalized_tensor = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
    )

    def apply_transforms(batch):
        # Applied lazily through dataset.with_transform(...) instead of being
        # handed to a torchvision dataset constructor.
        batch["img"] = [to_normalized_tensor(image) for image in batch["img"]]
        return batch

    fds = FederatedDataset(dataset="cifar10", partitioners={"train": num_partitions})

    # Local data of this node: 80% train, 20% validation.
    local_splits = (
        fds.load_partition(partition_id)
        .train_test_split(test_size=0.2, seed=42)
        .with_transform(apply_transforms)
    )
    trainloader = DataLoader(local_splits["train"], batch_size=32, shuffle=True)
    valloader = DataLoader(local_splits["test"], batch_size=32)

    global_test = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(global_test, batch_size=32)
    return trainloader, valloader, testloader

In [None]:
class Net(nn.Module):
    """Small CNN: two conv + max-pool stages followed by three linear layers (10 outputs)."""

    def __init__(self) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the 10 class scores for a batch of images."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        # Flatten to rows of 16 * 5 * 5 values, the input width of fc1.
        flat = features.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


def get_parameters(net) -> List[np.ndarray]:
    """Return every ``net.state_dict()`` entry as a NumPy array, in state-dict order."""
    return [tensor.cpu().numpy() for tensor in net.state_dict().values()]


def set_parameters(net, parameters: List[np.ndarray]):
    """Load ``parameters`` into ``net``, pairing arrays with state-dict keys by position.

    Loading is strict, so a missing or unexpected key makes ``load_state_dict`` raise.
    """
    keys = net.state_dict().keys()
    state_dict = OrderedDict(
        (key, torch.Tensor(array)) for key, array in zip(keys, parameters)
    )
    net.load_state_dict(state_dict, strict=True)


def train(net, trainloader, epochs: int):
    """Train the network on the training set.

    Runs ``epochs`` passes over ``trainloader`` using Adam (default
    hyperparameters) and cross-entropy loss, and prints one loss/accuracy line
    per epoch.

    Args:
        net: Model to optimise in place; it is switched to training mode.
        trainloader: Iterable of batches indexable by ``"img"`` and ``"label"``.
            It must expose ``.dataset`` (used to normalise the logged loss).
        epochs: Number of passes over ``trainloader``.
    """
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for batch in trainloader:
            images, labels = batch["img"], batch["label"]
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            # One forward pass per batch: the same outputs feed the loss and the
            # accuracy metric.
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Metrics. ``.item()`` yields a plain float, so the running sum does
            # not keep each batch's autograd graph alive.
            epoch_loss += loss.item()
            total += labels.size(0)
            correct += (outputs.detach().argmax(dim=1) == labels).sum().item()
        # NOTE(review): the criterion already averages within a batch, so this is a
        # sum of batch means divided by the sample count. Kept so that logged values
        # stay on the same scale as the evaluation helper's loss.
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set.

    Returns:
        ``(loss, accuracy)``: the summed per-batch cross-entropy divided by
        ``len(testloader.dataset)``, and the fraction of correct predictions.
    """
    loss_fn = torch.nn.CrossEntropyLoss()
    running_loss = 0.0
    num_correct = 0
    num_seen = 0
    net.eval()
    with torch.no_grad():
        for batch in testloader:
            images = batch["img"].to(DEVICE)
            labels = batch["label"].to(DEVICE)
            logits = net(images)
            running_loss += loss_fn(logits, labels).item()
            predicted = torch.max(logits.data, 1).indices
            num_seen += labels.size(0)
            num_correct += (predicted == labels).sum().item()
    return running_loss / len(testloader.dataset), num_correct / num_seen

In [None]:
class FlowerNumPyClient(NumPyClient):
    """Flower ``NumPyClient`` that trains and evaluates ``net`` on one data partition."""

    def __init__(self, partition_id, net, trainloader, valloader):
        # partition_id is only used to tag the log lines printed below.
        self.partition_id = partition_id
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        """Return the current local model parameters as NumPy arrays."""
        print(f"[Client {self.partition_id}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        """Load ``parameters``, train for one local epoch, and return the update.

        Returns:
            ``(updated_parameters, num_examples, metrics)`` where ``num_examples``
            is the number of training samples (not batches). Strategies that
            weight client updates by example count rely on this value.
        """
        print(f"[Client {self.partition_id}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader.dataset), {}

    def evaluate(self, parameters, config):
        """Load ``parameters`` and evaluate them on the local validation data.

        Returns:
            ``(loss, num_examples, metrics)`` where ``num_examples`` is the number
            of validation samples (not batches).
        """
        print(f"[Client {self.partition_id}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader.dataset), {"accuracy": float(accuracy)}

In [None]:
def numpyclient_fn(context: Context) -> Client:
    """Build the Flower client for the partition named in ``context.node_config``.

    Reads ``"partition-id"`` and ``"num-partitions"`` from the node config, loads
    that partition's train/validation loaders, and wraps a fresh ``Net`` in a
    ``FlowerNumPyClient``. The test loader is discarded.
    """
    model = Net().to(DEVICE)
    node_config = context.node_config
    partition_id = node_config["partition-id"]
    num_partitions = node_config["num-partitions"]
    trainloader, valloader, _ = load_datasets(partition_id, num_partitions)
    client = FlowerNumPyClient(partition_id, model, trainloader, valloader)
    return client.to_client()


# Create the ClientApp, registering numpyclient_fn as the function that builds a client
# from a node's Context
numpyclient = ClientApp(client_fn=numpyclient_fn)

In [None]:
def server_fn(context: Context) -> ServerAppComponents:
    """Return the server components, configured for 3 rounds of training.

    ``context`` is part of the ``server_fn`` signature but is not used here.
    """
    return ServerAppComponents(config=ServerConfig(num_rounds=3))


# Create ServerApp, registering server_fn as the function that supplies the server
# components (here: only the round configuration)
server = ServerApp(server_fn=server_fn)

In [None]:
# Number of simulated clients; one supernode is started per partition
NUM_PARTITIONS = 10

# Resources each client needs. With None the backend defaults are used
# (each client is allocated 2x CPU and 0x GPUs); when training on CUDA,
# each client requests one GPU instead.
if DEVICE.type == "cuda":
    backend_config = {"client_resources": {"num_gpus": 1}}
else:
    backend_config = {"client_resources": None}

# Run simulation
run_simulation(
    server_app=server,
    client_app=numpyclient,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

DEBUG:flwr:Asyncio event loop already running.
[92mINFO [0m:      Starting Flower ServerApp, config: num_rounds=3, no round_timeout
[92mINFO [0m:      
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
[36m(pid=5306)[0m 2025-02-22 05:46:47.553593: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(pid=5306)[0m E0000 00:00:1740203207.578931    5306 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(pid=5306)[0m E0000 00:00:1740203207.586615    5306 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
[36m(ClientAppActor pid=5306)[0m see the appropriate new directories, set the environment variable
[36m(ClientAppAct

[36m(ClientAppActor pid=5306)[0m [Client 9] get_parameters
[36m(ClientAppActor pid=5306)[0m [Client 0] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.0653839260339737, accuracy 0.22575
[36m(ClientAppActor pid=5306)[0m [Client 1] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.06534437090158463, accuracy 0.221
[36m(ClientAppActor pid=5306)[0m [Client 3] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.06539936363697052, accuracy 0.217
[36m(ClientAppActor pid=5306)[0m [Client 5] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.06659355759620667, accuracy 0.189
[36m(ClientAppActor pid=5306)[0m [Client 7] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.06565361469984055, accuracy 0.22225
[36m(ClientAppActor pid=5306)[0m [Client 9] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.06535127758979797, accuracy 0.228
[36m(ClientAppAct

[92mINFO [0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 10 clients (out of 10)


[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.0658802017569542, accuracy 0.216
[36m(ClientAppActor pid=5306)[0m [Client 1] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 2] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 4] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 5] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 6] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 8] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 3] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 9] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 7] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 0] evaluate, config: {}


[92mINFO [0m:      aggregate_evaluate: received 10 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 2]
[92mINFO [0m:      configure_fit: strategy sampled 10 clients (out of 10)


[36m(ClientAppActor pid=5306)[0m [Client 1] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.05916011705994606, accuracy 0.298
[36m(ClientAppActor pid=5306)[0m [Client 2] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.059411369264125824, accuracy 0.3005
[36m(ClientAppActor pid=5306)[0m [Client 3] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.058530550450086594, accuracy 0.31275
[36m(ClientAppActor pid=5306)[0m [Client 4] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.05922050029039383, accuracy 0.3115
[36m(ClientAppActor pid=5306)[0m [Client 7] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.05846981331706047, accuracy 0.30625
[36m(ClientAppActor pid=5306)[0m [Client 8] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.0589534267783165, accuracy 0.30825
[36m(ClientAppActor pid=5306)[0m [Client 6] fit, config: {}
[36m(Clien

[92mINFO [0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 10 clients (out of 10)


[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.059198152273893356, accuracy 0.28225
[36m(ClientAppActor pid=5306)[0m [Client 0] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 2] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 4] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 5] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 7] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 8] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 9] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 6] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 1] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 3] evaluate, config: {}


[92mINFO [0m:      aggregate_evaluate: received 10 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 3]
[92mINFO [0m:      configure_fit: strategy sampled 10 clients (out of 10)


[36m(ClientAppActor pid=5306)[0m [Client 0] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.0550396591424942, accuracy 0.355
[36m(ClientAppActor pid=5306)[0m [Client 1] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.054933782666921616, accuracy 0.349
[36m(ClientAppActor pid=5306)[0m [Client 2] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.0546678751707077, accuracy 0.36675
[36m(ClientAppActor pid=5306)[0m [Client 6] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.05343230068683624, accuracy 0.3635
[36m(ClientAppActor pid=5306)[0m [Client 7] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.05440385639667511, accuracy 0.3515
[36m(ClientAppActor pid=5306)[0m [Client 9] fit, config: {}
[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.05437876284122467, accuracy 0.3705
[36m(ClientAppActor pid=5306)[0m [Client 5] fit, config: {}
[36m(ClientAppA

[92mINFO [0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 10 clients (out of 10)


[36m(ClientAppActor pid=5306)[0m Epoch 1: train loss 0.05374069884419441, accuracy 0.374
[36m(ClientAppActor pid=5306)[0m [Client 0] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 2] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 3] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 4] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 5] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 7] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 8] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 6] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 9] evaluate, config: {}
[36m(ClientAppActor pid=5306)[0m [Client 1] evaluate, config: {}


[92mINFO [0m:      aggregate_evaluate: received 10 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [SUMMARY]
[92mINFO [0m:      Run finished 3 round(s) in 384.09s
[92mINFO [0m:      	History (loss, distributed):
[92mINFO [0m:      		round 1: 0.063454574406147
[92mINFO [0m:      		round 2: 0.05613302038908006
[92mINFO [0m:      		round 3: 0.052657102560997
[92mINFO [0m:      


In [None]:
def evaluate_global_model(net, testloader):
    """Evaluate the global model on the test set.

    Delegates to ``test`` so there is a single evaluation loop in the notebook.

    Args:
        net: Model to evaluate; it is switched to evaluation mode.
        testloader: Iterable of batches indexable by ``"img"`` and ``"label"``
            that exposes ``.dataset``.

    Returns:
        ``(loss, accuracy)`` exactly as returned by ``test``.
    """
    return test(net, testloader)


In [None]:
def evaluate_after_training(global_model: nn.Module):
    """Evaluate the model after federated training.

    Loads the CIFAR-10 "test" split, converts and normalises the images with the
    same ToTensor + Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) pipeline used by
    the notebook's first ``load_datasets``, and prints the resulting loss and
    accuracy.

    Args:
        global_model: The trained model to evaluate.
    """
    to_normalized_tensor = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    def _to_tensors(batch):
        # Without this transform the "img" column holds raw image objects, which the
        # DataLoader cannot stack into a batch and which the model cannot consume.
        batch["img"] = [to_normalized_tensor(img) for img in batch["img"]]
        return batch

    # Load the global test dataset
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": 1})
    testset = fds.load_split("test").with_transform(_to_tensors)
    testloader = DataLoader(testset, batch_size=32)

    # Evaluate the final model
    loss, accuracy = evaluate_global_model(global_model, testloader)
    print(f"Final Model Test Loss: {loss}")
    print(f"Final Model Test Accuracy: {accuracy * 100}%")


In [None]:
# Built once and reused for every batch. Uses the same ToTensor + Normalize constants
# as the transform inside the notebook's first load_datasets, so data prepared with
# this helper matches what the model is trained on there.
_IMG_TRANSFORMS = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)


def apply_transforms(batch):
    """Convert every image in ``batch["img"]`` to a normalised tensor.

    Intended for ``dataset.with_transform(apply_transforms)``.

    Args:
        batch: Mapping with an ``"img"`` entry holding an iterable of images.

    Returns:
        The same ``batch`` object with ``"img"`` replaced by a list of tensors.
    """
    batch["img"] = [_IMG_TRANSFORMS(img) for img in batch["img"]]
    return batch


In [None]:
def load_datasets(partition_id: int, num_partitions: int):
    """Build the train/validation/test DataLoaders for one CIFAR-10 partition.

    This definition replaces any earlier ``load_datasets`` in the kernel, so it
    must return the three loaders that callers unpack.

    Args:
        partition_id: Index of the partition to load.
        num_partitions: Number of partitions the "train" split is divided into.

    Returns:
        ``(trainloader, valloader, testloader)``, all with batch size 32; only
        the training loader shuffles.
    """
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": num_partitions})
    partition = fds.load_partition(partition_id)
    # Divide data on each node: 80% train, 20% validation
    partition_train_test = partition.train_test_split(test_size=0.2, seed=42)

    # Ensure the transformations are applied to the images
    partition_train_test = partition_train_test.with_transform(apply_transforms)

    trainloader = DataLoader(partition_train_test["train"], batch_size=32, shuffle=True)
    valloader = DataLoader(partition_train_test["test"], batch_size=32)
    # The test split goes through the same transform as the local data.
    testset = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(testset, batch_size=32)
    return trainloader, valloader, testloader
