In [25]:
from collections import OrderedDict
from typing import List, Tuple, Dict, Optional, Callable, Union

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from datasets.utils.logging import disable_progress_bar
from torch.utils.data import DataLoader

from gradient_descent_the_ultimate_optimizer import gdtuo

import flwr as fl
from flwr.common import Metrics
from flwr_datasets import FederatedDataset
from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    MetricsAggregationFn,
    NDArrays,
    NDArray,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy.aggregate import aggregate, weighted_loss_avg


DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(
    f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}"
)

client_resources = None
if DEVICE.type == "cuda":
    client_resources = {"num_gpus": 1}
    
    
disable_progress_bar()

Training on cpu using PyTorch 2.0.1+cpu and Flower 1.7.0


In [26]:
NUM_CLIENTS = 2
BATCH_SIZE = 32

def load_datasets():
    fds = FederatedDataset(dataset="cifar10", partitioners={"train" : NUM_CLIENTS})

    def apply_transforms(batch):
        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5),(0.5, 0.5, 0.5))
            ]
        )
        batch["img"] = [transform(img) for img in batch["img"]]
        return batch

    trainloaders = []
    valloaders = []
    for partition_id in range(NUM_CLIENTS):
        partition = fds.load_partition(partition_id, "train")
        partition = partition.with_transform(apply_transforms)
        partition = partition.train_test_split(train_size=0.8)
        trainloaders.append(DataLoader(partition["train"], batch_size=BATCH_SIZE))
        valloaders.append(DataLoader(partition["test"], batch_size=BATCH_SIZE))
    testset = fds.load_full("test").with_transform(apply_transforms)
    testloaders = DataLoader(testset, batch_size=BATCH_SIZE)
    return trainloaders, valloaders, testloaders

trainloaders, valloaders, testloaders = load_datasets()     

KeyboardInterrupt: 

In [None]:
batch = next(iter(trainloaders[0]))
images, labels = batch["img"], batch["label"]

images = images.permute(0, 2, 3, 1).numpy()
images = images / 2 + 0.5

fig, axs = plt.subplots(4, 8, figsize=(12, 6))

for i, ax in enumerate(axs.flat):
    ax.imshow(images[i])
    ax.set_title(trainloaders[0].dataset.features["label"].int2str([labels[i]])[0])
    ax.axis("off")

fig.tight_layout()
plt.show()

NameError: name 'trainloaders' is not defined

In [None]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def train(net, trainloader, epochs: int):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for images, labels in trainloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

In [None]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, cid, net, trainloader, valloader):
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.cid}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        print(f"[Client {self.cid}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.cid}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(cid) -> FlowerClient:
    net = Net().to(DEVICE)
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]
    return FlowerClient(cid, net, trainloader, valloader)

In [None]:
from typing import Callable, Union

from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    MetricsAggregationFn,
    NDArrays,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy.aggregate import aggregate, weighted_loss_avg


class FedCustom(fl.server.strategy.Strategy):
    def __init__(
        self,
        fraction_fit: float = 1.0,
        fraction_evaluate: float = 1.0,
        min_fit_clients: int = 2,
        min_evaluate_clients: int = 2,
        min_available_clients: int = 2,
    ) -> None:
        super().__init__()
        self.fraction_fit = fraction_fit
        self.fraction_evaluate = fraction_evaluate
        self.min_fit_clients = min_fit_clients
        self.min_evaluate_clients = min_evaluate_clients
        self.min_available_clients = min_available_clients

    def __repr__(self) -> str:
        return "FedCustom"

    def initialize_parameters(
        self, client_manager: ClientManager
    ) -> Optional[Parameters]:
        """Initialize global model parameters."""
        net = Net()
        ndarrays = get_parameters(net)
        return fl.common.ndarrays_to_parameters(ndarrays)

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        """Configure the next round of training."""

        # Sample clients
        sample_size, min_num_clients = self.num_fit_clients(
            client_manager.num_available()
        )
        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )

        # Create custom configs
        n_clients = len(clients)
        half_clients = n_clients // 2
        standard_config = {"lr": 0.001}
        higher_lr_config = {"lr": 0.003}
        fit_configurations = []
        for idx, client in enumerate(clients):
            if idx < half_clients:
                fit_configurations.append((client, FitIns(parameters, standard_config)))
            else:
                fit_configurations.append(
                    (client, FitIns(parameters, higher_lr_config))
                )
        return fit_configurations

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate fit results using weighted average."""

        weights_results = [
            (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
            for _, fit_res in results
        ]
        parameters_aggregated = ndarrays_to_parameters(aggregate(weights_results))
        metrics_aggregated = {}
        return parameters_aggregated, metrics_aggregated

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        """Configure the next round of evaluation."""
        if self.fraction_evaluate == 0.0:
            return []
        config = {}
        evaluate_ins = EvaluateIns(parameters, config)

        # Sample clients
        sample_size, min_num_clients = self.num_evaluation_clients(
            client_manager.num_available()
        )
        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )

        # Return client/config pairs
        return [(client, evaluate_ins) for client in clients]

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate evaluation losses using weighted average."""

        if not results:
            return None, {}

        loss_aggregated = weighted_loss_avg(
            [
                (evaluate_res.num_examples, evaluate_res.loss)
                for _, evaluate_res in results
            ]
        )
        metrics_aggregated = {}
        return loss_aggregated, metrics_aggregated

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        """Evaluate global model parameters using an evaluation function."""

        # Let's assume we won't perform the global model evaluation on the server side.
        return None

    def num_fit_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Return sample size and required number of clients."""
        num_clients = int(num_available_clients * self.fraction_fit)
        return max(num_clients, self.min_fit_clients), self.min_available_clients

    def num_evaluation_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Use a fraction of available clients for evaluation."""
        num_clients = int(num_available_clients * self.fraction_evaluate)
        return max(num_clients, self.min_evaluate_clients), self.min_available_clients

In [None]:
from io import BytesIO
from typing import cast

import numpy as np


def ndarrays_to_parameters(ndarrays: NDArrays) -> Parameters:
    """Convert NumPy ndarrays to parameters object."""
    tensors = [ndarray_to_bytes(ndarray) for ndarray in ndarrays]
    return Parameters(tensors=tensors, tensor_type="numpy.ndarray")



def parameters_to_ndarrays(parameters: Parameters) -> NDArrays:
    """Convert parameters object to NumPy ndarrays."""
    return [bytes_to_ndarray(tensor) for tensor in parameters.tensors]



def ndarray_to_bytes(ndarray: NDArray) -> bytes:
    """Serialize NumPy ndarray to bytes."""
    bytes_io = BytesIO()
    # WARNING: NEVER set allow_pickle to true.
    # Reason: loading pickled data can execute arbitrary code
    # Source: https://numpy.org/doc/stable/reference/generated/numpy.save.html
    np.save(bytes_io, ndarray, allow_pickle=False)
    return bytes_io.getvalue()

def bytes_to_ndarray(tensor: bytes) -> NDArray:
    """Deserialize NumPy ndarray from bytes."""
    bytes_io = BytesIO(tensor)
    # WARNING: NEVER set allow_pickle to true.
    # Reason: loading pickled data can execute arbitrary code
    # Source: https://numpy.org/doc/stable/reference/generated/numpy.load.html
    ndarray_deserialized = np.load(bytes_io, allow_pickle=False)
    return cast(NDArray, ndarray_deserialized)


In [None]:
from flwr.common import (
    Code,
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    GetParametersIns,
    GetParametersRes,
    Status,
)

def WrappedModel(net):
    optim = gdtuo.Adam(optimizer=gdtuo.SGD(1e-2))
    net = gdtuo.ModuleWrapper(net, optimizer=optim)
    return net, optim

def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

status = Status(code=Code.OK, message="Success")

net, optimizer = WrappedModel(Net())
ndarrays_model = get_parameters(net)
ndarrays_optimizer = get_parameters(optimizer)
# print(ndarrays_model)
# print(ndarrays_optimizer)
ah = ndarrays_model+ndarrays_optimizer
print(ah[-3:])
oof = ndarrays_to_parameters(ah)
hm = FitRes(
            status=status,
            parameters=oof,
            num_examples=10,
            metrics={},
        )
print("START")
#[print(ahh) for ahh in ah]
#print(ah[-1][0])

net, optimizer1 = WrappedModel(Net())
ndarrays_model = get_parameters(net)
ndarrays_optimizer = get_parameters(optimizer1)
#ah2 = (ndarrays_model+ndarrays_optimizer)
ah2 = ndarrays_model + [np.array(0.008), np.array(2.0986122), np.array(6.453384)]
print(ah2[-3:])
oof2 = ndarrays_to_parameters(ah2)
hm2 = FitRes(
            status=status,
            parameters=oof2,
            num_examples=10,
            metrics={},
        )


results = [hm] + [hm2]
weights_results = [
    (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
    for fit_res in results
]
#print(weights_results)
parameters_aggregated = ndarrays_to_parameters(aggregate(weights_results))
print("END")
out = parameters_to_ndarrays(parameters_aggregated)

def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    #print(state_dict)
    net.load_state_dict(state_dict)
    

eh = optimizer.state_dict()
ehh = net.state_dict()
#print(eh.items())
# for key, value in eh.items():
#     print(key)
#     print(value)
# uh = OrderedDict([('alpha', torch.tensor(0.0010)), ('beta1', torch.tensor(1.0986)), ('beta2', torch.tensor(3.4534))])
# ooom = {'alpha': torch.tensor(0.0010), 'beta1': torch.tensor(1.0986), 'beta2': torch.tensor(3.4534)}

# for key, value in uh.items():
#             # Check if the key corresponds to a parameter or buffer of the model
#             if key in ooom:
#                   print("dababy")
print(out[-3:])
set_parameters(optimizer, out[-3:])
print(optimizer.state_dict())

print("first")
print(ah[0])
print("second")
print(ah2[0])
print("sum")
print(out[0])
set_parameters(net, out[:-3])
print("out")
print(net.state_dict())
# [ 0.08302877 -0.09394756 -0.01417978 -0.05718208 -0.02231832  0.0826757
#  -0.05445452 -0.02976146  0.08450524 -0.04731008]
# [array(0.001, dtype=float32), array(1.0986122, dtype=float32), array(3.453384, dtype=float32)]

# array([ 0.00025471, -0.05088459, -0.01390265, -0.01027279, -0.03953674,
#         0.04186539, -0.00426449, -0.02046354,  0.05037209, -0.05080017],
#       dtype=float32), array([1.0000000e-03, 1.0986122e+00, 3.4533839e+00], dtype=float32)]

[array(0.001, dtype=float32), array(1.0986122, dtype=float32), array(3.453384, dtype=float32)]
START
[array(0.008), array(2.0986122), array(6.453384)]
END
[array(0.0045), array(1.59861219), array(4.95338396)]
{'alpha': tensor(0.0045), 'beta1': tensor(1.5986), 'beta2': tensor(4.9534)}
first
[[[[ 0.10571678 -0.10966041  0.01240583 -0.07839645  0.04607232]
   [-0.02503977 -0.11544131  0.00623979 -0.04953222 -0.05930189]
   [ 0.06205     0.02624109 -0.04721972 -0.01883036  0.11255088]
   [ 0.10949159  0.05369635  0.01639724  0.07491052  0.01990528]
   [ 0.06228021 -0.04673331  0.04013827  0.11365476  0.10317692]]

  [[-0.10961216  0.01832429 -0.03634967 -0.01290751 -0.05621865]
   [-0.10204232  0.0431049  -0.02258573 -0.10713286 -0.06825158]
   [ 0.0556554  -0.04029971 -0.10487504 -0.04911793  0.0232622 ]
   [-0.04136389 -0.04269595 -0.01160658  0.09191768 -0.03027645]
   [-0.08066314 -0.0451332   0.04778045 -0.05444124 -0.05107547]]

  [[-0.04789704  0.09273724 -0.10638253  0.09062104  0.

In [None]:
from flwr.common import (
    Code,
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    GetParametersIns,
    GetParametersRes,
    Status,
)

def WrappedModel(net):
    optim = gdtuo.Adam(optimizer=gdtuo.SGD(1e-2))
    net = gdtuo.ModuleWrapper(net, optimizer=optim)
    return net, optim

def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

status = Status(code=Code.OK, message="Success")

net = Net()
ndarrays_model = get_parameters(net)
ndarrays_optimizer = get_parameters(optimizer)
# print(ndarrays_model)
# print(ndarrays_optimizer)
ah = (ndarrays_model+ndarrays_optimizer)
#print(ah)
oof = ndarrays_to_parameters(ah)
hm = FitRes(
            status=status,
            parameters=oof,
            num_examples=10,
            metrics={},
        )
print("START")
#[print(ahh) for ahh in ah]
#print(ah[-1][0])

net= Net()
ndarrays_model = get_parameters(net)
ndarrays_optimizer = get_parameters(optimizer1)
ah2 = (ndarrays_model+[ndarrays_optimizer])
oof2 = ndarrays_to_parameters(ah2)
hm2 = FitRes(
            status=status,
            parameters=oof2,
            num_examples=3,
            metrics={},
        )


results = [hm] + [hm2]
weights_results = [
    (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
    for fit_res in results
]
parameters_aggregated = ndarrays_to_parameters(aggregate(weights_results))
print("END")
out = parameters_to_ndarrays(parameters_aggregated)

def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict = True)

print(out[-1])
set_parameters(optimizer1, out[-1:-3])
print(optimizer1.state_dict())

START


ValueError: operands could not be broadcast together with shapes (6,3,5,5) (3,) 

In [None]:
# def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
#     # Multiply accuracy of each client by number of examples used
#     accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
#     examples = [num_examples for num_examples, _ in metrics]

#     # Aggregate and return custom metric (weighted average)
#     return {"accuracy": sum(accuracies) / sum(examples)}

In [None]:
# # Server Side Giving To Client A Config File
# def fit_config(server_round: int):
#     """Return training configuration dict for each round.

#     Perform two rounds of training with one local epoch, increase to two local
#     epochs afterwards.
#     """
#     config = {
#         "server_round": server_round,  # The current round of federated learning
#         "local_epochs": 1 if server_round < 2 else 2,  #
#     }
#     return config

In [None]:
# # Server Side Evaluation
# def evaluate(
#     server_round: int,
#     parameters: fl.common.NDArrays,
#     config: Dict[str, fl.common.Scalar],
# ) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]:
#     net = Net().to(DEVICE)
#     valloader = valloaders[0]
#     set_parameters(net, parameters)  # Update model with the latest parameters
#     loss, accuracy = test(net, valloader)
#     print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
#     return loss, {"accuracy": accuracy}

In [None]:
# strategy = fl.server.strategy.FedAvg(
#     fraction_fit=1,
#     fraction_evaluate=1,
#     min_fit_clients=2,
#     min_evaluate_clients=2,
#     min_available_clients=NUM_CLIENTS,
# )

# fl.simulation.start_simulation(
#     client_fn=client_fn,
#     num_clients=NUM_CLIENTS,
#     config=fl.server.ServerConfig(num_rounds=3),  # Just three rounds
#     strategy=strategy,
#     client_resources=client_resources,
# )

INFO flwr 2024-03-13 23:02:51,305 | app.py:178 | Starting Flower simulation, config: ServerConfig(num_rounds=3, round_timeout=None)
2024-03-13 23:02:58,065	INFO worker.py:1621 -- Started a local Ray instance.
INFO flwr 2024-03-13 23:03:01,234 | app.py:213 | Flower VCE: Ray initialized with resources: {'CPU': 12.0, 'memory': 3943334708.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 1971667353.0, 'node:__internal_head__': 1.0, 'GPU': 1.0}
INFO flwr 2024-03-13 23:03:01,235 | app.py:219 | Optimize your simulation with Flower VCE: https://flower.dev/docs/framework/how-to-run-simulations.html
INFO flwr 2024-03-13 23:03:01,235 | app.py:227 | No `client_resources` specified. Using minimal resources for clients.
INFO flwr 2024-03-13 23:03:01,236 | app.py:242 | Flower VCE: Resources for each Virtual Client: {'num_cpus': 1, 'num_gpus': 0.0}
INFO flwr 2024-03-13 23:03:01,251 | app.py:288 | Flower VCE: Creating VirtualClientEngineActorPool with 12 actors
INFO flwr 2024-03-13 23:03:01,252 | server

Server-side evaluation loss 0.07365223121643066 / accuracy 0.101


[2m[36m(pid=18156)[0m 
[2m[36m(DefaultActor pid=14656)[0m   client = check_clientfn_returns_client(client_fn(cid))


[2m[36m(DefaultActor pid=14656)[0m [Client 9, round 1] fit, config: {'server_round': 1, 'local_epochs': 1}


DEBUG flwr 2024-03-13 23:03:31,117 | server.py:236 | fit_round 1 received 3 results and 0 failures
INFO flwr 2024-03-13 23:03:31,423 | server.py:125 | fit progress: (1, 0.06371984660625457, {'accuracy': 0.31}, 27.908565100005944)
DEBUG flwr 2024-03-13 23:03:31,424 | server.py:173 | evaluate_round 1: strategy sampled 3 clients (out of 10)


Server-side evaluation loss 0.06371984660625457 / accuracy 0.31
[2m[36m(DefaultActor pid=22996)[0m [Client 2] evaluate, config: {}


DEBUG flwr 2024-03-13 23:03:32,159 | server.py:187 | evaluate_round 1 received 3 results and 0 failures
DEBUG flwr 2024-03-13 23:03:32,160 | server.py:222 | fit_round 2: strategy sampled 3 clients (out of 10)
DEBUG flwr 2024-03-13 23:03:36,715 | server.py:236 | fit_round 2 received 3 results and 0 failures
INFO flwr 2024-03-13 23:03:37,012 | server.py:125 | fit progress: (2, 0.052110953092575074, {'accuracy': 0.39}, 33.49688370000513)
DEBUG flwr 2024-03-13 23:03:37,012 | server.py:173 | evaluate_round 2: strategy sampled 3 clients (out of 10)


Server-side evaluation loss 0.052110953092575074 / accuracy 0.39
[2m[36m(DefaultActor pid=22996)[0m [Client 7, round 2] fit, config: {'server_round': 2, 'local_epochs': 2}[32m [repeated 5x across cluster][0m
[2m[36m(DefaultActor pid=14656)[0m [Client 9] evaluate, config: {}[32m [repeated 3x across cluster][0m


DEBUG flwr 2024-03-13 23:03:38,056 | server.py:187 | evaluate_round 2 received 3 results and 0 failures
DEBUG flwr 2024-03-13 23:03:38,057 | server.py:222 | fit_round 3: strategy sampled 3 clients (out of 10)
DEBUG flwr 2024-03-13 23:03:42,658 | server.py:236 | fit_round 3 received 3 results and 0 failures
INFO flwr 2024-03-13 23:03:42,967 | server.py:125 | fit progress: (3, 0.048094457149505616, {'accuracy': 0.461}, 39.45236170000135)
DEBUG flwr 2024-03-13 23:03:42,968 | server.py:173 | evaluate_round 3: strategy sampled 3 clients (out of 10)


Server-side evaluation loss 0.048094457149505616 / accuracy 0.461
[2m[36m(DefaultActor pid=25524)[0m [Client 3, round 3] fit, config: {'server_round': 3, 'local_epochs': 2}[32m [repeated 3x across cluster][0m
[2m[36m(DefaultActor pid=14656)[0m [Client 6] evaluate, config: {}[32m [repeated 3x across cluster][0m


DEBUG flwr 2024-03-13 23:03:43,676 | server.py:187 | evaluate_round 3 received 3 results and 0 failures
INFO flwr 2024-03-13 23:03:43,677 | server.py:153 | FL finished in 40.16247860000294
INFO flwr 2024-03-13 23:03:43,677 | app.py:226 | app_fit: losses_distributed [(1, 0.06373047351837158), (2, 0.052273877064387), (3, 0.04709111166000366)]
INFO flwr 2024-03-13 23:03:43,678 | app.py:227 | app_fit: metrics_distributed_fit {}
INFO flwr 2024-03-13 23:03:43,678 | app.py:228 | app_fit: metrics_distributed {}
INFO flwr 2024-03-13 23:03:43,679 | app.py:229 | app_fit: losses_centralized [(0, 0.07365223121643066), (1, 0.06371984660625457), (2, 0.052110953092575074), (3, 0.048094457149505616)]
INFO flwr 2024-03-13 23:03:43,679 | app.py:230 | app_fit: metrics_centralized {'accuracy': [(0, 0.101), (1, 0.31), (2, 0.39), (3, 0.461)]}


History (loss, distributed):
	round 1: 0.06373047351837158
	round 2: 0.052273877064387
	round 3: 0.04709111166000366
History (loss, centralized):
	round 0: 0.07365223121643066
	round 1: 0.06371984660625457
	round 2: 0.052110953092575074
	round 3: 0.048094457149505616
History (metrics, centralized):
{'accuracy': [(0, 0.101), (1, 0.31), (2, 0.39), (3, 0.461)]}

## MAKE OWN STRATEGY

In [None]:
from typing import Callable, Union

from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    MetricsAggregationFn,
    NDArrays,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy.aggregate import aggregate, weighted_loss_avg


class FedCustom(fl.server.strategy.Strategy):
    def __init__(
        self,
        fraction_fit: float = 1.0,
        fraction_evaluate: float = 1.0,
        min_fit_clients: int = 2,
        min_evaluate_clients: int = 2,
        min_available_clients: int = 2,
    ) -> None:
        super().__init__()
        self.fraction_fit = fraction_fit
        self.fraction_evaluate = fraction_evaluate
        self.min_fit_clients = min_fit_clients
        self.min_evaluate_clients = min_evaluate_clients
        self.min_available_clients = min_available_clients

    def __repr__(self) -> str:
        return "FedCustom"

    def initialize_parameters(
        self, client_manager: ClientManager
    ) -> Optional[Parameters]:
        """Initialize global model parameters."""
        net = Net()
        ndarrays = get_parameters(net)
        return fl.common.ndarrays_to_parameters(ndarrays)

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        """Configure the next round of training."""

        # Sample clients
        sample_size, min_num_clients = self.num_fit_clients(
            client_manager.num_available()
        )
        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )

        # Create custom configs
        n_clients = len(clients)
        half_clients = n_clients // 2
        standard_config = {"lr": 0.001}
        higher_lr_config = {"lr": 0.003}
        fit_configurations = []
        for idx, client in enumerate(clients):
            if idx < half_clients:
                fit_configurations.append((client, FitIns(parameters, standard_config)))
            else:
                fit_configurations.append(
                    (client, FitIns(parameters, higher_lr_config))
                )
        return fit_configurations

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate fit results using weighted average."""

        weights_results = [
            (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
            for _, fit_res in results
        ]
        parameters_aggregated = ndarrays_to_parameters(aggregate(weights_results))
        metrics_aggregated = {}
        return parameters_aggregated, metrics_aggregated

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        """Configure the next round of evaluation."""
        if self.fraction_evaluate == 0.0:
            return []
        config = {}
        evaluate_ins = EvaluateIns(parameters, config)

        # Sample clients
        sample_size, min_num_clients = self.num_evaluation_clients(
            client_manager.num_available()
        )
        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )

        # Return client/config pairs
        return [(client, evaluate_ins) for client in clients]

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate evaluation losses using weighted average."""

        if not results:
            return None, {}

        loss_aggregated = weighted_loss_avg(
            [
                (evaluate_res.num_examples, evaluate_res.loss)
                for _, evaluate_res in results
            ]
        )
        metrics_aggregated = {}
        return loss_aggregated, metrics_aggregated

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        """Evaluate global model parameters using an evaluation function."""

        # Let's assume we won't perform the global model evaluation on the server side.
        return None

    def num_fit_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Return sample size and required number of clients."""
        num_clients = int(num_available_clients * self.fraction_fit)
        return max(num_clients, self.min_fit_clients), self.min_available_clients

    def num_evaluation_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Use a fraction of available clients for evaluation."""
        num_clients = int(num_available_clients * self.fraction_evaluate)
        return max(num_clients, self.min_evaluate_clients), self.min_available_clients

In [None]:
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=2,
    config=fl.server.ServerConfig(num_rounds=3),
    strategy=FedCustom(),  # <-- pass the new strategy here
    client_resources=client_resources,
)

INFO flwr 2024-03-14 14:51:03,397 | app.py:178 | Starting Flower simulation, config: ServerConfig(num_rounds=3, round_timeout=None)
2024-03-14 14:51:09,686	INFO worker.py:1621 -- Started a local Ray instance.
INFO flwr 2024-03-14 14:51:12,289 | app.py:213 | Flower VCE: Ray initialized with resources: {'CPU': 12.0, 'memory': 3944086734.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 1972043366.0, 'GPU': 1.0, 'node:__internal_head__': 1.0}
INFO flwr 2024-03-14 14:51:12,290 | app.py:219 | Optimize your simulation with Flower VCE: https://flower.dev/docs/framework/how-to-run-simulations.html
INFO flwr 2024-03-14 14:51:12,290 | app.py:227 | No `client_resources` specified. Using minimal resources for clients.
INFO flwr 2024-03-14 14:51:12,291 | app.py:242 | Flower VCE: Resources for each Virtual Client: {'num_cpus': 1, 'num_gpus': 0.0}
INFO flwr 2024-03-14 14:51:12,305 | app.py:288 | Flower VCE: Creating VirtualClientEngineActorPool with 12 actors
INFO flwr 2024-03-14 14:51:12,306 | server

[2m[36m(DefaultActor pid=26016)[0m [Client 1] evaluate, config: {}


ERROR flwr 2024-03-14 14:51:38,830 | ray_client_proxy.py:161 | Traceback (most recent call last):
  File "C:\Users\poops\Desktop\Coding Shenanigans\SOIEvent\Lib\site-packages\flwr\simulation\ray_transport\ray_client_proxy.py", line 151, in _submit_job
    res, updated_context = self.actor_pool.get_client_result(self.cid, timeout)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\poops\Desktop\Coding Shenanigans\SOIEvent\Lib\site-packages\flwr\simulation\ray_transport\ray_actor.py", line 425, in get_client_result
    return self._fetch_future_result(cid)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\poops\Desktop\Coding Shenanigans\SOIEvent\Lib\site-packages\flwr\simulation\ray_transport\ray_actor.py", line 306, in _fetch_future_result
    res_cid, res, updated_context = ray.get(
                                    ^^^^^^^^
  File "C:\Users\poops\Desktop\Coding Shenanigans\SOIEvent\Lib\site-packages\ray\_private\auto_init_hook.



[2m[36m(DefaultActor pid=26016)[0m [Client 1] evaluate, config: {}
