# Ensemble methods to compute participants' contribution 

In [1]:
import sys
sys.path.append("../")

In [2]:
from decentralized_smart_grid_ml.federated_learning.federated_aggregator import weighted_average_aggregation

In [3]:
import pandas as pd
from tensorflow.keras.experimental import LinearModel

In [4]:
import numpy as np

In [5]:
train_set_path1 = "../data_sample/simple_ml_task/participants/participant_0/simple_ml_task_0.csv"
train_set_df1 = pd.read_csv(train_set_path1)
x_train1, y_train1 = train_set_df1[["x1", "x2"]].values[:10], train_set_df1["y"].values[:10]

In [6]:
train_set_path2 = "../data_sample/simple_ml_task/participants/participant_1/simple_ml_task_1.csv"
train_set_df2 = pd.read_csv(train_set_path2)
x_train2, y_train2 = train_set_df2[["x1", "x2"]].values, train_set_df2["y"].values

In [7]:
model1 = LinearModel(activation="sigmoid")
model1.compile(optimizer="sgd", loss="mse", metrics="accuracy")

model2 = LinearModel(activation="sigmoid")
model2.compile(optimizer="sgd", loss="mse", metrics="accuracy")

In [8]:
model1.fit(x_train1, y_train1, epochs=3)
model2.fit(x_train2, y_train2, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7efe3c787590>

In [9]:
models = [model1, model2]

In [10]:
test_set_path = "../data_sample/simple_ml_task/validator/simple_ml_task_test.csv"
test_set_df = pd.read_csv(test_set_path)
x_test, y_test = test_set_df[["x1", "x2"]].values, test_set_df["y"].values

In [11]:
global_model = LinearModel(activation="sigmoid")
global_model.compile(optimizer="sgd", loss="mse", metrics="accuracy")
# here the fit function is called because it needs the build. The trained model will NOT be used becuase
# we will override its weights with the new ones
global_model.fit(x_train1, y_train1)



<tensorflow.python.keras.callbacks.History at 0x7efe3c0cf050>

In [12]:
participants_weights = []
for model in models:
    participants_weights.append(model.get_weights())

# Simple average of local models

In [13]:
average_weights = weighted_average_aggregation(participants_weights, [0.5, 0.5])

2021-11-22 16:49:47,807 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - INFO - Start models' weights aggregation of 2 participants
2021-11-22 16:49:47,809 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 0 related to participant 0
2021-11-22 16:49:47,811 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 1 related to participant 0
2021-11-22 16:49:47,813 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 0 related to participant 1
2021-11-22 16:49:47,815 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 1 related to participant 1
2021-11-22 16:49:47,818 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - INFO - Finish models' weights aggregation of 2 participants


In [14]:
global_model.set_weights(average_weights)
simple_average_evaluation = global_model.evaluate(x_test, y_test)



# Ensamble method based on local models' output

In this approach we apply the softmax function to the output computed by each local models so that we can weight the contribution of each local model. 

In [15]:
from scipy.special import softmax as sc_softmax

In [16]:
score1 = model1.evaluate(x_test, y_test)[1]
score2 = model2.evaluate(x_test, y_test)[1]



In [17]:
scores = [score1, score2]

In [18]:
alpha = sc_softmax(scores)
print(alpha)

[0.36586441 0.63413559]


In [19]:
ensamble_general_weights = weighted_average_aggregation(participants_weights, alpha)
global_model.set_weights(ensamble_general_weights)

2021-11-22 16:50:09,939 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - INFO - Start models' weights aggregation of 2 participants
2021-11-22 16:50:09,941 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 0 related to participant 0
2021-11-22 16:50:09,942 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 1 related to participant 0
2021-11-22 16:50:09,944 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 0 related to participant 1
2021-11-22 16:50:09,945 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - DEBUG - Update layer 1 related to participant 1
2021-11-22 16:50:09,946 - decentralized_smart_grid_ml.federated_learning.federated_aggregator - INFO - Finish models' weights aggregation of 2 participants


In [20]:
ensamble_general_evaluation = global_model.evaluate(x_test, y_test)



# Ensamble method based on machine learning model

In this approach we fit a ML model to understand the best weight to assign at each participant's model.

Here we need 
- `models` contains the list of participants' models
- `participants_weights` contains the list of partiipants models' weights
- `global_model` is the model used for this task
- `(x_test, y_test)` is the test set
- `weighted_average_aggregation` function for the weighted aggregation of the model weights

In [21]:
import tensorflow as tf
from tensorflow import keras

In [22]:
class WeightedAverage(keras.layers.Layer):
    
    def __init__(self, n_models):
        super(WeightedAverage, self).__init__()
        self.n_models = n_models
        
    def build(self):
        self.W = self.add_weight(
            shape=(1, 1, self.n_models),
            initializer='uniform',
            dtype=tf.float32,
            trainable=True
        )
        
    def call(self, inputs):
        weighted_average_aggregation()
        inputs = [tf.expand_dims(i, -1) for i in inputs]
        inputs = Concatenate(axis=-1)(inputs) # (n_batch, n_feat, n_inputs)
        weights = tf.nn.softmax(self.W, axis=-1) # (1,1,n_inputs)
        # weights sum up to one on last dim
        return tf.reduce_sum(weights*inputs, axis=-1) # (n_batch, n_feat)

# to print the probabilities
# tf.nn.softmax(m.get_weights()[-3]).numpy()

In [23]:
W = tf.Variable(
    shape=(1, 2),
    initial_value=[[0.5, 0.5]],
    dtype=tf.float32,
    trainable=True
)

# LOOK AT THIS EXAMPLE

In [24]:
x = tf.constant(3.0)
with tf.GradientTape(persistent=True) as g:
  g.watch(x)
  y = x * x
  z = y * 2
dz_dx = g.gradient(z, x)  # (4*x^3 at x = 3)
print(dz_dx)

tf.Tensor(12.0, shape=(), dtype=float32)


In [35]:
class StackingWeightedEnsamble(keras.Sequential):
    
    def __init__(self, weak_models_weights, meta_model):
        super(StackingWeightedEnsamble, self).__init__()
        self.weak_models_weights = weak_models_weights
        self.meta_model = meta_model
        self.W = tf.Variable(
            shape=(1, len(self.weak_models_weights)),
            initial_value=[[0.5, 0.5]],
            dtype=tf.float32,
            trainable=True
        )
        print(type(self.W))

    def compile(self, optimizer, loss, metrics):
        super(StackingWeightedEnsamble, self).compile(optimizer=optimizer, loss=loss, metrics=metrics)
        self.meta_model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
        
    def train_step(self, data):
        # Unpack the data
        x, y = data

        #with tf.GradientTape(watch_accessed_variables=False, persistent=True) as tape:
        with tf.GradientTape() as tape:
            # compute the model weights with a weighted average
            new_weights = weighted_average_aggregation(self.weak_models_weights, self.W)
            self.meta_model.set_weights(new_weights)
            # forward pass of the model
            #tape.watch(self.W)
            #y_i = self.W ** 2
            y_pred = self.meta_model(x)  # Forward pass
            loss = self.compiled_loss(
                y,
                y_pred
            )
        # TODO: change here
        # Compute gradients
        trainable_vars = self.meta_model.trainable_variables
        trainable_vars.append(self.W)
        print("trainable vars -> ", trainable_vars)
        gradients = tape.gradient(loss, trainable_vars)
        print("gradients -> ", gradients)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

In [36]:
global_model.set_weights(participants_weights[0])

In [37]:
stacking_weighted = StackingWeightedEnsamble(participants_weights, global_model)

<class 'tensorflow.python.ops.resource_variable_ops.ResourceVariable'>


In [38]:
stacking_weighted.compile(optimizer="sgd", loss="mse", metrics="accuracy")

In [39]:
stacking_weighted.fit(x_test, y_test, epochs=10)

Epoch 1/10


TypeError: in user code:

    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:805 train_function  *
        return step_function(self, iterator)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:795 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:3417 _call_for_each_replica
        return fn(*args, **kwargs)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:788 run_step  **
        outputs = model.train_step(data)
    <ipython-input-35-51a768733441>:26 train_step
        new_weights = weighted_average_aggregation(self.weak_models_weights, self.W)
    ../decentralized_smart_grid_ml/federated_learning/federated_aggregator.py:32 weighted_average_aggregation
        if round(sum(alpha), 2) != 1:
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:1116 __iter__
        raise TypeError("'Variable' object is not iterable.")

    TypeError: 'Variable' object is not iterable.


In [31]:
stacking_weighted.fit(x_test, y_test, epochs=10)

Epoch 1/10
trainable vars ->  <tf.Variable 'Variable:0' shape=(1, 2) dtype=float32>
gradients ->  None


TypeError: in user code:

    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:805 train_function  *
        return step_function(self, iterator)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:795 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:3417 _call_for_each_replica
        return fn(*args, **kwargs)
    /home/fabsam/miniconda3/envs/py3.7/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:788 run_step  **
        outputs = model.train_step(data)
    <ipython-input-26-3361c25cc470>:44 train_step
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

    TypeError: zip argument #1 must support iteration


# Comparison of aggregation methods

In [None]:
print("Simple average evaluation: %s" % simple_average_evaluation)
print("Ensamble general evaluation: %s" % ensamble_general_evaluation)