# Workflow orchestration with Prefect

## 1. Introduction

**Workflow orchestration** is a set of tools that schedule and monitor the work you want to accomplish. If you have a computational pipeline that you want to run, the orchestration tool can schedule it and tell you where and when the pipeline fails.

In this chapter, we will work with the orchestration tool called [Prefect](https://www.prefect.io/).

<img src="media/prefect-logo.png" width="400"/>

### Pipeline example

The image below shows an example of a computational pipeline. Suppose we have a PostgreSQL database from which we extract data into Parquet files. We then use Pandas to ingest those Parquet files and combine them with data fetched from an API. Next, we pass the result to Scikit-Learn to train a model and register the artifacts and experiments in MLflow. Finally, we deploy the model using Flask.

<img src="media/pipeline-example.png" width="700"/>

Some steps can be grouped into computational blocks, highlighted with blue boxes. These blocks are interconnected, so a problem in one block affects the others. For example, what happens if the database is under maintenance and cannot output the Parquet files? All the downstream tasks will be affected.

The goal of workflow orchestration is to minimize the impact of such errors and to fail "gracefully" when they happen.

### Negative engineering

**Negative engineering** protects the outcomes of your code by defending them against possible failures; in other words, it is coding against failure. By some estimates, it can take up to 90% of all engineering time in a project.

Workflow orchestration includes a set of features aimed at eliminating negative engineering. Some of these features are:

- Notifications
- Observability into Failure 
- Conditional Failure Logic
- Timeouts

For more on the topic, see this [article](https://future.com/negative-engineering-and-the-art-of-failing-successfully/#:~:text=Negative%20engineering%20is%20the%20time,success%20of%20their%20primary%20objectives.) about negative engineering.
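
To see what these features replace, here is a minimal plain-Python sketch of negative engineering written by hand: a step runs with a timeout, and on a timeout or error an alert is sent and a fallback value is returned (a simple form of conditional failure logic). The helpers `notify`, `run_with_timeout`, and `extract_from_db` are hypothetical and not part of Prefect; `notify` only prints, standing in for a real notification channel.

```python
import concurrent.futures
import time


def notify(message: str) -> None:
    # Hypothetical alert hook: a real pipeline might send an email or chat message.
    print(f"ALERT: {message}")


def run_with_timeout(fn, timeout_s: float, fallback=None):
    """Run fn() with a timeout; on timeout or error, alert and return fallback."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        notify(f"{fn.__name__} timed out after {timeout_s}s")
        return fallback
    except Exception as exc:
        notify(f"{fn.__name__} failed: {exc!r}")
        return fallback
    finally:
        # Don't wait for a stuck step. Python threads cannot be killed, so a
        # timed-out call keeps running in the background until it finishes.
        executor.shutdown(wait=False)


def extract_from_db():
    # Simulates a database that is too slow to answer (e.g. under maintenance).
    time.sleep(0.5)
    return ["row1", "row2"]


rows = run_with_timeout(extract_from_db, timeout_s=0.1, fallback=[])
```

With a 0.1 s timeout, `rows` ends up as the fallback `[]` and an alert is printed. Every step of a pipeline would need this kind of hand-written wrapping, which is the boilerplate an orchestration tool aims to take over.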

### Why do we need to retrain a model in production? 

Model accuracy usually decreases over time. This degradation is typically caused by drift: the input data, or the relationship between the inputs and the target, changes after the model has been trained. To counter it, models need to be retrained regularly; after retraining, you can push the new model to production and keep performance high.
Since this means running the machine learning pipeline many times, it makes sense to automate it with a workflow orchestration tool:

<img src="media/model-performance-evolution.png" width="700"/>

[Image source](https://evidentlyai.com/blog/machine-learning-monitoring-data-and-concept-drift)

## 2. Prefect framework

### Prefect Versions

Prefect is an open-source workflow orchestration framework. There are two versions: **Prefect Core** (or Prefect 1) and **Prefect Orion** (or Prefect 2).

**Prefect 1** requires the user to define a **static graph (DAG)**, which means that you must specify in advance all the possible steps your workflow can take. Then, at runtime, with the help of triggers, it is already known which parts should not be executed in case of certain failures.
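
The idea of a static DAG can be illustrated without Prefect at all. The sketch below is a plain-Python analogy, not Prefect 1's API: every step and its dependencies are declared up front (the step names are hypothetical and mirror the pipeline example above), the steps run in topological order via the standard library's `graphlib` (Python 3.9+), and a trigger-like rule skips every step whose upstream steps did not all succeed.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: all steps and dependencies are known before running.
DAG = {
    "extract_parquet": set(),
    "fetch_api": set(),
    "ingest": {"extract_parquet", "fetch_api"},
    "train": {"ingest"},
    "register": {"train"},
    "deploy": {"register"},
}


def run_static_dag(dag, steps):
    """Run steps in dependency order; skip anything downstream of a failure."""
    states = {}
    for name in TopologicalSorter(dag).static_order():
        # Trigger-like rule: run a step only if all its upstream steps succeeded.
        if any(states[dep] != "Success" for dep in dag[name]):
            states[name] = "Skipped"
            continue
        try:
            steps[name]()
            states[name] = "Success"
        except Exception:
            states[name] = "Failed"
    return states


def ok():
    pass


def db_under_maintenance():
    raise RuntimeError("database under maintenance")


steps = {name: ok for name in DAG}
steps["extract_parquet"] = db_under_maintenance
states = run_static_dag(DAG, steps)
print(states)
```

Because the graph is fixed before anything runs, it is known in advance which steps depend on `extract_parquet`: the failed extraction leaves `fetch_api` as `Success` and marks `ingest` and everything after it as `Skipped`.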

**Prefect 2**, on the other hand, is more **dynamic** and minimally invasive to Python code: you only need to make minimal changes to your code to build a workflow on top of it.

### Prefect in code

The image below shows a simple script that gets the number of stars of GitHub repositories:

<img src="media/prefect-example.png" width="700"/>

The last line of code starts a flow, which calls a single task multiple times in a loop. Tasks and flows are defined with the corresponding decorators, which accept execution parameters (number of retries, etc.). These decorators are the only Prefect-specific code here.
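
The retries parameter is a good example of how a decorator can add failure handling without touching a function's body. The sketch below is a hypothetical, simplified stand-in in plain Python: `with_retries` is not Prefect's API (Prefect's real decorators do much more), and the stub `get_stars` fails twice and then returns a dummy count instead of calling GitHub.

```python
import functools
import time


def with_retries(retries: int = 0, delay_s: float = 0.0):
    """Hypothetical decorator: re-run the function up to `retries` extra times."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == retries:
                        raise  # retries exhausted: let the failure surface
                    print(f"{fn.__name__} failed ({exc!r}), retry {attempt + 1}/{retries}")
                    time.sleep(delay_s)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(retries=3)
def get_stars(repo: str) -> int:
    # Stub for a flaky HTTP call: fails on the first two attempts.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network error")
    return 0  # dummy star count; no real request is made


stars = get_stars("PrefectHQ/prefect")
```

The body of `get_stars` stays an ordinary Python function; only the decorator line adds the failure handling, which is the kind of minimally invasive change described for Prefect 2.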


## 3. Prefect flow basics

### Installing Prefect

At the time of writing, `pip install prefect` installed Prefect 1 by default. To install Prefect 2 (then in beta), you need to specify the version explicitly:
```
pip install prefect==2.0b5
```
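
To check which major version pip actually installed, a small helper based on the standard library's `importlib.metadata` (Python 3.8+) can be used. This is a convenience sketch, not part of Prefect, and the helper name `prefect_major_version` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version


def prefect_major_version():
    """Return the installed Prefect major version (e.g. 2 for "2.0b5"), or None."""
    try:
        return int(version("prefect").split(".")[0])
    except PackageNotFoundError:
        return None


major = prefect_major_version()
if major is None:
    print("Prefect is not installed")
elif major < 2:
    # Prefect 1 (Prefect Core) releases have major version 0 or 1.
    print("Prefect 1 is installed: run pip install prefect==2.0b5")
else:
    print(f"Prefect {major} is installed")
```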

### Adding flow and tasks

Let's add a flow decorator to the main function of [our script](https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/03-orchestration/prefect_flow.py):

```
from prefect import flow

@flow
def main():
    ...  # the body of main() from the script stays unchanged

if __name__ == "__main__":
    main()
```
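When you run the script with the flow decorator, you will notice that Prefect produces more log output.

In the same way, the individual steps of the pipeline can be turned into tasks with the `@task` decorator: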

```
from prefect import task

@task
def train_best_model():
    ...
```

When a task is called inside a flow, it returns a future (a `PrefectFuture`) instead of its return value, so we have to call `.result()` on the future to get the actual value:

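```
from prefect import flow, task

@task
def add_features(df_train, df_val):
    ...

@flow
def main():
    ...  # earlier steps load X_train and X_val
    # Inside a flow, the task call returns a PrefectFuture; .result() gives its value
    X_train, X_val, y_train, y_val, dv = add_features(X_train, X_val).result()
```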
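
The future/`.result()` pattern is not specific to Prefect. As an analogy only (Prefect's `PrefectFuture` is its own class), the standard library's `concurrent.futures` behaves the same way: submitting work returns a `Future` immediately, and `.result()` blocks until the value is available. The `add_features` below is a trivial hypothetical stand-in for the real function.

```python
from concurrent.futures import ThreadPoolExecutor


def add_features(df_train, df_val):
    # Trivial stand-in for the real feature engineering step.
    return df_train, df_val


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(add_features, [1, 2], [3])  # returns a Future at once
    print(type(future).__name__)  # Future
    X_train, X_val = future.result()  # waits for the call, then returns its value
```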

### Prefect's UI

