# Federated Learning (FL) Allocation Algorithm

## The Algorithm

In [1]:
# Reminder of what each state needs:
# state0_FL() needs () and returns (clients, aggregators, T_proc, T_transf, T_agg, paths)
# state1_FL() needs (clients, aggregators, T_proc, T_transf, T_agg, paths) and returns (paths)
# OR (clients, aggregators, T_proc, T_transf, T_agg, paths)
# state2_FL() needs (paths) and returns (best_path)

def FL_allocation_alg(event_processors, event_links, event_clients):
    """Run the FL allocation state machine for one event and time it.

    The arguments are published as the module-level ``processors``, ``links``
    and ``min_clients`` that every ``stateN_FL()`` function reads, and the
    ``state`` / ``previous_state`` trackers are reset to 0. The states are then
    executed until one of them moves ``state`` to 3 or above.

    Returns:
        ``["Success", best_path[0], best_path[1], best_path[2], runtime]`` when
        the machine ends in state 3, where ``best_path`` is what ``state2_FL()``
        returned; ``["Unfeasible Type A", None, None, None, runtime]`` for
        state 4; ``["Unfeasible Type B", None, None, None, runtime]`` for
        state 5. ``runtime`` is the wall-clock duration of the loop in seconds.
        Any other final state falls through and yields ``None``.
    """
    # Shared with every stateN_FL() function through module globals
    global processors, links, state, previous_state, min_clients
    processors = event_processors
    links = event_links
    state = 0
    previous_state = 0
    min_clients = event_clients

    # Filled in by state2_FL(); only read when the run ends in state 3
    best_path = None

    started_at = time.time()

    while state < 3:
        if state == 0:
            state0_output = state0_FL()
        elif state == 1:
            # state1_FL() is fed by state0_FL() the first time around and by
            # its own previous output while aggregators are still pending
            if previous_state == 0:
                state1_output = state1_FL(*state0_output)
            elif previous_state == 1:
                state1_output = state1_FL(*state1_output)
        elif state == 2:
            best_path = state2_FL(state1_output)

    runtime = time.time() - started_at

    # Translate the final state into the result reported to the caller
    if state == 3:
        return ["Success", best_path[0], best_path[1], best_path[2], runtime]

    failure_labels = {4: "Unfeasible Type A", 5: "Unfeasible Type B"}
    if state in failure_labels:
        return [failure_labels[state], None, None, None, runtime]
    return None

## Each individual state_FL() method

In [2]:
# Declare our global variables first
def state0_FL():
    """State 0: select candidate clients/aggregators and precompute times.

    Reads the module-level ``processors``, ``links``, ``min_clients``,
    ``max_paths`` and ``total_batch_size``, and updates the module-level
    ``state`` / ``previous_state``.

    Returns:
        ``(clients, aggregators, T_proc, T_transf, T_agg, paths)`` with
        ``state`` set to 1, where

        * ``T_proc`` holds one ``[client, time]`` pair per client: 2.5 times
          ``G_whole / power`` plus ``D_client_in`` divided by the value of
          the client's self link ("link_<ID><ID>"),
        * ``T_agg`` holds one ``[client, G_agg / power]`` pair per client,
        * ``T_transf`` holds one ``[link, 2.0 * (D_weights / link.value)]``
          pair per link,
        * ``paths`` is an empty list to be filled by the next state.

        ``None`` when no processor qualifies as a client (``state`` = 4) or
        when fewer than ``min_clients`` qualify (``state`` = 5).
    """
    global state, previous_state
    state = 0
    previous_state = 0

    paths = []
    clients = []
    aggregators = []

    # A processor is a client when its residual memory fits the NN plus the
    # data it trains on; otherwise it may still be an aggregator if it can
    # hold the weights. Both lists are capped by the "max_paths" heuristic.
    # The "<=" is deliberate: one of the listed processors may later be taken
    # as the aggregator, and "max_paths" of them must still remain.
    for candidate in processors:
        fits_network = candidate.residual >= (candidate.M_base + total_batch_size)
        if fits_network and len(clients) <= max_paths:
            clients.append(candidate)
        elif candidate.residual >= candidate.M_agg and len(aggregators) <= max_paths:
            aggregators.append(candidate)

    # Without any client there is nothing to train on
    if not clients:
        previous_state = 0
        state = 4
        return None

    # Fewer clients than the event asked for
    if len(clients) < min_clients:
        previous_state = 0
        state = 5
        return None

    # No dedicated aggregator: fall back to the first "max_paths" processors.
    # The slice is a COPY, so later edits to "aggregators" leave "processors"
    # untouched.
    if not aggregators:
        aggregators = processors[:max_paths]

    # Per client: time to fetch the data and process the NN, and the time the
    # aggregation step would take on that client
    T_proc = []
    T_agg = []
    for client in clients:
        processing_time = (1 + 1.5) * (client.G_whole / client.power)
        node = str(client.ID)
        fetch_time = client.D_client_in / find_in_list(links, "link_" + node + node).value
        T_proc.append([client, processing_time + fetch_time])
        T_agg.append([client, client.G_agg / client.power])

    # The weights-only output is the same for every client because the NN is
    # the same, so any client's D_weights can be used for the transfer times
    D_weights = clients[0].D_weights
    T_transf = [[link, 2.0 * (D_weights / link.value)] for link in links]

    # State 0 is done; state 1 comes next
    previous_state = 0
    state = 1
    return (clients, aggregators, T_proc, T_transf, T_agg, paths)

In [3]:
def _fl_path_train_time(aggregator, proc_entries, transf_entries):
    """Return the total training time of one candidate path.

    The clients of a path work in PARALLEL, so a path is only as fast as its
    slowest member: the largest processing time (scaled by
    ``total_batches_FL``) and the largest transfer time are added to the
    aggregation time ``G_agg / power`` of the aggregator. That figure covers
    ONE epoch, so it is multiplied by ``epochs`` and divided by
    ``time_factor`` (per the original notes this converts seconds to hours).
    """
    per_epoch = (aggregator.G_agg / aggregator.power
                 + max(proc_entries, key=itemgetter(1))[1] * total_batches_FL
                 + max(transf_entries, key=itemgetter(1))[1])
    return (per_epoch * epochs) / time_factor


def state1_FL(clients, aggregators, T_proc, T_transf, T_agg, paths):
    """State 1: find the best path for the first pending aggregator.

    Two candidate paths are built for ``aggregators[0]`` and the faster one is
    appended to ``paths``:

    1. the ``min_clients`` clients with the smallest processing time, and
    2. the ``min_clients`` clients with the fastest link to the aggregator.

    This lets the final choice counter whichever is the bottleneck of the
    system: processing power or bandwidth.

    Only clients that can actually take part are considered for either
    candidate: entries of ``T_proc`` that are not the aggregator itself and
    that own a link whose destination is the aggregator. Links coming from
    processors that are not such clients are ignored.

    Args:
        clients: candidate clients; passed through unchanged.
        aggregators: pending aggregators; the first one is processed and
            removed from this list in place.
        T_proc: ``[client, processing_time]`` pairs (not modified).
        T_transf: ``[link, transfer_time]`` pairs (not modified).
        T_agg: passed through unchanged.
        paths: list of ``[aggregator name, client names, train time]``
            entries; the best path of this aggregator is appended in place.

    Returns:
        ``(clients, aggregators, T_proc, T_transf, T_agg, paths)`` while
        aggregators remain (``state`` stays 1), ``paths`` once all of them
        have been processed (``state`` = 2), or ``None`` when fewer than
        ``min_clients`` usable clients exist for this aggregator
        (``state`` = 5). ``previous_state`` is set to 1 in every case.
    """
    global previous_state
    global state

    # Choose our current potential aggregator
    current_aggregator = aggregators[0]
    aggregator_id = current_aggregator.ID

    # Links that END at the aggregator and START somewhere else. Self links
    # and links leaving the aggregator are irrelevant here. New lists are
    # built so the caller's T_proc / T_transf stay untouched.
    inbound = [entry for entry in T_transf
               if entry[0].dest == aggregator_id
               and entry[0].origin != aggregator_id]
    inbound_origins = {entry[0].origin for entry in inbound}

    # Usable clients: not the aggregator itself and able to reach it
    local_T_proc = [entry for entry in T_proc
                    if entry[0].name != current_aggregator.name
                    and entry[0].ID in inbound_origins]
    usable_ids = {entry[0].ID for entry in local_T_proc}

    # Client -> aggregator links only. Ranking links of non-client processors
    # would name clients that have no processing time at all.
    local_T_transf = [entry for entry in inbound
                      if entry[0].origin in usable_ids]

    # Every usable client owns at least one link, so this single check also
    # guarantees that enough links exist for the bandwidth-based candidate
    if len(local_T_proc) < min_clients:
        # Finish the current state and go to the state that ends the program
        previous_state = 1
        state = 5
        return None

    ###########################################################################
    # First candidate: the clients that process the NN the fastest
    fastest_clients = sorted(local_T_proc, key=itemgetter(1))[:min_clients]
    links_of_fastest_clients = [link
                                for proc in fastest_clients
                                for link in local_T_transf
                                if link[0].origin == proc[0].ID]
    T_train = [[current_aggregator.name,
                [proc[0].name for proc in fastest_clients],
                _fl_path_train_time(current_aggregator, fastest_clients,
                                    links_of_fastest_clients)]]

    ###########################################################################
    # Second candidate: the clients with the fastest link to the aggregator
    fastest_links = sorted(local_T_transf, key=itemgetter(1))[:min_clients]
    clients_of_fastest_links = [proc
                                for link in fastest_links
                                for proc in local_T_proc
                                if proc[0].ID == link[0].origin]
    T_train.append([current_aggregator.name,
                    ["processor_" + str(link[0].origin) for link in fastest_links],
                    _fl_path_train_time(current_aggregator,
                                        clients_of_fastest_links, fastest_links)])

    ###########################################################################
    # Keep only the faster of the two candidates (the power-based one on ties)
    paths.append(min(T_train, key=itemgetter(2)))

    # This aggregator is done; the remaining ones are handled by re-entering
    # this state until the list is empty
    aggregators.remove(current_aggregator)

    previous_state = 1
    if aggregators:
        state = 1
        return (clients, aggregators, T_proc, T_transf, T_agg, paths)

    # A path exists for every aggregator: move on to the final state
    state = 2
    return paths


In [4]:
# def state2_FL(clients, aggregators, T_proc, T_transf, T_agg, paths):
def state2_FL(paths):
    """State 2: return the fastest of all the paths computed so far.

    ``paths`` is a list of finalized paths, each one a list of the form
    ``[aggregator, client list, train time]``. The entry with the smallest
    train time is returned (the first one on ties), and the module-level
    trackers are set to ``previous_state = 2`` and ``state = 3`` to mark the
    program as done.
    """
    global state, previous_state

    fastest = min(paths, key=lambda candidate: candidate[2])

    # Program is done
    previous_state = 2
    state = 3
    return fastest