##### Copyright 2020 The TensorFlow Authors.

In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://www.tensorflow.org/federated/tutorials/building_your_own_federated_learning_algorithm"><img src="https://www.tensorflow.org/images/tf_logo_32px.png" />View on TensorFlow.org</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/tensorflow/federated/blob/main/docs/tutorials/building_your_own_federated_learning_algorithm.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/tensorflow/federated/blob/main/docs/tutorials/building_your_own_federated_learning_algorithm.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
  <td>
    <a href="https://storage.googleapis.com/tensorflow_docs/federated/docs/tutorials/building_your_own_federated_learning_algorithm.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a>
  </td>
</table>

## Before we start

Before we start, please run the following to make sure that your environment is
correctly set up. If the imports fail, please refer to the
[Installation](../install.md) guide for instructions.

In [2]:
#@test {"skip": true}
# !pip install --quiet --upgrade tensorflow-federated-nightly
# !pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()

In [3]:
import tensorflow as tf
import tensorflow_federated as tff



**NOTE**: This colab has been verified to work with the [latest released version](https://github.com/tensorflow/federated#compatibility) of the `tensorflow_federated` pip package, but the TensorFlow Federated project is still in pre-release development and may not work on `main`.

# Building Your Own Federated Learning Algorithm

In the [image classification](federated_learning_for_image_classification.ipynb) and
[text generation](federated_learning_for_text_generation.ipynb) tutorials, we learned how to set up model and data pipelines for Federated Learning (FL), and performed federated training via the `tff.learning` API layer of TFF.

This is only the tip of the iceberg when it comes to FL research. In this tutorial, we discuss how to implement federated learning algorithms *without* deferring to the `tff.learning` API. We aim to accomplish the following:

**Goals:**


*   Understand the general structure of federated learning algorithms.
*   Explore the *Federated Core* of TFF.
*   Use the Federated Core to implement Federated Averaging directly.

While this tutorial is self-contained, we recommend first reading the [image classification](federated_learning_for_image_classification.ipynb) and
[text generation](federated_learning_for_text_generation.ipynb) tutorials.


## Preparing the input data
We first load and preprocess the EMNIST dataset included in TFF. For more details, see the [image classification](federated_learning_for_image_classification.ipynb) tutorial.

In [4]:
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()


In order to feed the dataset into our model, we flatten the data, and convert each example into a tuple of the form `(flattened_image_vector, label)`.

In [5]:
NUM_CLIENTS = 10
BATCH_SIZE = 20

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch of EMNIST data and return a (features, label) tuple."""
    return (tf.reshape(element['pixels'], [-1, 784]), 
            tf.reshape(element['label'], [-1, 1]))

  return dataset.batch(BATCH_SIZE).map(batch_format_fn)
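
To make the reshaping concrete, here is a minimal sketch in plain Python (no TensorFlow) of what `batch_format_fn` does to one batch: each 28 by 28 image becomes a vector of 784 values, and each label becomes a one-element row. The helper name `flatten_batch` and the toy data are illustrative assumptions, not part of TFF.

```python
def flatten_batch(pixels, labels):
    """Mimics reshaping pixels to [-1, 784] and labels to [-1, 1]."""
    # Concatenate the 28 rows of each image into one flat list of 784 values.
    flat_pixels = [[value for row in image for value in row] for image in pixels]
    # Wrap each label in its own list so labels form a column.
    column_labels = [[label] for label in labels]
    return flat_pixels, column_labels


# Two toy 28x28 "images" filled with a constant, plus their labels.
batch_pixels = [[[0.0] * 28 for _ in range(28)], [[1.0] * 28 for _ in range(28)]]
batch_labels = [3, 7]

features, targets = flatten_batch(batch_pixels, batch_labels)
print(len(features), len(features[0]))  # 2 784
print(targets)  # [[3], [7]]
```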

We now select a small number of clients, and apply the preprocessing above to their datasets.

In [234]:
client_ids = sorted(emnist_train.client_ids)[:NUM_CLIENTS]
federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
  for x in client_ids
]


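class ConvertFedData:
    """Pairs a client's preprocessed dataset with its positional index."""
    def __init__(self, data, client_id):
        self.data = data
        # Position in `federated_train_data`, not the EMNIST client ID string.
        self.client_id = client_id


federated_train_data_with_clients_ids = []
for index, client_dataset in enumerate(federated_train_data):
    print(index)
    federated_train_data_with_clients_ids.append(ConvertFedData(client_dataset, index))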

0
1
2
3
4
5
6
7
8
9


In [359]:
print(emnist_train.client_ids[:NUM_CLIENTS])
print(type(federated_train_data))


print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print("\n")
print("\n")
print('First dataset: {d}'.format(d=federated_train_data))
print("\n")
print('First dataset: {d}'.format(d=federated_train_data[0]))

['f0000_14', 'f0001_41', 'f0005_26', 'f0006_12', 'f0008_45', 'f0011_13', 'f0014_19', 'f0016_39', 'f0017_07', 'f0022_10']
<class 'list'>
Number of client datasets: 10




First dataset: [<MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>, <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>]


First dataset: <MapDataset shape

In [236]:
print(emnist_train.client_ids[:NUM_CLIENTS])
print(type(federated_train_data_with_clients_ids))
print(federated_train_data_with_clients_ids[0].client_id)


print('Number of client datasets: {l}'.format(l=len(federated_train_data_with_clients_ids)))
print('First dataset: {d}'.format(d=federated_train_data_with_clients_ids[0].data))
print('First dataset: {d}'.format(d=federated_train_data_with_clients_ids[0]))
print('First dataset: {d}'.format(d=emnist_train))

['f0000_14', 'f0001_41', 'f0005_26', 'f0006_12', 'f0008_45', 'f0011_13', 'f0014_19', 'f0016_39', 'f0017_07', 'f0022_10']
<class 'list'>
0
Number of client datasets: 10
First dataset: <MapDataset shapes: ((None, 784), (None, 1)), types: (tf.float32, tf.int32)>
First dataset: <__main__.ConvertFedData object at 0x2aac98cdf2b0>
First dataset: <tensorflow_federated.python.simulation.datasets.serializable_client_data.PreprocessSerializableClientData object at 0x2aaab7bc2040>


In [24]:
for i, batch in enumerate(federated_train_data, start=1):
    print(i)

1
2
3
4
5
6
7
8
9
10


In [169]:
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS), tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures, client_additions):
  return tff.federated_mean(client_temperatures)

str(get_average_temperature.type_signature)
print(get_average_temperature([68.5, 70.3, 69.8], [1, 1, 1]))

69.53334


## Preparing the model

We use the same model as in the [image classification](federated_learning_for_image_classification.ipynb) tutorial. This model (implemented via `tf.keras`) has a single dense layer, followed by a softmax layer.

In [237]:
def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])

In order to use this model in TFF, we wrap the Keras model as a [`tff.learning.Model`](https://www.tensorflow.org/federated/api_docs/python/tff/learning/Model). This allows us to perform the model's [forward pass](https://www.tensorflow.org/federated/api_docs/python/tff/learning/Model#forward_pass) within TFF, and [extract model outputs](https://www.tensorflow.org/federated/api_docs/python/tff/learning/Model#report_local_outputs). For more details, also see the [image classification](federated_learning_for_image_classification.ipynb) tutorial.

In [180]:
def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=federated_train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

While we used `tf.keras` to create a `tff.learning.Model`, TFF supports much more general models. These models have the following relevant attributes capturing the model weights:

*   `trainable_variables`: An iterable of the tensors corresponding to trainable layers.
*   `non_trainable_variables`: An iterable of the tensors corresponding to non-trainable layers.

For our purposes, we will only use the `trainable_variables` (as our model only has those!).

# Building your own Federated Learning algorithm

While the `tff.learning` API allows one to create many variants of Federated Averaging, there are other federated algorithms that do not fit neatly into this framework. For example, you may want to add regularization, clipping, or more complicated algorithms such as [federated GAN training](https://github.com/tensorflow/federated/tree/main/tensorflow_federated/python/research/gans). You may instead be interested in [federated analytics](https://ai.googleblog.com/2020/05/federated-analytics-collaborative-data.html).

For these more advanced algorithms, we'll have to write our own custom algorithm using TFF. In many cases, federated algorithms have 4 main components:

1. A server-to-client broadcast step.
2. A local client update step.
3. A client-to-server upload step.
4. A server update step.

In TFF, we generally represent federated algorithms as a [`tff.templates.IterativeProcess`](https://www.tensorflow.org/federated/api_docs/python/tff/templates/IterativeProcess) (which we refer to as just an `IterativeProcess` throughout). This is a class that contains `initialize` and `next` functions. Here, `initialize` is used to initialize the server, and `next` will perform one communication round of the federated algorithm. Let's write a skeleton of what our iterative process for FedAvg should look like.

First, we have an initialize function that simply creates a `tff.learning.Model`, and returns its trainable weights.

In [181]:
def initialize_fn():
  model = model_fn()
  return model.trainable_variables

This function looks good, but as we will see later, we will need to make a small modification to make it a "TFF computation".

We also want to sketch the `next_fn`.

In [281]:
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = client_update(federated_dataset, server_weights_at_client)

  # The server averages these updates.
  mean_client_weights = mean(client_weights)

  # The server updates its model.
  server_weights = server_update(mean_client_weights)

  return server_weights

We'll focus on implementing these four components separately. We first focus on the parts that can be implemented in pure TensorFlow, namely the client and server update steps.
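
Before moving to TensorFlow, the four steps can be tried out on a toy problem in plain Python. The sketch below is an illustration under stated assumptions: the model is a single float, each client holds a short list of numbers, and the `toy_*` helpers are hypothetical names that only mimic the roles of broadcast, client update, averaging, and server update.

```python
from statistics import fmean


def toy_broadcast(server_weight, num_clients):
    """Step 1: every client receives a copy of the server value."""
    return [server_weight] * num_clients


def toy_client_update(client_data, weight, learning_rate=0.5):
    """Step 2: one gradient step on the local loss 0.5 * mean((weight - x)^2)."""
    gradient = fmean([weight - x for x in client_data])
    return weight - learning_rate * gradient


def toy_server_update(mean_client_weight):
    """Step 4: vanilla averaging replaces the server value with the client mean."""
    return mean_client_weight


def toy_next(server_weight, federated_data):
    weights_at_clients = toy_broadcast(server_weight, len(federated_data))
    client_weights = [
        toy_client_update(data, weight)
        for data, weight in zip(federated_data, weights_at_clients)
    ]
    # Step 3: clients upload their weights and the server averages them.
    mean_client_weight = fmean(client_weights)
    return toy_server_update(mean_client_weight)


federated_data = [[1.0, 3.0], [5.0], [9.0, 9.0, 9.0]]
new_weight = toy_next(0.0, federated_data)
print(new_weight)
```

With a starting weight of `0.0`, the three clients move to `1.0`, `2.5`, and `4.5`, so the new server value is their unweighted mean, `8/3`.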


## TensorFlow Blocks 

### Client update

We will use our `tff.learning.Model` to do client training in essentially the same way you would train a TensorFlow model. In particular, we will use `tf.GradientTape` to compute the gradient on batches of data, then apply these gradients using a `client_optimizer`. We focus only on the trainable weights.


In [324]:
@tf.function
def client_update(model, dataset, server_weights, client_optimizer, server_state, client_states):
  """Performs training (using the server model weights) on the client's dataset.

  SCAFFOLD work in progress: `server_state` is not used yet, and the returned
  client states are placeholder zeros shaped like `client_states`.
  """
  # Initialize the client model with the current server weights.
  client_weights = model.trainable_variables
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Placeholder for the updated client control state.
  new_client_states = tf.zeros_like(client_states)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

    # Disabled draft of the control-variate update:
#     client_updated_states[running_client_id] = new_client_states - server_state + (server_weights - client_weights)/0.01

  return client_weights, new_client_states

In [280]:
@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.trainable_variables
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights
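
The training loop above follows a simple pattern: start from the server weights, then for each batch compute a gradient and take an optimizer step. A stripped-down sketch of that pattern without TensorFlow, assuming a one-parameter model `y = w * x` with squared loss and plain SGD (the function name and data are hypothetical):

```python
def toy_client_update(batches, server_weight, learning_rate=0.1):
    """Runs one pass of SGD over `batches`, starting from the server weight."""
    weight = server_weight  # Initialize the client model from the server.
    for batch in batches:
        # Gradient of 0.5 * mean((weight * x - y)^2) with respect to weight.
        gradient = sum((weight * x - y) * x for x, y in batch) / len(batch)
        # Plain SGD step, standing in for client_optimizer.apply_gradients.
        weight -= learning_rate * gradient
    return weight


# Data generated by y = 2 * x, split into two batches.
batches = [[(1.0, 2.0), (2.0, 4.0)], [(3.0, 6.0)]]
updated = toy_client_update(batches, server_weight=0.0)
print(updated)
```

After one pass the weight has moved from `0.0` most of the way toward the true slope of `2.0`, which is the behavior we expect from the TensorFlow version on its own loss.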

### Server Update

The server update for FedAvg is simpler than the client update. We will implement "vanilla" federated averaging, in which we simply replace the server model weights by the average of the client model weights. Again, we only focus on the trainable weights.

In [325]:
@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.trainable_variables
  # Assign the mean client weights to the server model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

The snippet could be simplified by returning the `mean_client_weights` directly. However, more advanced implementations of Federated Averaging use `mean_client_weights` with more sophisticated techniques, such as momentum or adaptivity.

**Challenge**: Implement a version of `server_update` that updates the server weights to be the midpoint of model_weights and mean_client_weights. (Note: This kind of "midpoint" approach is analogous to recent work on the [Lookahead optimizer](https://arxiv.org/abs/1907.08610)!).
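
As a hint for the challenge, the arithmetic of a midpoint update can be sketched on plain Python lists. This is only one possible reading of the challenge, with a hypothetical function name and flat lists standing in for the weight tensors; in the TensorFlow version the same idea would presumably mean assigning `(x + y) / 2` instead of `y`.

```python
def midpoint_server_update(model_weights, mean_client_weights):
    """Returns the elementwise midpoint of two flat weight lists."""
    return [(w + m) / 2.0 for w, m in zip(model_weights, mean_client_weights)]


print(midpoint_server_update([0.0, 2.0], [1.0, 4.0]))  # [0.5, 3.0]
```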

So far, we've only written pure TensorFlow code. This is by design, as TFF allows you to use much of the TensorFlow code you're already familiar with. However, now we have to specify the **orchestration logic**, that is, the logic that dictates what the server broadcasts to the client, and what the client uploads to the server.

This will require the *Federated Core* of TFF.

# Introduction to the Federated Core

The Federated Core (FC) is a set of lower-level interfaces that serve as the foundation for the `tff.learning` API. However, these interfaces are not limited to learning. In fact, they can be used for analytics and many other computations over distributed data.

At a high level, the Federated Core is a development environment that enables compactly expressed program logic to combine TensorFlow code with distributed communication operators (such as distributed sums and broadcasts). The goal is to give researchers and practitioners explicit control over the distributed communication in their systems, without requiring system implementation details (such as specifying point-to-point network message exchanges).

One key point is that TFF is designed for privacy-preservation. Therefore, it allows explicit control over where data resides, to prevent unwanted accumulation of data at the centralized server location.

## Federated data

A key concept in TFF is "federated data", which refers to a collection of data items hosted across a group of devices in a distributed system (e.g. client datasets, or the server model weights). We model the entire collection of data items across all devices as a single *federated value*.

For example, suppose we have client devices that each have a float representing the temperature of a sensor. We could represent it as a *federated float* by

In [326]:
federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

A federated type is specified by the type `T` of its member constituents (e.g. `tf.float32`) and a group `G` of devices. We will focus on the cases where `G` is either `tff.CLIENTS` or `tff.SERVER`. Such a federated type is represented as `{T}@G`, as shown below.

In [327]:
str(federated_float_on_clients)

'{float32}@CLIENTS'

Why do we care so much about placements? A key goal of TFF is to enable writing code that could be deployed on a real distributed system. This means that it is vital to reason about which subsets of devices execute which code, and where different pieces of data reside.

TFF focuses on three things: *data*, where the data is *placed*, and how the data is being *transformed*. The first two are encapsulated in federated types, while the last is encapsulated in *federated computations*.
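
To get a feel for the `{T}@G` notation, the following toy class reproduces the string form of a federated type in plain Python. It is a hypothetical stand-in, not TFF's implementation; the `all_equal` flag is our assumption for modeling the case where every device in the group holds the same value, which TFF prints without curly braces.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyFederatedType:
    member: str  # Type of each member constituent, e.g. "float32".
    placement: str  # Group of devices, e.g. "CLIENTS" or "SERVER".
    all_equal: bool = False  # True when every device holds the same value.

    def __str__(self):
        # Braces signal that values may differ from device to device.
        inner = self.member if self.all_equal else "{" + self.member + "}"
        return f"{inner}@{self.placement}"


clients_type = ToyFederatedType("float32", "CLIENTS")
server_type = ToyFederatedType("float32", "SERVER", all_equal=True)
print(clients_type)  # {float32}@CLIENTS
print(server_type)  # float32@SERVER
```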

## Federated computations

TFF is a strongly-typed functional programming environment whose basic units are *federated computations*. These are pieces of logic that accept federated values as input, and return federated values as output.

For example, suppose we wanted to average the temperatures on our client sensors. We could define the following (using our federated float):

In [328]:
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures):
  return tff.federated_mean(client_temperatures)

You might ask, how is this different from the `tf.function` decorator in TensorFlow? The key answer is that the code generated by `tff.federated_computation` is neither TensorFlow nor Python code; it is a specification of a distributed system in an internal platform-independent *glue language*.

While this may sound complicated, you can think of TFF computations as functions with well-defined type signatures. These type signatures can be directly queried.

In [329]:
str(get_average_temperature.type_signature)

'({float32}@CLIENTS -> float32@SERVER)'

This `tff.federated_computation` accepts arguments of federated type `{float32}@CLIENTS`, and returns values of federated type `float32@SERVER`. Federated computations may also go from server to client, from client to client, or from server to server. Federated computations can also be composed like normal functions, as long as their type signatures match up.

To support development, TFF allows you to invoke a `tff.federated_computation` as a Python function. For example, we can call

In [330]:
get_average_temperature([68.5, 70.3, 69.8])

69.53334

## Non-eager computations and TensorFlow

There are two key restrictions to be aware of. First, when the Python interpreter encounters a `tff.federated_computation` decorator, the function is traced once and serialized for future use. Due to the decentralized nature of Federated Learning, this future usage may occur elsewhere, such as a remote execution environment. Therefore, TFF computations are fundamentally *non-eager*. This behavior is somewhat analogous to that of the [`tf.function`](https://www.tensorflow.org/api_docs/python/tf/function) decorator in TensorFlow.
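
The first restriction can be illustrated with a toy tracer in plain Python. This is a loose analogy under our own assumptions, not how TFF serializes computations: the hypothetical `trace_once` decorator runs the Python body a single time on a placeholder, records the operations, and later calls replay the recording without re-running the body.

```python
class Placeholder:
    """Stands in for an input whose value is unknown while tracing."""

    def __init__(self, ops=()):
        self.ops = tuple(ops)

    def __add__(self, constant):
        # Record the operation instead of computing a number.
        return Placeholder(self.ops + (("add", constant),))


def trace_once(fn):
    """Runs the Python body a single time and keeps only the recorded ops."""
    recorded = fn(Placeholder()).ops

    def run(value):
        # Replay the recorded operations on a concrete value.
        for op, constant in recorded:
            if op == "add":
                value = value + constant
        return value

    return run


body_runs = []


@trace_once
def add_half(x):
    body_runs.append("traced")  # Python side effect: happens at trace time only.
    return x + 0.5


print(add_half(1.0), add_half(2.0))  # 1.5 2.5
print(len(body_runs))  # 1
```

The body ran once, at decoration time, even though the traced function was invoked twice. Python side effects inside a traced function therefore do not repeat on each call.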

Second, a federated computation can only consist of federated operators (such as `tff.federated_mean`); it cannot contain TensorFlow operations. TensorFlow code must be confined to blocks decorated with `tff.tf_computation`. Most ordinary TensorFlow code can be directly decorated, such as the following function that takes a number and adds `0.5` to it.

In [331]:
@tff.tf_computation(tf.float32)
def add_half(x):
  return tf.add(x, 0.5)

These also have type signatures, but *without placements*. For example, we can call

In [332]:
str(add_half.type_signature)

'(float32 -> float32)'

Here we see an important difference between `tff.federated_computation` and `tff.tf_computation`. The former has explicit placements, while the latter does not.

We can use `tff.tf_computation` blocks in federated computations by specifying placements. Let's create a function that adds half, but only to federated floats at the clients. We can do this by using `tff.federated_map`, which applies a given `tff.tf_computation`, while preserving the placement.

In [333]:
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def add_half_on_clients(x):
  return tff.federated_map(add_half, x)

This function is almost identical to `add_half`, except that it only accepts values with placement at `tff.CLIENTS`, and returns values with the same placement. We can see this in its type signature:

In [334]:
str(add_half_on_clients.type_signature)

'({float32}@CLIENTS -> {float32}@CLIENTS)'

In summary:

*   TFF operates on federated values.
*   Each federated value has a *federated type*, with a *type* (e.g. `tf.float32`) and a *placement* (e.g. `tff.CLIENTS`).
*   Federated values can be transformed using *federated computations*, which must be decorated with `tff.federated_computation` and a federated type signature.
*   TensorFlow code must be contained in blocks with `tff.tf_computation` decorators. 
*   These blocks can then be incorporated into federated computations.
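
As a mental model for the summary above, the three federated operators seen so far can be imitated in plain Python, with a list standing in for a client-placed value and a single number standing in for a server-placed value. The `toy_*` functions are hypothetical and ignore everything TFF does about types, serialization, and execution.

```python
from statistics import fmean


def toy_federated_broadcast(server_value, num_clients):
    """SERVER -> CLIENTS: every client gets the same value."""
    return [server_value for _ in range(num_clients)]


def toy_federated_map(fn, client_values):
    """CLIENTS -> CLIENTS: apply fn independently on each client."""
    return [fn(value) for value in client_values]


def toy_federated_mean(client_values):
    """CLIENTS -> SERVER: average the client values into a single value."""
    return fmean(client_values)


def add_half(x):
    return x + 0.5


temperatures = [68.5, 70.3, 69.8]
shifted = toy_federated_map(add_half, temperatures)
average = toy_federated_mean(temperatures)
print(shifted)
print(average)
```

The mapped result still has one entry per client, mirroring how `tff.federated_map` preserves placement, while the mean collapses to one value, mirroring the move from `tff.CLIENTS` to `tff.SERVER`.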


# Building your own Federated Learning algorithm, revisited

Now that we've gotten a glimpse of the Federated Core, we can build our own federated learning algorithm. Remember that above, we defined an `initialize_fn` and `next_fn` for our algorithm. The `next_fn` will make use of the `client_update` and `server_update` we defined using pure TensorFlow code.

However, in order to make our algorithm a federated computation, we will need both the `next_fn` and `initialize_fn` to each be a `tff.federated_computation`.

## TensorFlow Federated blocks 

### Creating the initialization computation

The initialize function will be quite simple: We will create a model using `model_fn`. However, remember that we must separate out our TensorFlow code using `tff.tf_computation`.

In [335]:
@tff.tf_computation
def server_init():
  model = model_fn()
  return model.trainable_variables





We can then pass this directly into a federated computation using `tff.federated_value`.

In [336]:
@tff.federated_computation
def initialize_fn():
  return tff.federated_value(server_init(), tff.SERVER)

### Creating the `next_fn`

We now use our client and server update code to write the actual algorithm. We will first turn our `client_update` into a `tff.tf_computation` that accepts a client dataset and the server weights, and outputs the updated client weights.

We will need the corresponding types to properly decorate our function. Luckily, the type of the server weights can be extracted directly from our model.

In [337]:
whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)





In [338]:
# tf_dataset_type = federated_train_data_with_clients_ids[0]

Let's look at the dataset type signature. Remember that we took 28 by 28 images (with integer labels) and flattened them.

In [339]:
str(tf_dataset_type)

'<float32[?,784],int32[?,1]>*'

We can also extract the model weights type by using our `server_init` function above.

In [340]:
model_weights_type = server_init.type_signature.result

Examining the type signature, we'll be able to see the architecture of our model!

In [341]:
str(model_weights_type)

'<float32[784,10],float32[10]>'
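
Reading the shapes off this type string, we can count the trainable parameters of the model with a few lines of plain Python. The shape list is copied by hand from the signature above.

```python
from math import prod

# Kernel of shape [784, 10] and bias of shape [10], as in the type string.
weight_shapes = [(784, 10), (10,)]
num_parameters = sum(prod(shape) for shape in weight_shapes)
print(num_parameters)  # 7850
```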

In [342]:
# SCAFFOLD additions: both states are typed as scalar float32 values.

server_state_type = tf.float32
client_states_type = tf.float32

We can now create our `tff.tf_computation` for the client update.

In [221]:
@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)





In [343]:
# SCAFFOLD UPDATED 

@tff.tf_computation(tf_dataset_type, model_weights_type, server_state_type, client_states_type)
def client_update_fn(tf_dataset, server_weights, server_state, client_states):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer, server_state, client_states)





The `tff.tf_computation` version of the server update can be defined in a similar way, using types we've already extracted.

In [344]:
@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)





Last, but not least, we need to create the `tff.federated_computation` that brings this all together. This function will accept two *federated values*, one corresponding to the server weights (with placement `tff.SERVER`), and the other corresponding to the client datasets (with placement `tff.CLIENTS`).

Note that both these types were defined above! We simply need to give them the proper placement using `tff.FederatedType`.

In [345]:
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

In [346]:
# Scaffold UPDATED

federated_server_state_type = tff.FederatedType(server_state_type, tff.SERVER)
federated_client_states_type = tff.FederatedType(client_states_type, tff.CLIENTS)
print(federated_server_state_type)
print(federated_client_states_type)

float32@SERVER
{float32}@CLIENTS


Remember the 4 elements of an FL algorithm?

1. A server-to-client broadcast step.
2. A local client update step.
3. A client-to-server upload step.
4. A server update step.

Now that we've built up the above, each part can be compactly represented as a single line of TFF code. This simplicity is why we had to take extra care to specify things such as federated types!

In [226]:
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))
  
  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights

In [347]:
#   def run_one_round_public(server_state, federated_dataset, client_states):
#     """Orchestration logic for one round of computation.
#     Args:
#       server_state: A `ServerState`.
#       federated_dataset: A federated `tf.data.Dataset` with placement
#         `tff.CLIENTS`.
#     Returns:
#       A tuple of updated `ServerState` and `tf.Tensor` of average loss.
#     """
#     server_message = tff.federated_map(server_message_fn, server_state)
#     server_message_at_client = tff.federated_broadcast(server_message)

#     client_outputs = tff.federated_map(
#         client_update_fn, (federated_dataset, client_states, server_message_at_client))

#     # Model deltas are equally weighted in DP.
#     round_model_delta = tff.federated_mean(client_outputs.weights_delta)
#     updated_client_states = client_outputs.client_state

#     sum_control_weights_delta = tff.federated_sum(
#         client_outputs.control_weights_delta)
#     server_state = tff.federated_map(
#         public_server_update_fn,
#         (server_state, round_model_delta, sum_control_weights_delta))

#     round_loss_metric = tff.federated_mean(client_outputs.model_output)

#     return server_state, round_loss_metric, updated_client_states


@tff.federated_computation(federated_server_type, federated_dataset_type, federated_server_state_type, federated_client_states_type)
def next_fn(server_weights, federated_dataset, server_state, client_states):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)
  server_state_at_client = tff.federated_broadcast(server_state)

  # Each client computes their updated weights.
  client_weights, client_updated_states = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client, server_state_at_client, client_states))
  
  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)
  # mean_client_state = tff.federated_mean(client_.....)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights

In [None]:
class ScaffoldOptimizer(keras.optimizers.Optimizer):
    def __init__(self, learning_rate=0.01, name="ScaffoldOptimizer", **kwargs):
        """Call super().__init__() and use _set_hyper() to store hyperparameters"""
        super().__init__(name, **kwargs)
        self._set_hyper("learning_rate", kwargs.get("lr", learning_rate)) # handle lr=learning_rate
        self._is_first = True
    
    def _create_slots(self, var_list):
        """For each model variable, create the optimizer variable associated with it.
        TensorFlow calls these optimizer variables "slots".
        For momentum optimization, we need one momentum slot per model variable.
        """
        for var in var_list:
            self.add_slot(var, "pv") #previous variable i.e. weight or bias
        for var in var_list:
            self.add_slot(var, "pg") #previous gradient


    @tf.function
    def _resource_apply_dense(self, grad, var, server_controls, client_controls):
        """Update the slots and perform one optimization step for one model variable
        """
        var_dtype = var.dtype.base_dtype
        lr_t = self._decayed_lr(var_dtype) # handle learning rate decay
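        # Corrected step: client_controls plays the role of c_i, server_controls of c.
        new_var_m = var - (grad - client_controls + server_controls) * lr_t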
        pv_var = self.get_slot(var, "pv")
        pg_var = self.get_slot(var, "pg")
        
        if self._is_first:
            self._is_first = False
            new_var = new_var_m
        else:
            cond = grad*pg_var >= 0
            print(cond)
            avg_weights = (pv_var + var)/2.0
            new_var = tf.where(cond, new_var_m, avg_weights)
        pv_var.assign(var)
        pg_var.assign(grad)
        var.assign(new_var)

    def _resource_apply_sparse(self, grad, var):
        raise NotImplementedError

    def get_config(self):
        base_config = super().get_config()
        return {
            **base_config,
            "learning_rate": self._serialize_hyperparameter("learning_rate"),
        }
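The optimizer above is built around a gradient step corrected by control variates. As a minimal sketch, assuming the usual SCAFFOLD local update `y <- y - lr * (g - c_i + c)`, where `c_i` is the client control variate and `c` the server one, the arithmetic for a flat list of weights looks like this in plain Python. The name `scaffold_step` is hypothetical and is not part of TFF or Keras; the sign-based averaging branch of the optimizer is deliberately left out.

```python
def scaffold_step(weights, grads, client_control, server_control, lr):
    """One control-variate-corrected SGD step on flat lists of floats."""
    return [
        w - lr * (g - c_i + c)
        for w, g, c_i, c in zip(weights, grads, client_control, server_control)
    ]


# With zero control variates the step reduces to plain SGD.
plain = scaffold_step([1.0, 2.0], [0.5, -0.5], [0.0, 0.0], [0.0, 0.0], lr=0.1)

# A nonzero correction shifts the effective gradient by (c - c_i).
# Here the correction cancels the gradient exactly, so the weights stay put.
corrected = scaffold_step([1.0, 2.0], [0.5, -0.5], [0.5, 0.0], [0.0, 0.5], lr=0.1)
```

The point of the correction term is that each client's local gradient is nudged toward the direction the server believes the global gradient has, which is what the two extra arguments of `_resource_apply_dense` are meant to carry.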

We now have a `tff.federated_computation` for both initializing the algorithm and running one step of it. To finish our algorithm, we pass these into `tff.templates.IterativeProcess`.

In [348]:
federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

In [50]:
federated_algorithm_public = build_scaffold_averaging_process(
    model_fn,
    num_clients=10,
    dp_clip_norm=1.0,
    server_optimizer_fn=DEFAULT_SERVER_OPTIMIZER_FN,
    client_optimizer_fn=DEFAULT_CLIENT_OPTIMIZER_FN,
    use_simulation_loop=True,
    update_type='public')

federated_algorithm_private = build_scaffold_averaging_process(
    model_fn,
    num_clients=10,
    dp_clip_norm=1.0,
    server_optimizer_fn=DEFAULT_SERVER_OPTIMIZER_FN,
    client_optimizer_fn=DEFAULT_CLIENT_OPTIMIZER_FN,
    use_simulation_loop=True,
    update_type='private')

Let's look at the *type signature* of the `initialize` and `next` functions of our iterative process.

In [349]:
str(federated_algorithm.initialize.type_signature)

'( -> <float32[784,10],float32[10]>@SERVER)'

This reflects the fact that `federated_algorithm.initialize` is a no-arg function that returns a single-layer model (with a 784-by-10 weight matrix, and 10 bias units).

In [350]:
str(federated_algorithm.next.type_signature)

'(<server_weights=<float32[784,10],float32[10]>@SERVER,federated_dataset={<float32[?,784],int32[?,1]>*}@CLIENTS,server_state=float32@SERVER,client_states={float32}@CLIENTS> -> <float32[784,10],float32[10]>@SERVER)'

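Here, we see that `federated_algorithm.next` accepts four arguments: a server model, client data, a scalar server state placed at the server, and one scalar state per client. It returns an updated server model; the client states computed inside `next_fn` are not part of the result.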
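To make the data flow behind this signature concrete, here is a minimal sketch in plain Python, with no TFF involved. It assumes weights are flat lists of floats and that a Python list stands in for the `CLIENTS` placement; `run_round` and `toy_client_update` are hypothetical names used only for illustration, and the extra server and client state arguments are omitted.

```python
def run_round(server_weights, client_datasets, client_update, server_update):
    # Broadcast: every client receives its own copy of the server weights.
    broadcast = [list(server_weights) for _ in client_datasets]
    # Map: each client computes updated weights from its own data.
    client_weights = [
        client_update(data, weights)
        for data, weights in zip(client_datasets, broadcast)
    ]
    # Mean: average each coordinate across clients (unweighted).
    num_clients = len(client_weights)
    mean_weights = [sum(column) / num_clients for column in zip(*client_weights)]
    # Server update: the server turns the mean into its new model.
    return server_update(mean_weights)


def toy_client_update(dataset, weights):
    # Hypothetical stand-in for local training: move each weight
    # halfway toward the mean of the client's data.
    target = sum(dataset) / len(dataset)
    return [w + 0.5 * (target - w) for w in weights]


new_weights = run_round(
    [0.0, 0.0], [[1.0, 3.0], [4.0]], toy_client_update, lambda mean: mean)
```

Each step mirrors one line of `next_fn`: broadcast, per-client map, mean, and server update.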

## Evaluating the algorithm

Let's run a few rounds, and see how the loss changes. First, we will define an evaluation function using the *centralized* approach discussed in the second tutorial.

We first create a centralized evaluation dataset, and then apply the same preprocessing we used for the training data.

In [351]:
central_emnist_test = emnist_test.create_tf_dataset_from_all_clients()
central_emnist_test = preprocess(central_emnist_test)

In [72]:
# # print('Number of client datasets: {l}'.format(l=len(central_emnist_test)))
# print('First dataset: {d}'.format(d=central_emnist_test[0]))

Next, we write a function that accepts a server state, and uses Keras to evaluate on the test dataset. If you're familiar with `tf.keras`, this will all look familiar, though note the use of `set_weights`!

In [352]:
def evaluate(server_state):
  keras_model = create_keras_model()
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]  
  )
  keras_model.set_weights(server_state)
  keras_model.evaluate(central_emnist_test)

Now, let's initialize our algorithm and evaluate on the test set.

In [353]:
server_state = federated_algorithm.initialize()
# print(server_state)
evaluate(server_state)

In [66]:
server_state = federated_algorithm_public.initialize()
# print(server_state.model.trainable)
evaluate(server_state.model.trainable)


[array([[-0.0968272 , -0.02746578, -0.03778507, ...,  0.04118082,
         0.09491801, -0.00668141],
       [ 0.02177309,  0.0265556 , -0.03350093, ...,  0.01478878,
        -0.06294745, -0.07656068],
       [ 0.05641175, -0.01847302,  0.07688204, ...,  0.01760333,
         0.00447983,  0.03216938],
       ...,
       [ 0.00653809, -0.02494238,  0.0393483 , ...,  0.03873714,
         0.0476413 ,  0.05248557],
       [-0.03213382,  0.00638541,  0.00612707, ..., -0.0424929 ,
        -0.08011755,  0.0508595 ],
       [-0.05420608,  0.05183837, -0.00588615, ...,  0.04451389,
         0.04831592,  0.08302379]], dtype=float32), array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.], dtype=float32)]






Let's train for a few rounds and see if anything changes.

In [76]:
# round_num = tf.cast(0, tf.int32)

# for round in range(15):
#   # 1. Update the model weights using the private clients and using the old control variates
#   state_w_weights, _, client_states = federated_algorithm_private.next(
#       server_state, federated_train_data,
#       client_states)

#   # 2. Update the control variates using the public clients
#   state_w_controls, _, client_states = federated_algorithm_public.next(
#       server_state, federated_train_data,
#       client_states)

#   # 3. Merge updates from both states into original state variable
#   server_state = tff.structure.update_struct(
#     server_state,
#     model = state_w_weights.model,
#     optimizer_state = state_w_weights.optimizer_state,
#     round_num=round_num + tf.cast(1, tf.int32),
#     mean_control_weights=state_w_controls.mean_control_weights)

In [106]:
evaluate(server_state.model.trainable)

AttributeError: 'list' object has no attribute 'model'

In [354]:
# for round in range(10):
#   server_state = federated_algorithm.next(server_state, federated_train_data)
server_states = 0.0  # scalar placeholder matching the float32@SERVER server_state argument
length = len(federated_train_data)
print(length)

client_states = [0.0]*length
print(client_states)

for round in range(10):
  print(round)
  server_state = federated_algorithm.next(server_state, federated_train_data, server_states, client_states)

10
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
0
1
2
3
4
5
6
7
8
9


In [233]:
federated_algorithm.next(server_state, federated_train_data)

[array([[-0.09994101, -0.02816051, -0.03737728, ...,  0.0424078 ,
          0.09573044, -0.00639346],
        [ 0.01865929,  0.02586088, -0.03309314, ...,  0.01601576,
         -0.06213504, -0.07627273],
        [ 0.05329795, -0.01916775,  0.07728984, ...,  0.01883031,
          0.00529224,  0.03245732],
        ...,
        [ 0.00342429, -0.02563711,  0.0397561 , ...,  0.03996412,
          0.04845372,  0.05277352],
        [-0.03524762,  0.00569069,  0.00653488, ..., -0.04126592,
         -0.07930513,  0.05114744],
        [-0.05731988,  0.05114365, -0.00547834, ...,  0.04574087,
          0.04912834,  0.08331173]], dtype=float32),
 array([-0.0031138 , -0.00069473,  0.0004078 , -0.00053501,  0.00369056,
        -0.00076896, -0.0013132 ,  0.00122698,  0.00081242,  0.00028795],
       dtype=float32)]

In [55]:
client_states = []

def client_init_fn(client_id):
    control_weights = tf.nest.map_structure(tf.zeros_like,
                                            server_state.model.trainable)
    return ClientState(
        client_id=client_id, control_weights=control_weights)

c_id=str(1)
for client_id in federated_train_data:
      client_state = client_init_fn(c_id)
      client_states.append(client_state)
      c_id = str(int(c_id) + 1)

# print(client_states)

for round in range(5):
  server_state, _, client_states = federated_algorithm_public.next(
      server_state, federated_train_data,
      client_states)


In [355]:
evaluate(server_state)
# evaluate(server_state.model.trainable)

We see a slight decrease in the loss function. While the change is small, we've only performed 10 training rounds, and on a small subset of clients. To see better results, we may have to do hundreds if not thousands of rounds.

## Modifying our algorithm

At this point, let's stop and think about what we've accomplished. We've implemented Federated Averaging directly by combining pure TensorFlow code (for the client and server updates) with federated computations from the Federated Core of TFF.

To perform more sophisticated learning, we can simply alter what we have above. In particular, by editing the pure TF code above, we can change how the client performs training, or how the server updates its model.

**Challenge:** Add [gradient clipping](https://towardsdatascience.com/what-is-gradient-clipping-b8e815cdfb48) to the `client_update` function.
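As a starting point for the challenge, the following is a minimal sketch of clipping by global norm in plain Python, assuming gradients are given as a flat list of floats. The helper name `clip_by_global_norm` is chosen here for illustration; it is not the tutorial's solution, only the arithmetic that a clipped client update would apply before each step.

```python
import math


def clip_by_global_norm(grads, clip_norm):
    """Rescale grads so that their L2 norm is at most clip_norm."""
    global_norm = math.sqrt(sum(g * g for g in grads))
    if global_norm <= clip_norm or global_norm == 0.0:
        # Already within the bound: return the gradients unchanged.
        return list(grads), global_norm
    scale = clip_norm / global_norm
    return [g * scale for g in grads], global_norm


# The gradient [3, 4] has norm 5, so it is scaled down to norm 1.
clipped, norm = clip_by_global_norm([3.0, 4.0], clip_norm=1.0)
```

Inside the TensorFlow code of `client_update`, the built-in `tf.clip_by_global_norm` performs the same operation on a list of tensors and likewise returns the clipped list together with the global norm.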


If we wanted to make larger changes, we could also have the server store and broadcast more data. For example, the server could also store the client learning rate, and make it decay over time! Note that this will require changes to the type signatures used in the `tff.tf_computation` calls above.

**Harder Challenge:** Implement Federated Averaging with learning rate decay on the clients.
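One way to think about the server-side bookkeeping is sketched below in plain Python, assuming an exponential decay schedule and a server state that carries a round counter. The dictionary layout and the names `decayed_learning_rate`, `initial_lr`, and `decay_rate` are hypothetical choices for illustration, not the approach used by TFF itself.

```python
def decayed_learning_rate(initial_lr, decay_rate, round_num):
    """Exponential decay: the rate shrinks by decay_rate every round."""
    return initial_lr * decay_rate ** round_num


# Hypothetical server state: the server stores the round counter and
# would broadcast the resulting learning rate alongside the weights.
server_state = {"round_num": 0, "initial_lr": 0.1, "decay_rate": 0.5}

schedule = []
for _ in range(3):
    lr = decayed_learning_rate(
        server_state["initial_lr"],
        server_state["decay_rate"],
        server_state["round_num"])
    schedule.append(lr)  # the value clients would train with this round
    server_state["round_num"] += 1
```

In the federated version, the extra server fields would have to appear in the server's type, which is why the type signatures of the earlier computations need to change.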

At this point, you may begin to realize how much flexibility there is in what you can implement in this framework. For ideas (including the answer to the harder challenge above), you can see the source code for [`tff.learning.build_federated_averaging_process`](https://www.tensorflow.org/federated/api_docs/python/tff/learning/build_federated_averaging_process), or check out various [research projects](https://github.com/google-research/federated) using TFF.