Skip to content

Commit 42ae595

Browse files
committed
add more complicated example
1 parent 64de8b6 commit 42ae595

File tree

6 files changed

+171
-30
lines changed

6 files changed

+171
-30
lines changed

example_workflows/nested/get_jobflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from python_workflow_definition.jobflow import load_workflow_json
22

3-
flow=load_workflow_json("main.pwd.json")
3+
flow=load_workflow_json("main2.pwd.json")
44

55
from jobflow import run_locally
66
flow.draw_graph(figsize=(3, 3)).show()

example_workflows/nested/jobflow.ipynb

Lines changed: 59 additions & 4 deletions
Large diffs are not rendered by default.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"version": "0.1.1",
3+
"nodes": [
4+
{ "id": 0, "value": 3, "type": "input", "name": "a" },
5+
{ "id": 1, "value": 2, "type": "input", "name": "b" },
6+
{ "id": 2, "value": 4, "type": "input", "name": "c" },
7+
{ "id": 3, "type": "function", "value": "workflow.get_prod_and_div" },
8+
{ "id": 4, "type": "workflow", "value": "prod_div.json" },
9+
{ "id": 5, "type": "function", "value": "workflow.get_sum" },
10+
{ "id": 6, "type": "output", "name": "final_result" }
11+
],
12+
"edges": [
13+
{ "target": 3, "targetPort": "x", "source": 0, "sourcePort": null },
14+
{ "target": 3, "targetPort": "y", "source": 2, "sourcePort": null },
15+
{ "target": 4, "targetPort": "x", "source": 3, "sourcePort": "prod" },
16+
{ "target": 4, "targetPort": "y", "source": 3, "sourcePort": "div" },
17+
{ "target": 5, "targetPort": "x", "source": 4, "sourcePort": "result" },
18+
{ "target": 5, "targetPort": "y", "source": 1, "sourcePort": null },
19+
{ "target": 6, "targetPort": null, "source": 5, "sourcePort": null }
20+
]
21+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"version": "0.1.1",
3+
"nodes": [
4+
{ "id": 0, "type": "workflow", "value": "prod_div.json" },
5+
{ "id": 1, "value": 4, "type": "input", "name": "a" },
6+
{ "id": 2, "value": 3, "type": "input", "name": "b" },
7+
{ "id": 3, "type": "workflow", "value": "prod.json" },
8+
{ "id": 4, "type": "output", "name": "final_result" }
9+
],
10+
"edges": [
11+
{ "target": 0, "targetPort": "x", "source": 1, "sourcePort": null },
12+
{ "target": 0, "targetPort": "y", "source": 2, "sourcePort": null },
13+
{ "target": 3, "targetPort": "x", "source": 0, "sourcePort": "result" },
14+
{ "target": 4, "targetPort": "null", "source": 3, "sourcePort": "result" }
15+
]
16+
}

example_workflows/nested/prod.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"version": "0.1.1",
3+
"nodes": [
4+
{ "id": 0, "type": "function", "value": "workflow.get_square" },
5+
{ "id": 1, "type": "input", "value": 1, "name": "x" },
6+
{ "id": 2, "type": "output", "name": "result" }
7+
],
8+
"edges": [
9+
{ "target": 0, "targetPort": "x", "source": 1, "sourcePort": null },
10+
{ "target": 2, "targetPort": null, "source": 1, "sourcePort": null }
11+
]
12+
}

src/python_workflow_definition/jobflow.py

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -235,21 +235,33 @@ def _get_workflow(
235235
nodes_dict: dict, input_dict: dict, total_dict: dict, source_handles_dict: dict
236236
) -> list:
237237
def get_attr_helper(obj, source_handle):
238-
if source_handle is None:
239-
return getattr(obj, "output")
238+
print("here2")
239+
print(obj)
240+
if type(obj) is not list:
241+
if source_handle is None:
242+
return getattr(obj, "output")
243+
else:
244+
return getattr(getattr(obj, "output"), source_handle)
240245
else:
241-
return getattr(getattr(obj, "output"), source_handle)
246+
if source_handle is None:
247+
return getattr(obj[-1], "output")
248+
else:
249+
return getattr(getattr(obj[-1], "output"), source_handle)
242250

243251
memory_dict = {}
244-
print(total_dict)
252+
253+
output_reference=None
245254
for k in total_dict.keys():
246255
v = nodes_dict[k]
247256
if type(v) is list:
257+
# i need to export the uuid and output references
258+
# from workflows and jobs
248259
fn=v
249260
job1_dict=v[0].as_dict()
250261
uuid=job1_dict["uuid"]
251262
mod = import_module(job1_dict["function"]["@module"])
252263
method=getattr(mod, job1_dict["function"]["@callable"])
264+
253265
if k in source_handles_dict.keys():
254266
new_job1 = job(
255267
method=method,
@@ -258,39 +270,64 @@ def get_attr_helper(obj, source_handle):
258270
)
259271
else:
260272
new_job1 = job(method=method, uuid=uuid)
261-
kwargs = {
262-
kw: (
263-
input_dict[vw[SOURCE_LABEL]]
264-
if vw[SOURCE_LABEL] in input_dict
265-
else get_attr_helper(
266-
obj=memory_dict[vw[SOURCE_LABEL]],
267-
source_handle=vw[SOURCE_PORT_LABEL],
268-
)
269-
)
270-
for kw, vw in total_dict[k].items()
271-
}
273+
kwargs = {}
274+
275+
for kw, vw in total_dict[k].items():
276+
if output_reference is not None:
277+
# not sure this works?
278+
kwargs[kw] = output_reference
279+
output_reference = None
280+
else:
281+
if vw[SOURCE_LABEL] in input_dict:
282+
kwargs[kw] = input_dict[vw[SOURCE_LABEL]]
283+
else:
284+
kwargs[kw] = get_attr_helper(
285+
obj=memory_dict[vw[SOURCE_LABEL]],
286+
source_handle=vw[SOURCE_PORT_LABEL],
287+
)
288+
output_reference=v[-1].output
289+
290+
272291
memory_stuff = new_job1(**kwargs)
292+
273293
fn = [memory_stuff]+v[1:]
294+
274295
memory_dict[k] =Flow(fn)
296+
275297
if isfunction(v):
298+
#if output_reference is None:
299+
300+
# output_reference=None
301+
276302
if k in source_handles_dict.keys():
277303
fn = job(
278304
method=v,
279305
data=[el for el in source_handles_dict[k] if el is not None],
280306
)
281307
else:
282308
fn = job(method=v)
283-
kwargs = {
284-
kw: (
285-
input_dict[vw[SOURCE_LABEL]]
286-
if vw[SOURCE_LABEL] in input_dict
287-
else get_attr_helper(
309+
310+
kwargs={}
311+
312+
for kw, vw in total_dict[k].items():
313+
if output_reference is not None:
314+
# not sure this works?
315+
kwargs[kw] = output_reference
316+
output_reference=None
317+
else:
318+
if vw[SOURCE_LABEL] in input_dict:
319+
kwargs[kw]=input_dict[vw[SOURCE_LABEL]]
320+
else:
321+
kwargs[kw]= get_attr_helper(
288322
obj=memory_dict[vw[SOURCE_LABEL]],
289323
source_handle=vw[SOURCE_PORT_LABEL],
290-
)
291-
)
292-
for kw, vw in total_dict[k].items()
293-
}
324+
)
325+
326+
print("here")
327+
print(kwargs)
328+
329+
330+
294331
memory_dict[k] = fn(**kwargs)
295332

296333
return list(memory_dict.values())
@@ -312,7 +349,7 @@ def load_workflow_json(file_name: str) -> Flow:
312349
source_handles_dict=source_handles_dict,
313350
)
314351
print(task_lst)
315-
return Flow(task_lst)
352+
return Flow(task_lst, output=task_lst[-1].output)
316353

317354

318355
def recursive_load_workflow_json(file_name: str) -> list:

0 commit comments

Comments
 (0)