## Dependencies

In [None]:
import os
import sys

# Put the parent of the current working directory on the import path
# (presumably so the local `medbench` package resolves when the notebook is
# run from its own folder — confirm against the repository layout).
sys.path.append(os.path.join(os.getcwd(), ".."))


In [None]:
%load_ext autoreload
%autoreload 2

import json
import logging
import os

from medbench.evaluators import (
    EvaluatorRunner,
    SummaryEvaluatorRunner,
    MultimodalEvaluatorRunner,
    ABEvaluatorRunner,
)
from medbench.metrics import (
    calculate_exact_match_metrics,
    calculate_image_metrics,
    calculate_summarization_metrics,
)
from medbench.models import Model, ModelRegistry, ModelRun, OpenAIReasoningModel, Runner
from medbench.config import settings

## Calculate metrics

### Load data

In [None]:
# Directory holding the metrics inputs, and the metrics-input JSON file
# inside it that this notebook analyses.
DATA_PATH = "../data/azf/metrics"
METRICS_INPUT_PATH = os.path.join(DATA_PATH, "vqarad_gpt4o_eval_metrics_input.json")

In [None]:
# Load the metrics-input payload: it carries the kind of metrics to compute
# ("metrics_type") and the serialized model run ("model_run").
metrics_type: str
model_run: ModelRun
# Decode as UTF-8 explicitly so parsing does not depend on the platform's
# default locale encoding.
with open(METRICS_INPUT_PATH, "r", encoding="utf-8") as f:
    data = json.load(f)


metrics_type = data.get("metrics_type")
model_run = ModelRun.from_json(data.get("model_run"))

# Show the run id as the cell output.
model_run.id

In [None]:
model_run.model

In [None]:
model_run.model.name, metrics_type, f"Instances to evaluate: {len(model_run.results)}"

### Calculate

In [None]:
# Choose the metrics routine that matches `metrics_type`.
metrics: list[dict[str, float]]
if metrics_type == "exact_match":
    metrics = calculate_exact_match_metrics(model_run)
elif metrics_type == "summarization":
    metrics = calculate_summarization_metrics(model_run)
elif metrics_type == "image_match":
    metrics = calculate_image_metrics(model_run)
else:
    # Fail right here with a clear message instead of leaving `metrics`
    # unbound, which would only surface later as a NameError.
    raise ValueError(
        f"Unsupported metrics_type: {metrics_type!r}. "
        "Expected one of: 'exact_match', 'summarization', 'image_match'."
    )

In [None]:
metrics

## Model as a judge

In [None]:
# NOTE: this rebinds DATA_PATH, which the "Load data" section set to the
# metrics folder; from here on it points two directories up.
DATA_PATH = "../../"

# Sample job payloads used by the model-as-a-judge cells.
SAMPLE_FULL_VALIDATION_JOB_JSON = os.path.join(DATA_PATH, "sample_full_validation_job.json")
SAMPLE_AB_TESTING_JOB_JSON = os.path.join(DATA_PATH, "sample_ab_testing_job.json")

In [None]:
import attrs
from medbench.datasets import (
    Data,
    Dataset,
    EMediaObjectType,
    Instance,
    MediaObject,
)


async def run_summary_evaluator(model_run, llm_evaluator, questions_generator_runner=None, output_instructions=""):
    """
    Build a SummaryEvaluatorRunner for a model run and execute its evaluation.

    The runner is always created with ``skip_errors=True``. The two optional
    arguments are forwarded to the constructor only when they carry a value.

    Args:
        model_run: Model run whose predictions are evaluated.
        llm_evaluator: Model used as the LLM evaluator.
        questions_generator_runner: Existing questions generator to reuse;
            left out of the constructor call when ``None``.
        output_instructions: Output specifications, forwarded as
            ``output_specs_prompt``; left out when empty.

    Returns:
        SummaryEvaluatorRunner: The runner after ``evaluate()`` was awaited.

    Raises:
        Exception: Any failure during construction or evaluation is logged
            and re-raised unchanged.
    """
    try:
        # (constructor keyword, value, whether the caller actually supplied it)
        candidates = (
            (
                "questions_generator_runner",
                questions_generator_runner,
                questions_generator_runner is not None,
            ),
            ("output_specs_prompt", output_instructions, bool(output_instructions)),
        )
        optional_args = {
            keyword: value for keyword, value, supplied in candidates if supplied
        }

        summary_runner = SummaryEvaluatorRunner(
            predictions_model_run=model_run,
            evaluator=llm_evaluator,
            skip_errors=True,
            **optional_args,
        )
        await summary_runner.evaluate()
    except Exception as e:
        logging.error(f"Error in Summary evaluation: {str(e)}")
        raise
    else:
        # Hand back the runner itself so callers can inspect every stage.
        return summary_runner


async def _process_model_run_async(
    model_run, llm_evaluator, questions_generator_runner=None, output_instructions=""
):
    """Run the summary evaluation and return the text of its first result.

    Delegates to ``run_summary_evaluator`` (passing all four arguments
    through) and reads the completion text of the first entry in the
    evaluator runner's results. Any error is logged and re-raised.
    """
    try:
        summary_runner = await run_summary_evaluator(
            model_run,
            llm_evaluator,
            questions_generator_runner,
            output_instructions,
        )
        # Only the first evaluation result is surfaced to the caller.
        evaluation_results = summary_runner.evaluator_runner._model_run.results
        first_result = evaluation_results[0]
        return first_result.completions.get_text()
    except Exception as e:
        logging.error(f"Error in Summary evaluation: {str(e)}")
        raise


async def _process_ab_testing_async(model_run, llm_evaluator, output_instructions):
    """Helper function to process A/B testing using SummaryEvaluatorRunner + MultimodalEvaluatorRunner."""
    try:
        # Step 1: Run vanilla SummaryEvaluatorRunner for each model separately
        model_runs = []
        questions_generator_runner = None  # Placeholder for questions generator if needed
        for i, result in enumerate(model_run.results):
            # Create a single-model run for each result
            single_model_run = attrs.evolve(
                model_run, id=f"{model_run.id}_model_{i}", results=[result]
            )

            # Process with vanilla SummaryEvaluatorRunner using the shared function
            evaluator = await run_summary_evaluator(
                single_model_run, llm_evaluator, questions_generator_runner
            )

            questions_generator_runner = evaluator.questions_generator_runner

            # Extract the evaluator runner's model run for AB comparison
            model_runs.append(evaluator.evaluator_runner._model_run)

        # Step 2: Use ABEvaluatorRunner to compare the outputs according to output_instructions
        comparison_runner = ABEvaluatorRunner(
            predictions_model_run=model_runs[0],
            predictions_model_run_b=model_runs[1],
            evaluator=llm_evaluator,
            output_specs_prompt=output_instructions,
        )

        await comparison_runner.evaluate()
        return comparison_runner

    except Exception as e:
        logging.error(f"Error in A/B testing evaluation: {str(e)}")
        raise

In [None]:
# Decode the job file as UTF-8 explicitly so parsing does not depend on the
# platform's default locale encoding.
with open(SAMPLE_AB_TESTING_JOB_JSON, "r", encoding="utf-8") as f:
    request_data = json.load(f)

# Parse the model run data
model_run_data = request_data.get("model_run", {})
model_run = ModelRun.from_json(model_run_data)

# Extract output instructions for injection into evaluator
output_instructions = request_data.get("output_instructions", "")

# Initialize OpenAI model for Summary evaluation.
# Connection details come from the MedBench `settings` object
# (NOTE(review): any environment-variable fallback would live inside
# `settings`; it is not visible here — confirm).
llm_evaluator = OpenAIReasoningModel(
    name=settings.azure_openai_deployment,
    version=settings.azure_openai_version,
    endpoint=settings.azure_openai_endpoint,
    api_key=settings.azure_openai_api_key,
    vision_enabled=False,
    system_prompt="",  # Prompts are defined by the evaluator runner
    max_tokens=40000,
    stop=None,
    stream=False,
)

# Check if this is A/B testing (Arena experiment): more than one result
is_ab_testing = len(model_run.results) > 1

# Show the key request facts as the cell output.
output_instructions, model_run.id, is_ab_testing, model_run.model.name

In [None]:
mr = await _process_ab_testing_async(model_run, llm_evaluator, output_instructions)

In [None]:
print(mr.evaluator_runner._model_run.dataset.instances[0].input.get_text())

In [None]:
mr.predictions_model_run.results[0].completions.get_text()

In [None]:
mr.predictions_model_run_b.id

In [None]:
model_run.results

In [None]:
request_data

In [None]:
# Build the summary evaluator by hand (not yet evaluated) so that each of its
# stages can be inspected afterwards. The output specs are only passed along
# when instructions were actually provided.
kwargs = {"output_specs_prompt": output_instructions} if output_instructions else {}

evaluator = SummaryEvaluatorRunner(
    predictions_model_run=model_run,
    evaluator=llm_evaluator,
    skip_errors=True,
    **kwargs,
)

evaluator

In [None]:
await evaluator.evaluate()

In [None]:
evaluator.evaluator_runner._model_run.to_json()

### Drill down debugging of evaluation workflow

Check original dataset's outputs - that's what we will be evaluating

In [None]:
evaluator.predictions_model_run.results

Check questions' dataset, the first step of the `SummaryEvaluator`

In [None]:
evaluator.questions_generator_runner._model_run.to_json()

Seeing the `results` key, we generated questions correctly, so let's check the second step of the `SummaryEvaluator`: the answer generation step

In [None]:
evaluator.answerer_runner._model_run.to_json()

Hmm, something is wrong with the answer generation step. First, we have no input for this step (empty `instance`), and thus no answers were generated (empty `results`). The `SummaryEvaluator`, after generating questions, should prepare the answer generation input in two steps:

```python
# ...

questions: List[Instance] = self._process_triplet_output(
    self.questions_generator_runner._model_run.results,
)

# ...
instances=self._prepare_summary_questions_instances(
    questions, self.predictions_model_run.results
)

# ...
```

So let's look into these:

In [None]:
# First preparation step, run by hand: feed the question-generation results
# to `_process_triplet_output` and keep what it returns for the next step.
questions = evaluator._process_triplet_output(
    evaluator.questions_generator_runner._model_run.results,
)

# Show the extracted questions as the cell output.
questions

In [None]:
evaluator._prepare_summary_questions_instances(
    questions, evaluator.predictions_model_run.results
)

#### Drill down into `_prepare_summary_questions_instances`

In [None]:
# Mimic the function's parameters as notebook variables so its body can be
# stepped through cell by cell. `questions` is reused as already defined.
summaries = evaluator.predictions_model_run.results
# Section headers placed before the questions and before the summary text
# (assumed to match the library's defaults — TODO confirm).
questions_header: str = "QUESTIONS:\n\n"
summaries_header: str = "\n\nAI SYSTEM SUMMARY:\n\n"

In [None]:
# Group the questions by their id: id -> list of questions sharing that id.
summary_questions_map = {}
for question in questions:
    summary_questions_map.setdefault(question.id, []).append(question)

summary_questions_map

In [None]:
# Mimic loop
summary = summaries[0]

summary

In [None]:
# Stop conditions
summary.error is not None, summary.input_id not in summary_questions_map

In [None]:
summary.input_id

In [None]:
# Stand-alone replay of the body of `_prepare_summary_questions_instances`,
# using the notebook variables `questions`, `summaries`, `questions_header`
# and `summaries_header`. Builtin generics are used for the annotations
# because module-level annotations are evaluated and `Dict`/`List` are not
# imported in this notebook.

# Group the questions by id so each summary can look up its own questions.
summary_questions_map: dict[str, list[Instance]] = {}
for question in questions:
    summary_questions_map.setdefault(question.id, []).append(question)

prepared_instances: list[Instance] = []
for summary in summaries:
    # Summaries that failed upstream cannot be evaluated.
    if summary.error is not None:
        logging.debug(
            "Cannot prepare summary questions for evaluation. "
            f"Skipping instance {summary.input_id} due to error: {summary.error}"
        )
        continue

    # Without generated questions there is nothing to ask about this summary.
    if summary.input_id not in summary_questions_map:
        logging.debug(
            f"Skipping instance {summary.input_id} because no questions were generated."
        )
        continue

    matching_questions = summary_questions_map[summary.input_id]

    # Questions and references are in the same format, one line per entry.
    questions_text = "\n".join(
        q.input.get_text().strip() for q in matching_questions
    )
    questions_references = "\n".join(
        r.output.get_text().strip()
        for q in matching_questions
        for r in q.references
    )
    prepared_instances.append(
        Instance(
            id=summary.input_id,
            input=Data.from_text(
                data=(
                    questions_header
                    + questions_text
                    + summaries_header
                    + summary.completions.get_text()
                )
            ),
            # NOTE(review): a single joined string is passed here, while the
            # questions' own `references` are iterated as objects exposing
            # `.output` — confirm that `Instance` accepts a plain string.
            references=questions_references,
            split="eval",
        )
    )

# A notebook cell cannot `return`; end with the value so it is displayed.
prepared_instances