## Federated learning
Una proporción importante de los modelos de IA requieren recopilar información de los usuarios para poder realizar un aprendizaje centralizado y, posteriormente, suministrar las predicciones a los usuarios que lo solicitan. Un ejemplo de esta situación son los Sistemas de Recomendación, en los que los usuarios deben aportar información tal como los productos que han comprado en una plataforma de comercio electrónico, en una plataforma de streaming de vídeo las películas que han visto y lo que les han gustado, lo mismo con canciones, etc. Esta situación presenta dos inconvenientes:
1. Vulnera la privacidad de los usuarios, al requerirles datos personales de consumo o valoraciones de productos y servicios.
2. Pone en peligro los datos anteriores de cientos de miles o de millones de usuarios si la empresa es hackeada con éxito.
3. Obliga a que exista un costoso procesamiento centralizado de los millones de datos recopilados.

El aprendizaje federado permite evitar los inconvenientes anteriormente mencionados. Su diseño de alto nivel se explica en la siguiente figura. En un bucle sin fin, se realizan las siguientes acciones:<br>

bucle: 

0. El servidor envía su modelo 'global' de aprendizaje a cada uno de los usuarios que se concectan.
1. Cada usuario entrena localmente el modelo utilizando exclusivamente sus datos actuales (últimas compras, últimas valoraciones, últimas canciones escuchadas...)
2. Cada usuario envía al servidor su modelo recien actualizado (entrenado con sus últimos datos). 
3. El servidor actualiza un modelo 'global' con la información de los últimos modelos 'locales' enviados por los usuarios concectados. Aquí se encuentra la clave de este campo de la IA: cómo crear un modelo global a partir de _N_ modelos locales; el enfoque más sencillo, y que funciona, es simplemente promediar los pesos de todos los modelos locales; en realidad es un promedio ponderado, donde los pesos de los modelos locales más evolucionados ponderan más (por ejemplo, los modelos locales de los usuarios que han escuchado cientos de canciones desde la última vez que entrenaron su modelo ponderarán más que aquellos correspondientes a los usuarios que solo han escuchado unas pocas canciones).

Nótese que cada vez que los usuarios reciben un modelo, éste está actualizado por el servidor respecto a la última versión que recibieron los usuarios. De esta manera, los _N_ modelos que recibe el servidor no parten de cero, si no que son _N_ evoluciones independientes de la última versión que había en el servidor.

<br><img src="concepto.png" width=500>
<br><br>Es importante resaltar que:
1. Los usuarios __nunca envían sus datos__, lo que envían son modelos entrenados (mejorados) con los datos que han creado en un intervalo de tiempo. Por lo tanto no se vulnera la privacidad de sus datos ni por parte de la empresa que ofrece el servicio, ni por parte de las comunicaciones entre el usaurio y el servidor. 
2. Los servidores __no reciben ni contienen ningún dato__, por lo que un hackeo de su sistema no comporta (de manera directa) un 'robo de datos'.
3. El procesamiento que debe hacer el servidor o cloud de la empresa es muchísmimo menor que el habitual: __es mucho más rápido crear el modelo global a partir de los modelos locales que entrenar el modelo global__.

Para mostrar el funcionamiento de un modelo federado, vamos a implementar una versión simplificada que realice una clasificación con el MNIST. En este caso, el mismo equipo va a ejecutar la parte del servidor y la de los clientes. En un caso real habría que añadir el nivel de comunicaciones servidor/cliente. El ejemplo mostrado es una versión simplificada, elaborada a partir del código en: https://towardsdatascience.com/federated-learning-a-step-by-step-implementation-in-tensorflow-aac568283399

In [1]:
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from keras.utils import np_utils
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D,MaxPooling2D,Dropout,Flatten,Dense

Aquí realizamos la carga del dataset MNIST y lo procesamos de la manera habitual para ejecutar una clasificación: pasamos las etiquetas a formato categórico y dividimos en conjuntos de entreanamiento y de testeo (10% del tamaño total).

In [3]:
(X_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# convert to categorical labels
y_train = np_utils.to_categorical(y_train)
y_test = np_utils.to_categorical(y_test)

#split data into training and test set
X_train, X_test, y_train, y_test = train_test_split(X_train, 
                                            y_train, test_size=0.1, random_state=42)

El siguiente código crea un diccionario con 10 clientes (tantos como categorías del MNIST), pero podría ser cualquier otro número de clientes.<br>
{ 'clients_1': datos del cliente 1,<br>
  'clients_2': datos del cliente 2,<br>
  etc. }
<br> Los datos unifican las Xs e Ys del MNIST (línea 16) convenientemente barajados (línea 17) para evitar sesgos. A cada uno de los 10 clientes les corresponde un décimo de los datos disponibles (línea 20). Esto se gestiona con facilidad creando una lista con un comprehension y usando slices (línea 21). nota: 'shard' se traduce como 'fragmento'. La línea 26 crea el diccionario descrito, y la línea 29 lo almacena en la variable 'clients'.

In [4]:
def create_clients(X_train, y_train, num_clients=10, initial='clients'):
    ''' return: a dictionary with keys clients' names and value as 
                data shards - tuple of images and label lists.
        args: 
            X_train: a list of numpy arrays of training images
            y_train:a list of categorized labels for each image
            num_client: number of fedrated members (clients)
            initials: the clients'name prefix, e.g, clients_1 
            
    '''

    #create a list of client names
    client_names = ['{}_{}'.format(initial, i+1) for i in range(num_clients)]

    #randomize the data
    data = list(zip(X_train, y_train))
    random.shuffle(data)

    #shard data and place at each client
    size = len(data)//num_clients
    shards = [data[i:i + size] for i in range(0, size*num_clients, size)]

    #number of clients must equal number of shards
    assert(len(shards) == len(client_names))

    return {client_names[i] : shards[i] for i in range(len(client_names))} 

#create clients
clients = create_clients(X_train, y_train, num_clients=10, initial='client')

Para procesar eficientemente la información en un modelo de red neuronal, tanto los datos de shape (None,28,28,1) como las etiquetas de shape (None,10) (línea 9) se introducen, de manera unificada, en un 'Dataset' TensorFlow (línea 10). 'De nuevo' se barajan los datos en la línea 11, y se devuelven al programa llamante. En las líneas 15 a 17 se prepara las versiones tensorFlow Dataset de entrenamiento _clients_batched_, y en la línea 20 la versión TensorFlow Dataset de testeo _test_batched_. _clients_batched_ va a ser posteriormente utilizado para entrenameinto en el _fit_ del modelo, y  _test_batched_ en el código de testeo de la calidad de los resultados.

In [5]:
def batch_data(data_shard, bs=32):
    '''Takes in a clients data shard and create a tfds object off it
    args:
        shard: a data, label constituting a client's data shard
        bs:batch size
    return:
        tfds object'''
    #seperate shard into data and labels lists
    data, label = zip(*data_shard)
    dataset = tf.data.Dataset.from_tensor_slices((list(data), list(label)))
    return dataset.shuffle(buffer_size=len(label)).batch(bs)


#process and batch the training data for each client
clients_batched = dict()
for (client_name, data) in clients.items():
    clients_batched[client_name] = batch_data(data)

#process and batch the test set  
test_batched = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(len(y_test))

Aquí creamos un método estático (no será necesario instanciar la clase para usarlo) que nos construye un sencillo clasificador CNN para el MNIST. Al parámetro shape le tendrá que llegar como argumento el shape de MNIST: (None,28,28,1), y en 'classes' el número de categorías: 10.

In [6]:
class SimpleMLP:
    @staticmethod
    def build(shape, classes):
        model = Sequential()
        model.add(Conv2D(filters=32,kernel_size=(3,3), input_shape=shape, activation="relu", padding="same"))
        model.add(MaxPooling2D())
        model.add(Conv2D(filters=64,kernel_size=(3,3), input_shape=shape, activation="relu", padding="same"))
        model.add(Dropout(0.4))
        model.add(MaxPooling2D())
        model.add(Flatten())
        model.add(Dense(classes, activation="softmax"))
        return model

En este ejemplo, el servidor actualizará el modelo haciendo una media ponderada de los pesos de los modelos de cada uno de los clientes. La idea es que los clientes a los que se les ha asignado más datos ponderen más en la consecución del modelo global que se crea a partir del conjunto de modelos locales. En este caso se les ha dado los mismos datos a todos (líneas 20 y 21 tres celdas hacia arriba, en la función _create_clients_). Se puede hacer el ejemplo más real cambiando esas asignaciones. <br><br>
En la línea 2 se obtiene la relación de clientes, que se usa en la línea 3 para recorrerlos uno a uno, obtener el tamaño (la cardinalidad) del Dataset de cada uno de ellos, y sumarlo todo para hallar la cardinalidad total. En la línea 5 se halla la cardinalidad del cliente solicitado _client_name_ (parámetro de la función), y en la línea 6 se introduce en la variable _scalar_ el valor decimal que define la importancia del modelo de ese cliente (_client_name_). en las líneas 7 a 10 modificamos los pesos del modelo de este cliente para que reflejen su importancia ponderada de cara a que el servidor construya el modelo global (unificado). La ecuación de la derecha formaliza esta ponderación, donde 'K' representa a los clientes, y Fk(w) son los pesos modificados del modelo del cliente 'k' (lo que acabamos de explicar).<br><br>
La siguiente función: _sum_scaled_weights_ se encarga de crear el modelo global a partir de los 'K' modelos locales anteriores (ecuación a la izquierda).
<img src="weighting.webp" width=500>

In [11]:
def weight_scalling_factor(clients_trn_data, client_name, weight):
    client_names = list(clients_trn_data.keys())
    global_count = sum([tf.data.experimental.cardinality(clients_trn_data[client]).numpy() for client in client_names])
    # get the total number of data points held by a client
    local_count = tf.data.experimental.cardinality(clients_trn_data[client_name]).numpy()
    scalar = local_count/global_count
    weight_final = []
    steps = len(weight)
    for i in range(steps):
        weight_final.append(scalar * weight[i])
    return weight_final


def sum_scaled_weights(scaled_weight_list):
    '''Return the sum of the listed scaled weights. The is equivalent to scaled avg of the weights'''
    avg_grad = list()
    #get the average grad accross all client gradients
    for grad_list_tuple in zip(*scaled_weight_list):
        layer_mean = tf.math.reduce_sum(grad_list_tuple, axis=0)
        avg_grad.append(layer_mean)        
    return avg_grad

<img src="federated_learning_esquema.jpg" width=70%>
Ya estamos en disposición de simular la ejecución del servidor y la de los 'K' clientes. Las líneas 2 y 3 nos crean el modelo *global_model*, haciendo uso de la función _build_ en la que programamos el clasificador convolucional. Vamos a actualizar este modelo un número limitado de veces; en nuestro caso, *federated_loops=10*. En un caso real sería un bucle infinito, desde el punto de vista de que los usuarios siempre están enviando nuevos modelos locales al servidor y el servidor siempre les envía cada modelo global actualizado.<br><br>
Cada vuelta del bucle (línea 7) el servidor recoge los pesos del modelo global (línea 10), que posteriormente serán enviados (en nuestro caso copiados) a cada uno de los clientes (línea 26). *scaled_local_weight_list* (línea 13) es una variable importante, ya que va a albergar la lista de todos los pesos de los modelos de todos los clientes; es decir será una lista de 'K' posiciones, donde cada una contendrá los pesos del modelo correspondiente al cliente 'K'. Cuando esa lista esté rellena, podremos unificar, en la línea 34, esos pesos (que ya fueron ponderados), y actualizar el modelo global con la información federada (línea 37).<br><br>
En definitiva, solo nos queda rellenar esa lista *scaled_local_weight_list* con los pesos de los modelos de los *K* clientes. Para ello recorremos cada uno de los *K* clientes (línea 20), y a cada uno le construimos su propio clasificador en las líneas 21 y 22 (idéntico al de los demás clientes e idéntico al del servidor). También lo compilamos de la misma manera que los demás (línea 23). Todavía no lo podemos ejecutar, porque debemos hacerlo a partir de la última versión del modelo global (línea 26). cuando por fin lo ejecutamos (línea 28), nos aseguramos de hacerlo únicamente con los datos de ese cliente _clients_batched[client]_. Finalmente, con los pesos actualizados correspondientes al último entrenamiento del cliente procesado, obtenemos los pesos ponderados de su modelo (línea 30) y los añadimos a la lista que contiene los pesos ponderados de los *K* modelos correspondientes a los *K* clientes (línea 31).   


In [13]:
#initialize global model
smlp_global = SimpleMLP()
global_model = smlp_global.build((28,28,1), 10)

federated_loops = 10
#commence global training loop
for current_loop in range(federated_loops):
    print(federated_loops-current_loop)            
    # get the global model's weights - will serve as the initial weights for all local models
    global_weights = global_model.get_weights()
    
    #initial list to collect local model weights after scalling
    scaled_local_weight_list = list()

    #randomize client data - using keys
    client_names= list(clients_batched.keys())
    random.shuffle(client_names)
    
    #loop through each client and create new local model
    for client in client_names:
        smlp_local = SimpleMLP()
        local_model = smlp_local.build((28,28,1), 10)
        local_model.compile(loss='categorical_crossentropy', metrics=['accuracy'] )
        
        #set local model weight to the weight of the global model
        local_model.set_weights(global_weights)
        #fit local model with client's data
        local_model.fit(clients_batched[client], epochs=2, verbose=1, batch_size=512)
        #scale the model weights and add to list
        scaled_weights = weight_scalling_factor(clients_batched, client,local_model.get_weights())
        scaled_local_weight_list.append(scaled_weights)
        
    #to get the average over all the local model, we simply take the sum of the scaled weights
    average_weights = sum_scaled_weights(scaled_local_weight_list)
    
    #update global model 
    global_model.set_weights(average_weights)
    

10
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
9
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
8
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
7
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
6
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2

Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
1
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2


Es el momento de comparar las calidades obtenidas con el modelo federado y el modelo global. Antes de nada, indicar que este ejemplo no es representativo del sesgo que se dará en situaciones reales; aquí todos los clientes tienen una porción aleatoria y de igual tamaño de las imágenes del MNIST. En casos reales, cada usuario aporta modelos 'sesgados' en el sentido de que sus datos tendrán distribuciones de probabilidad algo diferentes a las de otros usuarios. Por ejemplo, un usuario habrá aportado pocos datos, mientras que otro habrá contribuido con muchos, uno habrá adquirido bastantes productos para bebé, mientras que la mayoría no, etc. La manera de unificar los diferentes modelos locales en un modelo global es actualmente motivo de investigación.<br><br>
Para testear el modelo federado, en las líneas 10 y 11 se proporcionan los datos *X_test* y etiquetas *y_test* de testeo preparados al comienzo del notebook, y se llama a la función *test_model* que hace el feedforward (*predict*) de los datos (*X_test*), obteniendo las predicciones que aporta el clasificador global (*y_pred* en la línea 3). Estas predicciones se comparan, en la línea 4, con las etiquetas (*Y_test) utilizando *CategoricalCrossentropy* (línea 2), para hallar el loss. También se comparan para hallar el accuracy (línea 5). Ambas medidas se imprimen en la línea 6.<br><br>
Para testear el modelo global, nos 'cargamos' el modelo federado; para ello, entre las líneas 14 y 20 definimos de nuevo el *global_model* y lo entrenamos desde el principio con los datos del MNIST (nada de pesos ponderados de cada cliente). En las líneas 23 y 24 testeamos sus resultados.<br><br>
Como se puede observar, en este caso tan sencillo, equilibrado, y no sesgado, los resultados federados y tradicionales tienen la misma calidad. Lo importante es que hemos comprobado que __la implementación "federated learning" funciona__, y a partir de aquí se puede adaptar a datos y situaciones que sean más complejos y reales.

In [16]:
def test_model(X_test, Y_test,  model, federated_loops):
    cce = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
    y_pred = model.predict(X_test)
    loss = cce(Y_test, y_pred)
    acc = accuracy_score(tf.argmax(y_pred, axis=1), tf.argmax(Y_test, axis=1))
    print('federated_loops: {} | global_acc: {:.3%} | global_loss: {}'.format(federated_loops, acc, loss))
    return acc, loss

#test global model (federated learning) and print out metrics after each communications round
for(X_test, Y_test) in test_batched:
    global_acc, global_loss = test_model(X_test, Y_test, global_model, federated_loops)

# Test traditional MNIST classification    
global_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(y_train)).batch(320)
smlp = SimpleMLP()
global_model = smlp.build((28,28,1), 10) 

# compile & fit the global training data to model
global_model.compile(loss='categorical_crossentropy', metrics=['accuracy'] )
global_model.fit(global_dataset, epochs=10, verbose=1, batch_size=512)

#test the federated model and print out metrics
for(X_test, Y_test) in test_batched:
        acc, loss = test_model(X_test, Y_test, global_model, 1)

comm_round: 10 | global_acc: 98.850% | global_loss: 1.474400281906128
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
comm_round: 1 | global_acc: 98.650% | global_loss: 1.4762946367263794
