In [1]:
import collections

import numpy as np
import tensorflow as tf
# tf.enable_eager_execution() # this is necessary in audi8
import tensorflow_federated as tff
import nest_asyncio # this is necessary in audi 9 
nest_asyncio.apply()
np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
# see error meassage at https://github.com/tensorflow/federated/issues/842

INFO:tensorflow:Using local port 18904
INFO:tensorflow:Using local port 21987
INFO:tensorflow:Using local port 15873
INFO:tensorflow:Using local port 19605
INFO:tensorflow:Using local port 18785
INFO:tensorflow:Using local port 22784
INFO:tensorflow:Using local port 15582
INFO:tensorflow:Using local port 20826
INFO:tensorflow:Using local port 17114
INFO:tensorflow:Using local port 21055


TensorFlow Addons offers no support for the nightly versions of TensorFlow. Some things might work, some other might not. 
If you encounter a bug, do not file an issue on GitHub.


b'Hello, World!'

In [13]:
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

In [15]:
example_dataset = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[0])
example_dataset.element_spec

OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)),
             ('pixels',
              TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

In [None]:
example_element = next(iter(example_dataset))

In [18]:
list(example_dataset.as_numpy_iterator())

[OrderedDict([('label', 5),
              ('pixels',
               array([[1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        ],
                      [1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        , 1.        , 1.        ,
                       1.        , 1.        , 1.        

In [6]:
tf.executing_eagerly() 

True

In [8]:
NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER= 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]), # convert from 28*28 to 784 * 1
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

In [10]:
preprocessed_example_dataset

<PrefetchDataset shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

In [9]:
preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch

OrderedDict([('x',
              array([[1., 1., 1., ..., 1., 1., 1.],
                     [1., 1., 1., ..., 1., 1., 1.],
                     [1., 1., 1., ..., 1., 1., 1.],
                     ...,
                     [1., 1., 1., ..., 1., 1., 1.],
                     [1., 1., 1., ..., 1., 1., 1.],
                     [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)),
             ('y',
              array([[3],
                     [7],
                     [3],
                     [1],
                     [0],
                     [6],
                     [9],
                     [9],
                     [6],
                     [7],
                     [9],
                     [2],
                     [3],
                     [1],
                     [0],
                     [5],
                     [2],
                     [7],
                     [3],
                     [7]]))])

In [8]:
def make_federated_data(client_data, client_ids):
    return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

- In simulation setting, we choose subset of clients randomly. Random subset mighe **slow down convergence**. We will only sample the set of clients once, and reuse the same set across rounds to speed up convergence (intentionally over-fitting to these few user's data). 

In [9]:
sample_clients = emnist_train.client_ids[0:NUM_CLIENTS] # choose first 10 clients

federated_train_data = make_federated_data(emnist_train, sample_clients) # preprocessed data for 10 clients

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))

Number of client datasets: 10
First dataset: <PrefetchDataset shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>


- If we first define keras model 

In [10]:
def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

- need to convert the keras model to tff learning model interface using :

In [11]:
def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

- Next we can train model calling fed avg algorithm. **Keep in mind that the argument needs to be a constructor (such as model_fn above), not an already-constructed instance**

In [12]:
iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


Instructions for updating:
If using Keras pass *_constraint arguments to layers.


- `client_optimizer` computes the local client update
- `server_optimizer` computes the average of local update parameters

The `iterative_process` takes no argument, return a representation of the server status of the federative process

In [17]:
str(iterative_process.initialize.type_signature)

'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<>,model_broadcast_state=<>>@SERVER)'

Next we construct server state

In [14]:
state = iterative_process.initialize()

- `next` represents a single round of Federated Averaging, which consists of pushing the server state (including the model parameters) to the clients, on-device training on their local data, collecting and averaging model updates, and producing a new updated model at the server.

 SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

In [15]:
state, metrics = iterative_process.next(state, federated_train_data)
print('round 1, metrics = {}'.format(metrics))

round 1, metrics = OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13127571), ('loss', 2.9841821)]))])


In [21]:
NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    print('round {:2d}, metrics={}'.format(round_num, metrics))

round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1382716), ('loss', 2.9017148)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('sparse_categorical_accuracy', 0.16111112), ('loss', 2.7337723)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('sparse_categorical_accuracy', 0.16296296), ('loss', 2.7093928)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('sparse_categorical_accuracy', 0.21090534), ('loss', 2.5592349)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('sparse_categorical_accuracy', 0.23806584), ('loss', 2.4227028)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2516461), ('loss', 2.3178978)]))])
round  8, metrics=

## Visualize the metrics

In [16]:
logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

In [20]:
NUM_ROUNDS = 11
with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics['train'].items():
      tf.summary.scalar(name, value, step=round_num)

In [23]:
%load_ext tensorboard

In [24]:
%tensorboard --logdir /tmp/logs/scalars/ --port=0

ERROR: Timed out waiting for TensorBoard to start. It may still be running as pid 632.

In [25]:
# Run this this cell to clean your directory of old output for future graphs from this directory.
!rm -R /tmp/logs/scalars/*

'rm' is not recognized as an internal or external command,
operable program or batch file.


## Create model variables from scratch

In [26]:
MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

In [27]:
def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

Define the forward pass method that computes loss, emits predictions, and updates the cumulative statistics for a single batch of input data

In [28]:
def mnist_forward_pass(variables, batch):
  y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

In [29]:
def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

In [30]:
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

- Construct `tff.learning.model`

In [31]:
class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

## Training in new model

In [32]:
iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))

In [33]:
state = iterative_process.initialize()

In [34]:
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))

round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.9460506), ('accuracy', 0.12119342)]))])


In [35]:
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))

round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.967553), ('accuracy', 0.13765432)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.7426832), ('accuracy', 0.15884773)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.69955), ('accuracy', 0.18312757)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.4934955), ('accuracy', 0.21069959)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.449169), ('accuracy', 0.22057614)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', ()), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.383666), ('accurac

## Model evaluation

In [36]:
evaluation = tff.learning.build_federated_evaluation(MnistModel)

In [37]:
train_metrics = evaluation(state.model, federated_train_data)

In [38]:
str(train_metrics)

"OrderedDict([('num_examples', 4860.0), ('loss', 1.611347), ('accuracy', 0.47839507)])"

- Train on test sample

In [40]:
federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]

(10,
 <PrefetchDataset shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)

In [41]:
test_metrics = evaluation(state.model, federated_test_data)

In [42]:
str(test_metrics)

"OrderedDict([('num_examples', 580.0), ('loss', 1.7383928), ('accuracy', 0.44827586)])"