From 44051aa23e204ea34d2e5af4239743e41e50c737 Mon Sep 17 00:00:00 2001 From: Mark Kurtz Date: Wed, 15 Oct 2025 23:12:06 -0400 Subject: [PATCH 1/4] Updates for scenarios and benchmarking entrypoints to reenable them for the latest state of refactor --- src/guidellm/__main__.py | 563 +++++--------- src/guidellm/benchmark/__init__.py | 20 +- src/guidellm/benchmark/benchmarker.py | 57 +- src/guidellm/benchmark/entrypoints.py | 267 ++++--- src/guidellm/benchmark/profile.py | 233 +++--- src/guidellm/benchmark/scenario.py | 169 ---- src/guidellm/benchmark/scenarios/__init__.py | 40 + src/guidellm/benchmark/scenarios/chat.json | 2 +- src/guidellm/benchmark/scenarios/rag.json | 2 +- src/guidellm/benchmark/schemas.py | 736 ++++++++++++++++-- src/guidellm/benchmark/types.py | 22 - .../data/deserializers/deserializer.py | 15 +- src/guidellm/presentation/data_models.py | 12 +- src/guidellm/utils/cli.py | 19 +- 14 files changed, 1276 insertions(+), 881 deletions(-) delete mode 100644 src/guidellm/benchmark/scenario.py delete mode 100644 src/guidellm/benchmark/types.py diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 680ac852..2df537ee 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -1,14 +1,12 @@ """ -GuideLLM command-line interface providing benchmarking, dataset preprocessing, and -mock server functionality. +GuideLLM command-line interface entry point. -This module serves as the primary entry point for the GuideLLM CLI application, -offering a comprehensive suite of tools for language model evaluation and testing. -It provides three main command groups: benchmark operations for performance testing -against generative models, dataset preprocessing utilities for data preparation and -transformation, and a mock server for testing and development scenarios. The CLI -supports various backends, output formats, and configuration options to accommodate -different benchmarking needs and deployment environments. +Primary CLI application providing benchmark execution, dataset preprocessing, and +mock server functionality for language model evaluation. Organizes commands into +three main groups: benchmark operations for performance testing, preprocessing +utilities for data transformation, and mock server capabilities for development +and testing. Supports multiple backends, output formats, and flexible configuration +through CLI options and environment variables. Example: :: @@ -38,12 +36,13 @@ from guidellm.backends import BackendType from guidellm.benchmark import ( + BenchmarkGenerativeTextArgs, GenerativeConsoleBenchmarkerProgress, ProfileType, benchmark_generative_text, + get_builtin_scenarios, reimport_benchmarks_report, ) -from guidellm.benchmark.scenario import GenerativeTextScenario from guidellm.mock_server import MockServer, MockServerConfig from guidellm.preprocess.dataset import ShortPromptStrategy, process_dataset from guidellm.scheduler import StrategyType @@ -65,22 +64,21 @@ "run", ] -# Available strategy and profile choices for benchmark execution types STRATEGY_PROFILE_CHOICES: list[str] = list(get_literal_vals(ProfileType | StrategyType)) +"""Available strategy and profile type choices for benchmark execution.""" def decode_escaped_str(_ctx, _param, value): """ Decode escape sequences in Click option values. - Click automatically escapes characters in option values, converting sequences - like "\\n" to "\\\\n". This function properly decodes these escape sequences - to their intended characters for use in CLI options. 
+ Click automatically escapes characters converting sequences like "\\n" to + "\\\\n". This function decodes these sequences to their intended characters. :param _ctx: Click context (unused) :param _param: Click parameter (unused) - :param value: String value to decode escape sequences from - :return: Decoded string with proper escape sequences + :param value: String value to decode + :return: Decoded string with proper escape sequences, or None if input is None :raises click.BadParameter: When escape sequence decoding fails """ if value is None: @@ -94,89 +92,76 @@ def decode_escaped_str(_ctx, _param, value): @click.group() @click.version_option(package_name="guidellm", message="guidellm version: %(version)s") def cli(): - """ - Main entry point for the GuideLLM command-line interface. - - This is the root command group that organizes all GuideLLM CLI functionality - into logical subgroups for benchmarking, preprocessing, configuration, and - mock server operations. - """ + """GuideLLM CLI for benchmarking, preprocessing, and testing language models.""" @cli.group( - help="Commands to run a new benchmark or load a prior one.", + help="Run a benchmark or load a previously saved benchmark report.", cls=DefaultGroupHandler, default="run", ) def benchmark(): - """ - Benchmark command group for running and managing performance tests. - - This command group provides functionality to execute new benchmarks against - generative models and load previously saved benchmark reports for analysis. - Supports various benchmarking strategies, output formats, and backend types. - """ + """Benchmark commands for performance testing generative models.""" @benchmark.command( "run", - help="Run a benchmark against a generative model using the specified arguments.", + help=( + "Run a benchmark against a generative model. " + "Supports multiple backends, data sources, strategies, and output formats. " + "Configuration can be loaded from a scenario file or specified via options." + ), context_settings={"auto_envvar_prefix": "GUIDELLM"}, ) -# @click.option( -# "--scenario", -# type=cli_tools.Union( -# click.Path( -# exists=True, -# readable=True, -# file_okay=True, -# dir_okay=False, -# path_type=Path, -# ), -# click.Choice(get_builtin_scenarios()), -# ), -# default=None, -# help=( -# "The name of a builtin scenario or path to a config file. " -# "Missing values from the config will use defaults. " -# "Options specified on the commandline will override the scenario." -# ), -# ) +@click.option( + "--scenario", + type=cli_tools.Union( + click.Path( + exists=True, + readable=True, + file_okay=True, + dir_okay=False, + path_type=Path, + ), + click.Choice(get_builtin_scenarios().keys()), + ), + default=None, + help=( + "Builtin scenario name or path to config file. " + "CLI options override scenario settings." + ), +) @click.option( "--target", type=str, - help="The target path for the backend to run benchmarks against. For example, http://localhost:8000", + help="Target backend URL (e.g., http://localhost:8000).", ) @click.option( "--data", type=str, multiple=True, help=( - "The HuggingFace dataset ID, a path to a HuggingFace dataset, " - "a path to a data file csv, json, jsonl, or txt, " - "or a synthetic data config as a json or key=value string." + "HuggingFace dataset ID, path to dataset, path to data file " + "(csv/json/jsonl/txt), or synthetic data config (json/key=value)." 
), ) @click.option( "--profile", "--rate-type", # legacy alias "profile", + default=BenchmarkGenerativeTextArgs.get_default("profile"), type=click.Choice(STRATEGY_PROFILE_CHOICES), - help=( - "The type of benchmark to run. " - f"Supported types {', '.join(STRATEGY_PROFILE_CHOICES)}. " - ), + help=f"Benchmark profile type. Options: {', '.join(STRATEGY_PROFILE_CHOICES)}.", ) @click.option( "--rate", - default=GenerativeTextScenario.get_default("rate"), + type=float, + multiple=True, + default=BenchmarkGenerativeTextArgs.get_default("rate"), help=( - "The rates to run the benchmark at. " - "Can be a single number or a comma-separated list of numbers. " - "For rate-type=sweep, this is the number of benchmarks it runs in the sweep. " - "For rate-type=concurrent, this is the number of concurrent requests. " - "For rate-type=async,constant,poisson, this is the rate requests per second. " - "For rate-type=synchronous,throughput, this must not be set." + "Benchmark rate(s) to test. Meaning depends on profile: " + "sweep=number of benchmarks, concurrent=concurrent requests, " + "async/constant/poisson=requests per second." ), ) # Backend configuration @@ -185,166 +170,132 @@ def benchmark(): "--backend-type", # legacy alias "backend", type=click.Choice(list(get_literal_vals(BackendType))), - default=GenerativeTextScenario.get_default("backend"), - help=( - "The type of backend to use to run requests against. Defaults to 'openai_http'." - f" Supported types: {', '.join(get_literal_vals(BackendType))}" - ), + default=BenchmarkGenerativeTextArgs.get_default("backend"), + help=f"Backend type. Options: {', '.join(get_literal_vals(BackendType))}.", ) @click.option( "--backend-kwargs", "--backend-args", # legacy alias "backend_kwargs", callback=cli_tools.parse_json, - default=GenerativeTextScenario.get_default("backend_kwargs"), - help=( - "A JSON string containing any arguments to pass to the backend as a " - "dict with **kwargs." - ), + default=BenchmarkGenerativeTextArgs.get_default("backend_kwargs"), + help="JSON string of arguments to pass to the backend.", ) @click.option( "--model", - default=GenerativeTextScenario.get_default("model"), + default=BenchmarkGenerativeTextArgs.get_default("model"), type=str, - help=( - "The ID of the model to benchmark within the backend. " - "If None provided (default), then it will use the first model available." - ), + help="Model ID to benchmark. If not provided, uses first available model.", ) # Data configuration @click.option( "--request-type", - default="chat_completions", + default=BenchmarkGenerativeTextArgs.get_default("data_request_formatter"), type=click.Choice(list(get_literal_vals(GenerativeRequestType))), help=( - "The type of request to create for each data sample and send to the backend. " - f"Supported types: {list(get_literal_vals(GenerativeRequestType))}." + f"Request type to create for each data sample. " + f"Options: {', '.join(get_literal_vals(GenerativeRequestType))}." ), ) @click.option( "--request-formatter-kwargs", default=None, callback=cli_tools.parse_json, - help=( - "A JSON string containing any arguments to pass to the request formatter " - "as a dict with **kwargs." - ), + help="JSON string of arguments to pass to the request formatter.", ) @click.option( "--processor", - default=GenerativeTextScenario.get_default("processor"), + default=BenchmarkGenerativeTextArgs.get_default("processor"), type=str, help=( - "The processor or tokenizer to use to calculate token counts for statistics " - "and synthetic data generation. 
If None provided (default), will load " - "using the model arg, if needed." + "Processor or tokenizer for token count calculations. " + "If not provided, loads from model." ), ) @click.option( "--processor-args", - default=GenerativeTextScenario.get_default("processor_args"), + default=BenchmarkGenerativeTextArgs.get_default("processor_args"), callback=cli_tools.parse_json, - help=( - "A JSON string containing any arguments to pass to the processor constructor " - "as a dict with **kwargs." - ), + help="JSON string of arguments to pass to the processor constructor.", ) @click.option( "--data-args", multiple=True, - default=None, + default=BenchmarkGenerativeTextArgs.get_default("data_args"), callback=cli_tools.parse_json, - help=( - "A JSON string containing any arguments to pass to the dataset creation " - "as a dict with **kwargs." - ), + help="JSON string of arguments to pass to dataset creation.", ) @click.option( "--data-samples", - default=-1, + default=BenchmarkGenerativeTextArgs.get_default("data_samples"), type=int, help=( - "The number of samples to use from the dataset. If -1 (default), will use all " - "samples in the dataset and dynamically generate samples. " - "If >1, will precompile that number of items from the dataset configs." + "Number of samples from dataset. -1 (default) uses all samples " + "and dynamically generates more." ), ) @click.option( - "--data-column-mappings", - default=None, + "--data-column-mapper", + default=BenchmarkGenerativeTextArgs.get_default("data_column_mapper"), callback=cli_tools.parse_json, - help=( - "A JSON string of column mappings to apply to the dataset to map into request " - "column types." - ), + help="JSON string of column mappings to apply to the dataset.", ) @click.option( "--data-sampler", - default=None, + default=BenchmarkGenerativeTextArgs.get_default("data_sampler"), type=click.Choice(["shuffle"]), - help="The data sampler type to use.", + help="Data sampler type.", ) @click.option( "--data-num-workers", - default=None, + default=BenchmarkGenerativeTextArgs.get_default("data_num_workers"), type=int, - help="The number of worker processes to use for data loading.", + help="Number of worker processes for data loading.", ) @click.option( "--dataloader_kwargs", - default=None, + default=BenchmarkGenerativeTextArgs.get_default("dataloader_kwargs"), callback=cli_tools.parse_json, - help=( - "A JSON string containing any arguments to pass to the dataloader constructor " - "as a dict with **kwargs." - ), + help="JSON string of arguments to pass to the dataloader constructor.", ) @click.option( "--random-seed", - default=GenerativeTextScenario.get_default("random_seed"), + default=BenchmarkGenerativeTextArgs.get_default("random_seed"), type=int, - help="The random seed to use for benchmarking to ensure reproducibility.", + help="Random seed for reproducibility.", ) # Output configuration @click.option( "--output-path", type=click.Path(), - default=Path.cwd(), + default=BenchmarkGenerativeTextArgs.get_default("output_path"), help=( - "The path to save the output formats to, if the format is a file type. " - "If it is a directory, it will save all output formats selected under it. " - "If it is a file, it will save the corresponding output format to that file. " - "Any output formats that were given that do not match the file extension will " - "be saved in the parent directory of the file path. " - "Defaults to the current working directory. " + "Path to save output files. Can be a directory or file. 
" + "If a file, saves that format; mismatched formats save to parent directory." ), ) @click.option( "--output-formats", multiple=True, type=str, - default=("console", "json"), # ("console", "json", "html", "csv") - help=( - "The output formats to use for the benchmark results. " - "Defaults to console, json, html, and csv where the file formats " - "will be saved at the specified output path." - ), + default=BenchmarkGenerativeTextArgs.get_default("output_formats"), + help="Output formats for results (e.g., console, json, html, csv).", ) @click.option( "--disable-console-outputs", is_flag=True, - help="Set this flag to disable console output", + help="Disable console output.", ) # Updates configuration @click.option( "--disable-progress", is_flag=True, - help="Set this flag to disable progress updates to the console", + help="Disable progress updates to the console.", ) @click.option( "--display-scheduler-stats", is_flag=True, - help="Set this flag to display stats for the processes running the benchmarks", + help="Display scheduler process statistics.", ) # Aggregators configuration @click.option( @@ -352,13 +303,10 @@ def benchmark(): "--warmup-percent", # legacy alias "warmup", type=float, - default=GenerativeTextScenario.get_default("warmup"), + default=BenchmarkGenerativeTextArgs.get_default("warmup"), help=( - "The specification around the number of requests to run before benchmarking. " - "If within (0, 1), then the percent of requests/time to use for warmup. " - "If >=1, then the number of requests or seconds to use for warmup." - "Whether it's requests/time used is dependent on which constraint is active. " - "Default None for no warmup." + "Warmup specification: if in (0,1) = percent, if >=1 = number of " + "requests/seconds (depends on active constraint)." ), ) @click.option( @@ -366,13 +314,10 @@ def benchmark(): "--cooldown-percent", # legacy alias "cooldown", type=float, - default=GenerativeTextScenario.get_default("cooldown"), + default=BenchmarkGenerativeTextArgs.get_default("cooldown"), help=( - "The specification around the number of requests to run after benchmarking. " - "If within (0, 1), then the percent of requests/time to use for cooldown. " - "If >=1, then the number of requests or seconds to use for cooldown." - "Whether it's requests/time used is dependent on which constraint is active. " - "Default None for no cooldown." + "Cooldown specification: if in (0,1) = percent, if >=1 = number of " + "requests/seconds (depends on active constraint)." ), ) @click.option( @@ -381,129 +326,82 @@ def benchmark(): "sample_requests", type=int, help=( - "The number of samples for each request status and each benchmark to save " - "in the output file. If None (default), will save all samples. " - "Defaults to 20." + "Number of sample requests per status to save. " + "None (default) saves all, recommended: 20." ), ) # Constraints configuration @click.option( "--max-seconds", type=float, - default=GenerativeTextScenario.get_default("max_seconds"), + default=BenchmarkGenerativeTextArgs.get_default("max_seconds"), help=( - "The maximum number of seconds each benchmark can run for. " - "If None, will run until max_requests or the data is exhausted." + "Maximum seconds per benchmark. " + "If None, runs until max_requests or data exhaustion." ), ) @click.option( "--max-requests", type=int, - default=GenerativeTextScenario.get_default("max_requests"), + default=BenchmarkGenerativeTextArgs.get_default("max_requests"), help=( - "The maximum number of requests each benchmark can run for. 
" - "If None, will run until max_seconds or the data is exhausted." + "Maximum requests per benchmark. " + "If None, runs until max_seconds or data exhaustion." ), ) @click.option( "--max-errors", type=int, - default=GenerativeTextScenario.get_default("max_errors"), - help="Maximum number of errors allowed before stopping the benchmark", + default=BenchmarkGenerativeTextArgs.get_default("max_errors"), + help="Maximum errors before stopping the benchmark.", ) @click.option( "--max-error-rate", type=float, - default=GenerativeTextScenario.get_default("max_error_rate"), - help="Maximum error rate allowed before stopping the benchmark", + default=BenchmarkGenerativeTextArgs.get_default("max_error_rate"), + help="Maximum error rate before stopping the benchmark.", ) @click.option( "--max-global-error-rate", type=float, - default=GenerativeTextScenario.get_default("max_global_error_rate"), - help="Maximum global error rate allowed across all benchmarks", + default=BenchmarkGenerativeTextArgs.get_default("max_global_error_rate"), + help="Maximum global error rate across all benchmarks.", ) -def run( - target, - data, - profile, - rate, - # Backend Configuration - backend, - backend_kwargs, - model, - # Data configuration - request_type, - request_formatter_kwargs, - processor, - processor_args, - data_args, - data_samples, - data_column_mappings, - data_sampler, - data_num_workers, - dataloader_kwargs, - random_seed, - # Output configuration - output_path, - output_formats, - # Updates configuration - disable_console_outputs, - disable_progress, - display_scheduler_stats, - # Benchmarker configuration - sample_requests, - warmup, - cooldown, - # Constraints configuration - max_seconds, - max_requests, - max_errors, - max_error_rate, - max_global_error_rate, -): - """ - Execute a generative text benchmark against a target model backend. - - Runs comprehensive performance testing using various strategies and profiles, - collecting metrics on latency, throughput, error rates, and resource usage. - Supports multiple backends, data sources, output formats, and constraint types - for flexible benchmark configuration. 
- """ - data_request_formatter = ( +def run(**kwargs): + request_type = kwargs.pop("request_type", None) + request_formatter_kwargs = kwargs.pop("request_formatter_kwargs", None) + kwargs["data_request_formatter"] = ( request_type if not request_formatter_kwargs else {"request_type": request_type, **request_formatter_kwargs} ) + if not isinstance((data := kwargs.get("data")), list | tuple): + kwargs["data"] = [data] if data else [] + + if not isinstance((data_args := kwargs.get("data_args")), list | tuple): + kwargs["data_args"] = [data_args] if data_args else None + + if ( + not (rate := kwargs.get("rate")) + or len(rate) == 0 + or (len(rate) == 1 and not rate[0]) + ): + kwargs["rate"] = None + elif len(rate) == 1: + kwargs["rate"] = rate[0] + + disable_console_outputs = kwargs.pop("disable_console_outputs", False) + display_scheduler_stats = kwargs.pop("display_scheduler_stats", False) + disable_progress = kwargs.pop("disable_progress", False) + if uvloop is not None: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.run( benchmark_generative_text( - target=target, - data=list(data), - # Benchmark configuration - profile=profile, - rate=rate, - # Backend configuration - backend=backend, - backend_kwargs=backend_kwargs, - model=model, - # Data configuration - processor=processor, - processor_args=processor_args, - data_args=data_args, - data_samples=data_samples, - data_column_mapper=data_column_mappings, - data_request_formatter=data_request_formatter, - data_sampler=data_sampler, - data_num_workers=data_num_workers, - dataloader_kwargs=dataloader_kwargs, - random_seed=random_seed, - # Output configuration - output_path=output_path, - output_formats=output_formats, - # Updates configuration + args=BenchmarkGenerativeTextArgs.create( + scenario=kwargs.pop("scenario", None), **kwargs + ), progress=( GenerativeConsoleBenchmarkerProgress( display_scheduler_stats=display_scheduler_stats @@ -511,22 +409,18 @@ def run( if not disable_progress else None ), - print_updates=not disable_console_outputs, - # Benchmarker configuration - sample_requests=sample_requests, - warmup=warmup, - cooldown=cooldown, - # Constraints configuration - max_seconds=max_seconds, - max_requests=max_requests, - max_errors=max_errors, - max_error_rate=max_error_rate, - max_global_error_rate=max_global_error_rate, + console=Console() if not disable_console_outputs else None, ) ) -@benchmark.command("from-file", help="Load a saved benchmark report.") +@benchmark.command( + "from-file", + help=( + "Load a saved benchmark report and optionally re-export to other formats. " + "PATH: Path to the saved benchmark report file (default: ./benchmarks.json)." + ), +) @click.argument( "path", type=click.Path(file_okay=True, dir_okay=False, exists=True), @@ -537,13 +431,9 @@ def run( type=click.Path(), default=Path.cwd(), help=( - "Allows re-exporting the benchmarks to other formats. " - "The path to save the output formats to, if the format is a file type. " - "If it is a directory, it will save all output formats selected under it. " - "If it is a file, it will save the corresponding output format to that file. " - "Any output formats that were given that do not match the file extension will " - "be saved in the parent directory of the file path. " - "Defaults to the current working directory. " + "Directory or file path to save re-exported benchmark results. " + "If a directory, all output formats will be saved there. " + "If a file, the matching format will be saved to that file." 
), ) @click.option( @@ -551,57 +441,33 @@ def run( multiple=True, type=str, default=("console", "json"), # ("console", "json", "html", "csv") - help=( - "The output formats to use for the benchmark results. " - "Defaults to console, json, html, and csv where the file formats " - "will be saved at the specified output path." - ), + help="Output formats for benchmark results (e.g., console, json, html, csv).", ) def from_file(path, output_path, output_formats): - """ - Load and optionally re-export a previously saved benchmark report. - - Imports benchmark results from a saved file and provides optional conversion - to different output formats. Supports JSON, YAML, and CSV export formats - based on the output file extension. - """ asyncio.run(reimport_benchmarks_report(path, output_path, output_formats)) @cli.command( - short_help="Prints environment variable settings.", - help=( - "Print out the available configuration settings that can be set " - "through environment variables." - ), + short_help="Show configuration settings.", + help="Display environment variables for configuring GuideLLM behavior.", ) def config(): - """ - Display available GuideLLM configuration environment variables. - - Prints a comprehensive list of all environment variables that can be used - to configure GuideLLM behavior, including their current values, defaults, - and descriptions. - """ print_config() -@cli.group(help="General preprocessing tools and utilities.") +@cli.group(help="Tools for preprocessing datasets for use in benchmarks.") def preprocess(): - """ - Preprocessing command group for dataset preparation and transformation. - - This command group provides utilities for converting, processing, and - optimizing datasets for use in GuideLLM benchmarks. Includes functionality - for token count adjustments, format conversions, and data validation. - """ + """Dataset preprocessing utilities.""" @preprocess.command( + "dataset", help=( - "Convert a dataset to have specific prompt and output token sizes.\n" - "DATA: Path to the input dataset or dataset ID.\n" - "OUTPUT_PATH: Path to save the converted dataset, including file suffix." + "Process a dataset to have specific prompt and output token sizes. " + "Supports multiple strategies for handling prompts and optional " + "Hugging Face Hub upload.\n\n" + "DATA: Path to the input dataset or dataset ID.\n\n" + "OUTPUT_PATH: Path to save the processed dataset, including file suffix." ), context_settings={"auto_envvar_prefix": "GUIDELLM"}, ) @@ -619,81 +485,70 @@ def preprocess(): "--processor", type=str, required=True, - help=( - "The processor or tokenizer to use to calculate token counts for statistics " - "and synthetic data generation." - ), + help="Processor or tokenizer name for calculating token counts.", ) @click.option( "--processor-args", default=None, callback=cli_tools.parse_json, - help=( - "A JSON string containing any arguments to pass to the processor constructor " - "as a dict with **kwargs." - ), + help="JSON string of arguments to pass to the processor constructor.", ) @click.option( "--data-args", callback=cli_tools.parse_json, - help=( - "A JSON string containing any arguments to pass to the dataset creation " - "as a dict with **kwargs." - ), + help="JSON string of arguments to pass to dataset creation.", ) @click.option( "--short-prompt-strategy", type=click.Choice([s.value for s in ShortPromptStrategy]), default=ShortPromptStrategy.IGNORE.value, show_default=True, - help="Strategy to handle prompts shorter than the target length. 
", + help="Strategy for handling prompts shorter than target length.", ) @click.option( "--pad-char", type=str, default="", callback=decode_escaped_str, - help="The token to pad short prompts with when using the 'pad' strategy.", + help="Character to pad short prompts with when using 'pad' strategy.", ) @click.option( "--concat-delimiter", type=str, default="", help=( - "The delimiter to use when concatenating prompts that are too short." - " Used when strategy is 'concatenate'." + "Delimiter for concatenating short prompts (used with 'concatenate' strategy)." ), ) @click.option( "--prompt-tokens", type=str, default=None, - help="Prompt tokens config (JSON, YAML file or key=value string)", + help="Prompt tokens configuration (JSON, YAML file, or key=value string).", ) @click.option( "--output-tokens", type=str, default=None, - help="Output tokens config (JSON, YAML file or key=value string)", + help="Output tokens configuration (JSON, YAML file, or key=value string).", ) @click.option( "--push-to-hub", is_flag=True, - help="Set this flag to push the converted dataset to the Hugging Face Hub.", + help="Push the processed dataset to Hugging Face Hub.", ) @click.option( "--hub-dataset-id", type=str, default=None, - help="The Hugging Face Hub dataset ID to push to. " - "Required if --push-to-hub is used.", + help=("Hugging Face Hub dataset ID for upload (required if --push-to-hub is set)."), ) @click.option( "--random-seed", type=int, default=42, show_default=True, - help="Random seed for prompt token sampling and output tokens sampling.", + help="Random seed for reproducible token sampling.", ) def dataset( data, @@ -710,13 +565,6 @@ def dataset( hub_dataset_id, random_seed, ): - """ - Convert and process datasets for specific prompt and output token requirements. - - Transforms datasets to meet target token length specifications using various - strategies for handling short prompts and output length adjustments. Supports - multiple input formats and can optionally push results to Hugging Face Hub. - """ process_dataset( data=data, output_path=output_path, @@ -734,71 +582,87 @@ def dataset( ) -@cli.command(help="Start the GuideLLM mock OpenAI/vLLM server for testing.") -@click.option("--host", default="127.0.0.1", help="Host to bind the server to") -@click.option("--port", default=8000, type=int, help="Port to bind the server to") -@click.option("--workers", default=1, type=int, help="Number of worker processes") +@cli.command( + "mock-server", + help=( + "Start a mock OpenAI/vLLM-compatible server for testing. " + "Simulates model inference with configurable latency and token generation." 
+ ), +) +@click.option( + "--host", + default="127.0.0.1", + help="Host address to bind the server to.", +) +@click.option( + "--port", + default=8000, + type=int, + help="Port number to bind the server to.", +) +@click.option( + "--workers", + default=1, + type=int, + help="Number of worker processes.", +) +@click.option( + "--model", + default="llama-3.1-8b-instruct", + help="Name of the model to mock.", +) @click.option( - "--model", default="llama-3.1-8b-instruct", help="The name of the model to mock" + "--processor", + default=None, + help="Processor or tokenizer to use for requests.", ) -@click.option("--processor", default=None, help="The processor to use for requests") @click.option( "--request-latency", default=3, type=float, - help="Request latency in seconds for non-streaming requests", + help="Request latency in seconds for non-streaming requests.", ) @click.option( "--request-latency-std", default=0, type=float, - help=( - "Request latency standard deviation (normal distribution) " - "in seconds for non-streaming requests" - ), + help="Request latency standard deviation in seconds (normal distribution).", ) @click.option( "--ttft-ms", default=150, type=float, - help="Time to first token in milliseconds for streaming requests", + help="Time to first token in milliseconds for streaming requests.", ) @click.option( "--ttft-ms-std", default=0, type=float, - help=( - "Time to first token standard deviation (normal distribution) in milliseconds" - ), + help="Time to first token standard deviation in milliseconds.", ) @click.option( "--itl-ms", default=10, type=float, - help="Inter token latency in milliseconds for streaming requests", + help="Inter-token latency in milliseconds for streaming requests.", ) @click.option( "--itl-ms-std", default=0, type=float, - help=( - "Inter token latency standard deviation (normal distribution) " - "in milliseconds for streaming requests" - ), + help="Inter-token latency standard deviation in milliseconds.", ) @click.option( "--output-tokens", default=128, type=int, - help="Output tokens for streaming requests", + help="Number of output tokens for streaming requests.", ) @click.option( "--output-tokens-std", default=0, type=float, - help=( - "Output tokens standard deviation (normal distribution) for streaming requests" - ), + help="Output tokens standard deviation (normal distribution).", ) def mock_server( host: str, @@ -815,15 +679,6 @@ def mock_server( output_tokens: int, output_tokens_std: float, ): - """ - Start a GuideLLM mock OpenAI/vLLM-compatible server for testing and development. - - Launches a mock server that simulates model inference with configurable latency - characteristics, token generation patterns, and response timing. Useful for - testing GuideLLM benchmarks without requiring actual model deployment or for - development scenarios requiring predictable server behavior. - """ - config = MockServerConfig( host=host, port=port, diff --git a/src/guidellm/benchmark/__init__.py b/src/guidellm/benchmark/__init__.py index 4c7cc4a5..ef7b2900 100644 --- a/src/guidellm/benchmark/__init__.py +++ b/src/guidellm/benchmark/__init__.py @@ -1,3 +1,15 @@ +""" +Benchmark execution and performance analysis framework. + +Provides comprehensive benchmarking capabilities for LLM inference workloads, +including profile-based execution strategies, metrics collection and aggregation, +progress tracking, and multi-format output generation. 
Supports synchronous, +asynchronous, concurrent, sweep, and throughput-based benchmarking profiles for +evaluating model performance under various load conditions. +""" + +from __future__ import annotations + from .benchmarker import Benchmarker from .entrypoints import benchmark_generative_text, reimport_benchmarks_report from .output import ( @@ -16,10 +28,12 @@ ThroughputProfile, ) from .progress import BenchmarkerProgress, GenerativeConsoleBenchmarkerProgress +from .scenarios import get_builtin_scenarios from .schemas import ( Benchmark, - BenchmarkArgs, + BenchmarkerArgs, BenchmarkerDict, + BenchmarkGenerativeTextArgs, BenchmarkSchedulerStats, EstimatedBenchmarkState, GenerativeAudioMetricsSummary, @@ -35,9 +49,10 @@ __all__ = [ "AsyncProfile", "Benchmark", - "BenchmarkArgs", + "BenchmarkGenerativeTextArgs", "BenchmarkSchedulerStats", "Benchmarker", + "BenchmarkerArgs", "BenchmarkerDict", "BenchmarkerProgress", "ConcurrentProfile", @@ -61,7 +76,6 @@ "SynchronousProfile", "ThroughputProfile", "benchmark_generative_text", - "enable_scenarios", "get_builtin_scenarios", "reimport_benchmarks_report", ] diff --git a/src/guidellm/benchmark/benchmarker.py b/src/guidellm/benchmark/benchmarker.py index 6a5a5627..35b9cbf1 100644 --- a/src/guidellm/benchmark/benchmarker.py +++ b/src/guidellm/benchmark/benchmarker.py @@ -3,16 +3,9 @@ Provides the core benchmarking engine that coordinates request scheduling, data aggregation, and result compilation across different execution strategies -and environments. - -Classes: - Benchmarker: Abstract benchmark orchestrator for request processing workflows. - -Type Variables: - BenchmarkT: Generic benchmark result type. - RequestT: Generic request object type. - RequestTimingsT: Generic request timing object type. - ResponseT: Generic response object type. +and environments. The Benchmarker acts as the primary workflow coordinator, +managing the complete benchmark lifecycle from request submission through +result compilation while supporting thread-safe singleton operations. """ from __future__ import annotations @@ -25,7 +18,7 @@ from guidellm.benchmark.profile import Profile from guidellm.benchmark.progress import BenchmarkerProgress from guidellm.benchmark.schemas import ( - BenchmarkArgs, + BenchmarkerArgs, BenchmarkT, EstimatedBenchmarkState, ) @@ -50,12 +43,11 @@ class Benchmarker( """ Abstract benchmark orchestrator for request processing workflows. - Coordinates the execution of benchmarking runs across different scheduling + Coordinates execution of benchmarking runs across different scheduling strategies, aggregating metrics and compiling results. Manages the complete - benchmark lifecycle from request submission through result compilation. - - Implements thread-safe singleton pattern to ensure consistent state across - concurrent benchmark operations. + benchmark lifecycle from request submission through result compilation while + implementing thread-safe singleton pattern to ensure consistent state across + concurrent operations. """ async def run( @@ -74,18 +66,23 @@ async def run( """ Execute benchmark runs across multiple scheduling strategies. - Orchestrates the complete benchmark workflow: iterates through scheduling - strategies from the profile, executes requests through the scheduler, - aggregates metrics, and compiles final benchmark results. - - :param requests: Request datasets for processing across strategies. - :param backend: Backend interface for request processing. 
- :param profile: Benchmark profile defining strategies and constraints. - :param environment: Execution environment for coordination. - :param benchmark_aggregators: Metric aggregation functions by name. - :param benchmark_class: Class for constructing final benchmark objects. - :yield: Tuples of (metrics_update, benchmark_result, strategy, state). - :raises Exception: If benchmark execution or compilation fails. + Orchestrates the complete benchmark workflow by iterating through scheduling + strategies from the profile, executing requests through the scheduler, + aggregating metrics, and compiling final benchmark results. + + :param benchmark_class: Class for constructing final benchmark objects + :param requests: Request datasets for processing across strategies + :param backend: Backend interface for request processing + :param profile: Benchmark profile defining strategies and constraints + :param environment: Execution environment for coordination + :param progress: Optional progress tracker for benchmark lifecycle events + :param sample_requests: Number of sample requests to use for estimation + :param warmup: Optional warmup duration in seconds before benchmarking + :param cooldown: Optional cooldown duration in seconds after benchmarking + :param prefer_response_metrics: Whether to prefer response-based metrics over + request-based metrics + :yield: Compiled benchmark results for each strategy execution + :raises Exception: If benchmark execution or compilation fails """ with self.thread_lock: if progress: @@ -99,7 +96,7 @@ async def run( if progress: await progress.on_benchmark_start(strategy) - args = BenchmarkArgs( + args = BenchmarkerArgs( run_id=run_id, run_index=len(profile.completed_strategies), sample_requests=sample_requests, @@ -137,7 +134,7 @@ async def run( await progress.on_benchmark_update( estimated_state, scheduler_state ) - except Exception as err: + except Exception as err: # noqa: BLE001 logger.error( f"Error updating benchmark estimate/progress: {err}" ) diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py index 61dfa680..1962f552 100644 --- a/src/guidellm/benchmark/entrypoints.py +++ b/src/guidellm/benchmark/entrypoints.py @@ -1,3 +1,15 @@ +""" +High-level entry points for executing generative text benchmarks. + +This module provides the primary interface for running generative text benchmarks +through the `benchmark_generative_text` function and re-importing existing benchmark +reports via `reimport_benchmarks_report`. It orchestrates the initialization and +coordination of backends, data loaders, profiles, and output formats to execute +comprehensive benchmarking workflows. The module handles all resolution logic for +converting user-provided arguments into fully configured components ready for +benchmarking execution. 
+""" + from __future__ import annotations from collections.abc import Callable @@ -5,14 +17,19 @@ from typing import Any, Literal from torch.utils.data import Sampler +from transformers import PreTrainedTokenizerBase +from typing_extensions import TypeAliasType from guidellm.backends import Backend, BackendType from guidellm.benchmark.benchmarker import Benchmarker from guidellm.benchmark.output import GenerativeBenchmarkerOutput from guidellm.benchmark.profile import Profile, ProfileType -from guidellm.benchmark.progress import BenchmarkerProgress -from guidellm.benchmark.schemas import GenerativeBenchmark, GenerativeBenchmarksReport -from guidellm.benchmark.types import OutputFormatT, ProcessorInputT +from guidellm.benchmark.progress import GenerativeConsoleBenchmarkerProgress +from guidellm.benchmark.schemas import ( + BenchmarkGenerativeTextArgs, + GenerativeBenchmark, + GenerativeBenchmarksReport, +) from guidellm.data import ( DataLoader, DatasetPreprocessor, @@ -35,12 +52,17 @@ ] -# Helper Variables - -_CURRENT_WORKING_DIR = Path.cwd() +# Helper Functions +OutputFormatT = TypeAliasType( + "OutputFormatT", + tuple[str, ...] + | list[str] + | dict[str, str | dict[str, Any] | GenerativeBenchmarkerOutput] + | None, +) -# Helper Functions +ProcessorInputT = TypeAliasType("ProcessorInputT", str | Path | PreTrainedTokenizerBase) async def resolve_backend( @@ -50,6 +72,16 @@ async def resolve_backend( console: Console | None = None, **backend_kwargs: dict[str, Any], ) -> tuple[Backend, str | None]: + """ + Initialize and validate a backend instance for benchmarking. + + :param backend: Backend type identifier or pre-configured Backend instance + :param target: Target endpoint URL or connection string for the backend + :param model: Model identifier to use with the backend, or None to use default + :param console: Console instance for progress reporting, or None + :param backend_kwargs: Additional keyword arguments passed to backend initialization + :return: Tuple of initialized Backend instance and resolved model identifier + """ console_step = ( console.print_update_step(title=f"Initializing backend {backend}") if console @@ -94,6 +126,14 @@ async def resolve_processor( model: str | None, console: Console | None = None, ) -> ProcessorInputT | None: + """ + Resolve the processor for tokenization, defaulting to model if not provided. + + :param processor: Processor identifier, path, tokenizer instance, or None + :param model: Model identifier to use as fallback processor + :param console: Console instance for progress reporting, or None + :return: Resolved processor or None if neither processor nor model provided + """ console_step = ( console.print_update_step(title=f"Resolving processor {processor}") if console @@ -137,6 +177,25 @@ async def resolve_request_loader( console: Console | None = None, **dataloader_kwargs: dict[str, Any] | None, ) -> DataLoader[GenerationRequest]: + """ + Construct a DataLoader for GenerationRequest objects from raw data inputs. 
+ + :param data: List of data sources to load requests from + :param model: Model identifier for request formatting + :param data_args: Arguments for each data source in the data list + :param data_samples: Number of samples to draw from the dataset + :param processor: Processor for tokenization operations + :param processor_args: Arguments for processor initialization + :param data_column_mapper: Preprocessor or mapping for standardizing column names + :param data_request_formatter: Preprocessor or config for formatting requests + :param data_collator: Collation function or type for batching requests + :param data_sampler: Sampler instance or type for data sampling + :param data_num_workers: Number of worker processes for data loading + :param random_seed: Seed for reproducible random operations + :param console: Console instance for progress reporting, or None + :param dataloader_kwargs: Additional arguments passed to DataLoader initialization + :return: Configured DataLoader instance for GenerationRequest objects + """ console_step = ( console.print_update_step(title=f"Initializing request loader from {data}") if console @@ -210,6 +269,22 @@ async def resolve_profile( max_global_error_rate: float | None, console: Console | None = None, ) -> Profile: + """ + Resolve and configure a benchmark profile with rate and constraint settings. + + :param profile: Profile type identifier or pre-configured Profile instance + :param rate: Request rate(s) for the benchmark execution + :param random_seed: Seed for reproducible random operations + :param constraints: Dictionary of constraint initializers for benchmark limits + :param max_seconds: Maximum duration in seconds for the benchmark + :param max_requests: Maximum number of requests to process + :param max_errors: Maximum number of errors before stopping + :param max_error_rate: Maximum error rate threshold before stopping + :param max_global_error_rate: Maximum global error rate threshold before stopping + :param console: Console instance for progress reporting, or None + :return: Configured Profile instance ready for benchmarking + :raises ValueError: If constraints are provided with a pre-configured Profile + """ console_step = ( console.print_update_step(title=f"Resolving profile {profile}") if console @@ -253,6 +328,14 @@ async def resolve_output_formats( output_path: str | Path | None, console: Console | None = None, ) -> dict[str, GenerativeBenchmarkerOutput]: + """ + Resolve output format specifications into configured output handler instances. 
+ + :param output_formats: Specification of desired output formats + :param output_path: Base path for output file generation, or None for default + :param console: Console instance for progress reporting, or None + :return: Dictionary mapping format names to configured output handler instances + """ console_step = ( console.print_update_step(title="Resolving output formats") if console else None ) @@ -271,120 +354,93 @@ async def resolve_output_formats( return resolved -async def benchmark_generative_text( # noqa: C901, PLR0915, PLR0912 - # Required - target: str, - data: list[Any], - # Benchmark configuration - profile: StrategyType | ProfileType | Profile = "sweep", - rate: float | list[float] | None = None, - # Backend configuration - backend: BackendType | Backend = "openai_http", - backend_kwargs: dict[str, Any] | None = None, - model: str | None = None, - # Data configuration - processor: ProcessorInputT | None = None, - processor_args: dict[str, Any] | None = None, - data_args: list[dict[str, Any]] | None = None, - data_samples: int = -1, - data_column_mapper: ( - DatasetPreprocessor | dict[str, str] | Literal["generative_column_mapper"] - ) = "generative_column_mapper", - data_request_formatter: ( - DatasetPreprocessor | dict[str, str] | str - ) = "chat_completions", - data_collator: Callable | Literal["generative"] | None = "generative", - data_sampler: Sampler[int] | Literal["shuffle"] | None = None, - data_num_workers: int | None = None, - dataloader_kwargs: dict[str, Any] | None = None, - random_seed: int = 42, - # Output configuration - output_path: str | Path | None = _CURRENT_WORKING_DIR, - output_formats: ( - tuple[str, ...] - | list[str] - | dict[str, str | dict[str, Any] | GenerativeBenchmarkerOutput] - | None - ) = ("console", "json", "html", "csv"), - # Updates configuration - progress: BenchmarkerProgress | None = None, - print_updates: bool = False, - # Benchmarker configuration - benchmark_cls: type[GenerativeBenchmark] = GenerativeBenchmark, - sample_requests: int | None = 10, - warmup: float | None = None, - cooldown: float | None = None, - # Constraints configuration - max_seconds: int | float | None = None, - max_requests: int | None = None, - max_errors: int | None = None, - max_error_rate: float | None = None, - max_global_error_rate: float | None = None, +# Main Entrypoints Functions + + +async def benchmark_generative_text( + args: BenchmarkGenerativeTextArgs, + progress: GenerativeConsoleBenchmarkerProgress | None = None, + console: Console | None = None, **constraints: dict[str, ConstraintInitializer | Any], ) -> tuple[GenerativeBenchmarksReport, dict[str, Any]]: - console = Console(quiet=not print_updates) + """ + Execute a comprehensive generative text benchmarking workflow. + + Orchestrates the full benchmarking pipeline by resolving all components (backend, + data loader, profile, outputs) from provided arguments, executing the benchmark + runs, and finalizing results in the specified output formats. 
+ + :param args: Configuration arguments for the benchmark execution + :param progress: Progress tracker for benchmark execution, or None for no tracking + :param console: Console instance for status reporting, or None for silent operation + :param constraints: Additional constraint initializers for benchmark limits + :return: Tuple of GenerativeBenchmarksReport and dictionary of output format results + """ backend, model = await resolve_backend( - backend=backend, - target=target, - model=model, + backend=args.backend, + target=args.target, + model=args.model, console=console, - **(backend_kwargs or {}), + **(args.backend_kwargs or {}), ) processor = await resolve_processor( - processor=processor, model=model, console=console + processor=args.processor, model=model, console=console ) request_loader = await resolve_request_loader( - data=data, + data=args.data, model=model, - data_args=data_args, - data_samples=data_samples, + data_args=args.data_args, + data_samples=args.data_samples, processor=processor, - processor_args=processor_args, - data_column_mapper=data_column_mapper, - data_request_formatter=data_request_formatter, - data_collator=data_collator, - data_sampler=data_sampler, - data_num_workers=data_num_workers, - random_seed=random_seed, + processor_args=args.processor_args, + data_column_mapper=args.data_column_mapper, + data_request_formatter=args.data_request_formatter, + data_collator=args.data_collator, + data_sampler=args.data_sampler, + data_num_workers=args.data_num_workers, + random_seed=args.random_seed, console=console, - **(dataloader_kwargs or {}), + **(args.dataloader_kwargs or {}), ) profile = await resolve_profile( - profile=profile, - rate=rate, - random_seed=random_seed, + profile=args.profile, + rate=args.rate, + random_seed=args.random_seed, constraints=constraints, - max_seconds=max_seconds, - max_requests=max_requests, - max_errors=max_errors, - max_error_rate=max_error_rate, - max_global_error_rate=max_global_error_rate, + max_seconds=args.max_seconds, + max_requests=args.max_requests, + max_errors=args.max_errors, + max_error_rate=args.max_error_rate, + max_global_error_rate=args.max_global_error_rate, console=console, ) output_formats = await resolve_output_formats( - output_formats=output_formats, output_path=output_path, console=console + output_formats=args.output_formats, + output_path=args.output_path, + console=console, ) - report = GenerativeBenchmarksReport() - console.print_update( - title="Setup complete, starting benchmarks...", status="success" - ) - console.print("\n\n") + report = GenerativeBenchmarksReport(args=args) + if console: + console.print_update( + title="Setup complete, starting benchmarks...", status="success" + ) + console.print("\n\n") benchmarker: Benchmarker[ GenerativeBenchmark, GenerationRequest, GenerationResponse ] = Benchmarker() async for benchmark in benchmarker.run( - benchmark_class=benchmark_cls, + benchmark_class=args.benchmark_cls, requests=request_loader, backend=backend, profile=profile, environment=NonDistributedEnvironment(), progress=progress, - sample_requests=sample_requests, - warmup=warmup, - cooldown=cooldown, - prefer_response_metrics=True, + sample_requests=args.sample_requests, + warmup=args.warmup, + cooldown=args.cooldown, + prefer_response_metrics=args.prefer_response_metrics, ): if benchmark: report.benchmarks.append(benchmark) @@ -394,13 +450,17 @@ async def benchmark_generative_text( # noqa: C901, PLR0915, PLR0912 output_result = await output.finalize(report) output_format_results[key] = 
output_result - console.print("\n\n") - console.print_update( - title=f"Benchmarking complete, generated {len(report.benchmarks)} benchmark(s)", - status="success", - ) - for key, value in output_format_results.items(): - console.print_update(title=f" {key:<8}: {value}", status="debug") + if console: + console.print("\n\n") + console.print_update( + title=( + "Benchmarking complete, generated " + f"{len(report.benchmarks)} benchmark(s)" + ), + status="success", + ) + for key, value in output_format_results.items(): + console.print_update(title=f" {key:<8}: {value}", status="debug") return report, output_format_results @@ -411,9 +471,12 @@ async def reimport_benchmarks_report( output_formats: OutputFormatT = ("console", "json", "html", "csv"), ) -> tuple[GenerativeBenchmarksReport, dict[str, Any]]: """ - The command-line entry point for re-importing and displaying an - existing benchmarks report. Can also specify an output format. - Assumes the file provided exists. + Load and re-export an existing benchmarks report in specified formats. + + :param file: Path to the existing benchmark report file to load + :param output_path: Base path for output file generation, or None for default + :param output_formats: Specification of desired output formats for the report + :return: Tuple of loaded GenerativeBenchmarksReport and dictionary of output results """ console = Console() diff --git a/src/guidellm/benchmark/profile.py b/src/guidellm/benchmark/profile.py index 8564afde..4b3f36fd 100644 --- a/src/guidellm/benchmark/profile.py +++ b/src/guidellm/benchmark/profile.py @@ -1,32 +1,17 @@ """ -Benchmarking profile configurations for coordinating multi-strategy execution. - -Provides configurable profile abstractions for orchestrating sequential and -parallel execution of different scheduling strategies during benchmarking, -with automatic strategy generation and constraint management. - -Classes: - Profile: Abstract base for multi-strategy benchmarking profiles. - SynchronousProfile: Single synchronous strategy execution profile. - ConcurrentProfile: Fixed-concurrency strategy execution profile. - ThroughputProfile: Maximum throughput strategy execution profile. - AsyncProfile: Rate-based asynchronous strategy execution profile. - SweepProfile: Adaptive multi-strategy sweep execution profile. - -Type Aliases: - ProfileType: Literal type for supported profile configurations. +Profile configurations for orchestrating multi-strategy benchmark execution. + +Provides configurable abstractions for coordinating sequential execution of +scheduling strategies during benchmarking workflows. Profiles automatically +generate strategies based on configuration parameters, manage runtime +constraints, and track completion state across the execution sequence. """ from __future__ import annotations from abc import ABC, abstractmethod from collections.abc import Generator -from typing import ( - TYPE_CHECKING, - Any, - ClassVar, - Literal, -) +from typing import TYPE_CHECKING, Any, ClassVar, Literal import numpy as np from pydantic import ( @@ -75,11 +60,14 @@ class Profile( ABC, ): """ - Abstract base for multi-strategy benchmarking execution profiles. + Abstract base for coordinating multi-strategy benchmark execution. - Coordinates sequential execution of scheduling strategies with automatic - strategy generation, constraint management, and completion tracking for - comprehensive benchmarking workflows. 
+ Manages sequential execution of scheduling strategies with automatic strategy + generation, constraint management, and completion tracking. Subclasses define + specific execution patterns like synchronous, concurrent, throughput-focused, + rate-based async, or adaptive sweep profiles. + + :cvar schema_discriminator: Field name used for polymorphic deserialization """ schema_discriminator: ClassVar[str] = "type_" @@ -100,14 +88,14 @@ def create( **kwargs: Any, ) -> Profile: """ - Create a profile instance based on the specified type. + Factory method to create a profile instance based on type. - :param rate_type: The type of profile to create. - :param rate: Rate parameter for profile configuration. - :param random_seed: Random seed for stochastic strategies. - :param kwargs: Additional arguments for profile configuration. - :return: Configured profile instance for the specified type. - :raises ValueError: If the profile type is not registered. + :param rate_type: Profile type identifier to instantiate + :param rate: Rate configuration for the profile strategy + :param random_seed: Seed for stochastic strategy reproducibility + :param kwargs: Additional profile-specific configuration parameters + :return: Configured profile instance for the specified type + :raises ValueError: If rate_type is not registered """ profile_class: type[Profile] = cls.get_registered_object(rate_type) resolved_kwargs = profile_class.resolve_args( @@ -128,33 +116,31 @@ def resolve_args( """ Resolve and validate arguments for profile construction. - :param rate_type: The type of the profile. - :param rate: Rate parameter for configuration. - :param random_seed: Random seed for stochastic strategies. - :param kwargs: Additional arguments to resolve. - :return: Dictionary of resolved arguments for profile construction. + :param rate_type: Profile type identifier + :param rate: Rate configuration parameter + :param random_seed: Seed for stochastic strategies + :param kwargs: Additional arguments to resolve and validate + :return: Resolved arguments dictionary for profile initialization """ ... type_: Literal["profile"] = Field( - description="The type of benchmarking profile to use", + description="Profile type discriminator for polymorphic serialization", ) completed_strategies: list[SchedulingStrategy] = Field( default_factory=list, - description="The strategies that have completed execution", + description="Strategies that have completed execution in this profile", ) constraints: dict[str, Any | dict[str, Any] | ConstraintInitializer] | None = Field( default=None, - description="Runtime constraints to apply during strategy execution", + description="Runtime constraints applied to strategy execution", ) @computed_field # type: ignore[misc] @property def strategy_types(self) -> list[StrategyType]: """ - :return: List of all strategy types expected to be executed or have been - executed in this profile. By default, this returns just the - completed strategies. + :return: Strategy types executed or expected to execute in this profile """ return [strat.type_ for strat in self.completed_strategies] @@ -169,10 +155,10 @@ def strategies_generator( None, ]: """ - Generate strategies and constraints for sequential profile execution. + Generate strategies and constraints for sequential execution. - :return: Generator yielding (strategy, constraints) tuples and - receiving benchmark results from each execution. 
+ :return: Generator yielding (strategy, constraints) tuples and receiving + benchmark results after each execution """ prev_strategy: SchedulingStrategy | None = None prev_benchmark: Benchmark | None = None @@ -197,11 +183,11 @@ def next_strategy( prev_benchmark: Benchmark | None, ) -> SchedulingStrategy | None: """ - Generate the next strategy to execute in the profile sequence. + Generate the next strategy in the profile execution sequence. - :param prev_strategy: The previously completed strategy. - :param prev_benchmark: Benchmark results from the previous strategy. - :return: Next strategy to execute, or None if profile is complete. + :param prev_strategy: Previously completed strategy instance + :param prev_benchmark: Benchmark results from previous strategy execution + :return: Next strategy to execute, or None if profile complete """ ... @@ -214,10 +200,10 @@ def next_strategy_constraints( """ Generate constraints for the next strategy execution. - :param next_strategy: The next strategy to be executed. - :param prev_strategy: The previously completed strategy. - :param prev_benchmark: Benchmark results from the previous strategy. - :return: Constraints dictionary for the next strategy, or None. + :param next_strategy: Strategy to be executed next + :param prev_strategy: Previously completed strategy instance + :param prev_benchmark: Benchmark results from previous strategy execution + :return: Constraints dictionary for next strategy, or None """ _ = (prev_strategy, prev_benchmark) # unused return ( @@ -281,12 +267,12 @@ def resolve_args( """ Resolve arguments for synchronous profile construction. - :param rate_type: The type/strategy of the profile (ignored). - :param rate: Rate parameter (must be None, will be stripped). - :param random_seed: Random seed (ignored and stripped). - :param kwargs: Additional arguments to pass through. - :return: Dictionary of resolved arguments. - :raises ValueError: If rate is not None. + :param rate_type: Profile type identifier (ignored) + :param rate: Rate parameter (must be None) + :param random_seed: Random seed (ignored) + :param kwargs: Additional arguments passed through unchanged + :return: Resolved arguments dictionary + :raises ValueError: If rate is not None """ _ = (rate_type, random_seed) # unused if rate is not None: @@ -297,7 +283,7 @@ def resolve_args( @property def strategy_types(self) -> list[StrategyType]: """ - :return: The single synchronous strategy type. + :return: Single synchronous strategy type """ return [self.type_] @@ -309,9 +295,9 @@ def next_strategy( """ Generate synchronous strategy or None if already completed. - :param prev_strategy: The previously completed strategy (unused). - :param prev_benchmark: Benchmark results from the previous strategy (unused). - :return: SynchronousStrategy for the first execution, None afterward. 
+ :param prev_strategy: Previously completed strategy (unused) + :param prev_benchmark: Benchmark results from previous execution (unused) + :return: SynchronousStrategy for first execution, None afterward """ _ = (prev_strategy, prev_benchmark) # unused if len(self.completed_strategies) >= 1: @@ -326,7 +312,7 @@ class ConcurrentProfile(Profile): type_: Literal["concurrent"] = "concurrent" # type: ignore[assignment] streams: list[PositiveInt] = Field( - description="Number of concurrent streams for request scheduling", + description="Concurrent stream counts for request scheduling", ) startup_duration: NonNegativeFloat = Field( default=0.0, @@ -347,20 +333,23 @@ def resolve_args( """ Resolve arguments for concurrent profile construction. - :param rate_type: The type/strategy of the profile (ignored). - :param rate: Rate parameter, remapped to streams. - :param random_seed: Random seed (ignored and stripped). - :param kwargs: Additional arguments to pass through. - :return: Dictionary of resolved arguments. - :raises ValueError: If rate is None. + :param rate_type: Profile type identifier (ignored) + :param rate: Rate parameter remapped to streams + :param random_seed: Random seed (ignored) + :param kwargs: Additional arguments passed through unchanged + :return: Resolved arguments dictionary + :raises ValueError: If rate is None """ _ = (rate_type, random_seed) # unused - kwargs["streams"] = [int(r) for r in rate] if rate else None + rate = rate if isinstance(rate, list) or rate is None else [rate] + kwargs["streams"] = [int(stream) for stream in rate] if rate else None return kwargs @property def strategy_types(self) -> list[StrategyType]: - """Get concurrent strategy types for each configured stream count.""" + """ + :return: Concurrent strategy types for each configured stream count + """ return [self.type_] * len(self.streams) def next_strategy( @@ -371,9 +360,9 @@ def next_strategy( """ Generate concurrent strategy for the next stream count. - :param prev_strategy: The previously completed strategy (unused). - :param prev_benchmark: Benchmark results from the previous strategy (unused). - :return: ConcurrentStrategy with next stream count, or None if complete. + :param prev_strategy: Previously completed strategy (unused) + :param prev_benchmark: Benchmark results from previous execution (unused) + :return: ConcurrentStrategy with next stream count, or None if complete """ _ = (prev_strategy, prev_benchmark) # unused @@ -395,7 +384,7 @@ class ThroughputProfile(Profile): type_: Literal["throughput"] = "throughput" # type: ignore[assignment] max_concurrency: PositiveInt | None = Field( default=None, - description="Maximum number of concurrent requests to schedule", + description="Maximum concurrent requests to schedule", ) startup_duration: NonNegativeFloat = Field( default=0.0, @@ -416,11 +405,11 @@ def resolve_args( """ Resolve arguments for throughput profile construction. - :param rate_type: The type/strategy of the profile (ignored). - :param rate: Rate parameter to remap to max_concurrency. - :param random_seed: Random seed (ignored and stripped). - :param kwargs: Additional arguments to pass through. - :return: Dictionary of resolved arguments. 
+ :param rate_type: Profile type identifier (ignored) + :param rate: Rate parameter remapped to max_concurrency + :param random_seed: Random seed (ignored) + :param kwargs: Additional arguments passed through unchanged + :return: Resolved arguments dictionary """ _ = (rate_type, random_seed) # unused # Remap rate to max_concurrency, strip out random_seed @@ -431,7 +420,9 @@ def resolve_args( @property def strategy_types(self) -> list[StrategyType]: - """Get the single throughput strategy type.""" + """ + :return: Single throughput strategy type + """ return [self.type_] def next_strategy( @@ -442,9 +433,9 @@ def next_strategy( """ Generate throughput strategy or None if already completed. - :param prev_strategy: The previously completed strategy (unused). - :param prev_benchmark: Benchmark results from the previous strategy (unused). - :return: ThroughputStrategy for the first execution, None afterward. + :param prev_strategy: Previously completed strategy (unused) + :param prev_benchmark: Benchmark results from previous execution (unused) + :return: ThroughputStrategy for first execution, None afterward """ _ = (prev_strategy, prev_benchmark) # unused if len(self.completed_strategies) >= 1: @@ -458,13 +449,11 @@ def next_strategy( @Profile.register(["async", "constant", "poisson"]) class AsyncProfile(Profile): - """ - Rate-based asynchronous strategy execution profile with configurable patterns. - """ + """Rate-based asynchronous strategy execution profile with configurable patterns.""" type_: Literal["async", "constant", "poisson"] = "async" # type: ignore[assignment] strategy_type: Literal["constant", "poisson"] = Field( - description="Type of asynchronous strategy pattern to use", + description="Asynchronous strategy pattern type to use", ) rate: list[PositiveFloat] = Field( description="Request scheduling rate in requests per second", @@ -478,7 +467,7 @@ class AsyncProfile(Profile): ) max_concurrency: PositiveInt | None = Field( default=None, - description="Maximum number of concurrent requests to schedule", + description="Maximum concurrent requests to schedule", ) random_seed: int = Field( default=42, @@ -496,12 +485,12 @@ def resolve_args( """ Resolve arguments for async profile construction. - :param rate_type: The type/strategy of the profile. - :param rate: Rate parameter for the profile. - :param random_seed: Random seed for stochastic strategies. - :param kwargs: Additional arguments to pass through. - :return: Dictionary of resolved arguments. - :raises ValueError: If rate is None. + :param rate_type: Profile type identifier + :param rate: Rate configuration for the profile + :param random_seed: Seed for stochastic strategies + :param kwargs: Additional arguments passed through unchanged + :return: Resolved arguments dictionary + :raises ValueError: If rate is None """ if rate is None: raise ValueError("AsyncProfile requires a rate parameter") @@ -516,13 +505,15 @@ def resolve_args( if rate_type in ["constant", "poisson"] else kwargs.get("strategy_type", "constant") ) - kwargs["rate"] = rate + kwargs["rate"] = rate if isinstance(rate, list) else [rate] kwargs["random_seed"] = random_seed return kwargs @property def strategy_types(self) -> list[StrategyType]: - """Get async strategy types for each configured rate.""" + """ + :return: Async strategy types for each configured rate + """ num_strategies = len(self.rate) return [self.strategy_type] * num_strategies @@ -534,11 +525,11 @@ def next_strategy( """ Generate async strategy for the next configured rate. 
- :param prev_strategy: The previously completed strategy (unused). - :param prev_benchmark: Benchmark results from the previous strategy (unused). + :param prev_strategy: Previously completed strategy (unused) + :param prev_benchmark: Benchmark results from previous execution (unused) :return: AsyncConstantStrategy or AsyncPoissonStrategy for next rate, - or None if all rates completed. - :raises ValueError: If strategy_type is neither 'constant' nor 'poisson'. + or None if all rates completed + :raises ValueError: If strategy_type is neither 'constant' nor 'poisson' """ _ = (prev_strategy, prev_benchmark) # unused @@ -566,9 +557,7 @@ def next_strategy( @Profile.register("sweep") class SweepProfile(Profile): - """ - Adaptive multi-strategy sweep execution profile with rate discovery. - """ + """Adaptive multi-strategy sweep execution profile with rate discovery.""" type_: Literal["sweep"] = "sweep" # type: ignore[assignment] sweep_size: int = Field( @@ -585,7 +574,7 @@ class SweepProfile(Profile): ) max_concurrency: PositiveInt | None = Field( default=None, - description="Maximum number of concurrent requests to schedule", + description="Maximum concurrent requests to schedule", ) random_seed: int = Field( default=42, @@ -605,7 +594,7 @@ class SweepProfile(Profile): ) measured_rates: list[float] = Field( default_factory=list, - description="Calculated interpolated rates between synchronous and throughput", + description="Interpolated rates between synchronous and throughput", ) @classmethod @@ -619,11 +608,11 @@ def resolve_args( """ Resolve arguments for sweep profile construction. - :param rate_type: The type/strategy for async strategies in the sweep. - :param rate: Rate parameter (ignored for sweep). - :param random_seed: Random seed for stochastic strategies. - :param kwargs: Additional arguments to pass through. - :return: Dictionary of resolved arguments. + :param rate_type: Async strategy type for sweep execution + :param rate: Rate parameter specifying sweep size (if provided) + :param random_seed: Seed for stochastic strategies + :param kwargs: Additional arguments passed through unchanged + :return: Resolved arguments dictionary """ sweep_size_from_rate = int(rate[0]) if rate else settings.default_sweep_number kwargs["sweep_size"] = kwargs.get("sweep_size", sweep_size_from_rate) @@ -634,7 +623,9 @@ def resolve_args( @property def strategy_types(self) -> list[StrategyType]: - """Get strategy types for the complete sweep sequence.""" + """ + :return: Strategy types for the complete sweep sequence + """ types = ["synchronous", "throughput"] types += [self.strategy_type] * (self.sweep_size - len(types)) return types @@ -653,13 +644,13 @@ def next_strategy( """ Generate the next strategy in the adaptive sweep sequence. - Executes synchronous and throughput strategies first to measure - baseline rates, then generates interpolated rates for async strategies. + Executes synchronous and throughput strategies first to measure baseline + rates, then generates interpolated rates for async strategies. - :param prev_strategy: The previously completed strategy. - :param prev_benchmark: Benchmark results from the previous strategy. - :return: Next strategy in sweep sequence, or None if complete. - :raises ValueError: If strategy_type is neither 'constant' nor 'poisson'. 
+ :param prev_strategy: Previously completed strategy instance + :param prev_benchmark: Benchmark results from previous strategy execution + :return: Next strategy in sweep sequence, or None if complete + :raises ValueError: If strategy_type is neither 'constant' nor 'poisson' """ if prev_strategy is None: return SynchronousStrategy() diff --git a/src/guidellm/benchmark/scenario.py b/src/guidellm/benchmark/scenario.py deleted file mode 100644 index 59cdef27..00000000 --- a/src/guidellm/benchmark/scenario.py +++ /dev/null @@ -1,169 +0,0 @@ -from __future__ import annotations - -import json -from collections.abc import Callable -from functools import cache, wraps -from inspect import Parameter, signature -from pathlib import Path -from typing import Annotated, Any, Literal, TypeVar - -import yaml -from loguru import logger -from pydantic import BeforeValidator, Field, PositiveFloat, PositiveInt - -from guidellm.backends import Backend, BackendType -from guidellm.benchmark.profile import Profile, ProfileType -from guidellm.benchmark.types import ProcessorInputT -from guidellm.scheduler import StrategyType -from guidellm.utils import StandardBaseModel - -__all__ = [ - "GenerativeTextScenario", - "Scenario", - "enable_scenarios", - "get_builtin_scenarios", -] - -SCENARIO_DIR = Path(__file__).parent / "scenarios/" - - -@cache -def get_builtin_scenarios() -> list[str]: - """Returns list of builtin scenario names.""" - return [p.stem for p in SCENARIO_DIR.glob("*.json")] - - -def parse_float_list(value: str | float | list[float]) -> list[float]: - """ - Parse a comma separated string to a list of float - or convert single float list of one or pass float - list through. - """ - if isinstance(value, int | float): - return [value] - elif isinstance(value, list): - return value - - values = value.split(",") if "," in value else [value] - - try: - return [float(val) for val in values] - except ValueError as err: - raise ValueError( - "must be a number or comma-separated list of numbers." - ) from err - - -T = TypeVar("T", bound="Scenario") - - -class Scenario(StandardBaseModel): - """ - Parent Scenario class with common options for all benchmarking types. - """ - - target: str - - @classmethod - def get_default(cls: type[T], field: str) -> Any: - """Get default values for model fields""" - return cls.model_fields[field].default - - @classmethod - def from_file(cls: type[T], filename: Path, overrides: dict | None = None) -> T: - """ - Attempt to create a new instance of the model using - data loaded from json or yaml file. - """ - try: - with filename.open() as f: - if str(filename).endswith(".json"): - data = json.load(f) - else: # Assume everything else is yaml - data = yaml.safe_load(f) - except (json.JSONDecodeError, yaml.YAMLError) as e: - logger.error(f"Failed to parse {filename} as type {cls.__name__}") - raise ValueError(f"Error when parsing file: {filename}") from e - - data.update(overrides or {}) - return cls.model_validate(data) - - @classmethod - def from_builtin(cls: type[T], name: str, overrides: dict | None = None) -> T: - filename = SCENARIO_DIR / f"{name}.json" - - if not filename.is_file(): - raise ValueError(f"{name} is not a valid builtin scenario") - - return cls.from_file(filename, overrides) - - -class GenerativeTextScenario(Scenario): - """ - Scenario class for generative text benchmarks. 
- """ - - class Config: - # NOTE: This prevents errors due to unvalidatable - # types like PreTrainedTokenizerBase - arbitrary_types_allowed = True - - data: Any - profile: StrategyType | ProfileType | Profile - rate: Annotated[list[PositiveFloat] | None, BeforeValidator(parse_float_list)] = ( - None - ) - random_seed: int = 42 - # Backend configuration - backend: BackendType | Backend = "openai_http" - backend_kwargs: dict[str, Any] | None = None - model: str | None = None - # Data configuration - processor: ProcessorInputT | None = None - processor_args: dict[str, Any] | None = None - data_args: dict[str, Any] | None = None - data_sampler: Literal["random"] | None = None - # Aggregators configuration - warmup: Annotated[float | None, Field(gt=0, le=1)] = None - cooldown: Annotated[float | None, Field(gt=0, le=1)] = None - request_samples: PositiveInt | None = 20 - # Constraints configuration - max_seconds: PositiveFloat | PositiveInt | None = None - max_requests: PositiveInt | None = None - max_errors: PositiveInt | None = None - max_error_rate: PositiveFloat | None = None - max_global_error_rate: PositiveFloat | None = None - - -# Decorator function to apply scenario to a function -def enable_scenarios(func: Callable) -> Any: - @wraps(func) - async def decorator(*args, scenario: Scenario | None = None, **kwargs) -> Any: - if scenario is not None: - kwargs.update(scenario.model_dump()) - return await func(*args, **kwargs) - - # Modify the signature of the decorator to include the `scenario` argument - sig = signature(func) - params = list(sig.parameters.values()) - # Place `scenario` before `**kwargs` or any parameter with a default value - loc = next( - ( - i - for i, p in enumerate(params) - if p.kind is Parameter.VAR_KEYWORD or p.default is not Parameter.empty - ), - len(params), - ) - params.insert( - loc, - Parameter( - "scenario", - Parameter.POSITIONAL_OR_KEYWORD, - default=None, - annotation=Scenario | None, - ), - ) - decorator.__signature__ = sig.replace(parameters=params) # type: ignore [attr-defined] - - return decorator diff --git a/src/guidellm/benchmark/scenarios/__init__.py b/src/guidellm/benchmark/scenarios/__init__.py index e69de29b..030f9bbd 100644 --- a/src/guidellm/benchmark/scenarios/__init__.py +++ b/src/guidellm/benchmark/scenarios/__init__.py @@ -0,0 +1,40 @@ +""" +Builtin benchmark scenario definitions and discovery utilities. + +This module provides access to predefined benchmark scenarios stored as JSON files +within the scenarios directory. It enables discovery and retrieval of builtin +scenarios by name or filename, supporting both stem names (without extension) and +full filenames for flexible scenario loading. +""" + +from __future__ import annotations + +from functools import cache +from pathlib import Path +from typing import Annotated + +__all__ = ["SCENARIO_DIR", "get_builtin_scenarios"] + +SCENARIO_DIR: Annotated[ + Path, + "Directory path containing builtin scenario JSON files", +] = Path(__file__).parent + + +@cache +def get_builtin_scenarios() -> dict[str, Path]: + """ + Retrieve all builtin scenario definitions from the scenarios directory. + + Scans the scenarios directory for JSON files and returns a mapping of scenario + names to their file paths. Each scenario is indexed by both its stem name + (filename without extension) and full filename for convenient lookup. 
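A minimal sketch of the dual-key lookup this enables, assuming the bundled chat.json scenario updated later in this patch is present in the scenarios directory:

from guidellm.benchmark.scenarios import get_builtin_scenarios

scenarios = get_builtin_scenarios()
# Both the stem and the full filename resolve to the same Path object.
assert scenarios["chat"] == scenarios["chat.json"]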
+ + :return: Dictionary mapping scenario names and filenames to their Path objects + """ + builtin = {} + for path in SCENARIO_DIR.glob("*.json"): + builtin[path.stem] = path + builtin[path.name] = path + + return builtin diff --git a/src/guidellm/benchmark/scenarios/chat.json b/src/guidellm/benchmark/scenarios/chat.json index 7ed4ce16..69602af5 100644 --- a/src/guidellm/benchmark/scenarios/chat.json +++ b/src/guidellm/benchmark/scenarios/chat.json @@ -1,4 +1,4 @@ { "profile": "sweep", - "data": "prompt_tokens=512,prompt_tokens_stdev=128,prompt_tokens_min=1,prompt_tokens_max=1024,output_tokens=256,output_tokens_stdev=64,output_tokens_min=1,output_tokens_max=1024" + "data": ["prompt_tokens=512,prompt_tokens_stdev=128,prompt_tokens_min=1,prompt_tokens_max=1024,output_tokens=256,output_tokens_stdev=64,output_tokens_min=1,output_tokens_max=1024"] } diff --git a/src/guidellm/benchmark/scenarios/rag.json b/src/guidellm/benchmark/scenarios/rag.json index d790ce60..4e6b94a5 100644 --- a/src/guidellm/benchmark/scenarios/rag.json +++ b/src/guidellm/benchmark/scenarios/rag.json @@ -1,4 +1,4 @@ { "profile": "sweep", - "data": "prompt_tokens=4096,prompt_tokens_stdev=512,prompt_tokens_min=2048,prompt_tokens_max=6144,output_tokens=512,output_tokens_stdev=128,output_tokens_min=1,output_tokens_max=1024" + "data": ["prompt_tokens=4096,prompt_tokens_stdev=512,prompt_tokens_min=2048,prompt_tokens_max=6144,output_tokens=512,output_tokens_stdev=128,output_tokens_min=1,output_tokens_max=1024"] } diff --git a/src/guidellm/benchmark/schemas.py b/src/guidellm/benchmark/schemas.py index 2f2d8f98..8329d2bc 100644 --- a/src/guidellm/benchmark/schemas.py +++ b/src/guidellm/benchmark/schemas.py @@ -1,24 +1,13 @@ """ -Benchmark data models and metrics for performance measurement and analysis. +Benchmark data models and metrics for generative AI performance measurement. Provides comprehensive data structures for capturing, storing, and analyzing -benchmark results from scheduler executions. Includes timing measurements, -token statistics, and performance metrics for generative AI workloads. - -Classes: - BenchmarkSchedulerStats: Scheduler timing and performance statistics. - BenchmarkMetrics: Core benchmark metrics and distributions. - BenchmarkRequestStats: Individual request processing statistics. - Benchmark: Base benchmark result container with generic metrics. - GenerativeRequestStats: Request statistics for generative AI workloads. - GenerativeMetrics: Comprehensive metrics for generative benchmarks. - GenerativeBenchmark: Complete generative benchmark results and analysis. - GenerativeBenchmarksReport: Container for multiple benchmark results. - -Type Variables: - BenchmarkMetricsT: Generic benchmark metrics type. - BenchmarkRequestStatsT: Generic request statistics type. - BenchmarkT: Generic benchmark container type. +benchmark results from scheduler-driven generative AI workload executions. +Core abstractions include base benchmark interfaces, generative-specific +metrics with token/latency distributions, request-level statistics tracking, +and multi-benchmark reporting capabilities. These models enable detailed +performance analysis including throughput, latency, concurrency patterns, and +domain-specific metrics for text, image, video, and audio generation tasks. 
""" from __future__ import annotations @@ -28,27 +17,33 @@ import time import uuid from abc import ABC, abstractmethod -from collections.abc import Iterable +from collections.abc import Callable, Iterable from pathlib import Path from typing import Any, ClassVar, Literal, TypeVar, cast import yaml -from pydantic import Field, computed_field - -from guidellm.benchmark.profile import Profile +from pydantic import ConfigDict, Field, computed_field, model_serializer +from torch.utils.data import Sampler +from transformers import PreTrainedTokenizerBase + +from guidellm.backends import Backend, BackendType +from guidellm.benchmark.profile import Profile, ProfileType +from guidellm.benchmark.scenarios import get_builtin_scenarios +from guidellm.data import DatasetPreprocessor from guidellm.scheduler import ( BackendInterface, Environment, SchedulerState, SchedulingStrategy, + StrategyType, ) from guidellm.schemas import ( GenerationRequest, GenerationResponse, GenerativeRequestStats, RequestInfo, + UsageMetrics, ) -from guidellm.schemas.request import UsageMetrics from guidellm.utils import ( InfoMixin, StandardBaseDict, @@ -59,9 +54,10 @@ __all__ = [ "Benchmark", - "BenchmarkArgs", + "BenchmarkGenerativeTextArgs", "BenchmarkSchedulerStats", "BenchmarkT", + "BenchmarkerArgs", "BenchmarkerDict", "EstimatedBenchmarkState", "GenerativeAudioMetricsSummary", @@ -77,6 +73,19 @@ class EstimatedBenchmarkState(dict[str, Any]): + """ + Accumulator for real-time benchmark metrics during scheduler execution. + + Tracks incremental metrics, running averages, and time-based statistics as + requests are processed. Maintains grouped metrics for benchmark state, + benchmark-level metrics, and scheduler-level metrics with support for + average, rate, and time-averaged metric calculations. + + :cvar benchmark_state_group: Metric group key for benchmark state tracking + :cvar benchmark_metrics_group: Metric group key for benchmark-level metrics + :cvar scheduler_state_group: Metric group key for scheduler-level metrics + """ + benchmark_state_group: ClassVar[Literal["benchmark_state"]] = "benchmark_state" benchmark_metrics_group: ClassVar[Literal["benchmark_metrics"]] = ( "benchmark_metrics" @@ -89,6 +98,14 @@ def get_metric( key: str, default: int | float | None = None, ) -> int | float | None: + """ + Retrieve a grouped metric value by group and key. + + :param group: Metric group identifier + :param key: Metric key within the group + :param default: Value returned if metric doesn't exist + :return: The metric value or default if not found + """ return self.get(f"{group}_{key}", default) def set_metric( @@ -98,6 +115,15 @@ def set_metric( value: bool | int | float | None, start_val: bool | int | float | None = None, ) -> bool | int | float | None: + """ + Set a grouped metric value, optionally adjusting by a starting value. + + :param group: Metric group identifier + :param key: Metric key within the group + :param value: Metric value to set + :param start_val: Optional starting value to subtract from the metric value + :return: The adjusted metric value or None if value is None + """ if value is None: return None @@ -115,6 +141,15 @@ def add_avg_metric( start_val: bool | int | float | None = 0.0, count: int | None = 1, ): + """ + Add a value to a running average metric calculation. 
+ + :param group: Metric group identifier + :param key: Metric key within the group + :param value: Value to add to the average + :param start_val: Optional starting value to subtract before adding + :param count: Number of observations this value represents + """ if value is None or count is None: return @@ -143,6 +178,17 @@ def add_avg_rate_metric( end_time: float | None = None, numerator_type: Literal["avg", "total", "count"] = "total", ): + """ + Add a value to a rate-based average metric calculation. + + :param group: Metric group identifier + :param key: Metric key within the group + :param value: Value to add to the average + :param start_val: Optional starting value to subtract before adding + :param start_time: Start time for rate calculation, defaults to current time + :param end_time: End time for rate calculation, defaults to current time + :param numerator_type: Type of numerator for rate calculation + """ if value is None: return @@ -183,6 +229,14 @@ def add_time_averaged_metric( value: bool | int | float | None, recorded_time: float | None = None, ): + """ + Add a value to a time-weighted average metric calculation. + + :param group: Metric group identifier + :param key: Metric key within the group + :param value: Value to add to the time-weighted average + :param recorded_time: Time of the observation, defaults to current time + """ if value is None: return @@ -218,7 +272,16 @@ def add_time_averaged_metric( ) -class BenchmarkArgs(StandardBaseDict): +class BenchmarkerArgs(StandardBaseDict): + """ + Configuration parameters for benchmark execution and request sampling. + + Defines run identification, request sampling strategy, warmup/cooldown phases, + and metric preferences for benchmark executions. Provides methods to determine + whether a request falls within warmup or cooldown periods based on time, + request count, or percentage-based thresholds. + """ + run_id: str = Field( default_factory=lambda: str(uuid.uuid4()), description="Unique identifier for the benchmark run", @@ -226,7 +289,9 @@ class BenchmarkArgs(StandardBaseDict): run_index: int = Field(default=0, description="Index of the benchmark run") sample_requests: int | None = Field( default=20, - description="Number of requests to sample and keep in the final benchmark for metrics", + description=( + "Number of requests to sample and keep in the final benchmark for metrics" + ), ) warmup: int | float | None = Field( default=None, description="Warmup time before benchmarking starts" @@ -242,6 +307,13 @@ class BenchmarkArgs(StandardBaseDict): def is_in_warmup( self, request_info: RequestInfo, scheduler_state: SchedulerState ) -> bool: + """ + Check if a request is in the warmup phase. + + :param request_info: Information about the current request + :param scheduler_state: Current state of the scheduler + :return: True if the request is in warmup phase, False otherwise + """ if self.warmup is not None and 0 < self.warmup < 1: # Percentage-based warmup return ( @@ -265,6 +337,13 @@ def is_in_warmup( def is_in_cooldown( self, request_info: RequestInfo, scheduler_state: SchedulerState ) -> bool: + """ + Check if a request is in the cooldown phase. 
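A brief sketch of the two interpretations described for warmup and cooldown, where values strictly between 0 and 1 act as fractions of the run and larger values act as absolute budgets (elapsed seconds or request counts):

fractional = BenchmarkerArgs(warmup=0.1, cooldown=0.05)  # first 10% / last 5% of the run
absolute = BenchmarkerArgs(warmup=30.0)                  # absolute warmup budget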
+ + :param request_info: Information about the current request + :param scheduler_state: Current state of the scheduler + :return: True if the request is in cooldown phase, False otherwise + """ if self.cooldown is not None and 0 < self.cooldown < 1: # Percentage-based cooldown return ( @@ -293,10 +372,24 @@ def is_in_cooldown( class Benchmark(ABC): + """ + Abstract base interface for benchmark result implementations. + + Defines the contract for benchmark classes to provide run metrics sampling, + request metrics sampling, real-time estimate updates, and final compilation + of benchmark results from scheduler execution data. + """ + @abstractmethod def get_run_metrics_sample( self, - ) -> dict[Literal["start_time", "end_time", "duration"], float]: ... + ) -> dict[Literal["start_time", "end_time", "duration"], float]: + """ + Get a sample of run-level timing metrics. + + :return: Dictionary containing start_time, end_time, and duration metrics + """ + ... @abstractmethod def get_request_metrics_sample( @@ -309,25 +402,43 @@ def get_request_metrics_sample( "request_concurrency", ], float, - ]: ... + ]: + """ + Get a sample of request-level performance metrics. + + :return: Dictionary containing request count, latency, throughput, and + concurrency metrics + """ + ... @classmethod @abstractmethod def update_estimate( cls, - args: BenchmarkArgs, + args: BenchmarkerArgs, state: EstimatedBenchmarkState, response: Any, request: Any, request_info: RequestInfo, scheduler_state: SchedulerState, - ): ... + ): + """ + Update real-time benchmark estimates with new request data. + + :param args: Benchmark configuration arguments + :param state: Current estimated benchmark state to update + :param response: Response received from the backend + :param request: Original request sent to the backend + :param request_info: Metadata about the request execution + :param scheduler_state: Current state of the scheduler + """ + ... @classmethod @abstractmethod def compile( cls, - args: BenchmarkArgs, + args: BenchmarkerArgs, estimated_state: EstimatedBenchmarkState, scheduler_state: SchedulerState, profile: Profile, @@ -336,7 +447,22 @@ def compile( environment: Environment, strategy: SchedulingStrategy, constraints: dict[str, dict[str, Any]], - ) -> Any: ... + ) -> Any: + """ + Compile final benchmark results from accumulated state. + + :param args: Benchmark configuration arguments + :param estimated_state: Accumulated benchmark state from execution + :param scheduler_state: Final state of the scheduler + :param profile: Benchmark profile configuration + :param requests: Collection of requests executed + :param backend: Backend interface used for execution + :param environment: Execution environment configuration + :param strategy: Scheduling strategy used + :param constraints: Execution constraints applied + :return: Compiled benchmark results instance + """ + ... BenchmarkT = TypeVar("BenchmarkT", bound=Benchmark) @@ -382,6 +508,12 @@ class BenchmarkSchedulerStats(StandardBaseDict): @classmethod def update_estimate(cls, state: EstimatedBenchmarkState, request_info: RequestInfo): + """ + Update estimated scheduler statistics with request timing information. 
+ + :param state: Current estimated benchmark state to update + :param request_info: Metadata about the request execution with timing data + """ state.set_metric(group=cls.group_name, key="updated", value=True) state.add_avg_metric( group=cls.group_name, @@ -442,6 +574,13 @@ def update_estimate(cls, state: EstimatedBenchmarkState, request_info: RequestIn def compile( cls, estimated_state: EstimatedBenchmarkState, scheduler_state: SchedulerState ) -> BenchmarkSchedulerStats: + """ + Compile final scheduler statistics from accumulated state. + + :param estimated_state: Accumulated benchmark state with scheduler metrics + :param scheduler_state: Final state of the scheduler + :return: Compiled scheduler statistics instance + """ return BenchmarkSchedulerStats( start_time=scheduler_state.start_time, end_time=scheduler_state.end_time or scheduler_state.start_time, @@ -517,17 +656,42 @@ def compile( class GenerativeMetricsSummary(StandardBaseDict): - input: StatusDistributionSummary = Field(description="") - input_per_second: StatusDistributionSummary = Field(description="") - input_concurrency: StatusDistributionSummary = Field(description="") + """ + Statistical summaries for input, output, and total metrics. + + Provides distribution summaries across successful, incomplete, and errored + requests for absolute values, per-second rates, and concurrency levels. + """ - output: StatusDistributionSummary = Field(description="") - output_per_second: StatusDistributionSummary = Field(description="") - output_concurrency: StatusDistributionSummary = Field(description="") + input: StatusDistributionSummary = Field( + description="Distribution of input metric values" + ) + input_per_second: StatusDistributionSummary = Field( + description="Distribution of input metric rates per second" + ) + input_concurrency: StatusDistributionSummary = Field( + description="Distribution of concurrent input metric values" + ) - total: StatusDistributionSummary = Field(description="") - total_per_second: StatusDistributionSummary = Field(description="") - total_concurrency: StatusDistributionSummary = Field(description="") + output: StatusDistributionSummary = Field( + description="Distribution of output metric values" + ) + output_per_second: StatusDistributionSummary = Field( + description="Distribution of output metric rates per second" + ) + output_concurrency: StatusDistributionSummary = Field( + description="Distribution of concurrent output metric values" + ) + + total: StatusDistributionSummary = Field( + description="Distribution of total metric values (input + output)" + ) + total_per_second: StatusDistributionSummary = Field( + description="Distribution of total metric rates per second" + ) + total_concurrency: StatusDistributionSummary = Field( + description="Distribution of concurrent total metric values" + ) @classmethod def compile( @@ -537,6 +701,15 @@ def compile( input_values: list[int | float], output_values: list[int | float], ) -> GenerativeMetricsSummary: + """ + Compile generative metrics summary from request data. 
+ + :param request_types: Status types for each request + :param request_times: Start and end times for each request + :param input_values: Input metric values for each request + :param output_values: Output metric values for each request + :return: Compiled generative metrics summary + """ total_values = [ input_val + output_val for input_val, output_val in zip(input_values, output_values, strict=False) @@ -595,9 +768,22 @@ def compile( class GenerativeTextMetricsSummary(StandardBaseDict): - tokens: GenerativeMetricsSummary = Field(description="") - words: GenerativeMetricsSummary = Field(description="") - characters: GenerativeMetricsSummary = Field(description="") + """ + Text-specific metric summaries for generative benchmarks. + + Tracks token, word, and character-level metrics across input, output, and + total usage for text generation workloads. + """ + + tokens: GenerativeMetricsSummary = Field( + description="Token count metrics and distributions" + ) + words: GenerativeMetricsSummary = Field( + description="Word count metrics and distributions" + ) + characters: GenerativeMetricsSummary = Field( + description="Character count metrics and distributions" + ) @classmethod def compile( @@ -607,6 +793,15 @@ def compile( input_metrics: list[UsageMetrics], output_metrics: list[UsageMetrics], ) -> GenerativeTextMetricsSummary: + """ + Compile text metrics summary from request usage data. + + :param request_types: Status types for each request + :param request_times: Start and end times for each request + :param input_metrics: Input usage metrics for each request + :param output_metrics: Output usage metrics for each request + :return: Compiled text metrics summary + """ return GenerativeTextMetricsSummary( tokens=GenerativeMetricsSummary.compile( request_types=request_types, @@ -634,10 +829,25 @@ def compile( class GenerativeImageMetricsSummary(StandardBaseDict): - tokens: GenerativeMetricsSummary = Field(description="") - images: GenerativeMetricsSummary = Field(description="") - pixels: GenerativeMetricsSummary = Field(description="") - bytes: GenerativeMetricsSummary = Field(description="") + """ + Image-specific metric summaries for generative benchmarks. + + Tracks token, image count, pixel, and byte-level metrics across input, output, + and total usage for image generation workloads. + """ + + tokens: GenerativeMetricsSummary = Field( + description="Image token count metrics and distributions" + ) + images: GenerativeMetricsSummary = Field( + description="Image count metrics and distributions" + ) + pixels: GenerativeMetricsSummary = Field( + description="Pixel count metrics and distributions" + ) + bytes: GenerativeMetricsSummary = Field( + description="Byte size metrics and distributions" + ) @classmethod def compile( @@ -647,6 +857,15 @@ def compile( input_metrics: list[UsageMetrics], output_metrics: list[UsageMetrics], ) -> GenerativeImageMetricsSummary: + """ + Compile image metrics summary from request usage data. 
+ + :param request_types: Status types for each request + :param request_times: Start and end times for each request + :param input_metrics: Input usage metrics for each request + :param output_metrics: Output usage metrics for each request + :return: Compiled image metrics summary + """ return GenerativeImageMetricsSummary( tokens=GenerativeMetricsSummary.compile( request_types=request_types, @@ -676,10 +895,25 @@ def compile( class GenerativeVideoMetricsSummary(StandardBaseDict): - tokens: GenerativeMetricsSummary = Field(description="") - frames: GenerativeMetricsSummary = Field(description="") - seconds: GenerativeMetricsSummary = Field(description="") - bytes: GenerativeMetricsSummary = Field(description="") + """ + Video-specific metric summaries for generative benchmarks. + + Tracks token, frame count, duration, and byte-level metrics across input, + output, and total usage for video generation workloads. + """ + + tokens: GenerativeMetricsSummary = Field( + description="Video token count metrics and distributions" + ) + frames: GenerativeMetricsSummary = Field( + description="Frame count metrics and distributions" + ) + seconds: GenerativeMetricsSummary = Field( + description="Duration metrics in seconds and distributions" + ) + bytes: GenerativeMetricsSummary = Field( + description="Byte size metrics and distributions" + ) @classmethod def compile( @@ -689,6 +923,15 @@ def compile( input_metrics: list[UsageMetrics], output_metrics: list[UsageMetrics], ) -> GenerativeVideoMetricsSummary: + """ + Compile video metrics summary from request usage data. + + :param request_types: Status types for each request + :param request_times: Start and end times for each request + :param input_metrics: Input usage metrics for each request + :param output_metrics: Output usage metrics for each request + :return: Compiled video metrics summary + """ return GenerativeVideoMetricsSummary( tokens=GenerativeMetricsSummary.compile( request_types=request_types, @@ -720,10 +963,25 @@ def compile( class GenerativeAudioMetricsSummary(StandardBaseDict): - tokens: GenerativeMetricsSummary = Field(description="") - samples: GenerativeMetricsSummary = Field(description="") - seconds: GenerativeMetricsSummary = Field(description="") - bytes: GenerativeMetricsSummary = Field(description="") + """ + Audio-specific metric summaries for generative benchmarks. + + Tracks token, sample count, duration, and byte-level metrics across input, + output, and total usage for audio generation workloads. + """ + + tokens: GenerativeMetricsSummary = Field( + description="Audio token count metrics and distributions" + ) + samples: GenerativeMetricsSummary = Field( + description="Sample count metrics and distributions" + ) + seconds: GenerativeMetricsSummary = Field( + description="Duration metrics in seconds and distributions" + ) + bytes: GenerativeMetricsSummary = Field( + description="Byte size metrics and distributions" + ) @classmethod def compile( @@ -733,6 +991,15 @@ def compile( input_metrics: list[UsageMetrics], output_metrics: list[UsageMetrics], ) -> GenerativeAudioMetricsSummary: + """ + Compile audio metrics summary from request usage data. 
+ + :param request_types: Status types for each request + :param request_times: Start and end times for each request + :param input_metrics: Input usage metrics for each request + :param output_metrics: Output usage metrics for each request + :return: Compiled audio metrics summary + """ return GenerativeAudioMetricsSummary( tokens=GenerativeMetricsSummary.compile( request_types=request_types, @@ -802,7 +1069,10 @@ class GenerativeMetrics(StandardBaseDict): description="Distribution of inter-token latencies in milliseconds" ) output_tokens_wo_first_per_iteration: StatusDistributionSummary = Field( - description="Distribution of output tokens (without first) generated per streaming iteration" + description=( + "Distribution of output tokens (without first) generated per " + "streaming iteration" + ) ) output_tokens_per_second: StatusDistributionSummary = Field( description="Distribution of output token generation rates" @@ -815,10 +1085,18 @@ class GenerativeMetrics(StandardBaseDict): ) # Domain specific stats - text: GenerativeTextMetricsSummary = Field(description="") - image: GenerativeImageMetricsSummary = Field(description="") - video: GenerativeVideoMetricsSummary = Field(description="") - audio: GenerativeAudioMetricsSummary = Field(description="") + text: GenerativeTextMetricsSummary = Field( + description="Text-specific metrics for tokens, words, and characters" + ) + image: GenerativeImageMetricsSummary = Field( + description="Image-specific metrics for tokens, images, pixels, and bytes" + ) + video: GenerativeVideoMetricsSummary = Field( + description="Video-specific metrics for tokens, frames, duration, and bytes" + ) + audio: GenerativeAudioMetricsSummary = Field( + description="Audio-specific metrics for tokens, samples, duration, and bytes" + ) @classmethod def update_estimate( @@ -829,6 +1107,15 @@ def update_estimate( request_info: RequestInfo, scheduler_state: SchedulerState, ): + """ + Update real-time generative metrics estimates with new request data. + + :param state: Current estimated benchmark state to update + :param response: Response received from the backend + :param request: Original request sent to the backend + :param request_info: Metadata about the request execution + :param scheduler_state: Current state of the scheduler + """ benchmark_start_time = scheduler_state.start_time request_start_time = ( request_info.timings.request_start or request_info.timings.resolve_start @@ -1025,6 +1312,14 @@ def compile( errored: list[GenerativeRequestStats], incomplete: list[GenerativeRequestStats], ) -> GenerativeMetrics: + """ + Compile final generative metrics from request statistics. 
+ + :param completed: Successfully completed request statistics + :param errored: Failed request statistics + :param incomplete: Incomplete/cancelled request statistics + :return: Compiled generative metrics with full distributions + """ requests = completed + errored + incomplete request_types = cast( "list[Literal['successful', 'error', 'incomplete']]", @@ -1139,19 +1434,30 @@ def compile( class SchedulerDict(StandardBaseDict): """Scheduler configuration and execution state dictionary.""" - strategy: SchedulingStrategy - constraints: dict[str, dict[str, Any]] - state: SchedulerState + strategy: SchedulingStrategy = Field( + description="Scheduling strategy used for request distribution" + ) + constraints: dict[str, dict[str, Any]] = Field( + description="Execution constraints applied during benchmarking" + ) + state: SchedulerState = Field( + description="Final state of the scheduler after execution" + ) class BenchmarkerDict(StandardBaseDict): """Benchmarker configuration and component settings dictionary.""" - args: BenchmarkArgs - profile: Profile - requests: dict[str, Any] - backend: dict[str, Any] - environment: dict[str, Any] + profile: Profile = Field(description="Benchmark profile configuration") + requests: dict[str, Any] = Field( + description="Request configuration and dataset information" + ) + backend: dict[str, Any] = Field( + description="Backend configuration and connection details" + ) + environment: dict[str, Any] = Field( + description="Execution environment configuration" + ) class GenerativeBenchmark(Benchmark, StandardBaseDict): @@ -1241,13 +1547,26 @@ def duration(self) -> float: @classmethod def update_estimate( cls, - args: BenchmarkArgs, + args: BenchmarkerArgs, state: EstimatedBenchmarkState, response: GenerationResponse | None, request: GenerationRequest, request_info: RequestInfo, scheduler_state: SchedulerState, ): + """ + Update generative benchmark estimates with new request data. + + Handles warmup/cooldown filtering, request sampling via reservoir sampling, + and delegates metric updates to child metric classes. + + :param args: Benchmark configuration arguments + :param state: Current estimated benchmark state to update + :param response: Response received from the backend + :param request: Original request sent to the backend + :param request_info: Metadata about the request execution + :param scheduler_state: Current state of the scheduler + """ if ( request_info.status == "cancelled" and request_info.timings.resolve_start is None @@ -1344,7 +1663,7 @@ def update_estimate( @classmethod def compile( cls, - args: BenchmarkArgs, + args: BenchmarkerArgs, estimated_state: EstimatedBenchmarkState, scheduler_state: SchedulerState, profile: Profile, @@ -1354,6 +1673,20 @@ def compile( strategy: SchedulingStrategy, constraints: dict[str, dict[str, Any]], ) -> GenerativeBenchmark: + """ + Compile final generative benchmark from accumulated state. 
+ + :param args: Benchmark configuration arguments + :param estimated_state: Accumulated benchmark state from execution + :param scheduler_state: Final state of the scheduler + :param profile: Benchmark profile configuration + :param requests: Collection of requests executed + :param backend: Backend interface used for execution + :param environment: Execution environment configuration + :param strategy: Scheduling strategy used + :param constraints: Execution constraints applied + :return: Compiled generative benchmark instance + """ return GenerativeBenchmark( run_id=args.run_id, run_index=args.run_index, @@ -1366,7 +1699,6 @@ def compile( state=scheduler_state, ), benchmarker=BenchmarkerDict( - args=args, profile=profile, requests=InfoMixin.extract_from_obj(requests), backend=backend.info, @@ -1404,6 +1736,263 @@ def compile( ) +class BenchmarkGenerativeTextArgs(StandardBaseModel): + """ + Configuration arguments for generative text benchmark execution. + + Defines all parameters for benchmark setup including target endpoint, data + sources, backend configuration, processing pipeline, output formatting, and + execution constraints. Supports loading from scenario files and merging with + runtime overrides. + """ + + @classmethod + def create( + cls, scenario: Path | str | None, **kwargs: dict[str, Any] + ) -> BenchmarkGenerativeTextArgs: + """ + Create benchmark args from scenario file and/or keyword arguments. + + :param scenario: Path to scenario file or name of built-in scenario + :param kwargs: Additional keyword arguments to override scenario values + :return: Configured benchmark args instance + :raises ValueError: If scenario is not found or file format is unsupported + """ + constructor_kwargs = {} + + if scenario is not None: + if isinstance(scenario, str) and scenario in ( + builtin_scenarios := get_builtin_scenarios() + ): + scenario_path = builtin_scenarios[scenario] + elif Path(scenario).exists() and Path(scenario).is_dir(): + scenario_path = Path(scenario) + else: + raise ValueError(f"Scenario '{scenario}' not found.") + + with scenario_path.open() as file: + if scenario_path.suffix == ".json": + scenario_data = json.load(file) + elif scenario_path.suffix in {".yaml", ".yml"}: + scenario_data = yaml.safe_load(file) + else: + raise ValueError( + f"Unsupported scenario file format: {scenario_path.suffix}" + ) + if "args" in scenario_data: + # loading from a report file + scenario_data = scenario_data["args"] + constructor_kwargs.update(scenario_data) + + for key, value in kwargs.items(): + if value != cls.get_default(key): + constructor_kwargs[key] = value + + return cls.model_validate(constructor_kwargs) + + @classmethod + def get_default(cls: BenchmarkGenerativeTextArgs, field: str) -> Any: + """ + Get default value for a model field. 
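A short sketch of the intended precedence, assuming the bundled chat scenario: values loaded from the scenario seed the args, and only keyword arguments that differ from the field defaults override them.

args = BenchmarkGenerativeTextArgs.create(
    scenario="chat",                     # resolved via get_builtin_scenarios()
    target="http://localhost:8000",      # CLI-style override applied on top
)
assert args.profile == "sweep"           # carried over from chat.json
BenchmarkGenerativeTextArgs.get_default("profile")  # -> "sweep"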
+ + :param field: Name of the field to retrieve default for + :return: Default value for the specified field + :raises ValueError: If field is not found in model + """ + if field not in BenchmarkGenerativeTextArgs.model_fields: + raise ValueError( + f"Field '{field}' not found in BenchmarkGenerativeTextArgs" + ) + + field_info = BenchmarkGenerativeTextArgs.model_fields[field] + if field_info.default_factory is not None: + return field_info.default_factory() + + return field_info.default + + model_config = ConfigDict( + extra="ignore", + use_enum_values=True, + from_attributes=True, + arbitrary_types_allowed=True, + ) + + # Required + target: str = Field(description="Target endpoint URL for benchmark execution") + data: list[Any] = Field(description="List of dataset sources or data files") + # Benchmark configuration + profile: StrategyType | ProfileType | Profile = Field( + default="sweep", description="Benchmark profile or scheduling strategy type" + ) + rate: float | list[float] | None = Field( + default=None, description="Request rate(s) for rate-based scheduling" + ) + # Backend configuration + backend: BackendType | Backend = Field( + default="openai_http", description="Backend type or instance for execution" + ) + backend_kwargs: dict[str, Any] | None = Field( + default=None, description="Additional backend configuration arguments" + ) + model: str | None = Field(default=None, description="Model identifier for backend") + # Data configuration + processor: str | Path | PreTrainedTokenizerBase | None = Field( + default=None, description="Tokenizer path, name, or instance for processing" + ) + processor_args: dict[str, Any] | None = Field( + default=None, description="Additional tokenizer configuration arguments" + ) + data_args: list[dict[str, Any]] | None = Field( + default_factory=list, description="Per-dataset configuration arguments" + ) + data_samples: int = Field( + default=-1, description="Number of samples to use from datasets (-1 for all)" + ) + data_column_mapper: ( + DatasetPreprocessor | dict[str, str] | Literal["generative_column_mapper"] + ) = Field( + default="generative_column_mapper", + description="Column mapping preprocessor for dataset fields", + ) + data_request_formatter: DatasetPreprocessor | dict[str, str] | str = Field( + default="chat_completions", + description="Request formatting preprocessor or template name", + ) + data_collator: Callable | Literal["generative"] | None = Field( + default="generative", description="Data collator for batch processing" + ) + data_sampler: Sampler[int] | Literal["shuffle"] | None = Field( + default=None, description="Data sampler for request ordering" + ) + data_num_workers: int | None = Field( + default=None, description="Number of workers for data loading" + ) + dataloader_kwargs: dict[str, Any] | None = Field( + default=None, description="Additional dataloader configuration arguments" + ) + random_seed: int = Field(default=42, description="Random seed for reproducibility") + # Output configuration + output_path: str | Path | None = Field( + default_factory=Path.cwd, description="Directory path for output files" + ) + output_formats: list[str] | dict[str, str | dict[str, Any]] | None = Field( + default_factory=lambda: ["console", "json"], + description="Output format names or configuration mappings", + ) + # Benchmarker configuration + benchmark_cls: type[GenerativeBenchmark] = Field( + default=GenerativeBenchmark, + description="Benchmark class to use for result compilation", + ) + sample_requests: int | None = Field( + 
default=10, + description="Number of requests to sample for detailed metrics (None for all)", + ) + warmup: float | None = Field( + default=None, + description="Warmup period in seconds, requests, or fraction (0-1)", + ) + cooldown: float | None = Field( + default=None, + description="Cooldown period in seconds, requests, or fraction (0-1)", + ) + prefer_response_metrics: bool = Field( + default=True, + description="Whether to prefer backend response metrics over request metrics", + ) + # Constraints configuration + max_seconds: int | float | None = Field( + default=None, description="Maximum benchmark execution time in seconds" + ) + max_requests: int | None = Field( + default=None, description="Maximum number of requests to execute" + ) + max_errors: int | None = Field( + default=None, description="Maximum number of errors before stopping" + ) + max_error_rate: float | None = Field( + default=None, description="Maximum error rate (0-1) before stopping" + ) + max_global_error_rate: float | None = Field( + default=None, description="Maximum global error rate (0-1) before stopping" + ) + + @model_serializer + def serialize_model(self): + """ + Custom serialization logic for benchmark args. + + Converts complex types to serializable formats including Profile to type + string, Backend to type string, and Path objects to strings. + + :return: Dictionary representation suitable for JSON/YAML serialization + """ + return { + # target - serialize as is + "target": self.target, + "data": [ + item if isinstance(item, str | type(None)) else str(item) + for item in self.data + ], # data - for each item in the list, if not a str or None, save str(item) + "profile": ( + self.profile.type_ + if isinstance(self.profile, Profile) + else self.profile + ), # profile - if instance of Profile, then save as profile.type_ + "rate": self.rate, + "backend": ( + self.backend.type_ + if isinstance(self.backend, Backend) + else self.backend + ), # backend - if instance of Backend, then save as backend.type_ + "backend_kwargs": self.backend_kwargs, + "model": self.model, + "processor": ( + self.processor + if isinstance(self.processor, str) + else str(self.processor) + if self.processor is not None + else None + ), # processor - if not str, then save as str(processor) + "processor_args": self.processor_args, + "data_args": self.data_args, + "data_samples": self.data_samples, + "data_column_mapper": ( + self.data_column_mapper + if isinstance(self.data_column_mapper, dict | str) + else {} + ), # data_column_mapper - if not dict or str, then save as an empty dict + "data_request_formatter": ( + self.data_request_formatter + if isinstance(self.data_request_formatter, dict | str) + else {} + ), # data_request_formatter - if not dict or str, then save as empty dict + "data_collator": ( + self.data_collator if isinstance(self.data_collator, str) else None + ), # data_collator - if not str, then save as None + "data_sampler": ( + self.data_sampler if isinstance(self.data_sampler, str) else None + ), # data_sampler - if not str, then save as None + "data_num_workers": self.data_num_workers, + "dataloader_kwargs": self.dataloader_kwargs, + "random_seed": self.random_seed, + "output_path": ( + str(self.output_path) if self.output_path is not None else None + ), # output_path - if not None, then ensure it's a str + "output_formats": self.output_formats, + # benchmark_cls - don't save at all (excluded) + "sample_requests": self.sample_requests, + "warmup": self.warmup, + "cooldown": self.cooldown, + "prefer_response_metrics": 
self.prefer_response_metrics, + "max_seconds": self.max_seconds, + "max_requests": self.max_requests, + "max_errors": self.max_errors, + "max_error_rate": self.max_error_rate, + "max_global_error_rate": self.max_global_error_rate, + } + + class GenerativeBenchmarksReport(StandardBaseModel): """Container for multiple benchmark results with load/save functionality.""" @@ -1439,6 +2028,9 @@ def load_file( return GenerativeBenchmarksReport.model_validate(model_dict) + args: BenchmarkGenerativeTextArgs = Field( + description="The benchmark arguments used for all benchmarks in the report." + ) benchmarks: list[GenerativeBenchmark] = Field( description="The list of completed benchmarks contained within the report.", default_factory=list, diff --git a/src/guidellm/benchmark/types.py b/src/guidellm/benchmark/types.py deleted file mode 100644 index 983e3189..00000000 --- a/src/guidellm/benchmark/types.py +++ /dev/null @@ -1,22 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Any - -from transformers import PreTrainedTokenizerBase # type: ignore[import] -from typing_extensions import TypeAliasType - -from guidellm.benchmark.output import GenerativeBenchmarkerOutput - -__all__ = ["OutputFormatT", "ProcessorInputT"] - - -OutputFormatT = TypeAliasType( - "OutputFormatT", - tuple[str, ...] - | list[str] - | dict[str, str | dict[str, Any] | GenerativeBenchmarkerOutput] - | None, -) - -ProcessorInputT = TypeAliasType("ProcessorInputT", str | Path | PreTrainedTokenizerBase) diff --git a/src/guidellm/data/deserializers/deserializer.py b/src/guidellm/data/deserializers/deserializer.py index d50e4a9c..7f0dae39 100644 --- a/src/guidellm/data/deserializers/deserializer.py +++ b/src/guidellm/data/deserializers/deserializer.py @@ -50,7 +50,11 @@ def deserialize( dataset = None if type_ is None: - for deserializer in cls.registered_objects(): + for name, deserializer in cls.registry.items(): + if name == "huggingface": + # Save Hugging Face til the end since it is a catch-all. 
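+ # It only runs as the fallback below, after every other registered
+ # deserializer has been tried and returned no dataset.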
+ continue + deserializer_fn: DatasetDeserializer = ( deserializer() if isinstance(deserializer, type) else deserializer ) @@ -62,6 +66,15 @@ def deserialize( random_seed=random_seed, **data_kwargs, ) + + if dataset is None: + deserializer_fn = cls.get_registered_object("huggingface")() + dataset = deserializer_fn( + data=data, + processor_factory=processor_factory, + random_seed=random_seed, + **data_kwargs, + ) elif deserializer := cls.get_registered_object(type_) is not None: deserializer_fn: DatasetDeserializer = ( deserializer() if isinstance(deserializer, type) else deserializer diff --git a/src/guidellm/presentation/data_models.py b/src/guidellm/presentation/data_models.py index ff2863b4..62bf97d8 100644 --- a/src/guidellm/presentation/data_models.py +++ b/src/guidellm/presentation/data_models.py @@ -72,7 +72,7 @@ def from_benchmarks(cls, benchmarks: list["GenerativeBenchmark"]): bm.run_stats.start_time for bm in benchmarks if bm.start_time is not None ) return cls( - model=Model(name=model, size=0), + model=Model(name=model or "", size=0), task="N/A", timestamp=timestamp, dataset=Dataset(name="N/A"), @@ -117,11 +117,15 @@ def from_benchmarks(cls, benchmarks: list["GenerativeBenchmark"]): range(len(successful_requests)), min(5, len(successful_requests)) ) sample_prompts = [ - successful_requests[i].prompt.replace("\n", " ").replace('"', "'") + successful_requests[i].request_args.replace("\n", " ").replace('"', "'") + if successful_requests[i].request_args is not None + else "" for i in sample_indices ] sample_outputs = [ successful_requests[i].output.replace("\n", " ").replace('"', "'") + if successful_requests[i].output is not None + else "" for i in sample_indices ] @@ -155,10 +159,10 @@ def from_benchmarks(cls, benchmarks: list["GenerativeBenchmark"]): min_start_time = benchmarks[0].start_time all_req_times = [ - req.scheduler_info.started_at - min_start_time + req.info.timings.request_start - min_start_time for bm in benchmarks for req in bm.requests.successful - if req.scheduler_info.started_at is not None + if req.info.timings.request_start is not None ] number_of_buckets = len(benchmarks) request_over_time_buckets, bucket_width = Bucket.from_data( diff --git a/src/guidellm/utils/cli.py b/src/guidellm/utils/cli.py index f049e94e..c1e594a9 100644 --- a/src/guidellm/utils/cli.py +++ b/src/guidellm/utils/cli.py @@ -7,8 +7,25 @@ def parse_json(ctx, param, value): # noqa: ARG001 if value is None or value == [None]: return None - if isinstance(value, (list, tuple)): + if isinstance(value, list | tuple): return [parse_json(ctx, param, val) for val in value] + + if "{" not in value and "}" not in value and "=" in value: + # Treat it as a key=value pair if it doesn't look like JSON. + result = {} + for pair in value.split(","): + if "=" not in pair: + raise click.BadParameter( + f"{param.name} must be a valid JSON string or key=value pairs." + ) + key, val = pair.split("=", 1) + result[key.strip()] = val.strip() + return result + + if "{" not in value and "}" not in value: + # Treat it as a plain string if it doesn't look like JSON. 
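+ # e.g. "chat_completions" is returned unchanged here, while the key=value
+ # branch above turns "prompt_tokens=512,output_tokens=256" into
+ # {"prompt_tokens": "512", "output_tokens": "256"} (values remain strings).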
+ return value + try: return json.loads(value) except json.JSONDecodeError as err: From cc6920365e1d760707d30b331b6eec5e244f30ef Mon Sep 17 00:00:00 2001 From: Mark Kurtz Date: Thu, 16 Oct 2025 12:52:21 -0400 Subject: [PATCH 2/4] fixes from review --- pyproject.toml | 1 + src/guidellm/benchmark/schemas.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8fe6d950..5135edad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ include = ["*"] [tool.setuptools.package-data] "guidellm.data" = ["*.gz"] +"guidellm.benchmark.scenarios" = ["*.json", "**/*.json"] [tool.pdm] distribution = true diff --git a/src/guidellm/benchmark/schemas.py b/src/guidellm/benchmark/schemas.py index 8329d2bc..6fa84641 100644 --- a/src/guidellm/benchmark/schemas.py +++ b/src/guidellm/benchmark/schemas.py @@ -1765,7 +1765,7 @@ def create( builtin_scenarios := get_builtin_scenarios() ): scenario_path = builtin_scenarios[scenario] - elif Path(scenario).exists() and Path(scenario).is_dir(): + elif Path(scenario).exists() and Path(scenario).is_file(): scenario_path = Path(scenario) else: raise ValueError(f"Scenario '{scenario}' not found.") From c4917bd2b71c2c4489781c0e38849f42693d1dbd Mon Sep 17 00:00:00 2001 From: Mark Kurtz Date: Thu, 16 Oct 2025 13:22:03 -0400 Subject: [PATCH 3/4] fix for scenarios not passing data correctly --- src/guidellm/__main__.py | 22 +++++++++++++++++----- src/guidellm/benchmark/schemas.py | 4 +++- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 2df537ee..849c9321 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -376,11 +376,21 @@ def run(**kwargs): else {"request_type": request_type, **request_formatter_kwargs} ) - if not isinstance((data := kwargs.get("data")), list | tuple): - kwargs["data"] = [data] if data else [] - - if not isinstance((data_args := kwargs.get("data_args")), list | tuple): - kwargs["data_args"] = [data_args] if data_args else None + data = kwargs.get("data") + if not (data := kwargs.get("data")): + kwargs["data"] = [] + elif isinstance(data, tuple): + kwargs["data"] = list(data) + elif not isinstance(data, list): + kwargs["data"] = [data] + + data_args = kwargs.get("data_args") + if not (data_args := kwargs.get("data_args")): + kwargs["data_args"] = [] + elif isinstance(data_args, tuple): + kwargs["data_args"] = list(data_args) + elif not isinstance(data_args, list): + kwargs["data_args"] = [data_args] if ( not (rate := kwargs.get("rate")) @@ -390,6 +400,8 @@ def run(**kwargs): kwargs["rate"] = None elif len(rate) == 1: kwargs["rate"] = rate[0] + elif not isinstance(rate, list): + kwargs["rate"] = [rate] disable_console_outputs = kwargs.pop("disable_console_outputs", False) display_scheduler_stats = kwargs.pop("display_scheduler_stats", False) diff --git a/src/guidellm/benchmark/schemas.py b/src/guidellm/benchmark/schemas.py index 6fa84641..c3af1d55 100644 --- a/src/guidellm/benchmark/schemas.py +++ b/src/guidellm/benchmark/schemas.py @@ -1819,7 +1819,9 @@ def get_default(cls: BenchmarkGenerativeTextArgs, field: str) -> Any: # Required target: str = Field(description="Target endpoint URL for benchmark execution") - data: list[Any] = Field(description="List of dataset sources or data files") + data: list[Any] = Field( + description="List of dataset sources or data files", default_factory=list + ) # Benchmark configuration profile: StrategyType | ProfileType | Profile = Field( default="sweep", 
description="Benchmark profile or scheduling strategy type" From a375f6c23e55e5b69e2783b076d196512eaba4ea Mon Sep 17 00:00:00 2001 From: Mark Kurtz Date: Thu, 16 Oct 2025 15:41:33 -0400 Subject: [PATCH 4/4] Fixes from review --- src/guidellm/__main__.py | 53 ++++++++++------------ src/guidellm/benchmark/scenarios/chat.json | 6 ++- src/guidellm/benchmark/scenarios/rag.json | 6 ++- src/guidellm/benchmark/schemas.py | 4 +- src/guidellm/utils/cli.py | 25 ++++++++++ 5 files changed, 59 insertions(+), 35 deletions(-) diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 849c9321..1e9ba96f 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -28,6 +28,7 @@ from pathlib import Path import click +from pydantic import ValidationError try: import uvloop @@ -375,45 +376,37 @@ def run(**kwargs): if not request_formatter_kwargs else {"request_type": request_type, **request_formatter_kwargs} ) - - data = kwargs.get("data") - if not (data := kwargs.get("data")): - kwargs["data"] = [] - elif isinstance(data, tuple): - kwargs["data"] = list(data) - elif not isinstance(data, list): - kwargs["data"] = [data] - - data_args = kwargs.get("data_args") - if not (data_args := kwargs.get("data_args")): - kwargs["data_args"] = [] - elif isinstance(data_args, tuple): - kwargs["data_args"] = list(data_args) - elif not isinstance(data_args, list): - kwargs["data_args"] = [data_args] - - if ( - not (rate := kwargs.get("rate")) - or len(rate) == 0 - or (len(rate) == 1 and not rate[0]) - ): - kwargs["rate"] = None - elif len(rate) == 1: - kwargs["rate"] = rate[0] - elif not isinstance(rate, list): - kwargs["rate"] = [rate] + kwargs["data"] = cli_tools.format_list_arg( + kwargs.get("data"), default=[], simplify_single=False + ) + kwargs["data_args"] = cli_tools.format_list_arg( + kwargs.get("data_args"), default=[], simplify_single=False + ) + kwargs["rate"] = cli_tools.format_list_arg( + kwargs.get("rate"), default=None, simplify_single=True + ) disable_console_outputs = kwargs.pop("disable_console_outputs", False) display_scheduler_stats = kwargs.pop("display_scheduler_stats", False) disable_progress = kwargs.pop("disable_progress", False) + try: + args = BenchmarkGenerativeTextArgs.create( + scenario=kwargs.pop("scenario", None), **kwargs + ) + except ValidationError as err: + # Translate pydantic valdation error to click argument error + errs = err.errors(include_url=False, include_context=True, include_input=True) + param_name = "--" + str(errs[0]["loc"][0]).replace("_", "-") + raise click.BadParameter( + errs[0]["msg"], ctx=click.get_current_context(), param_hint=param_name + ) from err + if uvloop is not None: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.run( benchmark_generative_text( - args=BenchmarkGenerativeTextArgs.create( - scenario=kwargs.pop("scenario", None), **kwargs - ), + args=args, progress=( GenerativeConsoleBenchmarkerProgress( display_scheduler_stats=display_scheduler_stats diff --git a/src/guidellm/benchmark/scenarios/chat.json b/src/guidellm/benchmark/scenarios/chat.json index 69602af5..58fd18e2 100644 --- a/src/guidellm/benchmark/scenarios/chat.json +++ b/src/guidellm/benchmark/scenarios/chat.json @@ -1,4 +1,6 @@ { "profile": "sweep", - "data": ["prompt_tokens=512,prompt_tokens_stdev=128,prompt_tokens_min=1,prompt_tokens_max=1024,output_tokens=256,output_tokens_stdev=64,output_tokens_min=1,output_tokens_max=1024"] -} + "data": [ + 
"prompt_tokens=512,prompt_tokens_stdev=128,prompt_tokens_min=1,prompt_tokens_max=1024,output_tokens=256,output_tokens_stdev=64,output_tokens_min=1,output_tokens_max=1024" + ] +} \ No newline at end of file diff --git a/src/guidellm/benchmark/scenarios/rag.json b/src/guidellm/benchmark/scenarios/rag.json index 4e6b94a5..ea38d76e 100644 --- a/src/guidellm/benchmark/scenarios/rag.json +++ b/src/guidellm/benchmark/scenarios/rag.json @@ -1,4 +1,6 @@ { "profile": "sweep", - "data": ["prompt_tokens=4096,prompt_tokens_stdev=512,prompt_tokens_min=2048,prompt_tokens_max=6144,output_tokens=512,output_tokens_stdev=128,output_tokens_min=1,output_tokens_max=1024"] -} + "data": [ + "prompt_tokens=4096,prompt_tokens_stdev=512,prompt_tokens_min=2048,prompt_tokens_max=6144,output_tokens=512,output_tokens_stdev=128,output_tokens_min=1,output_tokens_max=1024" + ] +} \ No newline at end of file diff --git a/src/guidellm/benchmark/schemas.py b/src/guidellm/benchmark/schemas.py index c3af1d55..9fd09461 100644 --- a/src/guidellm/benchmark/schemas.py +++ b/src/guidellm/benchmark/schemas.py @@ -1820,7 +1820,9 @@ def get_default(cls: BenchmarkGenerativeTextArgs, field: str) -> Any: # Required target: str = Field(description="Target endpoint URL for benchmark execution") data: list[Any] = Field( - description="List of dataset sources or data files", default_factory=list + description="List of dataset sources or data files", + default_factory=list, + min_length=1, ) # Benchmark configuration profile: StrategyType | ProfileType | Profile = Field( diff --git a/src/guidellm/utils/cli.py b/src/guidellm/utils/cli.py index c1e594a9..a75c37a8 100644 --- a/src/guidellm/utils/cli.py +++ b/src/guidellm/utils/cli.py @@ -3,6 +3,8 @@ import click +__all__ = ["Union", "format_list_arg", "parse_json", "set_if_not_default"] + def parse_json(ctx, param, value): # noqa: ARG001 if value is None or value == [None]: @@ -45,6 +47,29 @@ def set_if_not_default(ctx: click.Context, **kwargs) -> dict[str, Any]: return values +def format_list_arg( + value: Any, default: Any = None, simplify_single: bool = False +) -> list[Any] | Any: + """ + Format a multi-argument value for display. + + :param value: The value to format, which can be a single value or a list/tuple. + :param default: The default value to set if the value is non truthy. + :param simplify_single: If True and the value is a single-item list/tuple, + return the single item instead of a list. + :return: Formatted list of values, or single value if simplify_single and applicable + """ + if not value: + return default + + if isinstance(value, tuple): + value = list(value) + elif not isinstance(value, list): + value = [value] + + return value if not simplify_single or len(value) != 1 else value[0] + + class Union(click.ParamType): """ A custom click parameter type that allows for multiple types to be accepted.