In [1]:
import nest_asyncio
# Patch asyncio once, before any workflow is submitted.
# NOTE(review): presumably needed so the Submitter can run inside the event
# loop that the notebook kernel already has running - confirm against the
# nest_asyncio documentation.
nest_asyncio.apply()

In [2]:
import pydra

# functions used later in the notebook:

@pydra.mark.task
def add_two(x):
    """Return ``x`` increased by 2."""
    return x + 2

@pydra.mark.task
def power(a, n=2):
    """Return ``a`` raised to the power ``n``; squares ``a`` when ``n`` is omitted."""
    return a**n

@pydra.mark.task
def mult_var(a, b):
    """Return the product ``a * b``."""
    return a * b

In [3]:
# Create a workflow named "wf1" that declares a single input field, x,
# and sets that input to 3.
wf1 = pydra.Workflow(name="wf1", input_spec=["x"], x=3)

In [4]:
# Add an add_two task under the name "sum"; its x input is taken from the
# workflow-level input x via wf1.lzin. add() returns a Workflow object,
# which the notebook displays as the cell output.
wf1.add(add_two(name="sum", x=wf1.lzin.x))

<pydra.engine.core.Workflow at 0x7fc0305fa7d0>

In [5]:
# A task added under the name "sum" is reachable as an attribute of the
# workflow; the displayed value is the task object itself.
wf1.sum

<pydra.engine.task.FunctionTask at 0x7fbfe0852d90>

In [6]:
# Expose the "out" field of the "sum" task as the workflow output named "out".
wf1.set_output([("out", wf1.sum.lzout.out)])

In [7]:
# Run the workflow through a Submitter created with the "cf" plugin.
# NOTE(review): "cf" presumably selects the concurrent.futures backend -
# confirm in the pydra documentation.
with pydra.Submitter(plugin="cf") as sub:
    sub(wf1)

# Last expression of the cell, so the Result object is displayed
# (x=3 goes through add_two, giving out=5).
wf1.result()

Result(output=Output(out=5), runtime=None, errored=False)

In [8]:
# Two independent tasks that both read the workflow input x directly.
wf2 = pydra.Workflow(name="wf2", input_spec=["x"], x=3)
wf2.add(add_two(name="add_two", x=wf2.lzin.x))
wf2.add(power(name="power", a=wf2.lzin.x))

# Setting multiple workflow outputs: one (output name, task output field)
# pair per entry.
wf2.set_output([("out_s", wf2.add_two.lzout.out),
                ("out_p", wf2.power.lzout.out)
               ])

with pydra.Submitter(plugin="cf") as sub:
    sub(wf2)

# Displays both outputs: out_s = 3 + 2 and out_p = 3 ** 2.
wf2.result()

Result(output=Output(out_s=5, out_p=9), runtime=None, errored=False)

In [9]:
# Two tasks chained one after the other: power consumes the output of sum.
wf3 = pydra.Workflow(name="wf3", input_spec=["x"], x=3)
wf3.add(add_two(name="sum", x=wf3.lzin.x))
# by setting a=wf3.sum.lzout.out we create a connection
wf3.add(power(name="power", a=wf3.sum.lzout.out))

# Expose the intermediate value (out_s) as well as the final one (out_p).
wf3.set_output([("out_s", wf3.sum.lzout.out),
                ("out_p", wf3.power.lzout.out)
               ])

with pydra.Submitter(plugin="cf") as sub:
    sub(wf3)

# out_s = 3 + 2 = 5 and out_p = 5 ** 2 = 25.
wf3.result()

Result(output=Output(out_s=5, out_p=25), runtime=None, errored=False)

In [10]:
# Inspect the a input of the power task: it is displayed as a reference to
# the "out" field of the "sum" task rather than as a concrete number.
# NOTE(review): "LF" in the repr presumably stands for lazy field - confirm.
wf3.power.inputs.a

LF('sum', 'out')

In [11]:
# add_two and power both read the workflow input x; mult then takes the
# outputs of both tasks as its two inputs.
wf4 = pydra.Workflow(name="wf4", input_spec=["x"], x=3)
wf4.add(add_two(name="add_two", x=wf4.lzin.x))
wf4.add(power(name="power", a=wf4.lzin.x))
wf4.add(mult_var(name="mult", a=wf4.add_two.lzout.out, b=wf4.power.lzout.out))

# Only the final product is exposed as a workflow output.
wf4.set_output([("out", wf4.mult.lzout.out)])

with pydra.Submitter(plugin="cf") as sub:
    sub(wf4)

# (3 + 2) * (3 ** 2) = 45.
wf4.result()

Result(output=Output(out=45), runtime=None, errored=False)

In [12]:
# Inner workflow: same tasks and outputs as wf2, but created without a value
# for x, because x is connected to the outer workflow below.
wf2a = pydra.Workflow(name="wf2a", input_spec=["x"])
wf2a.add(add_two(name="add_two", x=wf2a.lzin.x))
wf2a.add(power(name="power", a=wf2a.lzin.x))

wf2a.set_output([("out_s", wf2a.add_two.lzout.out),
                ("out_p", wf2a.power.lzout.out)
               ])


# Outer workflow: wf2a is added with the same add() call used for tasks.
wf5 = pydra.Workflow(name="wf5", input_spec=["x"], x=3)
wf5.add(wf2a)
# connecting wf2a to the input from the main workflow
wf2a.inputs.x = wf5.lzin.x
# mult consumes both outputs of the nested workflow.
wf5.add(mult_var(name="mult", a=wf5.wf2a.lzout.out_s, b=wf5.wf2a.lzout.out_p))

wf5.set_output([("out", wf5.mult.lzout.out)])

with pydra.Submitter(plugin="cf") as sub:
    sub(wf5)

# Same value as wf4: (3 + 2) * (3 ** 2) = 45.
wf5.result()

Result(output=Output(out=45), runtime=None, errored=False)

In [13]:
# No value for x is given at construction; the values come from split().
wf6 = pydra.Workflow(name="wf6", input_spec=["x"])
# setting a splitter for the entire workflow
wf6.split("x", x=[3, 5])
wf6.add(add_two(name="add_two", x=wf6.lzin.x))
wf6.add(power(name="power", a=wf6.lzin.x))
wf6.add(mult_var(name="mult", a=wf6.add_two.lzout.out, b=wf6.power.lzout.out))

wf6.set_output([("wf_out", wf6.mult.lzout.out)])

with pydra.Submitter(plugin="cf") as sub:
    sub(wf6)

# The result is a list with one Result per value of x:
# (3 + 2) * 3 ** 2 = 45 and (5 + 2) * 5 ** 2 = 175.
wf6.result()

[Result(output=Output(wf_out=45), runtime=None, errored=False),
 Result(output=Output(wf_out=175), runtime=None, errored=False)]

In [14]:
# Workflow with two inputs, both split. Passing a list of field names to
# split() yields one run per (x, y) combination, as the four displayed
# results show.
wf7 = pydra.Workflow(name="wf7", input_spec=["x", "y"])
wf7.split(["x", "y"], x=[3, 5], y=[2, 3])
wf7.add(add_two(name="sum", x=wf7.lzin.x))
wf7.add(power(name="power", a=wf7.lzin.y))
wf7.add(mult_var(name="mult", a=wf7.sum.lzout.out, b=wf7.power.lzout.out))

wf7.set_output([("out", wf7.mult.lzout.out)])

with pydra.Submitter(plugin="cf") as sub:
    sub(wf7)

# Each value is (x + 2) * y ** 2, listed for (3, 2), (3, 3), (5, 2), (5, 3).
wf7.result()

[Result(output=Output(out=20), runtime=None, errored=False),
 Result(output=Output(out=45), runtime=None, errored=False),
 Result(output=Output(out=28), runtime=None, errored=False),
 Result(output=Output(out=63), runtime=None, errored=False)]

In [15]:
# Add a combiner over x to the workflow defined in the previous cell.
# NOTE(review): this mutates wf7 in place, so the cell is presumably not safe
# to run twice on the same kernel - confirm.
wf7.combine("x")

with pydra.Submitter(plugin="cf") as sub:
    sub(wf7)

# The result is now a nested list: each inner list gathers the results for
# all values of x at one value of y ([20, 28] for y=2, [45, 63] for y=3).
wf7.result()

[[Result(output=Output(out=20), runtime=None, errored=False),
  Result(output=Output(out=28), runtime=None, errored=False)],
 [Result(output=Output(out=45), runtime=None, errored=False),
  Result(output=Output(out=63), runtime=None, errored=False)]]

In [16]:
@pydra.mark.task
def mean(x_list):
    """Return the arithmetic mean of ``x_list`` (its sum divided by its length).

    An empty ``x_list`` raises ``ZeroDivisionError``.
    """
    return sum(x_list)/len(x_list)

# The workflow input x is a list here; the workflow itself is not split.
wf8 = pydra.Workflow(name="wf8", input_spec=["x"], x=[3, 5, 7])
# mean receives the list through its x_list input.
wf8.add(mean(name="mean", x_list=wf8.lzin.x))
# adding a task that has its own splitter
wf8.add(power(name="power", a=wf8.lzin.x).split("a"))

wf8.set_output([("out_m", wf8.mean.lzout.out),
                ("out_p", wf8.power.lzout.out)])

with pydra.Submitter(plugin="cf") as sub:
    sub(wf8)

# A single Result: out_m is the mean of the list (5.0) and out_p is a list
# with one squared value per element ([9, 25, 49]).
wf8.result()

Result(output=Output(out_m=5.0, out_p=[9, 25, 49]), runtime=None, errored=False)