# Create a distributed histogram using Flower

This introductory example creates a distributed histogram and uses a custom strategy to aggregate the results from the clients. You do not need deep knowledge of aggregation strategies to run the example, but understanding them will help you adapt Flower to your own use case.

## Step 0: Preparation

Before we begin with any actual code, let's make sure that we have everything we need.

### Installing dependencies

First, we should install the necessary packages.

In [None]:
# Skip to the next block of code if you have already installed the packages.

# Linux
!pip install -q "flwr[simulation]" matplotlib pandas scikit-learn

# macOS
#!pip3 install -U 'flwr[simulation]' matplotlib pandas scikit-learn

Now that we have all dependencies installed, we can import everything we need for this tutorial:

In [None]:
import numpy as np
import pandas as pd
import flwr as fl
from sklearn.datasets import load_iris

This example only uses NumPy and pandas on a small tabular dataset, so it does not need GPU acceleration. A default CPU runtime (for example on Google Colab) is sufficient.

### Loading the data

Federated learning can be applied to many different types of tasks across different domains. In this tutorial, we use the same building blocks for federated analytics: we compute a histogram over the popular Iris dataset without collecting the raw data in one place.

We simulate having multiple datasets from multiple organizations (also called the "cross-silo" setting in federated learning) by splitting the original Iris dataset into multiple partitions. Each partition will represent the data from a single organization. We're doing this purely for experimentation purposes.

Each organization will act as a client in the federated learning system. So having 3 organizations participate in a federation means having 3 clients connected to the federated learning server.

Let's now define two helper functions. `load_data` loads the Iris dataset into a pandas DataFrame, and `partition_data` splits it into `NUM_CLIENTS` contiguous partitions of equal size, one per client:

In [None]:
# Define the number of clients
NUM_CLIENTS = 3

def load_data():
    iris = load_iris()
    # Create pandas DataFrame
    iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)

    # Add label column (flower species)
    iris_df['species'] = iris.target

    # Optional: Map species numbers to names
    #iris_df['species'] = iris_df['species'].map({0: 'setosa', 1: 'versicolor', 2: 'virginica'})

    return iris_df

def partition_data(X, num_clients):
    partition_size = len(X) // num_clients
    partitions = [(X[i * partition_size:(i + 1) * partition_size])
                  for i in range(num_clients)]
    return partitions
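
To see how the slicing in `partition_data` behaves, here is a minimal sketch on hypothetical toy data (`toy_df` is not part of the tutorial). It repeats the helper so that the block runs on its own. Rows left over after the integer division are dropped, and the partitions follow the original row order. Because the rows of the Iris dataset are ordered by species, splitting it into three contiguous partitions gives each client a single species.

```python
import pandas as pd


def partition_data(X, num_clients):
    """Split X into num_clients contiguous slices of equal size."""
    partition_size = len(X) // num_clients
    return [X[i * partition_size:(i + 1) * partition_size]
            for i in range(num_clients)]


# Hypothetical toy data: 7 rows, so one row is left over with 3 clients
toy_df = pd.DataFrame({"value": range(7)})
toy_partitions = partition_data(toy_df, 3)

partition_lengths = [len(p) for p in toy_partitions]
print(partition_lengths)  # [2, 2, 2]
```

If a more balanced split were needed, one option would be to shuffle the rows before partitioning; the tutorial keeps the contiguous split for simplicity.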

## Step 2: Implementing Flower Client

Federated Learning systems consist of a server and multiple clients. In Flower, we create clients by implementing subclasses of `flwr.client.Client` or `flwr.client.NumPyClient`. We use `NumPyClient` in this tutorial because it is easier to implement.

To implement the Flower client, we create a subclass of `flwr.client.NumPyClient` and implement the method `fit`:

* `fit`: Receive parameters from the server, run a computation on the local data, and return the result to the server. In a training workload this computation is local model training; in this example it computes a histogram for each column of the local data.

Our clients use NumPy to compute the histograms. Let's see a simple Flower Client implementation that brings everything together.

In [None]:
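import pandas as pd

class PandasClient(fl.client.NumPyClient):
    def __init__(self, X, cid):
        self.X = X
        self.cid = cid

    def compute_hist(self, df, col_name, num_bins=10, value_range=(0.0, 8.0)):
        # Use the same bin edges on every client so that counts can be summed.
        # Assumption: the range (0.0, 8.0) covers every column of the Iris data.
        hist, bins = np.histogram(df[col_name], bins=num_bins, range=value_range)
        return hist, bins

    def fit(self, parameters, config):
        hist_list = []
        bins_list = []
        # Execute query locally
        for col in self.X.columns:
            hist, bins = self.compute_hist(self.X, col)
            hist_list.append(hist)
            bins_list.append(bins)
        # Two arrays: counts (n_columns, n_bins) and edges (n_columns, n_bins + 1)
        array_of_arrays = [np.array(hist_list), np.array(bins_list)]

        return array_of_arrays, len(self.X), {}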


Our class `PandasClient` defines how the local computation is performed and allows Flower to trigger it through `fit`. Each instance of `PandasClient` represents a *single client* in our federated learning system. Federated Learning systems have multiple clients (otherwise, there is not much to federate), so each client is represented by its own instance of `PandasClient`. If we have, for example, three clients in our workload, then we'd have three instances of `PandasClient`. Flower calls `PandasClient.fit` on the respective instance when the server selects a particular client.
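
The shape of the payload returned by `fit` can be sketched without Flower. The block below uses hypothetical toy data (`local_df`) in place of a client partition and calls `np.histogram` with 10 bins per column; the bin settings are illustrative. It stacks the per-column results into two arrays, one for the counts and one for the bin edges.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data standing in for one client's partition
local_df = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0], "b": [0.5, 0.5, 1.5, 2.5]})

hist_list, bins_list = [], []
for col in local_df.columns:
    # Without an explicit range, the edges span the column's min and max
    hist, bins = np.histogram(local_df[col], bins=10)
    hist_list.append(hist)
    bins_list.append(bins)

counts = np.array(hist_list)  # one row of counts per column
edges = np.array(bins_list)   # one row of bin edges per column

print(counts.shape, edges.shape)  # (2, 10) (2, 11)
print(counts.sum(axis=1))         # [4 4]
```

Each row of `counts` sums to the number of local rows, and each row of `edges` has one more entry than the matching row of `counts`.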

### Using the Virtual Client Engine

In this notebook, we are simulating a federated learning system with 3 clients on a single machine. This means that the server and all 3 clients will live on a single machine and share resources such as CPU, GPU, and memory. Having 3 clients would mean having 3 instances of `PandasClient` in memory. With larger numbers of clients, doing this on a single machine can quickly exhaust the available memory, even if only a subset of these clients participates in a single round of federated learning.

In addition to the regular capabilities where server and clients run on multiple machines, Flower therefore provides simulation capabilities that create `PandasClient` instances only when they are actually needed. To enable the Flower framework to create clients when necessary, we need to implement a function called `client_fn` that creates a `PandasClient` instance on demand. Flower calls `client_fn` whenever it needs an instance of one particular client to call `fit` or `evaluate` (those instances are usually discarded after use, so they should not keep any local state). Clients are identified by a client ID, or `cid` for short. The `cid` can be used to select the data partition that belongs to a client, as can be seen below.

In [None]:
def client_fn(cid: str) -> fl.client.Client:
    X = load_data() # Load data
    num_clients = NUM_CLIENTS  # Number of clients should match the number of partitions
    partitions = partition_data(X, num_clients) # Create data partitions

    partition_id = int(cid) # Associate the partition id to client id (cid)
    # Each client gets a different partition, so each client computes its histogram on its own data
    X = partitions[partition_id]

    # Create a single Flower client representing a single organization/device
    return PandasClient(X, cid).to_client() #for Flower version > 1.8
    # return PandasClient(X, cid) #for Flower version < 1.8

## Step 3: Create the Strategy

Instead of using a built-in strategy such as FedAvg, we implement our own by subclassing `Strategy`. The `configure_fit` method samples the clients and sends each of them a `FitIns` with an empty configuration dictionary, because the clients need no hyperparameters to compute a histogram. The aggregation itself is implemented in the method `aggregate_fit`. Since we are aggregating histograms, each client transfers only the frequencies and bin edges, never the raw data. The server sums the frequencies and returns the aggregated histogram so that it can be drawn. Note that summing frequencies is only meaningful when all clients use the same bin edges.

In [None]:
from typing import Dict, List, Optional, Tuple, Union

import numpy as np

import flwr as fl
from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy import Strategy


class AggregateHistogram(Strategy):
    def initialize_parameters(
            self, client_manager: Optional[ClientManager] = None
    ) -> Optional[Parameters]:
        return None

    def configure_fit(
            self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        config = {}
        fit_ins = FitIns(parameters, config)
        clients = client_manager.sample(num_clients=NUM_CLIENTS, min_num_clients=2)
        return [(client, fit_ins) for client in clients]

    def aggregate_fit(
            self,
            server_round: int,
            results: List[Tuple[ClientProxy, FitRes]],
            failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        # Get results from fit
        # Convert results
        values_aggregated = [
            (parameters_to_ndarrays(fit_res.parameters)) for _, fit_res in results
        ]

        # For simplicity, we only use the first two columns of the dataset.
        # Bin edges are taken from the first client rather than summed;
        # this assumes that every client reports identical edges.
        first_bins = values_aggregated[0][1]
        sepal_length_agg_bin = first_bins[0]
        sepal_width_agg_bin = first_bins[1]

        sepal_length_agg_hist = 0
        sepal_width_agg_hist = 0
        for val in values_aggregated:
            sepal_length_agg_hist += val[0][0]  # sepal length counts
            sepal_width_agg_hist += val[0][1]  # sepal width counts

        dict_result = {
            'sepal_length_h': sepal_length_agg_hist,
            'sepal_width_h': sepal_width_agg_hist,
            'sepal_length_b': sepal_length_agg_bin,
            'sepal_width_b': sepal_width_agg_bin
        }

        ndarr = np.concatenate(
            (["Sepal Length"], sepal_length_agg_hist, ["Sepal Width"], sepal_width_agg_hist)
        )
        return ndarrays_to_parameters(ndarr), dict_result

    def evaluate(
            self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        agg_hist = [arr.item() for arr in parameters_to_ndarrays(parameters)]
        return 0, {}

    def configure_evaluate(
            self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        # No client-side evaluation in this example
        return []

    def aggregate_evaluate(
            self,
            server_round: int,
            results: List[Tuple[ClientProxy, EvaluateRes]],
            failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        # Nothing to aggregate because no evaluation is configured
        return None, {}
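
Why summing works can be checked with plain NumPy. The sketch below is not part of the strategy; it uses hypothetical, randomly generated client data and assumes that all clients agree on the bin edges in advance (`shared_edges`). Under that assumption, adding the per-client counts gives the same result as a histogram over the pooled data, which the server never sees in a real federation.

```python
import numpy as np

rng = np.random.default_rng(seed=0)

# Hypothetical client data: three clients with differently centered values
client_values = [rng.normal(loc=mu, scale=0.5, size=50) for mu in (4.0, 5.0, 6.0)]

# Bin edges agreed on in advance (an assumption of this sketch)
shared_edges = np.linspace(0.0, 10.0, 11)

# Each client computes its counts locally; the server adds them element-wise
client_counts = [np.histogram(values, bins=shared_edges)[0] for values in client_values]
aggregated = np.sum(client_counts, axis=0)

# Reference: histogram of the pooled data
pooled = np.histogram(np.concatenate(client_values), bins=shared_edges)[0]

print(np.array_equal(aggregated, pooled))  # True
```

If each client derived its edges from its own minimum and maximum instead, the bins would cover different intervals and the element-wise sum would no longer describe a single histogram.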


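## Step 4: Configure the simulation

The built-in Flower strategies provide a way to send a config dictionary from the server to the clients, and it works similarly to the way server-side evaluation works: we provide a function to the strategy, and the strategy calls this function for every round of federated learning. The function below illustrates the pattern. Our `AggregateHistogram` strategy sends an empty config in `configure_fit`, so it does not call this function.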

In [None]:
def fit_config(server_round) -> Dict:
    """Send round number to client."""
    config = {
        "server_round": server_round
    }
    return config

# Define the strategy
strategy = AggregateHistogram()

# Simulation configuration
# Check the Flower Framework documentation for more details about Flower Simulations
# and how to setup the client_resources
client_resources = {"num_cpus": 1}
num_clients = NUM_CLIENTS
num_rounds = 1

## Step 5: Run the simulation

The next step is the call to `start_simulation`, which runs the simulation.

In [None]:
result = fl.simulation.start_simulation(
    strategy=strategy, # the custom aggregation strategy defined above
    client_fn=client_fn, # a function to construct a client
    num_clients=num_clients, # total number of clients in the experiment
    config=fl.server.ServerConfig(num_rounds=num_rounds), # one round is enough to collect the histograms
    client_resources=client_resources,
)

## Step 6: Analyzing the results

Print the history object returned by the simulation:

In [None]:
print(result)

Draw the global aggregated histogram:

In [None]:
import matplotlib.pyplot as plt

# Aggregated histogram values
length = result.metrics_distributed_fit.get('sepal_length_h')[0][1]
width = result.metrics_distributed_fit.get('sepal_width_h')[0][1]

# Aggregated bin values
length_b = result.metrics_distributed_fit.get('sepal_length_b')[0][1]
width_b = result.metrics_distributed_fit.get('sepal_width_b')[0][1]

# Calculate the centers and widths of the bins for plotting
bin_centers_l = (length_b[:-1] + length_b[1:]) / 2
bin_centers_w = (width_b[:-1] + width_b[1:]) / 2
bin_widths_l = np.diff(length_b)
bin_widths_w = np.diff(width_b)

# Create the figure and subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Plot bars for Sepal Length, centered on each bin
ax1.bar(bin_centers_l, length, width=bin_widths_l, color='blue', alpha=0.7, edgecolor='black', label='Length')
ax1.set_title('Sepal Length')
ax1.set_xlabel('Value')
ax1.set_ylabel('Frequency')
ax1.set_xticks(bin_centers_l)

# Plot bars for Sepal Width, centered on each bin
ax2.bar(bin_centers_w, width, width=bin_widths_w, color='green', alpha=0.7, edgecolor='black', label='Width')
ax2.set_title('Sepal Width')
ax2.set_xlabel('Value')
ax2.set_ylabel('Frequency')
ax2.set_xticks(bin_centers_w)

# Adjust layout to avoid overlapping
plt.tight_layout()

# Show the graph
plt.show()
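
Besides plotting, an aggregated histogram can be used to estimate simple statistics without access to the raw data. The following sketch is one possible approach, not part of the original example: the helper `approximate_mean` and the toy counts and edges are hypothetical. It treats every value as if it sat at the center of its bin, so the result is an approximation whose quality depends on the bin width.

```python
import numpy as np


def approximate_mean(counts, edges):
    """Estimate the mean from histogram counts, using bin centers as values."""
    centers = (edges[:-1] + edges[1:]) / 2
    return float(np.sum(counts * centers) / np.sum(counts))


# Hypothetical counts and edges, not taken from the simulation output
toy_counts = np.array([1, 3, 0, 2])
toy_edges = np.array([0.0, 1.0, 2.0, 3.0, 4.0])

toy_mean = approximate_mean(toy_counts, toy_edges)
print(toy_mean)  # 2.0
```

With the variables from the plotting cell, the same helper could be called as `approximate_mean(length, length_b)`.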
