Pseudocode-style simulation of the **GREEDY** load-balancing algorithm

In [1]:
import pandas as pd
from multiprocessing import Pool
from threading import Thread, Barrier, Lock, get_ident
from queue import Queue
import copy
# Table of cells with their weights and owning processors. Its columns are
# accessed below with a leading space (' weight', ' source_proc');
# compression is inferred from the .bz2 suffix.
# NOTE(review): unpickling can execute arbitrary code; only load this file
# from a trusted source.
raw = pd.read_pickle("lb_algo_dataframe.bz2", compression="infer")
# Number of simulated processors; one GREEDY thread is started per processor.
nprocs = 480

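A single MPI AllGather operation shares every processor's load imbalance with all processors. Each processor then updates its own copy of this `load_imbalance` list in the same deterministic way during partitioning, so all copies stay consistent. The load imbalance of processor $p$ is $L_p = (W_p - \bar{W})/\bar{W}$, where $W_p$ is the total weight of the cells resident on $p$ and $\bar{W}$ is the average weight per processor.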

In [2]:
# Target load: the average total cell weight per processor.
avg_weight = raw[' weight'].sum()/nprocs
# Load imbalance of processor p: (W_p - avg_weight) / avg_weight, where W_p is
# the total weight of the cells resident on p (0 when p has no cells).
# A single groupby pass replaces nprocs separate boolean-mask scans of raw.
load_imbalance = [(w - avg_weight)/avg_weight
                  for w in raw.groupby(' source_proc')[' weight'].sum()
                                .reindex(range(nprocs), fill_value=0)]

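The first part of the algorithm determines how many blocks each processor computes locally: a processor keeps its cells, in order, until their cumulative weight exceeds the average weight. The iterative **GREEDY** algorithm then distributes the remaining blocks.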

In [2]:
def initial_calc(x):
  """Decide which of processor ``x``'s resident grid cells are computed locally.

  Cells are taken in their stored order and kept on ``x`` until the running
  weight first exceeds the module-level ``avg_weight``; the cell that crosses
  the threshold is kept locally as well. Every later cell gets
  ``compute_proc = -1``, marking it as offloadable. An ``"x"`` is printed for
  every processor that has crossed the threshold.

  Reads the module-level ``raw`` DataFrame and ``avg_weight``.

  Args:
    x: source processor id.

  Returns:
    (proc, to_offload, weight):
      proc       -- copy of ``x``'s rows of ``raw`` (index reset to 0..n-1)
                    with a `` compute_proc`` column: ``x`` for local cells,
                    ``-1`` for offloadable cells.
      to_offload -- number of cells marked ``-1``.
      weight     -- cumulative weight of the cells kept locally
                    (0 for a processor without resident cells).
  """
  proc = raw[raw[' source_proc'] == x].copy()
  proc = proc.reset_index(drop=True)
  proc[' compute_proc'] = x
  n_cells = proc.shape[0]
  if n_cells == 0:
    # Nothing to keep or offload; there is no running weight to inspect.
    return proc, 0, 0
  # Running weight after each cell. ndarray.cumsum accumulates sequentially,
  # like a cell-by-cell loop, without a Python-level iloc lookup per row.
  running = proc[' weight'].to_numpy().cumsum()
  over = running > avg_weight
  if not over.any():
    # The threshold is never exceeded: every cell stays local.
    return proc, 0, running[-1]
  cut = int(over.argmax())  # position of the first cell that crosses avg_weight
  # Index was reset above, so labels equal positions here.
  proc.loc[cut + 1:, ' compute_proc'] = -1
  print("x", end="", flush=True)
  return proc, n_cells - (cut + 1), running[cut]
  
# Run initial_calc for every source processor on 8 worker processes. The
# context manager shuts the pool down once map() has returned all results,
# instead of leaving the workers alive for the rest of the kernel session.
with Pool(8) as pool:
  processed = pool.map(initial_calc, range(nprocs))
print("\ninitial stage done!")

xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
initial stage done!


This is the iterative greedy algorithm. MPI is simulated with point-to-point queues between all threads, one thread per processor. The result is stored in a shared list; in the actual algorithm each processor keeps its result locally.

In [3]:
# Split the (frame, offload count, local weight) tuples returned by
# initial_calc into three per-processor lists.
weights, to_offload, cur_weight = (list(column) for column in zip(*processed))

# Simulated point-to-point 'mpi' channels: MPI[src][dst] carries messages
# sent from processor src to processor dst.
MPI = [[Queue() for _dst in range(nprocs)] for _src in range(nprocs)]

# Resulting partitioning after load balancing, one slot per processor
new_weights = [None for _ in range(nprocs)]

def GREEDY(our_proc, L, weights, cur_weight, to_offload, tolerance=0.02):
  """Run the iterative GREEDY offloading rounds for one simulated processor.

  In round ``i`` (1 .. nprocs-1) processor ``our_proc`` offers its remaining
  offloadable cells to ``(our_proc + i) % nprocs`` and accepts cells offered
  by ``(our_proc - i) % nprocs`` one by one until its cumulative weight
  exceeds ``avg_weight`` (the last accepted cell may push it past). Messages
  go through the module-level ``MPI[src][dst]`` queues. The final ``weights``
  frame is stored in ``new_weights[our_proc]``.

  The update of ``L`` does not depend on ``our_proc``. Provided every
  processor starts from the same ``L``, all of them therefore stop in the
  same round, which keeps the blocking queue exchanges matched.

  Args:
    our_proc:   id of this processor.
    L:          load imbalance of every processor; consistent between all
                processors during the algorithm (mutated in place).
    weights:    weight and compute location (`` compute_proc``) of resident
                cells; the offloadable cells are its last ``to_offload`` rows
                (mutated in place).
    cur_weight: cumulative weight of cells that will be computed on this
                processor.
    to_offload: number of resident cells that have not yet been assigned to
                a processor.
    tolerance:  stop as soon as every entry of ``L`` is below this value.

  NOTE(review): cells that are still unassigned when the rounds end keep
  ``compute_proc == -1``.
  """
  compute_col = weights.columns.get_loc(' compute_proc')
  # In the worst case we communicate with everyone
  for i in range(1, nprocs):
    # We try to offload to next_proc, and try to receive from prev_proc
    next_proc = (our_proc + i) % nprocs
    prev_proc = (our_proc - i) % nprocs
    first_offloadable = weights.shape[0] - to_offload

    # Send information about offloadable blocks to the next process
    MPI[our_proc][next_proc].put(to_offload)
    MPI[our_proc][next_proc].put(weights[' weight'].iloc[first_offloadable:])

    # Accept cells while we still have room
    weight_size = MPI[prev_proc][our_proc].get()
    their_weights = MPI[prev_proc][our_proc].get()
    accepted = 0
    for y in range(weight_size):
      if cur_weight > avg_weight:
        break
      accepted += 1
      cur_weight += their_weights.iloc[y]
    MPI[our_proc][prev_proc].put(accepted)

    # next_proc accepted the first `offloaded` of the cells we offered. iloc's
    # end is exclusive, so exactly those cells are relabelled (a .loc label
    # slice would also include the first cell that was NOT accepted).
    offloaded = MPI[next_proc][our_proc].get()
    weights.iloc[first_offloadable:first_offloadable + offloaded, compute_col] = next_proc
    to_offload -= offloaded

    # Update L (load imbalance) for every process: each overloaded j shifts
    # as much load as possible to k = j + i if k is underloaded.
    for j in range(nprocs):
      k = (j + i) % nprocs
      if L[j] <= 0 or (L[j] * L[k]) >= 0:
        continue
      m = min(abs(L[j]), abs(L[k]))
      L[j] -= m
      L[k] += m

    # Stop condition
    if max(L) < tolerance:
      break

  new_weights[our_proc] = weights
  
# Instead of MPI ranks, this example runs one thread per simulated processor.
# Every thread gets its own deep copies of the shared load_imbalance list and
# of its processor's inputs, so threads never share mutable state except
# through the MPI queues and the new_weights result list.
threads = []
for proc_id in range(nprocs):
  thread_args = (proc_id,
                 copy.deepcopy(load_imbalance),
                 copy.deepcopy(weights[proc_id]),
                 copy.deepcopy(cur_weight[proc_id]),
                 copy.deepcopy(to_offload[proc_id]))
  threads.append(Thread(target=GREEDY, args=thread_args))
for thread in threads:
  thread.start()
for thread in threads:
  thread.join()

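Now we can display the resulting partitioning of each process, shown here for process 3. The ` compute_proc` column gives the processor that computes each cell.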

In [8]:
# Final partitioning of processor 3: ' compute_proc' is the processor that
# computes each cell (-1 marks an offloadable cell that was never assigned).
new_weights[3]

Unnamed: 0,id,weight,source_proc,compute_proc
0,8193,113200.0,3,3
1,8194,58780.0,3,3
2,8195,47210.0,3,3
3,8196,47140.0,3,3
4,8197,50580.0,3,3
...,...,...,...,...
3067,11260,654100.0,3,8
3068,11261,539500.0,3,8
3069,11262,537900.0,3,8
3070,11263,333900.0,3,8
