# Prefect

In the previous module, we learned about experiment tracking and model registry.
In particular, we discussed how to get a candidate model and promote it from staging to production.
In this module, we learn how to automate this process and schedule it with workflow orchestration using [Prefect 2.0](https://orion-docs.prefect.io/).
Prefect allows us to orchestrate and observe workflows. One of the primary goals of Prefect is to minimize time spent on **negative engineering**, i.e. coding against all possible causes of failure, by providing fault-tolerant data pipelines. Prefect provides tools for setting up pipelines using code, logging, scheduling, retries, caching, notifications, visualizing dependencies, as well as ad hoc execution and parametrization of scheduled tasks.
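
To make negative engineering concrete, here is a minimal sketch of the kind of retry boilerplate one might otherwise write by hand. The `with_retries` helper is hypothetical and is not part of Prefect; its argument names only mirror the `retries` and `retry_delay_seconds` options used in the next section, and it assumes that `retries` counts attempts made after the first one.

```python
import functools
import time


def with_retries(retries=3, retry_delay_seconds=0.0):
    """Hand-rolled retry wrapper (hypothetical helper, not part of Prefect)."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # One initial attempt plus up to `retries` further attempts
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the failure
                    time.sleep(retry_delay_seconds)

        return wrapper

    return decorator


calls = {"count": 0}


@with_retries(retries=3, retry_delay_seconds=0.0)
def flaky():
    # Fails on the first two calls, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = flaky()
print(result, calls["count"])
```

Every pipeline step would need this kind of wrapper, plus logging and state tracking, which is the effort an orchestrator is meant to absorb.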

In [None]:
!pip list | grep prefect

## Prefect flows

A **flow** in Prefect is simply a Python function. It consists of **tasks**, each of which can be thought of as a minimal observable unit of work. Flows can also be called inside flows as subflows. Consider the following example. Here we simulate getting data from an unreliable API, augmenting the fetched data, and writing the resulting data into a database.

```{margin}
[Getting Started with Prefect 2](https://www.prefect.io/guide/blog/getting-started-prefect-2/)
```

In [None]:
import time
import random

from prefect import flow, task 


@task(retries=3, retry_delay_seconds=5)
def call_unreliable_api():
    choices = [{"data": 42}, {"data": -1}, {"data": 0}]
    res = random.choice(choices)
    if res["data"] <= 0:
        raise Exception("Our unreliable service failed.")
    else:
        time.sleep(10)
        return res

@task
def augment_data(data: dict, msg: str):
    data["message"] = msg
    return data

@task
def write_to_database(data: dict):
    print(f"Wrote {data} to database successfully!")
    return "Success!"

@flow(log_prints=True)
def pipeline(msg: str):
    api_result = call_unreliable_api()
    augmented_data = augment_data(data=api_result, msg=msg)
    write_to_database(augmented_data)


pipeline(msg="0")

## Prefect Orion UI

Notice that the API call failed before the flow pushed through. To inspect this run, we start the UI by calling `prefect orion start` in any directory (`.prefect` is saved in the user's home directory). This starts the Prefect Orion server on port 4200.

```bash
$ prefect orion start
Starting...

 ___ ___ ___ ___ ___ ___ _____    ___  ___ ___ ___  _  _
| _ \ _ \ __| __| __/ __|_   _|  / _ \| _ \_ _/ _ \| \| |
|  _/   / _|| _|| _| (__  | |   | (_) |   /| | (_) | .` |
|_| |_|_\___|_| |___\___| |_|    \___/|_|_\___\___/|_|\_|

Configure Prefect to communicate with the server with:

    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

Check out the dashboard at http://127.0.0.1:4200
```

We navigate around the UI to find the `pipeline` flow and its most recent run which, as we have seen in the logs, was able to complete its execution. Here we see that this flow run started on `2022/06/10 11:05:51 PM` and ended on `2022/06/10 11:05:52 PM`. We also see that the logs have the details of the exception raised when the API call failed. In the second tab, we can see the tasks that make up this flow. There is also the subflow tab, which shows that we can call flows from a parent flow.

```{figure} ../../../img/hello-world-2.png
---
---
```

One of the more interesting features of the dashboard is **Radar** on the right. This shows the dependence between tasks. Notice the linear dependence of the tasks, e.g. `write_to_database` depends directly on the `augment_data` task but not on `call_unreliable_api`. Hovering on a task shows its backward and forward data dependencies. Having tasks arranged in concentric circles allows for a hierarchy of dependence. Note that the runtime for each task is also conveniently displayed.

```{figure} ../../../img/hello-world-1.png
---
---
```

Finally, let us look at a flow which failed to complete all its tasks. Here all calls to the API failed despite the retries. The radar plot nicely shows where the flow has failed. This is really useful, especially when we have dozens of tasks and multiple subflows in our data pipeline.


```{figure} ../../../img/hello-world-3.png
```

<br>

**Remark.** Note also that geometrically there is more space available to grow the dependence tree compared to top-down or left-right approaches, since nodes get farther apart as we move radially outward at a fixed angle. This also allows Radar to minimize edge crossings by combining radial and circumferential movement for the edges between task nodes.

Furthermore, Radar dynamically updates as tasks complete (or fail). The mini-map, edge tracing, and node selection tools make workflow inspection doable even for highly complex graphs. See [*Introducing Radar*](https://www.prefect.io/guide/blog/introducing-radar/) for further reading.
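
The idea of a hierarchy of concentric circles can be sketched by assigning each task a ring index equal to the length of its longest chain of upstream tasks. This is only an illustration of the concept using the standard library. It is an assumption, not Radar's actual layout algorithm, and `ring_index` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Upstream dependencies of each task in the `pipeline` flow
upstream = {
    "call_unreliable_api": set(),
    "augment_data": {"call_unreliable_api"},
    "write_to_database": {"augment_data"},
}


def ring_index(upstream):
    """Map each task to the length of its longest upstream chain."""
    rings = {}
    # static_order() yields every task after all of its upstream tasks
    for task in TopologicalSorter(upstream).static_order():
        deps = upstream.get(task, set())
        rings[task] = 1 + max((rings[d] for d in deps), default=-1)
    return rings


rings = ring_index(upstream)
for task, ring in rings.items():
    print(f"ring {ring}: {task}")
```

Tasks without upstream dependencies land on the innermost ring, and each downstream task sits one ring further out than its deepest dependency.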



## MLflow runs as flow

In this section, we will write Prefect flows for running modelling experiments. Our idea is to define a `main` flow which consists of one subflow `preprocess_data` and two tasks which will execute MLflow runs. Note that the output of the `preprocess_data` subflow will be used by the two MLflow runs, so there will be some data dependency.

```{margin}
[`utils.py`](https://github.com/particle1331/ok-transformer/blob/85993956250601edeccf0d1bd5f192bb20873677/docs/nb/mlops/3-prefect/utils.py)
```
```python
@task
def load_training_dataframe(file_path, y_min=1, y_max=60):
    """Load data from disk and preprocess for training."""
    
    # Load data from disk
    data = pd.read_parquet(file_path)

    # Create target column and filter outliers
    data['duration'] = data.lpep_dropoff_datetime - data.lpep_pickup_datetime
    data['duration'] = data.duration.dt.total_seconds() / 60
    data = data[(data.duration >= y_min) & (data.duration <= y_max)]

    return data


@task
def fit_preprocessor(train_data):
    """Fit and save preprocessing pipeline."""

    # Unpack passed data
    y_train = train_data.duration.values
    X_train = train_data.drop('duration', axis=1)    

    # Initialize pipeline
    categorical = ['PU_DO']
    numerical = ['trip_distance']

    preprocessor = make_pipeline(
        PrepareFeatures(categorical, numerical),
        DictVectorizer(),
    )

    # Fit only on train set
    preprocessor.fit(X_train, y_train)
    joblib.dump(preprocessor, artifacts / 'preprocessor.pkl')
    
    return preprocessor


@task
def create_model_features(preprocessor, train_data, valid_data):
    """Transform training and validation dataframes using the fitted preprocessor."""

    # Unpack passed data
    y_train = train_data.duration.values
    y_valid = valid_data.duration.values
    X_train = train_data.drop('duration', axis=1)
    X_valid = valid_data.drop('duration', axis=1)
    
    # Feature engineering
    X_train = preprocessor.transform(X_train)
    X_valid = preprocessor.transform(X_valid)

    return X_train, y_train, X_valid, y_valid


@flow
def preprocess_data(train_data_path, valid_data_path):
    """Return feature and target arrays from paths. 
    Note: This just combines all the functions above in a single step."""

    train_data = load_training_dataframe(train_data_path)
    valid_data = load_training_dataframe(valid_data_path)
    preprocessor = fit_preprocessor(train_data)

    # X_train, y_train, X_valid, y_valid
    return create_model_features(preprocessor, train_data, valid_data).result()
```

Next, we will create the `main` flow described above. Here we are passing around [`PrefectFuture`](https://orion-docs.prefect.io/api-ref/prefect/futures/) objects instead of the actual results. Futures represent the execution of a task and allow retrieval of the task run's state. This is so that Prefect is able to track data dependency between tasks; converting to Python objects, i.e. using `.result()`, breaks this lineage. Note that once a future has been passed into a task, we can treat it as a usual Python object since the `task` wrapper has done the work of unpacking the future.
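
Why unwrapping breaks lineage can be seen in a toy model. The sketch below is an analogy only and is not how Prefect implements futures: `TrackedFuture` and `run_task` are hypothetical names. A dependency edge is recorded only when a future itself is passed as an argument.

```python
class TrackedFuture:
    """Toy stand-in for a future: a value plus the task that produced it."""

    def __init__(self, task_name, value):
        self.task_name = task_name
        self._value = value

    def result(self):
        return self._value


edges = []  # recorded (upstream, downstream) pairs


def run_task(name, fn, *args):
    """Run fn, unwrapping futures in args and recording where they came from."""
    resolved = []
    for arg in args:
        if isinstance(arg, TrackedFuture):
            edges.append((arg.task_name, name))
            resolved.append(arg.result())
        else:
            resolved.append(arg)
    return TrackedFuture(name, fn(*resolved))


loaded = run_task("load", lambda: [1, 2, 3])

# Passing the future keeps the lineage: ("load", "double") is recorded
doubled = run_task("double", lambda xs: [2 * x for x in xs], loaded)

# Passing the unwrapped value hides where the data came from
total = run_task("total", sum, loaded.result())

print(edges)
```

Both downstream tasks compute the right values, but only the one that received the future shows up as dependent on `load`.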

For the `main` flow, we execute the following sequentially: running a subflow for preprocessing the datasets for modelling, training a linear regression baseline model, and training XGBoost models with different hyperparameters sampled using [TPE](https://optunity.readthedocs.io/en/latest/user/solvers/TPE.html). Sequential execution ensures that all resources are allocated to a single learning algorithm at each point in the flow run.

```{margin}
[`main.py`](https://github.com/particle1331/ok-transformer/blob/85993956250601edeccf0d1bd5f192bb20873677/docs/nb/mlops/3-prefect/main.py)
```
```python
def objective(params, xgb_train, y_train, xgb_valid, y_valid):
    """Compute validation RMSE (one trial = one run)."""

    with mlflow.start_run():
        
        model = xgb.train(
            params=params,
            dtrain=xgb_train,
            num_boost_round=100,
            evals=[(xgb_valid, 'validation')],
            early_stopping_rounds=5,
            verbose_eval=False
        )

        # MLflow logging
        ...

    return {'loss': rmse_valid, 'status': STATUS_OK}


@task
def xgboost_runs(num_runs, data):
    """Run TPE algorithm on search space to minimize objective."""

    X_train, y_train, X_valid, y_valid = data
    xgb_train = xgb.DMatrix(X_train, label=y_train)
    xgb_valid = xgb.DMatrix(X_valid, label=y_valid)

    search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
        'learning_rate': hp.loguniform('learning_rate', -3, 0),
        'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
        'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
        'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
        'objective': 'reg:squarederror',
        'seed': 42
    }

    best_result = fmin(
        fn=partial(
            objective, 
            xgb_train=xgb_train, y_train=y_train, 
            xgb_valid=xgb_valid, y_valid=y_valid,
        ),
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_runs,
        trials=Trials()
    )


@task
def linreg_runs(data):
    """Run linear regression training."""

    X_train, y_train, X_valid, y_valid = data
    
    with mlflow.start_run():

        model = LinearRegression()
        model.fit(X_train, y_train)

        # MLflow logging
        ...

        
@flow(task_runner=SequentialTaskRunner())
def main(
    train_data_path, 
    valid_data_path, 
    num_xgb_runs, 
    experiment_name,
    tracking_uri,
):
    # Set and run experiment
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(experiment_name)

    data = preprocess_data(train_data_path, valid_data_path)
    linreg_runs(data)
    xgboost_runs(num_xgb_runs, data)
```
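
For reading the search space above: in hyperopt, `hp.loguniform(label, low, high)` draws values distributed as `exp(uniform(low, high))`, and `hp.quniform(label, low, high, q)` rounds a uniform draw to a multiple of `q`. The standard-library sketch below only mimics these two priors with hypothetical helper functions to show the resulting ranges. It does not implement TPE, which adapts its proposals based on past trials.

```python
import math
import random


def loguniform(rng, low, high):
    """Draw exp(u) with u uniform on [low, high]."""
    return math.exp(rng.uniform(low, high))


def quniform(rng, low, high, q):
    """Draw a uniform value and round it to a multiple of q."""
    return round(rng.uniform(low, high) / q) * q


rng = random.Random(42)
samples = [
    {
        "max_depth": int(quniform(rng, 4, 100, 1)),
        "learning_rate": loguniform(rng, -3, 0),
    }
    for _ in range(1000)
]

# learning_rate stays between exp(-3) (about 0.0498) and exp(0) = 1
lr_min = min(s["learning_rate"] for s in samples)
lr_max = max(s["learning_rate"] for s in samples)
print(f"learning_rate range: [{lr_min:.4f}, {lr_max:.4f}]")
```

So the bounds passed to `hp.loguniform` are exponents: `(-3, 0)` for `learning_rate` means candidate values between roughly 0.05 and 1.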


Looking at the dashboard, we can see a run of the `main` flow. As expected, this consists of 3 tasks that are executed sequentially as indicated in the timeline. 

```{figure} ../../../img/mlflow-runs-dashboard.png
---
---
```

Recall that the first task `preprocess-data` is a subflow. This has concurrent execution which we can see from overlapping lines in its timeline graph. This subflow consists of four tasks.

```{figure} ../../../img/radar_preprocessing.png
---
---
```

If we check out the radar of the `main` flow, we see the following. In this earlier screenshot, `xgboost_runs` has been running for 1 minute and 14 seconds. Note that both MLflow runs depend on the preprocessing subflow, as can be seen from the presence of edges. We can drill down into the radar for the `preprocess-data` subflow by clicking on the `4 task runs` button.


```{figure} ../../../img/radar_xgb.png
---
---
```

Here we see the radar plot of the subflow. Hovering on a task shows its forward and backward data dependency edges. For example, in the figure below we can see the dependencies for the task `load_training_dataframe` that loads the train dataset. This sends data to the preprocessor (for fitting) and to the final task (for transformation), which returns all processed data for modelling, so we can see two forward dependencies.

```{figure} ../../../img/radar.png
---
---

```

## Deployment

In this section, we deploy a workflow that puts the best model in an MLflow experiment to staging in the model registry. This can be useful for regularly staging candidate models trained on new data. The staged models can then be checked further to decide whether they should be deployed into production. The code below is covered in [Experiment Tracking and Model Management](https://particle1331.github.io/ok-transformer/nb/mlops/2-mlflow/2-mlflow.html#api-workflows).

### Model staging review

Below we will run MLflow on localhost with SQLite. This can be easily adapted to a remote tracking server since we only have to change `MLFLOW_TRACKING_URI`. Connecting to the client:

In [None]:
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)


def print_experiment(experiment):
    print(f"(Experiment)")
    print(f"    experiment_id={experiment.experiment_id}")
    print(f"    name='{experiment.name}'")
    print(f"    artifact_location='{experiment.artifact_location}'")
    print()

for experiment in client.list_experiments():
    print_experiment(experiment)

Recall that the flow trains a linear model and XGBoost models with different parameters. As minimum requirements, we only consider models with validation RMSE less than `6.5` and inference time less than `2e-5`.

In [None]:
candidates = client.search_runs(
    experiment_ids=1,
    filter_string='metrics.rmse_valid < 6.5 and metrics.inference_time < 20e-6',
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metrics.rmse_valid ASC"]
)

for run in candidates:
    print(f"run_id: {run.info.run_id}   rmse_valid: {run.data.metrics['rmse_valid']:.3f}   inference_time: {run.data.metrics['inference_time']:.4e}")


Having satisfied the operating requirements, we can take the model with the lowest error as the new version and set it up for staging. Note that this flow can fail when no run in our tracker satisfies these requirements, since `candidates` is then empty. But that is okay as it serves as a notification for us to look into our models.

In [None]:
model_to_stage = candidates[0]

registered_model = mlflow.register_model(
    model_uri=f"runs:/{model_to_stage.info.run_id}/model", 
    name='NYCRideDurationModel'
)

client.transition_model_version_stage(
    name='NYCRideDurationModel',
    version=registered_model.version, 
    stage='Staging',
)

```{figure} ../../../img/mlflow-automatic-staging.png
---
---
Staged model from code cells above.
```
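
The filter-then-pick logic used for staging can be sketched without a tracking server. Here `Run` and `select_best_run` are hypothetical stand-ins (not MLflow objects), with thresholds matching the filter string used in the cells above. The sketch raises an explicit error when no run qualifies instead of failing on an empty list.

```python
from dataclasses import dataclass


@dataclass
class Run:
    """Hypothetical stand-in for a tracked run with two logged metrics."""

    run_id: str
    rmse_valid: float
    inference_time: float


def select_best_run(runs, max_rmse=6.5, max_inference_time=20e-6):
    """Return the lowest-RMSE run among those meeting both limits."""
    candidates = [
        r for r in runs
        if r.rmse_valid < max_rmse and r.inference_time < max_inference_time
    ]
    if not candidates:
        raise LookupError("No run satisfies the staging requirements.")
    return min(candidates, key=lambda r: r.rmse_valid)


# Made-up metric values for illustration only
runs = [
    Run("a", rmse_valid=6.9, inference_time=1.0e-5),  # too inaccurate
    Run("b", rmse_valid=6.3, inference_time=3.0e-5),  # too slow
    Run("c", rmse_valid=6.4, inference_time=1.5e-5),  # passes both limits
    Run("d", rmse_valid=6.2, inference_time=1.8e-5),  # passes both, best RMSE
]

best = select_best_run(runs)
print(best.run_id)
```

A failure with a clear message plays the same role as the failing flow run: it signals that none of the latest models is good enough to stage.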

### MLflow staging flow

This looks good, so we collect the above code cells into a workflow which will create a new experiment, perform the experiment runs, and filter the best model for staging. We will then schedule this workflow to be run at fixed intervals using Prefect.

```{margin}
[`main.py`](https://github.com/particle1331/ok-transformer/blob/06a1ecfea5456430538d13cdebbbe4c23dbbe93c/docs/nb/mlops/3-prefect/main.py)
```
```python
@task
def stage_model(tracking_uri, experiment_name):
    """Register and stage best model."""

    # Get best model from current experiment
    client = MlflowClient(tracking_uri=tracking_uri)
    candidates = client.search_runs(
        experiment_ids=client.get_experiment_by_name(experiment_name).experiment_id,
        filter_string='metrics.rmse_valid < 6.5 and metrics.inference_time < 20e-6',
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=5,
        order_by=["metrics.rmse_valid ASC"]
    )

    # Register and stage best model
    best_model = candidates[0]
    registered_model = mlflow.register_model(
        model_uri=f"runs:/{best_model.info.run_id}/model", 
        name='NYCRideDurationModel'
    )

    client.transition_model_version_stage(
        name='NYCRideDurationModel',
        version=registered_model.version, 
        stage='Staging',
    )

    # Update description of staged model
    client.update_model_version(
        name='NYCRideDurationModel',
        version=registered_model.version,
        description=f"[{datetime.now()}] The model version {registered_model.version} from experiment '{experiment_name}' was transitioned to Staging."
    )

...

@flow(name='mlflow-staging', task_runner=SequentialTaskRunner())
def mlflow_staging(
    train_data_path, 
    valid_data_path, 
    num_xgb_runs=1
):    
    # Setup experiment
    ctx = get_run_context()
    MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"
    EXPERIMENT_NAME = f"nyc-taxi-experiment-{ctx.flow_run.expected_start_time}"

    # Make experiment runs
    main(
        train_data_path=train_data_path, 
        valid_data_path=valid_data_path, 
        num_xgb_runs=num_xgb_runs, 
        experiment_name=EXPERIMENT_NAME,
        tracking_uri=MLFLOW_TRACKING_URI,
    )
    
    # Stage best model
    stage_model(
        tracking_uri=MLFLOW_TRACKING_URI, 
        experiment_name=EXPERIMENT_NAME
    )
```

Here `mlflow_staging` creates a new experiment every time it runs, with a name based on the scheduled start time (obtained using the run context `ctx`). It's important that injecting the correct datetime parameter (for identifying the experiment) is delegated to the orchestrator instead of the executing machine. For example, the clock on the executing machine might be rogue or in a different timezone, or the job may be queued for some reason so that execution is delayed. This is the case for the test run in the following figure.


```{figure} ../../../img/flow-context.png
---
---
Run context for a test flow that simply logs the scheduled time. For some reason, execution is delayed so that the execution time differs from the scheduled time. Also, the orchestrator time is in UTC (standard) while the other values are in UTC+8 (system time).
```
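
The benefit of keying the experiment on the scheduled time can be sketched with plain `datetime` objects. The `experiment_name` helper, its name format, and the timestamps below are assumptions for illustration; the flow above simply interpolates `ctx.flow_run.expected_start_time`.

```python
from datetime import datetime, timedelta, timezone


def experiment_name(scheduled_start):
    """Build an experiment name from a timezone-aware scheduled start time."""
    utc = scheduled_start.astimezone(timezone.utc)
    return f"nyc-taxi-experiment-{utc:%Y-%m-%dT%H:%M:%SZ}"


# Scheduled time as the orchestrator sees it (UTC); arbitrary example value
scheduled = datetime(2022, 6, 12, 16, 0, tzinfo=timezone.utc)

# The same instant expressed in UTC+8 yields the same name
local = scheduled.astimezone(timezone(timedelta(hours=8)))

# A delayed clock reading on the executing machine yields a different name
delayed = local + timedelta(minutes=7)

name = experiment_name(scheduled)
print(name)
print(experiment_name(local) == name)
print(experiment_name(delayed) == name)
```

Naming by scheduled time is therefore stable across timezones and queueing delays, whereas naming by the wall clock at execution is not.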


Later we will see that we can specify a parameter dictionary for passing parameter values to the flow during deployment. So it is good practice to first run the flow with a test parameters dict. Thus, we append the following to the script:

```python
if __name__ == "__main__":
    
    parameters={
        "train_data_path": './data/green_tripdata_2021-01.parquet',
        "valid_data_path": './data/green_tripdata_2021-02.parquet',
        "num_xgb_runs": 3,
    }

    mlflow_staging(**parameters)
```

In [None]:
!python main.py

Let's check out `flashy-dove` in the Prefect Orion UI. Looks good. Here we see the `main` subflow for training and the `stage_model` task for staging the best model in the experiment that was just performed. This can all be checked in MLflow.

```{figure} ../../../img/flashy-dove.png
---
---
```

LGTM, let's deploy!

### Local storage setup

Before creating a deployment in Prefect, let us first set up local storage for persisting flow code for deployments, task results, and flow results. This is simple enough to do. Take note of the storage identifier as this will be used later.

```bash
$ prefect storage create

Found the following storage types:
0) Azure Blob Storage
    Store data in an Azure blob storage container.
1) File Storage
    Store data as a file on local or remote file systems.
2) Google Cloud Storage
    Store data in a GCS bucket.
3) Local Storage
    Store data in a run's local file system.
4) S3 Storage
    Store data in an AWS S3 bucket.
5) Temporary Local Storage
    Store data in a temporary directory in a run's local file system.
Select a storage type to create: 3
You've selected Local Storage. It has 1 option(s).
STORAGE PATH: ~/.prefect/local-storage
Choose a new name for this storage configuration: local-storage
Registered storage 'local-storage' with identifier
'33133d27-a83b-468c-bb72-a9f19bd0d157'.
```

### Deployment specification

For the deployment, we run the `mlflow-staging` flow every minute locally. We also set the parameters for each run. This adds a bit of flexibility. For now, the datasets are fixed, but it makes sense to run this on fresh data so that we actually get an updated model at each scheduled run.

```{margin}
[`deployments.py`](https://github.com/particle1331/ok-transformer/blob/85993956250601edeccf0d1bd5f192bb20873677/docs/nb/mlops/3-prefect/deployments.py)
```
```python
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from prefect.flow_runners import SubprocessFlowRunner
from datetime import timedelta


DeploymentSpec(
    name="deploy-mlflow-staging",
    flow_name='mlflow-staging',
    schedule=IntervalSchedule(interval=timedelta(minutes=1)),
    flow_location="./main.py",
    flow_storage="33133d27-a83b-468c-bb72-a9f19bd0d157", # local storage id
    flow_runner=SubprocessFlowRunner(),
    parameters={
        "train_data_path": './data/green_tripdata_2021-01.parquet',
        "valid_data_path": './data/green_tripdata_2021-02.parquet',
        "num_xgb_runs": 10,
    },
    tags=["ml"]
)

```

Specifying `SubprocessFlowRunner()` as the flow runner means that this flow is executed locally, i.e. not on Kubernetes or in Docker containers. Here we specify the local storage identifier that we have just created above. Note that we specify the flow by name (`'mlflow-staging'`). This is why we defined `name='mlflow-staging'` in the decorator of `mlflow_staging`. To push our deployment to Prefect, we execute the following in the terminal:

In [None]:
!prefect deployment create deployments.py

```{figure} ../../../img/deploy-mlflow-staging.png
---
---
100 runs are now scheduled in Prefect.
```
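
To get a feel for what an interval schedule produces, the sketch below lists upcoming run times on a fixed grid `anchor + k * interval`. This is a simplified model under stated assumptions: `upcoming_runs` is a hypothetical helper, the anchor is arbitrary, and Prefect's own rules for anchoring and for how many runs it schedules ahead are not modelled.

```python
from datetime import datetime, timedelta, timezone


def upcoming_runs(anchor, interval, now, n):
    """Return the next n grid times anchor + k * interval at or after now."""
    elapsed = max(now - anchor, timedelta(0))
    k = -(-elapsed // interval)  # ceiling division on timedeltas
    return [anchor + (k + i) * interval for i in range(n)]


anchor = datetime(2022, 6, 12, 0, 0, tzinfo=timezone.utc)
now = anchor + timedelta(seconds=150)  # 2.5 minutes after the anchor

runs = upcoming_runs(anchor, timedelta(minutes=1), now, n=3)
for run_time in runs:
    print(run_time.isoformat())
```

With a one-minute interval, the first upcoming run is three minutes after the anchor, followed by one run per minute.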

### Adding workers to deployed workflows

Flows are now scheduled in Prefect. Notice that there are late runs. This is because we haven't attached any workers that will run these tasks. Unlike CI/CD platforms, Prefect does not provide compute: all compute happens outside of Prefect and has to be provided by users to run the scheduled workflows. So we create a **work queue** and fire up a **Prefect agent** to execute our deployment with our local compute. Note that the following setup can also be done with the help of the UI.

In [None]:
!prefect deployment ls

In [None]:
# Copying the deployment ID above
!prefect work-queue create \
    --deployment '271734ba-6e5e-4fa1-bf03-ab6801f7b44f' \
    --flow-runner subprocess \
    mlflow-deploy-runner

The following shows the scheduled runs for this worker. This also confirms that we chose the correct flow runner and deployment. 

In [None]:
# Copying the worker ID above
!prefect work-queue preview 3f80fde3-4390-413f-be3a-3d8da1913163

We now run the worker so that it picks up scheduled runs.

In [None]:
# Copying the worker ID above
!prefect agent start 3f80fde3-4390-413f-be3a-3d8da1913163

We stop the run since this will take a while. But we already see that it has completed one flow run, while some are still running, some are late, and many are scheduled. There are late flow runs because the deployment is scheduled every minute and it took a while for us to create a worker. Another nice thing to notice is that the worker works concurrently on late flows.

```{figure} ../../../img/deployment-1.png
---
---
```

```{figure} ../../../img/deployment-2.png
---
---
```
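
Why runs pile up as late before a worker is attached can be sketched as a simple partition of scheduled start times around the moment the worker starts. The `split_runs` helper and the timestamps are hypothetical; this is a mental model, not Prefect's state machine.

```python
from datetime import datetime, timedelta, timezone


def split_runs(scheduled_times, now):
    """Partition scheduled start times into late (already due) and upcoming."""
    late = [t for t in scheduled_times if t <= now]
    upcoming = [t for t in scheduled_times if t > now]
    return late, upcoming


# Ten runs scheduled one minute apart; arbitrary example start time
start = datetime(2022, 6, 12, 0, 0, tzinfo=timezone.utc)
scheduled_times = [start + timedelta(minutes=m) for m in range(10)]

# The worker is only started 3.5 minutes after the first scheduled run
worker_started = start + timedelta(minutes=3, seconds=30)

late, upcoming = split_runs(scheduled_times, worker_started)
print(len(late), "late,", len(upcoming), "upcoming")
```

Every run whose scheduled time passed before the worker came up counts as late, and the worker has to catch up on those before reaching the upcoming ones.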

From the log at `00:58:47.290` we see that the completed flow `gleaming-honeybee` created Version 4 of the model `NYCRideDurationModel`. Indeed, we have this version staged in the model registry:

In [None]:
client.get_latest_versions(name='NYCRideDurationModel')