# Horizontal Federated Learning


The easiest way to get started is to run the example in a hosted Python environment using Google Colab. To open the example in Google Colab, click the "Open in Colab" button below. If you choose Google Colab, you will need to set some variables at the beginning of the notebook. The easiest way to do this is to click the copy button below, which copies all variables, and then insert them at the same position in the notebook on Google Colab.


In [None]:
PYTHON_VERSION = "{PYTHON_VERSION}"  # noqa: F821
ARTIFACT_USER = "{ARTIFACT_USER}"  # noqa: F821
ARTIFACT_KEY = "{ARTIFACT_KEY}"  # noqa: F821
PYPI_REGISTRY = "{PYPI_REGISTRY}"  # noqa: F821
ORGANIZATION_ID = "{ORGANIZATION_ID}"  # noqa: F821
SERVER_ADDRESS = "{SERVER_ADDRESS}"  # noqa: F821
TOKEN_URL = "{TOKEN_URL}"  # noqa: F821


[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/katulu-io/examples/blob/v{DOCKER_VERSION}/examples/workbook_hfl.ipynb)


This is a demonstration of Horizontal Federated Learning using the Katulu platform. In this demo we will:
* Provide a short introduction to the concept of Horizontal Federated Learning
* Demonstrate how to install and onboard an agent
* Show how to access the data and prepare it for training
* Explain how to train a model using federated learning
* Illustrate how to evaluate the model

## Introduction to Horizontal Federated Learning

Federated Learning is a machine learning setting where many agents (e.g., mobile devices or whole organizations) collaboratively train a model under the orchestration of a central server (e.g., a service provider), while keeping the training data decentralized. Federated Learning is attractive for privacy and security reasons, as the data never leaves the agent device. Horizontal Federated Learning is a specific type of federated learning where the data is distributed across the agents and all agents can provide the same features. From a practical perspective, this means that all agents' data is associated with the same process.

In an industrial context this process could be predicting the quality of a chemical solution based on different measurements within a factory. Each factory can be an agent, and we want to train a model to predict the quality based on all data collected across the global production. With federated learning, we can train a model that is able to predict the quality without sharing any data between the factories.

For this demo we will focus on the above use case, but we could also use federated learning for quality prediction in other processes, e.g., PCB production, where we want to predict the PCB's quality at the end of the line based on the process data. Other use cases include predictive maintenance, where we want to predict the remaining lifetime of a machine based on its data.
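To make the idea of "training without sharing data" more concrete, here is a minimal sketch of federated averaging, a common aggregation scheme in which the server combines locally trained weights in proportion to each agent's sample count. This document does not state which aggregation scheme the Katulu platform uses, so treat this as an illustration only; the function name and the numbers are made up.

```python
def federated_average(updates):
    """Average per-agent weight vectors, weighted by local sample count.

    `updates` is a list of (weights, num_samples) pairs, one per agent.
    Only these weights travel to the server; the raw data stays local.
    """
    total = sum(n for _, n in updates)
    size = len(updates[0][0])
    return [
        sum(weights[i] * n for weights, n in updates) / total
        for i in range(size)
    ]


# Two factories with different amounts of local data (made-up numbers).
factory_a = ([0.2, 1.0], 100)
factory_b = ([0.6, 0.0], 300)
print(federated_average([factory_a, factory_b]))
```

The factory with more samples pulls the averaged weights closer to its own values.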

## Installing the agent

For this demo we will install the agents locally and use an already prepared dataset.

We will run the agents directly in Python. Please make sure that you have at least Python > 10.14. installed. First, we will create a virtual environment called `platform_demo` and activate it. If you don't want to use a virtual environment, you can skip this step.
```bash
python -m venv platform_demo
source platform_demo/bin/activate
```

Next, we will install the required packages. 
```bash
pip install katulu-agent=={PYTHON_VERSION} -U --extra-index-url https://download.pytorch.org/whl/cpu --extra-index-url https://{ARTIFACT_USER}:{ARTIFACT_KEY}@{PYPI_REGISTRY}
```

More details on agent installation and different options can be found in the [agent documentation]({SERVER_URL}/docs/agent/installation).

### Download the datasets 

After installing the agent we will need to download the datasets. This can be done with the `gcloud` CLI.
If you don't have the `gcloud` CLI installed, you can install it by following the [installation instructions](https://cloud.google.com/sdk/docs/install).

Next, we will download the dataset and store it in a folder called `data`.

```bash
mkdir data
echo {ARTIFACT_KEY} \
| base64 --decode | gcloud auth activate-service-account --key-file=-
gcloud storage cp gs://demo-agent-data-files/* data/
```
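Before configuring the agents, it can be worth confirming that the download worked. The sketch below checks for the two files that the agent configuration in the next section refers to (`agent_1.parquet` and `agent_2.parquet`); the helper name `missing_files` is hypothetical.

```python
from pathlib import Path


def missing_files(data_dir, names):
    """Return the expected file names that are not present in data_dir."""
    folder = Path(data_dir)
    return [name for name in names if not (folder / name).is_file()]


expected = ["agent_1.parquet", "agent_2.parquet"]
# An empty list means both files are present in the data folder.
print(missing_files("data", expected))
```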

### Configure and start the agents

Now we will create two agents on the Platform. To do this, we need to open the [agents page]({SERVER_URL}/{ORGANIZATION_ID}/agents) and click on the `Create Agent` button. This will open a dialog where we need to set a name for the agent and can define some labels to better identify the agent, e.g., location or hardware. We will call the agents `agent_1` and `agent_2`. After entering a name and, optionally, some labels, we will click `Create Agent`. This will open a new page where we can download the first part of the configuration file by clicking the `Download (agent_1.yml)` (or `Download (agent_2.yml)`, respectively) button in the `Configuration File(...)` section.

The configuration file should be placed in the current directory. Next, we will need to append the data-specific configuration to it.

For the first agent, we will add the following information to the configuration file:
```yaml
datasets:
  - name: chemical
    type:
      parquet:
        file: ./data/agent_1.parquet
    privacy_level: 0
```

and for the second agent:
```yaml
datasets:
  - name: chemical
    type:
      parquet:
        file: ./data/agent_2.parquet
    privacy_level: 0
```

This will tell each agent to use the data from its own file, `agent_1.parquet` or `agent_2.parquet`. The `privacy_level` is set to `0`, which means that we will not use any additional privacy-preserving techniques on the data. In a real-world scenario, you would want to use a higher privacy level, e.g., `1` or `2`, to ensure that the data cannot be reconstructed from the model.

The full configuration file should look like this:
```yaml
id: {AGENT_ID}
server_url: {SERVER_URL}
credentials:
    token_url: {TOKEN_URL}
    client_id: {CLIENT_ID}
    client_secret: {CLIENT_SECRET}
datasets:
  - name: chemical
    type:
      parquet:
        file: ./data/agent_{agent_number}.parquet
    privacy_level: 0
```


More information on the configuration file can be found in the [agent documentation]({SERVER_URL}/docs/agent/configuration).
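Appending the dataset section by hand is easy to get wrong, so the step can also be scripted. The following is a minimal sketch that works on plain text; it assumes that `type` and `privacy_level` are sibling keys of `name` in each dataset entry, and the helper name and the sample values are hypothetical.

```python
DATASET_TEMPLATE = """datasets:
  - name: chemical
    type:
      parquet:
        file: ./data/agent_{agent_number}.parquet
    privacy_level: 0
"""


def with_dataset_block(config_text: str, agent_number: int) -> str:
    """Append the dataset section to a downloaded configuration, once."""
    if "datasets:" in config_text:
        return config_text  # already configured, leave untouched
    if not config_text.endswith("\n"):
        config_text += "\n"
    return config_text + DATASET_TEMPLATE.format(agent_number=agent_number)


# Stand-in for the downloaded file content (placeholder values only).
downloaded = "id: example-agent-id\nserver_url: https://example.invalid\n"
print(with_dataset_block(downloaded, 1))
```

To apply it to a real file, read `agent_1.yml` with `pathlib.Path.read_text()`, pass the text through the function, and write the result back with `write_text()`.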

Now we are all set to start the agents. We can do this by opening two terminals and running the following commands, one in each terminal:
```bash
source platform_demo/bin/activate
katulu-agent agent_1.yml
```
and
```bash
source platform_demo/bin/activate
katulu-agent agent_2.yml
``` 

The agents have started successfully when you see three lines in the terminal starting with:
```
Starting agent
Retrieving server version
Schemas registered with server 
```

If you are not seeing these lines, please check the configuration file and the [troubleshooting guide]({SERVER_URL}/docs/agent/troubleshooting).
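If you capture the agent output in a file, the startup check can be automated. This is a minimal sketch under the assumption that the three messages appear somewhere in the captured text, possibly after a timestamp or log-level prefix; `agent_started` is a hypothetical helper, not part of the agent.

```python
EXPECTED_MESSAGES = (
    "Starting agent",
    "Retrieving server version",
    "Schemas registered with server",
)


def agent_started(log_text: str) -> bool:
    """True if every expected startup message appears on some log line."""
    lines = log_text.splitlines()
    return all(
        any(message in line for line in lines)
        for message in EXPECTED_MESSAGES
    )


sample_log = (
    "Starting agent\n"
    "Retrieving server version\n"
    "Schemas registered with server\n"
)
print(agent_started(sample_log))
```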

Now we can move to the next step and explore the data and prepare it for training.

## Exploring the SDK

To interact with the Platform we will use the Katulu SDK. The SDK is a Python package that provides an easy way to interact with the Platform. The SDK can be installed by running the following command:

In [None]:
!pip install katulu-sdk=={PYTHON_VERSION} -U --extra-index-url https://download.pytorch.org/whl/cpu --extra-index-url https://{ARTIFACT_USER}:{ARTIFACT_KEY}@{PYPI_REGISTRY}

You can write the SDK code in a Python script and call it with `python script.py`, or you can use a Jupyter notebook. For this demo we will use a Jupyter notebook. You can use a hosted Jupyter service such as Google Colab or Kaggle Notebooks, but we will use a local installation in this demo. To start a Jupyter notebook, run the following commands:
```bash
pip install jupyter
jupyter notebook
```

This installs and starts a Jupyter notebook server. You can now open a browser and navigate to `http://localhost:8888` to open the Jupyter notebook interface. There you can create a new notebook and start writing code.

### Notebook setup

First we will import the required packages.

In [1]:
# ruff: noqa: PLE1142
import torch

from katulu.sdk import (
    Adam,
    BinaryAccuracy,
    BinaryCrossEntropyWLogitsLoss,
    JobSpecConfig,
    build_job_spec,
    connect,
    model_from_torch,
)
from katulu.sdk.pipeline import (
    Alias,
    Avg,
    Cast,
    CastType,
    Features,
    GroupBy,
    Max,
    MinMaxScaler,
    Select,
    Source,
    Targets,
)

### Project setup

Now we will need to create a new project on the Platform. To do this, we need to open the [projects page]({SERVER_URL}/{ORGANIZATION_ID}/projects) and click on the `Create Project` button. This will open a dialog where we need to set a name for the project. We will call the project `chemical_quality_prediction`. After entering the name, we will click `Create Project`. This will open a new page that shows the `PROJECT_ID` at the top of the page, next to the project name. We will need this `PROJECT_ID` to connect to the project in the SDK. We can insert it into the code below.

In [None]:
PROJECT_ID = "{PROJECT_ID}"  # noqa: F821
CLIENT_ID = "{CLIENT_ID}"  # noqa: F821
CLIENT_SECRET = "{CLIENT_SECRET}"  # noqa: F821

### Client setup

To get the `CLIENT_ID` and `CLIENT_SECRET`, we will create a new Access Credential associated with our profile. Open the [Access Credentials page]({SERVER_URL}/{ORGANIZATION_ID}/access-credentials) and click on the `Create Access Credential` button. This will open a dialog where we need to set a name for the credential. We will call the credential `chemical_quality_credentials`. After entering the name, we will click `Create Access Credential`. This will open a new page that shows all the information we need to fill in the missing pieces in the above code.

This is all the information required to connect to the Platform. Now we can start exploring the data and preparing it for training.

We will define the dataset names that we want to retrieve from the Platform. The name should be the same as the name in the configuration file. In this case, we will use `chemical` and create an empty dictionary to store the data.

In [3]:
DATASET_NAMES = ["chemical"]
datasets = {}

Now we connect to the Platform, retrieve the data, and print its schema.

In [4]:
datasets = {}
# Connect to the Platform to get the dataset
async with connect(
    project_id=PROJECT_ID,
    organization_id=ORGANIZATION_ID,
    server_address=SERVER_ADDRESS,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    token_url=TOKEN_URL,
) as session:
    for dataset_name in DATASET_NAMES:
        ds = (await session.find_sources(name=dataset_name))[0]
        datasets[dataset_name] = ds
        print(f"Found dataset: {ds.id}")
        print(ds.schema)

2024-07-15 08:44:59 [debug    ] Issuing new access token       source=katulu.core.auth.jwt
Found dataset: 9b2d032cd1c1aedf101581e0bdb25ed5581110e5aad7daad4f752cebf26723ec
╒══════════════════════╤═════════╕
│ Field                │ Type    │
╞══════════════════════╪═════════╡
│ fixed acidity        │ Float64 │
├──────────────────────┼─────────┤
│ volatile acidity     │ Float64 │
├──────────────────────┼─────────┤
│ citric acid          │ Float64 │
├──────────────────────┼─────────┤
│ residual sugar       │ Float64 │
├──────────────────────┼─────────┤
│ chlorides            │ Float64 │
├──────────────────────┼─────────┤
│ free sulfur dioxide  │ Float64 │
├──────────────────────┼─────────┤
│ total sulfur dioxide │ Float64 │
├──────────────────────┼─────────┤
│ density              │ Float64 │
├──────────────────────┼─────────┤
│ pH                   │ Float64 │
├──────────────────────┼─────────┤
│ sulphates            │ Float64 │
├───────────

The schema lists the available columns and their types. We can use this information to prepare the data for training.

To do this we define which columns we want to use for training and which column we want to predict. In this case, we will use all columns except the `quality` column to train the model and we will use the `quality` column to predict the quality of the chemical solution. The features will be put into a list called `fields` and the target column will be stored in a list called `target`. 

In [5]:
fields = [
    "fixed acidity",
    "volatile acidity",
    "citric acid",
    "residual sugar",
    "chlorides",
    "free sulfur dioxide",
    "total sulfur dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]

target = ["quality"]

The pipeline defines the data transformations that need to be applied to the data. In this case, we know from production that multiple measurements are taken per batch. Therefore, we will first group the data by the batch `id` and aggregate the values with the `Max` function. This will give us a single row per batch with the maximum value of each feature.

Then, we define the `Targets` (what we want to predict) and cast them to the `Float32` type. We will do the same for the `Features` and cast them to `Float32` as well. We will also normalize the features by using the `MinMaxScaler`.

In [6]:
# Define the pipeline
# fmt: off
pipeline = (
    Source(datasets[DATASET_NAMES[0]])
    | GroupBy(["id"], [Alias(Max(f), f) for f in fields + target])
    + Targets(Select(target) + Cast(target, CastType.Float32))
    + Features(Select(fields) + Cast(fields, CastType.Float32) + MinMaxScaler(fields))
)
# fmt: on
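
To see what the grouping and scaling steps do to the data, here is a plain-Python sketch of the same two transformations on toy rows. It only illustrates the concepts: the helper names and values are made up, and how the SDK computes the scaling statistics across agents is not covered here.

```python
from collections import defaultdict


def group_max(rows, key, columns):
    """Collapse rows sharing the same key into one row of per-column maxima."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return [
        {key: k, **{c: max(r[c] for r in members) for c in columns}}
        for k, members in groups.items()
    ]


def min_max_scale(rows, column):
    """Rescale one column linearly so its values span [0, 1]."""
    values = [r[column] for r in rows]
    low, high = min(values), max(values)
    span = (high - low) or 1.0  # avoid division by zero for constant columns
    return [{**r, column: (r[column] - low) / span} for r in rows]


# Toy measurements: two readings per batch (made-up values).
rows = [
    {"id": 1, "pH": 3.0}, {"id": 1, "pH": 3.4},
    {"id": 2, "pH": 3.2}, {"id": 2, "pH": 3.8},
    {"id": 3, "pH": 3.6}, {"id": 3, "pH": 3.1},
]
batches = group_max(rows, "id", ["pH"])
print(min_max_scale(batches, "pH"))
```

After grouping there is one row per batch, and after scaling the smallest batch value maps to 0 and the largest to 1.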

Next, we will define a simple model that we will train on the data. The model is a small feed-forward neural network with two hidden layers. The input size is the number of features and the output size is the number of targets.

In [7]:
# Define the model
MODEL = torch.nn.Sequential(
    torch.nn.BatchNorm1d(len(fields)),
    torch.nn.Linear(len(fields), 20),
    torch.nn.ReLU(),
    torch.nn.LayerNorm(20),
    torch.nn.Linear(20, 20),
    torch.nn.ReLU(),
    torch.nn.Linear(20, 1),
)
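
In federated learning it is the model parameters, not the data, that are exchanged during training, so it is useful to know how large the model is. The sketch below counts the trainable parameters of the network above by hand, assuming PyTorch's default settings (biases enabled, affine normalization layers); the helper names are hypothetical.

```python
def linear_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out


def norm_params(n_features):
    """Learnable scale and shift of a normalization layer (default affine settings)."""
    return 2 * n_features


n_features = 11  # len(fields) in this demo

layer_sizes = {
    "BatchNorm1d(11)": norm_params(n_features),
    "Linear(11, 20)": linear_params(n_features, 20),
    "LayerNorm(20)": norm_params(20),
    "Linear(20, 20)": linear_params(20, 20),
    "Linear(20, 1)": linear_params(20, 1),
}
total_params = sum(layer_sizes.values())
print(layer_sizes, total_params)
```

With PyTorch installed, `sum(p.numel() for p in MODEL.parameters())` should report the same total of 743.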

Finally, we put all the pieces together and define a training `Job`. It contains the `pipeline`, the `model`, and a configuration for the training.

We will train the model for 30 rounds with a batch size of 256. For the optimizer, we will use `Adam` with a learning rate of `0.001`. We will use `BinaryCrossEntropyWLogitsLoss` as the loss function, as we want to predict whether the quality of the chemical solution meets a binary quality criterion.

As a metric to track during training we use `BinaryAccuracy`. This metric tells us what fraction of the predictions are correct.
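The model's last layer outputs raw logits rather than probabilities, which is why the loss works on logits and the accuracy threshold in the job spec is `0.0`: a logit of 0 corresponds to a probability of 0.5. The plain-Python sketch below illustrates both ideas; it is not the SDK's implementation, and whether the SDK treats a value exactly at the threshold as positive is an assumption here.

```python
import math


def sigmoid(z):
    """Map a logit to a probability."""
    return 1.0 / (1.0 + math.exp(-z))


def bce_with_logits(logit, label):
    """Binary cross-entropy computed from a raw logit (numerically stable form)."""
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))


def binary_accuracy(logits, labels, threshold=0.0):
    """Fraction of samples whose thresholded logit matches the 0/1 label."""
    hits = sum((z > threshold) == bool(y) for z, y in zip(logits, labels))
    return hits / len(labels)


# Made-up logits and labels for four samples.
logits = [2.0, -1.0, 0.5, -3.0]
labels = [1, 0, 0, 0]
print(sigmoid(0.0))                     # probability at the decision boundary
print(binary_accuracy(logits, labels))  # three of four predictions match
```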

In [8]:
# Create the job spec
job_spec = build_job_spec(
    name="chemical",
    pipeline=pipeline,
    model=model_from_torch(MODEL),
    config=JobSpecConfig(
        num_rounds=30,
        batch_size=256,
        optimizer=Adam(learning_rate=1e-3),
        loss_function=BinaryCrossEntropyWLogitsLoss(),
        metrics=[BinaryAccuracy(threshold=0.0)],
    ),
)

Finally, we are all set and can start the training job. We will print the training progress and also have the chance to track the training metrics on the platform by following the link in the output.

In [9]:
# Start the training
async with connect(
    project_id=PROJECT_ID,
    organization_id=ORGANIZATION_ID,
    server_address=SERVER_ADDRESS,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    token_url=TOKEN_URL,
) as session:
    job_id = await session.fit(job_spec)  # noqa: PLE1142
    print(f"Job ID: {job_id}")
    print(f"{SERVER_ADDRESS}/{ORGANIZATION_ID}/{PROJECT_ID}/jobs")  # SERVER_ADDRESS already includes the scheme
    round_number = 1
    async for metrics in session.metrics(job_id):
        print("Round:", round_number, {m.name: metrics.items[f"val_{m.name}"] for m in job_spec.config.metrics})
        round_number += 1

2024-07-15 08:45:10 [debug    ] Issuing new access token       source=katulu.core.auth.jwt
Job ID: acee832f-5de7-4878-9c98-4e685d5041b6
https://platform.katulu.io/ca2bf2b8-3cd1-427b-8b39-c1b09c72c0ed/bcd538b2-422f-400e-a9fd-18aa54f92ec5/jobs
Round: 1 {'accuracy': 0.6210559010505676}
Round: 2 {'accuracy': 0.58365398645401}
Round: 3 {'accuracy': 0.602124035358429}
Round: 4 {'accuracy': 0.6253655552864075}
Round: 5 {'accuracy': 0.6389102935791016}
Round: 6 {'accuracy': 0.633061408996582}
Round: 7 {'accuracy': 0.633061408996582}
Round: 8 {'accuracy': 0.633061408996582}
Round: 9 {'accuracy': 0.633677065372467}
Round: 10 {'accuracy': 0.6344466805458069}
Round: 11 {'accuracy': 0.6355240941047668}
Round: 12 {'accuracy': 0.6350623369216919}
Round: 13 {'accuracy': 0.6350623369216919}
Round: 14 {'accuracy': 0.634754478931427}
Round: 15 {'accuracy': 0.635216236114502}
Round: 16 {'accuracy': 0.634754478931427}
Round: 17 {'accuracy': 0.635062336

The training output shows that the model barely improved: the accuracy plateaued at around 0.63. We possibly made a mistake in the data preparation, so we will go back and check the pipeline. We will change the aggregation function from `MAX` to `MEAN` (`Avg` in the SDK).

In [11]:
# Adjust the pipeline to use average instead of max aggregation
# fmt: off
pipeline = (
    Source(datasets[DATASET_NAMES[0]])
    | GroupBy(["id"], [Alias(Avg(f), f) for f in fields + target])
    + Targets(Select(target) + Cast(target, CastType.Float32))
    + Features(Select(fields) + Cast(fields, CastType.Float32) + MinMaxScaler(fields))
)
# fmt: on
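
The document does not say why averaging works better here. One plausible explanation, offered only as an illustration, is that the maximum of several measurements is determined by a single extreme reading, while the mean reflects all of them. The readings below are made up.

```python
from statistics import mean

# Made-up readings for one batch; the last value is a sensor spike.
readings = [0.51, 0.49, 0.50, 0.52, 0.95]

print("max :", max(readings))              # determined by the single spike
print("mean:", round(mean(readings), 3))   # stays close to the typical reading
```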

We create a new Job with the updated pipeline and start the training. This time the model should be able to learn the data and we should see the accuracy increasing over time.

In [12]:
# Create the job spec
job_spec = build_job_spec(
    name="chemical",
    pipeline=pipeline,
    model=model_from_torch(MODEL),
    config=JobSpecConfig(
        num_rounds=30,
        batch_size=256,
        optimizer=Adam(learning_rate=1e-3),
        loss_function=BinaryCrossEntropyWLogitsLoss(),
        metrics=[BinaryAccuracy(threshold=0.0)],
    ),
)

In [13]:
# Start the training
async with connect(
    project_id=PROJECT_ID,
    organization_id=ORGANIZATION_ID,
    server_address=SERVER_ADDRESS,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    token_url=TOKEN_URL,
) as session:
    job_id = await session.fit(job_spec)  # noqa: PLE1142
    print(f"Job ID: {job_id}")
    print(f"{SERVER_ADDRESS}/{ORGANIZATION_ID}/{PROJECT_ID}/jobs")  # SERVER_ADDRESS already includes the scheme
    round_number = 1
    async for metrics in session.metrics(job_id):
        print("Round:", round_number, {m.name: metrics.items[f"val_{m.name}"] for m in job_spec.config.metrics})
        round_number += 1

2024-07-15 08:45:48 [debug    ] Issuing new access token       source=katulu.core.auth.jwt
Job ID: 80c6eec9-ed3b-471a-a78c-fc06597700fb
https://platform.katulu.io/ca2bf2b8-3cd1-427b-8b39-c1b09c72c0ed/bcd538b2-422f-400e-a9fd-18aa54f92ec5/jobs
Round: 1 {'accuracy': 0.6596890687942505}
Round: 2 {'accuracy': 0.6596890687942505}
Round: 3 {'accuracy': 0.7077112793922424}
Round: 4 {'accuracy': 0.7175619602203369}
Round: 5 {'accuracy': 0.7175619602203369}
Round: 6 {'accuracy': 0.734184980392456}
Round: 7 {'accuracy': 0.7394182085990906}
Round: 8 {'accuracy': 0.7394182085990906}
Round: 9 {'accuracy': 0.7452670335769653}
Round: 10 {'accuracy': 0.7460366487503052}
Round: 11 {'accuracy': 0.7460366487503052}
Round: 12 {'accuracy': 0.7494227886199951}
Round: 13 {'accuracy': 0.7529628872871399}
Round: 14 {'accuracy': 0.7529628872871399}
Round: 15 {'accuracy': 0.7540403008460999}
Round: 16 {'accuracy': 0.7541942596435547}
Round: 17 {'accuracy': 0.

We see that the accuracy increased by more than 10 percentage points and that the model is able to predict the quality of the chemical solution with an accuracy of around 80%.
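The two runs can be compared directly from the printed metrics. Because the outputs shown above are truncated after round 17, this small sketch uses round 16, the last round fully shown for both runs; the variable names are only for illustration.

```python
# Validation accuracy at round 16, copied from the two outputs above.
max_run_round_16 = 0.634754478931427
avg_run_round_16 = 0.7541942596435547

gain = avg_run_round_16 - max_run_round_16
print(f"Improvement at round 16: {gain * 100:.1f} percentage points")
```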

Within the demo we have seen how to install and onboard agents, how to access the data and prepare it for training, how to train a model using federated learning and how to evaluate the model. Now it is time to explore the platform and try it out yourself.