In [1]:
from importlib import import_module

In [2]:
from inspect import isfunction

In [3]:
from jobflow import job, Flow
from jobflow.managers.local import run_locally

In [4]:
def get_kwargs(lst):
    """Map each edge's target handle to the origin of its value.

    Args:
        lst: Iterable of edge dicts, each providing the keys 'targetHandle',
            'source' and 'sourceHandle'.

    Returns:
        dict: ``{targetHandle: {'source': ..., 'sourceHandle': ...}}``. If several
        edges share a target handle, the last one in ``lst`` wins.
    """
    kwargs = {}
    for edge in lst:
        handle = edge['targetHandle']
        kwargs[handle] = {'source': edge['source'], 'sourceHandle': edge['sourceHandle']}
    return kwargs

In [5]:
def group_edges(edges_lst):
    """Group edges by target node and convert each group with ``get_kwargs``.

    Edges are collected per target no matter where they appear in the list, so
    the input does not need to be sorted by target. An empty list yields an
    empty dict.

    Args:
        edges_lst: List of edge dicts with the keys 'target', 'targetHandle',
            'source' and 'sourceHandle'.

    Returns:
        dict: ``{target: get_kwargs(edges pointing at that target)}``, keyed in
        the order in which each target first appears in ``edges_lst``.
    """
    edges_by_target = {}
    for ed in edges_lst:
        # setdefault keeps one list per target, so non-adjacent edges of the
        # same target end up in the same group instead of overwriting it.
        edges_by_target.setdefault(ed["target"], []).append(ed)
    return {
        target_id: get_kwargs(lst=target_edges)
        for target_id, target_edges in edges_by_target.items()
    }

In [6]:
def get_input_dict(nodes_dict):
    """Select the nodes that are not Python functions.

    Args:
        nodes_dict: ``{node id: node}`` mapping.

    Returns:
        dict: The entries of ``nodes_dict`` whose value fails
        ``inspect.isfunction``, in their original order.
    """
    input_dict = {}
    for node_id, node in nodes_dict.items():
        if isfunction(node):
            continue
        input_dict[node_id] = node
    return input_dict

In [7]:
def get_workflow(nodes_dict, input_dict, total_dict):
    """Wrap every function node in a jobflow job and wire up its arguments.

    Nodes are visited in the iteration order of ``nodes_dict``. An argument whose
    source is another function node is read from the ``output`` attribute of the
    job already created for that node, so producers must come before consumers.
    Only the 'source' entry of each argument description is read.

    Args:
        nodes_dict: ``{node id: function or plain value}``.
        input_dict: ``{node id: plain value}``; consulted first when resolving an
            argument's source.
        total_dict: ``{node id: {argument name: {'source': node id, ...}}}``.
            A function node without an entry is called without arguments.

    Returns:
        list: The objects returned by calling the wrapped functions, in the
        order the function nodes were visited.

    Raises:
        KeyError: If an argument's source is neither in ``input_dict`` nor a
            function node that was already processed.
    """
    memory_dict = {}
    for k, v in nodes_dict.items():
        if not isfunction(v):
            continue
        fn = job(v)
        kwargs = {}
        # A function node with no incoming edges has no entry in total_dict.
        for kw, vw in total_dict.get(k, {}).items():
            source = vw['source']
            if source in input_dict:
                kwargs[kw] = input_dict[source]
            else:
                kwargs[kw] = memory_dict[source].output
        memory_dict[k] = fn(**kwargs)
    return list(memory_dict.values())

In [8]:
def get_source_handles(edges_lst):
    """Collect, per source node, the source handles of its outgoing edges.

    Args:
        edges_lst: Iterable of edge dicts with the keys 'source' and 'sourceHandle'.

    Returns:
        dict: ``{source: [sourceHandle, ...]}`` with handles in edge order.
        Repeated handles and ``None`` entries are kept as they occur.
    """
    handles_by_source = {}
    for edge in edges_lst:
        handles_by_source.setdefault(edge['source'], []).append(edge['sourceHandle'])
    return handles_by_source

In [9]:
def remove_source_handles(nodes_dict, edges_lst):
    """Rewrite a graph so that no edge carries a ``sourceHandle``.

    Non-function nodes are renumbered first (ids ``0..n-1`` in their original
    order). Function nodes follow in their original order. Every edge that reads
    a named output (``sourceHandle`` is not ``None``) is replaced by an extra
    ``get_item_from_tuple`` node fed by three edges:

    * ``input_obj``  - the source function node,
    * ``index``      - a new node holding the handle name,
    * ``index_lst``  - a node holding the list of all handles used on that
      source (from ``get_source_handles``), shared between extractions.

    All edges into a function node are emitted only once that node's new id is
    known, so plain edges and handle edges may be mixed freely for one target.

    Args:
        nodes_dict: ``{node id: function or plain value}``. A function node's
            sources must appear earlier in the iteration order (or be plain
            values), because their new ids are looked up while processing it.
        edges_lst: List of edge dicts with the keys 'target', 'targetHandle',
            'source' and 'sourceHandle'.

    Returns:
        tuple: ``(nodes_updated_dict, edges_updated_lst)`` using the new ids; every
        edge in the list has ``'sourceHandle': None``.

    Raises:
        KeyError: If an edge's source has not been assigned a new id yet.
    """
    nodes_updated_dict = {}
    translate_dict = {}  # old node id -> new node id
    for node_id, node in nodes_dict.items():
        if not isfunction(node):
            new_id = len(nodes_updated_dict)
            nodes_updated_dict[new_id] = node
            translate_dict[node_id] = new_id

    edges_updated_lst = []
    i = len(nodes_updated_dict)
    source_handle_dict = get_source_handles(edges_lst=edges_lst)
    for node_id, node in nodes_dict.items():
        if node_id in translate_dict:
            continue
        # (new source id, targetHandle) pairs for edges into this function node.
        # They are deferred because handle edges below still advance ``i``.
        pending_edges = []
        for ed in edges_lst:
            if ed["target"] != node_id:
                continue
            if ed['sourceHandle'] is None:
                pending_edges.append((translate_dict[ed['source']], ed['targetHandle']))
                continue
            # Node holding the name of the requested output.
            nodes_updated_dict[i] = ed['sourceHandle']
            source_handle_index = i
            i += 1
            # Node holding all handle names of the source; reused if it exists.
            new_item = source_handle_dict[ed['source']]
            if new_item not in nodes_updated_dict.values():
                nodes_updated_dict[i] = new_item
                output_label_index = i
                i += 1
            else:
                for kn, vn in nodes_updated_dict.items():
                    if vn == new_item:
                        output_label_index = kn
            # Extraction node that picks the requested output.
            nodes_updated_dict[i] = get_item_from_tuple
            edges_updated_lst.append({'target': i, 'targetHandle': 'input_obj', 'source': translate_dict[ed['source']], 'sourceHandle': None})
            edges_updated_lst.append({'target': i, 'targetHandle': 'index', 'source': source_handle_index, 'sourceHandle': None})
            edges_updated_lst.append({'target': i, 'targetHandle': 'index_lst', 'source': output_label_index, 'sourceHandle': None})
            pending_edges.append((i, ed['targetHandle']))
            i += 1
        nodes_updated_dict[i] = node
        translate_dict[node_id] = i
        for source_id, target_handle in pending_edges:
            edges_updated_lst.append({'target': i, 'targetHandle': target_handle, 'source': source_id, 'sourceHandle': None})
        i += 1

    return nodes_updated_dict, edges_updated_lst

In [10]:
def universal_reduce(nodes_dict, edges_lst):
    """Eliminate source handles from the graph when any edge uses one.

    Args:
        nodes_dict: ``{node id: function or plain value}``.
        edges_lst: List of edge dicts, each with a 'sourceHandle' key.

    Returns:
        tuple: ``(nodes_dict, edges_lst)`` unchanged when every edge has
        ``sourceHandle`` set to ``None``; otherwise the result of
        ``remove_source_handles``.
    """
    handle_edge_count = sum(1 for ed in edges_lst if ed['sourceHandle'] is not None)
    if handle_edge_count == 0:
        return nodes_dict, edges_lst
    return remove_source_handles(nodes_dict=nodes_dict, edges_lst=edges_lst)

In [11]:
def get_item_from_tuple(input_obj, index, index_lst):
    """Pick a single entry out of a dict or a positional container.

    Args:
        input_obj: A dict, or any other iterable (e.g. a tuple).
        index: For a dict, the key to look up; otherwise a label that must be
            contained in ``index_lst``.
        index_lst: Labels whose positions correspond to the positions in
            ``input_obj``; only used when ``input_obj`` is not a dict.

    Returns:
        ``input_obj[index]`` for a dict, otherwise the element of ``input_obj``
        at the position of ``index`` in ``index_lst``.
    """
    if not isinstance(input_obj, dict):
        # Non-dict results are addressed by the label's position in index_lst.
        items = list(input_obj)
        return items[index_lst.index(index)]
    return input_obj[index]

In [12]:
def add_xy(x, y):
    """Example node: return ``x + y`` followed by both inputs as a 3-tuple."""
    total = x + y
    return total, x, y

In [13]:
def add_xyz(x, y, z):
    """Example node: return the sum of the three inputs."""
    partial = x + y
    return partial + z

In [14]:
# Example graph nodes: ids 0 and 1 are functions, ids 2 and 3 are plain input values.
nodes_dict = {
    0: add_xy,
    1: add_xyz,
    2: 1,
    3: 2,
}

In [15]:
# Example graph edges: each edge passes `source` (or one labelled output of it,
# `sourceHandle`) to the argument `targetHandle` of node `target`.
edges_lst = [
    # labelled outputs of node 0 (add_xy) -> arguments of node 1 (add_xyz)
    {'target': 1, 'targetHandle': 'x', 'source': 0, 'sourceHandle': 'x'},
    {'target': 1, 'targetHandle': 'y', 'source': 0, 'sourceHandle': 'y'},
    {'target': 1, 'targetHandle': 'z', 'source': 0, 'sourceHandle': 'z'},
    # input values (nodes 2 and 3) -> arguments of node 0 (add_xy)
    {'target': 0, 'targetHandle': 'x', 'source': 2, 'sourceHandle': None},
    {'target': 0, 'targetHandle': 'y', 'source': 3, 'sourceHandle': None},
]

In [16]:
# Rewrite the example graph so that no edge carries a sourceHandle, then display the result.
nodes_updated_dict, edges_updated_lst = universal_reduce(nodes_dict=nodes_dict, edges_lst=edges_lst)
nodes_updated_dict, edges_updated_lst

({0: 1,
  1: 2,
  2: <function __main__.add_xy(x, y)>,
  3: 'x',
  4: ['x', 'y', 'z'],
  5: <function __main__.get_item_from_tuple(input_obj, index, index_lst)>,
  6: 'y',
  7: <function __main__.get_item_from_tuple(input_obj, index, index_lst)>,
  8: 'z',
  9: <function __main__.get_item_from_tuple(input_obj, index, index_lst)>,
  10: <function __main__.add_xyz(x, y, z)>},
 [{'target': 2, 'targetHandle': 'x', 'source': 0, 'sourceHandle': None},
  {'target': 2, 'targetHandle': 'y', 'source': 1, 'sourceHandle': None},
  {'target': 5,
   'targetHandle': 'input_obj',
   'source': 2,
   'sourceHandle': None},
  {'target': 5, 'targetHandle': 'index', 'source': 3, 'sourceHandle': None},
  {'target': 5,
   'targetHandle': 'index_lst',
   'source': 4,
   'sourceHandle': None},
  {'target': 7,
   'targetHandle': 'input_obj',
   'source': 2,
   'sourceHandle': None},
  {'target': 7, 'targetHandle': 'index', 'source': 6, 'sourceHandle': None},
  {'target': 7,
   'targetHandle': 'index_lst',
   's

In [17]:
# Per-target argument sources, and the non-function nodes of the rewritten graph.
total_dict = group_edges(edges_lst=edges_updated_lst)
input_dict = get_input_dict(nodes_dict=nodes_updated_dict)

In [18]:
# Build the list of jobflow jobs, one per function node of the rewritten graph.
task_lst = get_workflow(
    nodes_dict=nodes_updated_dict, 
    input_dict=input_dict, 
    total_dict=total_dict,
)

In [19]:
# Bundle the jobs into a single jobflow Flow.
flow = Flow(task_lst)

In [20]:
# Execute the flow with jobflow's local manager and display what it returns.
result = run_locally(flow)
result

2024-11-22 13:18:36,513 INFO Started executing jobs locally
2024-11-22 13:18:36,981 INFO Starting job - add_xy (a761851e-a5da-40a7-ba98-6339fcb0d8a3)
2024-11-22 13:18:36,982 INFO Finished job - add_xy (a761851e-a5da-40a7-ba98-6339fcb0d8a3)
2024-11-22 13:18:36,983 INFO Starting job - get_item_from_tuple (74437106-cdfb-4173-a490-8e7988e003d8)
2024-11-22 13:18:36,985 INFO Finished job - get_item_from_tuple (74437106-cdfb-4173-a490-8e7988e003d8)
2024-11-22 13:18:36,987 INFO Starting job - get_item_from_tuple (970150ce-d6b8-4788-a299-45b1912eb488)
2024-11-22 13:18:36,989 INFO Finished job - get_item_from_tuple (970150ce-d6b8-4788-a299-45b1912eb488)
2024-11-22 13:18:36,990 INFO Starting job - get_item_from_tuple (445e7f92-360a-42a4-823b-edee632d909e)
2024-11-22 13:18:36,993 INFO Finished job - get_item_from_tuple (445e7f92-360a-42a4-823b-edee632d909e)
2024-11-22 13:18:36,994 INFO Starting job - add_xyz (2b506807-75a7-41c0-9f2c-86de74449e0a)
2024-11-22 13:18:36,998 INFO Finished job - add_xyz

{'a761851e-a5da-40a7-ba98-6339fcb0d8a3': {1: Response(output=(3, 1, 2), detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/home/jan/notebooks/2024/2024-11-21-python-workflow-definition'))},
 '74437106-cdfb-4173-a490-8e7988e003d8': {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/home/jan/notebooks/2024/2024-11-21-python-workflow-definition'))},
 '970150ce-d6b8-4788-a299-45b1912eb488': {1: Response(output=1, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/home/jan/notebooks/2024/2024-11-21-python-workflow-definition'))},
 '445e7f92-360a-42a4-823b-edee632d909e': {1: Response(output=2, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/home/jan/notebooks/2024/2024-11-21-python-workflow-