# Federated Next Word Prediction with Director example
## Using the low-level Python API

In [1]:
# Install dependencies if not already installed
# !pip install tensorflow==2.5.1

# Connect to the Federation

In [1]:
# Create a federation
from openfl.interface.interactive_api.federation import Federation

# Please use the same identifier that was used in the signed certificate.
client_id = 'frontend'
# Alias kept so code still referring to the previously misspelled name keeps working.
cliend_id = client_id

# 1) Run with API layer - Director mTLS
# If the user wants to enable mTLS, they must provide the CA root chain and a signed key pair to the federation interface.
# cert_chain = 'cert/root_ca.crt'
# API_certificate = 'cert/frontend.crt'
# API_private_key = 'cert/frontend.key'

# federation = Federation(client_id=client_id, director_node_fqdn='localhost', director_port='50051', tls=True,
#                         cert_chain=cert_chain, api_cert=API_certificate, api_private_key=API_private_key)

# --------------------------------------------------------------------------------------------------------------------

# 2) Run with TLS disabled (trusted environment)
# Federation can also determine local fqdn automatically
federation = Federation(client_id=client_id, director_node_fqdn='localhost', director_port='50051', tls=False)

In [2]:
# Ask the director which shards are registered and display the registry (online status, sample/target shapes).
shard_registry = federation.get_shard_registry()
shard_registry

{'env_one': {'shard_info': node_info {
    name: "env_one"
  }
  shard_description: "Dataset from The Whisperer in Darkness by H. P. Lovecraft"
  n_samples: 22492
  sample_shape: "3"
  sample_shape: "96"
  target_shape: "48904",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2021-09-02 18:02:15',
  'current_time': '2021-09-02 18:02:55',
  'valid_duration': seconds: 120}}

In [3]:
# Target shape reported by the federation's shards (one-hot rows over the vocabulary for this dataset).
federation.target_shape

['48904']

In [4]:
# First, request a dummy_shard_desc that holds information about the federated dataset
# (`size=10` presumably sets how many dummy samples it exposes — confirm against the OpenFL docs).
dummy_shard_desc = federation.get_dummy_shard_descriptor(size=10)
# Indexing the descriptor yields a (sample, target) pair; used here as a quick sanity check.
sample, target = dummy_shard_desc[0]

## Creating an FL experiment using the Interactive API

In [5]:
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment

2021-09-02 18:02:59.041885: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0


### Register dataset

In [6]:
from tensorflow.keras.utils import Sequence

class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves fixed-size batches from a shard descriptor.

    Only full batches are produced: trailing samples that do not fill a whole
    batch are dropped, because ``__len__`` uses floor division.

    Args:
        shard_descriptor: object indexable with a list of integer positions that
            returns an ``(X, y)`` pair.
        indices: positions in ``shard_descriptor`` belonging to this split.
        batch_size: number of samples per batch.
    """

    def __init__(self, shard_descriptor, indices, batch_size):
        self.shard_descriptor = shard_descriptor
        self.batch_size = batch_size
        self.indices = indices
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches in one pass over ``indices``."""
        return len(self.indices) // self.batch_size

    def __getitem__(self, index):
        """Return the ``(X, y)`` pair for batch number ``index``."""
        start = index * self.batch_size
        positions = self.index[start:start + self.batch_size]
        batch = [self.indices[k] for k in positions]

        X, y = self.shard_descriptor[batch]
        return X, y

    def on_epoch_end(self):
        """Reset the per-epoch ordering over ``indices`` (identity order, no shuffling)."""
        # This cell never imports NumPy; the notebook-global `np` only exists after a
        # later cell has run, so import it here to avoid a NameError.
        import numpy as np
        self.index = np.arange(len(self.indices))


# Now you can implement your data loaders using dummy_shard_desc
class NextWordSD(DataInterface):
    """Data interface that splits a next-word shard 80/20 into train/valid loaders.

    Keyword arguments (read back from ``self.kwargs``, presumably stored there by
    ``DataInterface.__init__``):
        train_bs: training batch size; 64 when omitted or falsy.
        valid_bs: validation batch size; 512 when omitted or falsy.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def shard_descriptor(self):
        """Shard descriptor currently attached to this interface."""
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor will be set by Envoy.

        The first 80% of samples (rounded) become the training split and the
        remainder the validation split.
        """
        self._shard_descriptor = shard_descriptor

        n_samples = len(self)
        train = round(n_samples * 0.8)
        # Start at 0: the previous range(1, train) silently dropped sample 0.
        self.train_indeces = list(range(train))
        self.valid_indeces = list(range(train, n_samples))

    def __getitem__(self, index):
        """Delegate indexing to the attached shard descriptor."""
        return self.shard_descriptor[index]

    def __len__(self):
        """Number of samples in the attached shard descriptor."""
        return len(self.shard_descriptor)

    def get_train_loader(self):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        # `.get(...) or default` keeps the old "falsy -> default" rule and also
        # tolerates the kwarg being omitted entirely (previously a KeyError).
        batch_size = self.kwargs.get('train_bs') or 64
        return DataGenerator(self.shard_descriptor, self.train_indeces, batch_size=batch_size)

    def get_valid_loader(self):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        batch_size = self.kwargs.get('valid_bs') or 512
        return DataGenerator(self.shard_descriptor, self.valid_indeces, batch_size=batch_size)

    def get_train_data_size(self):
        """
        Information for aggregation: number of training samples.
        """
        return len(self.train_indeces)

    def get_valid_data_size(self):
        """
        Information for aggregation: number of validation samples.
        """
        return len(self.valid_indeces)


### Describe a model and optimizer

In [None]:
# import tensorflow as tf
# from tensorflow.keras.layers import LSTM, Dense
# from tensorflow.keras.optimizers import Adam


# class Model(tf.keras.Model):
#     """Model definition."""

#     def __init__(self, **kwargs):
#         super().__init__()

#         vocab_size = 48904
#         self.lstm1 = LSTM(1000, return_sequences=True)
#         self.lstm2 = LSTM(1000)
#         self.d1 = Dense(1000, activation='tanh')
#         self.d2 = Dense(vocab_size, activation='softmax')

#     def call(self, x):
#         x = self.lstm1(x)
#         x = self.lstm2(x)
#         x = self.d1(x)
#         x = self.d2(x)
#         return x
    
#     def get_config(self):
#         config = dict({'name': self.name})
#         config['layers'] = [
#             {'class_name': layer.__class__.__name__,
#              'config': layer.get_config()} 
#             for layer in model._self_tracked_trackables
#         ]

#         return config

# # Construct an instance of Model
# model = Model()
# optimizer = Adam(learning_rate=0.001)
# model.compile(loss='categorical_crossentropy', optimizer=optimizer,
#               metrics=[tf.keras.metrics.CategoricalAccuracy()])
# batch_size = 64
# model.build(input_shape=[batch_size, 3, 96])

In [8]:
import tensorflow as tf
from tensorflow.keras.layers import Masking, LSTM, Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import CategoricalAccuracy
from tensorflow.keras.losses import CategoricalCrossentropy

# Shapes reported by the shard descriptor: 3 context words x 96-dim vectors -> vocab-size softmax.
SEQ_LEN = 3
EMBEDDING_DIM = 96
VOCAB_SIZE = 48904

# Let TensorFlow grow GPU memory as needed instead of reserving it all at start-up.
gpu_devices = tf.config.list_physical_devices("GPU")
for device in gpu_devices:
    tf.config.experimental.set_memory_growth(device, True)

model = Sequential()
model.add(Masking(mask_value=0))  # timesteps whose features all equal 0 are masked
model.add(LSTM(20))
model.add(Dense(50, activation='tanh'))
model.add(Dense(VOCAB_SIZE, activation='softmax'))

# Shared objects used by the `train` / `validate` tasks registered below.
optimizer = Adam(learning_rate=0.001)
loss_fn = CategoricalCrossentropy()
train_acc_metric = CategoricalAccuracy()
val_acc_metric = CategoricalAccuracy()

batch_size = 64
# Use `batch_size` rather than repeating the literal so the two cannot drift apart.
model.build(input_shape=[batch_size, SEQ_LEN, EMBEDDING_DIM])

In [9]:
# Batch sizes are forwarded as kwargs and read back by the train/valid loaders.
fed_dataset = NextWordSD(valid_bs=512, train_bs=64)

### Define and register FL tasks

In [10]:
TI = TaskInterface()


# Custom training loop, following:
# https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit
@TI.register_fl_task(model='model', data_loader='train_loader', device='device', optimizer='optimizer')
def train(model, train_loader, device, optimizer, loss_fn=loss_fn):
    """Run one epoch of gradient-tape training over ``train_loader``.

    Args:
        model: Keras model, updated in place.
        train_loader: iterable of ``(x_batch, y_batch)`` pairs (targets are
            assumed one-hot, matching ``loss_fn`` — TODO confirm for other data).
        device: part of the task contract; not used by this body.
        optimizer: Keras optimizer that applies the gradients.
        loss_fn: callable ``loss_fn(y_true, y_pred)``; defaults to the notebook's ``loss_fn``.

    Returns:
        dict with ``'train_acc'`` (epoch accuracy from ``train_acc_metric``) and
        ``'loss'`` (loss of the last batch, or NaN if the loader yielded no batches).
    """
    # Bound up front so an empty loader (e.g. a split smaller than one batch)
    # reports NaN instead of raising UnboundLocalError at the return below.
    loss = tf.constant(float('nan'))

    for x_batch_train, y_batch_train in train_loader:
        y = tf.convert_to_tensor(y_batch_train)
        with tf.GradientTape() as tape:
            y_pred = model(x_batch_train, training=True)  # forward pass
            loss = loss_fn(y, y_pred)

        trainable_vars = model.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        optimizer.apply_gradients(zip(gradients, trainable_vars))

        train_acc_metric.update_state(y, y_pred)

    # Read, then reset, the shared metric so the next epoch starts from a clean state.
    train_acc = train_acc_metric.result()
    train_acc_metric.reset_states()
    return {'train_acc': train_acc, 'loss': loss}


@TI.register_fl_task(model='model', data_loader='val_loader', device='device')
def validate(model, val_loader, device=''):
    """Measure categorical accuracy of ``model`` across every batch of ``val_loader``.

    ``device`` belongs to the task contract but is not used here. The shared
    ``val_acc_metric`` is reset after it is read, so the next call starts fresh.
    """
    for features, labels in val_loader:
        targets = tf.convert_to_tensor(labels)
        predictions = model(features, training=False)
        val_acc_metric.update_state(targets, predictions)

    accuracy = val_acc_metric.result()
    val_acc_metric.reset_states()
    return {'validation_accuracy': accuracy}

#### Register model

In [11]:
from copy import deepcopy

# Dotted path of the Keras framework adapter plugin handed to ModelInterface.
framework_adapter = 'openfl.plugins.frameworks_adapters.keras_adapter.FrameworkAdapterPlugin'
MI = ModelInterface(framework_plugin=framework_adapter, model=model, optimizer=optimizer)

# Snapshot of the model as it is now, before any federated training.
initial_model = deepcopy(model)

## Time to start a federated learning experiment

In [12]:
# Create an experiment in the federation
experiment_name = 'word_prediction_test_experiment'
fl_experiment = FLExperiment(experiment_name=experiment_name, federation=federation)

In [16]:
# NOTE: running this with %autoreload enabled produced a pickling error.

# Zip the workspace and Python requirements so they can be transferred to the collaborator nodes.
fl_experiment.start(
    model_provider=MI,
    task_keeper=TI,
    data_loader=fed_dataset,
    rounds_to_train=2,
    opt_treatment='RESET',
)

In [None]:
# If you stop the IPython session and reconnect later, restore the experiment state
# before checking how the experiment is going:
# fl_experiment.restore_experiment_state(MI)

# Stream the metrics reported by the running experiment.
fl_experiment.stream_metrics()

In [None]:
# TODO: add testing on the 'metamorphosis' text

In [14]:
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Next word shard descriptor."""

import re
from pathlib import Path

import numpy as np
import pandas as pd
from tensorflow.keras.utils import to_categorical

from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor


class NextWordShardDescriptor(ShardDescriptor):
    """Shard descriptor over a plain-text file: three-word windows -> next word.

    Each sample stacks the word vectors of three consecutive in-vocabulary words
    (``sample_shape`` ``['3', '96']``). Its target is the one-hot index of the
    following word in the keyed-vector vocabulary.
    """

    def __init__(self, title: str = '', author: str = '') -> None:
        """Initialize NextWordShardDescriptor.

        Args:
            title: file stem of the text; the first ``<title>.txt`` yielded by a
                recursive search under the current working directory is loaded.
            author: author name, used only in ``dataset_description``.

        Raises:
            FileNotFoundError: if no ``<title>.txt`` exists under the working directory.
        """
        super().__init__()

        self.title = title
        self.author = author
        # `next` stops at the first match instead of materialising every match.
        self.dataset_dir = next(Path.cwd().rglob(f'{title}.txt'), None)
        if self.dataset_dir is None:
            raise FileNotFoundError(f"Could not find '{title}.txt' under {Path.cwd()}")
        self.data = self.load_data(self.dataset_dir)  # list of words
        self.X, self.y = self.get_sequences(self.data)

    def __len__(self):
        """Number of sequences."""
        return len(self.X)

    def __getitem__(self, index):
        """Return ``(X[index], y[index])``; ``index`` may be an int or a list of ints."""
        return self.X[index], self.y[index]

    @property
    def sample_shape(self):
        """Return the sample shape info."""
        return ['3', '96']  # three vectors

    @property
    def target_shape(self):
        """Return the target shape info."""
        return ['48904']  # row at one-hot matrix with n = vocab_size

    @property
    def dataset_description(self) -> str:
        """Return the dataset description."""
        return f'Dataset from {self.title} by {self.author}'

    @staticmethod
    def load_data(path):
        """Load a UTF-8 text file and return its runs of letters a-z (lowercased), in order."""
        # Context manager closes the file; the previous open(...).read() leaked the handle.
        with open(path, 'r', encoding='utf8') as text_file:
            text = text_file.read()
        return re.findall(r'[a-z]+', text.lower())

    @staticmethod
    def get_sequences(data, vectors_path='keyed_vectors.pkl'):
        """
        Transform words to sequences, for X transform to vectors as well.

        Args:
            data: list of words.
            vectors_path: pickled ``pd.Series`` indexed by word whose values are
                word vectors; a relative path resolves against the working directory.

        Returns:
            ``(x_seq, y_seq)``. ``x_seq`` is an array of the vector triples for every
            3-word window whose words and following word are all in the vocabulary.
            ``y_seq`` is the one-hot encoding of that following word, with
            ``num_classes`` equal to the vocabulary size.

        To make vocab, clean it and get keyed vectors:
            if not spacy.util.is_package('en_core_web_sm'):
                spacy.cli.download('en_core_web_sm')
            nlp = spacy.load('en_core_web_sm')
            clean_vocab_list = [word for word in nlp.vocab.strings if re.fullmatch(r'[a-z]+', word)]

            word_to_vector = pd.Series([], name='vector')
            for word in clean_vocab_list:
                word_to_vector[word] = nlp(word).vector
            word_to_vector.to_pickle('keyed_vectors.pkl')
        """
        # spacy en_core_web_sm vocab_size = 48904, vector_size = 96
        # NOTE(review): unpickling can execute arbitrary code — only load a trusted file.
        vectors = pd.read_pickle(vectors_path)
        x_seq = []
        y_seq = []
        for i in range(len(data) - 3):
            window = data[i:i + 3]  # make 3-grams
            next_word = data[i + 3]
            cur_x = [vectors[word] for word in window if word in vectors]
            # Keep the sample only when all three context words and the target are known.
            if len(cur_x) == 3 and next_word in vectors:
                x_seq.append(cur_x)
                y_seq.append(vectors.index.get_loc(next_word))

        x_seq = np.array(x_seq)
        # NOTE(review): dense (n_samples, vocab_size) one-hot matrix — can use a lot of memory.
        y_seq = to_categorical(y_seq, num_classes=vectors.shape[0])
        return x_seq, y_seq

In [15]:
# Local sanity check: attach a real shard and run the registered tasks directly, without the federation.
fed_dataset.shard_descriptor = NextWordShardDescriptor(title='metamorphosis')

for epoch in range(20):
    for stage in ('train', 'validate'):
        if stage == 'train':
            train_loader = fed_dataset.get_train_loader()
            metrics = train(model, train_loader, 'cpu', optimizer)
        else:
            val_loader = fed_dataset.get_valid_loader()
            metrics = validate(model, val_loader, 'cpu')
        # Convert TF scalar tensors to NumPy values for readable printing.
        m = {name: tensor.numpy() for name, tensor in metrics.items()}
        print(f'Epoch {epoch} {stage} {m}')

2021-09-02 18:04:31.076576: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudnn.so.8
2021-09-02 18:04:31.257442: I tensorflow/stream_executor/cuda/cuda_dnn.cc:359] Loaded cuDNN version 8202
2021-09-02 18:04:31.373647: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11
2021-09-02 18:04:31.739895: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublasLt.so.11


Epoch 0 train {'train_acc': 0.051533017, 'loss': 6.7986197}
Epoch 0 validate {'validation_accuracy': 0.049560547}
Epoch 1 train {'train_acc': 0.0526533, 'loss': 6.4928417}
Epoch 1 validate {'validation_accuracy': 0.049560547}
Epoch 2 train {'train_acc': 0.052771226, 'loss': 6.3223925}
Epoch 2 validate {'validation_accuracy': 0.049560547}
Epoch 3 train {'train_acc': 0.06456368, 'loss': 6.092044}
Epoch 3 validate {'validation_accuracy': 0.07495117}
Epoch 4 train {'train_acc': 0.08166274, 'loss': 5.8576427}
Epoch 4 validate {'validation_accuracy': 0.083740234}
Epoch 5 train {'train_acc': 0.092629716, 'loss': 5.6957226}
Epoch 5 validate {'validation_accuracy': 0.08984375}
Epoch 6 train {'train_acc': 0.100943394, 'loss': 5.552137}
Epoch 6 validate {'validation_accuracy': 0.103271484}
Epoch 7 train {'train_acc': 0.113148585, 'loss': 5.420711}
Epoch 7 validate {'validation_accuracy': 0.10571289}
Epoch 8 train {'train_acc': 0.11933962, 'loss': 5.3094616}
Epoch 8 validate {'validation_accuracy'