##### Copyright 2020 The TensorFlow Authors.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# TFF for Federated Learning Research: Model and Update Compression

**NOTE**: This colab has been verified to work with the [latest released version](https://github.com/tensorflow/federated#compatibility) of the `tensorflow_federated` pip package, but the Tensorflow Federated project is still in pre-release development and may not work on `master`.

In this tutorial, we use the [EMNIST](https://www.tensorflow.org/federated/api_docs/python/tff/simulation/datasets/emnist) dataset to demonstrate how to enable lossy compression algorithms to reduce communication cost in the Federated Averaging algorithm using the `tff.learning.build_federated_averaging_process` API and the [tensor_encoding](http://jakubkonecny.com/files/tensor_encoding.pdf) API. For more details on the Federated Averaging algorithm, see the paper [Communication-Efficient Learning of Deep Networks from Decentralized Data](https://arxiv.org/abs/1602.05629).

## Before we start

Before we start, please run the following to make sure that your environment is
correctly setup. If you don't see a greeting, please refer to the
[Installation](../install.md) guide for instructions.

In [None]:
#@title Install tensorflow_federated and load TensorBoard
#@test {"skip": true}
!pip install --quiet --upgrade tensorflow_federated
!pip install --quiet --upgrade nest_asyncio

%load_ext tensorboard

import nest_asyncio
nest_asyncio.apply()

In [None]:
import functools
import logging
import time
import warnings

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

# Ignore warning messages from TensorFlow and Python.
tf.get_logger().setLevel(logging.ERROR)
warnings.simplefilter("ignore")

np.random.seed(0)
tff.federated_computation(lambda: 'Hello, world!')()

## Preparing the input data
In this section we load and preprocess the EMNIST dataset similiar to the process in [Federated Learning for Image Classification](https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification#preparing_the_input_data) tutorial. However there is a small difference in the `batch_format_fn` since the model in that tutorial expects an input with shape (-1, 784) and the model in this tutorial expects an input with shape (28, 28, 1). You should modify the `batch_format_fn` function corresponding to the input shape of your model.


In [None]:
NUM_CLIENTS = 10
NUM_EPOCHS = 1
BATCH_SIZE = 20
PREFETCH_BUFFER = 10
# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
SHUFFLE_BUFFER = 418

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True)

def preprocess_fn(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  def batch_format_fn(element):
    """Reshape a batch `pixels` and return the features as an `OrderedDict`."""
    return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

emnist_train = emnist_train.preprocess(preprocess_fn)
emnist_test = emnist_test.preprocess(preprocess_fn)

## Defining a model

Here we define a keras model based on the orginial FedAvg CNN, and then wrap the keras model in an instance of [tff.learning.Model](https://www.tensorflow.org/federated/api_docs/python/tff/learning/Model) so that it can be consumed by TFF.

### Define a keras model

Note that in [Federated Learning for Image Classification](https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification#creating_a_model_with_keras) tutorial, the model is a simple one-layer neural network where in this tutorial, we define a slightly more complicated model.

In [None]:
def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

We can use [tf.keras.Model.summary()](https://www.tensorflow.org/api_docs/python/tf/keras/Model#summary) to see the network architecture. 

In [None]:
print(create_original_fedavg_cnn_model().summary())

### Define a TFF model function

Wrap the keras model in an instance of `tff.learning.Model`, note that we'll need a **function** which produces a model instead of simply a model directly. In addition, the function **cannot** just capture a pre-constructed model, it must create the model in the context that it is called. The reason is that TFF is designed to go to devices, and needs control over when resources are constructed so that they can be captured and packaged up.

In [None]:
# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to 
# the model.
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])
input_spec = example_dataset.element_spec


def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

## Run training and evaluation with metrics visualization

Now we are ready to construct a Federated Averaging algorithm and train and evaluate the defined model on EMNIST dataset.

### Build a training process
First we need to build a Federated Averaging algorithm using the [tff.learning.build_federated_averaging_process](https://www.tensorflow.org/federated/api_docs/python/tff/learning/build_federated_averaging_process) API.

In [None]:
federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

### Build a evaluation process

Then, we build a Federated evaluation process using the [tff.learning.build_federated_evaluation](https://www.tensorflow.org/federated/api_docs/python/tff/learning/build_federated_evaluation) API.

In [None]:
evaluation_process = tff.learning.build_federated_evaluation(tff_model_fn)

### Run training and evaluation process

Before the actual run, let's define some utility functions.

In [None]:
#@title Load utility functions
def format_size(size):
  """A helper function for creating a human-readable size."""
  size = float(size)
  for unit in ['B','KiB','MiB','GiB']:
    if size < 1024.0:
      return "{size:3.2f}{unit}".format(size=size, unit=unit)
    size /= 1024.0
  return "{size:.2f}{unit}".format(size=size, unit='TiB')

def make_federated_data(client_data, client_ids):
  return [
      client_data.create_tf_dataset_for_client(x)
      for x in client_ids
  ]

def set_sizing_environment():
  """Creates an environment that contains sizing information."""
  # Creates a sizing executor factory to output communication cost
  # after the training finishes. Note that sizing executor only provides an
  # estimate (not exact) of communication cost, and doesn't capture cases like
  # compression of over-the-wire representations. However, it's perfect for
  # demonstrating the effect of compression in this tutorial.
  sizing_factory = tff.framework.sizing_executor_factory()

  # TFF has a modular runtime you can configure yourself for various
  # environments and purposes, and this example just shows how to configure one
  # part of it to report the size of things.
  context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
  tff.framework.set_default_context(context)

  return sizing_factory

Now let's define a function to run the Federated Averaging algorithm. The execution of a Federated Learning algorithm from the perspective of TFF looks like this:

1. Initialize the algorithm and get the inital server state. The server state contains necessary information to perform the algorithm. Recall, since TFF is functional, that this state includes both any optimizer state the algorithm uses (e.g. momentum terms) as well as the model parameters themselves--these will be passed as arguments and returned as results from TFF computations.
2. Execute the algorithm round by round. In each round, a new server state will be returned as the result of each client training the model on its data. Typically in one round:
    1. Server broadcast the model to all the participating clients.
    2. Each client perform work based on the model and its own data.
    3. Server aggregates all the model to produce a sever state which contains a new model.

Training metrics are written to the Tensorboard directory for displaying during the training and evaluation process.

In [None]:
def run_experiment(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Runs the federated averaging process and output metrics."""
  # Create a environment to get communication cost.
  environment = set_sizing_environment()

  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      round_start_time = time.time()
      # Sample the clients participated in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data = make_federated_data(emnist_train, sampled_clients)
      # Round one round of the algorithm based on the server state and client data
      # and output the new state and metrics.
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      # For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]

      print(f'Train: round {round_num:2d}, metrics={metrics}, '
            f'broadcasted_bits={format_size(broadcasted_bits)}, '
            f'aggregated_bits={format_size(aggregated_bits)}, '
            f'time={time.time() - round_start_time:.2f}s')

      sampled_clients = np.random.choice(
        emnist_test.client_ids,
        size=num_clients_per_round,
        replace=False)
      federated_test_data = make_federated_data(emnist_test, sampled_clients)
      test_metrics = evaluation_process(state.model, federated_test_data)
      print('Eval: test_metrics={}'.format(test_metrics))

      # Add test metrics to Tensorboard.
      for name, value in test_metrics.items():
        tf.summary.scalar('test_' + name, value, step=round_num)

      # Add training metrics to Tensorboard.
      for name, value in metrics['train'].items():
        tf.summary.scalar(name, value, step=round_num)

      # Add broadcasted and aggregated data size to Tensorboard.
      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer.flush()

Start TensorBoard display the training and evaluation metrics. It can take a few seconds for the data to load while the new data generated from training. Except for Loss and Accuracy, we also output the amount of broadcasted and aggregated data. Broadcasted data refers to tensors the server pushes to each client while aggregated data refers to tensors each client returns to the server.

In [None]:
# Clean the log directory to avoid conflicts.
!rm -R /tmp/logs/scalars/*

logdir = "/tmp/logs/scalars/original/"
%tensorboard --logdir /tmp/logs/scalars/ --port=0

Run!

In [None]:
# Set up the log directory and writer for Tensorboard.
summary_writer = tf.summary.create_file_writer(logdir)

run_experiment(federated_averaging_process=federated_averaging, num_rounds=10,
               num_clients_per_round=10, summary_writer=summary_writer)

In the above cell, we run for 10 rounds just to test everything out, but for an interesting experiment we recommend running for at least 1500 rounds, which should produce a model with over 98% accuracy; test accuracy will continue to improve up to 2000 rounds, with accuracy plateauing around 99%. Running for 1500 rounds on a CPU Colab backend should take about 2.8 hours. If you fine-tune the hyperparameters, you can get even better convergence speed.

## Build a custom broadcast and aggregate function

Now let's implement function to use lossy compression algorithms on broadcasted data and aggregated data using the [tensor_encoding](http://jakubkonecny.com/files/tensor_encoding.pdf) API.

### tensor_encoding API

tensor_encoding API is a general TensorFlow tool for invertible, potentially lossy, transformations.

Research surface API:
* `te.encoders.identity`
* `te.encoders.uniform_quantization`

Platform surface API:
* `te.core.SimpleEncoder`: An encoder for broadcast process in Federated Learning.
* `te.core.GatherEncoder`: An encoder for aggregation process of Federated Learning.



**How to build an encoder to perform uniform quantization?**

Uniform quantization is basically a simple rounding process, in which each tensor value is rounded to the nearest value from a set of possible quantization levels ([min, max] separated into 2^8 buckets).

An exmaple of 2-bit uniform quantization of x1 in `X=[x1, x2, x3, ..., xn]` (Note that `Xmin = min(X)`, `Xman = max(X)`):

```
 |--------0-|-------0---------0---------|
Xmin        x1                         Xmax
```
After uniform quantization:

```
 |--------*---------0---------0---------|
Xmin      x1                           Xmax
```

In [None]:
te.encoders.uniform_quantization(bits=8)

**How to build an encoder for TFF?**

In [None]:
spec = tf.TensorSpec(tf.TensorShape([10, 10]), tf.float32)

# A te.core.SimpleEncoder for broadcast process.
te.encoders.as_simple_encoder(te.encoders.uniform_quantization(bits=8), spec)

# A te.core.GatherEncoder for aggregation process.
te.encoders.as_gather_encoder(te.encoders.uniform_quantization(bits=8), spec)

**How to build a federated averaging algorithm with compression?**

First, we define two functions:
* `broadcast_encoder_fn` which creates an instance of [te.core.SimpleEncoder](https://github.com/tensorflow/model-optimization/blob/master/tensorflow_model_optimization/python/core/internal/tensor_encoding/core/simple_encoder.py#L30) to encode tensors or variables in server to client communication (Broadcast data).
* `mean_encoder_fn` which creates an instance of [te.core.GatherEncoder](https://github.com/tensorflow/model-optimization/blob/master/tensorflow_model_optimization/python/core/internal/tensor_encoding/core/gather_encoder.py#L30) to encode tensors or variables in client to server communicaiton (Aggregation data).

It is important to note that we do not apply a compression method to the entire model at once. Instead, we decide how (and whether) to compress each variable of the model independently. The reason is that generally, small variables such as biases are more sensitive to inaccuracy, and being relatively small, the potential communication savings are also relatively small. Hence we do not compress small variables by default. In this example, we apply uniform quantization to 8 bits (256 buckets) to every variable with more than 10000 elements, and only apply identity to other variables.

In [None]:
def broadcast_encoder_fn(value):
  """Function for building encoded broadcast."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_simple_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(value):
  """Function for building encoded mean."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_gather_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

**Iterative process**

A composition of the following processes:
* A **broadcast process** which sends model to each clients.
* A **client process** which does something using the model and client data.
* A **aggregation process** which sends the whatever produced in clients back to server.
* A **server process** which does something using the aggregated models.

TFF provides APIs to convert the encoder function into a format that `tff.learning.build_federated_averaging_process` API can consume. By using the `tff.learning.framework.build_encoded_broadcast_from_model` and `tff.learning.framework.build_encoded_mean_from_model`, we can create two functions that can be passed into `broadcast_process` and `aggregation_process` agruments of `tff.learning.build_federated_averaging_process` to create a Federated Averaging algorithms with a lossy compression algorithm.

In [None]:
encoded_broadcast_process = (
    tff.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn, broadcast_encoder_fn))
encoded_mean_process = (
    tff.learning.framework.build_encoded_mean_process_from_model(
    tff_model_fn, mean_encoder_fn))

Build the Federated Averaging algorithmm again with the compression algorithms.

In [None]:
federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
    tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process=encoded_broadcast_process,
    aggregation_process=encoded_mean_process)

## Training the model again

Now let's run the new Federated Averaging algorithm.

In [None]:
logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
    logdir_for_compression)

run_experiment(federated_averaging_process=federated_averaging_with_compression,
               num_rounds=10,
               num_clients_per_round=10,
               summary_writer=summary_writer_for_compression)

Please navigate back to the tensorboard above to compare the training metrics between two runs.

As you can see in Tensorboard, there is a significant reduction between the `orginial` and `compression` curves in the `broadcasted_bits` and `aggregated_bits` plots while in the `loss` and `sparse_categorical_accuracy` plot the two curves are pretty similiar.

In conclusion, we implemented a compression algorithm that can achieve similar performance as the orignial Federated Averaging algorithm while the comminucation cost is significently reduced.

## Exercises

To implement a custom compression algorithm and apply it to the training loop,
you can:

1.  Implement a new compression algorithm as a subclass of
    [`EncodingStageInterface`](https://github.com/tensorflow/model-optimization/blob/master/tensorflow_model_optimization/python/core/internal/tensor_encoding/core/encoding_stage.py#L75)
    or its more general variant,
    [`AdaptiveEncodingStageInterface`](https://github.com/tensorflow/model-optimization/blob/master/tensorflow_model_optimization/python/core/internal/tensor_encoding/core/encoding_stage.py#L274)
    following
    [this example](https://github.com/tensorflow/federated/blob/master/tensorflow_federated/python/research/compression/sparsity.py).
1.  Construct your new
    [`Encoder`](https://github.com/tensorflow/model-optimization/blob/master/tensorflow_model_optimization/python/core/internal/tensor_encoding/core/core_encoder.py#L38)
    and specialize it for
    [model broadcast](https://github.com/tensorflow/federated/blob/master/tensorflow_federated/python/research/compression/run_experiment.py#L95)
    or
    [model update averaging](https://github.com/tensorflow/federated/blob/master/tensorflow_federated/python/research/compression/run_experiment.py#L121).
1.  Use those objects to build the entire
    [training computation](https://github.com/tensorflow/federated/blob/master/tensorflow_federated/python/research/compression/run_experiment.py#L204).

Potentially valuable open research questions include: non-uniform quantization, lossless compression such as huffman coding, and mechanisms for adapting compression based on the information from previous training rounds.

Recommended reading materials:
* [Expanding the Reach of Federated Learning by Reducing Client Resource Requirements](https://research.google/pubs/pub47774/)
* [Federated Learning: Strategies for Improving Communication Efficiency](https://research.google/pubs/pub45648/)
* _Section 3.5 Communication and Compression_ in [Advanced and Open Problems in Federated Learning](https://arxiv.org/abs/1912.04977)