In [1]:
import tensorflow as tf
import tensorflow_federated as tff
import numpy as np
import matplotlib.pyplot as plt
import wandb
from wandb.integration.keras import WandbCallback
import os

2024-07-31 11:42:08.849666: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-31 11:42:08.851754: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-31 11:42:08.874474: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-31 11:42:08.874508: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-31 11:42:08.874531: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to regi

### Hyperparameters

Wandb (Weights & Biases) is a machine learning platform that helps developers track experiments, visualize data, and share insights to improve models efficiently. (https://wandb.ai)

**NUM_CLIENTS**: Number of federated clients. The training set is split into this many (nearly) equal shards, one per client.

**NUM_ROUNDS**: Number of federated rounds. One round represents a single round of Federated Averaging, which consists of pushing the server state (including the model parameters) to the clients, on-device training on their local data, collecting and averaging model updates, and producing a new updated model at the server.

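**BATCH_SIZE**: Mini-batch size used for the local training on each client. In every round each client makes one pass over its whole local shard in batches of this size (with the full training set of 50,000 samples and a batch size of 20 this gives 2,500 batches per round across all clients).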

In [2]:
# Experiment configuration read by the cells below.
USE_WANDB = False  # when True, the run is tracked with Weights & Biases (wandb.init / wandb.log / wandb.finish)
NUM_CLIENTS = 10  # number of simulated clients; the training set is split into this many shards
NUM_ROUNDS = 10  # number of federated rounds executed by the training loop
BATCH_SIZE = 20  # mini-batch size applied to each client's local dataset

### Dataload

In [3]:
# Load CIFAR10 dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

# Normalize pixel values to [0, 1].
# Cast to float32 *before* dividing: dividing the integer image arrays by the
# Python float 255.0 would first materialise a float64 copy of every image
# (twice the memory of the float32 result) only to throw it away in the cast.
# The resulting float32 values are the same as with divide-then-cast.
x_train = x_train.astype(np.float32) / np.float32(255.0)
x_test = x_test.astype(np.float32) / np.float32(255.0)

# One-hot encode the labels (10 classes)
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)

### Creation of Model, Federated Training and Evaluation

For federated learning we must first create a federated dataset from the regular dataset loaded with TF, by splitting it into one shard per federated client and batching each shard with the batch size. We can use [tf.data.Dataset.from_tensor_slices](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#from_tensor_slices) for that.

Then the federated algorithm is built via:
[build_weighted_fed_avg](https://www.tensorflow.org/federated/api_docs/python/tff/learning/algorithms/build_weighted_fed_avg)
or
[build_unweighted_fed_avg](https://www.tensorflow.org/federated/api_docs/python/tff/learning/algorithms/build_unweighted_fed_avg)
respectively, where different optimizers and aggregators can be specified for our experiments.

To execute one round of federated learning we execute the [IterativeProcess.next-function](https://www.tensorflow.org/federated/api_docs/python/tff/templates/IterativeProcess)

For a fair comparison with centralised learning we don't execute the evaluation in a federated way (this would also be possible with TFF); instead, we copy the weights to a centralised clone of the model and evaluate it on the hold-out test set after each round.

Metrics of the evaluation start with ``val_`` in the output

In [9]:
if USE_WANDB:
    # Start a wandb run; the run name encodes the client count, round count
    # and batch size of this experiment.
    # NOTE(review): the name contains "differential_privacy" although the DP
    # aggregator is commented out where the federated process is built -
    # confirm the name matches the configuration actually being run.
    wandb.init(
        project="federated_learning",
        group="group_1",
        name=f"tf_federated_differential_privacy_{NUM_CLIENTS}clients_{NUM_ROUNDS}rounds_{BATCH_SIZE}batchsize",
    )
    
# Factory for the (uncompiled) Keras CNN shared by training and evaluation
def create_keras_model():
    """Build the CNN classifier used in this notebook.

    Architecture: two blocks of 5x5 convolution (20, then 50 filters, ReLU)
    each followed by 2x2 max-pooling, then a 500-unit ReLU dense layer and a
    10-way softmax output. The input shape is declared as (32, 32, 3).

    Returns:
        A new ``tf.keras.Sequential`` model. The model is NOT compiled here;
        loss and metrics are attached by the callers (``model_fn`` passes
        them to TFF, the evaluation clone calls ``compile`` itself).
    """
    layers = tf.keras.layers

    feature_extractor = [
        layers.Conv2D(20, (5, 5), activation='relu', input_shape=(32, 32, 3)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(50, (5, 5), activation='relu'),
        layers.MaxPooling2D((2, 2)),
    ]
    classifier_head = [
        layers.Flatten(),
        layers.Dense(500, activation='relu'),
        layers.Dense(10, activation='softmax'),
    ]

    return tf.keras.Sequential(feature_extractor + classifier_head)

# Wrap the Keras model for use with TFF
def model_fn():
    """Create a fresh Keras model and wrap it as a TFF learning model.

    A new Keras model is built on every call. The wrapper is given the
    batch signature (float32 images of shape [None, 32, 32, 3] and float32
    one-hot labels of shape [None, 10]) together with the categorical
    cross-entropy loss and the categorical-accuracy metric.

    Returns:
        The object returned by ``tff.learning.models.from_keras_model``.
    """
    image_spec = tf.TensorSpec(shape=[None, 32, 32, 3], dtype=tf.float32)
    label_spec = tf.TensorSpec(shape=[None, 10], dtype=tf.float32)

    return tff.learning.models.from_keras_model(
        create_keras_model(),
        input_spec=(image_spec, label_spec),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=[tf.keras.metrics.CategoricalAccuracy()],
    )

# Convert the dataset to a federated dataset
def preprocess(dataset):
    """Batch one client's dataset and turn each batch into an (x, y) tuple.

    Args:
        dataset: a ``tf.data.Dataset`` whose elements are dicts with the
            keys ``'x'`` (image) and ``'y'`` (one-hot label).

    Returns:
        A dataset of ``(images, labels)`` tuples, batched with BATCH_SIZE and
        reshaped to ``[-1, 32, 32, 3]`` and ``[-1, 10]`` respectively.
        No shuffling or repetition is applied.
    """
    def to_feature_label_pair(batch):
        images = tf.reshape(batch['x'], [-1, 32, 32, 3])
        one_hot_labels = tf.reshape(batch['y'], [-1, 10])
        return (images, one_hot_labels)

    batched = dataset.batch(BATCH_SIZE)
    return batched.map(to_feature_label_pair)

# Split images and labels into NUM_CLIENTS contiguous shards; both arrays are
# split the same way, so shard i of the images lines up with shard i of the labels.
client_data = np.array_split(x_train, NUM_CLIENTS)
client_labels = np.array_split(y_train, NUM_CLIENTS)

# One preprocessed tf.data.Dataset per client
federated_train_data = [
    preprocess(tf.data.Dataset.from_tensor_slices({'x': images, 'y': labels}))
    for images, labels in zip(client_data, client_labels)
]

# Create a federated averaging process
# The active configuration is weighted FedAvg with Adam on the clients and
# SGD (learning rate 1.0) on the server. The commented-out lines marked
# EXPERIMENT are alternative settings; swap them in one at a time.
# iterative_process = tff.learning.algorithms.build_unweighted_fed_avg( # EXPERIMENT: Unweighted training, also needed for Differential Privacy
iterative_process = tff.learning.algorithms.build_weighted_fed_avg(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.Adam(),
    # client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.1), # EXPERIMENT: Different Client Optimizer
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    # server_optimizer_fn=lambda: tf.keras.optimizers.Adam(), # EXPERIMENT: Different Server Optimizer
    # model_aggregator=tff.learning.model_update_aggregator.dp_aggregator(noise_multiplier=0.1, clients_per_round=NUM_CLIENTS), # EXPERIMENT: Differential Privacy
    # model_aggregator=tff.learning.model_update_aggregator.compression_aggregator() # EXPERIMENT: Compression
)

# Initial server state of the federated process
state = iterative_process.initialize()

# Centralised clone of the model; it only receives the server weights after
# each round and is used to evaluate on the held-out test set.
eval_model = create_keras_model()
eval_model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=[tf.keras.metrics.CategoricalAccuracy()],
)

# Save the model for logging
# (this happens before any training round has run, so the file holds the
# freshly initialised model)
full_path = os.path.join("./", "model.keras")
eval_model.save(filepath=full_path)
if USE_WANDB:
    wandb.log_model(path=full_path, name="CIFAR10_CNN")

# Run NUM_ROUNDS federated rounds, evaluating centrally after each one
for round_num in range(1, NUM_ROUNDS + 1):
    # One round of the federated process over all client datasets
    state, metrics = iterative_process.next(state, federated_train_data)

    # Copy the current server weights into the centralised clone and score it
    # on the held-out test set; evaluate() returns [loss, categorical_accuracy]
    model_weights = iterative_process.get_model_weights(state)
    model_weights.assign_weights_to(eval_model)
    eval_metrics = eval_model.evaluate(x_test, y_test, verbose=2)

    print(f'Round {round_num}, Metrics={metrics}, Val_Metrics={eval_metrics}')

    if not USE_WANDB:
        continue

    # Training metrics come from the clients, val_* metrics from the
    # centralised evaluation above
    wandb.log({
        'round': round_num,
        'loss': metrics['client_work']['train']['loss'],
        'categorical_accuracy': metrics['client_work']['train']['categorical_accuracy'],
        'val_loss': eval_metrics[0],
        'val_categorical_accuracy': eval_metrics[1],
    })

# Close the wandb run once all rounds are done
if USE_WANDB:
    wandb.finish()

2024-07-31 11:57:03.498940: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 11:57:03.499074: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-07-31 11:57:03.515904: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 11:57:03.516062: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-07-31 11:57:03.783965: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 11:57:03.784063: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-07-31 11:57:03.880125: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 11:57:03.880240: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session


313/313 - 1s - loss: 2.0625 - categorical_accuracy: 0.3142 - 600ms/epoch - 2ms/step
Round 1, Metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('categorical_accuracy', 0.27886), ('loss', 1.9431365), ('num_examples', 50000), ('num_batches', 2500)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))]), Val_Metrics=[2.062493324279785, 0.3142000138759613]
313/313 - 1s - loss: 1.5602 - categorical_accuracy: 0.4363 - 675ms/epoch - 2ms/step
Round 2, Metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('categorical_accuracy', 0.37194), ('loss', 1.7072825), ('num_examples', 50000), ('num_batches', 2500)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))]), Val_Metrics=[1.5601530075073242, 0.43630000948905945]
313/313 - 1s - loss: 1.4250 - categorica

In [5]:
# Close the active wandb run.
# The training cell already does this after its loop finishes.
# NOTE(review): this separate cell presumably exists to close the run
# manually when the training cell was interrupted or failed - confirm, or
# remove the duplicate.
if USE_WANDB:
    wandb.finish()