In [1]:
import nest_asyncio

nest_asyncio.apply()

import pydra

In [2]:

@pydra.mark.task
def add_var(a, b) -> {'sum_a_b': int}:
    """Add two values and return their sum.

    The dict used as the return annotation names the task's output field
    ``sum_a_b`` (declared as ``int``), which is how the result is reported
    when the task is run.
    """
    total = a + b
    return total

# Build the task with both inputs supplied as keyword arguments
# (the same values could be assigned afterwards through `task1.inputs`).
task1 = add_var(a=3, b=6)

# Calling the task runs it; the returned result is shown as the cell output.
task1()

Result(output=Output(sum_a_b=9), runtime=None, errored=False)


## Create workflows using pydra

First, we define the functions used within the workflows.


In [3]:
@pydra.mark.task
def add_var(a, b):
    """Return the sum of ``a`` and ``b``.

    This re-declares ``add_var`` without a return annotation; the workflows
    below read its result from the output field ``out``.
    """
    total = a + b
    return total

@pydra.mark.task
def power(a, n=2):
    """Raise ``a`` to the power ``n``; with the default ``n=2`` this squares ``a``."""
    return pow(a, n)

@pydra.mark.task
def mult_var(a, b):
    """Return the product of ``a`` and ``b``."""
    product = a * b
    return product

@pydra.mark.task
def add_two(a):
    """Return ``a`` increased by two."""
    increment = 2
    return a + increment


Now we define some workflows to get a feeling for how to connect the different tasks within a workflow.

In [4]:
# Workflow with two declared inputs, x and y, given the values 3 and 4.
wf1 = pydra.Workflow(
    name='wf1',
    input_spec=['x', 'y'],
    x=3,
    y=4,
)

# Single node: add the two workflow inputs (referenced through `lzin`).
wf1.add(
    add_var(
        name='sum_x_y',
        a=wf1.lzin.x,
        b=wf1.lzin.y,
    )
)

# Expose the node's `out` field as the workflow output named "out".
wf1.set_output([('out', wf1.sum_x_y.lzout.out)])

# Run the workflow through a submitter using the 'cf' plugin.
with pydra.Submitter(plugin='cf') as sub:
    sub(wf1)

# Show the workflow result as the cell output.
wf1.result()



Result(output=Output(out=7), runtime=None, errored=False)

Now that we have created a simple workflow, let's do something slightly more complex and interconnect different tasks within a workflow (the output of one task becomes the input of another task).

In [5]:
# Workflow with two declared inputs, x and y, given the values 3 and 4.
wf2 = pydra.Workflow(
    name='wf2',
    input_spec=['x', 'y'],
    x=3,
    y=4,
)

# Node 1: add the two workflow inputs.
wf2.add(add_var(name='sum_x_y', a=wf2.lzin.x, b=wf2.lzin.y))

# Node 2: multiply the two workflow inputs.
wf2.add(mult_var(name='mult_x_y', a=wf2.lzin.x, b=wf2.lzin.y))

# Node 3: feed the sum into `power` (default exponent n=2).
# NOTE: an earlier draft passed both the sum and the product to `power`
# via `.split('a')`; only the sum is used here.
wf2.add(power(name='power', a=wf2.sum_x_y.lzout.out))

# Node 4: add two to the product.
wf2.add(add_two(name='add_two', a=wf2.mult_x_y.lzout.out))

# Expose both branch results as workflow outputs.
wf2.set_output(
    [
        ('out_p', wf2.power.lzout.out),
        ('out_other', wf2.add_two.lzout.out),
    ]
)

# Run the workflow through a submitter using the 'cf' plugin.
with pydra.Submitter(plugin='cf') as sub:
    sub(wf2)

# Show the workflow result as the cell output.
wf2.result()



Result(output=Output(out_p=49, out_other=14), runtime=None, errored=False)

### Create workflows containing shell commands

In [None]:
# Wrap the shell executable `pwd` in a pydra shell-command task.
cmd = 'pwd'
shellCommand = pydra.ShellCommandTask(
    name='printwd',
    executable=cmd,
)

# Run the task through a submitter using the 'cf' plugin.
with pydra.Submitter(plugin='cf') as sub:
    sub(shellCommand)

# NOTE: the working directory reported here looks unexpected -- check this,
# and open an issue on GitHub if it turns out to be a bug.
shellCommand.result()

In [6]:
# The executable is given as a list; the printed `cmdline` shows the items
# joined into a single command line.
# NOTE(review): the task name 'printwd' is carried over from the `pwd` cell
# although this task runs `echo` -- confirm whether a rename is wanted.
cmd = ['echo', 'hail', 'pydra']
shellCommand = pydra.ShellCommandTask(
    name='printwd',
    executable=cmd,
)
print('cmdline =', shellCommand.cmdline)

# Run the task through a submitter using the 'cf' plugin.
with pydra.Submitter(plugin='cf') as sub:
    sub(shellCommand)

# Show the task result (return code, stdout, stderr) as the cell output.
shellCommand.result()

cmdline = echo hail pydra


Result(output=Output(return_code=0, stdout='hail pydra\n', stderr=''), runtime=None, errored=False)