In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt

In [None]:
from tensorflow.keras import layers
from tensorflow.keras import losses, optimizers, metrics, callbacks
from tensorflow.python.data.ops.options import model_pb2
from keras.engine.training import Model

In [None]:
# Use one shared seed for TensorFlow and NumPy so random draws are repeatable.
SEED = 12345
tf.random.set_seed(SEED)
np.random.seed(SEED)

In [None]:
# Mount Google Drive at /content/drive; the dataset directory and the
# checkpoint paths used further down both live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load the dataset as (image, label) pairs from the Drive-backed data
# directory, then keep only its 'train' split.
histo_splits = tfds.load(
    'colorectal_histology',
    as_supervised=True,
    data_dir='/content/drive/MyDrive/NN',
)
histo = histo_splits['train']

In [None]:
def preprocess(image,label):
  """Cast the image to float32 and divide it by 255; the label is passed through."""
  as_float = tf.cast(image, tf.float32)
  scaled = as_float / 255.0
  return scaled, label


def add_conv_block(x, filters=64, kernel_size=3, stride_step=2, batch_norm=True, maxpool=False):
  """Append one convolutional block to ``x`` and return the resulting tensor.

  Layers are applied in this order:
  Conv2D -> MaxPool2D (only if ``maxpool``) -> BatchNormalization (only if
  ``batch_norm``) -> ReLU.

  The convolution always uses 'same' padding, an L2(1e-3) kernel regularizer
  and glorot-normal initialization. When max pooling is requested the
  convolution stride is forced to 1 and a pool of size 2 (also 'same'
  padding) follows it; otherwise ``stride_step`` is used as the stride.
  """
  use_pool = (maxpool == True)
  use_bn = (batch_norm == True)

  # With pooling enabled the caller's stride is ignored.
  conv_stride = 1 if use_pool else stride_step

  conv = layers.Conv2D(
      filters,
      kernel_size,
      strides=conv_stride,
      padding='same',
      kernel_regularizer=tf.keras.regularizers.l2(1e-3),
      kernel_initializer=tf.keras.initializers.glorot_normal)
  x = conv(x)

  if use_pool:
    x = layers.MaxPool2D(pool_size=2, padding='same')(x)

  if use_bn:
    x = layers.BatchNormalization()(x)

  return layers.ReLU()(x)

In [None]:
def normalization_class_per_agent(class_portions, class_tot):
  """Make the per-agent sample counts of one class sum exactly to ``class_tot``.

  Args:
    class_portions: sequence with one (already rounded) sample count per agent.
      It is not modified.
    class_tot: number of samples of the class that must be handed out in total.

  Returns:
    A pair ``(portions, offsets)`` of integer arrays: ``portions[a]`` is the
    number of samples for agent ``a`` and ``offsets`` is the cumulative sum
    with a leading 0 (length ``len(portions) + 1``), i.e. ``offsets[a]`` is
    where agent ``a``'s slice starts.

  Raises:
    ValueError: if ``class_tot`` is negative, or if samples must be handed out
      but there are no agents.
  """
  if class_tot < 0:
    raise ValueError("class_tot must be non-negative, got %r" % (class_tot,))

  # Integer copy: the caller's array stays untouched.
  portions = np.array(list(class_portions), dtype=int)
  gap = int(class_tot) - int(portions.sum())

  if gap != 0 and portions.size == 0:
    raise ValueError("cannot distribute %r samples over zero agents" % (class_tot,))

  if gap > 0:
    # Rounding left samples unassigned: the last agent receives them.
    portions[-1] += gap
  elif gap < 0:
    # Rounding over-assigned: take the excess back starting from the last
    # agent, but never push a count below zero. A negative count would later
    # be used as a skip/take size, where it does not mean "nothing".
    excess = -gap
    for idx in range(portions.size - 1, -1, -1):
      removed = min(max(int(portions[idx]), 0), excess)
      portions[idx] -= removed
      excess -= removed
      if excess == 0:
        break

  return portions, np.cumsum([0] + list(portions), dtype=int)


def distribute_over_agents(data, classes, agents, var, bs):
  """Split a labelled dataset into one non-IID, batched shard per agent.

  For every class a random share per agent is drawn from a normal
  distribution centred on 1, clipped at 0 and rescaled so the shares add up
  to the number of samples of that class. Each agent then receives a
  contiguous slice (skip/take) of that class.

  Args:
    data: dataset of ``(features, label)`` pairs. Labels are compared against
      ``0 .. classes-1``.
    classes: number of distinct labels.
    agents: number of shards to build.
    var: spread of the random shares. It is passed as the ``scale`` argument
      of ``np.random.normal``, i.e. it acts as a standard deviation.
    bs: batch size of the returned shards.

  Returns:
    ``(new_shards, shards_size)`` where ``new_shards[a]`` is the shuffled,
    batched dataset of agent ``a`` and ``shards_size[a][c]`` is the number of
    samples of class ``c`` given to agent ``a``.
  """

  # Splitting data by labels
  split_by_label = [data.filter(lambda features, label: label==i) for i in range(classes)]

  # Counting number of observations by label, in a single pass over the data
  # instead of one full pass per filtered dataset.
  class_size = [0] * classes
  for _, label in data:
    label_id = int(label)
    if 0 <= label_id < classes:
      class_size[label_id] += 1

  # Generating Non-IID shards following given class-per-agent variance
  mu = 1
  shards = [[] for _ in range(agents)]
  shards_size = {a: [] for a in range(agents)}

  for c in range(classes):
    class_portions = np.clip(np.random.normal(mu, var, size=agents), 0, None)
    portions_total = sum(class_portions)
    if portions_total == 0:
      # Every draw was clipped to zero: fall back to equal shares rather than
      # dividing 0 by 0 and propagating NaN into the shard sizes.
      class_portions = np.ones(agents)
      portions_total = sum(class_portions)
    class_portions = np.round((class_portions/portions_total) * class_size[c])
    class_portions, cum_class_portions = normalization_class_per_agent(class_portions, class_size[c])

    for agent in range(agents):
      shards_size[agent].append(class_portions[agent])
      # Agent's slice of this class: starts at its cumulative offset.
      shards[agent].append(split_by_label[c].skip(cum_class_portions[agent]).take(class_portions[agent]))

  # Concatenating classes for each shard + creating ready list of datasets with batches
  new_shards = []
  for a in range(agents):
    tmp_shard = shards[a][0]
    for class_part in shards[a][1:]:
      tmp_shard = tmp_shard.concatenate(class_part)
    new_shards.append(tmp_shard.shuffle(len(data)).batch(bs))

  return new_shards, shards_size

In [None]:
# Version 2 --- with target accuracy

class Federated:
  """Federated-averaging experiment on a labelled image dataset.

  The dataset is shuffled once, preprocessed, split 90/10 into train/test and
  the training part is distributed over ``agents`` local shards, either
  evenly ('IID') or with random per-class shares ('Non-IID'). A single Keras
  model is trained on each shard in turn and the resulting weights are
  averaged, weighted by the number of observations each agent holds.
  """


  def __init__(self, data, agents, distribution='IID', var=0.2):
    """Prepare the train/test split and the per-agent training shards.

    Args:
      data: dataset of ``(image, label)`` pairs with a known length.
      agents: number of local agents (at least 1).
      distribution: 'IID' or 'Non-IID'.
      var: spread of the per-class shares, used only for 'Non-IID'.

    Raises:
      ValueError: if ``distribution`` is not one of the two supported values
        or ``agents`` is smaller than 1.
    """
    if distribution not in ('IID', 'Non-IID'):
      raise ValueError("distribution must be 'IID' or 'Non-IID', got %r" % (distribution,))
    if agents < 1:
      raise ValueError("agents must be at least 1, got %r" % (agents,))

    # Initial shuffle with a fixed order (equivalent to sampling the single
    # classes afterwards); preprocessing is applied right away to avoid
    # compatibility problems between the splits.
    self.data = data.shuffle(len(data), reshuffle_each_iteration=False).map(preprocess)
    self.data_shape = next(iter(self.data))[0].shape
    self.agents = agents
    self.labels = {int(label) for _, label in data}
    self.classes = len(self.labels)
    self.distribution = distribution

    self.N = len(data)
    self.train_size = int(self.N*0.9)
    self.test_size = int(self.N*0.1)

    self.train = self.data.take(self.train_size)
    self.test = self.data.skip(self.train_size).take(self.test_size).batch(32)
    split_train = self.train_size // self.agents
    self.bs = 10
    self.var = var
    if distribution == 'IID':
      self.local_train = [self.train.skip(i*split_train).take(split_train).shuffle(self.N).batch(self.bs) for i in range(self.agents)]
      # Every IID agent holds exactly split_train observations.
      self.obs_per_agent = [split_train] * self.agents
    else:
      self.local_train, self.shards_size = distribute_over_agents(self.train, self.classes, self.agents, self.var, self.bs)
      self.obs_per_agent = [int(sum(self.shards_size[a])) for a in range(self.agents)]


  def CNN(self):
    """Build the (uncompiled) CNN classifier for this dataset.

    Five convolutional blocks (64, 128, 128, 256, 256 filters; the first,
    third and fifth with max pooling), global average pooling, a 250-unit
    ReLU dense layer and a softmax output with one unit per class.
    """

    inp = layers.Input(shape=self.data_shape)

    conv1 = add_conv_block(inp, maxpool=True)
    conv2 = add_conv_block(conv1, filters=128)
    conv3_and_pool = add_conv_block(conv2, filters=128, maxpool=True)
    conv4 = add_conv_block(conv3_and_pool, filters=256)
    conv5_and_pool = add_conv_block(conv4, filters=256, maxpool=True)

    average_pooling = layers.GlobalAveragePooling2D()(conv5_and_pool)
    dense = layers.Dense(250, activation='relu',
                    kernel_regularizer=tf.keras.regularizers.l2(1e-4))(average_pooling)

    out = layers.Dense(self.classes, activation='softmax')(dense)

    model = tf.keras.Model(inp, out)

    return model


  def _weighted_average(self, local_weights):
    """Average per-agent weight lists, weighted by each agent's observation count.

    Args:
      local_weights: one entry per agent, each a list of weight arrays as
        returned by ``model.get_weights()``.

    Returns:
      A list with one averaged array per layer weight.

    Raises:
      ValueError: if the number of weight lists differs from the number of
        agents, or if the agents hold no observations at all.
    """
    if len(local_weights) != self.agents:
      raise ValueError("expected weights from %d agents, got %d" % (self.agents, len(local_weights)))

    # Normalise by the observations actually held so the weights sum to 1
    # even when the training set is not evenly divisible among the agents.
    total_obs = sum(self.obs_per_agent)
    if total_obs <= 0:
      raise ValueError("agents hold no observations, cannot average weights")
    shares = [n_obs / total_obs for n_obs in self.obs_per_agent]

    global_layers = []
    for per_agent_layer in zip(*local_weights):
      aggr = 0
      for share, agent_layer in zip(shares, per_agent_layer):
        aggr += agent_layer * share
      global_layers.append(aggr)

    return global_layers


  def aggregation(self, local_weights_path, model):
    """Load every agent's saved weights and return their weighted average.

    Agent ``k``'s weights are read from ``local_weights_path + str(k)`` by
    loading them into ``model``, so ``model``'s own weights are overwritten
    as a side effect.
    """

    local_weights = []

    for k in range(self.agents):
      model.load_weights( local_weights_path+str(k) )
      local_weights.append(model.get_weights())

    return self._weighted_average(local_weights)


  def AverageLearning(self, model, rounds, local_epochs, target_acc=False,
                      project_dir='/content/drive/MyDrive/Colab Notebooks/SPBDproject'):
    """Run federated averaging, checkpointing local and global weights to disk.

    Args:
      model: Keras model to train; it is compiled here.
      rounds: number of communication rounds. Ignored (replaced by 100) when
        ``target_acc`` is set.
      local_epochs: epochs of local training per agent and round.
      target_acc: if set, training stops as soon as the global test accuracy
        exceeds it, after at most 100 rounds.
      project_dir: directory under which ``local_checkpoints`` and
        ``global_checkpoints`` are written.
    """
    import os

    model.compile(loss=losses.SparseCategoricalCrossentropy(from_logits=False),
                optimizer=optimizers.Adam(),
                metrics=[metrics.SparseCategoricalAccuracy()])
    global_weights = model.get_weights()

    if target_acc != False: rounds = 100

    # Preparing paths to save models weights
    weights_path = project_dir + '/local_checkpoints/local_'
    global_path = project_dir + '/global_checkpoints/global_'
    os.makedirs(os.path.dirname(weights_path), exist_ok=True)
    os.makedirs(os.path.dirname(global_path), exist_ok=True)

    for r in range(rounds):

      for k in range(self.agents):

        # Every agent starts the round from the current global weights.
        model.set_weights(global_weights)

        model.fit(self.local_train[k], epochs=local_epochs, verbose=0)
        model.evaluate(self.test)

        model.save_weights(weights_path+str(k))

      global_weights = self.aggregation(weights_path, model)
      print("Global test:")
      model.set_weights(global_weights)
      global_loss, global_acc = model.evaluate(self.test)
      print()

      model.save_weights( global_path+str(r) )

      if target_acc != False and global_acc > target_acc:
        break


  def AverageLearningWithTarget(self, model, target, local_epochs, max_rounds=100):
    """Run federated averaging in memory until the target accuracy is reached.

    Args:
      model: Keras model to train; it is compiled here.
      target: training continues while the global test accuracy is below it.
      local_epochs: epochs of local training per agent and round.
      max_rounds: upper bound on the number of rounds so an unreachable
        target cannot loop forever; ``None`` removes the bound.
    """

    model.compile(loss=losses.SparseCategoricalCrossentropy(from_logits=False),
                optimizer=optimizers.Adam(),
                metrics=[metrics.SparseCategoricalAccuracy()])
    global_weights = model.get_weights()
    global_acc = 0
    rounds_done = 0

    while target > global_acc and (max_rounds is None or rounds_done < max_rounds):

      local_weights = []

      for k in range(self.agents):

        model.set_weights(global_weights)

        model.fit(self.local_train[k], epochs=local_epochs, verbose=0)
        model.evaluate(self.test)

        local_weights.append(model.get_weights())

      # Weights are already in memory, so average them directly instead of
      # going through the path-based aggregation().
      global_weights = self._weighted_average(local_weights)
      print("Global test:")
      model.set_weights(global_weights)
      global_loss, global_acc = model.evaluate(self.test)
      print()
      rounds_done += 1


In [None]:
# Experiment: 3 agents, Non-IID shards with spread 0.2.
trial = Federated(
  data=histo,
  agents=3,
  distribution='Non-IID',
  var=0.2,
)

In [None]:
# Fresh, untrained CNN sized from this federation's input shape and class count.
trial_model = trial.CNN()

In [None]:
# Train until the global test accuracy exceeds 0.8. Because target_acc is
# set, AverageLearning replaces `rounds` with 100 internally.
trial.AverageLearning(
  model=trial_model,
  rounds=10,
  local_epochs=5,
  target_acc=0.8,
)

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:



In [None]:
# NonIID (0.2 var), 3 agents, 5 local epochs - 9 rounds - 1h40m

4500

In [None]:
# Experiment: 3 agents, IID shards (var is not used by the IID split).
trial = Federated(
  data=histo,
  agents=3,
  distribution='IID',
  var=0.2,
)

In [None]:
# New untrained model for the IID run, so nothing carries over from the previous experiment.
trial_model = trial.CNN()

In [None]:
# Same training settings as the Non-IID run; `rounds` is again overridden to
# 100 inside AverageLearning because target_acc is set.
trial.AverageLearning(
  model=trial_model,
  rounds=10,
  local_epochs=5,
  target_acc=0.8,
)

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:



In [None]:
# IID (uniform shuffle), 3 agents, 5 local epochs - 8 rounds - 20m

In [None]:
# Experiment: 3 agents, Non-IID shards with spread 0.5.
trial = Federated(
  data=histo,
  agents=3,
  distribution='Non-IID',
  var=0.5,
)

In [None]:
# New untrained model for the spread-0.5 run.
trial_model = trial.CNN()

In [None]:
# Train to a 0.8 global accuracy target (rounds is overridden to 100 inside
# AverageLearning when target_acc is set).
trial.AverageLearning(
  model=trial_model,
  rounds=10,
  local_epochs=5,
  target_acc=0.8,
)

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:



In [None]:
# NonIID (0.5 var), 3 agents, 5 local epochs - 6 rounds - 1h

In [None]:
# Experiment: 3 agents, Non-IID shards with spread 1 (most skewed setting tried).
trial = Federated(
  data=histo,
  agents=3,
  distribution='Non-IID',
  var=1,
)

In [None]:
# New untrained model for the spread-1 run.
trial_model = trial.CNN()

In [None]:
# Train to a 0.8 global accuracy target. The run can exceed the 10 rounds
# written here because AverageLearning sets rounds to 100 when target_acc is set.
trial.AverageLearning(
  model=trial_model,
  rounds=10,
  local_epochs=5,
  target_acc=0.8,
)

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:



In [None]:
# Per-agent sample counts: key = agent index, list position = class index.
trial.shards_size

{0: [244, 0, 228, 0, 85, 123, 155, 0],
 1: [227, 258, 169, 286, 205, 447, 0, 0],
 2: [91, 298, 158, 270, 280, 0, 410, 566]}

In [None]:
# Total number of training observations held by each of the 3 agents.
obs_per_agent = list(map(lambda agent_id: sum(trial.shards_size[agent_id]), range(3)))
obs_per_agent

[835, 1592, 2073]

In [None]:
# Sanity check: the shard sizes together should account for every training
# observation (presumably equal to trial.train_size — verify).
sum(obs_per_agent)

4500

In [None]:
# NonIID (1 var), 3 agents, 5 local epochs - 21 rounds - 2h 56m

In [None]:
# Repeat of the Non-IID spread-0.2 experiment, this time with checkpoints
# saved to disk.
trial = Federated(
  data=histo,
  agents=3,
  distribution='Non-IID',
  var=0.2,
)

In [None]:
# New untrained model for the repeated spread-0.2 run.
trial_model = trial.CNN()

In [None]:
# Train to a 0.8 global accuracy target (rounds is overridden to 100 inside
# AverageLearning when target_acc is set).
trial.AverageLearning(
  model=trial_model,
  rounds=10,
  local_epochs=5,
  target_acc=0.8,
)

Global test:

Global test:

Global test:

Global test:

Global test:

Global test:



In [None]:
# NonIID (0.2 var), 3 agents, 5 local epochs - 6 rounds - 50m (first attempt with saved weights)

In [None]:
##############################################################
##############################################################

In [None]:
# Smoke run: a single federated round with 3 local epochs and no accuracy
# target, so `rounds` is honoured as written.
trial = Federated(
  data=histo,
  agents=3,
  distribution='Non-IID',
  var=0.2,
)
trial_model = trial.CNN()
trial.AverageLearning(
  model=trial_model,
  rounds=1,
  local_epochs=3,
)

Global test:



In [None]:
# Second smoke run with identical settings (one round, 3 local epochs,
# no accuracy target) on a freshly built federation and model.
trial = Federated(
  data=histo,
  agents=3,
  distribution='Non-IID',
  var=0.2,
)
trial_model = trial.CNN()
trial.AverageLearning(
  model=trial_model,
  rounds=1,
  local_epochs=3,
)

Global test:



In [None]:
# Non-IID federation used by the dataset inspection cells below.
trial = Federated(
  data=histo,
  agents=3,
  distribution='Non-IID',
  var=0.2,
)

In [None]:
# Check which dataset class the shuffled + mapped data ends up as.
type(trial.data)

tensorflow.python.data.ops.map_op._MapDataset

In [None]:
# len() raised TypeError on this Non-IID shard, which stopped a top-to-bottom
# run of the notebook. cardinality() reports the size as a tensor and returns
# a sentinel instead of raising when the size cannot be determined
# (presumably the case here because the shard is built from filter() — confirm).
trial.local_train[0].take(0).cardinality()

TypeError: ignored

In [None]:
# IID counterpart, kept under its own name for side-by-side inspection.
trial_IID = Federated(
  data=histo,
  agents=3,
  distribution='IID',
  var=0.2,
)

In [None]:
# Show the element spec of the first IID shard (batched images and labels).
trial_IID.local_train[0].take(0)

<_TakeDataset element_spec=(TensorSpec(shape=(None, 150, 150, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int64, name=None))>

In [None]:
trial_II