In [1]:
%env FLOW_JUPYTER_ASYNC=FALSE
from radical.flow import WorkflowEngine, ResourceEngine, Task

# Resource engine configured for the 'local.localhost' resource.
engine = ResourceEngine({'resource': 'local.localhost'})
# Workflow engine bound to that resource engine; it is applied below as the
# `@flow` decorator on each task function.
flow = WorkflowEngine(engine=engine)

@flow
def task1(*args):
    """Describe a task whose executable echoes ``$RP_TASK_NAME``.

    Positional arguments are accepted but not read in the body; the calls
    elsewhere in this notebook pass other tasks here (presumably so the
    workflow engine can treat them as dependencies -- confirm against the
    radical.flow documentation).
    """
    echo_name = Task(executable='/bin/echo $RP_TASK_NAME')
    return echo_name

@flow
def task2(*args):
    """Describe a task whose executable echoes ``$RP_TASK_NAME``.

    Positional arguments are accepted but not read in the body; the calls
    elsewhere in this notebook pass other tasks here (presumably so the
    workflow engine can treat them as dependencies -- confirm against the
    radical.flow documentation).
    """
    echo_name = Task(executable='/bin/echo $RP_TASK_NAME')
    return echo_name

@flow
def task3(*args):
    """Describe a task whose executable echoes ``$RP_TASK_NAME``.

    Positional arguments are accepted but not read in the body; the calls
    elsewhere in this notebook pass other tasks here (presumably so the
    workflow engine can treat them as dependencies -- confirm against the
    radical.flow documentation).
    """
    echo_name = Task(executable='/bin/echo $RP_TASK_NAME')
    return echo_name

def run_wf(wf_id):
    """Run one three-task workflow and print its final result.

    ``task1`` and ``task2`` are created first and handed to ``task3``; the
    call then blocks on ``task3``'s ``result()`` and prints it.

    Args:
        wf_id: Label used only in the start/completion messages.
    """
    print(f'Starting workflow {wf_id}')

    # Same left-to-right creation order as passing the calls inline.
    first = task1()
    second = task2()
    final = task3(first, second)

    outcome = final.result()
    print(outcome)

    print(f'Workflow {wf_id} completed')

# Run the workflows one after another. The shutdown sits in `finally` so the
# resource engine is released even when a workflow raises or the cell is
# interrupted; otherwise the engine would be left running in the kernel.
try:
    for i in range(5):
        run_wf(i)
finally:
    engine.shutdown()

env: FLOW_JUPYTER_ASYNC=FALSE
Resource Engine started successfully

Loop found, this is Jupyter SYNC mode
New loop is created
Event-Loop is set successfully
Starting workflow 0
Registered task: 'task1' with id of task.0000
Registered task: 'task2' with id of task.0001
Registered task: 'task3' with id of task.0002
Ready to submit: task1 with resolved dependencies: []
Ready to submit: task2 with resolved dependencies: []
Submitting ['task1', 'task2'] for execution
task.0000 is DONE
task.0001 is DONE
Ready to submit: task3 with resolved dependencies: ['task1', 'task2']
Submitting ['task3'] for execution
task.0002 is DONE
task3

Workflow 0 completed
Starting workflow 1
Registered task: 'task1' with id of task.0003
Registered task: 'task2' with id of task.0004
Registered task: 'task3' with id of task.0005
Ready to submit: task1 with resolved dependencies: []
Ready to submit: task2 with resolved dependencies: []
Submitting ['task1', 'task2'] for execution
task.0003 is DONE
task.0004 is DONE


In [2]:
%env FLOW_JUPYTER_ASYNC=TRUE
import asyncio
from radical.flow import WorkflowEngine, ResourceEngine, Task
import time
async def main():
    """Run two five-task workflows concurrently and print their summaries.

    Creates a resource engine for 'local.localhost' and a workflow engine on
    top of it, registers five async task functions, and gathers two
    ``run_wf`` coroutines. The engine is shut down in a ``finally`` clause so
    it is released even if a workflow raises or the coroutine is cancelled.
    """
    # Create engine and workflow
    engine = ResourceEngine({'resource': 'local.localhost'})

    try:
        flow = WorkflowEngine(engine=engine)

        # All five tasks run the same command: echo a label, then the date.
        @flow
        async def task1(*args):
            return Task(executable='/bin/echo "I got executed at" && /bin/date')

        @flow
        async def task2(*args):
            return Task(executable='/bin/echo "I got executed at" && /bin/date')

        @flow
        async def task3(*args):
            return Task(executable='/bin/echo "I got executed at" && /bin/date')

        @flow
        async def task4(*args):
            return Task(executable='/bin/echo "I got executed at" && /bin/date')

        @flow
        async def task5(*args):
            return Task(executable='/bin/echo "I got executed at" && /bin/date')

        async def run_wf(wf_id):
            """Build the task1 -> task2 -> task3 -> task4 chain, then maybe task5.

            Returns a completion message containing ``wf_id`` and a timestamp.
            """
            print(f'\nStarting workflow {wf_id} at {time.time()}')
            t1 = task1()
            t2 = task2(t1)
            t3 = task3(t1, t2)
            t4 = task4(t3)

            # task5 is only created once task4's awaited result is truthy.
            if await t4:
                t5 = task5()
                await t5

            return f'Workflow {wf_id} completed at {time.time()}'

        # Run workflows concurrently
        results = await asyncio.gather(*[run_wf(i) for i in range(2)])

        for result in results:
            print(result)
    finally:
        # Always release the engine, including on failure or cancellation.
        engine.shutdown()


# Top-level `await` is valid here only because the notebook kernel already
# runs an event loop (the cell output reports "Existing loop is re-used");
# in a plain Python script this would have to be `asyncio.run(main())`.
await main()

env: FLOW_JUPYTER_ASYNC=TRUE
Resource Engine started successfully

Loop found, this is Jupyter ASYNC mode
Existing loop is re-used
Event-Loop is set successfully

Starting workflow 0 at 1744239572.1532664

Starting workflow 1 at 1744239572.1533566
Registered task: 'task1' with id of task.0015
Registered task: 'task2' with id of task.0016
Registered task: 'task3' with id of task.0017
Registered task: 'task4' with id of task.0018
Registered task: 'task1' with id of task.0019
Registered task: 'task2' with id of task.0020
Registered task: 'task3' with id of task.0021
Registered task: 'task4' with id of task.0022
Ready to submit: task1 with resolved dependencies: []
Ready to submit: task1 with resolved dependencies: []
Submitting ['task1', 'task1'] for execution
task.0015 is DONE
task.0019 is DONE
Ready to submit: task2 with resolved dependencies: ['task1']
Ready to submit: task2 with resolved dependencies: ['task1']
Submitting ['task2', 'task2'] for execution
task.0020 is DONE
task.0016 is