### (Optional) Use in a continuous monitoring and re-training pipeline

Finally, let’s take a quick look at how we would finish “closing the loop” by using the labeled data in a continuous monitoring and re-training pipeline. We will want to automate the process of:

-   creating tasks in Label Studio out of data from production
-   evaluating performance on labeled data sampled from production
-   and re-training on data from production

There are a variety of ways in which we can realize this goal. We will use Apache Airflow, a workflow orchestrator, to manage this pipeline on a schedule. (Airflow is a good fit for a Docker environment; in a Kubernetes environment, we might prefer to use Argo Events + Argo Workflows.)

First, let’s bring up Airflow:

``` bash
# runs on node-eval-loop
docker compose -f eval-loop-chi/docker/docker-compose-airflow.yaml up -d
```

When it comes up, the web UI will be available on port 8081. (Airflow's web server listens on port 8080 by default, but since Label Studio is already on port 8080, our Docker Compose file maps it to port 8081 instead.)

In a browser, open

    http://A.B.C.D:8081

substituting the floating IP assigned to your instance in place of `A.B.C.D`. Log in with username `airflow@example.com` and password `airflow` (we have created an initial user with these credentials in our Docker compose file).

Airflow is a workflow orchestrator for running any pipeline that can be represented as a DAG (directed acyclic graph). Here’s an example of a basic DAG for Airflow:

``` python
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="test_dag_hello_world",
    start_date=datetime.today() - timedelta(days=1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    t1 = EmptyOperator(task_id="start")
```

This Python script defines a DAG called “test_dag_hello_world”:

-   that starts one day ago (i.e. it is allowed to run as of one day ago, but was not allowed to run before that date),
-   that is scheduled to run daily,
-   and that doesn’t “catch up” on past runs (i.e. if we set the start date to one year ago, it wouldn’t run 365 times to make up for the missing runs!)
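To make the `catchup` behavior concrete, here is a small plain-Python sketch (not Airflow code) that counts the complete daily intervals between a start date and the current time. It assumes a simplified model of `@daily` in which intervals run from midnight to midnight and a run becomes due only once its interval has ended; Airflow's real scheduler handles more cases than this.

```python
from datetime import datetime, timedelta

ONE_DAY = timedelta(days=1)


def daily_data_intervals(start: datetime, now: datetime) -> list[tuple[datetime, datetime]]:
    """Return every complete (start, end) daily interval between `start` and `now`.

    Simplified model of an "@daily" schedule: intervals run midnight to midnight,
    and an interval only counts once it has fully ended.
    """
    # The first interval begins at the first midnight on or after `start`
    cursor = datetime(start.year, start.month, start.day, tzinfo=start.tzinfo)
    if cursor < start:
        cursor += ONE_DAY
    intervals = []
    while cursor + ONE_DAY <= now:
        intervals.append((cursor, cursor + ONE_DAY))
        cursor += ONE_DAY
    return intervals


def runs_to_schedule(start: datetime, now: datetime, catchup: bool) -> list[tuple[datetime, datetime]]:
    """Intervals a scheduler would create runs for, under this simplified model."""
    intervals = daily_data_intervals(start, now)
    if catchup:
        return intervals  # backfill every missed interval
    return intervals[-1:]  # only the most recent complete interval


start = datetime(2023, 1, 1)
now = datetime(2024, 1, 1)
print(len(runs_to_schedule(start, now, catchup=True)))   # 365
print(len(runs_to_schedule(start, now, catchup=False)))  # 1
```

With a start date one year in the past, `catchup=True` would queue a run for each of the 365 missed intervals, while `catchup=False` schedules only the latest one.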

The DAG itself has just one “node”, `t1`, which runs Airflow’s built-in `EmptyOperator`, a no-op placeholder task.
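Our example has a single node, but real pipelines have several tasks that depend on one another, and the “acyclic” requirement is what guarantees there is always a valid order in which to run them. The sketch below illustrates that idea with Python's standard-library `graphlib`, not with Airflow; the task names are hypothetical and only loosely mirror the pipelines later in this section. In an actual Airflow DAG file, dependencies like these are declared between operators with `>>`, e.g. `t1 >> t2`.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks, each mapped to the set of upstream tasks it depends on
pipeline = {
    "init_buckets": set(),
    "list_new_uploads": {"init_buckets"},
    "sample_for_labeling": {"list_new_uploads"},
    "create_label_studio_tasks": {"sample_for_labeling"},
    "move_rest_to_noisy": {"sample_for_labeling"},
}


def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return one valid order in which every task runs after all of its upstream tasks."""
    return list(TopologicalSorter(graph).static_order())


def has_cycle(graph: dict[str, set[str]]) -> bool:
    """A graph with a cycle has no valid execution order, so it is not a DAG."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return True
    return False


print(execution_order(pipeline))
print(has_cycle({"a": {"b"}, "b": {"a"}}))  # True
```

The two tasks that depend only on `sample_for_labeling` have no ordering constraint between them, which is exactly the kind of parallelism an orchestrator can exploit.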

When we place a Python file in the Airflow DAGs folder, Airflow scans that file at regular intervals to discover DAG definitions. If the file has top-level variables that are instances of the `DAG` class, Airflow adds those DAGs to its metadata database and displays them in the web UI.

In the Airflow web UI, the `test_dag_hello_world` DAG should be visible in the DAGs tab. It runs on a schedule, but we can also trigger it manually: press the ▶ button to trigger it now. Confirm that it runs successfully.

Now, let’s run a “real” pipeline. Our first pipeline will:

-   Get objects from the “production” bucket that have been uploaded since the last DAG run.
-   Sample from them to get tasks to send to Label Studio, including: a random sample of all images, the low-confidence images, the flagged images, and images that have been re-labeled by the user.
-   Move these to a “production-label-wait” bucket, then generate tasks for Label Studio.
-   Move the remaining high-confidence images (those not selected for labeling) to a “production-noisy” bucket.
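The selection step in this pipeline could be sketched in plain Python as follows. This is an illustrative assumption, not the pipeline's actual code: the `ProductionImage` record, its field names, the 0.5 confidence threshold, and the 10% random-sample rate are all hypothetical choices.

```python
import random
from dataclasses import dataclass


@dataclass
class ProductionImage:
    """Hypothetical record for one uploaded image (field names are illustrative)."""
    key: str                 # object key in the "production" bucket
    predicted_class: str     # the model's prediction
    confidence: float        # the model's probability for its predicted class
    flagged: bool = False    # the user flagged the prediction in the app
    corrected: bool = False  # the user re-labeled the image in the app


def split_for_labeling(images, random_fraction=0.1, conf_threshold=0.5, seed=None):
    """Split images into (send to Label Studio, send to production-noisy)."""
    rng = random.Random(seed)
    to_label, noisy = [], []
    for img in images:
        needs_human = (
            img.confidence < conf_threshold    # low-confidence prediction
            or img.flagged                     # flagged by the user
            or img.corrected                   # re-labeled by the user
            or rng.random() < random_fraction  # random sample of everything else
        )
        (to_label if needs_human else noisy).append(img)
    return to_label, noisy


images = [
    ProductionImage("a.jpg", "Soup", 0.95),
    ProductionImage("b.jpg", "Rice", 0.30),
    ProductionImage("c.jpg", "Egg", 0.90, flagged=True),
    ProductionImage("d.jpg", "Meat", 0.88, corrected=True),
]
to_label, noisy = split_for_labeling(images, random_fraction=0.0)
print([img.key for img in to_label])  # ['b.jpg', 'c.jpg', 'd.jpg']
print([img.key for img in noisy])     # ['a.jpg']
```

Low-confidence, flagged, and corrected images are always sent for labeling, and every other image has an independent `random_fraction` chance of being included; only what is left (high-confidence images that were not sampled) goes to “production-noisy”.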

The “production-noisy” bucket will be used for model re-training. We consider it “noisy” because its labels are generated by the model itself, not by a human; but in the next pipeline, we will add human-labeled data to it as it becomes available.
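One simple way to express “add human-labeled data when available” is to let a human label override the model's label for the same object key. A minimal sketch, assuming labels are kept as hypothetical `{object_key: class_name}` dictionaries:

```python
def build_training_labels(model_labels: dict[str, str],
                          human_labels: dict[str, str]) -> dict[str, tuple[str, str]]:
    """Merge labels for re-training, preferring a human label whenever one exists.

    Returns {object_key: (class_name, source)} where source is "model" or "human".
    """
    merged = {key: (label, "model") for key, label in model_labels.items()}
    for key, label in human_labels.items():
        merged[key] = (label, "human")  # human annotation overrides the model's label
    return merged


labels = build_training_labels(
    {"a.jpg": "Soup", "b.jpg": "Rice"},
    {"b.jpg": "Noodles/Pasta", "c.jpg": "Egg"},
)
print(labels["b.jpg"])  # ('Noodles/Pasta', 'human')
```

Keeping the label's source alongside it makes it easy to later measure how much of the re-training set has been verified by a human.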

This DAG will take advantage of Airflow’s built-in “data interval” concept, which tells each DAG run what time interval of data it is responsible for processing according to its schedule:

    start = context['data_interval_start']
    end = context['data_interval_end']

However, this doesn’t apply when we trigger the DAG manually from the web UI, so in that case we just use a recent half-hour window instead.
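Putting the two cases together, the window selection and the filter over uploaded objects could look like the sketch below. It uses plain dictionaries rather than a live Airflow context; the `run_type` key is a hypothetical stand-in for the run type a real task would read from the DAG run (for example `context['dag_run'].run_type` in Airflow 2), and the 30-minute lookback mirrors the half-hour window for manual runs.

```python
from datetime import datetime, timedelta, timezone


def resolve_window(context: dict, now: datetime,
                   manual_lookback: timedelta = timedelta(minutes=30)):
    """Return the (start, end) window of uploads this run should process.

    `context` is a plain dict shaped loosely like Airflow's task context; the
    "run_type" key is a hypothetical stand-in for the DAG run's run type.
    """
    if context.get("run_type") == "manual":
        # Manual trigger: fall back to a recent fixed-length window
        return now - manual_lookback, now
    # Scheduled run: use the data interval assigned to this run
    return context["data_interval_start"], context["data_interval_end"]


def objects_in_window(objects, start: datetime, end: datetime) -> list[str]:
    """Keep keys of (key, last_modified) pairs with start <= last_modified < end."""
    return [key for key, last_modified in objects if start <= last_modified < end]


now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
uploads = [
    ("a.jpg", datetime(2024, 5, 1, 11, 45, tzinfo=timezone.utc)),
    ("b.jpg", datetime(2024, 5, 1, 10, 0, tzinfo=timezone.utc)),
]
start, end = resolve_window({"run_type": "manual"}, now)
print(objects_in_window(uploads, start, end))  # ['a.jpg']
```

Treating each window as half-open (`start <= t < end`) means an object uploaded exactly on a boundary is processed by one run, not by two consecutive runs.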

The DAG will also include a task that initializes the “production-label-wait” and “production-noisy” buckets if they do not yet exist, and it will create a Label Studio project for “Food11 Continuous X” if it does not yet exist.
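Because the pipeline runs repeatedly, bucket initialization should be idempotent: check first, and create only what is missing. A sketch of that pattern is below. It assumes a client object exposing `bucket_exists(name)` and `make_bucket(name)`, as the MinIO Python SDK's `Minio` client does; an in-memory stand-in is included so the logic can be exercised without a MinIO server. Creating the Label Studio project would follow the same check-then-create pattern, but is not shown here.

```python
def ensure_buckets(client, bucket_names):
    """Create each bucket only if it is missing, so re-running the task is harmless.

    Returns the names of the buckets that were actually created on this call.
    """
    created = []
    for name in bucket_names:
        if not client.bucket_exists(name):
            client.make_bucket(name)
            created.append(name)
    return created


class InMemoryBucketClient:
    """Stand-in for a MinIO client, used only to exercise ensure_buckets offline."""

    def __init__(self, existing=()):
        self.buckets = set(existing)

    def bucket_exists(self, name):
        return name in self.buckets

    def make_bucket(self, name):
        self.buckets.add(name)


client = InMemoryBucketClient(existing={"production"})
wanted = ["production-label-wait", "production-noisy"]
print(ensure_buckets(client, wanted))  # ['production-label-wait', 'production-noisy']
print(ensure_buckets(client, wanted))  # []
```

The second call creates nothing, which is what lets this task sit at the start of every scheduled run.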

Click through to this DAG, and look at it in both Code view and Graph view.

Then, upload 10-20 images to the Flask application. For at least a few images, correct the class label. Make sure you have uploaded a few images for which the model is known to have low confidence.

Trigger the DAG manually in the web interface. Observe the effect in MinIO and in Label Studio.

In Label Studio, label some of the images in the “Food11 Continuous X” project. Then, we can run the next stage, which:

-   Gets the new labels from Label Studio
-   Copies the newly labeled images to the “production-clean” and “production-noisy” buckets, and removes them from “production-label-wait”
-   Computes the accuracy on the batch of data
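The bookkeeping and the accuracy computation in this second stage can be sketched in plain Python. The record format (pairs of model prediction and human label) is a hypothetical assumption. Note that S3-style object stores such as MinIO have no rename operation, so “moving” an object means copying it and then deleting the original.

```python
def batch_accuracy(records):
    """Fraction of (predicted_class, human_class) pairs where the model agreed with the human.

    Returns None for an empty batch rather than dividing by zero.
    """
    if not records:
        return None
    correct = sum(1 for predicted, human in records if predicted == human)
    return correct / len(records)


def plan_label_transfer(labeled_keys):
    """List the object-store operations for newly labeled images.

    Each image is copied to both destination buckets, then deleted from
    production-label-wait (copy + delete, since there is no "move").
    """
    ops = []
    for key in labeled_keys:
        ops.append(("copy", "production-label-wait", "production-clean", key))
        ops.append(("copy", "production-label-wait", "production-noisy", key))
        ops.append(("delete", "production-label-wait", None, key))
    return ops


records = [("Soup", "Soup"), ("Rice", "Noodles/Pasta"), ("Egg", "Egg"), ("Meat", "Meat")]
print(batch_accuracy(records))  # 0.75
```

Deleting only after both copies are made means that a failure partway through leaves the image in “production-label-wait”, where the next run can pick it up again.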

Click through to this second-stage DAG, and look at it in both Code view and Graph view.

Trigger the DAG manually in the web interface. Observe the effect in MinIO.

Airflow is an extremely capable platform, and we have barely scratched the surface of what we can do with it, but now we have the basic pieces of a continuous monitoring and re-training pipeline in place!

We have a “production-clean” bucket suitable for evaluation (with reliable labels from our human annotator), and a “production-noisy” bucket suitable for re-training, with labels that may not be accurate (since most are generated by our own model!)

We could extend this pipeline to:

-   push the batch accuracy to Prometheus, for continuous monitoring
-   trigger re-training

but we’ll stop here for now, since we have not set up our training and monitoring infrastructure in this experiment.