In [1]:
#@test {"skip": true}

!which python
import nest_asyncio
nest_asyncio.apply()

%load_ext tensorboard

from matplotlib import pyplot as plt
import sys

if not sys.warnoptions:
    import warnings
    warnings.simplefilter("ignore")

/usr/local/bin/python


In [2]:

import collections

from IPython.display import display, HTML, IFrame

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

print(tff.__version__)

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

np.random.seed(0)
#def greetings():
#  display(HTML('<b><font size="6" color="#ff00f4">Greetings, virtual tutorial participants!</font></b>'))
#  return True
#l = tff.federated_computation(greetings)()


0.18.0


# User clustering

In the previous tutorials, we learned how to set up model and data pipelines, and use these to perform federated training using the `tff.learning` API.

Of course, this is only the tip of the iceberg when it comes to FL research. In this tutorial, we are going to discuss how to implement federated learning algorithms *without* deferring to the `tff.learning` API. We aim to accomplish the following:

**Goals:**


*   Understand the general structure of federated learning algorithms.
*   Explore the *Federated Core* of TFF.
*   Use the Federated Core to implement Federated Averaging directly.




## Preparing the input data
We first load and preprocess the EMNIST dataset included in TFF. We essentially use the same code as in the first tutorial.

In [3]:
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(only_digits=False)

In [4]:
# Let's look at the shape of our data
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])
#example_dataset
example_dataset.element_spec

OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)),
             ('pixels',
              TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

In [None]:
# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12,7))
f.suptitle("Label Counts for a Sample of Clients")
for i in range(6):
  ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i])
  k = collections.defaultdict(list)
  for e in ds:
    k[e['label'].numpy()].append(e['label'].numpy())
  plt.subplot(2, 3, i+1)
  plt.title("Client {}".format(i))
  for j in range(62):
    plt.hist(k[j], density=False, bins=[i for i in range(62)])

In [5]:
NUM_CLIENTS = 20
NUM_EPOCHS = 1
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER=10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 28,28,1]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

In [6]:
def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

In [7]:
#Choose 10 clients randomly
client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False)

federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
  for x in client_ids
]

In [8]:
# Let's look at the shape of our data
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])
#example_dataset
example_dataset.element_spec

OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)),
             ('pixels',
              TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

In [9]:
preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

## Preparing the model

We use the same model as the first tutorial, which has a single hidden layer, followed by a softmax layer.

In [10]:
def create_keras_model():
  data_format = 'channels_last'
  initializer = tf.keras.initializers.RandomNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(28, 28,1)),
      tf.keras.layers.Conv2D(32,(3,3), activation='relu'),
      tf.keras.layers.Conv2D(64,(3,3), activation='relu'),
      tf.keras.layers.MaxPool2D(pool_size=(2,2), strides=(2,2)),
      tf.keras.layers.Dropout(rate=0.75),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu', kernel_initializer=initializer),
      tf.keras.layers.Dropout(rate=0.5, seed=1),
      tf.keras.layers.Dense(62, kernel_initializer=initializer),
      tf.keras.layers.Softmax()
  ])

In [11]:
def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

We wrap this Keras model as a `tff.learning.Model`.

In [12]:
def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=federated_train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

In [13]:
@tf.function
def client_update(model, dataset,server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.weights.trainable
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights

In [14]:
@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.weights.trainable
  # Assign the mean client weights to the server model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

In [15]:
@tff.tf_computation()
def server_init():
  model = model_fn()
  return model.weights.trainable

In [16]:
dummy_model = model_fn()
tf_dataset_type = tff.SequenceType(dummy_model.input_spec)
model_weights_type = server_init.type_signature.result
str(tf_dataset_type), str(model_weights_type)

('<x=float32[?,28,28,1],y=int32[?,1]>*',
 '<float32[3,3,1,32],float32[32],float32[3,3,32,64],float32[64],float32[9216,128],float32[128],float32[128,62],float32[62]>')

In [17]:
@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)

In [18]:
@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)

In [19]:
@tff.federated_computation
def initialize_fn():
   #return tff.federated_map(server_update_fn, model_weights)
   return tff.federated_value(server_init(), tff.SERVER)

In [20]:

federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

Remember the 4 elements of an FL algorithm?

1. A server-to-client broadcast step.
2. A local client update step.
3. A client-to-server upload step.
4. A server update step.

Now that we've built up the above, each part can be compactly represented as a single line of TFF code. This simplicity is why we had to take extra care to specify things such as federated types!

In [21]:
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))
  
  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights, client_weights

In [22]:
federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

In [23]:
str(federated_algorithm.initialize.type_signature),str(federated_algorithm.next.type_signature)

('( -> <float32[3,3,1,32],float32[32],float32[3,3,32,64],float32[64],float32[9216,128],float32[128],float32[128,62],float32[62]>@SERVER)',
 '(<server_weights=<float32[3,3,1,32],float32[32],float32[3,3,32,64],float32[64],float32[9216,128],float32[128],float32[128,62],float32[62]>@SERVER,federated_dataset={<x=float32[?,28,28,1],y=int32[?,1]>*}@CLIENTS> -> <<float32[3,3,1,32],float32[32],float32[3,3,32,64],float32[64],float32[9216,128],float32[128],float32[128,62],float32[62]>@SERVER,{<float32[3,3,1,32],float32[32],float32[3,3,32,64],float32[64],float32[9216,128],float32[128],float32[128,62],float32[62]>}@CLIENTS>)')

In [24]:
state=federated_algorithm.initialize()

In [25]:
fed_avg = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.03),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0, momentum=0.9))
    

In [26]:
import os
import shutil

logdir = "/tmp/logs/scalars/training/"
if os.path.exists(logdir):
  shutil.rmtree(logdir)

# Your code to create a summary writer:
summary_writer = tf.summary.create_file_writer(logdir)

In [33]:
NUM_ROUNDS = 100
with summary_writer.as_default():
  for round_num in range(0, NUM_ROUNDS):
    state,client_weights = federated_algorithm.next(state, federated_train_data)


KeyboardInterrupt: 

In [None]:
print(client_weights[0])
# It is a list of 8 tensors , each tensor is a layer weight

In [35]:
NUM_OF_CLUSTERS = 2
print(client_ids)

['f1649_03' 'f1623_09' 'f3123_42' 'f0875_02' 'f3966_01' 'f0882_28'
 'f0696_13' 'f1973_24' 'f4000_40' 'f1972_15' 'f2191_53' 'f1088_00'
 'f1241_00' 'f2215_69' 'f3436_22' 'f3514_20' 'f0326_35' 'f3329_30'
 'f0686_09' 'f3955_03']


In [36]:
#Randomly choose two client weights for initialization
q = np.random.choice([i for i in range(0,8)], size=NUM_OF_CLUSTERS, replace=False)
print(q)
q_ids = client_ids[q]
print(q_ids)

[4 5]
['f3966_01' 'f0882_28']


In [37]:
@tf.function
def client_forward_pass(model, dataset, cluster_weights ):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.weights.trainable
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, cluster_weights)

  # Use the client_optimizer to update the local model.
  total_loss=0.0
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)
      total_loss=total_loss+outputs.loss

  return total_loss

In [None]:
#clustering code here

import time
start = time.process_time()

NUM_CLIENTS =20
model = model_fn()

#select train client ids
import random
shuffled_ids = emnist_train.client_ids.copy()
random.shuffle(shuffled_ids)
clients = shuffled_ids[0:2500]

#clients = emnist_train.client_ids.copy()

NUM_ROUNDS_SGD = 5

clusters = {client_ids[idx]: [{'weights':client_weights[idx]},{'counts':0},client_ids[idx]] for idx in q}

while len(clients)!=0:
    
    # Randomly pick p clients
    print('Number of clients left '+ str(len(clients)))
          
    if len(clients) < NUM_CLIENTS :
         p = np.random.choice(clients, size=len(clients), replace=False)
    else :
         p = np.random.choice(clients, size=NUM_CLIENTS, replace=False)
            
    #subtrack the selected clients from k 
    for j in p:
        clients.remove(j)
        
    print('Number of clients left '+ str(len(clients)))
    
    #for each client in p
    for x in p:
        
        # create dataset for client x
        train_data = preprocess(emnist_train.create_tf_dataset_for_client(x))
        
        #Initialize loss 
        loss = []
        
        #looping over k clusters
        for key in list(clusters.keys()):
            # calculate the loss for every client in p with clusters
            loss.append(client_forward_pass(model, train_data, clusters[key][0]['weights']))
        
        #assign client_id to the cluster with minimum loss
        min_loss = min(loss)
        min_loss_index = loss.index(min_loss)
        l = list(clusters.keys())
        clusters[l[min_loss_index]] = clusters[l[min_loss_index]] + [x]
    
    # Run SGD for few steps
    for cluster_id in list(clusters.keys()):
        
        cluster_clients = clusters[cluster_id][2:].copy()
        # Size of the cluster
        print('Size of cluster '+str(cluster_id)+'is '+ str(len(cluster_clients)))
        # Perform Fedavg using models from all the clients with mk (number of samples with each client)
        state = fed_avg.initialize()
        weights = clusters[cluster_id][0]['weights']
        model = model_fn()
        model_weights = model.weights.trainable
  
        tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, weights)
        
        state = tff.learning.state_with_new_model_weights(state,
                    trainable_weights=[v.numpy() for v in model.weights.trainable],
                    non_trainable_weights=[v.numpy() for v in model.weights.non_trainable] )
        
        while len(cluster_clients) != 0 :
            
            if len(cluster_clients) < 20 :
                sample_clients =  np.random.choice(cluster_clients, size = len(cluster_clients),replace=False)
            else :
                sample_clients =  np.random.choice(cluster_clients, size = 20, replace=False)
                
            for j in sample_clients:
                cluster_clients.remove(j)
            #Prepare dataset for all clients in the cluster
            train_data = make_federated_data(emnist_train, sample_clients)
            # Clients send mk*hk(local weights of the clients)
            # Global_cluster_model = Fedavg(m1h1,m2h2...mkhk)
            for i in range(0,NUM_ROUNDS_SGD):
                clusters[cluster_id][1]['counts'] =  clusters[cluster_id][1]['counts'] + 1
                state, metrics = fed_avg.next(state, train_data)
        print('metrics={},rounds of cluster={}'.format(metrics['train'],clusters[cluster_id][1]['counts']))
         
        clusters[cluster_id][0]['weights'] = state.model.trainable
    
print('Time taken for clustering'+ str(time.process_time() - start))

    # Get the weights

In [48]:
#Saving json

import pickle
f = open("clusters.pkl","wb")
pickle.dump(clusters,f)
f.close()



## Evaluating the Clusters

In [27]:
import pickle
with open('clusters.pkl', 'rb') as f:
    clusters = pickle.load(f)
    f.close()

In [None]:
clusters

In [33]:
NUM_ROUNDS = 1000
NUM_CLIENTS = 20 

for cluster_id in list(clusters.keys()):
    state = fed_avg.initialize()
    clients = clusters[cluster_id][2:]
    for round_num in range(0, NUM_ROUNDS):
        sample_clients = np.random.choice(clients, NUM_CLIENTS, replace=False)
        federated_test_data = make_federated_data(emnist_train, sample_clients)
        state, metrics = fed_avg.next(state, federated_test_data)
        print('round {:2d}, metrics={}'.format(round_num, metrics['train']))
    clusters[cluster_id][0]['weights']=  state.model.trainable

        



round  0, metrics=OrderedDict([('sparse_categorical_accuracy', 0.023565574), ('loss', 4.1502867)])
round  1, metrics=OrderedDict([('sparse_categorical_accuracy', 0.058364544), ('loss', 3.9683173)])
round  2, metrics=OrderedDict([('sparse_categorical_accuracy', 0.06329114), ('loss', 3.71865)])
round  3, metrics=OrderedDict([('sparse_categorical_accuracy', 0.067474045), ('loss', 3.5589485)])
round  4, metrics=OrderedDict([('sparse_categorical_accuracy', 0.07064094), ('loss', 3.5892222)])
round  5, metrics=OrderedDict([('sparse_categorical_accuracy', 0.06870229), ('loss', 3.5836356)])
round  6, metrics=OrderedDict([('sparse_categorical_accuracy', 0.07915484), ('loss', 3.4961405)])
round  7, metrics=OrderedDict([('sparse_categorical_accuracy', 0.07274556), ('loss', 3.4587097)])
round  8, metrics=OrderedDict([('sparse_categorical_accuracy', 0.07576747), ('loss', 3.4432707)])
round  9, metrics=OrderedDict([('sparse_categorical_accuracy', 0.07181572), ('loss', 3.4032202)])
round 10, metrics=O

In [34]:

import pickle
f = open("clusters_new.pkl","wb")
pickle.dump(clusters,f)
f.close()


In [35]:
# Construct federated evaluation computation here:
evaluation = tff.learning.build_federated_evaluation(model_fn)

In [37]:
for cluster_id in list(clusters.keys()):
    sample_clients = clusters[cluster_id][2:]
    federated_test_data = make_federated_data(emnist_test, sample_clients)
    state = fed_avg.initialize()
    weights = clusters[cluster_id][0]['weights']
    model = model_fn()
    model_weights = model.weights.trainable
  
    tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, weights)
        
    state = tff.learning.state_with_new_model_weights(state,
                    trainable_weights=[v.numpy() for v in model.weights.trainable],
                    non_trainable_weights=[v.numpy() for v in model.weights.non_trainable] )
    print('Iam here')
    test_metrics = evaluation(state.model, federated_test_data)
    print(test_metrics)

Iam here
OrderedDict([('sparse_categorical_accuracy', 0.86944807), ('loss', 0.41926345)])
Iam here
OrderedDict([('sparse_categorical_accuracy', 0.8951457), ('loss', 0.33125398)])


In [35]:
# Run evaluation on the test data here, using the federated model produced from 
# training:
