From 34fbac659b22222d71ccbf61323076bd5eeb2adb Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 17:21:02 +0100 Subject: [PATCH 1/8] feat: Introduce ResourceProvider abstraction and a ResourceManager This will eventually be used by the Scheduler to manage more in-depth parallelism, where jobs will define resources and the scheduler will have to respect parallelism limits on those resources. The abstract `ResourceProvider` is designed in such a way that more complicated resource provider implementations could be added in the future (when compared to the StaticResourceProviders), with the ability to eventually support dynamic resource allocation. Signed-off-by: Alex Jones --- src/dvsim/job/data.py | 11 ++ src/dvsim/job/deploy.py | 7 ++ src/dvsim/scheduler/resources.py | 203 +++++++++++++++++++++++++++++++ tests/test_scheduler.py | 1 + 4 files changed, 222 insertions(+) create mode 100644 src/dvsim/scheduler/resources.py diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py index 4dcffb75..bb269bd3 100644 --- a/src/dvsim/job/data.py +++ b/src/dvsim/job/data.py @@ -11,6 +11,7 @@ from collections.abc import Callable, Mapping, Sequence from pathlib import Path +from typing import TypeAlias from pydantic import BaseModel, ConfigDict @@ -41,6 +42,10 @@ class WorkspaceConfig(BaseModel): """Path within the scratch directory to use for this run.""" +# A mapping of resource names to the max number of that resource available (or None if unbounded). +ResourceMapping: TypeAlias = dict[str, int | None] + + class JobSpec(BaseModel): """Job specification.""" @@ -59,6 +64,12 @@ class JobSpec(BaseModel): indicates that whatever is configured as the 'default' backend should be used. """ + resources: ResourceMapping | None + """Resource requirements of the job. Maps the name of the resource to the amount + of that resource that is required to run the job. If the scheduler is instructed + to run with enforced resource limits, this limits per-resource parallelism. 
+ """ + seed: int | None """Seed if there is one.""" diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 42aeb55a..9dfb149b 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -149,6 +149,7 @@ def get_job_spec(self) -> "JobSpec": # TODO: for now we always use the default configured backend, but it might be good # to allow different jobs to run on different backends in the future? backend=None, + resources=self.resources, seed=getattr(self, "seed", None), full_name=self.full_name, qual_name=self.qual_name, @@ -236,6 +237,12 @@ def _set_attrs(self) -> None: """ self._extract_attrs(self.sim_cfg.__dict__) + # Use the configured tool to determine the resources (licenses) that are required. + # For now, we just assume that the tool itself is the only resource needed. + self.resources = None + if hasattr(self.sim_cfg, "tool") and self.sim_cfg.tool: + self.resources = {self.sim_cfg.tool.upper(): 1} + # Enable GUI mode, also when GUI debug mode has been invoked. self.gui = self.sim_cfg.gui diff --git a/src/dvsim/scheduler/resources.py b/src/dvsim/scheduler/resources.py new file mode 100644 index 00000000..8914c204 --- /dev/null +++ b/src/dvsim/scheduler/resources.py @@ -0,0 +1,203 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler resource (parallelism limitation) management.""" + +from collections import defaultdict +from collections.abc import Iterable +from enum import Enum +from typing import Protocol + +from dvsim.job.data import JobSpec, ResourceMapping +from dvsim.logging import log + +__all__ = ( + "ResourceManager", + "ResourceProvider", + "StaticResourceProvider", +) + + +class ResourceProvider(Protocol): + """An abstraction of something that provides resource availability info to the scheduler.""" + + is_dynamic: bool + """Whether the resources returned by the provider can change over time.""" + + async def get_capacity(self) -> ResourceMapping: + """Get the capacity of resources available from this provider. + + An integer capacity defines a strict limit for parallelism of that resource, whereas `None` + indicates no upper bound on parallelism. + """ + ... + + +class StaticResourceProvider: + """Provides information about static (unchanging) resource availability.""" + + is_dynamic = False + + def __init__(self, limits: ResourceMapping) -> None: + """Construct a ResourceProvider with defined static limits.""" + self._limits = limits + + async def get_capacity(self) -> ResourceMapping: + """Get the static capacity of the resources available from this provider. + + An integer capacity defines a strict limit for parallelism of that resource, whereas `None` + indicates no upper bound on parallelism. 
+ """ + return self._limits + + +class UnknownResourcePolicy(str, Enum): + """Behaviour upon a job requesting a resource without any defined limit.""" + + IGNORE = "ignore" + WARN = "warn" + ERROR = "error" + FATAL = "fatal" + + +class ResourceManager: + """Manages scheduler resources, limiting parallelism on a per-resource basis.""" + + def __init__( + self, + provider: ResourceProvider, + missing_policy: UnknownResourcePolicy = UnknownResourcePolicy.IGNORE, + ) -> None: + """Construct a ResourceManager instance.""" + self._provider = provider + self._missing_policy = missing_policy + self._usage = defaultdict(int) + + async def can_allocate(self, request: ResourceMapping) -> bool: + """Check if a given resource request can be allocated, given current usage and limits.""" + capacity = await self._provider.get_capacity() + for resource, needed in request.items(): + limit = capacity.get(resource) + if limit is None: + continue + if needed is None or self._usage[resource] + needed > limit: + return False + return True + + async def try_allocate(self, request: ResourceMapping) -> bool: + """Attempt to allocate the requested resources, recording their usage. + + Returns: + True if successfully allocated, false otherwise. + + """ + # Note: we use no lock here on in `ResourceManager.release()` because we assume the + # invariant that there is never an await between the check and the mutation. If the + # code is changed in this manner, then both of these methods should acquire an + # asyncio.Lock() first around their operation. 
+ if not await self.can_allocate(request): + return False + for resource, amount in request.items(): + if amount is not None: + self._usage[resource] += amount + return True + + def release(self, request: ResourceMapping) -> None: + """Release a set of allocated resources.""" + for resource, amount in request.items(): + if amount is not None: + self._usage[resource] -= amount + + def _log_usage(self, capacity: ResourceMapping, used: ResourceMapping) -> None: + """Debug log individual job resource usage aggregates.""" + if log.isEnabledFor(log.DEBUG): + for resource, usage in used.items(): + limit = capacity.get(resource) + usage_str = str(usage) if usage is not None else "unlimited" + limit_str = str(limit) if limit is not None else "unlimited" + log.debug( + "Total '%s' resources used: %s, limit: %s", resource, usage_str, limit_str + ) + + def _handle_missing_resource(self, job: JobSpec, resource: str, errors: list[str]) -> None: + """Handle a job using an undefined resource, according to the configured policy. + + Args: + job: the job with a missing resource + resource: the name of the missing resource + errors: the list of errors to append an error to if needed. + + """ + if not job.resources: + return + + needed = job.resources.get(resource) + message = f"Job '{job.full_name}' uses unknown resource '{resource}' ({needed} requested)" + match self._missing_policy: + case UnknownResourcePolicy.WARN: + log.warning(message) + case UnknownResourcePolicy.ERROR: + log.error(message) + case UnknownResourcePolicy.FATAL: + errors.append(message) + + def _emit_validation_errors( + self, missing_resources: list[str], limit_exceeded: list[str] + ) -> None: + """Emit aggregated job validation error messages according to the manager's configuration. + + Args: + missing_resources: error messages for any resources used that were not defined. + limit_exceeded: error messages for any job whose resources exceeded the defined limits. 
+ + """ + if missing_resources: + msg = "Job resources had errors:\n" + "\n".join(missing_resources + limit_exceeded) + raise ValueError(msg) + if limit_exceeded: + msg = "Invalid job resource requirements:\n" + "\n".join(limit_exceeded) + # If we know the available resources are static, this should be a fatal error. + if not self._provider.is_dynamic: + raise ValueError(msg) + log.warning("%s", msg) + + async def validate_jobs(self, jobs: Iterable[JobSpec]) -> None: + """Validate given jobs against known (initial) resource limits. + + Validate that the resources required by a list of jobs are less than those that are + initially available from some resource providers. Note that if resources are not static, + this is not a guarantee that resources will remain available. + """ + capacity = await self._provider.get_capacity() + aggregate: ResourceMapping = defaultdict(int) + # Collect all errors before reporting to give more detailed info + missing_resource_errors: list[str] = [] + limit_exceeded_errors: list[str] = [] + + for job in jobs: + if not job.resources: + continue + + for resource, needed in job.resources.items(): + used = aggregate[resource] + if needed is None: + aggregate[resource] = None + elif used is not None: + aggregate[resource] = used + needed + + if resource not in capacity: + self._handle_missing_resource(job, resource, missing_resource_errors) + continue + + limit = capacity[resource] + if limit is not None and (needed is None or needed > limit): + amount = "unlimited" if needed is None else f"{needed} of" + msg = ( + f"Job '{job.full_name}' requires {amount} '{resource}' " + f"but the max available is {limit}" + ) + limit_exceeded_errors.append(msg) + + self._log_usage(capacity, aggregate) + self._emit_validation_errors(missing_resource_errors, limit_exceeded_errors) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 1497161b..3ee0be70 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -298,6 +298,7 @@ def 
job_spec_factory( "job_type": "mock_type", "target": "mock_target", "backend": None, + "resources": None, "seed": None, "dependencies": [], "needs_all_dependencies_passing": True, From 0e78d65dca881e49bff6503db05dec7659d8310d Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 17:45:54 +0100 Subject: [PATCH 2/8] fix: remove superfluous `Path` construction in xcelium sim tool This is just an idempotent constructor; it is confirmed via the arg type (and by manually inspecting possible call sites) that this will always be a `Path` already. Signed-off-by: Alex Jones --- src/dvsim/sim/tool/xcelium.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dvsim/sim/tool/xcelium.py b/src/dvsim/sim/tool/xcelium.py index 9dd1bd4a..3c403a46 100644 --- a/src/dvsim/sim/tool/xcelium.py +++ b/src/dvsim/sim/tool/xcelium.py @@ -32,7 +32,7 @@ def get_cov_summary_table(cov_report_path: Path) -> tuple[Sequence[Sequence[str] tuple of, List of metrics and values, and final coverage total """ - with Path(cov_report_path).open() as buf: + with cov_report_path.open() as buf: for line in buf: if "name" in line: # Strip the line and remove the unwanted "* Covered" string. From 0626f0e0cea0a45c8a7de7199d96a7442e876c04 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 17:47:01 +0100 Subject: [PATCH 3/8] fix: Xcelium sim tool pyright/ruff errors `OrderedDict` is redundant in modern Python, so let's type it properly and use modern conveniences. Likewise, we shouldn't be returning `dict_keys` if we intend to return a `Sequence` in the output tuples. 
Signed-off-by: Alex Jones --- src/dvsim/sim/tool/xcelium.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/dvsim/sim/tool/xcelium.py b/src/dvsim/sim/tool/xcelium.py index 3c403a46..d6355b50 100644 --- a/src/dvsim/sim/tool/xcelium.py +++ b/src/dvsim/sim/tool/xcelium.py @@ -5,7 +5,7 @@ """EDA tool plugin providing Xcelium support to DVSim.""" import re -from collections import OrderedDict +from collections import defaultdict from collections.abc import Mapping, Sequence from pathlib import Path from typing import TYPE_CHECKING @@ -41,9 +41,8 @@ def get_cov_summary_table(cov_report_path: Path) -> tuple[Sequence[Sequence[str] metrics[0] = "Score" # Gather the list of metrics. - items = OrderedDict() + items: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for metric in metrics: - items[metric] = {} items[metric]["covered"] = 0 items[metric]["total"] = 0 @@ -76,7 +75,7 @@ def get_cov_summary_table(cov_report_path: Path) -> tuple[Sequence[Sequence[str] if metric == "Score": cov_total = value - return [items.keys(), values], cov_total + return [list(items.keys()), values], cov_total # If we reached here, then we were unable to extract the coverage. msg = f"Coverage data not found in {buf.name}!" From 2bf69e1444ac0b31a047cc24b084efe5af964480 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 17:48:23 +0100 Subject: [PATCH 4/8] fix: Handle missing score metric in Xcelium sim tool coverage summary pyright is correct, we should be handling the case where `cov_total is None` which is when the coverage summary doesn't contain the expected "Score" metric. This should result in an appropriate error from parsing. 
Signed-off-by: Alex Jones --- src/dvsim/sim/tool/xcelium.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dvsim/sim/tool/xcelium.py b/src/dvsim/sim/tool/xcelium.py index d6355b50..ef0abd93 100644 --- a/src/dvsim/sim/tool/xcelium.py +++ b/src/dvsim/sim/tool/xcelium.py @@ -74,6 +74,8 @@ def get_cov_summary_table(cov_report_path: Path) -> tuple[Sequence[Sequence[str] values.append(value) if metric == "Score": cov_total = value + if cov_total is None: + break return [list(items.keys()), values], cov_total From a0c4721249390ce67a07e3db4eb15a328ff377ce Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 19:15:15 +0100 Subject: [PATCH 5/8] refactor: Change Scheduler `max_parallelism` type Rather than use an `int` and make `0` an implementation-defined "unbounded", it is nicer to explicitly support `max_parallelism=None` to more clearly refer to this case. Signed-off-by: Alex Jones --- src/dvsim/scheduler/core.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dvsim/scheduler/core.py b/src/dvsim/scheduler/core.py index fef9571a..6db10a6d 100644 --- a/src/dvsim/scheduler/core.py +++ b/src/dvsim/scheduler/core.py @@ -94,7 +94,7 @@ def __init__( # noqa: PLR0913 backends: Mapping[str, RuntimeBackend], default_backend: str, *, - max_parallelism: int = 0, + max_parallelism: int | None = None, priority_fn: JobPriorityFn | None = None, coalesce_window: float | None = 0.001, ) -> None: @@ -106,7 +106,7 @@ def __init__( # noqa: PLR0913 backends: The mapping (name -> backend) of backends available to the scheduler. default_backend: The name of the default backend to use if not specified by a job. max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch - at once, across all backends. The default value of `0` indicates no upper limit. + at once, across all backends. The default value of `None` indicates no upper limit. priority_fn: A function to calculate the priority of a given job. 
If no function is given, this defaults to using the job's weight. coalesce_window: If specified, the time in seconds to wait on receiving a job @@ -115,8 +115,8 @@ def __init__( # noqa: PLR0913 extra cost. Defaults to 1 millisecond, and can be disabled by giving `None`. """ - if max_parallelism < 0: - err = f"max_parallelism must be some non-negative integer, not {max_parallelism}" + if max_parallelism is not None and max_parallelism <= 0: + err = f"max_parallelism must be some positive integer or None, not {max_parallelism}" raise ValueError(err) if default_backend not in backends: err = f"Default backend '{default_backend}' is not in the mapping of given backends" From 17bb9274c72607acf0de7ed1cc5683d53580c62c Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 19:44:29 +0100 Subject: [PATCH 6/8] feat: Integrate resources into the scheduler to limit parallelism Integrate the previously introduced `ResourceManager` into the scheduler. The scheduler now attempts to allocate resources for jobs when it decides to run them, which are then released when the job finishes execution (or fails to launch). All resources go through the manager, which will fail to allocate them if there are not enough resources to provide within the defined limits. If no resource limits are defined, behaviour depends on the `ResourceManager` configuration, but the default is to assume an unbounded limit. At the start of the scheduler run, all jobs are validated against the limits defined in the `ResourceManager`. For example, if static resources are used and there exists some job whose resource requirements cannot be satisfied by the defined limits, then this will be caught in advance of execution and reported early as an error. If the resources are dynamic, then this case only results in a warning. 
Signed-off-by: Alex Jones --- src/dvsim/scheduler/core.py | 65 +++++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/src/dvsim/scheduler/core.py b/src/dvsim/scheduler/core.py index 6db10a6d..026ddd0e 100644 --- a/src/dvsim/scheduler/core.py +++ b/src/dvsim/scheduler/core.py @@ -18,6 +18,7 @@ from dvsim.logging import log from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.data import JobCompletionEvent, JobHandle +from dvsim.scheduler.resources import ResourceManager __all__ = ( "JobPriorityFn", @@ -95,6 +96,7 @@ def __init__( # noqa: PLR0913 default_backend: str, *, max_parallelism: int | None = None, + resource_manager: ResourceManager | None = None, priority_fn: JobPriorityFn | None = None, coalesce_window: float | None = 0.001, ) -> None: @@ -107,6 +109,8 @@ def __init__( # noqa: PLR0913 default_backend: The name of the default backend to use if not specified by a job. max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch at once, across all backends. The default value of `None` indicates no upper limit. + resource_manager: The scheduler's resource manager, through which per-job resources + are allocated to enforce additional limits on scheduler parallelism. priority_fn: A function to calculate the priority of a given job. If no function is given, this defaults to using the job's weight. 
coalesce_window: If specified, the time in seconds to wait on receiving a job @@ -128,6 +132,7 @@ def __init__( # noqa: PLR0913 self._backends = dict(backends) self._default_backend = default_backend self._max_parallelism = max_parallelism + self._resources = resource_manager self._priority_fn = priority_fn or self._default_priority self._coalesce_window = coalesce_window @@ -321,6 +326,8 @@ def _mark_job_completed( if job.spec.id in self._running: self._running.remove(job.spec.id) self._running_per_backend[job.backend_key] -= 1 + if self._resources and job.spec.resources: + self._resources.release(job.spec.resources) # Update dependents (jobs that depend on this job), propagating failures if needed. self._update_completed_job_deps(job) @@ -354,6 +361,13 @@ def _update_completed_job_deps(self, job: JobRecord) -> None: async def run(self) -> list[CompletedJobStatus]: """Run all scheduled jobs to completion (unless terminated) and return the results.""" + # Check if we know about all the resources defined by the given jobs, and whether + # initial resource availability can (independently) satisfy all jobs' needs. + # This is an error if we know we are using static resources, and a warning otherwise. 
+ if self._resources: + specs = [job.spec for job in self._jobs.values()] + await self._resources.validate_jobs(specs) + self._install_signal_handlers() for backend in self._backends.values(): @@ -536,17 +550,14 @@ async def _handle_exit_signal(self) -> None: if kill_tasks: await asyncio.gather(*kill_tasks, return_exceptions=True) - async def _schedule_ready_jobs(self) -> None: - """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" - # Find out how many jobs we can dispatch according to the scheduler's parallelism limit - available_slots = ( - self._max_parallelism - len(self._running) - if self._max_parallelism - else len(self._ready_heap) - ) - if available_slots <= 0: - return + async def _get_jobs_to_launch( + self, available_slots: int + ) -> dict[str, list[tuple[Priority, JobRecord]]]: + """Get the sets of jobs to try and launch at this moment. + Returns a mapping of backend names to the lists of jobs to launch for that backend, + where jobs are defined by their priority value and record. + """ # Collect jobs to launch in a dict, grouped per backend, for batched launching. to_launch: dict[str, list[tuple[Priority, JobRecord]]] = defaultdict(list) blocked: list[tuple[Priority, str]] = [] @@ -565,6 +576,15 @@ async def _schedule_ready_jobs(self) -> None: blocked.append((neg_priority, job_id)) continue + # Check we have the resources to run the job, and acquire them if so. + if ( + self._resources + and job.spec.resources + and not await self._resources.try_allocate(job.spec.resources) + ): + blocked.append((neg_priority, job_id)) + continue + to_launch[job.backend_key].append((neg_priority, job)) slots_used += 1 @@ -572,6 +592,29 @@ async def _schedule_ready_jobs(self) -> None: for entry in blocked: heapq.heappush(self._ready_heap, entry) + # If nothing is running and nothing was scheduled to run, there must not be + # enough resources to run any jobs. Warn the user. 
+ if blocked and not self._running and slots_used == 0: + log.warning( + "All queued jobs cannot be scheduled due to resource limits, despite no jobs " + "currently being executed." + ) + + return to_launch + + async def _schedule_ready_jobs(self) -> None: + """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" + # Find out how many jobs we can dispatch according to the scheduler's parallelism limit + available_slots = ( + self._max_parallelism - len(self._running) + if self._max_parallelism + else len(self._ready_heap) + ) + if available_slots <= 0: + return + + to_launch = await self._get_jobs_to_launch(available_slots) + # Launch the selected jobs in batches per backend launch_tasks = [] for backend_name, jobs in to_launch.items(): @@ -593,6 +636,8 @@ async def _schedule_ready_jobs(self) -> None: if handle is None: log.verbose("[%s]: Requeuing job '%s'", job.spec.target, job.spec.full_name) heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) + if self._resources and job.spec.resources: + self._resources.release(job.spec.resources) continue job.handle = handle From ffa92b83fb8a7e9b1e02289877e4644294b710f2 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 20:10:20 +0100 Subject: [PATCH 7/8] feat: add command line resource options & integrate with the base flow This commit provides command-line options for creating and configuring a `ResourceManager` with a static resource provider to use passed resource limits. This can then be given to the scheduler to provide more fine-grained parallelism limiting than is allowed by `--max-parallel`, where jobs are now only scheduled such that the scheduler will always respect the defined resource limits. Note the TODO about Python 3.11 - when the minimum Python version is bumped we can make the enum a StrEnum which has much better native str() behaviour than the existing Enum type and removes some of the extra glue code. 
Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 44 +++++++++++++++++++++++++++++++++++ src/dvsim/flow/base.py | 15 +++++++++++- src/dvsim/scheduler/runner.py | 30 +++++++++++++++++++++++- 3 files changed, 87 insertions(+), 2 deletions(-) diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index b8ac812f..ab62c5c9 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -42,6 +42,7 @@ from dvsim.logging import LOG_LEVELS, configure_logging, log from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.registry import BackendType, backend_registry +from dvsim.scheduler.resources import UnknownResourcePolicy from dvsim.scheduler.status_printer import StatusPrinter, get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, rm_path, run_cmd_with_timeout @@ -155,6 +156,24 @@ def resolve_max_parallel(arg): return 16 +def parse_resource(s: str) -> tuple[str, int | None]: + """Parse a resource limit string from the argparse CLI.""" + unbounded_strs = ("all", "any", "inf", "infinite", "many", "max", "none", "null", "unlimited") + try: + key, val = "=".join(s.split()).split("=") + key, val = key.strip().upper(), val.strip() + if val.lower() in unbounded_strs: + return key, None + val = int(val) + if val <= 0: + msg = f"Resource values should be positive integers or 'INF' / 'NONE', not {val}." + raise argparse.ArgumentTypeError(msg) + return key, int(val) + except (ValueError, KeyError, RuntimeError) as e: + msg = f"Invalid resource format: {s}, expected RESOURCE=COUNT" + raise argparse.ArgumentTypeError(msg) from e + + def resolve_branch(branch): """Choose a branch name for output files. @@ -426,6 +445,31 @@ def parse_args(argv: list[str] | None = None): ), ) + resources = parser.add_argument_group("Resource management") + + resources.add_argument( + "-R", + "--resource", + metavar="RESOURCE=COUNT", + type=parse_resource, + dest="resource_limits", + action="append", + help="Set a limit for a resource (repeatable), e.g. 
--resource A=30 or -R B=unlimited.", + ) + + resources.add_argument( + "--on-missing-resource", + # TODO: when using Python 3.11+, make UnknownResourcePolicy a StrEnum instead and then + # just use the enum type directly. + type=str.lower, + choices=[p.value for p in UnknownResourcePolicy], + default=UnknownResourcePolicy.IGNORE.value, + help=( + "Behaviour when a job requests a resource with no defined limit. " + "Defaults to %(default)s." + ), + ) + pathg = parser.add_argument_group("File management") pathg.add_argument( diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index cd1438d9..c11ac747 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -21,7 +21,12 @@ from dvsim.job.data import CompletedJobStatus, JobSpec, WorkspaceConfig from dvsim.job.status import JobStatus from dvsim.logging import log -from dvsim.scheduler.runner import build_default_scheduler_backend, run_scheduler +from dvsim.scheduler.resources import UnknownResourcePolicy +from dvsim.scheduler.runner import ( + build_default_scheduler_backend, + build_resource_manager, + run_scheduler, +) from dvsim.utils import ( find_and_substitute_wildcards, rm_path, @@ -471,12 +476,20 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]: fake_policy=self._fake_policy, ) + # TODO: For Python 3.11 make this a StrEnum, then this conversion is not needed. 
+ missing_policy = UnknownResourcePolicy(self.args.on_missing_resource) + resource_manager = build_resource_manager( + resource_limits=dict(self.args.resource_limits or ()), + missing_policy=missing_policy, + ) + return asyncio.run( run_scheduler( jobs=jobs, max_parallel=self.args.max_parallel, interactive=self.interactive, backend=backend, + resource_manager=resource_manager, ) ) diff --git a/src/dvsim/scheduler/runner.py b/src/dvsim/scheduler/runner.py index 6149e95c..4a49577c 100644 --- a/src/dvsim/scheduler/runner.py +++ b/src/dvsim/scheduler/runner.py @@ -13,6 +13,12 @@ from dvsim.runtime.registry import backend_registry from dvsim.scheduler.core import Scheduler from dvsim.scheduler.log_manager import LogManager +from dvsim.scheduler.resources import ( + ResourceManager, + ResourceMapping, + StaticResourceProvider, + UnknownResourcePolicy, +) from dvsim.scheduler.status_printer import create_status_printer __all__ = ( @@ -31,7 +37,7 @@ def build_default_scheduler_backend( fake_policy: policy for generating fake data if using the fake backend Returns: - Runtime backend to use with the scehduler. + Runtime backend to use with the scheduler. """ + # Create the runtime backends. TODO: support multiple runtime backends at once @@ -44,12 +50,32 @@ def build_default_scheduler_backend( return default_backend +def build_resource_manager( + *, + resource_limits: ResourceMapping, + missing_policy: UnknownResourcePolicy, +) -> ResourceManager | None: + """Build a resource manager for the scheduler from the given static resource limits. + + Args: + resource_limits: The mapping of static resource limits to impose on the scheduler. + missing_policy: How to handle requested job resources without any defined limits.
+ + """ + if not resource_limits and missing_policy == UnknownResourcePolicy.IGNORE: + return None + + provider = StaticResourceProvider(resource_limits) + return ResourceManager(provider, missing_policy) + + async def run_scheduler( *, jobs: Iterable[JobSpec], max_parallel: int, interactive: bool, backend: RuntimeBackend, + resource_manager: ResourceManager | None, ) -> list[CompletedJobStatus]: """Run the scheduler with the given set of job specifications. @@ -58,6 +84,7 @@ async def run_scheduler( max_parallel: number of max parallel jobs to run interactive: run the tool in interactive mode? backend: the scheduler backend to use + resource_manager: the scheduler resource manager to use, if any. Returns: List of completed job status objects. @@ -73,6 +100,7 @@ async def run_scheduler( backends={backend.name: backend}, default_backend=backend.name, max_parallelism=max_parallel, + resource_manager=resource_manager, # The scheduler prioritizes jobs in (lexicographically) decreasing order based on # the given `priority_fn`. We hence define a prioritization scheme that prioritizes # first by decreasing weight, then by decreasing timeout, and finally by the decreasing From 0aad34d9e954b2fafdc0b3a9097d8b2e720e8f27 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 15 Apr 2026 20:12:50 +0100 Subject: [PATCH 8/8] test: add scheduler tests for resource-level parallelism Add some extra scheduler tests to cover the functionality of the new resource-level parallelism feature that was introduced. 
Signed-off-by: Alex Jones --- tests/test_scheduler.py | 60 +++++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 3ee0be70..b432cefd 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -25,6 +25,7 @@ from dvsim.report.data import IPMeta, ToolMeta from dvsim.runtime.legacy import LegacyLauncherAdapter from dvsim.scheduler.core import Scheduler +from dvsim.scheduler.resources import ResourceManager, StaticResourceProvider __all__ = () @@ -454,6 +455,16 @@ async def test_duplicate_jobs(fxt: Fxt) -> None: # Check names of all jobs are unique (i.e. no duplicates are returned). assert_that(len(names), equal_to(len(set(names)))) + @staticmethod + async def _parallelism_test_helper( + fxt: Fxt, scheduler: Scheduler, num_jobs: int, expected_parallelism: int + ) -> None: + """Test helper to check that scheduler parallelism reaches the expected level.""" + assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) + result = await scheduler.run() + _assert_result_status(result, num_jobs) + assert_that(fxt.mock_ctx.max_concurrent, equal_to(expected_parallelism)) + @staticmethod @pytest.mark.asyncio @pytest.mark.timeout(DEFAULT_TIMEOUT) @@ -462,10 +473,7 @@ async def test_parallel_dispatch(fxt: Fxt, num_jobs: int) -> None: """Test that many jobs can be dispatched in parallel.""" jobs = make_many_jobs(fxt.tmp_path, num_jobs) scheduler = Scheduler(jobs, fxt.backends, MOCK_BACKEND) - assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) - result = await scheduler.run() - _assert_result_status(result, num_jobs) - assert_that(fxt.mock_ctx.max_concurrent, equal_to(num_jobs)) + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, num_jobs) @staticmethod @pytest.mark.asyncio @@ -484,10 +492,46 @@ async def test_max_parallel( else: fxt.mock_legacy_backend.max_parallelism = max_parallel scheduler = Scheduler(jobs, fxt.backends, MOCK_BACKEND) - 
assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) - result = await scheduler.run() - _assert_result_status(result, num_jobs) - assert_that(fxt.mock_ctx.max_concurrent, equal_to(min(num_jobs, max_parallel))) + expected_parallel = min(num_jobs, max_parallel) + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, expected_parallel) + + @staticmethod + @pytest.mark.asyncio + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_a_jobs", [5, 10, 20]) + @pytest.mark.parametrize("num_b_jobs", [7, 13, 26]) + @pytest.mark.parametrize("limit", [2, 20, 35]) + async def test_resource_parallelism( + fxt: Fxt, num_a_jobs: int, num_b_jobs: int, limit: int + ) -> None: + """Test that the parallelism limits imposed via scheduler resources are respected.""" + num_jobs = num_a_jobs + num_b_jobs + resource = ["A" if i < num_a_jobs else "B" for i in range(num_jobs)] + jobs = make_many_jobs( + fxt.tmp_path, num_a_jobs + num_b_jobs, per_job=lambda i: {"resources": {resource[i]: 1}} + ) + # Ensure there are no parallelism limits in the launcher/backend. 
+ fxt.mock_legacy_backend.max_parallelism = 0 + resource_manager = ResourceManager(StaticResourceProvider({"A": limit, "B": limit})) + scheduler = Scheduler(jobs, fxt.backends, MOCK_BACKEND, resource_manager=resource_manager) + expected_parallel = min(num_a_jobs, limit) + min(num_b_jobs, limit) + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, expected_parallel) + + @staticmethod + @pytest.mark.asyncio + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_resources", [1, 2, 5]) + @pytest.mark.parametrize("limit", [5, 16, 33, None]) + async def test_resource_usage(fxt: Fxt, num_resources: int, limit: int | None) -> None: + """Test that job resource limits allow jobs to use multiples of resources.""" + num_jobs = limit * 2 if limit else num_resources * 2 + jobs = make_many_jobs(fxt.tmp_path, num_jobs, resources={"TEST": num_resources}) + # Ensure there are no parallelism limits in the launcher/backend. + fxt.mock_legacy_backend.max_parallelism = 0 + resource_manager = ResourceManager(StaticResourceProvider({"TEST": limit})) + scheduler = Scheduler(jobs, fxt.backends, MOCK_BACKEND, resource_manager=resource_manager) + expected_parallel = limit // num_resources if limit else num_jobs + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, expected_parallel) @staticmethod @pytest.mark.asyncio