## **Understanding dynamic flows**

You may have come across the term “DAG” when researching workflow orchestration. You may have even used a DAG before. Or you may have never heard the word before and have no clue what I’m referring to.

Well, in the eyes of Prefect 2.0, you are all equal! Prefect 2.0 does not use DAGs for its flows. For those of you who have been previously constrained by DAGs, you’ll be happy to know that Prefect allows you to have branching logic and loops using essentially pure Python syntax. For those of you unfamiliar with DAGs, the key takeaway is that if you can write the logic in Python, you can apply that logic to a Prefect flow.

It remains easier to show than tell, so let’s jump into it!

In [None]:
import random
from prefect import flow, task

@task(name="Get a letter")   
def get_letter():
    choices = ["A", "B"]
    res = random.choice(choices)
    return res

@task(name="Task A")   
def a_stuff():
    return "We are doing Task A"

@task(name="Task B")  
def b_stuff():
    return "We are doing Task B"

@task(name="Print Result")
def print_result(letter: str, result: str):
    print("*"*20)
    print(f"Letter: {letter}")
    print(f"Result: {result}")
    print("*"*20)

@flow(name="Branching Flow")    
def pipeline():
    letter = get_letter()

    if letter.result() == "A":
        res = a_stuff()
    else:
        res = b_stuff()
    
    print_result(letter, res)


if __name__ == "__main__":
    pipeline()

### **So what’s going on here?**

We have a task `get_letter()` that will return either `"A"` or `"B"`. If we look at our flow, we see that we have an `if/else` block, and we will execute one task if `get_letter` returns `"A"` and a different task otherwise.

This looks almost exactly like the Python you’re used to, with one exception: we call `.result()` on the output of our task `get_letter()`. This is the only piece of Prefect syntax that you have to memorize, so bear with me while I explain.

**Explanation for those familiar with async**

Prefect tasks return futures (specifically a `PrefectFuture`). So when we say `letter = get_letter()`, `letter` is a future. We can’t use its value in the `if` statement until it resolves. Calling `letter.result()` waits for the future to resolve and hands back its value before the `if/else` logic begins.
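
If futures are new to you, Python's standard library offers a close analogy. The sketch below uses `concurrent.futures`, not Prefect, so treat it as an illustration of the idea rather than of Prefect's internals: submitting work hands back a future immediately, and `.result()` blocks until the value is ready. The `slow_letter` function is a made-up stand-in for a task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_letter() -> str:
    # Hypothetical stand-in for a task that takes a moment to finish.
    time.sleep(0.2)
    return "A"


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_letter)  # returns immediately with a Future
    # The future is a placeholder object, not the letter itself.
    is_letter_yet = isinstance(future, str)
    letter = future.result()  # blocks until slow_letter has returned

print(is_letter_yet)
print(letter)
```

Comparing `future == "A"` directly would never be true, which is why the flow above calls `.result()` before the `if` statement.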

**A brief explanation of futures for those unfamiliar with async**

#TODO

### **Passing results to other tasks**

You might have noticed that when we pass results to other tasks, as in `print_result(letter, res)`, we don’t have to call `.result()` on either `letter` or `res`. This is because Prefect knows that these are futures and automatically resolves them for you when you pass them to other tasks.
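
To make that behaviour less magical, here is a rough sketch of how a wrapper could resolve futures before calling the function it wraps. It is built on the standard library's `concurrent.futures` and is not Prefect's implementation; the `resolves_futures` decorator and the `describe` function are hypothetical names invented for this illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from functools import wraps


def resolves_futures(func):
    """Hypothetical decorator: swap any Future argument for its value."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        def resolve(value):
            # Only futures need resolving; plain values pass through untouched.
            return value.result() if isinstance(value, Future) else value

        resolved_args = tuple(resolve(a) for a in args)
        resolved_kwargs = {k: resolve(v) for k, v in kwargs.items()}
        return func(*resolved_args, **resolved_kwargs)

    return wrapper


@resolves_futures
def describe(letter: str, result: str) -> str:
    return f"Letter: {letter}, Result: {result}"


with ThreadPoolExecutor() as pool:
    letter = pool.submit(lambda: "A")
    res = pool.submit(lambda: "We are doing Task A")
    # No .result() calls here: the decorator resolves both futures.
    summary = describe(letter, result=res)

print(summary)
```

A regular, undecorated function gets no such help, which is why the next section calls `.result()` by hand.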

### **Passing results to functions that aren’t tasks**

Prefect tasks will call `.result()` because they are Prefect tasks. Regular functions won’t do this for you, so you should call `.result()` when passing the results of Prefect tasks to regular Python functions. 

In short, if you will be performing Python logic on the results of a Prefect task, and it’s happening inside of a flow, but not inside of a task, you need to call `.result()`.

In [None]:
from prefect import flow, task

def regular_func_add_one(n: int):
    return n + 1

@task
def task_return_two():
    return 2

@flow
def my_flow():
    two = task_return_two()
    print(
        regular_func_add_one(
            two.result()
        )
    )

if __name__ == "__main__":
    my_flow()

### **Looping in your flows**

Looping is trivial to do in Prefect. Just write a loop. That’s literally it.

In [3]:
@flow(name="Looping Flow")
def pipeline():
    letter_box = []
    for i in range(10):
        letter = get_letter()
        letter_box.append(letter.result())

    print("*"*20) 
    print(letter_box)
    print("*"*20) 

A note on the flow above: we get the letter future from `get_letter()` and then append `letter.result()` to our list. We can skip the middleman and call `letter_box.append(get_letter().result())` directly, as the `while` loop example in the next section does.

### **But what about `while` loops?**
Also totally okay in Prefect 2.0!

In [4]:
import time  # needed for time.sleep below

@flow(name="Infinite Loop Flow")
def pipeline():
    letter_box = []
    while True:
        letter_box.append(get_letter().result())
        print(letter_box)
        time.sleep(1)
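
An infinite loop is handy for a demo, but most real flows need a stopping condition. The sketch below shows that looping logic in plain Python so it runs without Prefect. Here `get_letter` is a plain-function stand-in for the task above, and `collect_until` and its parameters are names invented for this illustration. Inside a flow, the same `while` condition would apply, with `.result()` called on the task's future.

```python
import random


def get_letter() -> str:
    # Plain-function stand-in for the "Get a letter" task.
    return random.choice(["A", "B"])


def collect_until(target: str, wanted: int, max_draws: int = 1000) -> list:
    """Draw letters until `target` has appeared `wanted` times.

    `max_draws` is a safety cap so the loop always terminates.
    """
    letter_box = []
    while letter_box.count(target) < wanted and len(letter_box) < max_draws:
        letter_box.append(get_letter())
    return letter_box


letters = collect_until("A", wanted=3)
print(letters)
```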

## Subflows

So we now know that we can use `if/else` logic, loops, and regular Python functions in our flows. The final frontier is flows. Spoiler alert: yes, you absolutely can use other flows inside of your Prefect flows. Let’s check out an example:

In [None]:
import random
from prefect import flow, task

@task(name="Get a letter")   
def get_letter():
    choices = ["A", "B"]
    res = random.choice(choices)
    return res

@flow(name="Letterbox Flow")    
def letterbox_pipeline():
    letter_box = []
    for i in range(10):
        letter_box.append(get_letter().result())

    return letter_box

@task(name="Count letters")
def count_letters(letterbox):
    counts = {"A": 0, "B": 0}
    for letter in letterbox:
        if letter == "A":
            counts["A"] += 1
        else:
            counts["B"] += 1

    return counts

@flow(name="Multi-flow!")
def multi_flow():
    letters = letterbox_pipeline()
    counts = count_letters(letters)
    print(counts.result())

if __name__ == "__main__":
    multi_flow()

### Understanding the new flow
We have separated our pipeline into new components. 
* We have the flow `Letterbox Flow` which gets a list of letters by looping over the `get_letter` task
* We have another flow `Multi-flow!` which 
    * runs `Letterbox Flow`
    * passes those results to a task that counts the occurrences of each letter
    * then finally prints the results

We call a flow that is being run inside of a flow a `sub-flow`. This sub-flow is created the same way that any other flow is created, and can be run as its own flow. What makes it a sub-flow is simply the fact that we are running it inside of another flow.

As you can see from the example, we are able to use a flow just like we would a task. We can pass the results of tasks and flows to other tasks and flows as we see fit.

### Using branching logic with sub-flows
There are many situations where you might want the results of one of your tasks to trigger a flow under a specific set of circumstances. To take an example from my previous work: maybe you have a task that checks for distribution shift and kicks off an expensive machine learning training job only if the results are above a certain threshold.

Using branching logic with sub-flows is no different than for tasks. In the example below, we modify the previous example to only run `Letterbox Flow` if we randomly select an even number. Otherwise, we set `letters` to a fixed list.

In [12]:
@flow(name="Multi-flow-branching")
def multi_flow():
    rand_res = random.randint(1, 10)
    if rand_res % 2 == 0:
        letters = letterbox_pipeline()
    else:
        letters = ["A", "B", "A"]
    counts = count_letters(letters)
    print(counts.result())

if __name__ == "__main__":
    multi_flow()

10:39:24.915 | INFO    | prefect.engine - Created flow run 'competent-mantis' for flow 'Multi-flow-branching'
10:39:24.916 | INFO    | Flow run 'competent-mantis' - Using task runner 'ConcurrentTaskRunner'
10:39:24.965 | INFO    | Flow run 'competent-mantis' - Created subflow run 'lyrical-avocet' for flow 'Letterbox Flow'
10:39:24.987 | INFO    | Flow run 'lyrical-avocet' - Created task run 'Get a letter-d430fbfc-50' for task 'Get a letter'
10:39:25.010 | INFO    | Task run 'Get a letter-d430fbfc-50' - Finished in state Completed()
10:39:25.023 | INFO    | Flow run 'lyrical-avocet' - Created task run 'Get a letter-d430fbfc-51' for task 'Get a letter'
10:39:25.047 | INFO    | Task run 'Get a letter-d430fbfc-51' - Finished in state Completed()
10:39:25.058 | INFO    | Flow run 'lyrical-avocet' - Created task run 'Get a letter-d430fbfc-52' for task 'Get a letter'
10:39:25.080 | INFO    | Task run 'Get a letter-d430fbfc-52' - Finished in state Completed()
10:39:25.091 | INFO    | Flow run 

{'A': 4, 'B': 6}
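
The distribution-shift scenario described earlier follows the same branching pattern. Here is a minimal sketch in plain Python so it runs without Prefect; in a real project `drift_score` would likely be a task and `retrain_model` a sub-flow. Every name, the scoring formula, and the 0.5 threshold are assumptions made up for illustration.

```python
from statistics import mean


def drift_score(reference: list, current: list) -> float:
    # Toy drift measure (an assumption): absolute difference of the means.
    return abs(mean(current) - mean(reference))


def retrain_model(data: list) -> str:
    # Stand-in for an expensive training sub-flow.
    return f"retrained on {len(data)} rows"


def monitoring_pipeline(reference: list, current: list, threshold: float = 0.5) -> str:
    score = drift_score(reference, current)
    # Only kick off the expensive job when the shift is large enough.
    if score > threshold:
        return retrain_model(current)
    return "no retraining needed"


stable = monitoring_pipeline([1.0, 2.0, 3.0], [1.1, 2.0, 3.2])
shifted = monitoring_pipeline([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])
print(stable)
print(shifted)
```

The first call stays under the threshold and skips retraining; the second exceeds it and triggers the stand-in training step.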


## Examining the UI with sub-flows
Start the Prefect app using `prefect orion start` and then navigate to the UI and follow these steps:
1. Click the Prefect icon on the left to get to the home screen 
2. Click the `Flows` tab beneath the `Run History` chart
3. One of your options should be `Multi-flow-branching` if you copied our code directly, or whatever you named the last example before you ran it. Click on its name. This will open up a sidebar.
4. Find the box on the sidebar that says `2 COMPLETED` (or however many times you ran it). Click on that box, which will take you to the next screen.
5. In the new screen, click on the `Flow Runs` tab
6. Click on any of those runs to investigate them further
7. Click the expand icon on the Radar chart

<img src="images/steps-1.png" width=600>
<br>
<img src="images/steps-2.png" width=600>
<br>
<img src="images/steps-3.png" width=600>

### Viewing dynamic flows with the Radar Chart
The ability to create dynamic flows is incredibly powerful, but it means that a traditional DAG diagram will not work for visualizing our execution. Some tasks may or may not run based on branching logic, or may run multiple times based on loop logic. Prefect 2.0 uses the Radar Chart to visualize this dynamic execution.

As we mentioned in Prefect 101, the Radar Chart visualizes execution as concentric rings: the innermost ring holds the earliest-running tasks, and the outermost ring holds the last tasks.
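
One way to picture how tasks land on rings is to give each task a depth based on its upstream dependencies. This is only a sketch of the idea, not Prefect's layout code: the `ring_of` helper and the dependency dictionary are hypothetical, and it assumes a task sits one ring outside its deepest upstream task.

```python
def ring_of(task: str, upstream: dict) -> int:
    """Return 0 for tasks with no upstream dependencies, else 1 + deepest parent."""
    parents = upstream.get(task, [])
    if not parents:
        return 0
    return 1 + max(ring_of(parent, upstream) for parent in parents)


# Hypothetical dependency map: each entry lists what that item depends on.
upstream = {
    "Letterbox Flow": [],
    "Count letters": ["Letterbox Flow"],
}

rings = {name: ring_of(name, upstream) for name in upstream}
print(rings)
```

Under that assumption, items with no dependencies share the innermost ring, and anything that consumes their results moves one ring outward.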

We will demonstrate the use of the Radar Chart with the last flow that we ran - `Multi-flow-branching`. Remember, the sub-flow only ran if our random number was even. Let's look at the case where it ran successfully:

<img src="images/both-run-far.png" width=600>

Zoomed in, it looks like:

<img src="images/both-run-close.png" width=600>

We can see that the inner-most item is the sub-flow, which ran 10 tasks. It is then connected to the task `Count letters` by a green line. This line means that the `Count letters` task depends on the sub-flow. It can be easier to conceptualize it as "the sub-flow passes its results to `Count letters`".

If we click on `10 task runs`, it will take us to the Radar Chart for the sub-flow.

<img src="images/sub-flow-zoom.png" width=400>

We can see that all ten tasks are located in the same ring, with no inward or outward arrows. This is because the tasks did not depend on any other tasks, and had no other tasks that depended on them.

<img src="images/letterbox-flow.png" width=600>

Finally, in the case where `Multi-flow-branching` drew an odd number, we see a Radar Chart with a single task. If we look back at the flow, we see that only the task `Count letters` runs in the case of an odd number, so this correctly captures our flow's execution.

<img src="images/one-ran.png" width=600>