<h1>Federated Learning - GTEx_V8 Example</h1>


<h2>Import dependencies</h2>

In [1]:
#dependencies for helper functions/classes
import pandas as pd
import pyarrow.parquet as pq
from typing import NamedTuple
import os.path as path
import os
import progressbar
import requests
import numpy as np
import random


#keras for ML
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dropout, Input, Dense
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.utils import plot_model, normalize
from tensorflow.keras import regularizers
from tensorflow.keras.optimizers import SGD, Adam, Nadam, Adadelta
from tensorflow.keras.activations import relu, elu, sigmoid

#sklearn for preprocessing the data and train-test split
from sklearn.utils import class_weight
from sklearn.model_selection import train_test_split, cross_val_score, KFold
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler, LabelEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error, accuracy_score, classification_report
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, r2_score, mean_squared_error, mean_absolute_error

#for plots
import matplotlib
import matplotlib.pyplot as plt

#%matplotlib inline

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD
from tensorflow.keras import backend as K

### # Parameter cell -->

In [25]:
seed = 7
_test_size = 0.2
comms_round = 50
local_epochs = 100
CLIENTS = 2
local_batch_size = 256
VERBOSE = 1

In [3]:
class Labels(NamedTuple):
    '''
    One-hot labeled data
    '''
    tissue: np.ndarray
    sex: np.ndarray
    age: np.ndarray
    death: np.ndarray
        

class Genes:
    '''
    Class to load GTEX samples and gene expressions data
    '''
    def __init__(self, samples_path: str = '', expressions_path: str = '', problem_type: str = "classification"):
        self.__set_samples(samples_path)
        self.__set_labels(problem_type)
        if expressions_path != '':
            self.expressions = self.get_expressions(expressions_path)

    def __set_samples(self, sample_path: str) -> pd.DataFrame:
        self.samples: pd.DataFrame = pq.read_table(sample_path).to_pandas()
        self.samples["Death"].fillna(-1.0, inplace = True)
        self.samples: pd.DataFrame = self.samples.set_index("Name")
        self.samples["Sex"].replace([1, 2], ['male', 'female'], inplace=True)
        self.samples["Death"].replace([-1,0,1,2,3,4], ['alive/NA', 'ventilator case', '<10 min.', '<1 hr', '1-24 hr.', '>1 day'], inplace=True)
        self.samples = self.samples[~self.samples['Death'].isin(['>1 day'])]
        return self.samples

    def __set_labels(self, problem_type: str = "classification") -> Labels:
        self.labels_list = ["Tissue", "Sex", "Age", "Death"]
        self.labels: pd.DataFrame = self.samples[self.labels_list]
        self.drop_list = self.labels_list + ["Subtissue", "Avg_age"]
        
        if problem_type == "classification":
            dummies_df = pd.get_dummies(self.labels["Age"])
            print(dummies_df.columns.tolist())
            self.Y = dummies_df.values
        
        if problem_type == "regression":
            self.Y = self.samples["Avg_age"].values
        
        return self.Y

    def sex_output(self, model):
        return Dense(units=self.Y.sex.shape[1], activation='softmax', name='sex_output')(model)

    def tissue_output(self, model):
        return Dense(units=self.Y.tissue.shape[1], activation='softmax', name='tissue_output')(model)

    def death_output(self, model):
        return Dense(units=self.Y.death.shape[1], activation='softmax', name='death_output')(model)

    def age_output(self, model):
        '''
        Created an output layer for the keras mode
        :param model: keras model
        :return: keras Dense layer
        '''
        return Dense(units=self.Y.age.shape[1], activation='softmax', name='age_output')(model)


    def get_expressions(self, expressions_path: str)->pd.DataFrame:
        '''
        load gene expressions DataFrame
        :param expressions_path: path to file with expressions
        :return: pandas dataframe with expression
        
        '''
        
        if expressions_path.endswith(".parquet"):
            return pq.read_table(expressions_path).to_pandas().set_index("Name") 
        else:
            separator = "," if expressions_path.endswith(".csv") else "\t"
            return pd.read_csv(expressions_path, sep=separator).set_index("Name") 

    def prepare_data(self, normalize_expressions: bool = True)-> np.ndarray:
        '''
        :param normalize_expressions: if keras should normalize gene expressions
        :return: X array to be used as input data by keras
        '''
        data = self.samples.join(self.expressions, on = "Name", how="inner")
        ji = data.columns.drop(self.drop_list)
        x = data[ji]
        
        # adding one-hot-encoded tissues and sex
        #x = pd.concat([x,pd.get_dummies(data['Tissue'], prefix='tissue'), pd.get_dummies(data['Sex'], prefix='sex')],axis=1)
        
        steps = [('standardization', StandardScaler()), ('normalization', MinMaxScaler())]
        pre_processing_pipeline = Pipeline(steps)
        transformed_data = pre_processing_pipeline.fit_transform(x)

        x = transformed_data
        
        print('Data length', len(x))
        
        return x #normalize(x, axis=0) if normalize_expressions else x
    
    def get_features_dataframe(self, add_tissues=False):
        data = self.samples.join(self.expressions, on = "Name", how="inner")
        ji = data.columns.drop(self.drop_list)
        df = data[ji]
        if add_tissues:
            df = pd.concat([df,pd.get_dummies(data['Tissue'], prefix='tissue'), pd.get_dummies(data['Sex'], prefix='sex')],axis=1)
        x = df.values
        
        min_max_scaler = MinMaxScaler()
        x_scaled = min_max_scaler.fit_transform(x)
        df_normalized = pd.DataFrame(x_scaled, columns=df.columns, index=df.index)
        return df_normalized


In [4]:
samples_path = '../data/gtex/v8_samples.parquet'
expressions_path = '../data/gtex/v8_expressions.parquet'

In [5]:
genes = Genes(samples_path, expressions_path, problem_type="regression")
X = genes.prepare_data(True)
Y = genes.Y

Data length 15343


In [6]:
#split data into training and test set
X_train, X_test, y_train, y_test = train_test_split(X, 
                                                    Y, 
                                                    test_size=_test_size, 
                                                    random_state=seed)

In [7]:
def create_clients(image_list, label_list, num_clients=10, initial='clients'):
    ''' return: a dictionary with keys clients' names and value as 
                data shards - tuple of images and label lists.
        args: 
            image_list: a list of numpy arrays of training images
            label_list:a list of binarized labels for each image
            num_client: number of fedrated members (clients)
            initials: the clients'name prefix, e.g, clients_1 
            
    '''

    #create a list of client names
    client_names = ['{}_{}'.format(initial, i+1) for i in range(num_clients)]

    #randomize the data
    data = list(zip(image_list, label_list))
    random.shuffle(data)

    #shard data and place at each client
    size = len(data)//num_clients
    shards = [data[i:i + size] for i in range(0, size*num_clients, size)]

    #number of clients must equal number of shards
    assert(len(shards) == len(client_names))

    return {client_names[i] : shards[i] for i in range(len(client_names))} 

In [8]:
#create clients
clients = create_clients(X_train, y_train, num_clients=CLIENTS, initial='client')

In [9]:
clients.keys(), clients['client_1'][0][0].shape

(dict_keys(['client_1', 'client_2']), (18388,))

In [10]:
def batch_data(data_shard, bs=256):
    '''Takes in a clients data shard and create a tfds object off it
    args:
        shard: a data, label constituting a client's data shard
        bs:batch size
    return:
        tfds object'''
    #seperate shard into data and labels lists
    data, label = zip(*data_shard)
    dataset = tf.data.Dataset.from_tensor_slices((list(data), list(label)))
    return dataset.shuffle(len(label)).batch(bs)

In [11]:
#process and batch the training data for each client
clients_batched = dict()
for (client_name, data) in clients.items():
    clients_batched[client_name] = batch_data(data, bs = local_batch_size)
    
#process and batch the test set  
test_batched = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(len(y_test))

In [12]:
clients_batched.keys(),clients_batched['client_1']

(dict_keys(['client_1', 'client_2']),
 <BatchDataset shapes: ((None, 18388), (None,)), types: (tf.float32, tf.float64)>)

In [13]:
from keras import backend as K

def coeff_determination(y_true, y_pred):
    SS_res =  K.sum(K.square( y_true-y_pred )) 
    SS_tot = K.sum(K.square( y_true - K.mean(y_true) ) ) 
    return ( 1 - SS_res/(SS_tot + K.epsilon()) )

def optimized_age_model_regression():
    # optimized_age_model(x_train, x_val, y_train, y_val, params: dict):
    input_layer = Input(shape=(clients['client_1'][0][0].shape[0],))
    reg = keras.regularizers.l1_l2(l1=0.3, l2=0.3)
    mod = Dense(1024, activation=relu)(input_layer) # 196
    mod = Dropout(0.1)(mod) 
    mod = Dense(512, activation=relu)(mod) # 196
    mod = Dropout(0.1)(mod)    
    mod = Dense(64, activation=relu)(mod) #64
    mod = Dropout(0.1)(mod)
    
    outputs = [Dense(1, name='age_output')(mod)] #let's try to make it simple and start with age 
    #outputs = [Dense(y_train.shape[1], activation='sigmoid', name='age_output')(mod)] #let's try to make it simple and start with age 
    loss = {'age_output': 'mse'}
    weights={'age_output': 1.0}
    metrics = {'age_output': ['mae', coeff_determination]}
    
    model = Model(inputs=input_layer, outputs=outputs)
    model.summary()
    model.compile(optimizer='adam',
              loss=loss,
              loss_weights=weights,
              metrics=metrics,
                 )

    return model


Using TensorFlow backend.


In [14]:
# class SimpleRegression:
#     @staticmethod
#     def build(shape = clients['client_1'][0][0].shape[0]):
#         model = Sequential()
#         model.add(Dense(1024, input_shape=(shape,)))
#         model.add(Activation("relu"))
#         model.add(Dropout(0.1))
#         model.add(Dense(512))
#         model.add(Activation("relu"))
#         model.add(Dropout(0.1))
#         model.add(Dense(64))
#         model.add(Activation("relu"))
#         model.add(Dropout(0.1))
#         model.add(Dense(1))
#         return model

In [15]:
# def global_model_init():
#     model = Sequential()
#     model.add(Dense(1024, input_shape=(clients['client_1'][0][0].shape[0],)))
#     model.add(Activation("relu"))
#     model.add(Dropout(0.1))
#     model.add(Dense(512))
#     model.add(Activation("relu"))
#     model.add(Dropout(0.1))
#     model.add(Dense(64))
#     model.add(Activation("relu"))
#     model.add(Dropout(0.1))
#     model.add(Dense(1))
#     return model

In [16]:
def Huber(yHat, y, delta=1.):
    return np.where(np.abs(y-yHat) < delta,.5*(y-yHat)**2 , delta*(np.abs(y-yHat)-0.5*delta))

def transform_to_probas(age_intervals):
    class_names = ['20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
    res = []
    for a in age_intervals:
        non_zero_index = class_names.index(a)
        res.append([0 if i != non_zero_index else 1 for i in range(len(class_names))])
    return np.array(res)
    
def transform_to_interval(age_probas):
    class_names = ['20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
    return np.array(list(map(lambda p: class_names[np.argmax(p)], age_probas)))        

In [17]:
def weight_scalling_factor(clients_trn_data, client_name):
    client_names = list(clients_trn_data.keys())
    #get the bs
    bs = list(clients_trn_data[client_name])[0][0].shape[0]
    #first calculate the total training data points across clinets
    global_count = sum([tf.data.experimental.cardinality(clients_trn_data[client_name]).numpy() for client_name in client_names])*bs
    # get the total number of data points held by a client
    local_count = tf.data.experimental.cardinality(clients_trn_data[client_name]).numpy()*bs
    return local_count/global_count


def scale_model_weights(weight, scalar):
    '''function for scaling a models weights'''
    weight_final = []
    steps = len(weight)
    for i in range(steps):
        weight_final.append(scalar * weight[i])
    return weight_final

def sum_scaled_weights(scaled_weight_list):
    '''Return the sum of the listed scaled weights. The is equivalent to scaled avg of the weights'''
    avg_grad = list()
    #get the average grad accross all client gradients
    for grad_list_tuple in zip(*scaled_weight_list):
        layer_mean = tf.math.reduce_sum(grad_list_tuple, axis=0)
        avg_grad.append(layer_mean)
        
    return avg_grad

In [None]:
rmse = []
mae = []
r2 = []
huber_loss = []
    
# loss = 'mse'
# metrics = ['mae', coeff_determination]

# loss = {'age_output': 'mse'}
# weights={'age_output': 1.0}
# metrics = {'age_output': ['mae', coeff_determination]}

    
#initialize global model
global_model = optimized_age_model_regression()
# smlp_global = SimpleRegression()
# global_model = smlp_global.build()
    
#commence global training loop
for comm_round in range(comms_round):
    print('='*62)
    print('---------<STARTING TRAINING FOR ROUND {}>-----------'.format(comm_round))
    
    # get the global model's weights - will serve as the initial weights for all local models
    global_weights = global_model.get_weights()
    
    #initial list to collect local model weights after scalling
    scaled_local_weight_list = list()

    #randomize client data - using keys
    client_names= list(clients_batched.keys())
    random.shuffle(client_names)
    
    #loop through each client and create new local model
    for client in client_names:
        print('---------<STARTING TRAINING FOR CLIENT {}>-----------'.format(client))

        local_model = optimized_age_model_regression()
#         smlp_local = SimpleRegression()
#         local_model = smlp_local.build()
#         local_model.compile(loss=loss, 
#                       optimizer='adam',
#                            metrics=metrics)
        
        #set local model weight to the weight of the global model
        local_model.set_weights(global_weights)
        
        
        local_model.fit(clients_batched[client], epochs=local_epochs, verbose=VERBOSE)
        predictions = local_model.predict(X_test)
        test_y = y_test

        print('---------<TEST RESULTS FOR CLIENT {} ; USING LOCAL MODEL>-----------'.format(client))
        print("R^2", r2_score(test_y, predictions))
        print("Mean squared error", mean_squared_error(test_y, predictions))
        print("Mean absolute error", mean_absolute_error(test_y, predictions))
        print('Huber loss', np.mean(Huber(test_y, predictions)))
        
        #scale the model weights and add to list
        scaling_factor = weight_scalling_factor(clients_batched, client)
        print("SCALING FACTOR : {0}".format(scaling_factor))
        scaled_weights = scale_model_weights(local_model.get_weights(), scaling_factor)
        scaled_local_weight_list.append(scaled_weights)
        
        #clear session to free memory after each communication round
        K.clear_session()
        
    #to get the average over all the local model, we simply take the sum of the scaled weights
    average_weights = sum_scaled_weights(scaled_local_weight_list)
    
    #update global model 
    global_model.set_weights(average_weights)
    
    predictions = global_model.predict(X_test)
    test_y = y_test

    print('--------<TEST RESULTS AFTER ROUND {} ; USING GLOBAL MODEL>---------'.format(comm_round))
    print("R^2", round(r2_score(test_y, predictions), 3))
    print("Mean squared error", round(mean_squared_error(test_y, predictions), 3))
    print("Mean absolute error", round(mean_absolute_error(test_y, predictions), 3))
    print('Huber loss', round(np.mean(Huber(test_y, predictions)), 3))
    
    rmse.append(mean_squared_error(test_y, predictions))
    mae.append(mean_absolute_error(test_y, predictions))
    r2.append(r2_score(test_y, predictions))
    huber_loss.append(np.mean(Huber(test_y, predictions)))
    
    print('='*62)

Model: "model_4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_5 (InputLayer)         [(None, 18388)]           0         
_________________________________________________________________
dense_12 (Dense)             (None, 1024)              18830336  
_________________________________________________________________
dropout_12 (Dropout)         (None, 1024)              0         
_________________________________________________________________
dense_13 (Dense)             (None, 512)               524800    
_________________________________________________________________
dropout_13 (Dropout)         (None, 512)               0         
_________________________________________________________________
dense_14 (Dense)             (None, 64)                32832     
_________________________________________________________________
dropout_14 (Dropout)         (None, 64)                0   