Title: Server_Client_TFFEMNIST_CIF_Group

Purpose: This is the group code intended to be updated by all

Contains: TFF-EMNIST database, CIF (Data Quality + SINR) function

Date of Creation: November 25th, 2020

### Import all Packages ###

In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
import tensorflow_datasets as tfds
from matplotlib import pyplot as plt
import collections
import nest_asyncio
%reload_ext tensorboard
nest_asyncio.apply()
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist

TensorFlow Addons offers no support for the nightly versions of TensorFlow. Some things might work, some other might not. 
If you encounter a bug, do not file an issue on GitHub.


### Load in TFF - EMNIST Data Set ###

In [None]:
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(only_digits=True)

### Create a Server Class ###

In [None]:
class server:
    
    def __init__(self, numOfClients):
        self.numOfClients = numOfClients #Num of Client
        self.clientIds = list(range(0,self.numOfClients)) #Client List
        self.updateRoundNum = 0  # Update Round Number
        self.serverModel = self.createBaseModel() #Create a Keras Model for MNIST
        self.clientActive = [] #If client is available or not based on time - day/night
        self.clientCIF = []
        self.clientSelected = []
        self.maxClientsPerRound = 5
        self.updateFromClients=[]
        self.__serverTestData_X=[]
        self.__serverTestData_Y=[]
        self.__serverTestData = self.setServerTestDataTFF(emnist_test)
        self.predictionAcc=[]
        self.predictionLoss=[]
        
        
    def createBaseModel(self):
        #return tf.keras.models.Sequential([
        #    Dense(64, activation='relu',input_shape=(784,)),
        #    Dense(64, activation='relu'),
        #    Dense(10, activation='softmax'),])
        return tf.keras.models.Sequential([
            tf.keras.layers.Input(shape=(784,)),
            tf.keras.layers.Dense(10, kernel_initializer='zeros'),
            tf.keras.layers.Softmax()])
    
    def setServerTestData(self,Xtest,Ytest):  # NOT USED IN THIS CASE
        self.__serverTestData_X=Xtest
        self.__serverTestData_Y=Ytest
        
    def setServerTestDataTFF(self,serverTestData):  #Creates a test set for server
        BATCH_SIZE = 100
        SHUFFLE_BUFFER = 100
        PREFETCH_BUFFER= 10

        def preprocess(dataset):
            def batch_format_fn(element):
                """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
                return collections.OrderedDict(
                    x=tf.reshape(element['pixels'], [-1, 784]),
                    y=tf.reshape(element['label'], [-1, 1]))

            return dataset.repeat(10).shuffle(SHUFFLE_BUFFER).batch(
              BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

        serverData = tfds.as_numpy(preprocess(serverTestData.create_tf_dataset_for_client(serverTestData.client_ids[30])))

        return serverData
     
        
    def initialBroadcast(self):
        for i in self.clientIds:
            clientName = client_list["client_"+str(self.clientIds[i])]  #Create Client Name, Using Client_List Dictionary
            #clientName = "client_"+str(self.clientIds[i])  #Create Client Name
            #print(clientName)
            clientName.setInitialModel(self.serverModel) #setInitialModel-> Method of client Class
            #eval(clientName).setInitialModel(self.serverModel) #setInitialModel-> Method of client Class
            
    def getClientActiveStatus(self):  #if the client is available for update or not randomly set in client
        self.clientActive = []
        for i in self.clientIds:
            clientName = client_list["client_"+str(self.clientIds[i])]  #Create Client Name
            #print(clientName)
            if(clientName.sendActiveStatus() == 1): #getActiveStatus() -> method of client class
                self.clientActive.append(i)
               
    def getClientCIF(self): # Get Client Importance Factor for active clients
        self.clientCIF=[]
        for i in self.clientActive:
            clientName = client_list["client_"+str(i)]
            c_cif = clientName.sendCIF()   #sendCIF() - method of client
            self.clientCIF.append(c_cif)
        print("Clients with Acceptable CIF: ", self.clientCIF)
        
    def getClientSelected(self): # Select The Top (N = maxClientsPerRound) with the highest CIF value
        self.clientSelected = []
        # Use non-class, local variables to leave class variables clean
        clientCIF, clientActive = ( list(t) for t in zip(*sorted(zip(self.clientCIF, self.clientActive)))) # Sort Active Clients and their CIF values, by CIF
        self.clientSelected  = clientActive[-self.maxClientsPerRound:] # Select the N=maxClientsPerRound of clients with highest CIF
        self.clientCIF = clientCIF[-self.maxClientsPerRound:] # Select the N=maxClientsPerRound of highest CIF
            
    
    def getModelUpdateFromClients(self):
        print("--------------------------------------------\n","Round NO:",self.updateRoundNum)
        print("Active Clients: ",self.clientActive)
        print("CIF of Active Clients: ", self.clientCIF)
        print("Selected Clients: ", self.clientSelected)
        print("Training Clients with CIF > 10")
        self.updateRoundNum +=1
        self.updateFromClients=[]
        #self.dataPointsClients=[]
        
        for i in range (0,len(self.clientSelected)):
            if self.clientCIF[i] >= 10:  #Select Client only if CIF > some value 10 chosen for testing ONLY
                clientName = client_list["client_"+str(int(self.clientSelected[i]))]
                self.updateFromClients.append(clientName.sendClientUpdate()) #Get a list of updates from selected clients
            
        avgModelWeights = self.computeFedAvg(self.updateFromClients)
        #print("average model weights = \n",avgModelWeights)
        self.serverModel.set_weights(avgModelWeights)   
        
    def computeFedAvg(self,updates):  #Compute Federated Averaging from Available Clients
        totalDataPoints = 0
        scaleFactor=[]
        for i in range (0,len(updates)):
            totalDataPoints += updates[i][1]   #Sum the total Data Points on All Available Clients
            scaleFactor.append(updates[i][1])  #Store individual number datapoints for clients

        scaleFactor = np.array(scaleFactor)/totalDataPoints #Create the scale factor
       
        sumOfAvgWeights = []*len(updates[0][0]) 
        
        for j in range (0,len(updates[0][0])): #range of layers
            k=np.zeros_like(updates[0][0][j])
            #print("ShapeofK:",k.shape)
            for i in range(0,len(updates)): #range of clients
                k=k+updates[i][0][j]*scaleFactor[i]
                #print("ShapeofK:",k.shape)
            sumOfAvgWeights.append(k)
      
        return sumOfAvgWeights   
    
    def testServerModel(self):
        self.serverModel.compile(loss = 'sparse_categorical_crossentropy',
                                optimizer = tf.keras.optimizers.SGD (learning_rate=.01),
                                metrics=['accuracy'])
        testData = next(iter(self.__serverTestData))
        return self.serverModel.evaluate(testData['x'],
                                         testData['y'])
        
        
    def updateAllClients(self):
        for i in self.clientIds:
            clientName = client_list["client_"+str(self.clientIds[i])]  #Create Client Name
            clientName.setModelUpdateWeights(self.serverModel.get_weights()) 



### Create a Client Class ###

In [None]:
class client:
    
    def __init__(self,ID):
        self.id = ID
        self.__clientModel = tf.keras.models.Sequential()
        self.__clientModelWeights=[]
        self.activeStatus = 0
        self.__clientDataX=[] #Private to Client
        self.__clientDataY=[]
        self.clientCIF=0
        self.meanSINR = int(np.random.normal(15, 2.5, 1)) #Select random int from normal dist 
        self.roundSINR = 0
        self.propFair = 0
        self.epochs = 10
        self.lr = 0.01
        self.__clientData = self.__clientDataFromTFF(emnist_train)
        
    def __clientDataFromTFF(self,tffDataset):
        NUM_EPOCHS = self.epochs
        BATCH_SIZE = 100
        SHUFFLE_BUFFER = 100
        PREFETCH_BUFFER= 10

        def preprocess(dataset):
            def batch_format_fn(element):
                """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
                return collections.OrderedDict(
                    x=tf.reshape(element['pixels'], [-1, 784]),
                    y=tf.reshape(element['label'], [-1, 1]))

            return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
              BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)
        
        return tfds.as_numpy(preprocess(tffDataset.create_tf_dataset_for_client(tffDataset.client_ids[self.id])))
    
    
    #Methods Private to client
    def __setActiveStatus(self): 
        #self.activeStatus = int(np.random.randint(0,2,1))
        self.activeStatus = 1
    
    def __setRoundSINR(self):
        self.roundSINR = np.round((np.random.normal(self.meanSINR, 1, 1)[0]), 2)
                               
    def __setPropFair(self):
        self.propFair = self.roundSINR / self.meanSINR
        
    def __setdataQuality(self):\
        
        ## Calculate for entropy - Start ##
        
#         for values in classification_probabilities:
#           for value in values:
#             score += value * np.log(value)
#         score = score * -1

        ## Calculate for entropy - End ##
        
        self.dataQuality = 20 # Set 20 so that CIF value will always be passing
    
    def __setCIF(self):
        #self.clientCIF = int(np.random.randint(0,50,1))  # Random Number between 0 and 49 for check purpose only
        self.__setRoundSINR()
        self.__setPropFair() #This is currently unused
        self.__setdataQuality()
        self.clientCIF = self.roundSINR + self.dataQuality
        # self.clientCIF = (-1/self.roundSINR) + self.dataQuality #Formula for computing client CIF
        
    def __getTrainData(self):
        return self.__clientDataX, self.__clientDataY  # Just for Testing will be expanded to select meaningful data
    
    #Public Methods
    def sendActiveStatus(self):
        self.__setActiveStatus()
        return self.activeStatus
    
    def sendCIF(self):
        self.__setCIF()
        return self.clientCIF
    
    def setInitialModel(self,model):
        self.__clientModel=model
        print(self.__clientModel.summary)
         
    def setClientData(self,X,Y):
        self.__clientDataX = X
        self.__clientDataY = Y
   

    def sendClientUpdate(self):
        print("->",self.id,end=" ")
        self.__clientModel.compile(loss = 'sparse_categorical_crossentropy',
                                optimizer = tf.keras.optimizers.SGD (learning_rate=self.lr),
                                metrics=['accuracy'])
        #print(self.__clientDataX.shape)
        #print(self.__clientDataY.shape)
        if len(self.__clientDataX) > 0:
            history = self.__clientModel.fit(self.__clientDataX,
                                   tf.keras.utils.to_categorical(self.__clientDataY),
                                   epochs=self.epochs,
                                   validation_split=0.2,
                                   verbose=0,
                                    shuffle=True)

            updatedWeights = self.__clientModel.get_weights()
        
        else:
            currentData = next(iter(self.__clientData))
            history = self.__clientModel.fit(currentData['x'],
                                   currentData['y'],
                                   epochs=self.epochs,
                                   verbose=0,shuffle=True)

            updatedWeights = self.__clientModel.get_weights()
       
        
        numOfDataPoints = len(currentData['x'])   #<-- Set the correct type to get the number of datapoints in the current update round

        return (updatedWeights,numOfDataPoints)
    
    def setModelUpdateWeights(self,modelUpdateWeights):
        self.__clientModel.set_weights(modelUpdateWeights)
        #self.__clientModel.
        
    def plotClientData(self):
        #print("Y_Data",self.__clientDataY)
        id0=[]
        id1=[]
        id2=[]
        id3=[]
        id4=[]
        id5=[]
        id6=[]
        id7=[]
        id8=[]
        id9=[]
        for i in range (0,len(self.__clientData['x'])):
            for j in range (0,10):
                if j==self.__clientData['y']:
                    eval('id'+str(j)).append(i)
        xAxis=['0','1','2','3','4','5','6','7','8','9']
        yAxis=[len(id0),len(id1),len(id2),len(id3),len(id4),len(id5),len(id6),len(id7),len(id8),len(id9)]
        plt.bar(xAxis,yAxis)
        title="Data for Client"+str(self.id)
        plt.title(title)
        plt.show()

### Initialize Clients ###

In [None]:
client_list = {}

# Set the number of clients
num_clients = 10
for i in range(num_clients):
    name = "client_" + str(i)
    client_list[name] = client(i)

### Initialize Server ###

In [None]:
# Initialize server with number of clients
serverFA = server(num_clients)

### Initial Broadcast from Server to Client ###

In [None]:
serverFA.initialBroadcast()

In [None]:
numberOfTraingRounds = 15
trainingMetrics=[]*numberOfTraingRounds
for i in range(0,numberOfTraingRounds):
    serverFA.getClientActiveStatus()
    serverFA.getClientCIF()
    serverFA.getClientSelected()
    serverFA.getModelUpdateFromClients()
    trainingMetrics.append(serverFA.testServerModel())
    serverFA.updateAllClients()

In [None]:
plt.plot(trainingMetrics)


In [None]:
raise SystemExit("Stop right there!")
