# Run Evaluation


Evaluates model responses using deterministic metrics, MLflow LLM Evaluate, and an LLM-as-a-judge.

In [None]:
import sys
import os

# Make the repository's src/ directory importable. The working directory is
# taken to be notebooks/, so src/ sits next to it, one level up.
notebook_dir = os.getcwd()
repo_root, _notebook_folder = os.path.split(notebook_dir)
src_path = os.path.join(repo_root, 'src')
# Prepend only when absent so re-running the cell never adds a duplicate entry.
if sys.path.count(src_path) == 0:
    sys.path.insert(0, src_path)


In [None]:
import logging
import uuid
from verdict.evaluation.mlflow_evaluator import MLflowEvaluator
from verdict.evaluation.custom_judges import LLMJudgeEvaluator
from verdict.evaluation.deterministic_metrics import DeterministicMetricsCalculator

# Module-level logger used by every cell below; the root logger is configured
# at INFO so the progress messages are emitted.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [None]:
# Notebook parameters, declared as Databricks text widgets.
# Each entry is (widget name, default value, label).
_widget_specs = (
    ("candidate_version", "", "Candidate Version"),
    ("run_id", "", "Run ID (from inference)"),
    ("judge_endpoint", "databricks-llama-4-maverick", "Judge Model Endpoint"),
    ("catalog_name", "verdict", "Catalog Name"),
)
for _widget_spec in _widget_specs:
    dbutils.widgets.text(*_widget_spec)

# Read the current widget values into module-level names used by later cells.
candidate_version = dbutils.widgets.get("candidate_version")
# An empty run_id widget is normalised to None, which later disables the
# per-run filter on the responses table.
_run_id_text = dbutils.widgets.get("run_id")
run_id = _run_id_text if _run_id_text else None
judge_endpoint = dbutils.widgets.get("judge_endpoint")
catalog_name = dbutils.widgets.get("catalog_name")

In [None]:
# Record the run configuration before any tables are read.
for _config_message in (
    f"Starting evaluation for model version: {candidate_version}",
    f"Judge endpoint: {judge_endpoint}",
):
    logger.info(_config_message)

In [None]:
# Load the responses written by the inference step.
responses_table = f"{catalog_name}.raw.model_responses"
responses_df = spark.table(responses_table)

if run_id:
    # Restrict to a single inference run. The predicate is built as a Column
    # comparison so the widget value is always treated as a literal, rather
    # than interpolating it into a SQL string where a quote character in
    # run_id would break or alter the filter.
    responses_df = responses_df.filter(responses_df["run_id"] == run_id)

# Attach each response's prompt text and ground truth. A left join keeps
# responses whose prompt_id has no match in the prompt table.
prompts_df = spark.table(f"{catalog_name}.raw.prompt_datasets")
responses_df = responses_df.join(
    prompts_df.select("prompt_id", "prompt", "ground_truth"),
    on="prompt_id",
    how="left",
)

# Log how many joined rows will be evaluated.
response_count = responses_df.count()
logger.info(f"Loaded {response_count} responses for evaluation")

In [None]:
# Compute the deterministic metrics for the joined responses.
logger.info("Computing deterministic metrics...")
_calculator_options = {"catalog_name": catalog_name}
det_calculator = DeterministicMetricsCalculator(**_calculator_options)
metrics_df = det_calculator.calculate_metrics(responses_df)

# Derive latency statistics from the computed metrics and render them inline.
latency_stats = det_calculator.compute_latency_stats(metrics_df)
logger.info("Latency statistics:")
latency_stats.display()

In [None]:
# MLflow-based evaluation of the joined responses.
logger.info("Running MLflow LLM Evaluate...")
_mlflow_evaluator_options = {
    "catalog_name": catalog_name,
    "experiment_path": "/verdict/experiments",
}
mlflow_evaluator = MLflowEvaluator(**_mlflow_evaluator_options)

# Fresh random identifier for this evaluation run. It is separate from the
# inference `run_id` widget value and is reused by the later cells that run
# the judge, summarise results, and publish the task value.
_evaluation_uuid = uuid.uuid4()
eval_run_id = str(_evaluation_uuid)

_mlflow_metric_names = ["faithfulness", "answer_relevance", "toxicity"]
mlflow_results = mlflow_evaluator.evaluate_responses(
    responses_df=responses_df,
    run_id=eval_run_id,
    metrics=_mlflow_metric_names,
)

In [None]:
# LLM-as-a-judge evaluation against the configured judge endpoint.
logger.info(f"Running LLM-as-a-judge evaluation with {judge_endpoint}...")
_judge_options = {
    "catalog_name": catalog_name,
    "judge_endpoint": judge_endpoint,
    # NOTE(review): presumably caps concurrent judge requests; confirm in
    # LLMJudgeEvaluator.
    "max_workers": 10,
}
judge_evaluator = LLMJudgeEvaluator(**_judge_options)

# Reuse the evaluation run id generated for the MLflow evaluation.
judge_results = judge_evaluator.evaluate(responses_df=responses_df, run_id=eval_run_id)

In [None]:
# Print the identifiers that describe this evaluation run.
_summary_lines = (
    f"\nEvaluation Run ID: {eval_run_id}",
    f"Model Version: {candidate_version}",
    f"Judge Endpoint: {judge_endpoint}",
)
print(*_summary_lines, sep="\n")

# Aggregate this run's rows from the results table, one row per metric
# (count, average, minimum, maximum), and render the result inline.
eval_table = f"{catalog_name}.evaluated.eval_results"
_summary_query = f"""
    SELECT metric_name,
           COUNT(*) as count,
           AVG(metric_value) as avg_value,
           MIN(metric_value) as min_value,
           MAX(metric_value) as max_value
    FROM {eval_table}
    WHERE run_id = '{eval_run_id}'
    GROUP BY metric_name
"""
_summary_df = spark.sql(_summary_query)
_summary_df.display()

In [None]:
# Publish the evaluation run id as a job task value so downstream tasks can
# read it.
dbutils.jobs.taskValues.set(key="eval_run_id", value=eval_run_id)