Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,28 @@ def get_prod_and_div(x: float, y: float) -> dict:
```
These two Python functions are combined in the following example workflow:
```python
tmp_dict = get_prod_and_div(x=1, y=2)
result = get_sum(x=tmp_dict["prod"], y=tmp_dict["div"])
def combined_workflow(x=1, y=2):
tmp_dict = get_prod_and_div(x=x, y=y)
return get_sum(x=tmp_dict["prod"], y=tmp_dict["div"])
```
For the workflow representation of these Python functions the Python functions are stored in the [example_workflows/arithmetic/workflow.py](example_workflows/arithmetic/workflow.py)
Python module. The connection of the Python functions are stored in the [example_workflows/arithmetic/workflow.json](example_workflows/arithmetic/workflow.json)
JSON file:
```
{
"nodes": [
{"id": 0, "function": "simple_workflow.get_prod_and_div"},
{"id": 1, "function": "simple_workflow.get_sum"},
{"id": 2, "value": 1},
{"id": 3, "value": 2}
{"id": 0, "type": "function", "value": "workflow.get_prod_and_div"},
{"id": 1, "type": "function", "value": "workflow.get_sum"},
{"id": 2, "type": "input", "value": 1, "name": "x"},
{"id": 3, "type": "input", "value": 2, "name": "y"},
{"id": 4, "type": "output", "name": "result"}
],
"edges": [
{"target": 0, "targetPort": "x", "source": 2, "sourcePort": null},
{"target": 0, "targetPort": "y", "source": 3, "sourcePort": null},
{"target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod"},
{"target": 1, "targetPort": "y", "source": 0, "sourcePort": "div"}
{"target": 1, "targetPort": "y", "source": 0, "sourcePort": "div"},
{"target": 4, "targetPort": null, "source": 1, "sourcePort": null}
]
}
```
Expand Down
17 changes: 10 additions & 7 deletions documentation/arithmetic.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,28 @@ def get_prod_and_div(x: float, y: float) -> dict:
```
These two Python functions are combined in the following example workflow:
```python
tmp_dict = get_prod_and_div(x=1, y=2)
result = get_sum(x=tmp_dict["prod"], y=tmp_dict["div"])
def combined_workflow(x=1, y=2):
tmp_dict = get_prod_and_div(x=x, y=y)
return get_sum(x=tmp_dict["prod"], y=tmp_dict["div"])
```
For the workflow representation of these Python functions the Python functions are stored in the [workflow.py](example_workflows/arithmetic/workflow.py)
Python module. The connection of the Python functions are stored in the [workflow.json](example_workflows/arithmetic/workflow.json)
JSON file:
```
{
"nodes": [
{"id": 0, "function": "workflow.get_prod_and_div"},
{"id": 1, "function": "workflow.get_sum"},
{"id": 2, "value": 1},
{"id": 3, "value": 2}
{"id": 0, "type": "function", "value": "workflow.get_prod_and_div"},
{"id": 1, "type": "function", "value": "workflow.get_sum"},
{"id": 2, "type": "input", "value": 1, "name": "x"},
{"id": 3, "type": "input", "value": 2, "name": "y"},
{"id": 4, "type": "output", "name": "result"}
],
"edges": [
{"target": 0, "targetPort": "x", "source": 2, "sourcePort": null},
{"target": 0, "targetPort": "y", "source": 3, "sourcePort": null},
{"target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod"},
{"target": 1, "targetPort": "y", "source": 0, "sourcePort": "div"}
{"target": 1, "targetPort": "y", "source": 0, "sourcePort": "div"},
{"target": 4, "targetPort": null, "source": 1, "sourcePort": null}
]
}
```
Expand Down
6 changes: 4 additions & 2 deletions example_workflows/arithmetic/workflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
{"id": 0, "type": "function", "value": "workflow.get_prod_and_div"},
{"id": 1, "type": "function", "value": "workflow.get_sum"},
{"id": 2, "type": "input", "value": 1, "name": "x"},
{"id": 3, "type": "input", "value": 2, "name": "y"}
{"id": 3, "type": "input", "value": 2, "name": "y"},
{"id": 4, "type": "output", "name": "result"}
],
"edges": [
{"target": 0, "targetPort": "x", "source": 2, "sourcePort": null},
{"target": 0, "targetPort": "y", "source": 3, "sourcePort": null},
{"target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod"},
{"target": 1, "targetPort": "y", "source": 0, "sourcePort": "div"}
{"target": 1, "targetPort": "y", "source": 0, "sourcePort": "div"},
{"target": 4, "targetPort": null, "source": 1, "sourcePort": null}
]
}
6 changes: 4 additions & 2 deletions example_workflows/nfdi/workflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
{"id": 3, "type": "function", "value": "workflow.plot_over_line"},
{"id": 4, "type": "function", "value": "workflow.substitute_macros"},
{"id": 5, "type": "function", "value": "workflow.compile_paper"},
{"id": 6, "type": "input", "value": 2.0, "name": "domain_size"}
{"id": 6, "type": "input", "value": 2.0, "name": "domain_size"},
{"id": 7, "type": "output", "name": "result"}
],
"edges": [
{"target": 0, "targetPort": "domain_size", "source": 6, "sourcePort": null},
Expand All @@ -19,6 +20,7 @@
{"target": 4, "targetPort": "ndofs", "source": 2, "sourcePort": "numdofs"},
{"target": 4, "targetPort": "domain_size", "source": 6, "sourcePort": null},
{"target": 5, "targetPort": "macros_tex", "source": 4, "sourcePort": null},
{"target": 5, "targetPort": "plot_file", "source": 3, "sourcePort": null}
{"target": 5, "targetPort": "plot_file", "source": 3, "sourcePort": null},
{"target": 7, "targetPort": null, "source": 5, "sourcePort": null}
]
}
6 changes: 4 additions & 2 deletions example_workflows/quantum_espresso/workflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
{"id": 28, "type": "input", "value": "strain_4", "name": "working_directory_5"},
{"id": 29, "type": "function", "value": "python_workflow_definition.shared.get_dict"},
{"id": 30, "type": "function", "value": "python_workflow_definition.shared.get_list"},
{"id": 31, "type": "function", "value": "python_workflow_definition.shared.get_list"}
{"id": 31, "type": "function", "value": "python_workflow_definition.shared.get_list"},
{"id": 32, "type": "output", "name": "result"}
],
"edges": [
{"target": 0, "targetPort": "element", "source": 9, "sourcePort": null},
Expand Down Expand Up @@ -92,6 +93,7 @@
{"target": 31, "targetPort": "2", "source": 5, "sourcePort": "energy"},
{"target": 31, "targetPort": "3", "source": 6, "sourcePort": "energy"},
{"target": 31, "targetPort": "4", "source": 7, "sourcePort": "energy"},
{"target": 8, "targetPort": "energy_lst", "source": 31, "sourcePort": null}
{"target": 8, "targetPort": "energy_lst", "source": 31, "sourcePort": null},
{"target": 32, "targetPort": null, "source": 8, "sourcePort": null}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from python_workflow_definition.shared import (
convert_nodes_list_to_dict,
update_node_names,
remove_result,
set_result_node,
NODES_LABEL,
EDGES_LABEL,
SOURCE_LABEL,
Expand All @@ -21,7 +23,7 @@

def load_workflow_json(file_name: str) -> WorkGraph:
with open(file_name) as f:
data = json.load(f)
data = remove_result(workflow_dict=json.load(f))

wg = WorkGraph()
task_name_mapping = {}
Expand Down Expand Up @@ -136,6 +138,10 @@ def write_workflow_json(wg: WorkGraph, file_name: str) -> dict:
)
with open(file_name, "w") as f:
# json.dump({"nodes": data[], "edges": edges_new_lst}, f)
json.dump(update_node_names(content=data), f, indent=2)
json.dump(
set_result_node(workflow_dict=update_node_names(workflow_dict=data)),
f,
indent=2,
)

return data
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
get_kwargs,
get_source_handles,
convert_nodes_list_to_dict,
remove_result,
NODES_LABEL,
EDGES_LABEL,
SOURCE_LABEL,
Expand Down Expand Up @@ -38,7 +39,7 @@ def _get_value(result_dict: dict, nodes_new_dict: dict, link_dict: dict, exe: Ex

def load_workflow_json(file_name: str, exe: Executor):
with open(file_name, "r") as f:
content = json.load(f)
content = remove_result(workflow_dict=json.load(f))

edges_new_lst = content[EDGES_LABEL]
nodes_new_dict = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
get_source_handles,
update_node_names,
convert_nodes_list_to_dict,
remove_result,
set_result_node,
NODES_LABEL,
EDGES_LABEL,
SOURCE_LABEL,
Expand Down Expand Up @@ -271,7 +273,7 @@ def _get_item_from_tuple(input_obj, index, index_lst):

def load_workflow_json(file_name: str) -> Flow:
with open(file_name, "r") as f:
content = json.load(f)
content = remove_result(workflow_dict=json.load(f))

edges_new_lst = []
for edge in content[EDGES_LABEL]:
Expand Down Expand Up @@ -332,8 +334,10 @@ def write_workflow_json(flow: Flow, file_name: str = "workflow.json"):

with open(file_name, "w") as f:
json.dump(
update_node_names(
content={NODES_LABEL: nodes_store_lst, EDGES_LABEL: edges_lst}
set_result_node(
workflow_dict=update_node_names(
workflow_dict={NODES_LABEL: nodes_store_lst, EDGES_LABEL: edges_lst}
)
),
f,
indent=2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def plot(file_name: str):
k + "=result[" + v[SOURCE_PORT_LABEL] + "]"
)
for k, v in edge_label_dict.items():
graph.add_edge(str(k), str(target_node), label=", ".join(v))
if len(v) == 1 and v[0] is not None:
graph.add_edge(str(k), str(target_node), label=", ".join(v))
else:
graph.add_edge(str(k), str(target_node))

svg = nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg")
display(SVG(svg))
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
get_kwargs,
get_source_handles,
convert_nodes_list_to_dict,
remove_result,
NODES_LABEL,
EDGES_LABEL,
SOURCE_LABEL,
Expand Down Expand Up @@ -67,7 +68,7 @@ def _get_value(result_dict: dict, nodes_new_dict: dict, link_dict: dict):

def load_workflow_json(file_name: str):
with open(file_name, "r") as f:
content = json.load(f)
content = remove_result(workflow_dict=json.load(f))

edges_new_lst = content[EDGES_LABEL]
nodes_new_dict = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
get_source_handles,
convert_nodes_list_to_dict,
update_node_names,
remove_result,
set_result_node,
NODES_LABEL,
EDGES_LABEL,
SOURCE_LABEL,
Expand Down Expand Up @@ -229,7 +231,7 @@ def load_workflow_json(file_name: str, project: Optional[Project] = None):
project = Project(".")

with open(file_name, "r") as f:
content = json.load(f)
content = remove_result(workflow_dict=json.load(f))

edges_new_lst = content[EDGES_LABEL]
nodes_new_dict = {}
Expand Down Expand Up @@ -293,8 +295,13 @@ def write_workflow_json(

with open(file_name, "w") as f:
json.dump(
update_node_names(
content={NODES_LABEL: nodes_store_lst, EDGES_LABEL: edges_new_lst}
set_result_node(
workflow_dict=update_node_names(
workflow_dict={
NODES_LABEL: nodes_store_lst,
EDGES_LABEL: edges_new_lst,
}
)
),
f,
indent=2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,20 @@ def get_source_handles(edges_lst: list) -> dict:

def convert_nodes_list_to_dict(nodes_list: list) -> dict:
return {
str(el["id"]): el["value"] for el in sorted(nodes_list, key=lambda d: d["id"])
str(el["id"]): el["value"] if "value" in el else el["name"]
for el in sorted(nodes_list, key=lambda d: d["id"])
}


def update_node_names(content: dict) -> dict:
def update_node_names(workflow_dict: dict) -> dict:
node_names_final_dict = {}
input_nodes = [n for n in content[NODES_LABEL] if n["type"] == "input"]
input_nodes = [n for n in workflow_dict[NODES_LABEL] if n["type"] == "input"]
node_names_dict = {
n["id"]: list(
set(
[
e[TARGET_PORT_LABEL]
for e in content[EDGES_LABEL]
for e in workflow_dict[EDGES_LABEL]
if e[SOURCE_LABEL] == n["id"]
]
)
Expand All @@ -71,7 +72,44 @@ def update_node_names(content: dict) -> dict:
else:
node_names_final_dict[k] = v

for n in content[NODES_LABEL]:
for n in workflow_dict[NODES_LABEL]:
if n["type"] == "input":
n["name"] = node_names_final_dict[n["id"]]
return content
return workflow_dict


def set_result_node(workflow_dict):
node_id_lst = [n["id"] for n in workflow_dict[NODES_LABEL]]
source_lst = list(set([e[SOURCE_LABEL] for e in workflow_dict[EDGES_LABEL]]))

end_node_lst = []
for ni in node_id_lst:
if ni not in source_lst:
end_node_lst.append(ni)

node_id = len(workflow_dict[NODES_LABEL])
workflow_dict[NODES_LABEL].append(
{"id": node_id, "type": "output", "name": "result"}
)
workflow_dict[EDGES_LABEL].append(
{
TARGET_LABEL: node_id,
TARGET_PORT_LABEL: None,
SOURCE_LABEL: end_node_lst[0],
SOURCE_PORT_LABEL: None,
}
)

return workflow_dict


def remove_result(workflow_dict):
node_output_id = [
n["id"] for n in workflow_dict[NODES_LABEL] if n["type"] == "output"
][0]
return {
NODES_LABEL: [n for n in workflow_dict[NODES_LABEL] if n["type"] != "output"],
EDGES_LABEL: [
e for e in workflow_dict[EDGES_LABEL] if e[TARGET_LABEL] != node_output_id
],
}
Loading