In [None]:
import random

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import plotly.graph_objects as go

from datalife import get_critical_path, remove_cpath_from_graph, caterpillar_tree, find_caterpillar_forest

# Example task DAG whose nodes are laid out on a 3x3 grid (see `pos` below).
example_edges = [
    ("a", "b"),
    ("d", "e"),
    ("g", "h"),
    ("b", "e"),
    ("b", "c"),
    ("e", "f"),
    ("e", "i"),
    ("h", "i"),
]
G = nx.DiGraph(example_edges)

# Grid layout: column index -> x, row index -> -y (row 0 is drawn on top).
grid_rows = ["abc", "def", "ghi"]
pos = {
    name: [col, -row]
    for row, row_names in enumerate(grid_rows)
    for col, name in enumerate(row_names)
}

fig, ax = plt.subplots()
nx.draw_networkx(G, pos=pos, ax=ax)
fig.tight_layout()
plt.show()

In [None]:
# Give every edge the default weight, then make d->e and e->f heavier.
default_weight = 1
nx.set_edge_attributes(G, default_weight, name='weight')

G.add_weighted_edges_from([('d','e',2),('e','f',3)])

In [None]:
# Critical path of the weighted example graph, as computed by datalife.get_critical_path.
cpath = get_critical_path(G)
cpath

In [None]:
ct = caterpillar_tree(G, cpath)  # caterpillar tree for `cpath`, built by datalife.caterpillar_tree

In [None]:
# Draw the caterpillar tree using the same node layout (`pos`) as the full graph.
fig, ax = plt.subplots()
nx.draw_networkx(ct, pos=pos, ax=ax)
fig.tight_layout()
plt.show()

In [None]:
removed_g = remove_cpath_from_graph(G, cpath)  # graph left after stripping the critical path

In [None]:
# Draw what remains after removing the critical path, with the original layout.
fig, ax = plt.subplots()
nx.draw_networkx(removed_g, pos=pos, ax=ax)
fig.tight_layout()
plt.show()

In [None]:
# find_caterpillar_forest in code
def find_caterpillar_forest(G):
    """Decompose graph ``G`` into caterpillar trees (CTs) and link dependent CTs.

    This local definition shadows the ``find_caterpillar_forest`` imported from
    ``datalife`` in the first cell.

    Each round: take the critical path of the remaining graph, build its
    caterpillar tree ``CT_c``, then strip the critical path from the graph.
    For every node of ``CT_c`` that also belongs to an earlier CT ``ct_i``:
    an out-edge of that node in ``ct_i`` adds shadow edge ``CT_c -> ct_i``,
    an in-edge adds ``ct_i -> CT_c``.

    Args:
        G: networkx DiGraph of tasks. The local name is rebound each round to
            the result of ``remove_cpath_from_graph``; whether that helper
            mutates its input is not visible here.

    Returns:
        (cf, dependent_edges):
            ``cf`` is a DiGraph whose nodes are the CT graphs, inserted in the
            order the CTs are extracted.
            ``dependent_edges`` is a de-duplicated list, in first-seen order,
            of edges of earlier CTs that touch nodes of later CTs.
    """
    CT_s = []
    dependent_edges = {}  # insertion-ordered set of edges
    cf = nx.DiGraph()
    while G.nodes or G.edges:
        # find a critical path
        cpath = get_critical_path(G)
        # extract the CT along the critical path
        CT_c = caterpillar_tree(G, cpath)
        # Remove the vertices and edges only along the critical path
        removed_g = remove_cpath_from_graph(G, cpath)
        # Register every CT, including one with no dependencies (e.g. the only
        # CT of a single-path graph), so it appears in the forest.
        cf.add_node(CT_c)
        # (find dependencies across CT_s) for each vertex of the current CT
        for ct_i in CT_s:
            for node in CT_c.nodes:
                # Membership guard: out_edges(n) on a node absent from ct_i
                # treats n as an iterable of nodes (e.g. a string's characters).
                if node not in ct_i:
                    continue
                o_edges = list(ct_i.out_edges(node))
                i_edges = list(ct_i.in_edges(node))
                dependent_edges.update(dict.fromkeys(o_edges + i_edges))
                if o_edges:
                    cf.add_edge(CT_c, ct_i)
                if i_edges:
                    cf.add_edge(ct_i, CT_c)
        CT_s.append(CT_c)
        G = removed_g
    return (cf, list(dependent_edges))

In [None]:
# Build the shadow graph `cf` (nodes are CT graphs) and the cross-CT dependency edges,
# using the find_caterpillar_forest defined in this notebook.
cf, dependent_edges = find_caterpillar_forest(G)
cf, dependent_edges

In [None]:
# Task nodes contained in each CT of the shadow graph.
[ct_graph.nodes for ct_graph in cf.nodes]

In [None]:
# Left column: one panel per caterpillar tree; right column: the shadow graph `cf`.
# The grid is sized from the number of CTs instead of being hard-coded to three.
ct_list = list(cf.nodes)
n_rows = max(len(ct_list), 1)  # gridspec needs at least one row

fig = plt.figure(figsize=(16,4))
gs = fig.add_gridspec(n_rows, 2)
ax = {row: fig.add_subplot(gs[row, 0]) for row in range(len(ct_list))}
ax[len(ct_list)] = fig.add_subplot(gs[:, 1])

ax[0].set_title('Caterpillar Forest')
for idx, ct_graph in enumerate(ct_list, start=1):
    nx.draw_networkx(ct_graph, pos=pos, ax=ax[idx - 1])
    # Shadow-graph layout: the k-th CT (1-based) sits at (k, -k).
    cf.add_node(ct_graph, pos=(idx, -idx))
i = len(ct_list)  # index of the shadow-graph panel (== number of CTs)

_pos = {node: list(data['pos']) for node, data in cf.nodes(data=True)}

ax[i].set_title('Shadow Graph')
nx.draw_networkx(cf, pos=_pos, with_labels=True, ax=ax[i], node_size=1000, node_color="skyblue", node_shape="s", alpha=0.5, linewidths=10)
fig.tight_layout()
plt.show()

In [None]:
# Extended shadow graph: add copies of the level-0 and level-1 CTs to illustrate
# a forest with several roots. Levels follow cf.nodes order.
# NOTE: not idempotent - every run of this cell adds new CT copies to cf.
ct_levels = list(cf.nodes)
assert len(ct_levels) >= 3, "this illustration needs at least three CTs in cf"

# Stack the CTs in column x=0. Numbering starts from the CT count so this cell
# does not depend on an `i` set elsewhere.
i = len(ct_levels)
for n in ct_levels:
    i += 1
    cf.add_node(n, pos=(0, -i))

_pos = {node: list(data['pos']) for node, data in cf.nodes(data=True)}

lvl_ = dict(enumerate(ct_levels))

# root 1: copies of levels 0 and 1, chained onto the original level-2 CT
lvl0_copy = lvl_[0].copy()
lvl1_copy = lvl_[1].copy()

cf.add_edge(lvl0_copy, lvl1_copy)
cf.add_edge(lvl1_copy, lvl_[2])

_pos[lvl0_copy] = [_pos[lvl_[0]][0] + 1, _pos[lvl_[0]][1]]
_pos[lvl1_copy] = [_pos[lvl_[1]][0] + 1, _pos[lvl_[1]][1]]

# root 2: another copy of levels 0 and 1; its level-0 copy also feeds root 1's level-1 copy
lvl0_copy2 = lvl_[0].copy()
lvl1_copy2 = lvl_[1].copy()

cf.add_edge(lvl0_copy2, lvl1_copy2)
cf.add_edge(lvl0_copy2, lvl1_copy)

_pos[lvl0_copy2] = [_pos[lvl_[0]][0] + 2, _pos[lvl_[0]][1]]
_pos[lvl1_copy2] = [_pos[lvl_[1]][0] + 2, _pos[lvl_[1]][1]]

In [None]:
# Draw the extended shadow graph; level labels sit just right of the axes
# (x=1.2 in axes coordinates).
fig, ax = plt.subplots()
ax.set_title('Extended Shadow Graph \n(with multiple root nodes)')
nx.draw_networkx(cf, with_labels=False, pos=_pos, ax=ax, node_size=1000, node_color="skyblue", node_shape="s", alpha=0.5, linewidths=10)
for y, label in [(0.9, 'level 0'), (0.5, 'level 1'), (0.1, 'level 2')]:
    ax.text(1.2, y, label, horizontalalignment='center',
            verticalalignment='center', transform=ax.transAxes)
plt.show()

### How many CTs depend on the current CT?

In [None]:
def get_roots(G):
    """Return the nodes of directed graph ``G`` that have no predecessors.

    Nodes are returned in ``G.nodes`` iteration order.
    """
    return [
        node for node in G.nodes
        if not any(True for _ in G.predecessors(node))
    ]

In [None]:
# Root CTs of the shadow graph: CTs with no incoming dependency edge.
roots = get_roots(cf)
roots

In [None]:
def get_dependent_ct_count(G, ct_list):
    """Map each CT in ``ct_list`` to its number of successors in shadow graph ``G``.

    A successor is a CT that depends on the given CT.
    """
    return {ct: sum(1 for _ in G.successors(ct)) for ct in ct_list}

In [None]:
# Number of CTs that depend on each root CT.
cts_with_cnt = get_dependent_ct_count(cf, roots)
for idx, root in enumerate(roots):
    print("root [{}] has dependent ct: {}".format(idx, cts_with_cnt[root]))

### Within a CT, how many tasks can run before waiting on a dependency (number of hops before the first stall)?

In [None]:
def get_first_stalls(G, d_edges):
    """Return the tasks of CT ``G`` that precede its first dependency stall.

    The stall node is the first node, in ``d_edges`` order, that belongs to ``G``
    and has outgoing edges in ``G``. The result lists every ancestor of that
    node in ``G``, each exactly once, in discovery order.

    Args:
        G: the caterpillar tree (directed graph).
        d_edges: iterable of (u, v) dependency edges across CTs.

    Returns:
        list of ancestor nodes. Empty if no stall node is found.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order, so the chosen
    # stall node is reproducible. set() order changes between runs because
    # str hashing is randomized.
    candidates = dict.fromkeys(node for edge in d_edges for node in edge)
    found_node = None
    for node in candidates:
        # Check membership first: G.edges(n) on a node absent from G treats n
        # as an iterable of nodes (e.g. the characters of a string).
        if node in G and len(G.edges(node)) > 0:
            found_node = node
            break

    first_stalls = []
    if found_node is None:
        return first_stalls
    # Depth-first walk over ALL predecessors. `seen` prevents double counting
    # through shared ancestors (diamonds) and guards against cycles.
    seen = {found_node}
    stack = [found_node]
    while stack:
        curr = stack.pop()
        for pred in G.predecessors(curr):
            if pred not in seen:
                seen.add(pred)
                first_stalls.append(pred)
                stack.append(pred)
    return first_stalls

In [None]:
# (root CT, number of tasks preceding its first stall) for every root.
res = [(root, len(get_first_stalls(root, dependent_edges))) for root in roots]

In [None]:
for idx, (_root, stall_count) in enumerate(res):
    print('first stall count of root {}: {}'.format(idx, stall_count))

### Sum of an edge “metric” (e.g., data volume) over the current CT's edges

In [None]:
def get_edge_attributes(G, metric='value'):
    """Return the ``metric`` attribute of every edge of ``G``, in ``G.edges`` order.

    Raises KeyError if some edge does not carry ``metric``.
    """
    values = []
    for _u, _v, edge_data in G.edges(data=True):
        values.append(edge_data[metric])
    return values
# NOTE(review): assumes `roots` has at least two entries and that every edge of
# roots[1] carries a 'value' attribute (edges of G above only get 'weight') — confirm.
get_edge_attributes(roots[1], 'value')

### Assign a priority to each vertex in the shadow graph

In [None]:
def find_vertex_priority(cforest, G_list, dependent_edges, metric='value', op='sum'):
    """Score each CT in ``G_list`` for scheduling.

    Each score is the tuple ``(dependent CT count, first-stall count, aggregate)``:
      * dependent CT count: successors of the CT in shadow graph ``cforest``;
      * first-stall count: length of ``get_first_stalls(CT, dependent_edges)``;
      * aggregate: ``numpy.<op>`` applied to the CT's ``metric`` edge attributes.

    Returns a dict mapping each CT to its score tuple. Tuples compare
    lexicographically, so sorting by them ranks by dependent count first.
    """
    dependent_counts = get_dependent_ct_count(cforest, G_list)
    priorities = {}
    for ct_graph in G_list:
        stall_count = len(get_first_stalls(ct_graph, dependent_edges))
        edge_values = get_edge_attributes(ct_graph, metric)
        aggregate = getattr(np, op)(edge_values)
        priorities[ct_graph] = (dependent_counts[ct_graph], stall_count, aggregate)
    return priorities

In [None]:
# Priority tuple per root CT: (dependent CT count, first-stall count, sum of 'value' edge attributes).
vertex_priorities = find_vertex_priority(cf, roots, dependent_edges)
vertex_priorities

### Vertices within the same level are prioritized for resource allocation by their priority

In [None]:
# Highest-priority CT first (tuples compare element by element).
dict(sorted(vertex_priorities.items(), key=lambda item: item[1], reverse=True))

### `current_queue`: all root nodes at the start (q[0] from slide 6)

In [None]:
# (CT, priority) pairs, highest priority first.
current_queue = sorted(vertex_priorities.items(), key=lambda item: item[1], reverse=True)

### Default resource set for a task node, i.e., data volume and rate (stored as node attributes)

In [None]:
# Default per-task resource demand: sizes in bytes, rates in bytes/second.
default_resource_set = {
    'data_size_local': 1024 * 1024,
    'data_size_global': 1024 * 1024,
    'data_rate_local': 3000,
    'data_rate_global': 3000,
}

In [None]:
# Show the default resource set as a one-column table (index = resource name).
pd.DataFrame.from_dict(default_resource_set, orient='index')

Units are bytes for `data_size_*` and bytes/second for `data_rate_*`.

### Check how many resources are needed to schedule CT $v$ (`check_resource_req`)

In [None]:
def check_resource_req(G, resource_grp = default_resource_set):
    '''Estimate the peak concurrent resource demand of caterpillar tree ``G``.

    For every node, sum the resource attributes of all its neighbors
    (``nx.neighbors`` yields successors for a DiGraph). The ``resource_grp``
    value is used for any attribute a neighbor does not carry. Per resource
    key, the result is the maximum of those sums and of the ``resource_grp``
    value itself (the baseline).

    Args:
        G: networkx graph whose nodes may carry resource attributes.
        resource_grp: dict of resource name -> default amount. Its keys select
            which attributes are summed. It is not modified.

    Returns:
        dict with the same keys as ``resource_grp``.
    '''
    resource_max = resource_grp.copy()
    for ct_node in G.nodes:
        # Start from zero for every node and accumulate over ALL its neighbors.
        # A node without neighbors therefore contributes zero, not a stale sum.
        resource_tmp = dict.fromkeys(resource_grp, 0)
        for n in nx.neighbors(G, ct_node):
            node_attrs = G.nodes[n]
            for k in resource_grp:
                resource_tmp[k] += node_attrs.get(k, resource_grp[k])  # default val
        for k in resource_grp:
            resource_max[k] = max(resource_max[k], resource_tmp[k])
    return resource_max

In [None]:
import random


def rand_resource(G, resource_grp = {'data_size': 1024*1024, 'data_rate':2000}):
    """Attach random resource demands to every node of ``G`` in place; return ``G``.

    Each key ``k`` of ``resource_grp`` becomes a node attribute drawn uniformly
    from ``[1, int(resource_grp[k])]`` using the global ``random`` module state.
    The default dict is only read, never mutated.

    NOTE(review): the default keys ('data_size', 'data_rate') differ from the
    resource-pool columns ('data_size_local', ...). With the defaults, these
    attributes are therefore ignored by check_resource_req / spawn_ct_tasks
    — confirm intent.
    """
    resource_res = {}
    for ct_node in G.nodes:
        resource_res[ct_node] = {
            name: random.randint(1, int(upper)) for name, upper in resource_grp.items()
        }
    nx.set_node_attributes(G, resource_res)
    return G

In [None]:
def rand_exec_time(G, exec_time_grp = {'exec_time_list': [60, 61, 62, 63, 64], #second, 
                                       'upper_bound': 2, # addition,
                                       'lower_bound': 1 # deduction
                                      }):
    """Attach random execution-time attributes to every node of ``G`` in place; return ``G``.

    For each key of ``exec_time_grp``:
      * list value: a new list of the same length, each entry drawn uniformly
        from ``[int(min(value)), int(max(value))]``;
      * scalar value: one integer drawn uniformly from ``[1, int(value)]``.
    Draws use the global ``random`` module state. The default dict is only read.
    """
    def _draw(value):
        if isinstance(value, list):
            low, high = int(min(value)), int(max(value))
            return [random.randint(low, high) for _ in value]
        return random.randint(1, int(value))

    time_res = {
        ct_node: {key: _draw(value) for key, value in exec_time_grp.items()}
        for ct_node in G.nodes
    }
    nx.set_node_attributes(G, time_res)
    return G

### Compute resource pool (example with nodes 1, 2, and 3)

In [None]:
import pandas as pd

# Capacity of each compute node (rows) per resource type (columns):
# sizes in bytes, rates in bytes/second.
res_dict = {
    'node_1': [1024*1024*100, 1024*1024*10, 3000, 2000],
    'node_2': [1024*1024*10, 1024*1024*5, 4000, 3000],
    'node_3': [1024*1024*50, 1024*1024*80, 2000, 3000],
}
res_df = pd.DataFrame.from_dict(
    res_dict,
    orient='index',
    columns=['data_size_local', 'data_size_global', 'data_rate_local', 'data_rate_global'],
)
# Snapshot of the initial capacities; res_df itself is mutated by update_resource_pool.
res_capacity = res_df.copy()
res_df

## Resource request function `rf`

In [None]:
def get_resource_pool():
    """Return the shared resource-pool DataFrame (the module-level ``res_df``), not a copy."""
    pool = res_df
    return pool

def update_resource_pool(df, rtype, rdf):
    """Apply a resource acquisition or release to pool ``df`` in place.

    Args:
        df: resource pool; index = compute node names, columns = resource types.
        rtype: 'acquisition' subtracts ``rdf`` from ``df``; 'release' adds it back.
        rdf: DataFrame of amounts, indexed by node name, whose columns exist in ``df``.

    Returns:
        ``df`` itself (mutated), for convenient display.

    Raises:
        ValueError: if ``rtype`` is neither 'acquisition' nor 'release'.
        KeyError: if ``rdf`` names a node or column missing from ``df``.
    """
    if rtype == 'acquisition':
        sign = -1
    elif rtype == 'release':
        sign = 1
    else:
        raise ValueError("rtype must be 'acquisition' or 'release', got {!r}".format(rtype))
    for i in range(len(rdf)):
        rdata = rdf.iloc[i]
        for k, v in rdata.items():
            # Single .loc[row, col] write. The chained form df.loc[row][k] -= v
            # may update a temporary copy, and never updates df under
            # pandas Copy-on-Write.
            df.loc[rdata.name, k] += sign * v
    return df

def request_resource_func(df, rgroup):
    """
    Select the compute nodes that can satisfy every requested resource amount.

    Arguments:
        df (DataFrame): resource pool; index = compute node names, columns = resource types
        rgroup (dict): required resources, mapping column name -> minimum amount
    Returns:
        res (DataFrame): the rows of ``df`` (possibly none) whose value is >= the
            requested amount in every column named in ``rgroup``; ``df`` itself
            when ``rgroup`` is empty
    Raises:
        KeyError: if ``rgroup`` names a column missing from ``df``
    """
    res = df
    for k, v in rgroup.items():
        # Build the mask from the already-filtered frame so it aligns with `res`.
        res = res[res[k] >= v]
    return res

In [None]:
# Acquire a sample request on node_2 (mutates the shared pool res_df).
requested_rsize = {
    'data_size_local': 1024*1024*2,
    'data_size_global': 1024*1024*1,
    'data_rate_local': 1000,
    'data_rate_global': 1000,
}
requested_size = pd.DataFrame([requested_rsize], index=['node_2'])
update_resource_pool(res_df, 'acquisition', requested_size)

In [None]:
update_resource_pool(res_df, 'release', requested_size)  # return the resources acquired in the previous cell

In [None]:
def get_expected_exec_time(task_attr):
    """Expected execution time of a task: median of its sampled times plus its upper bound.

    Args:
        task_attr: node-attribute dict with 'exec_time_list' (sampled times,
            seconds) and 'upper_bound' (extra seconds), as set by rand_exec_time.
    """
    return np.median(task_attr['exec_time_list']) + task_attr['upper_bound']


def spawn_ct_tasks(ct_node, rdf):
    """Greedily assign each task of caterpillar tree ``ct_node`` to a compute node.

    Tasks are visited in ``ct_node.nodes`` order. A task fits a compute node when,
    for every task attribute that is also a column of ``rdf``, the node's remaining
    amount is >= the task's demand. The first fitting row (in ``rdf`` order) is
    chosen, and its remaining amounts are reduced by the task's demand so later
    tasks see the reduced capacity. Tasks that fit nowhere are skipped.

    Args:
        ct_node: graph whose nodes carry task attribute dicts; each dict must
            include the keys read by get_expected_exec_time.
        rdf: DataFrame of available resources (index = compute node name). Not
            modified; bookkeeping happens on a copy.

    Returns:
        list of (compute_nodename, task_nodename, expected_exec_time) tuples.
    """
    remaining = rdf.copy()
    res = []
    for task_node, task_attr in ct_node.nodes(data=True):
        # Only attributes that name a resource column constrain placement.
        demand = {k: v for k, v in task_attr.items() if k in remaining.columns}
        candidates = remaining
        for k, v in demand.items():
            candidates = candidates[candidates[k] >= v]
        if len(candidates) == 0:
            continue
        resource_nodename = candidates.index[0]
        exec_time = get_expected_exec_time(task_attr)
        # Charge the task's demand to the chosen node in the working copy.
        for k, v in demand.items():
            remaining.loc[resource_nodename, k] -= v
        res.append((resource_nodename, task_node, exec_time))
    return res

In [None]:
def _acquire_spawn_release(ct_node):
    """Acquire pool resources for CT ``ct_node``, print its task assignment, then release them."""
    required_resources = check_resource_req(ct_node)
    pool = get_resource_pool()
    res_avail = request_resource_func(pool, required_resources)
    if len(res_avail) == 0:
        return
    update_resource_pool(pool, 'acquisition', res_avail)
    # task assignment
    task_assigned = spawn_ct_tasks(ct_node, res_avail)
    header = "compute_nodename, task_nodename, execution_time (s)\n"
    print(header + '-' * len(header))
    print(task_assigned)
    update_resource_pool(pool, 'release', res_avail)


def bfs_level_sync(G, current_queue):
    """Try to schedule the first CT of ``current_queue`` on the resource pool.

    NOTE(review): despite the name, no BFS happens yet. Only the first CT in the
    queue is processed and ``G`` is unused; this looks like a work-in-progress stub.
    """
    if not current_queue:
        return
    for ct_node in current_queue:
        _acquire_spawn_release(ct_node)
        break

In [None]:
# Randomize each queued CT's task demands and execution times, then try to schedule.
for ct_graph, _priority in current_queue:
    rand_resource(ct_graph)
    rand_exec_time(ct_graph)
bfs_level_sync(G, [ct_graph for ct_graph, _priority in current_queue])

In [None]:
# Inspect the shared pool after scheduling (acquire + release should presumably leave it unchanged).
res_df = get_resource_pool()
res_df

In [None]:
# Resource requirement of the first root CT and the compute nodes able to satisfy it.
required_resources = check_resource_req(roots[0])
res_avail = request_resource_func(res_df, required_resources)
required_resources, res_avail