Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ JSON file:
"5": 2
},
"edges": [
{"tn": 0, "th": "x", "sn": 1, "sh": "x"},
{"tn": 1, "th": "x", "sn": 4, "sh": null},
{"tn": 1, "th": "y", "sn": 5, "sh": null},
{"tn": 0, "th": "y", "sn": 2, "sh": "y"},
{"tn": 2, "th": "x", "sn": 4, "sh": null},
{"tn": 2, "th": "y", "sn": 5, "sh": null},
{"tn": 0, "th": "z", "sn": 3, "sh": "z"},
{"tn": 3, "th": "x", "sn": 4, "sh": null},
{"tn": 3, "th": "y", "sn": 5, "sh": null}
{"target": 0, "targetPort": "x", "source": 1, "sourcePort": "x"},
{"target": 1, "targetPort": "x", "source": 4, "sourcePort": null},
{"target": 1, "targetPort": "y", "source": 5, "sourcePort": null},
{"target": 0, "targetPort": "y", "source": 2, "sourcePort": "y"},
{"target": 2, "targetPort": "x", "source": 4, "sourcePort": null},
{"target": 2, "targetPort": "y", "source": 5, "sourcePort": null},
{"target": 0, "targetPort": "z", "source": 3, "sourcePort": "z"},
{"target": 3, "targetPort": "x", "source": 4, "sourcePort": null},
{"target": 3, "targetPort": "y", "source": 5, "sourcePort": null}
]
}
```
The abbreviations in the definition of the edges are:
* `tn` - target node
* `th` - target handle - for a node with multiple input parameters the target handle specifies which input parameter to use.
* `sn` - source node
* `sh` - source handle - for a node with multiple output parameters the source handle specifies which output parameter to use.
* `target` - target node
* `targetPort` - target port - for a node with multiple input parameters the target port specifies which input parameter to use.
* `source` - source node
* `sourcePort` - source port - for a node with multiple output parameters the source port specifies which output parameter to use.

As the workflow does not require any additional resources, as it is only using built-in functionality of the Python standard
library.
Expand Down
26 changes: 13 additions & 13 deletions book/simple.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ JSON file:
"5": 2
},
"edges": [
{"tn": 0, "th": "x", "sn": 1, "sh": "x"},
{"tn": 1, "th": "x", "sn": 4, "sh": null},
{"tn": 1, "th": "y", "sn": 5, "sh": null},
{"tn": 0, "th": "y", "sn": 2, "sh": "y"},
{"tn": 2, "th": "x", "sn": 4, "sh": null},
{"tn": 2, "th": "y", "sn": 5, "sh": null},
{"tn": 0, "th": "z", "sn": 3, "sh": "z"},
{"tn": 3, "th": "x", "sn": 4, "sh": null},
{"tn": 3, "th": "y", "sn": 5, "sh": null}
{"target": 0, "targetPort": "x", "source": 1, "sourcePort": "x"},
{"target": 1, "targetPort": "x", "source": 4, "sourcePort": null},
{"target": 1, "targetPort": "y", "source": 5, "sourcePort": null},
{"target": 0, "targetPort": "y", "source": 2, "sourcePort": "y"},
{"target": 2, "targetPort": "x", "source": 4, "sourcePort": null},
{"target": 2, "targetPort": "y", "source": 5, "sourcePort": null},
{"target": 0, "targetPort": "z", "source": 3, "sourcePort": "z"},
{"target": 3, "targetPort": "x", "source": 4, "sourcePort": null},
{"target": 3, "targetPort": "y", "source": 5, "sourcePort": null}
]
}
```
The abbreviations in the definition of the edges are:
* `tn` - target node
* `th` - target handle - for a node with multiple input parameters the target handle specifies which input parameter to use.
* `sn` - source node
* `sh` - source handle - for a node with multiple output parameters the source handle specifies which output parameter to use.
* `target` - target node
* `targetPort` - target port - for a node with multiple input parameters the target port specifies which input parameter to use.
* `source` - source node
* `sourcePort` - source port - for a node with multiple output parameters the source port specifies which output parameter to use.

As the workflow does not require any additional resources, as it is only using built-in functionality of the Python standard
library.
40 changes: 20 additions & 20 deletions python_workflow_definition/src/python_workflow_definition/aiida.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,32 @@ def load_workflow_json(file_name):

# add links
for link in data["edges"]:
to_task = task_name_mapping[str(link["tn"])]
to_task = task_name_mapping[str(link["target"])]
# if the input is not exit, it means we pass the data into to the kwargs
# in this case, we add the input socket
if link["th"] not in to_task.inputs:
to_socket = to_task.add_input( "workgraph.any", name=link["th"])
if link["targetPort"] not in to_task.inputs:
to_socket = to_task.add_input( "workgraph.any", name=link["targetPort"])
else:
to_socket = to_task.inputs[link["th"]]
from_task = task_name_mapping[str(link["sn"])]
to_socket = to_task.inputs[link["targetPort"]]
from_task = task_name_mapping[str(link["source"])]
if isinstance(from_task, orm.Data):
to_socket.value = from_task
else:
try:
if link["sh"] is None:
link["sh"] = "result"
if link["sourcePort"] is None:
link["sourcePort"] = "result"
# because we are not define the outputs explicitly during the pythonjob creation
# we add it here, and assume the output exit
if link["sh"] not in from_task.outputs:
# if str(link["sh"]) not in from_task.outputs:
if link["sourcePort"] not in from_task.outputs:
# if str(link["sourcePort"]) not in from_task.outputs:
from_socket = from_task.add_output(
"workgraph.any",
name=link["sh"],
# name=str(link["sh"]),
name=link["sourcePort"],
# name=str(link["sourcePort"]),
metadata={"is_function_output": True},
)
else:
from_socket = from_task.outputs[link["sh"]]
from_socket = from_task.outputs[link["sourcePort"]]

wg.add_link(from_socket, to_socket)
except Exception as e:
Expand Down Expand Up @@ -84,10 +84,10 @@ def write_workflow_json(wg, file_name):
# if the from socket is the default result, we set it to None
if link_data["from_socket"] == "result":
link_data["from_socket"] = None
link_data["tn"] = node_name_mapping[link_data.pop("to_node")]
link_data["th"] = link_data.pop("to_socket")
link_data["sn"] = node_name_mapping[link_data.pop("from_node")]
link_data["sh"] = link_data.pop("from_socket")
link_data["target"] = node_name_mapping[link_data.pop("to_node")]
link_data["targetPort"] = link_data.pop("to_socket")
link_data["source"] = node_name_mapping[link_data.pop("from_node")]
link_data["sourcePort"] = link_data.pop("from_socket")
data["edges"].append(link_data)

for node in wg.tasks:
Expand All @@ -112,10 +112,10 @@ def write_workflow_json(wg, file_name):
else:
input_node_name = data_node_name_mapping[input.value.uuid]
data["edges"].append({
"tn": node_name_mapping[node.name],
"th": input._name,
"sn": input_node_name,
"sh": None
"target": node_name_mapping[node.name],
"targetPort": input._name,
"source": input_node_name,
"sourcePort": None
})
with open(file_name, "w") as f:
# json.dump({"nodes": data[], "edges": edges_new_lst}, f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def get_item(obj, key):


def _get_value(result_dict, nodes_new_dict, link_dict, exe):
source, source_handle = link_dict["sn"], link_dict["sh"]
source, source_handle = link_dict["source"], link_dict["sourcePort"]
if source in result_dict.keys():
result = result_dict[source]
elif source in nodes_new_dict.keys():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@ def _get_nodes_dict(function_dict):

def _get_edge_from_dict(target, key, value_dict, nodes_mapping_dict):
if len(value_dict['attributes']) == 1:
return {'tn': target, 'th': key, "sn": nodes_mapping_dict[value_dict['uuid']], 'sh': value_dict['attributes'][0][1]}
return {"target": target, "targetPort": key, "source": nodes_mapping_dict[value_dict["uuid"]], "sourcePort": value_dict["attributes"][0][1]}
else:
return {'tn': target, 'th': key, "sn": nodes_mapping_dict[value_dict['uuid']], 'sh': None}
return {"target": target, "targetPort": key, "source": nodes_mapping_dict[value_dict["uuid"]], "sourcePort": None}


def _get_edges_and_extend_nodes(flow_dict, nodes_mapping_dict, nodes_dict):
edges_lst = []
for job in flow_dict['jobs']:
for k, v in job['function_kwargs'].items():
if isinstance(v, dict) and '@module' in v and '@class' in v and '@version' in v:
for job in flow_dict["jobs"]:
for k, v in job["function_kwargs"].items():
if isinstance(v, dict) and "@module" in v and "@class" in v and "@version" in v:
edges_lst.append(_get_edge_from_dict(
target=nodes_mapping_dict[job["uuid"]],
key=k,
value_dict=v,
nodes_mapping_dict=nodes_mapping_dict,
))
elif isinstance(v, dict) and any([isinstance(el, dict) and '@module' in el and '@class' in el and '@version' in el for el in v.values()]):
elif isinstance(v, dict) and any([isinstance(el, dict) and "@module" in el and "@class" in el and "@version" in el for el in v.values()]):
node_dict_index = len(nodes_dict)
nodes_dict[node_dict_index] = get_dict
for kt, vt in v.items():
if isinstance(vt, dict) and '@module' in vt and '@class' in vt and '@version' in vt:
if isinstance(vt, dict) and "@module" in vt and "@class" in vt and "@version" in vt:
edges_lst.append(_get_edge_from_dict(
target=node_dict_index,
key=kt,
Expand All @@ -59,13 +59,13 @@ def _get_edges_and_extend_nodes(flow_dict, nodes_mapping_dict, nodes_dict):
nodes_dict[node_index] = vt
else:
node_index = {str(tv): tk for tk, tv in nodes_dict.items()}[str(vt)]
edges_lst.append({'tn': node_dict_index, 'th': kt, "sn": node_index, 'sh': None})
edges_lst.append({'tn': nodes_mapping_dict[job["uuid"]], 'th': k, "sn": node_dict_index, 'sh': None})
elif isinstance(v, list) and any([isinstance(el, dict) and '@module' in el and '@class' in el and '@version' in el for el in v]):
edges_lst.append({"target": node_dict_index, "targetPort": kt, "source": node_index, "sourcePort": None})
edges_lst.append({"target": nodes_mapping_dict[job["uuid"]], "targetPort": k, "source": node_dict_index, "sourcePort": None})
elif isinstance(v, list) and any([isinstance(el, dict) and "@module" in el and "@class" in el and "@version" in el for el in v]):
node_list_index = len(nodes_dict)
nodes_dict[node_list_index] = get_list
for kt, vt in enumerate(v):
if isinstance(vt, dict) and '@module' in vt and '@class' in vt and '@version' in vt:
if isinstance(vt, dict) and "@module" in vt and "@class" in vt and "@version" in vt:
edges_lst.append(_get_edge_from_dict(
target=node_list_index,
key=str(kt),
Expand All @@ -78,15 +78,15 @@ def _get_edges_and_extend_nodes(flow_dict, nodes_mapping_dict, nodes_dict):
nodes_dict[node_index] = vt
else:
node_index = {str(tv): tk for tk, tv in nodes_dict.items()}[str(vt)]
edges_lst.append({'tn': node_list_index, 'th': kt, "sn": node_index, 'sh': None})
edges_lst.append({'tn': nodes_mapping_dict[job["uuid"]], 'th': k, "sn": node_list_index, 'sh': None})
edges_lst.append({"target": node_list_index, "targetPort": kt, "source": node_index, "sourcePort": None})
edges_lst.append({"target": nodes_mapping_dict[job["uuid"]], "targetPort": k, "source": node_list_index, "sourcePort": None})
else:
if v not in nodes_dict.values():
node_index = len(nodes_dict)
nodes_dict[node_index] = v
else:
node_index = {tv: tk for tk, tv in nodes_dict.items()}[v]
edges_lst.append({'tn': nodes_mapping_dict[job["uuid"]], 'th': k, "sn": node_index, 'sh': None})
edges_lst.append({"target": nodes_mapping_dict[job["uuid"]], "targetPort": k, "source": node_index, "sourcePort": None})
return edges_lst, nodes_dict


Expand All @@ -99,7 +99,7 @@ def _resort_total_lst(total_dict, nodes_dict):
for ind in sorted(total_dict.keys()):
connect = total_dict[ind]
if ind not in ordered_lst:
source_lst = [sd["sn"] for sd in connect.values()]
source_lst = [sd["source"] for sd in connect.values()]
if all([s in ordered_lst or s in nodes_without_dep_lst for s in source_lst]):
ordered_lst.append(ind)
total_new_dict[ind] = connect
Expand All @@ -109,11 +109,11 @@ def _resort_total_lst(total_dict, nodes_dict):
def _group_edges(edges_lst):
total_dict = {}
for ed_major in edges_lst:
target_id = ed_major["tn"]
target_id = ed_major["target"]
tmp_lst = []
if target_id not in total_dict.keys():
for ed in edges_lst:
if target_id == ed["tn"]:
if target_id == ed["target"]:
tmp_lst.append(ed)
total_dict[target_id] = get_kwargs(lst=tmp_lst)
return total_dict
Expand All @@ -139,8 +139,8 @@ def get_attr_helper(obj, source_handle):
else:
fn = job(method=v)
kwargs = {
kw: input_dict[vw['sn']] if vw['sn'] in input_dict else get_attr_helper(
obj=memory_dict[vw['sn']], source_handle=vw['sh'])
kw: input_dict[vw["source"]] if vw["source"] in input_dict else get_attr_helper(
obj=memory_dict[vw["source"]], source_handle=vw["sourcePort"])
for kw, vw in total_dict[k].items()
}
memory_dict[k] = fn(**kwargs)
Expand All @@ -160,15 +160,15 @@ def load_workflow_json(file_name):

edges_new_lst = []
for edge in content["edges"]:
if edge['sh'] is None:
if edge["sourcePort"] is None:
edges_new_lst.append(edge)
else:
edges_new_lst.append(
{
'tn': edge['tn'],
'th': edge['th'],
'sn': edge['sn'],
'sh': str(edge['sh']),
"target": edge["target"],
"targetPort": edge["targetPort"],
"source": edge["source"],
"sourcePort": str(edge["sourcePort"]),
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,30 @@ def resort_total_lst(total_lst, nodes_dict):
while len(total_new_lst) < len(total_lst):
for ind, connect in total_lst:
if ind not in ordered_lst:
source_lst = [sd["sn"] for sd in connect.values()]
source_lst = [sd["source"] for sd in connect.values()]
if all([s in ordered_lst or s in nodes_without_dep_lst for s in source_lst]):
ordered_lst.append(ind)
total_new_lst.append([ind, connect])
return total_new_lst


def group_edges(edges_lst):
edges_sorted_lst = sorted(edges_lst, key=lambda x: x["tn"], reverse=True)
edges_sorted_lst = sorted(edges_lst, key=lambda x: x["target"], reverse=True)
total_lst, tmp_lst = [], []
target_id = edges_sorted_lst[0]["tn"]
target_id = edges_sorted_lst[0]["target"]
for ed in edges_sorted_lst:
if target_id == ed["tn"]:
if target_id == ed["target"]:
tmp_lst.append(ed)
else:
total_lst.append((target_id, get_kwargs(lst=tmp_lst)))
target_id = ed["tn"]
target_id = ed["target"]
tmp_lst = [ed]
total_lst.append((target_id, get_kwargs(lst=tmp_lst)))
return total_lst


def _get_value(result_dict, nodes_new_dict, link_dict):
source, source_handle = link_dict["sn"], link_dict["sh"]
source, source_handle = link_dict["source"], link_dict["sourcePort"]
if source in result_dict.keys():
result = result_dict[source]
elif source in nodes_new_dict.keys():
Expand Down
Loading
Loading