# "MLOps project - part 2a: Machine Learning Workflow Orchestration using Prefect"
> "Machine learning workflow orchestration using Prefect."

- toc: True
- branch: master
- badges: true
- comments: true
- categories: [mlops]
- image: images/some_folder/your_image.png
- hide: false
- search_exclude: true

In the previous blog post, we learned how to use MLflow to train a model and track experiments.
In the second post of this series, we will convert the code from the previous phase into a machine learning pipeline. I'll demonstrate how to complete the task using two popular tools: Prefect and ZenML. There are several incredible tools that we cannot include in this article, such as Flyte, Kale, Aro, etc.

We begin with Prefect and demonstrate how to utilize it locally and on Google Cloud. The same is then performed using ZenML.

But why do our machine learning services need a pipeline? The ZenML manual describes it in detail [[source](https://github.com/zenml-io/zenbytes)]:

> As an ML practitioner, you are probably familiar with building ML models using Scikit-learn, PyTorch, TensorFlow, or similar. An **[ML Pipeline](https://docs.zenml.io/developer-guide/steps-and-pipelines)** is simply an extension, including other steps you would typically do before or after building a model, like data acquisition, preprocessing, model deployment, or monitoring. The ML pipeline essentially defines a step-by-step procedure of your work as an ML practitioner. Defining ML pipelines explicitly in code is great because: <br>
> - We can easily rerun all of our work, not just the model, eliminating bugs and making our models easier to reproduce.
> - Data and models can be versioned and tracked, so we can see at a glance which dataset a model was trained on and how it compares to other models.
> - If the entire pipeline is coded up, we can automate many operational tasks, like retraining and redeploying models when the underlying problem or data changes or rolling out new and improved models with CI/CD workflows.



We may have extensive preprocessing that we do not want to repeat every time we train a model, such as in the last blog post where we generated the 'corpus' list.
We may also need to compare the performance of different models, or wish to deploy the model and monitor data and model performance. Here, ML pipelines come into play, allowing us to specify our workflows as a series of modular processes that can subsequently be combined.

Additionally, we may have a machine learning pipeline that we would like to execute every week. We can put it on a timetable, and if the machine learning model fails or the incoming data fails, we can analyze and resolve the issues.

Let's consider a standard machine learning pipeline:

![](images/workflow-orchestration/1.png)
*[source](https://www.youtube.com/watch?v=eKzCjNXoCTc&list=PL3MmuxUbc_hIUISrluw_A7wDSmfOhErJK&index=22)*

Initially, we have a postgresql database and possibly a process that writes data to a parquet file. Next, we use pandas to consume the parquet file and merge it with the API data we're pulling.
After training the model, we register the artifact and conduct experiments with MLflow; if specific conditions are met, we may deploy the model using Flask, for example.
Clearly, these phases are interconnected; if one fails, the entire pipeline will be impacted.
Failure can occur in even unforeseen ways. For example, inbound data is flawed, the API fails to connect at random, and the same is true for MLflow. Problems may arise if you are using a database to store MLflow artifacts, such as experiments. Workflow orchestration is intended to mitigate the impact of these problems and assist in their resolution.

![](images/workflow-orchestration/2.png)
*[source](https://www.youtube.com/watch?v=eKzCjNXoCTc&list=PL3MmuxUbc_hIUISrluw_A7wDSmfOhErJK&index=22)*


All of these will aid the organization and its developers in completing their tasks and locating issues more quickly, allowing them to devote their attention to something more vital.

Great! let's see how we can do it in practice. First, let's see how Prefect can help us.

# Prefect

>Prefect is air traffic control for the modern data stack. Monitor, coordinate, and orchestrate dataflows between and across your applications. Build pipelines, deploy them anywhere, and configure them remotely. You might just love your workflows again.
> If you move data, you probably need the following functionality:
> - schedules
> - retries
> - logging
> - caching
> - notifications
> - observability <br>
> 
> Implementing all of these features for your dataflows is a huge pain that takes a lot of time — time that could be better used for functional code.
> That's why Prefect 2.0 offers all this functionality and more!


We will use Prefect 2.0 (Orion) here. You can easily install prefect using:

```bash
pip install prefect
```

We will add Prefect to a Python script containing our code. I will continue with the Keras code from the previous post, although the method is identical for other Scikit-Learn packages. Essentially, we obtain the prior code including all MLflow-related information and convert it into functions as our pipeline steps. Converting the python functions to Prefect steps and flow is as easily as wrapping the function using `@task` and `@flow` decorators. In our case, the code for training the model might look as follows:

```python
import numpy as np
import pandas as pd
import os
import nltk
import re
if os.path.exists('./corpora'):
    os.environ["NLTK_DATA"] = "./corpora"
else:
    nltk.download('stopwords')
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
import mlflow
import pickle
from prefect import flow, task

## data loading
@task
def read_data(filename='Womens Clothing E-Commerce Reviews.csv'):
    data = pd.read_csv(filename,index_col =[0])
    print("Data loaded.\n\n")
    return data

## preprocess text
@task
def preprocess_data(data):
    #check if data/corpus is created before or not
    if not os.path.exists('data/corpus_y.pickle'):
        print("Preprocessed data not found. Creating new data. \n\n")
        data = data[~data['Review Text'].isnull()]  #Dropping columns which don't have any review
        X = data[['Review Text']]
        X.index = np.arange(len(X))

        y = data['Recommended IND']

        corpus =[]
        for i in range(len(X)):
            review = re.sub('[^a-zA-z]',' ',X['Review Text'][i])
            review = review.lower()
            review = review.split()
            ps = PorterStemmer()
            review =[ps.stem(i) for i in review if not i in set(stopwords.words('english'))]
            review =' '.join(review)
            corpus.append(review)

        with open('data/corpus_y.pickle', 'wb') as handle:
            pickle.dump((corpus, y), handle)
    else:
        print("Preprocessed data found. Loading data. \n\n")
        with open('data/corpus_y.pickle', 'rb') as handle:
            corpus, y = pickle.load(handle)

    print("Data preprocessed.\n\n")

    return corpus, y

## tokenization and dataset creation
@task
def create_dataset(corpus, y, test_size=0.2, random_state=0):
    tokenizer = Tokenizer(num_words = 3000)
    tokenizer.fit_on_texts(corpus)

    sequences = tokenizer.texts_to_sequences(corpus)
    padded = pad_sequences(sequences, padding='post')

    X_train, X_test, y_train, y_test = train_test_split(padded, y, test_size = 0.20, random_state = 0)

    print("Dataset created.\n\n")
    return X_train, X_test, y_train, y_test, tokenizer

# mlflow.tensorflow.autolog()
@task
def model_training(X_train, y_train, X_test, y_test, tokenizer):
    for embedding_dim, batch_size in zip([32, 64, 128], [32, 64, 128]):
        with mlflow.start_run():
            ## model definition
            model = tf.keras.Sequential([
                tf.keras.layers.Embedding(3000, embedding_dim),
                tf.keras.layers.GlobalAveragePooling1D(),
                tf.keras.layers.Dense(6, activation='relu'),
                tf.keras.layers.Dense(1, activation='sigmoid')
            ])

            ## training
            num_epochs = 50
            callback = tf.keras.callbacks.EarlyStopping(
                monitor="val_loss",
                min_delta=0,
                patience=2,
                verbose=0,
                mode="auto",
                baseline=None,
                restore_best_weights=False,
            )

            model.compile(loss='binary_crossentropy',optimizer='adam',metrics=['accuracy'])

            mlflow.set_tag("developer", "Isaac")
            mlflow.set_tag("algorithm", "Deep Learning")
            mlflow.log_param("train-data", "Womens Clothing E-Commerce Reviews")
            mlflow.log_param("embedding-dim", embedding_dim)

            print("Fit model on training data")
            model.fit(
                X_train,
                y_train,
                batch_size=batch_size,
                epochs=num_epochs,
                callbacks=callback,
                # We pass some validation for
                # monitoring validation loss and metrics
                # at the end of each epoch
                validation_data=(X_test, y_test),
            )

            ## save model and tokenizer
            # model.save('models/model_dl.h5')
            mlflow.keras.log_model(model, 'models/model_dl')

            with open('models/tf_tokenizer.pickle', 'wb') as handle:
                pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)
            
            mlflow.log_artifact(local_path="models/tf_tokenizer.pickle", artifact_path="tokenizer_pickle")

            # Evaluate the model on the test data using `evaluate`
            print("Evaluate on test data")
            results = model.evaluate(X_test, y_test, batch_size=128)
            print("test loss, test acc:", results)
            mlflow.log_metric("loss", results[0])
            mlflow.log_metric("accuracy", results[1])

    print("Model training completed.\n\n")

@flow
def main():
    tracking_uri = "sqlite:///mlflow.db"
    model_name = "customer-sentiment-analysis"
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(model_name)
    
    data = read_data()
    corpus, y = preprocess_data(data)
    X_train, X_test, y_train, y_test, tokenizer = create_dataset(corpus, y)
    model_training(X_train, y_train, X_test, y_test, tokenizer)

if __name__ == '__main__':
    main()
```

When you trained the mode, you can go to MLflow UI and decide if you want to change the model in the `Production` stage or not. Then you can easily load the model in the `Production` stage using a code snippet like this and evaluate it as you wish (maybe creating another pipeline for that):

```python
from mlflow.tracking import MlflowClient

def get_best_model(model_name, client):
    model = mlflow.keras.load_model(f"models:/{model_name}/production", dst_path=None)
    for mv in client.search_model_versions(f"name='{model_name}'"):
        if dict(mv)['current_stage'] == 'Production':
            run_id = dict(mv)['run_id']

    artifact_folder = "models_pickle" #tokenizer_pickle
    client.download_artifacts(run_id=run_id, path=artifact_folder, dst_path='.')
    with open(f"{artifact_folder}/tf_tokenizer.pickle", 'rb') as handle:
        tokenizer = pickle.load(handle)
    
    print("Model and tokenizer loaded.\n\n")
    return model, tokenizer

def test_model(model, X_test, tokenizer):
    # Generate predictions (probabilities -- the output of the last layer)
    # on new data using `predict`
    print("Generate predictions for 3 samples")
    predictions = model.predict(X_test[:3])
    print("predictions shape:", predictions.shape)

    sample_string = "I Will tell my friends for sure"
    sample = tokenizer.texts_to_sequences(sample_string)
    padded_sample = pad_sequences(sample, padding='post').T
    sample_predict = model.predict(padded_sample)
    print(f"model prediction for input: {sample_string} \n {sample_predict}")
    
if __name__ == '__main__':
    tracking_uri = "sqlite:///mlflow.db"
    model_name = "customer-sentiment-analysis"
    client = MlflowClient(tracking_uri=tracking_uri)
    model, tokenizer = get_best_model(model_name, client)
    test_model(model, X_test, tokenizer)
```

With wrapping the functions into Prefect, you will get more logs, which helps to observe and debug the pipeline. You can also set number of retries for some tasks using `@task(retries=3)`, in case something happens and the task cannot be completed.

You can then run the following command to see the Prefect UI dashboard:

```bash
prefect orion start
```

For our code, here is the screenshot:

![](images/workflow-orchestration/4.png)
<!-- *[source](https://www.youtube.com/watch?v=eKzCjNXoCTc&list=PL3MmuxUbc_hIUISrluw_A7wDSmfOhErJK&index=22)* -->


You can see the logs for different tasks and the flow for our code. You can also get much more information from the dashboard. So don't hesitate to play around. 

There are a lot of interesting features in Prefect. I really like the concurrency, parallelization, and the async support. There is also a concept of task runner in Prefect [[source](https://docs.prefect.io/concepts/task-runners/)]:

> Calling a task function from within a flow, using the default task settings, executes the function sequentially. Execution of the task function blocks execution of the flow until the task completes. This means, by default, calling multiple tasks in a flow causes them to run in order. <br>
However, that's not the only way to run tasks! <br>
You can use the `.submit()` method on a task function to submit the task to a task runner. Using a task runner enables you to control whether tasks run sequentially, concurrently, or if you want to take advantage of a parallel or distributed execution libraries such as Dask or Ray.

>Prefect currently provides the following built-in task runners:
> - **SequentialTaskRunner** can run tasks sequentially.
> - **ConcurrentTaskRunner** can run tasks concurrently, allowing tasks to switch when blocking on IO. Tasks will be submitted to a thread pool maintained by anyio. <br>
>
> In addition, the following Prefect-developed task runners for parallel or distributed task execution may be installed as Prefect Collections.
> - **DaskTaskRunner** can run tasks requiring parallel execution using dask.distributed.
> - **RayTaskRunner** can run tasks requiring parallel execution using Ray.


In our case, I don't want to use this features and just want to run tasks sequentially, which is the default setting.

After doing all of these steps locally, we can also deploy Prefect on Cloud. I will show how to do deployment and scheduling and maybe more in the next blog post. That's enough for this blog post. Hope it was useful.




> youtube: https://youtu.be/eKzCjNXoCTc

> youtube: https://youtu.be/Yb6NJwI7bXw

> youtube: https://youtu.be/MCFpURG506w
