## Workflow Orchestration

![img](https://miro.medium.com/max/1400/1*VTRH5WAotHAz_yKD-1XqWg.png)

Workflow orchestration frameworks are primarily used to coordinate, monitor and observe the movement of data in production applications. 

Such frameworks typically include a family of independent features that collectively make modern data pipelines fault-tolerant and robust. These features include:

* scheduling and triggering jobs
* retrying failed work
* dependency and state management
* caching expensive tasks
* resource management
* observability

These features allow us to gracefully handle failure events, including scenarios beyond our control like cloud outages or API failures. Without explicit state tracking, data pipelines are prone to triggering jobs prematurely, re-running work that has already completed, or even failing haphazardly.
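To make state tracking concrete, here is a minimal, hypothetical sketch in plain Python (not Prefect code). A runner records which steps have completed, so a re-run skips finished work instead of repeating it. The `StepRunner` class and `run_step` method are illustrative names of our own.

```python
from typing import Any, Callable, Dict


class StepRunner:
    """Toy state tracker: remembers the result of every step that completed."""

    def __init__(self) -> None:
        self.completed: Dict[str, Any] = {}

    def run_step(self, name: str, fn: Callable[[], Any]) -> Any:
        # Skip work that already finished and return the stored result instead.
        if name in self.completed:
            return self.completed[name]
        result = fn()  # if fn raises, the step is NOT recorded as completed
        self.completed[name] = result
        return result


calls = []

def expensive_extract() -> dict:
    calls.append("extract")
    return {"data": 42}

runner = StepRunner()
runner.run_step("extract", expensive_extract)
runner.run_step("extract", expensive_extract)  # second call is skipped
print(calls)  # ['extract']
```

Because a step is only recorded after it returns, a failed step will be attempted again on the next run, while successful steps are not repeated.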

The features workflow orchestration provides are not limited to supporting the scheduled movement of data from a source to a destination. 

These features are also heavily applied in other domains such as machine learning and parameterized report generation. Workflow orchestration is also becoming simple enough for hobbyists to adopt in personal projects.


## Workflows without orchestration

Below we're going to demonstrate a very basic example of what happens when you don't take orchestration into account.

If you've ever written code before, this will not be new to you, but it is worth making explicit.

The code mimics a simple data pipeline, which makes a call to an API service, augments the data, and then writes the results to our database.

The one twist is that the API call we make will fail half the time. Hopefully that is much more often than your API calls fail in production, but it is useful for demonstration purposes.

```python
import random

def call_unreliable_api():
    choices = [{"data": 42}, "failure"]
    res = random.choice(choices)
    if res == "failure":
        raise Exception("Our unreliable service failed")
    else:
        return res

def augment_data(data: dict, msg: str):
    data["message"] = msg
    return data

def write_results_to_database(data: dict):
    print(f"Wrote {data} to database successfully!")
    return "Success!"

def pipeline(msg: str):
    api_result = call_unreliable_api()
    augmented_data = augment_data(data=api_result, msg=msg)
    write_results_to_database(augmented_data)
```


Try running the pipeline a few times:

```python
pipeline(msg="Super Special Message")
```

## Negative Engineering

This is obviously a trivial example, and as engineers, we know to expect these things and to deal with them. BUT, dealing with the ways code fails is NOT what we set out to do. We set out to write a data pipeline.

The process of writing code that deals with failures, instead of writing code that performs the actions that we want done, is something that we refer to as `Negative Engineering`. 

> Negative Engineering happens when engineers write defensive code to make sure the positive code actually runs. Writing code that anticipates the infinite number of possible failures.
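For a sense of what negative engineering looks like in practice, here is one hypothetical hand-written way to make our pipeline survive the flaky API: a retry loop with a delay and a final re-raise. Almost every added line is defensive bookkeeping rather than pipeline logic. The `defensive_pipeline` function and its `max_attempts` and `delay_seconds` parameters are our own illustrative choices, not part of any library.

```python
import random
import time


def call_unreliable_api():
    # Same flaky service as above: fails roughly half the time.
    res = random.choice([{"data": 42}, "failure"])
    if res == "failure":
        raise Exception("Our unreliable service failed")
    return res


def augment_data(data: dict, msg: str):
    data["message"] = msg
    return data


def write_results_to_database(data: dict):
    print(f"Wrote {data} to database successfully!")
    return "Success!"


def defensive_pipeline(msg: str, max_attempts: int = 5, delay_seconds: float = 2.0):
    # Negative engineering: everything in this loop exists only to handle failure.
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            api_result = call_unreliable_api()
            break  # success, stop retrying
        except Exception as exc:
            last_error = exc
            print(f"Attempt {attempt} failed: {exc}")
            if attempt < max_attempts:
                time.sleep(delay_seconds)
    else:
        # The loop never hit `break`: every attempt failed.
        raise RuntimeError(f"Gave up after {max_attempts} attempts") from last_error

    augmented_data = augment_data(data=api_result, msg=msg)
    return write_results_to_database(augmented_data)
```

Multiply this by every external call in a real pipeline and the defensive code quickly outgrows the code we actually wanted to write.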

Prefect aims to eliminate as much of that negative engineering as possible for you.


## Using a Prefect Flow

It's easier to show than it is to tell, so let's run this next block and then we'll explain what is happening.

#### Installation 
First, make sure that `Prefect 2.0` is installed. 

You can run `pip install -U "prefect>=2.0b"` in your command line,
or you can run `!pip install -U "prefect>=2.0b"` in a Python cell in your Jupyter Notebook.

#### Creating a flow
To create a flow, we simply import `flow` from `prefect` and then add it as a decorator to our pipeline function.

```python
import random
from prefect import flow    # NEW ****

def call_unreliable_api():
    choices = [{"data": 42}, "failure"]
    res = random.choice(choices)
    if res == "failure":
        raise Exception("Our unreliable service failed")
    else:
        return res

def augment_data(data: dict, msg: str):
    data["message"] = msg
    return data

def write_results_to_database(data: dict):
    print(f"Wrote {data} to database successfully!")
    return "Success!"

@flow   # NEW ****
def pipeline(msg: str):
    api_result = call_unreliable_api()
    augmented_data = augment_data(data=api_result, msg=msg)
    write_results_to_database(augmented_data)
```

Now let's try running our pipeline again a few times.

```python
pipeline("Trying a flow!")
```

Our flow still sometimes succeeds and sometimes fails, but something interesting happened. We now have logs, and we see that the error is caught instead of halting execution.
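To build intuition for what changed, here is a deliberately simplified, hypothetical decorator (not Prefect's implementation) that does two of the things we just observed: it logs the start and end of each run, and it captures an exception as a failed result instead of letting it halt the program. The `toy_flow` and `RunState` names are illustrative.

```python
import functools
import logging
from dataclasses import dataclass
from typing import Any, Optional

logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s")
logger = logging.getLogger("toy_flow")


@dataclass
class RunState:
    name: str                              # "Completed" or "Failed"
    result: Any = None
    error: Optional[BaseException] = None


def toy_flow(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs) -> RunState:
        logger.info("Beginning run of %s", fn.__name__)
        try:
            result = fn(*args, **kwargs)
        except Exception as exc:
            # Record the failure instead of propagating it to the caller.
            logger.error("Run of %s failed: %r", fn.__name__, exc)
            return RunState("Failed", error=exc)
        logger.info("Run of %s completed", fn.__name__)
        return RunState("Completed", result=result)
    return wrapper


@toy_flow
def always_fails():
    raise Exception("Our unreliable service failed")


state = always_fails()
print(state.name)  # Failed
```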

## The Prefect Database

These logs aren't just printed. They are actually persisted to a database without any effort on your part. This gives you visibility into how your code fails or succeeds.

By default, the database is a SQLite database that lives at `~/.prefect`.

If you execute the `!ls ~/.prefect` command below, you should see `orion.db`.

You may or may not also see a `profiles.toml` file. Either way, ignore it for now.

```
# Hit shift + enter to execute command
!ls ~/.prefect
```

## Examining the contents of our database

I like to use an application called `DB Browser for SQLite` to examine my SQLite databases. You can download it [here](https://sqlitebrowser.org/).

We can see the tables that Prefect created in our database below. The table that we care about right now is `log`.

<img src="images/db1.png" width=200>

If we look at the data in our log table, we can see that our logs were, in fact, saved!

<img src="images/db2.png" width=400>
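If you'd rather stay in Python, the standard-library `sqlite3` module can do the same inspection. This sketch assumes the default database file `~/.prefect/orion.db` mentioned above. It opens the file read-only so it cannot modify Prefect's data, and it selects every column from `log` rather than assuming specific column names. `list_tables` and `preview_table` are our own helper names.

```python
import sqlite3
from contextlib import closing
from pathlib import Path


def connect_read_only(db_path) -> sqlite3.Connection:
    # mode=ro raises an error instead of silently creating an empty database file.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    return sqlite3.connect(uri, uri=True)


def list_tables(db_path) -> list:
    with closing(connect_read_only(db_path)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]


def preview_table(db_path, table: str, limit: int = 5) -> list:
    # Table names can't be bound as SQL parameters, so validate against known tables.
    if table not in list_tables(db_path):
        raise ValueError(f"No table named {table!r}")
    with closing(connect_read_only(db_path)) as conn:
        return conn.execute(f'SELECT * FROM "{table}" LIMIT ?', (limit,)).fetchall()


# Usage (assumes the default database location):
# db = Path.home() / ".prefect" / "orion.db"
# print(list_tables(db))
# for row in preview_table(db, "log"):
#     print(row)
```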

## Making our flows better with tasks

Flows are only the first step of orchestrating our data pipelines. The next step is adding Prefect Tasks. 

A Task can be thought of as a discrete unit of work. If that sounds a lot like a description of a function, you're correct. In practice, you'll often simply convert the functions that make up your flow into tasks.

Like Flows, Tasks are created by adding a decorator. We'll demonstrate below.

```python
import random
from prefect import flow, task    # NEW ****

@task   # NEW ****
def call_unreliable_api():
    choices = [{"data": 42}, "failure"]
    res = random.choice(choices)
    if res == "failure":
        raise Exception("Our unreliable service failed")
    else:
        return res

@task   # NEW ****
def augment_data(data: dict, msg: str):
    data["message"] = msg
    return data

@task   # NEW ****
def write_results_to_database(data: dict):
    print(f"Wrote {data} to database successfully!")
    return "Success!"

@flow
def pipeline(msg: str):
    api_result = call_unreliable_api()
    augmented_data = augment_data(data=api_result, msg=msg)
    write_results_to_database(augmented_data)
```

Let's run our pipeline again!

```python
pipeline("We're using flows and tasks now!")
```

## Understanding the power of tasks
Again, we're still getting errors, but our logs are clearer, and we have better visibility into our data pipeline.

### Adding names
The first task feature that we're going to show is names. Names let you give your tasks descriptive labels, independent of the underlying function's name.

We do this by passing the `name` parameter to the task decorator. (Note: you can also name flows using the `name` parameter.)

```python
from prefect import task

@task(name="Get data from API")
def call_unreliable_api():
    ...
```

### Adding retries
The next feature that we will demo is the ability to retry a task. We know that tasks will inevitably fail. Sometimes handling a failure requires complex logic, but other times we simply need to try again after a brief delay. We can do this with the `retries` and `retry_delay_seconds` parameters.

This will be very helpful for our unreliable API call.

```python
from prefect import task

@task(name="Get data from API", retries=4, retry_delay_seconds=2)
def call_unreliable_api():
    ...
```
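To make the semantics of these two parameters concrete, here is a toy retry decorator in plain Python. It is a sketch of the idea, not Prefect's implementation. In this toy, `retries=4` means up to four *additional* attempts after the first one (five in total), with `retry_delay_seconds` of sleep between attempts. The `retrying` name is hypothetical.

```python
import functools
import time


def retrying(retries: int = 0, retry_delay_seconds: float = 0):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):  # first try + `retries` retries
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: let the last error propagate
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


# Usage: a function that fails twice before succeeding.
attempts = []

@retrying(retries=4, retry_delay_seconds=0)
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise Exception("Our unreliable service failed")
    return {"data": 42}

print(flaky(), len(attempts))  # {'data': 42} 3
```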

Let's update our code and give it a try!

```python
import random
from prefect import flow, task

@task(name="Get data from API", retries=4, retry_delay_seconds=2)   # NEW ****
def call_unreliable_api():
    choices = [{"data": 42}, "failure"]
    res = random.choice(choices)
    if res == "failure":
        raise Exception("Our unreliable service failed")
    else:
        return res

@task(name="Add message to data")   # NEW ****
def augment_data(data: dict, msg: str):
    data["message"] = msg
    return data

@task(name="Write results to database")   # NEW ****
def write_results_to_database(data: dict):
    print(f"Wrote {data} to database successfully!")
    return "Success!"

@flow(name="Previously unreliable pipeline")    # NEW ****
def pipeline(msg: str):
    api_result = call_unreliable_api()
    augmented_data = augment_data(data=api_result, msg=msg)
    write_results_to_database(augmented_data)
```

```python
pipeline("Now we're cooking!")
```

If you run it a few times, you'll likely still encounter an exception, and the exception will still be logged. But this time the task retries after each failure, which usually lets the Flow complete in the manner we intended.
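A quick back-of-the-envelope check shows why the flow now almost always completes. Assuming each call fails independently with probability 0.5 (as `random.choice` over two options does here), the flow only fails if the first attempt and all four retries fail: 0.5^5 = 1/32, about 3%. The snippet computes this and cross-checks it with a seeded simulation; `p_all_attempts_fail` and `simulate` are our own helper names.

```python
import random


def p_all_attempts_fail(p_fail: float, retries: int) -> float:
    # The first attempt plus `retries` retries must all fail.
    return p_fail ** (retries + 1)


def simulate(p_fail: float, retries: int, trials: int = 100_000, seed: int = 0) -> float:
    rng = random.Random(seed)
    # all() stops at the first successful attempt, just like a retry loop would.
    failures = sum(
        all(rng.random() < p_fail for _ in range(retries + 1))
        for _ in range(trials)
    )
    return failures / trials


print(p_all_attempts_fail(0.5, 4))  # 0.03125
print(simulate(0.5, 4))             # close to 0.03125
```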

## Moving on to production-level orchestration

So far we have covered trivial examples inside of a Jupyter Notebook. To demonstrate the power of Prefect, we'll need to begin moving outside of the notebook environment. 

We'll stick with the trivial example for now so that we can focus on concepts, but there will be a more complex flow demonstrated at the end. 

We've moved the code from the last example to the file `trivial-flow.py`.

## Deployments

To perform any of the complex orchestration actions that are coming up, we'll need to create Deployments. These sound a lot more complex than they are.

To create a Deployment you need just two things:

* A flow
* A deployment specification

We already have a flow, so let's create a deployment specification. The code is provided below, but it needs to live in a `.py` file. The same code can be found in `trivial-spec.py`.

```python
from prefect.deployments import DeploymentSpec, SubprocessFlowRunner

DeploymentSpec(
    name="my-first-deployment",
    flow_location="./trivial-flow.py",
    flow_name="Previously unreliable pipeline",
    parameters={'msg':'Hello from my first deployment!'},
    tags=['ETL'],
    flow_runner=SubprocessFlowRunner()
)
```

## Understanding the DeploymentSpec
Our flows tell us what we want to do; our deployment specifications tell us how we're going to do it.

Specifically, we use the `DeploymentSpec` class to handle specifying the details of how our flow is going to run.

#### Understanding the parameters
`name` is the name that we are giving our deployment

`flow_location` is where the file containing the code that we want to run is located. We can use a relative file path for this.

`flow_name` is the name that we provided in our flow decorator

`parameters` holds the parameters that we want to pass to our flow. If we look back at our flow code, we can see that the function signature is `pipeline(msg: str)`. So we pass the dictionary `{'msg':'Hello from my first deployment!'}` to our DeploymentSpec, where the key `'msg'` corresponds to our flow's parameter `msg` and the value in the dictionary is the value that will be passed to the flow.

`tags` are used to organize deployments. Maybe you have machine learning flows and ETL flows, and you want to quickly be able to separate them. You can tag your machine learning flows with `"ml"` and your ETL flows with `"ETL"`. Tags are also used for work-queues, which we will explain soon.

`flow_runner` specifies how you want this flow to run. We're going to hand-wave this a bit for now so that it's not too distracting. The important thing to know for now is that `SubprocessFlowRunner` is used to run a flow on your local machine.
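Returning to `parameters`: the mapping from that dictionary to the flow's arguments behaves like ordinary Python keyword-argument unpacking. The sketch below is an analogy in plain Python, not a description of Prefect's internals, and uses a stand-in `pipeline` function that simply reports the message it received.

```python
def pipeline(msg: str) -> str:
    # Stand-in for the real flow; returns the message it received.
    return f"Running with msg={msg!r}"


parameters = {"msg": "Hello from my first deployment!"}

# Each key must match a parameter name in the flow's signature.
print(pipeline(**parameters))  # Running with msg='Hello from my first deployment!'

# A key that doesn't match the signature fails loudly.
try:
    pipeline(**{"message": "typo in the key"})
except TypeError as exc:
    print(f"TypeError: {exc}")
```

The practical takeaway is that the dictionary keys must exactly match the flow function's parameter names.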

## Breather?

That's a lot to cover. All of this will make more sense after we cover the next few steps, so don't panic if you're feeling fuzzy. Take a moment and then continue forward.

## Creating a deployment

Most fun things happen in Prefect using the `prefect` command line tool. The general syntax is `prefect <thing-you-are-dealing-with> <command>`.

In our case, we're dealing with deployments, and we want to create one using the deployment specification named `trivial-deployment.py`.

We can do that by running `prefect deployment create "trivial-deployment.py"`.

## What happens when we create a deployment?

You should notice that your console printed a few messages. One of them might have been a warning. Ignore that for now. The stuff that we're most interested in is what comes at the end.

<img src="images/deployment1.png" width=600>

#### Storage
When you create a deployment, Prefect saves your code in your storage, and notes its location. Prefect also saves the information from your `DeploymentSpec` in its database. This distinction - that your code goes in YOUR storage - is incredibly important. Prefect is able to orchestrate the execution of your code without ever seeing it, allowing for greater security and privacy.

If you navigate to `~/.prefect`, you should see both the database that we have already seen and a storage directory.

<img src="images/deployment2.png" width=400>

If we look in the storage directory, we'll see a file whose name is a mix of letters and numbers.

<img src="images/deployment3.png" width=200>

This is the file that contains our flow code. If we use the `cat` command to examine it, we'll see a copy of our code in the state that it was when we created our deployment.

<img src="images/deployment4.png" width=400>

Because we've made a copy, if we modify our code in our `trivial-flow.py` file, it will not change the code that the deployment runs until we create a new deployment. This keeps our deployments stable while we continue to develop our flows.
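The snapshot behavior can be sketched in a few lines of plain Python: copy the flow file into a storage directory under a random name, then notice that later edits to the original don't touch the copy. This illustrates the idea only; the `snapshot_flow_file` function and the use of `uuid4` for the file name are our assumptions, not necessarily how Prefect names or stores files.

```python
import shutil
import tempfile
import uuid
from pathlib import Path


def snapshot_flow_file(flow_path: Path, storage_dir: Path) -> Path:
    storage_dir.mkdir(parents=True, exist_ok=True)
    destination = storage_dir / uuid.uuid4().hex  # opaque mix of letters and numbers
    shutil.copyfile(flow_path, destination)
    return destination  # a deployment would record this location


# Usage in a throwaway directory:
with tempfile.TemporaryDirectory() as tmp:
    flow_file = Path(tmp) / "trivial-flow.py"
    flow_file.write_text("print('version 1')\n")
    saved = snapshot_flow_file(flow_file, Path(tmp) / "storage")
    flow_file.write_text("print('version 2')\n")  # keep developing the flow
    print(saved.read_text())  # print('version 1')
```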

#### The Deployment database table
While our code is saved to storage, information about the deployment is saved to our database. Using the database browser, we can examine the `deployment` table, where we will see the deployment that we just created.

<img src="images/deployment5.png" width=700>

You can see that all of our information is there: the name, the tags, the parameters, etc.


## The Prefect UI

We now have flows and deployments. It's time to introduce the Prefect UI. This will allow us to visually inspect our flows and deployments, schedule them, and handle a variety of other important tasks. Right now it will have limited functionality, but in only a few more steps, we'll have access to its full capabilities.

To access the UI, type the command `prefect orion start` in your terminal. This will launch a long-running Prefect application. You should see `Check out the dashboard at http://127.0.0.1:4200`. If you scroll up in your terminal a bit, your output should look similar to the picture below:

<img src="images/ui1.png" width=300>

Click on the link or paste the URL into your web browser to see the Prefect UI. You should now see red and green bars showing failed and succeeded runs, along with our flows.

<img src="images/ui2.png" width=500>

The UI is able to show us the information that Prefect has been saving to our database. If you click around, you will also find the logs for individual flows and tasks. Take a moment and explore.

#### Why are my deployments at 0?
You may notice that our `Deployments` tab shows `0`, even though we just saw our deployment in the `deployment` table of our database. This is because we haven't run any deployments yet. You can still find the deployment:

1. Click on the `Flows` tab.
2. Click on the `Previously unreliable pipeline` flow.
3. Look for `my-first-deployment` under the `Deployments` heading in the sidebar.

<img src="images/ui3.png" width=500>


## Work queues

Work queues are one of the last two pieces that we need to begin full orchestration of our flows.

Remember tags from our DeploymentSpec? The examples that we used were `ml` for machine learning and `ETL` for, well, ETLs.

Work queues are ways for us to organize flows that we want to run, based on things like tags. You can think of it as a fancy filter if you'd like.

To create a work queue, we use the command `prefect work-queue create -t <tag> <queue-name>`, where `<queue-name>` is the name that you want to give the work queue. When you run the command, the console will output the work queue's unique ID.
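Since a work queue behaves like a filter over tags, a toy model can help. Each queue holds a set of tags, and a scheduled run is eligible for the queue if its deployment carries those tags. The matching rule here (the run must carry every tag the queue filters on) and the `WorkQueue`/`ScheduledRun` names are simplifying assumptions for illustration, not a statement of Prefect's exact semantics. With a single tag like `ETL`, the distinction doesn't matter.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class ScheduledRun:
    deployment: str
    tags: Set[str]


@dataclass
class WorkQueue:
    name: str
    tags: Set[str]

    def accepts(self, run: ScheduledRun) -> bool:
        # Assumed rule: the run must carry every tag the queue filters on.
        return self.tags <= run.tags

    def preview(self, scheduled: List[ScheduledRun]) -> List[ScheduledRun]:
        return [run for run in scheduled if self.accepts(run)]


etl_queue = WorkQueue(name="etl-queue", tags={"ETL"})
scheduled = [
    ScheduledRun("my-first-deployment", {"ETL"}),
    ScheduledRun("train-model", {"ml"}),  # hypothetical machine learning deployment
]
print([run.deployment for run in etl_queue.preview(scheduled)])  # ['my-first-deployment']
```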

Open a new console (because your other console is running the Prefect application) and run the command `prefect work-queue create -t "ETL" etl-queue`. Your output should look similar to the image below:

<img src="images/workq1.png" width=400>

If we check the `work_queue` table of our database we will see `etl-queue` there.

<img src="images/workq2.png" width=500>

If you ever forget which work queues you have available, you can run the command `prefect work-queue ls` to view them.

<img src="images/workq5.png" width=400>


## Adding flow runs to work queues

Return to the Prefect UI, click on the `Flows` tab, and then click on the flow `Previously unreliable pipeline` to open the sidebar. In the sidebar, look for the `Deployments` heading; to its right you will see the `Quick Run` button. Go ahead and click it. You should briefly see a pop-up that says "Flow Run Scheduled".

<img src="images/workq3.png" width=500>

Your run history should now have a new addition: a yellow bar. If you hover over it, you will see that it says `Late flow runs`. This is because clicking `Quick Run` scheduled a flow run to start immediately, but the run has not started yet.

<img src="images/workq4.png" width=500>

We can check the flows in our work queue by running the command `prefect work-queue preview <work-queue's unique ID>`

<img src="images/workq6.png" width=500>

So we definitely have a flow lined up in our work queue, but it is late. What's wrong now?

## Agents - the final piece of the puzzle!

We mentioned earlier that your code stays with your infrastructure. This means that the Prefect application never has direct access to it, so it can't run your code for you. All it can do is record which code needs to run and pass along the information from your DeploymentSpec. This is the Prefect Hybrid Execution model. You supply the code, we supply the orchestration.

In order to start a flow on your computer from the application, we need a Prefect Agent.

An Agent is just a long-running process that lives on your compute infrastructure. When we start the agent, we give it a work queue to monitor. It then queries the Prefect application every 5 seconds, checking whether there is code that it should run. When a flow run is added to the work queue that the agent is responsible for, the API sends the agent the flow information, and the agent kicks off the flow run.
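The agent's behavior is essentially a polling loop. Here is a toy version where an in-memory `queue.Queue` stands in for the Prefect API and a plain callable stands in for "kick off the flow run". The `run_agent` name, the `max_polls` cutoff (added so the example terminates), and the in-process queue are all assumptions for illustration. A real agent talks to the Prefect API and keeps running until you stop it.

```python
import queue
import time
from typing import Callable, List


def run_agent(
    work_queue: "queue.Queue[str]",
    start_flow_run: Callable[[str], None],
    poll_interval_seconds: float = 5.0,
    max_polls: int = 3,
) -> None:
    for _ in range(max_polls):
        # Drain every flow run that is currently waiting in the queue.
        while True:
            try:
                flow_run_id = work_queue.get_nowait()
            except queue.Empty:
                break
            start_flow_run(flow_run_id)
        time.sleep(poll_interval_seconds)  # wait before asking again


started: List[str] = []
q: "queue.Queue[str]" = queue.Queue()
q.put("flow-run-1")  # hypothetical ID standing in for our late Quick Run
run_agent(q, started.append, poll_interval_seconds=0.01, max_polls=2)
print(started)  # ['flow-run-1']
```

Polling keeps the direction of communication outbound from your infrastructure: the agent asks for work, so the orchestration service never needs to reach into your machines.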

#### Starting an agent
You'll want to open up a new terminal window, because the agent is a long-running process. Then run the magic command `prefect agent start <work-queue-id>`. If you've forgotten your work-queue's unique ID, you can check with `prefect work-queue ls`.

<img src="images/agent1.png" width=400>

Your agent will immediately begin executing the flow that is in your etl-queue. If you check your work queue again using `prefect work-queue preview <work-queue-id>` you will see that it is empty.

<img src="images/agent2.png" width=500>

If you check your Run History in the UI again, you will now see that the bar that was previously yellow is now green.

<img src="images/agent3.png" width=500>

Now if you click `Quick Run` again, the flow will immediately execute.

## Next Steps

Congratulations! You now have all of the basic components needed to orchestrate flows with Prefect. 
