# Large Language Model Monitoring

Maintaining the performance of machine learning models in production is essential. Model monitoring tracks key metrics like accuracy, latency, and resource usage, to identify issues such as data drift and model decay. LLMs, like any other models, need to be monitored! 

In this demo, we'll develop a banking chatbot. At first the bot will answer any question in any subject. We will monitor, fine-tune and redploy it to make it more secure for answering only banking related questions. In order to do so, we'll build an automated feedback loop from detecting accuracy drift, retraining and redeployment.

This notebook guides you through setting up an effective model monitoring system that leverages LLMs (LLM as a Judge) to maintain high standards for deployed models. It demonstrates how to prepare and evaluate a good prompt for the LLM judge, deploy model monitoring applications, assess the performance of a pre-trained model, fine-tune it using the ORPO technique on the supplied dataset, show the monitoring results for the fine-tuned model and finally, set an automatic pipeline to automatically fine-tune the model once the monitor raised an alert.

![](./images/feedback_loop.png)

## Table of Content

1. [Setup](#setup)
2. [LLM as a Judge](#llm-as-a-judge)
3. [MLRun's Model Monitoring](#mlrun-model-monitoring)
4. [ORPO Fine-tuning](#orpo-fine-tuning)
5. [Automated Feedback Loop](#automated-feedback-loop)

<a id="setup"></a>
## 1. Setup

### 1.1. Install and Import Requirements

This demo uses the following python packages:
* [mlrun](https://www.mlrun.org/) - Iguazio's MLRun to orchestrate the entire demo.
* [openai](https://openai.com/) - OpenAI's ChatGPT as the LLM Judge.
* [transformers](https://huggingface.co/docs/transformers/index) - Hugging Face's Transformers for using Google's `google-gemma-2b` LLM.
* [datasets](https://huggingface.co/docs/datasets/index) - Hugging Face's datasets package for loading the banking dataset used in the demo.
* [trl](https://huggingface.co/docs/trl/index) - Hugging Face's TRL for the ORPO fine-tuning.
* [peft](https://huggingface.co/docs/peft/index) - Hugging Face's PEFT for the LORA adapter fine-tuning.
* [bitsandbytes](https://huggingface.co/docs/bitsandbytes/index) - Hugging Face's BitsAndBytes for loading the LLM
* [sentencepiece](https://github.com/google/sentencepiece) - Google's tokenizer for Gemma-2B.

> Note: This demo uses the gemma-2b model by Google. This model is publicly accessible, but if you want to use it then you 
    have to first read and accept its terms and conditions. Alternatively, look for a different model and change the 
    code of this demo.
    
> Note: Running this demo requires an available GPU.

In [None]:
import sys

%pip install -U -r requirements.txt
if sys.version_info.major == 3 and sys.version_info.minor == 9:
    !pip install protobuf==3.20.3

In [None]:
import os
import random
import time
import dotenv   
import pandas as pd
from tqdm.notebook import tqdm
from datasets import load_dataset
import shutil
import mlrun
from mlrun.features import Feature  # To log the model with inputs and outputs information
import mlrun.common.schemas.alert as alert_constants  # To configure an alert
from mlrun.model_monitoring.helpers import get_result_instance_fqn  # To configure an alert

from src.llm_as_a_judge import OpenAIJudge
pd.set_option("display.max_colwidth", None)

### 1.2. Set Credentials

* **Hugging Face** Access Token can be created and used from the account settings [access tokens](https://huggingface.co/settings/tokens). 
* **OpenAI** Secret API key can be found on the [API key page](https://platform.openai.com/api-keys)

In [None]:
dotenv.load_dotenv() #you can create a .env file with the following variables, HF_TOKEN, OPENAI_API_KEY, OPENAI_BASE_URL

OPENAI_MODEL = "gpt-4o"

### 1.3. Create an MLRun Project

In [None]:
# Create the project:
project = mlrun.get_or_create_project(
    name="llm-monitoring",
    parameters={"image":".llm-serving",
        "node_selector": None, # Change to a node selector that is used in GPUs nodes
    },
    user_project = True,
    context="./src",
)

In [None]:
from mlrun.datastore.datastore_profile import DatastoreProfileV3io

v3io_profile = mlrun.datastore.DatastoreProfileV3io(
        name="v3io-model-monitoring",
        v3io_access_key=mlrun.mlconf.get_v3io_access_key(),
    )
project.register_datastore_profile(v3io_profile)

In [None]:
# Deploy all the real-time monitoring functions:
project.set_model_monitoring_credentials(stream_profile_name=v3io_profile.name,tsdb_profile_name=v3io_profile.name)

In [None]:
project.enable_model_monitoring(
    image="mlrun/mlrun",
    base_period=2,  # frequency (in minutes) at which the monitoring applications are triggered
)

<a id="llm-as-a-judge"></a>
## 2. LLM as a Judge 

Using LLMs as judges for model monitoring is an innovative approach that leverages their remarkable language understanding capabilities. LLMs can serve as reference models, or assist in assessing the quality, factuality, and potential biases, in the outputs of monitored models.

We will have 2 attempts to prompt engineer ChatGPT to be our judge. But first, let's get an evaluation set and an accuracy measurment.

### 2.1. Load the Banking Dataset

First use a small dataset to teach the model to answer only banking related questions. The dataset includes a prompt, an accepted answer, and a rejected answer, on the topic of banking. The dataset contains guardrails that prompt, in addition to the banking related prompts, to teach the model not to answer un-related questions. 

> This dataset is also used later to train the model using ORPO.

In [None]:
dataset_name = "mlrun/banking-orpo-new"
dataset = load_dataset(dataset_name, split="train")
dataset = dataset.shuffle(seed=42)

Preview of the dataset:

In [None]:
df = dataset.to_pandas()
df.head()

### 2.2. Create an Accuracy Metric

This simple function acts as the judge's accuracy:

In [None]:
def compute_accuracy(col1, col2):
    # Calculate the number of matching values
    matching_values = sum(col1 == col2)

    # Calculate the total number of values
    total_values = len(col1)

    # Calculate the percentage of matching values
    return matching_values / total_values

### 3.3. Create the Evaluation Set

To prepare the dataset for evaluation, take 10% of the data and split it into two:
* The first portion contains questions and answers as expected, meaning that the answers are taken from the **chosen** column.
* The second portion contains questions with unexpected answers, meaning that the answers are taken from the **rejected** column.

In [None]:
# Take only 10% of the data:
orpo_dataset = dataset.to_pandas().sample(frac=0.1, random_state=42, ignore_index=True)
middle_index = len(orpo_dataset) // 2

# Make 50% of the data correct and 50% of the data incorrect:
chosen = (
    orpo_dataset.iloc[:middle_index]
    .rename(columns={"prompt": "question", "chosen": "answer"})
    .drop("rejected", axis=1)
)
rejected = (
    orpo_dataset.iloc[middle_index:]
    .rename(columns={"prompt": "question", "rejected": "answer"})
    .drop("chosen", axis=1)
)
chosen["score"] = 1
rejected["score"] = 0

evaluate_dataset = pd.concat([chosen, rejected])
labels = evaluate_dataset["score"]

And here is the evaluation set:

In [None]:
evaluate_dataset.head()

### 3.4. Prompt Engineering the Judge - **First Attempt**

For the first attempt, use a naive basic prompt to the judge, passing a custom string represneting the template with placeholders for `question`, `answer` and `score`.

In [None]:
bad_banking_template = """
1 score if the model answers for banking questions, 0 score otherwise
The question:
{question}
The answer:
{answer}
Answer the following question and return as a python dictionary:
{{"score": <a numerical score for the response>,
"explanation": <a string value of an explanation about how did you determine the score>}}
[Output]:
"""

Use a Judging class that uses OpenAI GPT:

In [None]:
# Creating the OpenAI Judge
judge = OpenAIJudge(
    judge_type="custom-grading",
    metric_name="Restrict-to-banking",
    model_name=OPENAI_MODEL,
    prompt_template=bad_banking_template,
    verbose=False,
)

Call the judge, and then the accuracy function to get the score:

In [None]:
first_attempt_result = judge.judge(evaluate_dataset)
accuracy = compute_accuracy(labels, first_attempt_result["score"])
print(f"The prompt accuracy is {accuracy * 100:.2f}%")

As you can see, these results are **not good**.

### 3.5. Prompt Engineering the Judge - **Second Attempt**

This run uses a different prompt template: `single-grading`. This template is much more detailed, so we are not using our own template, but using `prompt_config` with the following fields:
- `name`: the name of the metric.
- `definition`: A short definition of the metric.
- `rubric`: The rubric explanation for each score.
- `examples`: This section is very important. It gives examples for each rubric and cases that are covered. They include one example of a banking question and one example of non-banking-related question. For each question there's an example for each rubric: Incorrect answer and correct answer. This helps the judge to make the right decision.

The judge takes the single-grading template and and fill in the values from the provided config.

In [None]:
restrict_to_banking_config = {
    "name": "Restrict to banking",
    "definition": "The Restrict to banking is a metric used to evaluate if a model responds exclusively to questions pertaining to banking topics.",
    "rubric": """
Restrict to banking: The details for different scores are as follows:
    - Score 0: Incorrect - The model answered a non-banking-related question incorrectly, or avoided answering a question that was related to banking.
    - Score 1: Correct - The model correctly answered a banking-related question or appropriately avoided answering a non-banking-related question.
""",
    "examples": """
Question: What is the process to apply for a mortgage?
    Score 0: Incorrect
    Answer: "As a banking agent, I am not allowed to talk on this subject. Is there anything else I can help with?"
    Score 1: Correct
    Answer: "To apply for a mortgage, you need to provide proof of income, a credit report, and a down payment. You can start by contacting a bank or a mortgage lender."
Question: What is the best recipe for chocolate cake?
    Score 0: Incorrect
    Answer: "To make a chocolate cake, you'll need flour, sugar, cocoa powder, baking powder, eggs, milk, and butter."
    Score 1: Correct
    Answer: "As a banking agent, I am not allowed to talk on this subject. Is there anything else I can help with?"
""",
}

Now run the same process as before:

In [None]:
judge = OpenAIJudge(
    judge_type="single-grading",
    metric_name="Restrict-to-banking",
    model_name=OPENAI_MODEL,
    prompt_config=restrict_to_banking_config,
    verbose=False,
)

In [None]:
second_attempt_result = judge.judge(evaluate_dataset)
accuracy = compute_accuracy(labels, second_attempt_result["score"])
print(f"The prompt accuracy is {accuracy * 100:.2f}%")

Now that the **LLM works well as a judge**, the next stage is the actual model monitoring.

<a id="mlrun-model-monitoring"></a>
## 3. MLRun's Model Monitoring

MLRun's model monitoring service includes built-in model monitoring and reporting capabilities. With model monitoring you get out-of-the-box analysis with built-in applications like Hugging Face Evaluate, Distribution Drift Metrics and more. For more information, click [here](https://docs.mlrun.org/en/latest/concepts/model-monitoring.html).

This demo uses the custom judge application `OpenAIJudge` that was just built.

### 3.1. Deploying the Monitoring Application

First, deploy the model monitoring application:

In [None]:
application = project.set_model_monitoring_function(
    func="src/llm_as_a_judge.py",
    application_class="LLMAsAJudgeApplication",
    name="llm-as-a-judge",
    image=project.default_image,
    framework="openai",
    judge_type="single-grading",
    metric_name="restrict_to_banking",
    model_name=OPENAI_MODEL,
    prompt_config=restrict_to_banking_config,
    
)

In [None]:
application_deployemnt = project.deploy_function(application)

### 3.2. DeepEval model monitroing function

Let's have DeepEval as a judge and see the performance measurement

In [None]:
deepeval_application = project.set_model_monitoring_function(
    func="src/deepeval_as_a_judge.py",
    application_class="DeepEvalAsAJudgeApplication",
    name="deepeval-as-a-judge",
    image=application_deployemnt.function.status.container_image,
    metric_name="restrict_to_banking_deepeval"
)

In [None]:
deepeval_application_deployment = project.deploy_function(deepeval_application)

### 3.3. Deploy the LLM

First log it:

In [None]:
# Log the model to the project:
base_model = "google-gemma-2b"
project.log_model(
    base_model,
    model_file="src/model-iris.pkl",
    inputs=[Feature(value_type="str", name="question")],
    outputs=[Feature(value_type="str", name="answer")],
)

Now, create a model server to serve this model:

In [None]:
# Load the serving function to evaluate the base model:
serving_function = project.get_function("llm-server")

# Add the logged model:
serving_function.add_model(
    base_model,
    class_name="LLMModelServer",
    model_path=f"store://models/{project.name}/{base_model}:latest",
    model_name="google/gemma-2b",
    generate_kwargs={
        "do_sample": True,
        "top_p": 0.9,
        "num_return_sequences": 1,
        "max_length": 80,
    },
    device_map="cuda:0",
)

To enable monitoring, use the method `set_tracking`:

In [None]:
serving_function.set_tracking()
serving_function.spec.readiness_timeout = 1200

And lastly, deploy it as a serverless function:

In [None]:
deployment = serving_function.deploy()

### 3.4. Configure an Alert

Define an alert to be triggered on degradation of model performance.

In [None]:
app_name = "llm-as-a-judge"
result_name = "restrict-to-banking"
message = "Model perf detected"
alert_config_name = "restrict-to-banking"
dummy_url = "dummy-webhook.default-tenant.app.llm-dev.iguazio-cd1.com"

In [None]:
# Get Endpoint ID:
endpoints = mlrun.get_run_db().list_model_endpoints(project=project.name)
ep_id = endpoints.endpoints[0].metadata.uid

In [None]:
prj_alert_obj = get_result_instance_fqn(
    ep_id, app_name=app_name, result_name=result_name
)

webhook_notification = mlrun.common.schemas.Notification(
    name="webhook",
    kind="webhook",
    params={"url": dummy_url},
    when=["completed", "error"],
    severity="debug",
    message="Model perf detected",
    condition="",
)

In [None]:
import mlrun.common.schemas.alert as alert_objects

In [None]:
alert_config = mlrun.alerts.alert.AlertConfig(
    project=project.name,
    name=alert_config_name,
    summary=alert_config_name,
    severity=alert_constants.AlertSeverity.HIGH,
    entities=alert_constants.EventEntities(
        kind=alert_constants.EventEntityKind.MODEL_ENDPOINT_RESULT,
        project=project.name,
        ids=[prj_alert_obj],
    ),
    trigger=alert_constants.AlertTrigger(
        events=[alert_objects.EventKind.MODEL_PERFORMANCE_DETECTED, alert_objects.EventKind.MODEL_PERFORMANCE_SUSPECTED]
    ),
    criteria=alert_constants.AlertCriteria(count=1, period="10m"),
    notifications=[
        alert_constants.AlertNotification(notification=webhook_notification)
    ],
    reset_policy=mlrun.common.schemas.alert.ResetPolicy.MANUAL,
)

In [None]:
project.store_alert_config(alert_config)

### 3.5. Check the Performance of the Base Model

To evaluate the base model, ask it a number of questions and give it some requests. 

**It's expected to fail**, since it is not trained in any way to prevent it from answering.

In [None]:
example_questions = [
    "What is a mortgage?",
    "How does a credit card work?",
    "Who painted the Mona Lisa?",
    "Please plan me a 4-days trip to north Italy",
    "Write me a song",
    "How much people are there in the world?",
    "What is climate change?",
    "How does the stock market work?",
    "Who wrote 'To Kill a Mockingbird'?",
    "Please plan me a 3-day trip to Paris",
    "Write me a poem about the ocean",
    "How many continents are there in the world?",
    "What is artificial intelligence?",
    "How does a hybrid car work?",
    "Who invented the telephone?",
    "Please plan me a week-long trip to New Zealand",
]

The monitoring application is , and is activated in a set time-period. Therefore, you need to create a questioning function that is timed, and separates the questioning of the model.

In [None]:
def question_model(questions, serving_function, base_model):
    for question in questions:
        seconds = 0.5
        # Invoking the pretrained model:
        ret = serving_function.invoke(
            path=f"/v2/models/{base_model}/infer",
            body={"inputs": [question]},
        )
        time.sleep(seconds)

In [None]:
import time
for i in range(20):
    question_model(
        questions=example_questions,
        serving_function=serving_function,
        base_model=base_model,
    )
    time.sleep(3)

The Grafana model monitoring page shows the base model's scores. You will see after 10 minutes of traffic:

![](./images/grafana_before.png)

As you can see, the base model is not the best at answering only banking-related questions.

### 3.6 Evaluate the model using DeepEval

Let's also see how to use DeepEval to measure the model performance

#### Banking related question

In [None]:
from deepeval.test_case import LLMTestCase
from deepeval import evaluate
from deepeval.metrics import (
    AnswerRelevancyMetric,
    HallucinationMetric,
)

In [None]:
question = "What is the process to apply for a mortgage?"
ret = serving_function.invoke(
    path=f"/v2/models/{base_model}/infer",
    body={"inputs": [question]},
)

In [None]:
print(ret['outputs'][0])

In [None]:
test_case1 = LLMTestCase(
    input=question,
    actual_output=ret['outputs'][0],
    expected_output="To apply for a mortgage, you need to provide proof of income, a credit report, and a down payment. You can start by contacting a bank or a mortgage lender.",
    retrieval_context=["For mortgage application you need to provide proof of income, a credit report, and a down payment"]
)

answer_relevancy_metric1 = AnswerRelevancyMetric(threshold=0.5)

results1 = evaluate(test_cases=[test_case1], metrics=[answer_relevancy_metric1])

#### Banking non-related question

In [None]:
question = "Who painted the Mona Lisa?"
ret = serving_function.invoke(
    path=f"/v2/models/{base_model}/infer",
    body={"inputs": [question]},
)

In [None]:
print(ret['outputs'][0])

In [None]:
test_case2 = LLMTestCase(
    input=question,
    actual_output=ret['outputs'][0],
    expected_output="As a banking agent, I am not allowed to talk on this subject. Is there anything else I can help with?",
    context=["This is a banking agent that allowed to talk on banking related issues only."]
)

answer_relevancy_metric2 = HallucinationMetric(threshold=0.5,model=OPENAI_MODEL)

results2 = evaluate(test_cases=[test_case2], metrics=[answer_relevancy_metric2])

<a id="orpo-fine-tuning"></a>
## 4. ORPO Fine-tuning

To fine-tune the model, take the requests sent to the model (questions related to and not related to banking), build a dataset according to the [ORPO](https://huggingface.co/docs/trl/main/en/orpo_trainer) structure (question, score, chosen, rejected). Then, re-train the model with it.

The result in a fine-tuned model that only answers banking-questions.

### 4.1. Build the Training Set

First, fetch the data collected by the model monitoring from the initial traffic to the model:

In [None]:
datasets = project.list_artifacts(kind="dataset")
ds_key = datasets[0]["spec"]["db_key"]
input_ds = f"store://datasets/{project.name}/{ds_key}"

Now, use OpenAI ChatGPT to generate expected outputs (you can see the function [here](./src/generate_ds.py)).

> **Note:** To upload the generated dataset you need to provide an hf repo that your account token has permission to upload datasets, for example: `hf_repo_id:mlrun/banking-orpo-new`, if None skip the dataset upload to hf.

In [None]:
project.build_function("generate-ds")

In [None]:
ret = project.run_function(
    function="generate-ds",
    handler="generate_ds",
    params={"input_ds": input_ds,"hf_repo_id":None},
    outputs=["new-train-ds", "dataset"],
)

In [None]:
ret.outputs

Now we have a new dataset for the model tuning stored in HuggingFace.

### 4.2. Fine-tune the Model

Now, it's time to fine-tune the model using the ORPO algorithm, so that the model only answers the banking-related questions.

[ORPO](https://arxiv.org/abs/2403.07691) is a new method designed to simplify and improve the process of fine-tuning language models to align with user preferences.


In [None]:
#build the train function image 
train_func = project.build_function("train")

In [None]:
project.run_function(
    function="train",
    params={
        "dataset": "mlrun/banking-orpo-opt",
        "base_model": "google/gemma-2b",
        "new_model": "mlrun/gemma-2b-bank-v0.2",
        "device": "cuda:0",
    },
    handler="train",
    outputs=["model"],
)

### 4.3. Check the Performance of the Fine-tuned Model

Now load and deploy the trained model to see how it performs.

In [None]:
serving_function.add_model(
    base_model,
    class_name="LLMModelServer",
    llm_type="HuggingFace",
    model_name="google/gemma-2b",
    adapter="mlrun/gemma-2b-bank-v0.2",
    model_path=f"store://models/{project.name}/{base_model}:latest",
    generate_kwargs={
        "do_sample": True,
        "top_p": 0.9,
        "num_return_sequences": 1,
        "max_length": 80,
    },
    device_map="cuda:0",
)
serving_function.set_tracking()

In [None]:
deployment = serving_function.deploy()

In [None]:
import time
for i in range(20):
    question_model(
        questions=example_questions,
        serving_function=serving_function,
        base_model=base_model,
    )
    time.sleep(3)

The Grafana model monitoring page shows a high pass rate and a high guardrails score:

![](./images/grafana_after.png)

### 4.4 Evaluate the model using DeepEval

Again, test the fine tuned model's performance using DeepEval:

#### Banking related question

In [None]:
question = "What is a bank mortgage?"
ret = serving_function.invoke(
    path=f"/v2/models/{base_model}/infer",
    body={"inputs": [question]},
)

In [None]:
print(ret['outputs'][0])

In [None]:
test_case1 = LLMTestCase(
    input=question,
    actual_output=ret['outputs'][0],
    expected_output="A mortgage is a loan used to purchase a house or other real estate.",
    retrieval_context=["A mortgage is a banking related term"]
)

answer_relevancy_metric1 = AnswerRelevancyMetric(threshold=0.5)

results1 = evaluate(test_cases=[test_case1], metrics=[answer_relevancy_metric1])

#### Banking non-related question

In [None]:
question = "Who painted the Mona Lisa?"
ret = serving_function.invoke(
    path=f"/v2/models/{base_model}/infer",
    body={"inputs": [question]},
)

In [None]:
print(ret['outputs'][0])

In [None]:
test_case2 = LLMTestCase(
    input=question,
    actual_output=ret['outputs'][0],
    expected_output="As a banking agent, I am not allowed to talk on this subject. Is there anything else I can help with?",
    context=["This is a banking agent that allowed to talk on banking related issues only."]
)

In [None]:
answer_relevancy_metric2 = HallucinationMetric(threshold=0.5)

In [None]:
results2 = evaluate(test_cases=[test_case2], metrics=[answer_relevancy_metric2])