# Federated TensorFlow MNIST Tutorial

In [1]:
# Install dependencies if they are not already installed (uncomment the line below).
# `%pip` installs into the running kernel's environment, unlike `!pip`.
# %pip install tensorflow==2.3.1

## Connect to the Federation

In [2]:
# Create a federation
import os

from openfl.interface.interactive_api.federation import Federation

# Use the same identifier (client_id) that was used in the signed certificate.
client_id = 'api'
cert_dir = 'cert'
# Director address. The defaults match the original setup. Set DIRECTOR_FQDN / DIRECTOR_PORT
# in the environment to target another director without editing the notebook.
director_node_fqdn = os.environ.get('DIRECTOR_FQDN', '192.168.1.129')
director_port = int(os.environ.get('DIRECTOR_PORT', '50051'))

# 1) Run with API layer - Director mTLS
# To enable mTLS, provide the CA root chain and a signed key pair to the federation interface:
# cert_chain = f'{cert_dir}/root_ca.crt'
# api_certificate = f'{cert_dir}/{client_id}.crt'
# api_private_key = f'{cert_dir}/{client_id}.key'

# federation = Federation(
#     client_id=client_id,
#     director_node_fqdn=director_node_fqdn,
#     director_port=director_port,
#     cert_chain=cert_chain,
#     api_cert=api_certificate,
#     api_private_key=api_private_key
# )

# --------------------------------------------------------------------------------------------------------------------

# 2) Run with TLS disabled (trusted environment)
# Federation can also determine the local FQDN automatically.
federation = Federation(
    client_id=client_id,
    director_node_fqdn=director_node_fqdn,
    director_port=director_port,
    tls=False,
)


In [3]:
# Ask the director for the registered shards (one entry per envoy) and display them.
# Each entry includes the shard description, sample/target shapes and online/experiment status.
shard_registry = federation.get_shard_registry()
shard_registry

{'env_three': {'shard_info': node_info {
    name: "env_three"
  }
  shard_description: "Mnist dataset, shard number 3 out of 3"
  sample_shape: "416"
  sample_shape: "416"
  sample_shape: "3"
  target_shape: "1",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2022-04-11 12:25:21',
  'current_time': '2022-04-11 12:25:50',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'},
 'env_one': {'shard_info': node_info {
    name: "env_one"
  }
  shard_description: "Mnist dataset, shard number 1 out of 3"
  sample_shape: "416"
  sample_shape: "416"
  sample_shape: "3"
  target_shape: "1",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2022-04-11 12:25:39',
  'current_time': '2022-04-11 12:25:50',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'},
 'env_two': {'shard_info': node_info {
    name: "env_two"
  }
  shard_description: "Mnist dataset, shard number 2 out of 3"
  sample_shape: "41

In [4]:
# First, request a dummy_shard_desc that holds information about the federated dataset 
dummy_shard_desc = federation.get_dummy_shard_descriptor(size=10)
dummy_shard_dataset = dummy_shard_desc.get_dataset('train')
sample, target = dummy_shard_dataset[0]
f"Sample shape: {sample.shape}, target shape: {target.shape}"

'Sample shape: (416, 416, 3), target shape: (1,)'

## Describing the FL experiment

In [5]:
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment

2022-04-11 12:25:50.406174: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2022-04-11 12:25:51.080869: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2022-04-11 12:25:51.101779: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-04-11 12:25:51.101949: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:01:00.0 name: NVIDIA GeForce RTX 2070 Super computeCapability: 7.5
coreClock: 1.38GHz coreCount: 40 deviceMemorySize: 7.79GiB deviceMemoryBandwidth: 417.29GiB/s
2022-04-11 12:25:51.101966: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2022-04-11 12:25:51.104257: I tensorflow/stream_execu

### Register model

In [6]:
import os

from IPython.display import clear_output
from layers import create_model, optimizer

framework_adapter = 'openfl.plugins.frameworks_adapters.keras_adapter.FrameworkAdapterPlugin'

# Build the Keras model, then clear the cell output produced while building it.
model = create_model()
clear_output()

# Warm-start from the weights saved by a previous run, when that file exists.
pretrained_weights_path = 'best_model.h5'
has_pretrained_weights = os.path.exists(pretrained_weights_path)
if has_pretrained_weights:
    model.load_weights(pretrained_weights_path)
print('Model is loaded with pretrained weight' if has_pretrained_weights
      else 'Model is initialized with empty weight')

# Wrap model, optimizer and framework adapter for OpenFL.
MI = ModelInterface(model=model, optimizer=optimizer, framework_plugin=framework_adapter)

Model is initialized with empty weight


In [7]:
# model.summary()

### Register dataset

In [8]:
import numpy as np
from tensorflow.keras.utils import Sequence

class DataGenerator(Sequence):
    """Keras ``Sequence`` that yields shuffled ``(X, y)`` batches from a shard dataset.

    Args:
        shard_descriptor: dataset supporting ``len()`` and indexing with a list of
            sample indices, returning an ``(X, y)`` pair for that batch.
        batch_size: number of samples per batch (positive int).

    Only full batches are produced: samples that do not fill a last batch are
    skipped for the current shuffle order.
    """

    def __init__(self, shard_descriptor, batch_size):
        # A no-op on older tf.keras (Sequence defines no __init__); newer Keras versions,
        # where Sequence aliases PyDataset, expect subclasses to call it.
        super().__init__()
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
        self.shard_descriptor = shard_descriptor
        self.batch_size = batch_size
        self.indices = np.arange(len(shard_descriptor))
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches."""
        return len(self.indices) // self.batch_size

    def __getitem__(self, index):
        """Return batch ``index`` as ``(X, y)``.

        Raises:
            IndexError: if ``index`` is outside ``[0, len(self))``. This also lets plain
                Python iteration (the ``__getitem__`` protocol) terminate.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"batch index {index} out of range for {len(self)} batches")
        start = index * self.batch_size
        # One lookup into the shuffled order. Indexing `self.indices` a second time with
        # these values (as before) only applied the permutation twice.
        batch = list(self.indices[start:start + self.batch_size])
        X, y = self.shard_descriptor[batch]
        return X, y

    def on_epoch_end(self):
        """Reshuffle the sample order in place (also run once from ``__init__``)."""
        np.random.shuffle(self.indices)


class MnistFedDataset(DataInterface):
    """OpenFL ``DataInterface`` serving train/validation loaders from an envoy's local shard.

    Batch sizes are read from ``self.kwargs['train_bs']`` / ``self.kwargs['valid_bs']``.
    These are presumably stored from the constructor kwargs by ``DataInterface.__init__``.
    A missing or falsy value falls back to ``DEFAULT_BATCH_SIZE``.
    """

    DEFAULT_BATCH_SIZE = 32

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def shard_descriptor(self):
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor will be set by Envoy. It also caches the
        'train' and 'val' datasets used by the loaders below.
        """
        self._shard_descriptor = shard_descriptor

        self.train_set = shard_descriptor.get_dataset('train')
        self.valid_set = shard_descriptor.get_dataset('val')

    def __getitem__(self, index):
        return self.shard_descriptor[index]

    def __len__(self):
        return len(self.shard_descriptor)

    def _batch_size(self, key):
        """Return ``self.kwargs[key]`` when present and truthy, else ``DEFAULT_BATCH_SIZE``.

        Using ``.get`` avoids the KeyError the direct lookup raised when the kwarg was omitted.
        """
        return self.kwargs.get(key) or self.DEFAULT_BATCH_SIZE

    def get_train_loader(self):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        return DataGenerator(self.train_set, batch_size=self._batch_size('train_bs'))

    def get_valid_loader(self):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        return DataGenerator(self.valid_set, batch_size=self._batch_size('valid_bs'))

    def get_train_data_size(self):
        """
        Information for aggregation: number of local training samples.
        """
        return len(self.train_set)

    def get_valid_data_size(self):
        """
        Information for aggregation: number of local validation samples.
        """
        return len(self.valid_set)

### Create the MNIST federated dataset

In [9]:
# train_bs / valid_bs are read back by get_train_loader / get_valid_loader via self.kwargs.
fed_dataset = MnistFedDataset(train_bs=32, valid_bs=32)

## Define and register FL tasks

In [10]:
import time
from math import acos, cos, pi

import tensorflow as tf

from layers import loss_fn, train_acc_metric, val_acc_metric

# Registry that the @TI.register_fl_task decorators below attach their tasks to.
TI = TaskInterface()

# Number of federated rounds; train() also derives its cosine lr schedule length from it.
ROUNDS = 40

def _get_current_lr(optimizer):
    """Return the optimizer's current learning rate as a NumPy scalar.

    The first form works when ``optimizer.lr`` exposes ``.numpy()``. The fallback calls
    ``optimizer.lr('float32')``, kept from the original code for optimizers whose ``lr``
    is callable. NOTE(review): confirm which optimizer configuration needs that path.
    """
    try:
        return optimizer.lr.numpy()
    except Exception:  # was a bare `except:`; no longer swallows KeyboardInterrupt/SystemExit
        return optimizer.lr('float32').numpy()


def _next_cosine_decay_lr(lr, initial_learning_rate, decay_steps, alpha):
    """Return the learning rate for the step after the one that ``lr`` corresponds to.

    Schedule:
        initial_learning_rate * ((1 - alpha) * 0.5 * (1 + cos(pi * step / decay_steps)) + alpha)

    The current step is not stored anywhere, so it is recovered by inverting the schedule.
    The argument is clamped to acos's [-1, 1] domain. The result is rounded to the nearest
    integer so float32 round-off in the stored lr cannot drift the schedule across rounds.
    The step saturates at ``decay_steps``, where the lr stays at
    ``alpha * initial_learning_rate``.
    """
    decayed = lr / initial_learning_rate
    cosine_decay = (decayed - alpha) / (1 - alpha)
    value = min(max(cosine_decay / 0.5 - 1, -1.0), 1.0)
    current_step = round(decay_steps / pi * acos(value))

    next_step = min(current_step + 1, decay_steps)
    cosine_decay = 0.5 * (1 + cos(pi * next_step / decay_steps))
    return initial_learning_rate * ((1 - alpha) * cosine_decay + alpha)


@TI.register_fl_task(model='model', data_loader='train_dataset',
                     device='device', optimizer='optimizer')
def train(model, train_dataset, optimizer, device, loss_fn=loss_fn, warmup=False):
    """Run one local training epoch, then advance the cosine learning-rate schedule.

    Args:
        model: Keras model, trained in place.
        train_dataset: iterable of ``(x_batch, y_batch)`` pairs (the train loader).
        optimizer: Keras optimizer. Its ``lr`` carries the schedule position between rounds.
        device: supplied through the task registration; unused here.
        loss_fn: loss called as ``loss_fn(y_true, logits)``.
        warmup: if True, stop after the first batch.

    Returns:
        ``{'train_acc': <train_acc_metric result for this epoch>}``.
    """
    # Schedule hyperparameters.
    initial_learning_rate = 0.0005
    # max(..., 1) guards ROUNDS == 1, where ROUNDS - 1 == 0 caused a ZeroDivisionError below.
    decay_steps = max(ROUNDS - 1, 1)
    alpha = 1 / 100  # the lr decays to alpha * initial_learning_rate at the final step

    # On the first round, replace the optimizer's starting lr with initial_learning_rate.
    # NOTE(review): this relies on the optimizer from layers.py starting with lr > 1 as a
    # "schedule not started" sentinel — confirm there.
    current_lr = _get_current_lr(optimizer)
    print("################################################################")
    if current_lr > 1:
        optimizer.lr = initial_learning_rate
        print(f"Current lr: {initial_learning_rate}")
    else:
        print(f"Current lr: {current_lr}")

    # Training loop for this epoch: iterate over the batches of the dataset.
    for batch_idx, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 10 batches.
        if batch_idx % 10 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (batch_idx, float(loss_value))
            )
            print("Seen so far: %d samples" % ((batch_idx + 1) * len(y_batch_train)))
        if warmup:
            break

    # Display metrics at the end of the epoch, then reset so the next round starts fresh.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))
    train_acc_metric.reset_states()

    # Advance the cosine schedule by one step for the next round.
    optimizer.lr = _next_cosine_decay_lr(
        _get_current_lr(optimizer), initial_learning_rate, decay_steps, alpha
    )

    return {'train_acc': train_acc,}


@TI.register_fl_task(model='model', data_loader='val_dataset', device='device')
def validate(model, val_dataset, device):
    """Evaluate ``model`` over one pass of ``val_dataset`` and report accuracy.

    ``device`` is supplied through the task registration but is not used here.
    ``val_acc_metric`` is reset after reading, so each call reports only its own pass.

    Returns:
        ``{'validation_accuracy': <val_acc_metric result>}``.
    """
    for features, labels in val_dataset:
        predictions = model(features, training=False)
        val_acc_metric.update_state(labels, predictions)

    accuracy = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(accuracy),))
    return {'validation_accuracy': accuracy,}

## Time to start a federated learning experiment

In [11]:
# Create an experiment in the federation.
experiment_name = 'disaster_classification'
fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name)

In [12]:
# The following command zips the workspace and Python requirements to be transferred to
# collaborator nodes, then starts the experiment for ROUNDS rounds.
# NOTE(review): opt_treatment='CONTINUE_GLOBAL' is expected to carry the aggregated optimizer
# state between rounds. train() reads the optimizer's lr each round to continue its cosine
# schedule, so confirm the lr survives aggregation.
fl_experiment.start(model_provider=MI, 
                   task_keeper=TI,
                   data_loader=fed_dataset,
                   rounds_to_train=ROUNDS,
                   opt_treatment='CONTINUE_GLOBAL')



In [13]:
# Stream the metrics reported by the running experiment.
# NOTE(review): presumably blocks until the experiment finishes — confirm.
fl_experiment.stream_metrics()

In [14]:
# Debugging aid: list every attribute name of the experiment object, one per line.
# The loop uses a descriptive name instead of `_`. Assigning `_` in IPython shadows the
# automatic "last output" variable for the rest of the session.
for attribute_name in dir(fl_experiment):
    print(attribute_name)

__class__
__delattr__
__dict__
__dir__
__doc__
__eq__
__format__
__ge__
__getattribute__
__gt__
__hash__
__init__
__init_subclass__
__le__
__lt__
__module__
__ne__
__new__
__reduce__
__reduce_ex__
__repr__
__setattr__
__sizeof__
__str__
__subclasshook__
__weakref__
_assert_experiment_accepted
_get_initial_tensor_dict
_pack_the_workspace
_prepare_plan
_rebuild_model
_serialize_interface_objects
current_model_status
define_task_assigner
experiment_accepted
experiment_name
federation
get_best_model
get_last_model
logger
plan
prepare_workspace_distribution
remove_experiment_data
remove_workspace_archive
restore_experiment_state
serializer_plugin
start
stream_metrics
summary_writer
task_runner_stub
train_task_exist
validation_task_exist
write_tensorboard_metric


In [15]:
# Retrieve the best model from the finished experiment.
# NOTE(review): the "best" selection criterion is decided by OpenFL — confirm.
best_model = fl_experiment.get_best_model()



In [16]:
# Persist the weights. The model-registration cell reloads 'best_model.h5' on the next run if it exists.
best_model.save_weights('best_model.h5')