<a href="https://colab.research.google.com/github/joshua-stock/fl-official-statistics/blob/main/med-insurance/fl-tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Hands-on introduction to Federated Learning for tabular data.

## 0. Introduction

### Cf. TensorFlow Federated Tutorials

**Getting started**

1. [Federated Learning for image classification](https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification)
1. [Federated Learning for Text Generation](https://www.tensorflow.org/federated/tutorials/federated_learning_for_text_generation)
1. [Tuning recommended aggregations for learning](https://www.tensorflow.org/federated/tutorials/tuning_recommended_aggregators)
1. [Federated Reconstruction for Matrix Factorization](https://www.tensorflow.org/federated/tutorials/federated_reconstruction_for_matrix_factorization)

**... and [more](https://www.tensorflow.org/federated/tutorials/tutorials_overview)**



### Setup

In [1]:
# Setup colab if needed

import os

try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

print("COLAB? {}".format(IN_COLAB))

if IN_COLAB:

    # rm repo from gdrive
    if os.path.exists("fl-official-statistics"):
      %rm -r fl-official-statistics

    # clone
    !git clone https://github.com/joshua-stock/fl-official-statistics
    %cd fl-official-statistics

    # pull (the current version of the repo)
    !git pull

    !pip install -q tensorflow-federated==0.56.0
    # or possibly !pip install -r requirements.txt

    os.chdir("med-insurance")
    

# suppress tf debug logging
# =========================
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

#0 = all messages are logged (default behavior)
#1 = INFO messages are not printed
#2 = INFO and WARNING messages are not printed
#3 = INFO, WARNING, and ERROR messages are not printed

# See https://stackoverflow.com/questions/35911252/disable-tensorflow-debugging-information

COLAB? False


### Data (medical insurance)

In [3]:
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder, MinMaxScaler

client_var = 'region'   # variable, used to split clients
target     = 'charges'  # target of the estimation

df_raw  = pd.read_csv('data/insurance.csv')

# preprocessing
df = df_raw.copy()

df[['sex', 'smoker']] = OrdinalEncoder(
  ).fit_transform(df[['sex', 'smoker']].astype('category'))
df[['age', 'bmi', 'children']] = MinMaxScaler(
  ).fit_transform(df[['age', 'bmi', 'children']])

clients = df[client_var].unique() # define the clients.

df.head()

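|   | age | sex | bmi | children | smoker | region | charges |
|---|-----|-----|-----|----------|--------|--------|---------|
| 0 | 0.021739 | 0.0 | 0.321227 | 0.0 | 1.0 | southwest | 16884.924 |
| 1 | 0.0 | 1.0 | 0.47915 | 0.2 | 0.0 | southeast | 1725.5523 |
| 2 | 0.217391 | 1.0 | 0.458434 | 0.6 | 0.0 | southeast | 4449.462 |
| 3 | 0.326087 | 1.0 | 0.181464 | 0.0 | 0.0 | northwest | 21984.47061 |
| 4 | 0.304348 | 1.0 | 0.347592 | 0.0 | 0.0 | northwest | 3866.8552 |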
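
As a rough illustration of what the two transformers in the cell above do, the following standard-library sketch reimplements them for a single column. The helper names `min_max_scale` and `ordinal_encode` and the sample values are hypothetical; the scikit-learn classes cover many more cases (several columns at once, unseen categories, and so on).

```python
def min_max_scale(values):
    """Rescale numbers linearly so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: this sketch maps every value to 0.0.
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def ordinal_encode(values):
    """Map each category to the index of its position in sorted order."""
    categories = sorted(set(values))
    lookup = {cat: float(i) for i, cat in enumerate(categories)}
    return [lookup[v] for v in values]


ages = [18, 41, 64]            # made-up ages
smoker = ["yes", "no", "no"]   # made-up smoker flags

scaled_ages = min_max_scale(ages)
encoded_smoker = ordinal_encode(smoker)
print(scaled_ages)      # [0.0, 0.5, 1.0]
print(encoded_smoker)   # [1.0, 0.0, 0.0]
```

Because the categories are sorted before they are numbered, `"no"` becomes `0.0` and `"yes"` becomes `1.0`, which matches the `smoker` column in the table above.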


## 1. Minimal example (no custom wrapper)

The following script takes data, a model architecture, and a few training settings as input and trains a neural network with Federated Learning.

No custom wrappers are used. Work through the code to understand how TensorFlow Federated is used directly.

In [8]:
# imports
# =======

from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.layers import InputLayer, Dense
import tensorflow_federated as tff
from sklearn.metrics import r2_score

# parameters
# =========

SEED = 1234

# a. model architecture
def create_keras_model():
  return tf.keras.models.Sequential([
      InputLayer(input_shape=[len(df.columns) - 2]),
      Dense(40, activation='relu'),
      Dense(40, activation='relu'),
      Dense(20, activation='relu'),
      Dense(1)
  ])

# b. training specifics
NUM_ROUNDS = 5        # communication rounds between server and clients
NUM_EPOCHS = 5        # local training epochs on each client per round
BATCH_SIZE = 128      # further training parameters
SHUFFLE_BUFFER = 20
PREFETCH_BUFFER = 5

# Validation split
# ================

train_data, test_data = train_test_split(df, test_size = 0.2, random_state = SEED)

# distribute data to clients 
# ==========================
# (... and convert to tensor)

train_data_fed = []
for client in clients:

  X_client = train_data.loc[
    train_data[client_var] == client,  ~df.columns.isin([target, client_var])]
  y_client = train_data.loc[train_data[client_var] == client,  target]

  tensor_client = tf.data.Dataset.from_tensor_slices((
    tf.convert_to_tensor(X_client),
    tf.convert_to_tensor(y_client)))

  train_data_fed.append(tensor_client)

# convert model for FL
# ====================
def model_fn():
  model = create_keras_model()
  return tff.learning.models.from_keras_model(
      model,
      input_spec = (
        tf.TensorSpec((
          model.input.shape[0], 
          model.input.shape[1]
          ), dtype = tf.float64),
        tf.TensorSpec((None,), dtype = tf.float64)
      ), loss = tf.keras.losses.MeanSquaredError(),
      metrics =  [
        tf.keras.metrics.MeanAbsoluteError()
        , tf.keras.metrics.MeanSquaredError()
        ]
  )

# Training
# ========

# prepare the data for training
train_data_fed_proc = [
  data.
    repeat(NUM_EPOCHS).
    shuffle(SHUFFLE_BUFFER, seed = SEED).
    batch(BATCH_SIZE).
    prefetch(PREFETCH_BUFFER)
  for data in train_data_fed]

# build a tff learning process
process = tff.learning.algorithms.build_weighted_fed_avg(
  model_fn,
  client_optimizer_fn = lambda: tf.optimizers.Adam(learning_rate = .05),
  server_optimizer_fn = lambda: tf.optimizers.Adam(learning_rate = .05))

# Remarks:
# 1. "Weighted" FedAvg means each client is weighted by its number of examples.
# 2. Because Adam is chosen as the server optimizer, this is actually FedAdam (instead of plain FedAvg).
# Cf. https://www.tensorflow.org/federated/api_docs/python/tff/learning/algorithms/build_weighted_fed_avg

# initialize the tff process
state = process.initialize()

# apply training (each round the updated model is communicated to each client)
hist = []

for _ in range(NUM_ROUNDS):  # loop variable unused; avoids shadowing the builtin `round`
  state, perf = process.next(state, train_data_fed_proc)
  hist.append(dict(perf['client_work']['train'].items()))

print("== Training Performance ==")
print(dict(perf['client_work']['train'].items()))

# eval ???
# ========

# test
# ====

# Fetch a fresh model
model = create_keras_model()
model.compile(
    loss = 'mean_squared_error',
    metrics = ['mean_squared_error',"mae", r2_score],
    run_eagerly = True)

# Load the weights learned by the federated process into the new model
weights = process.get_model_weights(state)
weights.assign_weights_to(model)

# Evaluate the model on the test data.
perf_test = model.evaluate(
    test_data.loc[:,~df.columns.isin([target, client_var])],
    test_data[target],
    verbose = 0)

print("== Testing Performance ==")
print(dict(zip(model.metrics_names, perf_test)))

== Training Performance ==
{'mean_absolute_error': 5992.9106, 'mean_squared_error': 82627130.0, 'loss': 82899660.0, 'num_examples': 53500, 'num_batches': 420}
== Testing Performance ==
{'loss': 314921760.0, 'mean_squared_error': 314921760.0, 'mae': 13084.958984375, 'r2_score': -1.2825415134429932}
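
The remark in the cell above says that weighted FedAvg weights each client by its number of examples. A minimal standard-library sketch of that aggregation step, next to the unweighted mean, could look as follows. The function names, the two-parameter "models", and the client sizes are hypothetical. In TFF the averaged quantity is, roughly, the client model update, to which the server optimizer is then applied; the sketch ignores that step.

```python
def weighted_average(client_params, num_examples):
    """Average parameter vectors, weighting each client by its example count."""
    total = sum(num_examples)
    n_params = len(client_params[0])
    return [
        sum(p[i] * n for p, n in zip(client_params, num_examples)) / total
        for i in range(n_params)
    ]


def unweighted_average(client_params):
    """Average parameter vectors, giving every client the same weight."""
    return weighted_average(client_params, [1] * len(client_params))


client_params = [[1.0, 0.0], [3.0, 4.0]]   # hypothetical parameters of two clients
num_examples = [300, 100]                  # hypothetical client sizes

weighted = weighted_average(client_params, num_examples)
unweighted = unweighted_average(client_params)
print(weighted)    # [1.5, 1.0]
print(unweighted)  # [2.0, 2.0]
```

The larger client pulls the weighted result towards its own parameters, while the unweighted mean treats both clients alike.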


## 2. Minimal example with wrappers

This section repeats the steps of the first example, this time using the custom wrappers defined in `FLutils`.

In [25]:
# import
import pandas as pd
from sklearn.preprocessing import (
  OrdinalEncoder, OneHotEncoder, MinMaxScaler)
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow_federated as tff


# import custom wrapper
from FLutils import (
    create_keras_model,    # construct a deep neural network (keras)
    model_fn,              # convert keras model to tff.learning.models
    prep_fed_train,        # convert training data to tensors for training w/ tensorflow
    prep_fed_test,         # convert test data to tensors for testing w/ tensorflow (other format than training data)
    train_fed              # train a keras model federated with distributed data
    )

# Compare:
help(prep_fed_train)

Help on function prep_fed_train in module FLutils:

prep_fed_train(X_train: pandas.core.frame.DataFrame, y_train: pandas.core.frame.DataFrame)
    Converts training data to Tensor Object.
    
    See https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification#preprocessing_the_input_data
    
    Args:
        X_train (pd.DataFrame): Training features.
        y_train (pd.DataFrame): Training target.
    
    Returns:
        A `Tensor` based on `X_test`, `y_test`.



In [11]:
# define model
def keras_blueprint(compile = False, nfeatures = None):
    if nfeatures is None: nfeatures = len(df.columns) - 2
    # "- 2" because the target and a variable defining the clients are no features
    return create_keras_model(
        nfeatures = nfeatures,
        units = [40, 40, 20],
        activations = ['relu'] * 3,
        compile = compile)

In [12]:
# distribute data to clients 
# ==========================
# (... and convert to tensor)

train_data, test_data = train_test_split(
      df, test_size = 0.2, random_state = 42)

train_data_fed = []
test_data_fed = []

for client in clients:
  df_client = train_data[train_data[client_var] == client]
  df_client_train = df_client
  train_data_fed.append(
      prep_fed_train(
        df_client_train.loc[:,~ df_client_train.columns.isin([target, client_var])],
        df_client_train[target]
  ))
train_data_fed

[<_TensorSliceDataset element_spec=(TensorSpec(shape=(5,), dtype=tf.float64, name=None), TensorSpec(shape=(), dtype=tf.float64, name=None))>,
 <_TensorSliceDataset element_spec=(TensorSpec(shape=(5,), dtype=tf.float64, name=None), TensorSpec(shape=(), dtype=tf.float64, name=None))>,
 <_TensorSliceDataset element_spec=(TensorSpec(shape=(5,), dtype=tf.float64, name=None), TensorSpec(shape=(), dtype=tf.float64, name=None))>,
 <_TensorSliceDataset element_spec=(TensorSpec(shape=(5,), dtype=tf.float64, name=None), TensorSpec(shape=(), dtype=tf.float64, name=None))>]

In [19]:
# Training
result =  train_fed(
        model = model_fn(
            keras_creator = keras_blueprint,
            loss = tf.losses.MeanSquaredError()
        ),

        train_data = train_data_fed,
        eval_data = None,
        NUM_ROUNDS = 5,
        NUM_EPOCHS = 20,
        client_optimizer = lambda: tf.optimizers.Adam(learning_rate = .05),
        server_optimizer = lambda: tf.optimizers.Adam(learning_rate = .05),
        BATCH_SIZE = 128,
        SHUFFLE_BUFFER = 20,
        PREFETCH_BUFFER = 5,
        SEED = 42,
        verbose = True
    )

result['history'][-1]

TRAIN: {'mean_absolute_error': 9253.215, 'mean_squared_error': 169215840.0, 'loss': 170328030.0, 'num_examples': 21400, 'num_batches': 169}
TRAIN: {'mean_absolute_error': 8794.255, 'mean_squared_error': 151058500.0, 'loss': 152083970.0, 'num_examples': 21400, 'num_batches': 169}
TRAIN: {'mean_absolute_error': 8512.211, 'mean_squared_error': 141342750.0, 'loss': 142294370.0, 'num_examples': 21400, 'num_batches': 169}
TRAIN: {'mean_absolute_error': 8276.477, 'mean_squared_error': 133320940.0, 'loss': 134199990.0, 'num_examples': 21400, 'num_batches': 169}
TRAIN: {'mean_absolute_error': 8035.832, 'mean_squared_error': 125608580.0, 'loss': 126420820.0, 'num_examples': 21400, 'num_batches': 169}


{'mean_absolute_error': 8035.832,
 'mean_squared_error': 125608580.0,
 'loss': 126420820.0,
 'num_examples': 21400,
 'num_batches': 169}
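
The `num_examples` and `num_batches` entries in the log can be reproduced by hand, assuming `train_fed` prepares each client dataset the way the first example does (`repeat`, then `shuffle`, then `batch`). In the sketch below, `round_volume` is a hypothetical helper and the client sizes are hypothetical values picked to sum to 1070 rows (21400 / 20 epochs); the real per-region counts depend on the split.

```python
import math


def round_volume(client_sizes, num_epochs, batch_size):
    """Return (examples, batches) that all clients process in one round.

    Each client dataset is repeated `num_epochs` times and then cut into
    batches; the last batch of a client may be smaller than `batch_size`.
    """
    examples = sum(n * num_epochs for n in client_sizes)
    batches = sum(math.ceil(n * num_epochs / batch_size) for n in client_sizes)
    return examples, batches


client_sizes = [270, 265, 268, 267]   # hypothetical rows per client
examples, batches = round_volume(client_sizes, num_epochs=20, batch_size=128)
print(examples, batches)  # 21400 169
```

Batches are counted per client because every client batches its own data, so the total is generally larger than `ceil(examples / batch_size)`.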

In [20]:
# Evaluation
weights = result['process'].get_model_weights(result['state'])

model = keras_blueprint(compile = True)
weights.assign_weights_to(model)

perf_test = model.evaluate(
    test_data.loc[:,~test_data.columns.isin([target, client_var])], 
    test_data[target]
    )
dict(zip(model.metrics_names, perf_test))



{'loss': 316873056.0,
 'mae': 12751.40234375,
 'mean_squared_error': 316873056.0,
 'r2_score': -1.1070221662521362}
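
A negative `r2_score` like the one above means that the model's squared error is larger than that of a constant prediction of the mean target. The following sketch computes the coefficient of determination by hand on made-up numbers to make this concrete; `r_squared` is a hypothetical helper, not the scikit-learn function.

```python
def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


y_true = [1.0, 2.0, 3.0]
mean_baseline = [2.0, 2.0, 2.0]   # always predict the mean of y_true
poor_model = [3.0, 2.0, 1.0]      # predictions in reversed order

print(r_squared(y_true, y_true))         # 1.0
print(r_squared(y_true, mean_baseline))  # 0.0
print(r_squared(y_true, poor_model))     # -3.0
```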

## Production

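A more complete example: it adds repeated stratified cross-validation, a per-client train/evaluation split, and logging of the training history and model weights to disk.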

In [32]:
import numpy as np
from sklearn.preprocessing import (
  OrdinalEncoder, OneHotEncoder, MinMaxScaler)
import matplotlib.pyplot as plt

# tensorflow (federated) and keras
import tensorflow_federated as tff
import tensorflow as tf
from keras.callbacks import CSVLogger

# model selection
from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold

# statusbar for loops
import tqdm

from FLutils import (
    load_df,               # load data
    create_keras_model,    # construct a deep neural network (keras)
    model_fn,              # convert keras model to tff.learning.models
    prep_fed_train,        # convert training data to tensors for learning with tensorflow
    prep_fed_test,         # convert test data to tensors for testing with tensorflow (other format than training data)
    train_model,           # train a keras model
    train_fed              # train a keras model federated with distributed data
    )


# create evaluation splits
# ========================

nreps, nfolds = 1, 3
evaluation = RepeatedStratifiedKFold(n_splits = nfolds, n_repeats = nreps, random_state = 42)


# training budget
# ===============

n_epochs_fed =  5 # epochs for each client in one server iteration (federated training)
n_rounds_fed =  5 # federated training rounds including distribution to the clients and aggregation of the results 

# define model architecture
# =========================

features_fed = df.columns[~df.columns.isin([target, client_var])]

def keras_blueprint(compile = False, nfeatures = None):
    # default to the number of feature columns (the original fallback referenced an undefined name)
    if nfeatures is None: nfeatures = len(features_fed)
    
    return create_keras_model(
        nfeatures = nfeatures, 
        units = [40, 40, 20], 
        activations = ['relu'] * 3, 
        compile = compile)

# Note 1: we do not compile the model yet. The loss, metrics, and optimizers are introduced later.
#   See https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification#creating_a_model_with_keras
# Note 2: this function has to return a new Keras model instance on every call
#   so that it can be used to build a federated learning process
# Note 3: loss = mae -> overfitting?

# experiment logging
# ==================

experiment_name = 'zz_tutorial'
out_path = 'output/experiments'
experiment_path = out_path + "/" + experiment_name + "/"
for subdir in ('logs', 'models', 'results'):
    os.makedirs(experiment_path + subdir, exist_ok = True)


# compute train
# =============

results_fed = []

eval_ind = 0  # running index of the current split
for train, test in tqdm.tqdm(list(evaluation.split(df, df[client_var]))):

    # Logging
    rep  = int(eval_ind / nfolds)
    fold = int(eval_ind % nfolds)
    eval_ind += 1
    id = "r" + str(rep) + "f" + str(fold)
    #print('======= rep %s - fold %s  =======' % (rep, fold))


    # distribute train (and eval) data over the clients and prepare the tensors.
    train_data_fed = []
    eval_data_fed  = []   
    for client in clients:
        outer_train_data_client = df[(df.index.isin(train)) & (df[client_var] == client)]
        train_data_client, eval_data_client = train_test_split(outer_train_data_client, test_size = 0.1, random_state = 42)
        
        train_data_fed.append(
            prep_fed_train(train_data_client[features_fed], train_data_client[target])) 
        eval_data_fed.append(
            prep_fed_test(eval_data_client[features_fed], eval_data_client[target]))
        
    # train
    #with tf.device('/device:gpu:0'): # possibly needed for colab
    result =  train_fed(
        model = model_fn(
            keras_creator = keras_blueprint,
            loss = tf.losses.MeanSquaredError()
        ),
        train_data = train_data_fed,
        eval_data  = eval_data_fed,
        NUM_ROUNDS = n_rounds_fed,
        NUM_EPOCHS = n_epochs_fed,
        client_optimizer = lambda: tf.optimizers.Adam(learning_rate = .05),
        server_optimizer = lambda: tf.optimizers.Adam(learning_rate = .05),
        BATCH_SIZE = 128,
        SHUFFLE_BUFFER = 20,
        PREFETCH_BUFFER = 5,
        SEED = 42,
        verbose = False
    )
    
    # save history
    pd.DataFrame(result['history']).to_csv(experiment_path + "logs/" + id + '_log.csv', sep = ";")

    # save model
    model = keras_blueprint()
    model_weights = result['process'].get_model_weights(result['state'])
    model_weights.assign_weights_to(model)
    model.save_weights(experiment_path + "models/" + id + '_weights.h5')
    
    # Note: load with e.g. 
    #   model = keras_blueprint(compile = True)
    #   model.load_weights(experiment_path + 'models/r0f0_weights.h5')
    #   model.weights

    results_fed.append(result)

100%|██████████| 3/3 [00:17<00:00,  5.86s/it]
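
The bookkeeping in the loop above maps the running split index to a repetition and a fold, which then name the log and weight files. A small sketch of that mapping, using `divmod` and a hypothetical helper `split_id`, assuming the splits arrive repetition by repetition:

```python
def split_id(eval_ind, nfolds):
    """Return an id such as 'r0f2' for the split with running index eval_ind."""
    rep, fold = divmod(eval_ind, nfolds)
    return f"r{rep}f{fold}"


nreps, nfolds = 2, 3
ids = [split_id(i, nfolds) for i in range(nreps * nfolds)]
print(ids)  # ['r0f0', 'r0f1', 'r0f2', 'r1f0', 'r1f1', 'r1f2']
```

With `nreps, nfolds = 1, 3` as in the cell above, only `r0f0`, `r0f1`, and `r0f2` occur.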


## Exercises

### A. Overview

- Run the complete notebook.
- Take a closer look at minimal example 1 and try to understand each step.
- What are the functions imported from `FLutils.py` used for? Check with `help`.
- What changes in the section "Production" compared to the minimal examples?

### B. Performance vs. training volume

Vary the number of epochs (1, 5, 10, 25, 50) and rounds (5, 10, 25, 50), save the results, and plot the test performance. Which hyperparameter has the greater impact on performance: epochs or rounds?
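
One possible way to organize this sweep is a grid over both hyperparameters that collects one result row per combination. The sketch below only shows the loop structure: `run_experiment` is a hypothetical placeholder that you would replace with a call to `train_fed` followed by an evaluation on the test data.

```python
import itertools

EPOCHS = [1, 5, 10, 25, 50]
ROUNDS = [5, 10, 25, 50]


def run_experiment(num_epochs, num_rounds):
    """Placeholder: train federated, evaluate, and return a test metric.

    Returns None here; replace the body with the real training call.
    """
    return None


results = []
for num_epochs, num_rounds in itertools.product(EPOCHS, ROUNDS):
    results.append({
        "epochs": num_epochs,
        "rounds": num_rounds,
        "test_metric": run_experiment(num_epochs, num_rounds),
    })

print(len(results))  # 20
```

A list of dictionaries like `results` can be turned into a table with `pd.DataFrame(results)` for saving and plotting.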

### C. Weighted or unweighted average?

Use `build_unweighted_fed_avg` instead of `build_weighted_fed_avg` (or, optionally, custom weights). Do the resulting model and its performance change?

Use the best epochs/rounds combination you have found in B.

### D. New data (optional)

Apply Federated Learning to a binary (2-class) classification problem.

In [None]:
#from pycaret.datasets import get_data
#data = get_data('diabetes')
#data

# baselines here: https://pycaret.gitbook.io/docs/get-started/quickstart#classification


### Optional additional exercises

1. Try to optimize the layers and units of the model!
1. Add cross validation to minimal example 1!
1. Add training evaluation to minimal example 1!
1. In minimal example 1, reduce the number of lines of code without changing functionality or generality!
1. In minimal example 1: first distribute the data to each client, then generate a train-test split for each client! Note: be careful with the randomization.