# Workflow Interface 104: Working with Keras on CPU
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/snehal-das/openfl/blob/develop/openfl-tutorials/experimental/104_Keras_MNIST_with_CPU.ipynb)

## Training a CNN on CPU using the Workflow Interface and MNIST data.

The workflow interface is a way of orchestrating a federated learning experiment with OpenFL. The fundamental idea is that each collaborator trains the model on its own share of the training data, while the aggregator is mainly responsible for aggregating the model weights that the collaborators return.

The experiment can be broken down into the following steps:
1. Installing prerequisites: OpenFL, TensorFlow, Keras, and NumPy for this example.
2. Downloading the training and testing data.
3. Setting up the neural network for training.
4. Defining the aggregator and collaborators.
5. Defining the workflow: this forms the crux of the example and demonstrates how the training is split between the aggregator and the collaborators.
6. Running the experiment and evaluating the model performance.

#### STEP#1: Install prerequisites for the exercise, including OpenFL and TensorFlow.

In [None]:
# Install OpenFL and the packages required for the workflow APIs to function
%pip install git+https://github.com/securefederatedai/openfl.git
%pip install -r workflow_interface_requirements.txt

# Install TensorFlow to access Keras
%pip install tensorflow==2.17

# Uncomment the lines below if running in Google Colab; set USERNAME if running in a Docker container.
# %pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/workflow_interface_requirements.txt
# import os
# os.environ["USERNAME"] = "colab"

#### STEP#2: Download testing and training data.

For this example, we rely on the `load_data()` API of the Keras MNIST dataset module. When called, it downloads a total of 70,000 images of handwritten digits: 60,000 for training and 10,000 for testing the neural network model.

For more details on the implementation, refer to: https://github.com/keras-team/keras/blob/master/keras/src/datasets/mnist.py#L10

In [None]:
import tensorflow as tf
import keras  # the Keras package installed alongside TensorFlow; the imports below come from it
from keras.utils import to_categorical
from keras.datasets import mnist

nb_classes = 10
(X_train, y_train), (X_test, y_test) = mnist.load_data()
print("X_train original shape", X_train.shape)
print("y_train original shape", y_train.shape)

# Scale all pixel values to the range [0, 1] before passing them to the neural network.
# This is the usual convention for data preparation, and default weight initializations
# generally assume inputs of roughly this scale.
# To achieve this:
# - Convert the integer values [0, 255] to float32
# - Divide each pixel value by 255 to get values in the range [0, 1]

X_train = X_train.astype("float32")
X_test = X_test.astype("float32")
X_train /= 255.0
X_test /= 255.0

print("Training matrix shape", X_train.shape)
print("Testing matrix shape", X_test.shape)

Y_train = to_categorical(y_train, nb_classes)
Y_test = to_categorical(y_test, nb_classes)
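
The two preprocessing steps in the cell above (scaling pixels to [0, 1] and one-hot encoding the labels) can be checked on a tiny example. This is a minimal sketch in plain NumPy; the 2x2 "image", the labels, and `num_classes = 4` are made up for illustration, and indexing an identity matrix is used here as a stand-in for `to_categorical`.

```python
import numpy as np

# A made-up 2x2 "image" with 8-bit pixel values, plus two made-up labels.
pixels = np.array([[0, 51], [102, 255]], dtype=np.uint8)
labels = np.array([3, 0])
num_classes = 4  # hypothetical; the tutorial itself uses 10 classes

# Convert to float32 first, then divide by 255 so every value lands in [0, 1].
scaled = pixels.astype("float32") / 255.0

# One-hot encode: row i of the identity matrix is the encoding of class i.
one_hot = np.eye(num_classes)[labels]

print(scaled)
print(one_hot)
```

Each label becomes a vector with a single 1 at the index of its class, which is the target format that the `categorical_crossentropy` loss expects.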

At this point, we have installed the necessary prerequisites, imported the required packages, downloaded the dataset, and pre-processed it.
Next we define the NN model and the helper functions for aggregating and evaluating models.

#### STEP#3: Set up the Neural Network Model; define helper functions.

In [None]:
from keras.layers import Flatten, Dense, Dropout, Conv2D, MaxPool2D
from keras.models import Sequential
import numpy as np

model = Sequential([
    Conv2D(filters=32, kernel_size=(3, 3), activation="relu", input_shape=(28, 28, 1)),
    MaxPool2D(), 
    Flatten(), # Converts the multi-dimensional feature map into a one-dimensional vector.
    Dense(512, activation="relu"), # A fully connected layer with 512 neurons, used to learn higher-level abstract features and representations.
    Dropout(0.2), # Randomly sets 20% of the neurons' outputs to zero during training, avoids relying on a specific neuron.
    Dense(512, activation="relu"), # A fully connected layer with 512 neurons, used to learn higher-level abstract features and representations.
    Dropout(0.2), # Randomly sets 20% of the neurons' outputs to zero during training, avoids relying on a specific neuron.
    Dense(nb_classes, activation="softmax"), # To turn output vector into probability vector. To understand why, refer to the comment below.
])
# Because the output of a fully-connected layer is not normalized to be between 0 and 1, it cannot be interpreted as a probability.
# Moreover, if we want the outputs to be probabilities of the different digits, they all need to add up to 1.
# To turn an output vector into a probability vector, a function called softmax is often used as
# the last activation function in a classification neural network. For example, softmax([-1, 1, 2]) ≈ [0.035, 0.259, 0.705].

model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
model.summary()  # summary() prints the table itself and returns None, so no print() is needed

# Helper function to merge the model weights returned from each of the collaborators.
def FedAvg(models):
    # Reuse the first model as the container for the averaged weights.
    new_model = models[0]
    # get_weights() returns each model's weights as a list of NumPy arrays.
    all_weights = [m.get_weights() for m in models]
    # Average the corresponding arrays across models, one weight tensor at a time.
    averaged_weights = [
        np.mean(np.stack(tensors), axis=0) for tensors in zip(*all_weights)
    ]
    new_model.set_weights(averaged_weights)
    return new_model

# Helper function to calculate the loss and accuracy of a given model.
def inference(model, test_loader, batch_size):
    x_test, y_test = test_loader
    loss, accuracy = model.evaluate(
        x_test,
        y_test,
        batch_size=batch_size,
        verbose=0
    )
    accuracy_percentage = accuracy * 100
    print(f"Test set: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%")
    return accuracy
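
Two ideas from the cell above can be checked in isolation with plain NumPy: the softmax example quoted in the comment, and the element-wise averaging that `FedAvg` performs. This is a minimal sketch, not the tutorial's code path; `softmax`, `average_weight_lists`, and the toy "Seattle" and "London" weight lists are illustrative names and values.

```python
import numpy as np

def softmax(logits):
    """Turn a vector of raw scores into probabilities that sum to 1."""
    shifted = logits - np.max(logits)  # subtracting the max avoids overflow in exp
    exps = np.exp(shifted)
    return exps / exps.sum()

probs = softmax(np.array([-1.0, 1.0, 2.0]))
print(np.round(probs, 3))

def average_weight_lists(weight_lists):
    """Element-wise mean over per-model lists of weight arrays."""
    return [np.mean(np.stack(tensors), axis=0) for tensors in zip(*weight_lists)]

# Two toy "models", each with one kernel and one bias (made-up values).
seattle = [np.array([[1.0, 2.0]]), np.array([0.0])]
london = [np.array([[3.0, 4.0]]), np.array([2.0])]
averaged = average_weight_lists([seattle, london])
print(averaged)
```

Because every collaborator in this tutorial receives an equally sized shard, an unweighted mean is a reasonable choice; with unequal shards, a mean weighted by sample count would be the more usual form of federated averaging.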

#### STEP#4: Initialize the Aggregator and Collaborators.

We import `FLSpec`, `LocalRuntime`, and the `aggregator` and `collaborator` placement decorators.

- `FLSpec` – Defines the flow specification. User-defined flows are subclasses of this.
- `Runtime` – Defines where the flow runs and the infrastructure for task transitions (how information gets sent). The `LocalRuntime` runs the flow on a single node.
- `aggregator/collaborator` – Placement decorators that define where each task is assigned.

Edit `collaborator_names` to add or remove collaborators.
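
To build intuition for what a placement decorator does, here is a toy sketch in plain Python. It is not OpenFL's implementation: `placement`, `toy_aggregator`, `toy_collaborator`, `ToyFlow`, and `placement_of` are hypothetical names, and the only assumption is that a decorator can tag a task with the role that should run it so that a runtime can read the tag later.

```python
def placement(role):
    """Build a decorator that tags a task with the role expected to run it."""
    def decorator(task):
        task.placement = role  # store the tag on the function object itself
        return task
    return decorator

# Hypothetical stand-ins for the aggregator and collaborator decorators.
toy_aggregator = placement("aggregator")
toy_collaborator = placement("collaborator")

class ToyFlow:
    @toy_aggregator
    def start(self):
        return "collect participants"

    @toy_collaborator
    def train(self):
        return "fit on private data"

def placement_of(flow_cls, task_name):
    """Read the tag back, as a runtime might do before dispatching a task."""
    return getattr(flow_cls, task_name).placement

placements = {name: placement_of(ToyFlow, name) for name in ("start", "train")}
print(placements)
```

The tagged functions still behave as ordinary methods; the tag only records where each task is meant to execute.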

In [None]:
from openfl.experimental.interface import FLSpec, Aggregator, Collaborator
from openfl.experimental.runtime import LocalRuntime
from openfl.experimental.placement import aggregator, collaborator

agg = Aggregator()

collaborator_names = ["Seattle", "London"]

def callable_to_initialize_collaborator_private_attributes(n_collaborators, index, train_dataset, test_dataset, batch_size):
    from openfl.utilities.data_splitters import EqualNumPyDataSplitter
    train_splitter = EqualNumPyDataSplitter()
    test_splitter = EqualNumPyDataSplitter()

    X_train, y_train = train_dataset
    X_test, y_test = test_dataset

    train_idx = train_splitter.split(y_train, n_collaborators)
    valid_idx = test_splitter.split(y_test, n_collaborators)

    train_dataset = X_train[train_idx[index]], y_train[train_idx[index]]
    test_dataset = X_test[valid_idx[index]], y_test[valid_idx[index]]

    return {
        "train_loader": train_dataset, 
        "test_loader": test_dataset,
        "batch_size": batch_size
    }

# Set up each collaborator's private attributes via the callable defined above
collaborators = []
for idx, collaborator_name in enumerate(collaborator_names):
    collaborators.append(
        Collaborator(
            name=collaborator_name,
            num_cpus=1,
            num_gpus=0,
            private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
            n_collaborators=len(collaborator_names),
            index=idx,
            train_dataset=(X_train, Y_train),
            test_dataset=(X_test, Y_test),
            batch_size=64
        )
    )

local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend="ray")
print(f'Local runtime collaborators = {local_runtime.collaborators}')
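
The callable above hands each collaborator its own shard of the training and test sets. The sketch below shows the general idea of an equal split with plain NumPy. It is a stand-in written for illustration, not the code of `EqualNumPyDataSplitter`; `equal_split_indices` and the toy arrays are hypothetical.

```python
import numpy as np

def equal_split_indices(num_samples, n_collaborators, seed=0):
    """Shuffle the sample indices and cut them into near-equal shards."""
    rng = np.random.default_rng(seed)
    shuffled = rng.permutation(num_samples)
    return np.array_split(shuffled, n_collaborators)

# Ten made-up samples with one feature each, and alternating labels.
features = np.arange(10).reshape(10, 1)
labels = np.arange(10) % 2

shards = equal_split_indices(len(labels), n_collaborators=2)

# Index 0 plays the role of "Seattle" and index 1 the role of "London".
seattle_x, seattle_y = features[shards[0]], labels[shards[0]]
london_x, london_y = features[shards[1]], labels[shards[1]]

print(len(seattle_x), len(london_x))
```

Every sample lands in exactly one shard, so no collaborator sees another collaborator's data.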

#### STEP#5: Define the workflow needed to train the model using the data and participants.

Now we come to the flow definition. The OpenFL Workflow Interface adopts the conventions set by Metaflow: every workflow begins with the `start` task and concludes with the `end` task. The aggregator begins with the model passed in to the flow (in this example the optimizer is already attached to the model through `model.compile`). In `start`, the list of collaborators is extracted from the runtime (`self.collaborators = self.runtime.collaborators`) and is then used as the list of participants to run the task named in `self.next`, `aggregated_model_validation`. The model, and anything else that is not explicitly excluded from the next function, is passed from `start` on the aggregator to `aggregated_model_validation` on each collaborator. Where a task runs is determined by the placement decorator that precedes its definition (`@aggregator` or `@collaborator`).

Once each collaborator (defined in the runtime) completes the `aggregated_model_validation` task, it passes its current state on to the `train` task, from `train` to `local_model_validation`, and finally to `join` at the aggregator. It is in `join` that the model weights are averaged. The `internal_loop` task then either starts the next round or, once `n_rounds` rounds have run, moves on to `end`.
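
The round structure described above can be sketched without OpenFL by using a single number as the "model". This is a toy illustration of the control flow only, not how OpenFL schedules tasks: `run_rounds`, `local_targets`, and `lr` are hypothetical, and "training" is just one step of the weight toward a collaborator's made-up target value.

```python
def run_rounds(initial_weight, local_targets, rounds=3, lr=0.5):
    """Toy loop: validate, train, and validate per collaborator, then join."""
    weight = initial_weight
    history = []
    for current_round in range(1, rounds + 1):
        local_weights = []
        errors = {}
        for name, target in local_targets.items():
            # aggregated_model_validation: score the shared weight locally
            agg_error = abs(target - weight)
            # train: one local update that starts from the shared weight
            local_weight = weight + lr * (target - weight)
            # local_model_validation: score the locally trained weight
            local_error = abs(target - local_weight)
            local_weights.append(local_weight)
            errors[name] = (agg_error, local_error)
        # join: the aggregator averages what the collaborators send back
        weight = sum(local_weights) / len(local_weights)
        history.append((current_round, weight, errors))
    return weight, history

final_weight, history = run_rounds(0.0, {"Seattle": 2.0, "London": 4.0})
print(final_weight)
```

With these made-up targets the shared weight moves toward 3.0, the mean of the two local targets, which mirrors how averaging pulls the global model toward a compromise between the collaborators.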

In [None]:
class KerasMNISTWorkflow(FLSpec):
    def __init__(self, model, rounds=3, **kwargs):
        super().__init__(**kwargs)
        self.model = model
        self.n_rounds = rounds
        self.current_round = 1

    @aggregator
    def start(self):
        self.collaborators = self.runtime.collaborators
        self.next(self.aggregated_model_validation, foreach='collaborators')

    @collaborator
    def aggregated_model_validation(self):
        print(f'Performing aggregated model validation for collaborator {self.input}')
        self.agg_validation_score = inference(self.model, self.test_loader, self.batch_size)
        print(f'{self.input} value of {self.agg_validation_score}')
        self.next(self.train)

    @collaborator
    def train(self):
        x_train, y_train = self.train_loader
        history = self.model.fit(
            x_train, y_train,
            batch_size=self.batch_size,
            epochs=1,
            verbose=1,
        )
        self.loss = history.history["loss"][0]
        self.next(self.local_model_validation)

    @collaborator
    def local_model_validation(self):
        self.local_validation_score = inference(self.model, self.test_loader, self.batch_size)
        print(
            f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')
        self.next(self.join)

    @aggregator
    def join(self, inputs):
        self.average_loss = sum(input.loss for input in inputs) / len(inputs)
        self.aggregated_model_accuracy = sum(
            input.agg_validation_score for input in inputs) / len(inputs)
        self.local_model_accuracy = sum(
            input.local_validation_score for input in inputs) / len(inputs)
        print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')
        print(f'Average training loss = {self.average_loss}')
        print(f'Average local model validation values = {self.local_model_accuracy}')
        print("Taking FedAvg of models of all collaborators")
        self.model = FedAvg([input.model for input in inputs])

        self.next(self.internal_loop)

    @aggregator
    def internal_loop(self):
        if self.current_round == self.n_rounds:
            self.next(self.end)
        else:
            self.current_round += 1
            self.next(self.aggregated_model_validation, foreach='collaborators')

    @aggregator
    def end(self):
        print("Reached the end of the training flow; the model is ready to use!")
        loss, accuracy = self.model.evaluate(
            X_test, Y_test,
            batch_size=10000,
            verbose=1
        )
        accuracy_percentage = accuracy * 100
        print(f"Final Loss, Accuracy numbers: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%")

#### STEP#6: Call KerasMNISTWorkflow to train the model.

At this point we are ready to train the model on the MNIST dataset downloaded earlier.

In [None]:
flflow = KerasMNISTWorkflow(model, rounds=3, checkpoint=True)
flflow.runtime = local_runtime
flflow.run()