diff --git a/docs/evals.md b/docs/evals.md index 3f4720d..11b40a7 100644 --- a/docs/evals.md +++ b/docs/evals.md @@ -67,6 +67,8 @@ This installs: | `--azure-api-version` | ❌ | Azure OpenAI API version (default: 2025-01-01-preview) | | `--models` | ❌ | Models for benchmark mode (benchmark only) | | `--latency-iterations` | ❌ | Latency test samples (default: 25) (benchmark only) | +| `--max-parallel-models` | ❌ | Maximum number of models to benchmark concurrently (default: max(1, min(model_count, cpu_count))) (benchmark only) | +| `--benchmark-chunk-size` | ❌ | Optional number of samples per chunk when benchmarking to limit long-running runs (benchmark only) | ## Configuration @@ -205,6 +207,8 @@ guardrails-evals \ - **Automatic stage detection**: Evaluates all stages found in configuration - **Batch processing**: Configurable parallel processing - **Benchmark mode**: Model performance comparison with ROC AUC, precision at recall thresholds +- **Parallel benchmarking**: Run multiple models concurrently (defaults to CPU count) +- **Benchmark chunking**: Process large datasets in chunks for better progress tracking - **Latency testing**: End-to-end guardrail performance measurement - **Visualization**: Automatic chart and graph generation - **Multi-provider support**: OpenAI, Azure OpenAI, Ollama, vLLM, and other OpenAI-compatible APIs diff --git a/src/guardrails/evals/guardrail_evals.py b/src/guardrails/evals/guardrail_evals.py index 35dfa37..688edfc 100644 --- a/src/guardrails/evals/guardrail_evals.py +++ b/src/guardrails/evals/guardrail_evals.py @@ -9,8 +9,11 @@ import asyncio import copy import logging +import math +import os import sys -from collections.abc import Sequence +import time +from collections.abc import Iterator, Sequence from pathlib import Path from typing import Any @@ -33,7 +36,7 @@ JsonResultsReporter, LatencyTester, ) -from guardrails.evals.core.types import Context +from guardrails.evals.core.types import Context, Sample logger = logging.getLogger(__name__) @@ -67,6 +70,8 @@ def __init__( models: Sequence[str] | None = None, latency_iterations: int = DEFAULT_LATENCY_ITERATIONS, multi_turn: bool = False, + max_parallel_models: int | None = None, + benchmark_chunk_size: int | None = None, ) -> None: """Initialize the evaluator. @@ -84,8 +89,18 @@ def __init__( models: Models to test in benchmark mode. multi_turn: Whether to evaluate guardrails on multi-turn conversations. latency_iterations: Number of iterations for latency testing. + max_parallel_models: Maximum number of models to benchmark concurrently. + benchmark_chunk_size: Optional sample chunk size for per-model benchmarking. 
""" - self._validate_inputs(config_path, dataset_path, batch_size, mode, latency_iterations) + self._validate_inputs( + config_path, + dataset_path, + batch_size, + mode, + latency_iterations, + max_parallel_models, + benchmark_chunk_size, + ) self.config_path = config_path self.dataset_path = dataset_path @@ -97,7 +112,9 @@ def __init__( self.azure_endpoint = azure_endpoint self.azure_api_version = azure_api_version or "2025-01-01-preview" self.mode = mode - self.models = models or DEFAULT_BENCHMARK_MODELS + self.models = list(models) if models else list(DEFAULT_BENCHMARK_MODELS) + self.max_parallel_models = self._determine_parallel_model_limit(len(self.models), max_parallel_models) + self.benchmark_chunk_size = benchmark_chunk_size self.latency_iterations = latency_iterations self.multi_turn = multi_turn @@ -105,7 +122,16 @@ def __init__( if azure_endpoint and not AsyncAzureOpenAI: raise ValueError("Azure OpenAI support requires openai>=1.0.0. Please upgrade: pip install --upgrade openai") - def _validate_inputs(self, config_path: Path, dataset_path: Path, batch_size: int, mode: str, latency_iterations: int) -> None: + def _validate_inputs( + self, + config_path: Path, + dataset_path: Path, + batch_size: int, + mode: str, + latency_iterations: int, + max_parallel_models: int | None, + benchmark_chunk_size: int | None, + ) -> None: """Validate input parameters.""" if not config_path.exists(): raise ValueError(f"Config file not found: {config_path}") @@ -122,6 +148,61 @@ def _validate_inputs(self, config_path: Path, dataset_path: Path, batch_size: in if latency_iterations <= 0: raise ValueError(f"Latency iterations must be positive, got: {latency_iterations}") + if max_parallel_models is not None and max_parallel_models <= 0: + raise ValueError(f"max_parallel_models must be positive, got: {max_parallel_models}") + + if benchmark_chunk_size is not None and benchmark_chunk_size <= 0: + raise ValueError(f"benchmark_chunk_size must be positive, got: {benchmark_chunk_size}") + + @staticmethod + def _determine_parallel_model_limit(model_count: int, requested_limit: int | None) -> int: + """Resolve the number of benchmark tasks that can run concurrently. + + Args: + model_count: Total number of models scheduled for benchmarking. + requested_limit: Optional user-provided parallelism limit. + + Returns: + Number of concurrent benchmark tasks to run. + + Raises: + ValueError: If either model_count or requested_limit is invalid. + """ + if model_count <= 0: + raise ValueError("model_count must be positive") + + if requested_limit is not None: + if requested_limit <= 0: + raise ValueError("max_parallel_models must be positive") + return min(requested_limit, model_count) + + cpu_count = os.cpu_count() or 1 + return max(1, min(cpu_count, model_count)) + + @staticmethod + def _chunk_samples(samples: list[Sample], chunk_size: int | None) -> Iterator[list[Sample]]: + """Yield contiguous sample chunks respecting the configured chunk size. + + Args: + samples: Samples to evaluate. + chunk_size: Optional maximum chunk size to enforce. + + Returns: + Iterator yielding chunks of the provided samples. + + Raises: + ValueError: If chunk_size is non-positive when provided. 
+ """ + if chunk_size is not None and chunk_size <= 0: + raise ValueError("chunk_size must be positive when provided") + + if not samples or chunk_size is None or chunk_size >= len(samples): + yield samples + return + + for start in range(0, len(samples), chunk_size): + yield samples[start : start + chunk_size] + async def run(self) -> None: """Run the evaluation pipeline for all specified stages.""" try: @@ -178,7 +259,13 @@ async def _run_evaluation(self) -> None: async def _run_benchmark(self) -> None: """Run benchmark mode comparing multiple models.""" - logger.info("Running benchmark mode with models: %s", ", ".join(self.models)) + logger.info('event="benchmark_start" duration_ms=0 models="%s"', ", ".join(self.models)) + logger.info( + 'event="benchmark_parallel_config" duration_ms=0 parallel_limit=%d chunk_size=%s batch_size=%d', + self.max_parallel_models, + self.benchmark_chunk_size if self.benchmark_chunk_size else "dataset", + self.batch_size, + ) pipeline_bundles = load_pipeline_bundles(self.config_path) stage_to_test, guardrail_name = self._get_benchmark_target(pipeline_bundles) @@ -191,24 +278,23 @@ async def _run_benchmark(self) -> None: "Benchmark mode requires LLM-based guardrails with configurable models." ) - logger.info("Benchmarking guardrail '%s' from stage '%s'", guardrail_name, stage_to_test) + logger.info('event="benchmark_target" duration_ms=0 guardrail="%s" stage="%s"', guardrail_name, stage_to_test) loader = JsonlDatasetLoader() samples = loader.load(self.dataset_path) - logger.info("Loaded %d samples for benchmarking", len(samples)) + logger.info('event="benchmark_samples_loaded" duration_ms=0 count=%d', len(samples)) - context = self._create_context() benchmark_calculator = BenchmarkMetricsCalculator() basic_calculator = GuardrailMetricsCalculator() benchmark_reporter = BenchmarkReporter(self.output_dir) # Run benchmark for all models results_by_model, metrics_by_model = await self._benchmark_all_models( - stage_to_test, guardrail_name, samples, context, benchmark_calculator, basic_calculator + stage_to_test, guardrail_name, samples, benchmark_calculator, basic_calculator ) # Run latency testing - logger.info("Running latency tests for all models") + logger.info('event="benchmark_latency_start" duration_ms=0 model_count=%d', len(self.models)) latency_results = await self._run_latency_tests(stage_to_test, samples) # Save benchmark results @@ -217,14 +303,14 @@ async def _run_benchmark(self) -> None: ) # Create visualizations - logger.info("Generating visualizations") + logger.info('event="benchmark_visualization_start" duration_ms=0 guardrail="%s"', guardrail_name) visualizer = BenchmarkVisualizer(benchmark_dir / "graphs") visualization_files = visualizer.create_all_visualizations( results_by_model, metrics_by_model, latency_results, guardrail_name, samples[0].expected_triggers if samples else {} ) - logger.info("Benchmark completed. 
Results saved to: %s", benchmark_dir) - logger.info("Generated %d visualizations", len(visualization_files)) + logger.info('event="benchmark_complete" duration_ms=0 output="%s"', benchmark_dir) + logger.info('event="benchmark_visualization_complete" duration_ms=0 count=%d', len(visualization_files)) def _has_model_configuration(self, stage_bundle) -> bool: """Check if the guardrail has a model configuration.""" @@ -242,7 +328,7 @@ def _has_model_configuration(self, stage_bundle) -> bool: return False - async def _run_latency_tests(self, stage_to_test: str, samples: list) -> dict[str, Any]: + async def _run_latency_tests(self, stage_to_test: str, samples: list[Sample]) -> dict[str, Any]: """Run latency tests for all models.""" latency_results = {} latency_tester = LatencyTester(iterations=self.latency_iterations) @@ -378,7 +464,7 @@ def _get_valid_stages(self, pipeline_bundles) -> list[str]: return valid_requested_stages async def _evaluate_single_stage( - self, stage: str, pipeline_bundles, samples: list, context: Context, calculator: GuardrailMetricsCalculator + self, stage: str, pipeline_bundles, samples: list[Sample], context: Context, calculator: GuardrailMetricsCalculator ) -> dict[str, Any] | None: """Evaluate a single pipeline stage.""" try: @@ -418,8 +504,7 @@ async def _benchmark_all_models( self, stage_to_test: str, guardrail_name: str, - samples: list, - context: Context, + samples: list[Sample], benchmark_calculator: BenchmarkMetricsCalculator, basic_calculator: GuardrailMetricsCalculator, ) -> tuple[dict[str, list], dict[str, dict]]: @@ -427,42 +512,88 @@ async def _benchmark_all_models( pipeline_bundles = load_pipeline_bundles(self.config_path) stage_bundle = getattr(pipeline_bundles, stage_to_test) - results_by_model = {} - metrics_by_model = {} - - for i, model in enumerate(self.models, 1): - logger.info("Testing model %d/%d: %s", i, len(self.models), model) - - try: - modified_stage_bundle = self._create_model_specific_stage_bundle(stage_bundle, model) - - model_results = await self._benchmark_single_model( - model, modified_stage_bundle, samples, context, guardrail_name, benchmark_calculator, basic_calculator + semaphore = asyncio.Semaphore(self.max_parallel_models) + total_models = len(self.models) + + async def run_model_task(index: int, model: str) -> tuple[str, dict[str, Any]]: + """Execute a benchmark task under concurrency control. + + Args: + index: One-based position of the model being benchmarked. + model: Identifier of the model to benchmark. + + Returns: + Tuple of (model_name, results_dict) where results_dict contains "results" and "metrics" keys. 
+ """ + async with semaphore: + start_time = time.perf_counter() + logger.info( + 'event="benchmark_model_start" duration_ms=0 model="%s" position=%d total=%d', + model, + index, + total_models, ) - if model_results: - results_by_model[model] = model_results["results"] - metrics_by_model[model] = model_results["metrics"] - logger.info("Completed benchmarking for model %s (%d/%d)", model, i, len(self.models)) - else: - logger.warning("Model %s benchmark returned no results (%d/%d)", model, i, len(self.models)) - results_by_model[model] = [] - metrics_by_model[model] = {} - - except Exception as e: - logger.error("Failed to benchmark model %s (%d/%d): %s", model, i, len(self.models), e) - results_by_model[model] = [] - metrics_by_model[model] = {} + try: + modified_stage_bundle = self._create_model_specific_stage_bundle(stage_bundle, model) + + model_results = await self._benchmark_single_model( + model, + modified_stage_bundle, + samples, + guardrail_name, + benchmark_calculator, + basic_calculator, + ) + + elapsed_ms = (time.perf_counter() - start_time) * 1000 + + if model_results: + logger.info( + 'event="benchmark_model_complete" duration_ms=%.2f model="%s" status="success"', + elapsed_ms, + model, + ) + return (model, model_results) + else: + logger.warning( + 'event="benchmark_model_empty" duration_ms=%.2f model="%s" status="no_results"', + elapsed_ms, + model, + ) + return (model, {"results": [], "metrics": {}}) + + except Exception as e: + elapsed_ms = (time.perf_counter() - start_time) * 1000 + logger.error( + 'event="benchmark_model_failure" duration_ms=%.2f model="%s" error="%s"', + elapsed_ms, + model, + e, + ) + return (model, {"results": [], "metrics": {}}) + + task_results = await asyncio.gather(*(run_model_task(index, model) for index, model in enumerate(self.models, start=1))) + + # Build dictionaries from collected results + results_by_model: dict[str, list] = {} + metrics_by_model: dict[str, dict] = {} + for model, result_dict in task_results: + results_by_model[model] = result_dict["results"] + metrics_by_model[model] = result_dict["metrics"] # Log summary - successful_models = [model for model, results in results_by_model.items() if results] - failed_models = [model for model, results in results_by_model.items() if not results] + successful_models = [model for model in self.models if results_by_model.get(model)] + failed_models = [model for model in self.models if not results_by_model.get(model)] - logger.info("BENCHMARK SUMMARY") - logger.info("Successful models: %s", ", ".join(successful_models) if successful_models else "None") + logger.info('event="benchmark_summary" duration_ms=0 successful=%d failed=%d', len(successful_models), len(failed_models)) + logger.info( + 'event="benchmark_successful_models" duration_ms=0 models="%s"', + ", ".join(successful_models) if successful_models else "None", + ) if failed_models: - logger.warning("Failed models: %s", ", ".join(failed_models)) - logger.info("Total models tested: %d", len(self.models)) + logger.warning('event="benchmark_failed_models" duration_ms=0 models="%s"', ", ".join(failed_models)) + logger.info('event="benchmark_total_models" duration_ms=0 total=%d', len(self.models)) return results_by_model, metrics_by_model @@ -470,8 +601,7 @@ async def _benchmark_single_model( self, model: str, stage_bundle, - samples: list, - context: Context, + samples: list[Sample], guardrail_name: str, benchmark_calculator: BenchmarkMetricsCalculator, basic_calculator: GuardrailMetricsCalculator, @@ -482,7 +612,15 @@ async def 
_benchmark_single_model( guardrails = instantiate_guardrails(stage_bundle) engine = AsyncRunEngine(guardrails, multi_turn=self.multi_turn) - model_results = await engine.run(model_context, samples, self.batch_size, desc=f"Benchmarking {model}") + chunk_total = 1 + if self.benchmark_chunk_size and len(samples) > 0: + chunk_total = max(1, math.ceil(len(samples) / self.benchmark_chunk_size)) + + model_results: list[Any] = [] + for chunk_index, chunk in enumerate(self._chunk_samples(samples, self.benchmark_chunk_size), start=1): + chunk_desc = f"Benchmarking {model}" if chunk_total == 1 else f"Benchmarking {model} ({chunk_index}/{chunk_total})" + chunk_results = await engine.run(model_context, chunk, self.batch_size, desc=chunk_desc) + model_results.extend(chunk_results) guardrail_config = stage_bundle.guardrails[0].config if stage_bundle.guardrails else None @@ -630,6 +768,16 @@ def main() -> None: default=DEFAULT_LATENCY_ITERATIONS, help=f"Number of iterations for latency testing in benchmark mode (default: {DEFAULT_LATENCY_ITERATIONS})", ) + parser.add_argument( + "--max-parallel-models", + type=int, + help="Maximum number of models to benchmark concurrently (default: max(1, min(model_count, cpu_count)))", + ) + parser.add_argument( + "--benchmark-chunk-size", + type=int, + help="Optional number of samples per chunk when benchmarking to limit long-running runs.", + ) args = parser.parse_args() @@ -651,6 +799,14 @@ def main() -> None: print(f"❌ Error: Latency iterations must be positive, got: {args.latency_iterations}") sys.exit(1) + if args.max_parallel_models is not None and args.max_parallel_models <= 0: + print(f"❌ Error: max-parallel-models must be positive, got: {args.max_parallel_models}") + sys.exit(1) + + if args.benchmark_chunk_size is not None and args.benchmark_chunk_size <= 0: + print(f"❌ Error: benchmark-chunk-size must be positive, got: {args.benchmark_chunk_size}") + sys.exit(1) + if args.stages: invalid_stages = [stage for stage in args.stages if stage not in VALID_STAGES] if invalid_stages: @@ -694,6 +850,13 @@ def main() -> None: if args.mode == "benchmark": print(f" Models: {', '.join(args.models or DEFAULT_BENCHMARK_MODELS)}") print(f" Latency iterations: {args.latency_iterations}") + model_count = len(args.models or DEFAULT_BENCHMARK_MODELS) + parallel_setting = GuardrailEval._determine_parallel_model_limit(model_count, args.max_parallel_models) + print(f" Parallel models: {parallel_setting}") + if args.benchmark_chunk_size: + print(f" Benchmark chunk size: {args.benchmark_chunk_size}") + else: + print(" Benchmark chunk size: dataset") if args.multi_turn: print(" Conversation handling: multi-turn incremental") @@ -714,6 +877,8 @@ def main() -> None: models=args.models, latency_iterations=args.latency_iterations, multi_turn=args.multi_turn, + max_parallel_models=args.max_parallel_models, + benchmark_chunk_size=args.benchmark_chunk_size, ) asyncio.run(eval.run()) diff --git a/tests/unit/evals/test_guardrail_evals.py b/tests/unit/evals/test_guardrail_evals.py new file mode 100644 index 0000000..8a78346 --- /dev/null +++ b/tests/unit/evals/test_guardrail_evals.py @@ -0,0 +1,66 @@ +"""Unit tests for guardrail evaluation utilities.""" + +from __future__ import annotations + +import os + +import pytest + +from guardrails.evals.core.types import Sample +from guardrails.evals.guardrail_evals import GuardrailEval + + +def _build_samples(count: int) -> list[Sample]: + """Build synthetic samples for chunking tests. + + Args: + count: Number of synthetic samples to build. 
+ + Returns: + List of Sample instances configured for evaluation. + """ + return [ + Sample(id=f"sample-{idx}", data=f"payload-{idx}", expected_triggers={"g": bool(idx % 2)}) + for idx in range(count) + ] + + +def test_determine_parallel_model_limit_defaults(monkeypatch: pytest.MonkeyPatch) -> None: + """Use cpu_count when explicit parallelism is not provided. + + Args: + monkeypatch: Pytest monkeypatch helper. + """ + monkeypatch.setattr(os, "cpu_count", lambda: 4) + assert GuardrailEval._determine_parallel_model_limit(10, None) == 4 + assert GuardrailEval._determine_parallel_model_limit(2, None) == 2 + + +def test_determine_parallel_model_limit_respects_request() -> None: + """Honor user-provided parallelism constraints.""" + assert GuardrailEval._determine_parallel_model_limit(5, 3) == 3 + with pytest.raises(ValueError): + GuardrailEval._determine_parallel_model_limit(5, 0) + + +def test_chunk_samples_without_size() -> None: + """Return the original sample list when no chunk size is provided.""" + samples = _build_samples(3) + chunks = list(GuardrailEval._chunk_samples(samples, None)) + assert len(chunks) == 1 + assert chunks[0] is samples + + +def test_chunk_samples_even_splits() -> None: + """Split samples into evenly sized chunks.""" + samples = _build_samples(5) + chunks = list(GuardrailEval._chunk_samples(samples, 2)) + assert [len(chunk) for chunk in chunks] == [2, 2, 1] + assert [chunk[0].id for chunk in chunks] == ["sample-0", "sample-2", "sample-4"] + + +def test_chunk_samples_rejects_invalid_size() -> None: + """Raise ValueError for non-positive chunk sizes.""" + samples = _build_samples(2) + with pytest.raises(ValueError): + list(GuardrailEval._chunk_samples(samples, 0))
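
Reviewer note: for anyone who wants the new control flow in isolation, below is a minimal, self-contained sketch of the two mechanisms this change introduces — `_chunk_samples`-style chunking and semaphore-bounded parallel benchmarking. Names such as `fake_benchmark`, `benchmark_all`, and the toy sample data are hypothetical stand-ins for illustration only; they are not code from the diff.

```python
# Illustrative sketch only — not part of the diff. It mirrors the chunking and
# bounded-concurrency pattern added to guardrail_evals.py using stand-in names.
import asyncio
from collections.abc import Iterator


def chunk_samples(samples: list[str], chunk_size: int | None) -> Iterator[list[str]]:
    """Yield the whole list when no chunk size is set, else contiguous slices."""
    if chunk_size is not None and chunk_size <= 0:
        raise ValueError("chunk_size must be positive when provided")
    if not samples or chunk_size is None or chunk_size >= len(samples):
        yield samples
        return
    for start in range(0, len(samples), chunk_size):
        yield samples[start : start + chunk_size]


async def fake_benchmark(model: str, samples: list[str], chunk_size: int | None) -> tuple[str, int]:
    """Stand-in for a per-model benchmark: process chunks sequentially."""
    processed = 0
    for chunk in chunk_samples(samples, chunk_size):
        await asyncio.sleep(0.01)  # pretend to run the evaluation engine on this chunk
        processed += len(chunk)
    return model, processed


async def benchmark_all(
    models: list[str], samples: list[str], max_parallel: int, chunk_size: int | None
) -> list[tuple[str, int]]:
    semaphore = asyncio.Semaphore(max_parallel)

    async def run_one(model: str) -> tuple[str, int]:
        async with semaphore:  # at most max_parallel models run at once
            return await fake_benchmark(model, samples, chunk_size)

    # gather preserves submission order, so results line up with `models`,
    # mirroring how the diff rebuilds results_by_model from task_results.
    return await asyncio.gather(*(run_one(m) for m in models))


if __name__ == "__main__":
    out = asyncio.run(
        benchmark_all(["model-a", "model-b", "model-c"], [f"s{i}" for i in range(5)], max_parallel=2, chunk_size=2)
    )
    print(out)  # [('model-a', 5), ('model-b', 5), ('model-c', 5)]
```

With `max_parallel=2` here, the third model starts only after one of the first two releases the semaphore — the same behavior the diff's `_determine_parallel_model_limit` yields by default when `--max-parallel-models` is omitted, since it caps concurrency at `os.cpu_count()` (and never exceeds the number of models).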