In [17]:
import os
import math
import time
import string
import random
import itertools
import requests
from pprint import pprint
from collections import defaultdict

In [3]:
def launch_hydra_job(epoch, worker, mg, mg_id):
    """Submit one model group to a worker's ``/hydra`` endpoint.

    Args:
        epoch: Epoch number, forwarded unchanged in the request payload.
        worker: Worker index; the request is sent to local port ``8000 + worker``.
        mg: Model group, sent in its string form.
        mg_id: Identifier of the model group, sent in its string form.

    Returns:
        The 32-character execution id generated for this submission.
    """
    print("Scheduling epoch {} of model_group {} with id {} on worker {}".format(epoch, mg, mg_id, worker))

    # Build a random lowercase-alphanumeric token, one character at a time.
    alphabet = string.ascii_lowercase + string.digits
    token_chars = []
    for _ in range(32):
        token_chars.append(random.choice(alphabet))
    exec_id = ''.join(token_chars)

    port = 8000 + worker
    endpoint = "http://localhost:" + str(port) + "/hydra"
    payload = {
        "epoch": epoch,
        "models": str(mg),
        "exec_id": exec_id,
        "model_group_id": str(mg_id)
    }
    # The HTTP response is not inspected; the caller tracks the job by exec_id.
    requests.post(url=endpoint, json=payload)

    return exec_id

In [4]:
def find_combinations(param_grid):
    """Expand a parameter grid into every concrete configuration.

    Args:
        param_grid: Mapping from parameter name to an iterable of candidate
            values.

    Returns:
        A list of dicts, one per element of the Cartesian product of the
        value lists, in ``itertools.product`` order. Each dict maps every
        parameter name to one chosen value.
    """
    names = list(param_grid.keys())
    value_lists = [param_grid[name] for name in names]
    return [dict(zip(names, choice)) for choice in itertools.product(*value_lists)]

In [5]:
def create_model_groups(models, is_large_model, group_sz=2):
    """Partition models into groups: large models are batched, small ones stay alone.

    Args:
        models: Sequence of model identifiers.
        is_large_model: Sequence of flags parallel to ``models``; a truthy
            entry marks the model at the same index as large.
        group_sz: Number of large models per group. The final large group
            absorbs any remainder, so it may hold more than ``group_sz``
            models. If there are fewer than ``group_sz`` large models they
            form a single (smaller) group rather than being dropped.

    Returns:
        A list of tuples. Large-model groups come first, followed by one
        1-tuple per small model.

    Raises:
        ValueError: If ``group_sz`` is smaller than 1.
    """
    if group_sz < 1:
        raise ValueError("group_sz must be a positive integer, got {}".format(group_sz))

    large_models = []
    small_models = []
    for i, model in enumerate(models):
        if is_large_model[i]:
            large_models.append(model)
        else:
            small_models.append(model)

    model_groups = []
    nlm = len(large_models)
    if nlm:
        # At least one group, so a handful of large models is never lost.
        ngroups = max(1, nlm // group_sz)
        for g in range(ngroups):
            start = g * group_sz
            # The last group takes everything that is left over.
            end = None if g == ngroups - 1 else start + group_sz
            model_groups.append(tuple(large_models[start:end]))

    # Wrap each small model in a 1-tuple; tuple(model) would try to iterate
    # the identifier itself (TypeError for ints, character split for strings).
    model_groups.extend((model,) for model in small_models)

    return model_groups

In [6]:
def create_workers(nodes, train_shard_path="root_path"):
    """Create one virtual worker per GPU across all nodes.

    Args:
        nodes: Mapping from node id to a sized collection of that node's GPUs.
        train_shard_path: Directory under which per-GPU shard paths are built.

    Returns:
        A 3-tuple ``(worker_on_node, train_partitions, single_workers)``:
        ``worker_on_node`` is a ``defaultdict(tuple)`` mapping worker id to
        ``(node id, GPU index on that node)``; ``train_partitions`` lists, per
        worker id, the path ``<train_shard_path>/gpu<GPU index>``; and
        ``single_workers`` lists the worker ids whose node has exactly one GPU.
    """
    # Flatten the cluster into (node, gpu index, gpus on that node) slots;
    # a slot's position in this list is its virtual worker id.
    slots = [
        (node_id, gpu_idx, len(gpus))
        for node_id, gpus in nodes.items()
        for gpu_idx in range(len(gpus))
    ]

    worker_on_node = defaultdict(tuple)
    for wid, (node_id, gpu_idx, _) in enumerate(slots):
        worker_on_node[wid] = (node_id, gpu_idx)

    train_partitions = [
        os.path.join(train_shard_path, "gpu" + str(gpu_idx))
        for _, gpu_idx, _ in slots
    ]
    single_workers = [
        wid for wid, (_, _, node_gpu_count) in enumerate(slots) if node_gpu_count == 1
    ]

    return worker_on_node, train_partitions, single_workers

In [7]:
def check_finished(worker, exec_id):
    """Report whether ``exec_id`` is listed in a worker's completion file.

    Reads ``check_finished_<worker>.txt`` from the current working directory
    and splits it on newlines.

    Args:
        worker: Worker identifier used to build the file name.
        exec_id: Execution id to look for as a whole line.

    Returns:
        True if some line of the file equals ``exec_id``, otherwise False.
    """
    status_path = "check_finished_" + str(worker) + ".txt"
    with open(status_path, "r") as handle:
        contents = handle.read()
    finished_ids = contents.split("\n")
    return exec_id in finished_ids

In [8]:
def init_epoch(n_workers, param_grid, is_large_model, group_sz=2):
    """Build the model groups and fresh bookkeeping tables for one epoch.

    Args:
        n_workers: Number of (virtual) workers to track.
        param_grid: Mapping from parameter name to candidate values; expanded
            with ``find_combinations`` into one configuration per model.
        is_large_model: Flags, indexed by model id, marking large models.
        group_sz: Group size forwarded to ``create_model_groups``.

    Returns:
        A 6-tuple, in this order:
        ``model_groups`` (list of tuples of model ids),
        ``model_group_on_workers`` (one ``None`` per group),
        ``worker_running_model_group`` (one ``-1`` per worker),
        ``mgw_pairs`` (``len(model_groups)`` x ``n_workers`` matrix of False),
        ``model_group_nworkers_trained`` (one ``0`` per group),
        ``exec_id_on_worker`` (one ``None`` per worker).
    """
    # Imported locally: the notebook's import cell only binds the `pprint`
    # function, which has no `pformat` attribute.
    from pprint import pformat

    initial_msts = find_combinations(param_grid)

    # Model ids are simply the positions of the configurations.
    model_id_to_mst_mapping = dict(enumerate(initial_msts))
    model_list = list(model_id_to_mst_mapping.keys())

    model_groups = create_model_groups(model_list, is_large_model, group_sz)
    print("Model Groups:", pformat(model_groups))

    s = "Model ID: Model msts\n"
    for model_id in model_list:
        s += str(model_id) + " : " + pformat(model_id_to_mst_mapping[model_id]) + "\n"
    print("Initial model configurations:", s)

    n_groups = len(model_groups)
    # None means "this group is not currently placed on any worker".
    model_group_on_workers = [None] * n_groups
    model_group_nworkers_trained = [0] * n_groups
    # Independent row per group so updating one group never aliases another.
    mgw_pairs = [[False] * n_workers for _ in range(n_groups)]

    # -1 is the idle sentinel that get_idle_workers and scheduler test for.
    worker_running_model_group = [-1] * n_workers
    exec_id_on_worker = [None] * n_workers

    return model_groups, model_group_on_workers, worker_running_model_group, mgw_pairs, model_group_nworkers_trained, exec_id_on_worker


In [9]:
def get_idle_workers(worker_on_node, worker_running_model_group, single_workers, group_sz=2):
    """Find idle workers and the same-node groups that can be formed from them.

    Args:
        worker_on_node: Mapping from worker id to a tuple whose first element
            is the id of the node hosting that worker.
        worker_running_model_group: Sequence indexed by worker id; an entry of
            ``-1`` marks the worker as idle.
        single_workers: Worker ids that are the only worker on their node.
        group_sz: Number of workers per multi-worker group.

    Returns:
        A 2-tuple ``(idle_worker_groups, idle_worker_singles)``.
        ``idle_worker_singles`` is a set of 1-tuples ``(worker_id,)``, one per
        idle worker. ``idle_worker_groups`` is a set of tuples of worker ids:
        every ascending ``group_sz``-combination of idle workers that share a
        node, plus ``(w,)`` for each idle worker listed in ``single_workers``.
    """
    idle_workers = [w for w, m in enumerate(worker_running_model_group) if m == -1]
    idle_worker_singles = {(w,) for w in idle_workers}

    # Only workers on the same node may be grouped, so bucket them by node.
    idle_by_node = defaultdict(list)
    for w in idle_workers:
        idle_by_node[worker_on_node[w][0]].append(w)

    idle_worker_groups = set()
    if group_sz >= 1:
        for node_workers in idle_by_node.values():
            # node_workers is ascending, so each combination is an ordered
            # tuple of plain worker ids.
            idle_worker_groups.update(itertools.combinations(node_workers, group_sz))

    # add single GPU nodes to idle_worker_groups
    for w in single_workers:
        if (w,) in idle_worker_singles:
            idle_worker_groups.add((w,))

    return idle_worker_groups, idle_worker_singles

In [None]:
def get_runnable_mgw_pair(idle_worker_groups, idle_worker_singles, model_groups, model_group_on_workers, mgw_pairs):
    """Pick a (model group, idle worker group) pair that can be launched now.

    Model groups are examined in random order. A group is skipped if it is
    already placed (its ``model_group_on_workers`` entry is not ``None``).
    Groups with more than one model are matched against ``idle_worker_groups``
    and need a worker group in which no worker has trained them yet; groups
    with a single model are matched against ``idle_worker_singles``.

    Args:
        idle_worker_groups: Iterable of tuples of idle worker ids.
        idle_worker_singles: Iterable of 1-tuples ``(worker_id,)``.
        model_groups: Sequence of tuples of model ids. Not modified.
        model_group_on_workers: Sequence parallel to ``model_groups``; ``None``
            means the group is not running anywhere.
        mgw_pairs: ``mgw_pairs[mgid][w]`` is truthy once model group ``mgid``
            has been trained on worker ``w``.

    Returns:
        ``(mgid, worker_tuple)`` for the first runnable pairing found, or
        ``None`` if nothing can be scheduled.
    """
    # Shuffle indices, not model_groups itself: mgid must keep lining up with
    # model_group_on_workers and mgw_pairs, and the caller's list stays intact.
    candidate_ids = list(range(len(model_groups)))
    random.shuffle(candidate_ids)

    for mgid in candidate_ids:
        # mg is running on some other worker(s)
        if model_group_on_workers[mgid] is not None:
            continue

        mg = model_groups[mgid]
        if len(mg) > 1:
            # large model group: every worker in the group must be untrained
            for wg in idle_worker_groups:
                if not any(mgw_pairs[mgid][w] for w in wg):
                    return (mgid, wg)
        else:
            # small model group
            for wg in idle_worker_singles:
                if not mgw_pairs[mgid][wg[0]]:
                    return (mgid, wg)

    return None

In [148]:
def update_status():
    """Placeholder with no behaviour yet; always returns ``None``."""
    return None

In [12]:
def scheduler(self, epoch, workers, model_groups, train_partitions, valid_partitions, virtual_worker_on_worker, model_id_to_mst_mapping, model_group_on_worker,
              mgw_pair, exec_id_on_worker, model_groups_nworkers_trained, worker_running_model_group):
    """Run one epoch of polling-based scheduling over all virtual workers.

    Repeatedly sweeps every virtual worker. A worker marked idle (``-1``) is
    offered a runnable model via ``self.get_runnable_model`` and, if one is
    found, a job is started with ``self.launch_job``. A busy worker is polled
    with ``self.check_finished``; on completion the bookkeeping tables are
    updated and a model trained on every virtual worker is removed from the
    to-build set. The sweep pauses after each virtual worker and ends once the
    to-build set is empty.

    NOTE(review): this body looks like a class method only partly adapted to
    model groups. It reads names that are neither parameters nor defined in
    the visible notebook cells: ``controller_logger``, ``model_worker_logger``,
    ``model_to_build`` (the local created here is ``model_groups_to_build``),
    ``worker_running_model``, ``models_list``, ``model_on_worker``, ``mw_pair``,
    ``model_nworkers_trained``, ``model_id_ckpt_mapping``, ``train_fn_string``,
    ``valid_fn_string``, ``kwargs_str`` and ``sleep`` (the import cell binds
    ``time``, not ``sleep``). It also needs a ``self`` that provides
    ``get_runnable_model``, ``launch_job``, ``check_finished`` and the
    ``set_*`` methods. Confirm where these come from before running.
    """

    n_virtual_workers = len(virtual_worker_on_worker)

    # NOTE(review): the assignment below is commented out, yet
    # model_id_ckpt_mapping[m] is still read when launching a job.
#         model_id_ckpt_mapping = self.get_model_on_checkpoint()

    # Indices of all model groups. NOTE(review): the loop below tests and
    # mutates `model_to_build` instead of this set.
    model_groups_to_build = set(range(len(model_groups)))

    controller_logger.info("Beginning model scheduling...")
    model_worker_logger.info("Starting epoch...")

    while (len(model_to_build) > 0):
        for vw in range(n_virtual_workers):
            # get the real worker and gpu associated with virtual worker vw
            worker_id = virtual_worker_on_worker[vw][0]
            gpu_id = virtual_worker_on_worker[vw][1]
            # model_worker_logger.info(str((vw, worker_running_model[vw])))
            if worker_running_model[vw] == -1:
                # Idle worker: ask for a model to run; -1 means none is runnable.
                m = self.get_runnable_model(vw, models_list, model_on_worker, mw_pair)
                if m != -1:
                    # True when this is the only virtual worker the model has not been trained on.
                    is_last_worker = model_nworkers_trained[m] == n_virtual_workers - 1
                    exec_id = self.launch_job(epoch, worker_id, gpu_id, workers[worker_id],
                                              train_partitions[vw],
                                              valid_partitions,
                                              m,
                                              model_id_ckpt_mapping[m],
                                              train_fn_string,
                                              valid_fn_string,
                                              model_id_to_mst_mapping[m],
                                              is_last_worker,
                                              kwargs_str
                                            )
                    # Record the placement in both directions, plus the job id used for polling.
                    model_on_worker[m] = vw
                    worker_running_model[vw] = m
                    exec_id_on_worker[vw] = exec_id

                    self.set_model_on_worker(model_on_worker)
                    self.set_worker_running_model(worker_running_model)
                    self.set_execid_on_worker(exec_id_on_worker)

                    print("Sent model {} to build on worker {} on GPU {} with config {}".format(
                        str(m), str(worker_id), str(gpu_id), str(model_id_to_mst_mapping[m])))
                    model_worker_logger.info("Sent model {} to build on worker {} on GPU {} with config {}".format(
                        str(m), str(worker_id), str(gpu_id), str(model_id_to_mst_mapping[m])))
            else:
                # poll since this particular worker is busy
                m = worker_running_model[vw]
                if m != -1:
                    exec_id = exec_id_on_worker[vw]
                    # NOTE(review): this expects a (completed, status) pair; the module-level
                    # check_finished(worker, exec_id) in this notebook returns a single bool.
                    completed, status = self.check_finished(workers[worker_id], exec_id)

                    if completed:
                        print("Received Model {} built on worker {} on GPU {}".format(str(m), str(worker_id), str(gpu_id)))
                        model_worker_logger.info("Received Model {} built on worker {} on GPU {}".format(str(m), str(worker_id), str(gpu_id)))
                        # models[m].n = status["result"]
                        # Free both the model and the worker, and mark this (model, worker) pair as trained.
                        model_on_worker[m] = -1
                        worker_running_model[vw] = -1
                        exec_id_on_worker[vw] = None
                        model_nworkers_trained[m] += 1
                        mw_pair[m][vw] = True
                        # The model is finished once it has been trained on every virtual worker.
                        model_done = True
                        for i in range(n_virtual_workers):
                            if not mw_pair[m][i]:
                                model_done = False
                                break
                        if model_done:
                            model_to_build.remove(m)

                # These setters run on every poll of a busy worker, whether or not the job completed.
                self.set_model_on_worker(model_on_worker)
                self.set_worker_running_model(worker_running_model)
                self.set_model_worker_pairs(mw_pair)
                self.set_model_nworkers_trained(model_nworkers_trained)
                self.set_execid_on_worker(exec_id_on_worker)

            # TODO: write out execution order in standard format: and also replay schedule(to replay any given scheduler)
            # This call sits inside the per-worker loop, so it runs once per virtual worker visited.
            sleep(1)
    model_worker_logger.info("Ending epoch...")

In [31]:
# Runtime figures for small and large models (units are not stated here).
small_model_runtime, large_model_runtime = 5, 20

# How many models of each size take part in the experiment.
nlarge, nsmall = 11, 5

# One flag per model id: the first `nlarge` ids are large, the rest are small.
is_large_model = [position < nlarge for position in range(nlarge + nsmall)]

In [None]:
# GPUs available on each node; the node id is the position in this listing.
workers = dict(enumerate([
    ["GPU0", "GPU1", "GPU2", "GPU3"],
    ["GPU0", "GPU1", "GPU2"],
    ["GPU0", "GPU1"],
    ["GPU0", "GPU1"],
]))

In [32]:
# Search space: every combination of these values becomes one model configuration.
param_grid = dict([
    ('learning_rate', [1e-2, 1e-3]),
    ('embed_size', [256, 512]),
    ('hidden_size', [256, 512]),
    ('batch_size', [128, 256]),
])

In [150]:
def grid_search():
    """Run the scheduler once per epoch for a fixed number of epochs (2).

    Each epoch prints a 1-based ``EPOCH`` banner, rebuilds the bookkeeping
    state and then hands it to ``scheduler``.

    NOTE(review): ``init_stuff`` is not defined in the visible notebook cells;
    presumably ``init_epoch`` was intended, but that function takes arguments
    and returns six values rather than four. Confirm. The ``scheduler`` call
    below passes six positional arguments, while the ``scheduler`` defined in
    this notebook declares thirteen parameters (including ``self``).
    """
    num_epochs = 2
        
    for i in range(num_epochs):
        print("EPOCH: " + str(i+1))
        # Fresh per-epoch state; unpacked into exactly four names.
        model_group_num_map, model_group_on_worker, worker_running_model_group, mgw_pairs = init_stuff()
        # `workers` is read from the notebook's global scope.
        scheduler(i, model_group_num_map, workers, model_group_on_worker, mgw_pairs, worker_running_model_group)

In [25]:
# Entry point of the notebook: run the full multi-epoch grid search.
# NOTE(review): grid_search() calls `init_stuff`, which is not defined in the
# visible cells, so this is expected to fail on a fresh kernel. Confirm.
grid_search()

In [11]:
# Scratch value for trying out set arithmetic.
a = {1, 2, 3, 4}

In [12]:
# Second scratch set, a subset of the values used for `a`.
b = {1, 2}

In [13]:
# Scratch check of set difference: the elements of `a` that are not in `b`.
a - b

{3, 4}