# RoleML in 100 Minutes

Welcome to the world of distributed machine learning! This tutorial will lead you through RoleML, a role-oriented programming model for creating distributed learning architectures towards edge computing. You will learn the basic usage of RoleML by building an example architecture - vanilla Federated Averaging (FedAvg).

First things first, if you haven't installed Jupyter in your environment, which is required to run this notebook, please install it now:

In [None]:
%pip install jupyter

## Introduction

### Background

Traditional architectures for distributed machine learning (DML), such as AllReduce or Parameter Server, are limited to data centers in the cloud. On the other hand, **edge computing** catalyzes the emergence of many new DML architectures that leverage the computing power at the network edge while trying to overcome major challenges, including device and data heterogeneity. The proliferation of advanced architectures calls for a general programming model and framework to facilitate customizable DML. However, most existing frameworks only target Federated Learning, a classic architecture with a centralized topology. They provide relatively fixed APIs for the client-server paradigm and therefore cannot be used for other architectures, especially fully distributed ones (such as _Gossip Learning_).

We therefore propose RoleML, a role-oriented programming model for general-purpose DML development. RoleML breaks a DML architecture into a series of interactive components and uses a simple yet efficient abstraction, namely the **role**, to express them uniformly. The main idea is that it _decouples functionalities from the physical computation nodes_ so that developers can _focus on designing individual modules_ that logically build up the whole architecture, and then use these modules to _assemble different kinds of physical nodes_ in the deployment stage. RoleML can be used to develop new architectures with different topologies and communication strategies, unlocking their potential.

### RoleML Concepts

In RoleML, roles are functional components that interact with each other within a DML architecture. Each role encapsulates a set of **message channels** that serve as interfaces for other roles. There are three types of channels: _services_ and _tasks_ stand for synchronous and asynchronous function calls respectively, which are used in directional communication. _Events_ follow the publish-subscribe model and are used for non-directional communication. All channels defined in a role should focus on a single responsibility such as training or aggregation.
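
The difference between the three channel types can be pictured with plain Python, independent of RoleML. The sketch below is only an analogy: `ToyRole` and all of its methods are hypothetical names chosen for illustration, not RoleML APIs.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class ToyRole:
    """Hypothetical stand-in for a role; not part of RoleML."""

    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=2)
        self._subscribers: list[Callable[[dict[str, Any]], None]] = []

    # Service: synchronous call, the caller blocks until the result is ready.
    def square(self, x: int) -> int:
        return x * x

    # Task: asynchronous call, the caller immediately receives a future.
    def square_later(self, x: int) -> Future:
        return self._pool.submit(self.square, x)

    # Event: publish-subscribe, the emitter does not know who is listening.
    def subscribe(self, handler: Callable[[dict[str, Any]], None]) -> None:
        self._subscribers.append(handler)

    def emit(self, args: dict[str, Any]) -> None:
        for handler in self._subscribers:
            handler(args)

    def close(self) -> None:
        self._pool.shutdown()


role = ToyRole()
received: list[dict[str, Any]] = []
role.subscribe(received.append)

service_result = role.square(3)                 # returned directly
task_result = role.square_later(4).result()     # obtained by waiting on the future
role.emit({'epoch': 0})                         # delivered to every subscriber
role.close()
```

Services and tasks both have a known target (directional), whereas the emitter of an event never names its receivers.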

RoleML's runtime system allows multiple roles to be co-located on the same node. This enables developers to construct nodes in a building block fashion, improving efficiency in developing new DML architectures. It is especially important in centralized architectures like Federated Learning because it removes the need for a "big class" to represent the server side, which is commonly seen in many FL frameworks.

RoleML also introduces a concept called **relationship** as an abstraction of different interaction targets. Traditional frameworks (mostly FL-only) use client IDs to reference each individual client. This works in centralized architectures, but can make things messy in more complex ones. In RoleML, each relationship is directional and refers to one or more other roles that a role may interact with. For example, a relationship named `trainer` may contain a list of Trainer roles on different clients. As such, we can program a Client Selector role to say "I will select clients from _the fast group_", instead of saying "I will select clients from _[client1, client2, client5] because they train at a faster speed_". Note that relationships are configured at the node level and managed by each node individually. From a node's perspective, its relationships contain partial information about the overall distributed training topology, and they are visible to all roles deployed on that node.
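
To make the "partial view per node" idea concrete, here is a minimal sketch in plain Python. The table layout, the `actor/role` string format, and the `resolve` helper are all assumptions made for illustration; they do not reflect how RoleML stores relationships internally.

```python
# Hypothetical per-node relationship tables: relationship name -> role instances.
# Each instance is written as 'actor/role'; these names are made up.
node_relationships = {
    'server': {
        'trainer': ['client1/trainer', 'client2/trainer', 'client5/trainer'],
    },
    'client1': {
        'coordinator': ['server/coordinator'],
    },
}


def resolve(node: str, relationship_name: str) -> list[str]:
    """Return the instances a relationship refers to, as seen from one node."""
    return list(node_relationships[node].get(relationship_name, []))


# Roles on the server address trainers by relationship name, not by client ID.
server_view = resolve('server', 'trainer')
# The same name resolves to nothing on client1: each node only sees its own part.
client_view = resolve('client1', 'trainer')
```

Changing which clients belong to `trainer` then only requires editing the node's table, not the code of the roles that use it.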

RoleML fosters the decoupling of _workflow_ and _workloads_. A workflow defines "what to do" when training a model in a distributed fashion, which is reflected in the behavior of roles. Workloads define "how to do it", such as which model to train and with which dataset. We introduce a new abstraction named **elements** for different kinds of workloads. The key idea is to separate the implementation (including loading) of a workload object from its specification, which allows more precise control.

## Write a Federated Learning Application

To begin with, let's import the RoleML package:

In [None]:
import roleml.essentials as rml

A DML architecture defines how distributed computation nodes collaborate to speed up a model training job. We usually classify different architectures according to the topology and the communication strategy used. For example, Federated Learning is a centralized, synchronous architecture, while Gossip Learning is fully distributed and asynchronous.

RoleML is designed for two types of users: _DML architects_ who design new architectures, and _DML application developers_ who use existing architectures to train actual models with actual datasets. Of course, even architects need actual models and datasets to test whether their architectures are implemented correctly. So in this tutorial, we will go through the process of writing a DML application from scratch. If you are an application developer and only interested in how to deploy Federated or Gossip Learning, we still recommend completing this tutorial to get a better view of what's happening under the hood.

A complete lifecycle for DML development includes the following steps:

* Workload definition: determine what the workloads will look like. In RoleML, workloads include models, datasets and operators that operate on them. This step is mainly about defining abstractions for these workloads, such as an abstract model class.
* Architecture design and implementation: given a formally described DML architecture, this step is mainly about designing and implementing roles with different responsibilities. The workload abstractions defined in the last step will be used accordingly when expressing the logic of roles.
* Application creation: provide concrete implementations for the workloads, and configurations to deploy the roles onto different nodes to perform distributed training. This is the step most application developers focus on.

### Define Workloads

The most significant benefit for defining workload abstractions is that different models and datasets will be able to fit into the same DML architecture more easily. In this tutorial, we will keep the interfaces minimal:

In [None]:
from typing import Any, Iterable, Protocol, runtime_checkable


@runtime_checkable
class Model(Protocol):

    def get_params(self) -> Any: ...

    def set_params(self, new_params: Any): ...


@runtime_checkable
class ClientModel(Model, Protocol):

    def train(self, dataset: 'Dataset') -> dict[str, Any]: ...


@runtime_checkable
class ServerModel(Model, Protocol):

    def test(self, dataset: 'Dataset') -> dict[str, Any]: ...


Dataset = Iterable

The above interfaces state that a model should implement a `get_params` method to get the current parameters and a `set_params` method to update them. The client-side model should additionally implement a `train` method, which represents one epoch of training, while the server-side model should additionally implement a `test` method to test the model with a given dataset. The use of protocols allows runtime type checking, which makes sure that only proper implementations can be used as the Trainer's workload. For simplicity, we only require a dataset to be iterable.
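
As a quick illustration of what runtime checking against a protocol means, the following standalone sketch uses a reduced protocol named `HasParams` and two toy classes (all three names are hypothetical). Note that `isinstance` with a `runtime_checkable` protocol only verifies that the required methods exist; it does not check their signatures.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class HasParams(Protocol):

    def get_params(self) -> Any: ...

    def set_params(self, new_params: Any): ...


class GoodModel:    # no inheritance needed, only matching method names
    def __init__(self):
        self.params = [0.0]

    def get_params(self):
        return self.params

    def set_params(self, new_params):
        self.params = new_params


class BadModel:     # lacks set_params, so it does not satisfy the protocol
    def get_params(self):
        return None


good_ok = isinstance(GoodModel(), HasParams)
bad_ok = isinstance(BadModel(), HasParams)
```

Here `good_ok` is `True` and `bad_ok` is `False`, which is the kind of check that can reject an unsuitable workload implementation early.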

We also need to define an operator for aggregating different models, but we will defer this to later in this section.

Next, we will use these interfaces in the definition of client and server roles.

### Define the Client

To define a minimal FL client, which is responsible for local model training, we need to solve the following problems:

* How does the client load the model and the dataset?
* How does the client know when it's time for it to train? And with what configurations (e.g., number of local epochs)?
* How does the client return the trained model back to the server?

Since our client only has one responsibility, we only need to define one role, which is called the `Trainer`. Each type of role is represented by a subclass of the `Role` base class.

The following implementation of the Trainer role answers the above questions. Let's first have a look at it before diving into the detail:

In [None]:
class Trainer(rml.Role):

    def __init__(self):
        super().__init__()
        self.current_epoch = -1

    model = rml.Element(ClientModel)
    dataset = rml.Element(Dataset)

    epoch_completed = rml.Event()

    @rml.Task(expand=True)
    def train(self, _, num_epochs: int = 1):
        model = self.model()        # type: ClientModel
        dataset = self.dataset()    # type: Dataset
        for i in range(num_epochs):
            self.current_epoch += 1
            metrics = model.train(dataset)
            self.epoch_completed.emit(args={**metrics, 'epoch': self.current_epoch})
        return model.get_params()
    
    @rml.Service(expand=True)
    def apply_update(self, _, update):
        model = self.model()
        model.set_params(update)

_**Hint: this cell is important**_

In RoleML, roles are defined in a way similar to defining a regular class. You can provide arguments to initialize a role in the `__init__` method (not necessary here though). Note that for maximum compatibility, you should only use JSON-serializable basic data types (such as `int` or `str`) as arguments for `__init__`. To fulfill the need for more sophisticated and customizable member initialization, both the model and the dataset are declared as workload **elements**. We also specify their data types, which will be used to type-check the implementation that is actually provided.

A **task channel**, `train`, is defined by annotating a method as the message handler. It receives options for model training and returns the trained model. By default, the name of the channel is the same as the method name (except that all underscores `_` will be replaced by hyphens `-`). The option `expand` instructs RoleML to automatically extract the contents from an incoming message and pass them directly as keyword arguments, making the method more like a regular instance method. Note that the first argument after `self` is always the caller (represented by a role instance ID). Since we are not interested in who sends the message here, we choose to ignore it.
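
The naming rule and the effect of `expand` can be mimicked in a few lines of plain Python. This is a toy model only: `channel_name`, `dispatch`, and the two handlers are hypothetical helpers, and the real dispatching logic inside RoleML may look quite different.

```python
from typing import Any, Callable


def channel_name(method_name: str) -> str:
    """Default naming rule described above: underscores become hyphens."""
    return method_name.replace('_', '-')


def dispatch(handler: Callable[..., Any], caller: str, message: dict[str, Any], expand: bool) -> Any:
    """Toy dispatcher: with expand, message contents become keyword arguments."""
    if expand:
        return handler(caller, **message)
    return handler(caller, message)


def train(caller: str, num_epochs: int = 1) -> str:
    # handler written for expand=True: looks like a regular function
    return f'{num_epochs} epoch(s) requested by {caller}'


def train_raw(caller: str, message: dict[str, Any]) -> str:
    # handler written for expand=False: has to unpack the message itself
    return f"{message.get('num_epochs', 1)} epoch(s) requested by {caller}"


name = channel_name('apply_update')
expanded = dispatch(train, 'coordinator', {'num_epochs': 2}, expand=True)
raw = dispatch(train_raw, 'coordinator', {'num_epochs': 2}, expand=False)
```

Both handlers produce the same result; the expanded form simply moves the unpacking out of the handler body.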

An **event channel**, `epoch-completed`, is defined in the role to allow emitting a metrics message at the end of every training epoch. Any role interested in these metrics can subscribe to this event channel after the role is up.

Finally, a **service channel**, `apply-update`, is defined to accept updates to the local model's parameters.

A message consists of two parts: `args` and `payloads`. Both parts are key-value pairs (i.e. dictionaries). Content in the `args` part will be logged when sending or receiving a message if the debug mode is enabled, which can be useful in problem diagnosis. For maximum compatibility, you should only put content in the `args` part if it is JSON-serializable. Content in `args` is also used in event filtering: a subscriber can optionally provide conditions to only receive event messages whose `args` meet certain criteria.
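
The following sketch shows one way to picture the two-part message and `args`-based event filtering. `ToyMessage` and `matches` are hypothetical, and equality-based conditions are an assumption; the filtering conditions supported by RoleML may be richer.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyMessage:
    """Hypothetical message shape: small loggable args plus arbitrary payloads."""
    args: dict[str, Any] = field(default_factory=dict)
    payloads: dict[str, Any] = field(default_factory=dict)


def matches(message: ToyMessage, conditions: dict[str, Any]) -> bool:
    """A subscriber's filter: every condition must equal the value in args."""
    return all(message.args.get(key) == value for key, value in conditions.items())


messages = [ToyMessage(args={'epoch': e, 'loss': 1.0 / (e + 1)}) for e in range(4)]

# Only args are inspected by the filter; payloads are never looked at.
only_epoch_2 = [m for m in messages if matches(m, {'epoch': 2})]

# Keeping args JSON-serializable means they can always be written to a log.
logged = json.dumps(only_epoch_2[0].args, sort_keys=True)
```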

You may have noticed that the event channel is defined as a class attribute, but is referenced as an instance attribute in the task handler. The class attribute is mainly a declaration saying _"Each role instantiated should have an event channel named `epoch-completed`"_. When initializing a role instance, RoleML will assign a proxy object with the same attribute name to the instance, with which you can emit event messages on behalf of this instance. Such design allows better type hinting during development.
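
If the "class-level declaration, instance-level proxy" pattern is unfamiliar, the standalone sketch below reproduces the idea with a Python descriptor. `ToyEvent`, `ToyEventProxy`, and `ToyTrainer` are hypothetical, and creating the proxy lazily on first access is a simplification; as described above, RoleML assigns the proxy when the role instance is initialized.

```python
from typing import Any


class ToyEvent:
    """Class-level declaration of an event channel (not RoleML's Event)."""

    def __set_name__(self, owner: type, name: str) -> None:
        self.attr_name = name
        self.channel = name.replace('_', '-')

    def __get__(self, instance: Any, owner: type):
        if instance is None:
            return self     # accessed on the class: return the declaration itself
        # accessed on an instance: create a proxy and cache it on the instance
        proxy = ToyEventProxy(self.channel)
        instance.__dict__[self.attr_name] = proxy
        return proxy


class ToyEventProxy:
    """Per-instance object used to emit messages on behalf of one role instance."""

    def __init__(self, channel: str) -> None:
        self.channel = channel
        self.sent: list[dict[str, Any]] = []

    def emit(self, args: dict[str, Any]) -> None:
        self.sent.append(args)


class ToyTrainer:
    epoch_completed = ToyEvent()


a, b = ToyTrainer(), ToyTrainer()
a.epoch_completed.emit({'epoch': 0})
```

After this runs, only `a`'s proxy has recorded a message, while `ToyTrainer.epoch_completed` is still the shared declaration.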

Workload elements will be filled in a similar way when you provide customized implementations. You may also have noticed that the model and the dataset are not directly accessed as instance attributes; instead, we need to _call_ the corresponding instance attributes to get the actual workload objects. This hides the detail of the source of workload objects from the role. But for now, you only need to know that every call to the same element will return the same object (i.e. the construction will only be done once).
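
The "construct once, return the same object on every call" behavior can be sketched as follows. `ToyElement` is a hypothetical helper written for this illustration, and `dict(lr=0.01)` merely stands in for a real model constructor.

```python
from typing import Any, Callable


class ToyElement:
    """Hypothetical lazy workload holder: builds the object on the first call only."""

    def __init__(self, constructor: Callable[..., Any], **constructor_args: Any) -> None:
        self._constructor = constructor
        self._constructor_args = constructor_args
        self._built = False
        self._obj: Any = None
        self.build_count = 0

    def __call__(self) -> Any:
        if not self._built:
            self._obj = self._constructor(**self._constructor_args)
            self._built = True
            self.build_count += 1
        return self._obj


model = ToyElement(dict, lr=0.01)   # nothing is constructed yet
first = model()                     # constructed here
second = model()                    # the same object is returned again
```

Because the role only ever calls `model()`, it never needs to know which constructor or arguments were supplied.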

> When programming the Trainer role, we only define its behavior in a DML architecture, but do not care about _how_ these actions will be performed. This is usually the job of DML application developers who use the architecture to train different models.

As a basic infrastructure, RoleML's runtime library provides a ready-to-use Trainer role with more built-in functionalities. It is in the package `roleml.library.roles.trainer.epoch`. Also check the standard interfaces for model and dataset, which are located at `roleml.library.workload.models.protocols` and `roleml.library.workload.datasets.base`, respectively.

Lastly, don't forget to run the code cell above!

### Define the Server

In this tutorial, our server needs to do the following jobs in every round:

* Select clients
* Configure each selected client (e.g. by specifying the number of local epochs)
* Update their local models, send training instructions and aggregate the newly-trained local models
* Update the global model and test it with the test dataset

RoleML encourages modular development. We will have a separate role for each of the above jobs.

First, a `ClientSelector` role for client selection:

In [None]:
import random


class ClientSelector(rml.Role):

    RELATIONSHIP_NAME = 'trainer'   # we are actually selecting trainer roles

    @rml.Service(expand=True)
    def select_client(self, _, ratio: float = 0.5) -> list[rml.RoleInstanceID]:
        candidates = self.ctx.relationships.get_relationship_view(self.RELATIONSHIP_NAME)
        k = int(len(candidates) * ratio)
        # Note: `random.sample` no longer accepts a set as of Python 3.11,
        # so `candidates` is converted to a list before sampling.
        return random.sample(list(candidates), k=k)

Here `ctx` is a public attribute pointing to the application context, which includes a relationship manager. The client selector is interested in the set of role instances that belong to the relationship `trainer`, and it uses the `get_relationship_view` API to query them. This API is synchronized, which means that the set won't change during an invocation of the API, and it is therefore thread-safe.
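
The selection arithmetic itself is easy to try outside RoleML. The sketch below uses a hypothetical `select_clients` function and plain strings in place of role instance IDs; sorting the candidates is only there to make the seeded result reproducible.

```python
import random


def select_clients(candidates, ratio: float, rng: random.Random) -> list:
    """Sample int(len * ratio) distinct candidates; accepts sets and lists."""
    pool = sorted(candidates)       # random.sample requires a sequence
    k = int(len(pool) * ratio)      # truncates towards zero
    return rng.sample(pool, k)


trainers = {'client1', 'client2', 'client3', 'client4', 'client5'}
selected = select_clients(trainers, ratio=0.5, rng=random.Random(42))
none_selected = select_clients(trainers, ratio=0.1, rng=random.Random(42))
```

Because of the truncation, a ratio of 0.5 over five candidates selects two clients, and a ratio of 0.1 selects none. A production selector may want to enforce a minimum of one client.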

> Another API `get_relationship_unsafe()` provides a more efficient approach. For more detail, please refer to our example implementation of Federated Learning in `examples/federated_learning` in the source code archive.

Next, we define a minimal `ClientConfigurator` role to configure the number of local epochs for each selected client:

In [None]:
class ClientConfigurator(rml.Role):

    @rml.Service(expand=True)
    def configure(self, _, clients: Iterable[rml.RoleInstanceID]) -> dict[rml.RoleInstanceID, dict[str, Any]]:
        return {client: {'num_epochs': 1} for client in clients}

Although the logic is only one line, separating it into its own role allows us to switch to more advanced logic in the future.

Third, let's make an `Aggregator` that aggregates model updates from the selected clients:

In [None]:
from roleml.core.actor.group.util.collections import TaskResultCollector
from roleml.shared.collections.merger import KeyValueMerger, DictKeyValueMerger


class CollectiveAggregator(rml.Role):

    def __init__(self, collect_channel: str = 'train'):
        super().__init__()
        self.collect_channel = collect_channel

    # rml.Element but with default_construct_strategy=ConstructStrategy.EVERY_CALL
    merger = rml.Factory(KeyValueMerger, default_constructor=DictKeyValueMerger)

    aggregation_completed = rml.Event()

    @rml.Task(expand=True)
    def aggregate(self, _, sources_and_options: dict[rml.RoleInstanceID, dict[str, Any]]):
        group = sources_and_options.keys()
        merger = TaskResultCollector(group, merger=self.merger())
        self.call_task_group(
            group, channel_name=self.collect_channel,
            message_map={src: rml.Message(args=options) for src, options in sources_and_options.items()},
            on_result=merger.push)
        # wait until all clients have provided a result for aggregation
        aggregated = merger.result()
        # convert the dict keys view to a list so that `args` stays JSON-serializable
        self.aggregation_completed.emit(args={'sources': list(group)})
        return aggregated

Because the semantics of the task channel `train` on Trainer is to _produce_ a training output via computation, it is reasonable to let the Aggregator, the _consumer_ of training outputs, initiate the training process.

This is the first time we use a collective communication API, `call_task_group`, which calls tasks on a group of roles. Since tasks are asynchronous, we need a callback function to handle the result whenever a task is finished, and this is exactly what the argument `on_result` is for.

This Aggregator declares a `merger` workload element with the type `KeyValueMerger`. It is an abstraction provided by the runtime library and does two things: it accepts key-value pairs via the `push()` method, and returns a merged result via the `merge()` method. Here it serves as the aggregation operator, which accepts _(role instance ID, model)_ key-value pairs and returns the aggregated model. Different concrete implementations usually represent different types of models, or models implemented in different ML engines. Because we are interacting with different roles in different rounds, we need a new merger every time, and therefore we declare the merger workload as a `Factory`, which means that (by default) every call to the workload proxy (i.e. `self.merger()`) will construct and return a new merger instance.

There's also another "merger", `TaskResultCollector`, in this Aggregator. It wraps the aggregation operator described above and provides a mechanism for us to _wait_ until all selected clients have provided their result.
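
The wrapping idea can be sketched with the standard library alone. `ToyAverager` and `ToyCollector` below are hypothetical stand-ins for the merger and the collector. The averaging is a plain unweighted mean over dicts of floats, which simplifies what a real model averager does (for instance, weighting by local sample count is omitted).

```python
import threading
from typing import Any


class ToyAverager:
    """Hypothetical merger: averages dicts of floats pushed under distinct keys."""

    def __init__(self) -> None:
        self._items: dict[str, dict[str, float]] = {}

    def push(self, key: str, value: dict[str, float]) -> None:
        self._items[key] = value

    def merge(self) -> dict[str, float]:
        n = len(self._items)
        names = next(iter(self._items.values())).keys()
        return {name: sum(v[name] for v in self._items.values()) / n for name in names}


class ToyCollector:
    """Wraps a merger and blocks in result() until every expected key has reported."""

    def __init__(self, expected, merger: ToyAverager) -> None:
        self._pending = set(expected)
        self._merger = merger
        self._cond = threading.Condition()

    def push(self, key: str, value: Any) -> None:
        with self._cond:
            self._merger.push(key, value)
            self._pending.discard(key)
            self._cond.notify_all()

    def result(self, timeout: float = 5.0) -> dict[str, float]:
        with self._cond:
            if not self._cond.wait_for(lambda: not self._pending, timeout=timeout):
                raise TimeoutError(f'still waiting for {sorted(self._pending)}')
            return self._merger.merge()


collector = ToyCollector(['c1', 'c2'], ToyAverager())
updates = {'c1': {'w': 1.0, 'b': 0.0}, 'c2': {'w': 3.0, 'b': 2.0}}
threads = [threading.Thread(target=collector.push, args=(k, v)) for k, v in updates.items()]
for t in threads:
    t.start()
aggregated = collector.result()     # returns once both pushes have arrived
for t in threads:
    t.join()
```

The collector adds the waiting behavior without the averager having to know anything about threads or groups.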

> If you've learnt about Java, think about its I/O classes - it is common to wrap an `InputStream` or an `OutputStream` with another one, such as `BufferedInputStream(InputStream)`, for augmenting the original stream.

Now we have two responsibilities left: to update the selected clients' local model before training, and to update the global model and test it after aggregation. Also notice that we are missing something that can chain the three server-side roles written above.

Our solution is to define a `Coordinator` role, which maintains the global model, and uses a single task handler to chain other roles and undertake the remaining responsibilities.
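
Before reading the role itself, it may help to see the round loop it drives as a single-process simulation. Everything in this sketch is a toy assumption: the "model" is one scalar weight, `local_train` pulls it halfway towards a made-up per-client optimum, and aggregation is an unweighted mean.

```python
import random

# Toy setup (all names hypothetical): one local optimum per client.
local_optima = {'client1': 1.0, 'client2': 2.0, 'client3': 3.0, 'client4': 4.0}


def local_train(w: float, client: str, num_epochs: int) -> float:
    """Each epoch moves the weight halfway towards the client's own optimum."""
    for _ in range(num_epochs):
        w += 0.5 * (local_optima[client] - w)
    return w


def run(num_rounds: int, select_ratio: float, rng: random.Random) -> list[float]:
    global_w = 0.0
    history = []
    clients = sorted(local_optima)
    for _ in range(num_rounds):
        # 1. select clients
        group = rng.sample(clients, int(len(clients) * select_ratio))
        # 2. configure clients
        configurations = {client: {'num_epochs': 1} for client in group}
        # 3. hand the global weight to each client, 4. train locally and aggregate
        updates = [local_train(global_w, c, **configurations[c]) for c in group]
        global_w = sum(updates) / len(updates)
        # 5. "test" the new global model (here: just record the weight)
        history.append(global_w)
    return history


history = run(num_rounds=20, select_ratio=1.0, rng=random.Random(0))
```

With all four clients selected in every round, the global weight approaches 2.5, the mean of the local optima. Note that the sketch would divide by zero if the ratio selected no clients.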

> Trainer, Aggregator, and Coordinator are three basic roles in most DML architectures.

In [None]:
class Coordinator(rml.Role):

    model = rml.Element(ServerModel)    # type: rml.Element[ServerModel]
    dataset = rml.Element(Dataset)      # type: rml.Element[Dataset]

    round_completed = rml.Event()
    
    @rml.Task(expand=True)
    def run(self, _, num_rounds: int, select_ratio: float = 0.3):
        model = self.model()
        dataset = self.dataset()
        for i in range(num_rounds):
            # 1. select clients
            group = self.call('client-selector', 'select-client', args={'ratio': select_ratio})     # type: list[rml.RoleInstanceID]
            # 2. configure clients
            configurations = self.call('configurator', 'configure', payloads={'clients': group})    # type: dict[rml.RoleInstanceID, dict[str, Any]]
            # 3. update client model
            self.call_group(group, 'apply-update', payloads={'update': model.get_params()})
            # 4. send train instructions and wait for aggregation
            future = self.call_task('aggregator', 'aggregate', payloads={'sources_and_options': configurations})
            aggregated_model = future.result()
            # 5. update global model and test it
            model.set_params(aggregated_model)
            test_result = model.test(dataset)
            self.round_completed.emit(args={'round': i, 'result': test_result})
            self.logger.info(f'round {i} test result is {test_result}')
        self.logger.info('FL is done!!!!')

### Implement Workloads

Now it's time to prepare the real model and dataset. In this tutorial, we will train an RGB version of the LeNet-5 model using the classic CIFAR-10 dataset. The model is implemented in PyTorch, so make sure it is [installed](https://pytorch.org/get-started/locally/) in your environment.

Python is a dynamically typed language that supports duck typing, which means that we can define a single model class that conforms to both interfaces without considering inheritance (it's weird to have a class that extends both `ClientModel` and `ServerModel`, isn't it?).

> By the way, if you haven't learned about Python's `typing` package, go check the official documentation. RoleML uses this package widely for type annotations. We hope that if you become interested in how RoleML is implemented, you will find it easy to go through the source code.

The model is defined as follows:

In [None]:
import torch.nn as nn

def lenet5_rgb() -> nn.Sequential:
    """ Modified from https://d2l.ai/chapter_convolutional-neural-networks/lenet.html"""
    return nn.Sequential(
        nn.Conv2d(3, 6, kernel_size=5), nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Conv2d(6, 16, kernel_size=5),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Flatten(),
        nn.Linear(16 * 5 * 5, 120),
        nn.Linear(120, 84),
        nn.Linear(84, 10)
    )

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


class MyLeNet5RGBModel:     # you can optionally declare that it implements both ClientModel and ServerModel interface

    def __init__(self, lr: float = 0.01, device: str = 'cpu') -> None:
        self.module = lenet5_rgb()
        # for simplicity, our model only supports SGD
        self.optimizer = optim.SGD(self.module.parameters(), lr=lr)
        self.criterion = nn.CrossEntropyLoss()
        self.device = torch.device('cuda') if device.lower() == 'cuda' and torch.cuda.is_available() else torch.device('cpu')
        self.module.to(self.device)
        self.criterion.to(self.device)
    
    def get_params(self):
        return self.module.state_dict()

    def set_params(self, new_params):
        self.module.load_state_dict(new_params)
    
    def train(self, data: Dataset):
        self.module.train()
        batch_losses = []
        for batch_idx, (x, y) in enumerate(data):
            # x and y are expected to be of type torch.Tensor
            x = x.to(self.device)
            y = y.to(self.device)
            self.optimizer.zero_grad()
            logits = self.module(x)
            loss = self.criterion(logits, y)
            loss.backward()
            self.optimizer.step()
            batch_losses.append(loss.item())
        return {'loss': sum(batch_losses) / len(batch_losses)}
    
    def test(self, data: Dataset):
        self.module.eval()
        total_correct = 0
        total_samples = 0
        total_loss = 0.0
        with torch.no_grad():
            for batch_idx, (x, y) in enumerate(data):
                x = x.to(self.device)
                y = y.to(self.device)
                logits = self.module(x)
                loss = self.criterion(logits, y)
                _, predicted = torch.max(logits, dim=1)
                num_correct = predicted.eq(y).sum()
                total_correct += num_correct.item()
                total_samples += y.size(0)
                total_loss += loss.item() * y.size(0)
        return {'accuracy': total_correct / total_samples, 'loss': total_loss / total_samples}

Note that the training and testing functions require each item in the dataset to be a 2-tuple that corresponds to x and y, respectively. Therefore, we need to make sure that the dataset contains a series of 2-tuples.

Now we have a single class that can serve either as the client model or as the server model. Also note that our workload class `MyLeNet5RGBModel` defines several parameters for the `__init__` method; we will see how to provide arguments for them later.

> In fact, it is not difficult to make this workload class compatible with other models of the same category, such as VGG.

As for the dataset, we will download it from the official website since it can be more easily managed. Please first specify a directory for it:

In [None]:
# please replace by your own path
DATASET_ROOT = '/home/roleml/datasets/cifar-10'

Now we download and extract the dataset archive:

In [None]:
import os
os.environ['DATASET_ROOT'] = DATASET_ROOT

In [None]:
! mkdir -p {DATASET_ROOT}

In [None]:
! wget -O {DATASET_ROOT}"/cifar-10-python.tar.gz" https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz

In [None]:
! cd {DATASET_ROOT} && tar -xzvf {DATASET_ROOT}"/cifar-10-python.tar.gz"

In [None]:
! cd {DATASET_ROOT}"/cifar-10-batches-py" && mv * ../

In [None]:
! ls {DATASET_ROOT}     # should include data_batch_1 ~ data_batch_5 and test_batch

We assume that the server uses the whole test set for model testing and each client will get one of the five training batches. Therefore, we only need to know how to load a single file from the dataset. Again, we will use a single class that can be adopted by both sides:

In [None]:
import pickle
from pathlib import Path
from typing import Literal

from roleml.kits.workload import XYDataset


def unpickle(file):
    """ Unpickle a CIFAR-10 data file. """
    with open(file, 'rb') as fo:
        content = pickle.load(fo, encoding='bytes')
    return content[b'data'], content[b'labels']


class MyCiFar10Dataset(XYDataset):

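    def __init__(self, root: str, part: Literal['train', 'test'] = 'train', index: int = 0):
        # again, we will see how to provide arguments later
        # `index` is 0-based, while the training files are named data_batch_1 ~ data_batch_5
        filename = 'test_batch' if part == 'test' else f'data_batch_{index + 1}'
        data, labels = unpickle(str(Path(root) / filename))
        # each row stores 1024 red, then 1024 green, then 1024 blue values (row-major),
        # so reshape to (N, 3, 32, 32) first and then move the channel axis to the end
        super().__init__(data.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1), labels)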

Here `XYDataset` is a utility class provided by RoleML's runtime library. It adds some common functionalities such as getting a single sample from the dataset or querying the total number of samples. It also makes sure that each sample, when queried, is returned as a 2-tuple, which complies with the requirement of the model.

So far we have just written the code to load data from the disk. The loaded objects are just NumPy arrays and therefore _ML engine-agnostic_. We need some adaptation to make them work better with our PyTorch model. We also need a mechanism to fetch data from the dataset in batches, which `XYDataset` does not provide.

First, let's prepare some helper functions:

In [None]:
import numpy as np
import torchvision.transforms as transforms


def transform_torch(dataset: MyCiFar10Dataset) -> XYDataset:
    """ Dataset normalization. """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    x = [transform(img) for img in dataset.x]
    y = dataset.y
    return XYDataset(x, y)


def combine(data: Iterable):
    """ Combine a batch of samples. """
    data = list(data)
    x = np.array([d[0].numpy() for d in data])
    x = torch.tensor(x)
    y = np.array([d[1] for d in data])
    y = torch.from_numpy(y)
    return x, y

The above functions are actually taken from the runtime library at `roleml.library.workload.datasets.zoo.image.cifar_10` - yes, RoleML has out-of-the-box support for the CIFAR-10 dataset (and more). The first function normalizes the loaded dataset, while the second one combines samples from a single batch so that the model can accept them as a whole.

Following the principle of low coupling, RoleML provides utilities to create dataset *views* - different dataset views can represent different ways to iterate or process the same dataset. We will use these functions later when creating a dataset view for the client.
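
To show how converters, a sequential sampler, a batch size, and a combiner fit together, here is a small pure-Python sketch of a batching view. `ToyBatchView`, `scale`, and `combine_xy` are hypothetical names; the actual view utilities in RoleML may be organized differently.

```python
from typing import Any, Callable, Iterable, Iterator, Sequence


class ToyBatchView:
    """Hypothetical dataset view: converts once, then iterates in sequential batches."""

    def __init__(self, dataset: Sequence[Any], batch_size: int,
                 converters: Iterable[Callable[[Sequence[Any]], Sequence[Any]]] = (),
                 combiner: Callable[[list], Any] = list) -> None:
        for convert in converters:
            dataset = convert(dataset)      # applied to the whole dataset up front
        self.dataset = dataset
        self.batch_size = batch_size
        self.combiner = combiner

    def __iter__(self) -> Iterator[Any]:
        n = len(self.dataset)
        for start in range(0, n, self.batch_size):
            batch = [self.dataset[i] for i in range(start, min(start + self.batch_size, n))]
            yield self.combiner(batch)


def scale(samples):
    """Converter: scale every x into the range [0, 1)."""
    return [(x / 10, y) for x, y in samples]


def combine_xy(batch):
    """Combiner: turn a list of (x, y) samples into one (xs, ys) pair."""
    xs, ys = zip(*batch)
    return list(xs), list(ys)


samples = [(i, i % 2) for i in range(5)]
view = ToyBatchView(samples, batch_size=2, converters=[scale], combiner=combine_xy)
batches = list(view)
second_pass = list(view)    # a view can be iterated again, e.g. once per epoch
```

Each item produced by the view is already a 2-tuple of batched x and y, which is the shape the `train` and `test` methods of the model expect.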

## Run the Application

So far we have prepared roles for the server and the clients. Now it's time to create a Federated Learning application by deploying these roles to the corresponding actors (here each actor represents a node). There are several ways to deploy roles; for this tutorial we will keep it simple and simulate Federated Learning in one Python program.

### Prepare Actors

RoleML provides an `ActorBuilder` class to create an actor out of a configuration dict. First, let's have a look at the configuration for deploying the Trainer role in a client:

In [None]:
from roleml.kits.workload import DatasetViewFactory

conf_trainer = {
    'class': Trainer,
    'impl': {
        'model': {
            'class': MyLeNet5RGBModel,
            'constructor_args': {
                'lr': 0.01
            }
        },
        'dataset': {
            'constructor': DatasetViewFactory,
            'constructor_args': {
                'dataset': {
                    'type': MyCiFar10Dataset,
                    'options': {
                        'root': DATASET_ROOT,
                        'part': 'train',
                        'index': 0
                    }
                },
                'converters': [transform_torch],
                'sampler': 'sequential',
                'batch_size': 32,
                'combiner': combine
            },
        }
    }
}   # type: rml.RoleSpec

The configuration dict for a role consists of three keys: **`class`** for the role class, **`options`** for keyword arguments that will be passed to the role's `__init__` method, and **`impl`** for implementing workload elements. The latter two are optional. The above example also shows the basic schema which you must follow when writing a configuration file; otherwise the actor builder will not recognize it.

In the dataset implementation, the constructor `DatasetViewFactory` is a callable that, when called with the given `constructor_args`, returns a dataset view object that conforms to the `Dataset` interface in the original declaration.

You can also put the configuration dict in a separate JSON/YAML file. If you need to declare a type (e.g. role class) or a function, or any other Python object in the configuration file, specify its fully qualified name, and RoleML will automatically find and load the corresponding class, function, etc. For example, you can specify `"roleml.kits.workload.DatasetViewFactory"` as the dataset's constructor.
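
Resolving a fully qualified name to a Python object is a standard technique. The generic sketch below uses `importlib` and a hypothetical helper named `load_object`; it is not RoleML's loader, and it only handles module-level attributes.

```python
import importlib
from typing import Any


def load_object(qualified_name: str) -> Any:
    """Resolve 'package.module.attr' to the object it names."""
    module_name, _, attr_name = qualified_name.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Standard-library names are used here so that the sketch runs anywhere.
ordered_dict_cls = load_object('collections.OrderedDict')
sqrt = load_object('math.sqrt')
root = sqrt(9)
```

This is why a JSON/YAML file, which can only hold strings and numbers, is still able to refer to classes and functions.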

> Using a configuration file is the most common way to configure a role. Therefore, for maximum compatibility, the parameters in the `__init__` method of your roles as well as their workload elements should only accept basic data types such as string, integer, or boolean.

Now we put this dict in the configuration of an actor, which also specifies the name and network address of the actor, as well as the messaging components to be used.

In [None]:
from roleml.extensions.messaging.invokers.requests import RequestsProcedureInvoker
from roleml.extensions.messaging.providers.flask import FlaskProcedureProvider

In [None]:
conf_client = {
    'name': 'client1',
    'address': '127.0.0.1:5001',
    # Note that network messaging will be disabled when you don't specify the components to use
    'procedure_invoker': RequestsProcedureInvoker,
    'procedure_provider': FlaskProcedureProvider,
    'roles': {
        'trainer': conf_trainer
    },
    'contacts': {
        # network addresses of other nodes visible to the current node
        'server': '127.0.0.1:5000'
    },
    'log_file_path': "",    # disable logging to file
}   # type: rml.ActorBootstrapSpec

Now we are ready to build the actor:

In [None]:
builder = rml.ActorBuilder()
builder.load_config(conf_client)
actor = builder.build()

... and four more for the remaining training set files:

In [None]:
client_profiles = (
    ('client2', '127.0.0.1:5002'),
    ('client3', '127.0.0.1:5003'),
    ('client4', '127.0.0.1:5004'),
    ('client5', '127.0.0.1:5005'),
)
actors = [actor]
for i, (client_name, address) in enumerate(client_profiles):
    conf_client['name'] = client_name
    conf_client['address'] = address
    conf_client['roles']['trainer']['impl']['dataset']['constructor_args']['dataset']['options']['index'] = i + 1  # type: ignore
    builder = rml.ActorBuilder()
    builder.load_config(conf_client)
    actors.append(builder.build())

Modifying configurations like this is ugly. The good news is that when you conduct an experiment and want to start a batch of roles, RoleML provides a suite to help you quickly configure multiple actors whose configurations differ only slightly.
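Independent of that suite, a plain-Python way to avoid mutating one shared dict is to deep-copy a base configuration for every client. The sketch below uses a trimmed-down stand-in dict; `derive_client_conf` is a hypothetical helper, and the assumption that `client1` uses index 0 is ours.

```python
import copy

# Trimmed-down stand-in for the client configuration (assumption: client1 uses index 0)
base_conf = {
    'name': 'client1',
    'address': '127.0.0.1:5001',
    'roles': {'trainer': {'impl': {'dataset': {'constructor_args': {
        'dataset': {'options': {'index': 0}}
    }}}}},
}


def derive_client_conf(base, name, address, index):
    """Return an independent copy of base with per-client fields replaced."""
    conf = copy.deepcopy(base)  # nested dicts are copied, so base stays untouched
    conf['name'] = name
    conf['address'] = address
    options = conf['roles']['trainer']['impl']['dataset']['constructor_args']['dataset']['options']
    options['index'] = index
    return conf


client_confs = [
    derive_client_conf(base_conf, f'client{i + 1}', f'127.0.0.1:{5001 + i}', i)
    for i in range(5)
]
```

`copy.deepcopy` leaves classes and functions as they are, so a configuration that refers to role classes or converter functions can be copied the same way.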

The role configuration for the server is slightly longer, given that it has four roles:

In [None]:
conf_server_roles = {
    'aggregator': {
        'class': CollectiveAggregator,
        'impl': {
            'merger': {
                'class': 'roleml.library.workload.util.collections.merger.torch.TorchStateDictAverager'
            }
        }
    },
    'client-selector': {
        'class': ClientSelector
    },
    'configurator': {
        'class': ClientConfigurator
    },
    'coordinator': {
        'class': Coordinator,
        'impl': {
            'model': {
                'class': MyLeNet5RGBModel
            },
            'dataset': {
                'constructor': DatasetViewFactory,
                'constructor_args': {
                    'dataset': {
                        'type': MyCiFar10Dataset,
                        'options': {
                            'root': DATASET_ROOT,
                            'part': 'test'
                        }
                    },
                    'converters': [transform_torch],
                    'sampler': 'sequential',
                    'batch_size': 32,
                    'combiner': combine
                },
            }
        }
    }
}   # type: dict[str, rml.RoleSpec]

Now you might ask: does it mean that whenever I need to modify even just a little bit of the configuration, I'll need to make a complete copy of it first?

The answer is no, you don't need to. RoleML extends the YAML format to support file inclusion, so that when using the YAML format, you can split the configuration into several files and only modify that specific file if you just need to change a small part of it.

Let's finish the server's configuration:

In [None]:
from roleml.core.actor.group.impl.threaded import ThreadedCollectiveImplementor


conf_server = {
    'name': 'server',
    'address': '127.0.0.1:5000',
    'procedure_invoker': RequestsProcedureInvoker,
    'procedure_provider': FlaskProcedureProvider,
    'collective_implementor': ThreadedCollectiveImplementor,
    'roles': conf_server_roles,     # type: ignore
    'relationships': {
        'trainer': [
            "client1/trainer",
            "client2/trainer",
            "client3/trainer",
            "client4/trainer",
            "client5/trainer"
        ]
    },
    "contacts": {
        'client1': '127.0.0.1:5001',
        'client2': '127.0.0.1:5002',
        'client3': '127.0.0.1:5003',
        'client4': '127.0.0.1:5004',
        'client5': '127.0.0.1:5005',
    },
    'log_file_path': "",    # disable logging to file
}   # type: rml.ActorBootstrapSpec

Note that our Coordinator role refers to several relationships, such as `client-selector` and `configurator`, that are not configured in the above configuration dict. In fact, a relationship can be omitted if its target role is deployed on the same actor under the same name as the relationship. For example, when calling services on a role belonging to the relationship `client-selector`, RoleML will try to find a local role named `client-selector`, provided that the relationship does not contain any role.

> This design is mainly for facilitating the deployment of multiple roles on a single node. Without explicit configuration, you should never treat the `client-selector` role as a member of the relationship with the same name.
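The fallback rule can be pictured with a few lines of plain Python. This is only an illustration of the behavior described above, not RoleML's implementation; `resolve_targets` and its parameters are hypothetical.

```python
def resolve_targets(relationship, relationships, local_roles, local_actor):
    """Return the role instances a call on `relationship` would reach (illustrative only)."""
    members = relationships.get(relationship, [])
    if members:
        # Explicitly configured members always win
        return list(members)
    if relationship in local_roles:
        # Empty relationship: fall back to the local role with the same name
        return [f"{local_actor}/{relationship}"]
    raise LookupError(f"no target found for relationship {relationship!r}")


relationships = {'trainer': ['client1/trainer', 'client2/trainer']}
local_roles = {'aggregator', 'client-selector', 'configurator', 'coordinator'}

trainer_targets = resolve_targets('trainer', relationships, local_roles, 'server')
selector_targets = resolve_targets('client-selector', relationships, local_roles, 'server')
```

Here `selector_targets` resolves to the local role even though the `relationships` dict never mentions `client-selector`.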

Seems like we can build the server actor now... But wait!

Among the roles we just defined, Coordinator is the one that controls the whole Federated Learning process, and we have just wrapped the workflow into a task channel. So how do we call it?

Our recommended solution is to define another role that serves as the control plane and calls that task to formally start an FL run. We will call this role _Conductor_. With its help, specific arguments for a run (e.g. number of rounds) can first be written into the Conductor's configuration, then passed to its `__init__` method and ultimately to the Coordinator's task handler.

In [None]:
from roleml.kits.interfaces import Runnable


class Conductor(rml.Role, Runnable):

    def __init__(self, num_rounds: int, select_ratio: float):
        super().__init__()  # don't forget to call the super when overriding the __init__ method
        self.num_rounds = num_rounds
        self.select_ratio = select_ratio
    
    def run(self):
        self.call_task('coordinator', 'run', args={
            'num_rounds': self.num_rounds, 'select_ratio': self.select_ratio
        }).result()

Separating the control plane allows us to switch to real-world deployment without modifying the existing source code. Note that the `Conductor` class implements the `Runnable` interface. When such a role is deployed to an actor, its `run()` method will automatically be executed in a separate thread.
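What "executed in a separate thread" amounts to can be sketched with the standard `threading` module. The `Runnable` class below is a local stand-in, not the one from `roleml.kits.interfaces`, and `start_if_runnable` is a hypothetical helper; how RoleML actually schedules the call may differ.

```python
import threading


class Runnable:
    """Local stand-in for a 'runnable' interface (not RoleML's class)."""

    def run(self):
        raise NotImplementedError


class Greeter(Runnable):
    def __init__(self):
        self.done = threading.Event()

    def run(self):
        # Real work would go here; we only record that run() was called
        self.done.set()


def start_if_runnable(role):
    """Start role.run() in its own thread if the role is Runnable, else do nothing."""
    if isinstance(role, Runnable):
        thread = threading.Thread(target=role.run, daemon=True)
        thread.start()
        return thread
    return None


greeter = Greeter()
thread = start_if_runnable(greeter)
thread.join(timeout=5)
```

Running `run()` in its own thread keeps a long, blocking call such as `.result()` from holding up the rest of the actor.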

Now we need to add contact information to the server node so that when the Coordinator is called, it will know that it is the Conductor who is calling (otherwise an error will be raised at runtime). We assume it will be running at 127.0.0.1:4000:

In [None]:
conf_server['contacts']['conductor'] = '127.0.0.1:4000'     # type: ignore

And we can finally build the server actor:

In [None]:
builder = rml.ActorBuilder()
builder.load_config(conf_server)
actor_server = builder.build()

Finally, another actor for the experiment conductor:

In [None]:
conf_run = {
    'num_rounds': 10,
    'select_ratio': 0.4,    # each round selects 2 of the 5 clients
}
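As a quick sanity check on the numbers above: with 5 clients and a ratio of 0.4, each round picks 2 clients. The sketch below assumes that the count is rounded to the nearest integer and is never less than one; the `ClientSelector` defined earlier may compute it differently, and `select_clients` is a hypothetical helper.

```python
import random


def select_clients(clients, select_ratio, rng):
    """Pick a random subset whose size follows select_ratio (illustrative only)."""
    # Assumption: round to the nearest integer and always select at least one client
    count = max(1, round(len(clients) * select_ratio))
    return rng.sample(clients, count)


clients = [f"client{i}" for i in range(1, 6)]
selected = select_clients(clients, 0.4, random.Random(0))
num_selected = len(selected)
```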

In [None]:
conf_conductor = {
    'name': 'conductor',
    'address': '127.0.0.1:4000',
    'procedure_invoker': RequestsProcedureInvoker,
    'procedure_provider': FlaskProcedureProvider,
    'roles': {
        'conductor': {
            'class': Conductor,
            'options': conf_run
        }
    },
    'relationships': {
        'coordinator': ['server/coordinator']
    },
    'contacts': {
        'server': '127.0.0.1:5000'
    },
    'log_file_path': "",    # disable logging to file
}   # type: rml.ActorBootstrapSpec
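In the configurations of this tutorial, every relationship entry has the form `actor/role`, and the actor part also appears under `contacts`. Before building an actor, a small check like the one below can catch a forgotten contact. `missing_contacts` is a hypothetical helper working on plain dicts, and it assumes the naming convention just described.

```python
def missing_contacts(conf):
    """List actor names used in relationships that have no entry in contacts."""
    known = set(conf.get('contacts', {})) | {conf['name']}
    missing = set()
    for instances in conf.get('relationships', {}).values():
        for instance in instances:
            # 'server/coordinator' -> actor name 'server'
            actor_name, _, _role_name = instance.partition('/')
            if actor_name not in known:
                missing.add(actor_name)
    return sorted(missing)


good_conf = {
    'name': 'conductor',
    'relationships': {'coordinator': ['server/coordinator']},
    'contacts': {'server': '127.0.0.1:5000'},
}
bad_conf = {
    'name': 'server',
    'relationships': {'trainer': ['client1/trainer', 'client2/trainer']},
    'contacts': {'client1': '127.0.0.1:5001'},
}

good_result = missing_contacts(good_conf)
bad_result = missing_contacts(bad_conf)
```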

In [None]:
builder = rml.ActorBuilder()
builder.load_config(conf_conductor)
actor_conductor = builder.build()

### Make Actors Work

Now we are finally ready to run a Federated Learning application. At this stage, all the actors created above should have been initialized with the role(s) assigned to them, respectively. We can start them by executing their `run()` methods:

In [None]:
from concurrent.futures import ThreadPoolExecutor


executor = ThreadPoolExecutor()

for actor in actors:
    executor.submit(actor.run)
executor.submit(actor_server.run)

When you see in the output that the actors and roles are ready, it's time to start the Conductor to initiate the FL run. You may see the outputs from the Coordinator and other roles in previous cells.

In [None]:
executor.submit(actor_conductor.run)

If everything goes fine, the Coordinator should log that "FL is done". After that, we can stop the actors. To stop an actor gracefully, invoke the `stop()` method.

> When using the actor running scripts provided by RoleML, the `stop()` method will be automatically called on Ctrl+C.

In [None]:
for actor in actors:
    executor.submit(actor.stop)     # you may need to wait for a couple of seconds before the heartbeat thread stops
executor.submit(actor_server.stop)
executor.submit(actor_conductor.stop)
executor.shutdown()
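One detail worth knowing: `executor.submit()` returns a `Future`, and an exception raised inside the submitted function stays hidden until you ask for the result. The sketch below keeps the futures and checks them after stopping. `FakeActor` is a hypothetical stand-in whose `run()` blocks until `stop()` is called; it is not a RoleML class.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


class FakeActor:
    """Hypothetical stand-in for an actor: run() blocks until stop() is called."""

    def __init__(self, name):
        self.name = name
        self._stopped = threading.Event()

    def run(self):
        self._stopped.wait()
        return f"{self.name} stopped"

    def stop(self):
        self._stopped.set()


demo_actors = [FakeActor(f"client{i}") for i in range(1, 4)]

# Enough workers for every blocking run() plus every stop() call
with ThreadPoolExecutor(max_workers=len(demo_actors) * 2) as pool:
    run_futures = [pool.submit(a.run) for a in demo_actors]
    stop_futures = [pool.submit(a.stop) for a in demo_actors]
    done, not_done = wait(run_futures + stop_futures, timeout=10)
    # result() re-raises any exception raised inside run() or stop()
    results = [future.result() for future in run_futures]
```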

Finally, if you don't need the dataset downloaded in this tutorial, you can remove it from your disk:

In [None]:
! rm -rf {DATASET_ROOT}

Done!

## What's Next?

You are now ready to run a distributed learning application developed with RoleML. Try applying FedAvg, or any other architecture, to train different models on different datasets.

Meanwhile, **distributed edge ML is more than Federated Learning.** _Gossip Learning_ is widely considered an alternative to FL with a different topology and communication strategy. Another architecture named _E-Tree Learning_ leverages a multi-level tree structure to support decentralized and localized model aggregation, maximizing the utilization of edge devices and reducing communication costs. RoleML provides example implementations for these architectures in the source code archive, and you can also use RoleML to implement many more architectures that address the challenges in edge environments, including heterogeneity and instability.

Alternatively, you can try to modify this example to:

* make it more modular
* add any new features you wish
* handle thread safety more carefully (e.g. try the `get_relationship_unsafe()` API mentioned earlier)
* ...

In short, try anything you want with RoleML!