# Execute async/sync workflows with Radical and Dask execution backends.

**The goal of this notebook is to:**

1. Demonstrate that, in order to use `radical.asyncflow` within Jupyter, you must set the appropriate environment variables.  
2. Showcase the ability to use both synchronous and asynchronous execution within Jupyter.  
3. Highlight the support for different execution backends.  
4. Compare the performance when submitting 5 workflows synchronously versus asynchronously.

> **Note:** Asynchronous execution does not necessarily imply full parallelism. True parallelism ultimately depends on the capabilities and configuration of the execution backend you are using.


### Sync workflows with Dask
We will execute 5 simple workflows in sync approach (sequentially) using Dask Distributed execution backend.

In [1]:
%env FLOW_JUPYTER_ASYNC=FALSE
import time

from radical.asyncflow import WorkflowEngine
from radical.asyncflow import DaskExecutionBackend

backend = DaskExecutionBackend({'n_workers': 2,
                               'threads_per_worker': 1})

flow = WorkflowEngine(backend=backend)

@flow.function_task
def task1(*args):
    return time.time()

@flow.function_task
def task2(*args):
    return time.time()

@flow.function_task
def task3(*args):
    return time.time()



def run_wf(wf_id):

    print(f'Starting workflow {wf_id} at {time.time()}')
    t3 = task3(task1(), task2())
    print(t3.result())
    print (f'Workflow {wf_id} completed at {time.time()}\n')

start_time = time.time()
for i in range(5):
    run_wf(i)
end_time = time.time()

print(f'Total time running synchronously is: {end_time - start_time}')

flow.shutdown()

env: FLOW_JUPYTER_ASYNC=FALSE
Dask backend initialized with dashboard at http://127.0.0.1:8787/status
Starting workflow 0 at 1749510548.2828848
Successfully submitted sync task task.0001
Successfully submitted sync task task.0000
Successfully submitted sync task task.0002
1749510548.8966858
Workflow 0 completed at 1749510548.909121

Starting workflow 1 at 1749510548.9091806
Successfully submitted sync task task.0003
Successfully submitted sync task task.0004
Successfully submitted sync task task.0005
1749510549.8899374
Workflow 1 completed at 1749510549.8928144

Starting workflow 2 at 1749510549.8928246
Successfully submitted sync task task.0007
Successfully submitted sync task task.0006
Successfully submitted sync task task.0008
1749510550.90357
Workflow 2 completed at 1749510550.9149003

Starting workflow 3 at 1749510550.9149365
Successfully submitted sync task task.0009
Successfully submitted sync task task.0010
Successfully submitted sync task task.0011
1749510551.9057448
Workflow 

### Async workflows with Dask
We will execute 5 simple workflows in an async approach (concurrently) using Dask Distributed execution backend.

In [2]:
%env FLOW_JUPYTER_ASYNC=TRUE
import time
import asyncio

from radical.asyncflow import WorkflowEngine
from radical.asyncflow import DaskExecutionBackend

backend = DaskExecutionBackend({'n_workers': 2,
                               'threads_per_worker': 1})

flow = WorkflowEngine(backend=backend)

@flow.function_task
async def task1(*args):
    return time.time()

@flow.function_task
async def task2(*args):
    return time.time()

@flow.function_task
async def task3(*args):
    return time.time()



async def run_wf(wf_id):

    print(f'Starting workflow {wf_id} at {time.time()}')
    t3 = task3(task1(), task2())
    print(await t3)
    print (f'Workflow {wf_id} completed at {time.time()}\n')

start_time = time.time()
results = await asyncio.gather(*[run_wf(i) for i in range(5)])
end_time = time.time()

print(f'Total time running asynchronously is: {end_time - start_time}')

# We are in an async context so we have to use **await**
await flow.shutdown()

env: FLOW_JUPYTER_ASYNC=TRUE
Dask backend initialized with dashboard at http://127.0.0.1:8787/status
Starting workflow 0 at 1749510556.4625962
Starting workflow 1 at 1749510556.4627974
Starting workflow 2 at 1749510556.4629452
Starting workflow 3 at 1749510556.4631262
Starting workflow 4 at 1749510556.463257
Successfully submitted async task task.0015
Successfully submitted async task task.0018
Successfully submitted async task task.0027
Successfully submitted async task task.0016
Successfully submitted async task task.0019
Successfully submitted async task task.0022
Successfully submitted async task task.0021
Successfully submitted async task task.0028
Successfully submitted async task task.0025
Successfully submitted async task task.0024
Successfully submitted async task task.0026
Successfully submitted async task task.0029
Successfully submitted async task task.0023
Successfully submitted async task task.0020
Successfully submitted async task task.0017
1749510557.0891185
Workflow 3 

### Async workflows with RadicalPilot 
We will execute 5 workflows in async approach (in prallel) using execution backend.
Note that, you can use any other backends with the same approach like: ThreadPool, PorcessPool, or DaskParallel

In [3]:
%env FLOW_JUPYTER_ASYNC=TRUE
import time
import asyncio

from radical.asyncflow import WorkflowEngine
from radical.asyncflow import RadicalExecutionBackend


async def main():
    backend = RadicalExecutionBackend({'resource': 'local.localhost'})
    flow = WorkflowEngine(backend=backend)
    
    @flow.executable_task
    async def task1(*args):
        return '/bin/echo "I got executed at" && /bin/date'
    
    @flow.executable_task
    async def task2(*args):
        return '/bin/echo "I got executed at" && /bin/date'
    
    @flow.executable_task
    async def task3(*args):
        return '/bin/echo "I got executed at" && /bin/date'
    
    @flow.executable_task
    async def task4(*args):
        return '/bin/echo "I got executed at" && /bin/date'
    
    @flow.executable_task
    async def task5(*args):
        return '/bin/echo "I got executed at" && /bin/date'
    
    async def run_wf(wf_id):
        print(f'Starting workflow {wf_id} at {time.time()}')
        t1 = task1()
        t2 = task2(t1)
        t3 = task3(t1, t2)
        t4 = task4(t3)

        res = await t4
        if res:
            print(f'task4 from {wf_id} got result {res}')
            t5 = task5()
            print('submitted task5')
            await t5

        print (f'Workflow {wf_id} completed at {time.time()}\n')

    # Run workflows concurrently
    results = await asyncio.gather(*[run_wf(i) for i in range(5)])

    # We are in an async context, so we have to use **await**
    await flow.shutdown()

await main()

env: FLOW_JUPYTER_ASYNC=TRUE
RadicalPilot execution backend started successfully

Starting workflow 0 at 1749510566.4281347
Starting workflow 1 at 1749510566.428254
Starting workflow 2 at 1749510566.4283452
Starting workflow 3 at 1749510566.4284348
Starting workflow 4 at 1749510566.4284985
task4 from 0 got result I got executed at
Mon Jun  9 11:09:41 PM UTC 2025

submitted task5
task4 from 1 got result I got executed at
Mon Jun  9 11:09:41 PM UTC 2025

submitted task5
task4 from 4 got result I got executed at
Mon Jun  9 11:09:41 PM UTC 2025

submitted task5
task4 from 2 got result I got executed at
Mon Jun  9 11:09:41 PM UTC 2025

submitted task5
task4 from 3 got result I got executed at
Mon Jun  9 11:09:41 PM UTC 2025

submitted task5
Workflow 0 completed at 1749510582.55685

Workflow 4 completed at 1749510582.5577853

Workflow 3 completed at 1749510582.5578773

Workflow 2 completed at 1749510582.5579698

Workflow 1 completed at 1749510582.5580425

