# Creating dynamic flows

In this tutorial, you will:

- Learn how to create dynamic workflows.
- Understand the `detour`, `addition`, and `replace` options in the `Response` object.

The ability to create dynamic workflows (i.e. jobs or workflows that launch other jobs or workflows) is
a particularly powerful usage pattern in Jobflow.


## The `Response(replace)` option


The main mechanism for creating dynamic jobs in Jobflow is through the `Response` object. We will demonstrate this below for a toy example where we:

1. Generate a list of numbers whose length is only determined at runtime.
2. Perform a toy operation on each number in the list.

While this is a trivial example, a similar pattern is common in computational materials science (e.g. you might perform a calculation on a bulk structure, carve all possible surface slabs, and then perform a calculation on each slab). What makes the workflow dynamic is that the number of jobs is only known at runtime.


In [2]:
import warnings

warnings.filterwarnings("ignore", "Using `tqdm.autonotebook.tqdm`")

In [3]:
from random import randint
from jobflow import job, Flow, Response
from jobflow.managers.local import run_locally

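@job
def make_list(a):
    # The list length is random, so it is only known at runtime.
    return [a] * randint(2, 5)

@job
def add(a, b):
    return a + b

@job
def add_distributed(list_a):
    # Create one `add` job per entry in the list.
    jobs = [add(val, 1) for val in list_a]

    # Replace this job with a flow containing the new jobs.
    flow = Flow(jobs)
    return Response(replace=flow)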

job1 = make_list(2)
job2 = add_distributed(job1.output)
flow = Flow([job1, job2])

responses = run_locally(flow)

2023-11-23 23:46:57,177 INFO Started executing jobs locally
2023-11-23 23:46:57,179 INFO Starting job - make_list (16fc2fd0-c9c7-420d-b0e1-3e6b5802c744)
2023-11-23 23:46:57,182 INFO Finished job - make_list (16fc2fd0-c9c7-420d-b0e1-3e6b5802c744)
2023-11-23 23:46:57,183 INFO Starting job - add_distributed (536e3c78-7d59-43ed-9f4f-72ce9b3793ef)
2023-11-23 23:46:57,191 INFO Finished job - add_distributed (536e3c78-7d59-43ed-9f4f-72ce9b3793ef)
2023-11-23 23:46:57,193 INFO Starting job - add (44007968-8394-442f-ba96-d52dc30a80ce)
2023-11-23 23:46:57,195 INFO Finished job - add (44007968-8394-442f-ba96-d52dc30a80ce)
2023-11-23 23:46:57,196 INFO Starting job - add (1af57ed7-3594-4180-b550-787c5f029d9f)
2023-11-23 23:46:57,198 INFO Finished job - add (1af57ed7-3594-4180-b550-787c5f029d9f)
2023-11-23 23:46:57,199 INFO Starting job - add (f7ea9158-40f8-4439-9d18-b6b6fb3e9289)
2023-11-23 23:46:57,201 INFO Finished job - add (f7ea9158-40f8-4439-9d18-b6b6fb3e9289)
2023-11-23 23:46:57,202 INFO Start



In [4]:
for uuid, response in responses.items():
    print(f"{uuid} -> {response}")

16fc2fd0-c9c7-420d-b0e1-3e6b5802c744 -> {1: Response(output=[2, 2, 2, 2], detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
536e3c78-7d59-43ed-9f4f-72ce9b3793ef -> {1: Response(output=None, detour=None, addition=None, replace=Flow(name='Flow', uuid='1d51c5df-174a-41c8-b017-98494c8411f1')
1. Job(name='add', uuid='44007968-8394-442f-ba96-d52dc30a80ce')
2. Job(name='add', uuid='1af57ed7-3594-4180-b550-787c5f029d9f')
3. Job(name='add', uuid='f7ea9158-40f8-4439-9d18-b6b6fb3e9289')
4. Job(name='add', uuid='72c752d3-208a-45ce-bbb6-625e4debed73'), stored_data=None, stop_children=False, stop_jobflow=False)}
44007968-8394-442f-ba96-d52dc30a80ce -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
1af57ed7-3594-4180-b550-787c5f029d9f -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
f7ea9158-40f

As seen above, several jobs were run, certainly more than the two we started with. The first job generates a list of 2s with a random length (four entries in this run). The second job launches one `add` job for each entry in the list. Because it is replaced by those jobs, it has no direct output of its own. Each newly generated job is then run.
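When inspecting a result like the one above, it can help to reduce the nested `responses` mapping (`{uuid: {index: response}}`) to just the job outputs. The following is a minimal sketch, not part of Jobflow: `collect_outputs` and `StandInResponse` are hypothetical names, and the stand-in class only mimics two of the fields shown in the printed output so that the example runs without a workflow.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class StandInResponse:
    """Hypothetical stand-in exposing two fields seen in the printed output."""

    output: Any = None
    replace: Any = None


def collect_outputs(responses):
    """Map each job UUID to the output stored under its highest index.

    Entries whose output is None (for example, a job that was replaced
    by a flow) are skipped.
    """
    outputs = {}
    for uuid, by_index in responses.items():
        latest = by_index[max(by_index)]
        if latest.output is not None:
            outputs[uuid] = latest.output
    return outputs


# Toy data shaped like the printed result above (made-up UUID strings).
responses_example = {
    "job-a": {1: StandInResponse(output=[2, 2, 2, 2])},
    "job-b": {1: StandInResponse(replace="<Flow of add jobs>")},
    "job-c": {1: StandInResponse(output=3)},
}

print(collect_outputs(responses_example))
```

Since the helper only reads the `output` attribute, it should in principle also accept the dictionary returned by `run_locally`, although that is untested here.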


## The `Response(addition)` option


Beyond replacing a job with downstream jobs, there is also the option to add jobs to the current flow on-the-fly.

Here we will create a simple flow that:

1. Adds a value to a given number.
2. Repeats the addition if the output is less than 10. Otherwise, stops.


In [5]:
@job
def add(a, b):
    return a + b

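@job
def add_with_logic(a, b):
    # Only add a new job to the flow when `a` is below the threshold.
    if a < 10:
        return Response(addition=add(a, b))
    # Otherwise return nothing, so no new job is launched.
    return None
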
job1 = add(1, 2)
job2 = add_with_logic(job1.output, 2)
flow = Flow([job1, job2])

responses = run_locally(flow)

2023-11-23 23:46:57,406 INFO Started executing jobs locally
2023-11-23 23:46:57,408 INFO Starting job - add (34ba86a3-99cd-461e-bc17-511fdf6e08ec)
2023-11-23 23:46:57,411 INFO Finished job - add (34ba86a3-99cd-461e-bc17-511fdf6e08ec)
2023-11-23 23:46:57,412 INFO Starting job - add_with_logic (349afb69-d732-44b8-b770-2500f44511f9)
2023-11-23 23:46:57,421 INFO Finished job - add_with_logic (349afb69-d732-44b8-b770-2500f44511f9)
2023-11-23 23:46:57,422 INFO Starting job - add (64e623da-cf31-485e-9ed1-1f996a955868)
2023-11-23 23:46:57,424 INFO Finished job - add (64e623da-cf31-485e-9ed1-1f996a955868)
2023-11-23 23:46:57,425 INFO Finished executing jobs locally


In [6]:
for uuid, response in responses.items():
    print(f"{uuid} -> {response}")

34ba86a3-99cd-461e-bc17-511fdf6e08ec -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
349afb69-d732-44b8-b770-2500f44511f9 -> {1: Response(output=None, detour=None, addition=Flow(name='Flow', uuid='20581dd8-b00e-424b-94ad-7f2b2f4a240b')
1. Job(name='add', uuid='64e623da-cf31-485e-9ed1-1f996a955868'), replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
64e623da-cf31-485e-9ed1-1f996a955868 -> {1: Response(output=5, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}


As you can see above, the `add` job ran twice: once as `job1` and once as the job added by `add_with_logic`. Now let's confirm that it runs only once when the output of the first job is 10 or greater.


In [7]:
@job
def add(a, b):
    return a + b

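@job
def add_with_logic(a, b):
    # Only add a new job to the flow when `a` is below the threshold.
    if a < 10:
        return Response(addition=add(a, b))
    # Otherwise return nothing, so no new job is launched.
    return None
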
job1 = add(1, 20)
job2 = add_with_logic(job1.output, 20)
flow = Flow([job1, job2])

responses = run_locally(flow)

2023-11-23 23:46:57,479 INFO Started executing jobs locally
2023-11-23 23:46:57,481 INFO Starting job - add (0ae20220-b69c-4e6b-827a-2f046b00d504)
2023-11-23 23:46:57,484 INFO Finished job - add (0ae20220-b69c-4e6b-827a-2f046b00d504)
2023-11-23 23:46:57,485 INFO Starting job - add_with_logic (145dea09-be6c-47fc-ac46-c9a6908323e7)
2023-11-23 23:46:57,490 INFO Finished job - add_with_logic (145dea09-be6c-47fc-ac46-c9a6908323e7)
2023-11-23 23:46:57,491 INFO Finished executing jobs locally


In [8]:
for uuid, response in responses.items():
    print(f"{uuid} -> {response}")

0ae20220-b69c-4e6b-827a-2f046b00d504 -> {1: Response(output=21, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
145dea09-be6c-47fc-ac46-c9a6908323e7 -> {1: Response(output=None, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}


This time, `add_with_logic` does not return a `Response(addition)`, so no new job is launched.
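The same check can be made in code rather than by reading the printed output. The sketch below is an illustration only: `dynamic_action` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins that copy the field names visible in the printed responses (`replace`, `addition`, `detour`) instead of real Jobflow objects.

```python
from types import SimpleNamespace


def dynamic_action(response):
    """Return the name of the first dynamic field that is set, or None."""
    for field in ("replace", "addition", "detour"):
        if getattr(response, field, None) is not None:
            return field
    return None


# Stand-ins shaped like the printed responses (not real jobflow objects).
no_new_job = SimpleNamespace(output=None, detour=None, addition=None, replace=None)
with_addition = SimpleNamespace(
    output=None, detour=None, addition="<Flow>", replace=None
)

print(dynamic_action(no_new_job))  # None
print(dynamic_action(with_addition))  # addition
```

Because the helper relies only on attribute names, a call such as `dynamic_action(responses[job2.uuid][1])` would be the natural way to apply it to a real run, assuming the response is stored under index 1 as in the output above.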


In this way, one can also compute the Fibonacci numbers:

In [9]:
"""A dynamic workflow that calculates the Fibonacci sequence."""
from jobflow import Response, job, run_locally


@job
def fibonacci(smaller, larger, stop_point=1000):
    """Calculate the next number in the Fibonacci sequence.

    If the number is larger than stop_point, the job will stop the workflow
    execution, otherwise, a new job will be submitted to calculate the next number.
    """
    total = smaller + larger

    if total > stop_point:
        return total

    new_job = fibonacci(larger, total, stop_point=stop_point)
    return Response(output=total, addition=new_job)


fibonacci_job = fibonacci(1, 1)

# run the job; responses will contain the output from all jobs
responses = run_locally(fibonacci_job)
print(responses)


2023-11-23 23:46:57,625 INFO Started executing jobs locally
2023-11-23 23:46:57,626 INFO Starting job - fibonacci (81d7944a-bd5a-4edd-a67f-36d2312e506b)
2023-11-23 23:46:57,630 INFO Finished job - fibonacci (81d7944a-bd5a-4edd-a67f-36d2312e506b)
2023-11-23 23:46:57,631 INFO Starting job - fibonacci (3219e168-b719-409a-8dbd-94370c03e162)
2023-11-23 23:46:57,634 INFO Finished job - fibonacci (3219e168-b719-409a-8dbd-94370c03e162)
2023-11-23 23:46:57,636 INFO Starting job - fibonacci (98308b42-817b-4ee5-bf68-fefcf1bd0de3)
2023-11-23 23:46:57,639 INFO Finished job - fibonacci (98308b42-817b-4ee5-bf68-fefcf1bd0de3)
2023-11-23 23:46:57,640 INFO Starting job - fibonacci (57c73c23-9fbf-4f08-8e0f-7698027b632b)
2023-11-23 23:46:57,643 INFO Finished job - fibonacci (57c73c23-9fbf-4f08-8e0f-7698027b632b)
2023-11-23 23:46:57,644 INFO Starting job - fibonacci (33c81f02-e87b-4848-8780-a09de20041b1)
2023-11-23 23:46:57,647 INFO Finished job - fibonacci (33c81f02-e87b-4848-8780-a09de20041b1)
2023-11-23
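To anticipate how many jobs such a chain launches, the same recurrence can be mirrored in plain Python. This is a sketch for reasoning about the workflow, not Jobflow code: `fibonacci_chain` is a hypothetical helper in which each loop iteration plays the role of one `fibonacci` job and each list entry is the value that job would output.

```python
def fibonacci_chain(smaller, larger, stop_point=1000):
    """Return the outputs the chain of fibonacci jobs would produce.

    Each iteration mirrors one job: it computes the next total and either
    stops (total > stop_point) or hands (larger, total) to the next job.
    """
    outputs = []
    while True:
        total = smaller + larger
        outputs.append(total)
        if total > stop_point:
            return outputs
        smaller, larger = larger, total


chain = fibonacci_chain(1, 1)
print(len(chain))  # 15
print(chain[-1])  # 1597
```

With the default `stop_point=1000`, the mirror predicts 15 jobs, the last of which returns 1597 without adding a further job.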

## The `Response(detour)` option


The `Response(detour)` option behaves similarly to `Response(addition)`. The difference is that `Response(addition)` will add a job (or flow) to the current flow, while `Response(detour)` will no longer run the current flow and will switch to a parallel job or flow.


In [10]:
@job
def add(a, b):
    return a + b

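@job
def add_with_logic(a, b):
    # Only detour to a new job when `a` is below the threshold.
    if a < 10:
        return Response(detour=add(a, b))
    # Otherwise return nothing, so no new job is launched.
    return None
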
job1 = add(1, 2)
job2 = add_with_logic(job1.output, 2)
flow = Flow([job1, job2])

responses = run_locally(flow)

2023-11-23 23:46:57,774 INFO Started executing jobs locally
2023-11-23 23:46:57,775 INFO Starting job - add (55465994-efcd-4835-b92e-f7ea36765268)
2023-11-23 23:46:57,778 INFO Finished job - add (55465994-efcd-4835-b92e-f7ea36765268)
2023-11-23 23:46:57,779 INFO Starting job - add_with_logic (d6cf8229-dbcc-422c-8504-177c433f48ac)
2023-11-23 23:46:57,786 INFO Finished job - add_with_logic (d6cf8229-dbcc-422c-8504-177c433f48ac)
2023-11-23 23:46:57,788 INFO Starting job - add (996e6af8-c56f-4779-8135-00493dbf9298)
2023-11-23 23:46:57,790 INFO Finished job - add (996e6af8-c56f-4779-8135-00493dbf9298)
2023-11-23 23:46:57,791 INFO Finished executing jobs locally


In [11]:
responses

{'55465994-efcd-4835-b92e-f7ea36765268': {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)},
 'd6cf8229-dbcc-422c-8504-177c433f48ac': {1: Response(output=None, detour=Flow(name='Flow', uuid='98311314-8087-4787-b2ea-c89478ff2999')
  1. Job(name='add', uuid='996e6af8-c56f-4779-8135-00493dbf9298'), addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)},
 '996e6af8-c56f-4779-8135-00493dbf9298': {1: Response(output=5, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}}

For this toy example, both `Response(addition)` and `Response(detour)` behave identically.
