# Using Parallelization with `CustomBatchModel` in Okareo

<a target="_blank" href="https://colab.research.google.com/github/okareo-ai/okareo-cookbook/blob/main/tutorials/notebooks/custom_batch_model.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In this notebook, we show you how to use the `CustomBatchModel` class with parallelization to speed up your custom model evaluations in Okareo.

To parallelize our custom model calls, we will run a Hugging Face model on a GPU-enabled machine. If your custom model uses an API endpoint and that endpoint supports parallelized batch inference, then `CustomBatchModel` can support your use case as well.

## 🎯 Goals

After using this notebook, you will be able to:
- Upload a scenario to Okareo
- Define a pre-trained LLM as a `CustomBatchModel` in Okareo
- Vary `batch_size` to find the fastest configuration for your Okareo evaluation

## Upload a scenario to Okareo

In [7]:
# get Okareo client
from okareo import Okareo

OKAREO_API_KEY = "<YOUR_OKAREO_API_KEY>"
okareo = Okareo(OKAREO_API_KEY)

In [None]:
import os

# For making an ephemeral directory when downloading/uploading the scenario
import tempfile

webbizz_url = "https://raw.githubusercontent.com/okareo-ai/okareo-python-sdk/main/examples/webbizz_classification_questions.jsonl"
webbizz_questions = os.popen(f"curl {webbizz_url}").read()
temp_dir = tempfile.gettempdir()
file_path = os.path.join(temp_dir, "webbizz_classification_questions.jsonl")
with open(file_path, "w+") as file:
    file.write(webbizz_questions)

source_scenario = okareo.upload_scenario_set(file_path=file_path, scenario_name="Webbizz Questions Scenario")

print(source_scenario.app_link)

# clean up tmp file
os.remove(file_path)
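
Before uploading your own scenario file, it can help to confirm that every line parses as JSON. The sketch below writes a tiny JSONL file and reads it back. The rows are hypothetical, and the `input`/`result` keys are an assumption about the scenario layout, not something taken from the Webbizz file itself.

```python
import json
import os
import tempfile

# Hypothetical rows; the "input"/"result" keys are an assumed layout.
rows = [
    {"input": "what is the cost of a Webbizz membership?", "result": "pricing"},
    {"input": "how do I send back an item?", "result": "returns"},
]


def write_jsonl(rows, path):
    """Write one JSON object per line."""
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")


def read_jsonl(path, required_keys=("input", "result")):
    """Parse a JSONL file and check that every row has the required keys."""
    parsed = []
    with open(path, encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            if not line.strip():
                continue  # skip blank lines
            row = json.loads(line)
            missing = [key for key in required_keys if key not in row]
            if missing:
                raise ValueError(f"line {line_number} is missing keys: {missing}")
            parsed.append(row)
    return parsed


# Round-trip the rows through a temporary file that is cleaned up automatically
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "demo_scenario.jsonl")
    write_jsonl(rows, demo_path)
    checked_rows = read_jsonl(demo_path)

print(f"{len(checked_rows)} rows passed the check")
```

Using `tempfile.TemporaryDirectory()` as a context manager removes the file on exit, so no manual cleanup step is needed.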

## Download Phi-3 from Hugging Face

In [9]:
from transformers import AutoModelForCausalLM, AutoTokenizer

model_id = "microsoft/Phi-3-mini-4k-instruct"

# Load model and tokenizer
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto"
)

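tokenizer = AutoTokenizer.from_pretrained(model_id)
# Reuse EOS as the padding token, and pad on the left so that batched
# generation with a decoder-only model continues directly from each prompt
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"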

Loading checkpoint shards: 100%|██████████| 2/2 [00:06<00:00,  3.36s/it]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [10]:
PROMPT_PREAMBLE = """### Instruction:
Given "Input", return a category under "Output" that is one of the following:

- returns
- pricing
- complaints

Return only one category that is most relevant to the question.
"""
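
The preamble asks for exactly one of three labels, but a raw generation may still carry extra whitespace, different casing, or unrelated text. One way to map a generation onto a label is sketched below. `extract_category` is a hypothetical helper, and it returns the first matching category, falling back to `"unknown"`.

```python
CATEGORIES = ["returns", "pricing", "complaints"]


def extract_category(generated_text, categories=CATEGORIES, default="unknown"):
    """Return the first category found in the generated text, else the default."""
    normalized = generated_text.strip().lower()
    for category in categories:
        if category in normalized:
            return category
    return default


for sample in [" Pricing\n", "complaints about shipping", "refund"]:
    print(repr(sample), "->", extract_category(sample))
```

Falling back to an explicit `"unknown"` label keeps unparseable generations visible in the evaluation instead of silently counting them as one of the real classes.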

## Define `CustomBatchModel` for Phi-3

Unlike the `CustomModel` class, `CustomBatchModel` requires you to define an `invoke_batch` method that takes an `input_batch` argument. `input_batch` is a list of dictionaries of the following form:

In [None]:
"""
input_batch = [
    {
        "id": "<UUID-FOR-SCENARIO-ROW-1>",
        "input_value": "what is the cost of a Webbizz membership?"
    },
    ...,
    {
        "id": "<UUID-FOR-SCENARIO-ROW-N>",
        "input_value": "how can I get help with a log in issue?"
    },
]
"""

Given an `input_batch`, `invoke_batch` returns a corresponding list of invocations with the following format:

In [None]:
"""
return [
    {
        "id": "<UUID-FOR-SCENARIO-ROW-1>",
        "model_invocation": ModelInvocation(
            model_prediction="pricing",
            model_input="what is the cost of a Webbizz membership?",
            model_output_metadata={ ... },
        )
    },
    ...,
    {
        "id": "<UUID-FOR-SCENARIO-ROW-N>",
        "model_invocation": ModelInvocation(
            model_prediction="complaints",
            model_input="how can I get help with a log in issue?",
            model_output_metadata={ ... },
        )
    },
]
"""
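
To make the input/output contract concrete without loading a model, here is a minimal sketch that runs on its own. `StandInInvocation` is a hypothetical stand-in for Okareo's `ModelInvocation` (so the snippet needs no SDK install), and the keyword rule is a toy replacement for a real model call.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class StandInInvocation:
    """Hypothetical stand-in for okareo's ModelInvocation, for illustration only."""
    model_prediction: Any
    model_input: Any
    model_output_metadata: Any = None


def toy_invoke_batch(input_batch):
    """Return one {'id', 'model_invocation'} dict per input row, in input order."""
    results = []
    for row in input_batch:
        text = row["input_value"].lower()
        # Toy keyword rule standing in for a real model call
        prediction = "pricing" if "cost" in text else "unknown"
        results.append({
            "id": row["id"],  # echo the scenario row id back unchanged
            "model_invocation": StandInInvocation(
                model_prediction=prediction,
                model_input=row["input_value"],
                model_output_metadata={"rule": "keyword"},
            ),
        })
    return results


demo_batch = [
    {"id": "row-1", "input_value": "what is the cost of a Webbizz membership?"},
    {"id": "row-2", "input_value": "how can I get help with a log in issue?"},
]
demo_results = toy_invoke_batch(demo_batch)
for result in demo_results:
    print(result["id"], result["model_invocation"].model_prediction)
```

The key point is that each output dictionary carries the same `id` as the input row it came from, which is how predictions are matched back to scenario rows.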

For more details on the `invoke_batch` interface, you can read the docstring below:

In [21]:
?CustomBatchModel.invoke_batch

Signature:
CustomBatchModel.invoke_batch(
    self,
    input_batch: list[dict[str, typing.Union[dict, list, str]]],
) -> list[dict[str, typing.Union[okareo.model_under_test.ModelInvocation, typing.Any]]]
Docstring:
method for taking a batch of scenario inputs and returning a corresponding batch of model outputs

arguments:
-> input_batch: list[dict[str, Union[dict, list, str]]] - batch o

Now, let's set up a `CustomBatchModel` to use our Hugging Face model in its `invoke_batch` method.

In [13]:
from okareo.model_under_test import CustomBatchModel, ModelInvocation

def format_instruction(sample):
    # Wrap a scenario input in the instruction-style prompt
    prompt = f"""{PROMPT_PREAMBLE}
### Input:
{sample}

### Output:
"""
    return prompt

mut_name = "WebBizz Intent Detection - Phi-3-mini-4k (unquantized, zero-shot)"

class Phi3ModelUnquantized(CustomBatchModel):
    def __init__(self, name, batch_size):
        super().__init__(name, batch_size)
        self.categories = [
            "returns",
            "pricing",
            "complaints",
        ]

    def invoke_batch(self, input_batch):
        # unpack the input_values, ids from the batch
        input_values = [input_dict['input_value'] for input_dict in input_batch]
        scenario_ids = [input_dict['id'] for input_dict in input_batch]

        # format the inputs as instructions
        prompts = [format_instruction(value) for value in input_values]

        # perform batch inference using the HF model
        encoded = tokenizer(
            prompts,
            return_tensors="pt",
            padding=True
        ).to(model.device)
        outputs = model.generate(
            input_ids=encoded.input_ids,
            attention_mask=encoded.attention_mask, # ignore padding tokens
            max_new_tokens=2, # change based on expected len of generations
            do_sample=False, # greedy decoding
        )
        # keep only the newly generated tokens (drop the prompt tokens)
        generated = outputs[:, encoded.input_ids.shape[1]:]
        decoded_batch = tokenizer.batch_decode(
            generated, skip_special_tokens=True
        )

        # pack the decoded generations in a list of dicts with id + ModelInvocation
        invocations = []
        for scenario_id, decoded, input_value in zip(scenario_ids, decoded_batch, input_values):
            pred = decoded.strip()
            res = "unknown"
            for cat in self.categories:
                if cat in pred:
                    res = cat
            invocations.append({
                'id': scenario_id,
                'model_invocation': ModelInvocation(
                    model_prediction=res,
                    model_input=input_value,
                    model_output_metadata=pred,
                )
            })
        return invocations
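
Conceptually, `batch_size` controls how many scenario rows reach `invoke_batch` in a single call. The sketch below shows one way such chunking could work. It is an assumption for illustration, not Okareo's actual implementation, and `chunk` is a hypothetical helper.

```python
import math


def chunk(rows, batch_size):
    """Yield consecutive slices of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Ten hypothetical scenario rows
demo_rows = [{"id": f"row-{i}", "input_value": f"question {i}"} for i in range(10)]

for size in (1, 4, 32):
    batches = list(chunk(demo_rows, size))
    # The number of calls shrinks as ceil(n / batch_size)
    assert len(batches) == math.ceil(len(demo_rows) / size)
    print(f"batch_size={size}: {len(batches)} call(s), sizes {[len(b) for b in batches]}")
```

Under this assumption, a larger `batch_size` means fewer `invoke_batch` calls, each doing more work per GPU forward pass. This is where the speedup comes from, until memory or compute saturates.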

## Run `CustomBatchModel` with different batch sizes

Run the same evaluation with different values of `batch_size` to find the fastest configuration for our evaluation.

In [14]:
from time import time
from okareo_api_client.models.test_run_type import TestRunType

batch_sizes = [1, 2, 4, 8, 16, 32]

print('--- evaluation on train split ---')
print('batch_size | eval_time (s) | app_link')
for batch_size in batch_sizes:
    # Register the model to use in the test run
    start_time = time()
    model_under_test = okareo.register_model(
        name=mut_name,
        model=[
            Phi3ModelUnquantized(
                name=Phi3ModelUnquantized.__name__,
                batch_size=batch_size
            )
        ],
        update=True
    )

    eval_name = f"Intent Detection (batch_size={batch_size})"
    evaluation = model_under_test.run_test(
        name=eval_name,
        scenario=source_scenario.scenario_id,
        test_run_type=TestRunType.MULTI_CLASS_CLASSIFICATION,
        calculate_metrics=True,
    )
    eval_time = time() - start_time
    print(f"{batch_size} | {eval_time:3.2f} | {evaluation.app_link}")

--- evaluation on train split ---
batch_size | eval_time (s) | app_link
1 | 22.89 | https://app.okareo.com/project/89920a9a-54cc-40c8-af68-9975e64e8d18/eval/c8e52165-1d55-4560-a5c9-f94abe1a37f7
2 | 16.70 | https://app.okareo.com/project/89920a9a-54cc-40c8-af68-9975e64e8d18/eval/0eed5954-c659-401a-bf86-d9b015599052
4 | 14.09 | https://app.okareo.com/project/89920a9a-54cc-40c8-af68-9975e64e8d18/eval/d8e25bd1-1f09-4133-9d41-df32e241e736
8 | 12.25 | https://app.okareo.com/project/89920a9a-54cc-40c8-af68-9975e64e8d18/eval/1015fa14-3271-4595-bd0d-7eb79d84cc01
16 | 11.57 | https://app.okareo.com/project/89920a9a-54cc-40c8-af68-9975e64e8d18/eval/14c8d065-0cb7-4713-8b96-663a8b60e8de
32 | 11.28 | https://app.okareo.com/project/89920a9a-54cc-40c8-af68-9975e64e8d18/eval/48c835f6-ff6a-4d0c-9acb-15d54c32bae7


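On our GPU, `batch_size=32` gives the fastest evaluation: 11.28 s, less than half of the 22.89 s taken with `batch_size=1`. Most of the gain arrives by `batch_size=8` (12.25 s), and larger batches yield only small further improvements.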
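
To compare configurations at a glance, the timings can be turned into speedups relative to `batch_size=1`. This is a small sketch that uses the numbers copied from the output above. The variable names are illustrative.

```python
# Timings in seconds, copied from the evaluation output above
eval_times = {1: 22.89, 2: 16.70, 4: 14.09, 8: 12.25, 16: 11.57, 32: 11.28}

baseline = eval_times[1]
speedups = {size: baseline / seconds for size, seconds in eval_times.items()}
best_size = min(eval_times, key=eval_times.get)

for size, speedup in speedups.items():
    print(f"batch_size={size:>2} | {eval_times[size]:5.2f} s | {speedup:.2f}x")
print(f"fastest: batch_size={best_size}")
```

These timings come from a single run and include model registration and network round trips, so small gaps such as the one between `batch_size=16` and `batch_size=32` may be within run-to-run noise. Repeating each configuration a few times would give a more reliable comparison.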