diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index b8ac812f..ab62c5c9 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -42,6 +42,7 @@ from dvsim.logging import LOG_LEVELS, configure_logging, log from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.registry import BackendType, backend_registry +from dvsim.scheduler.resources import UnknownResourcePolicy from dvsim.scheduler.status_printer import StatusPrinter, get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, rm_path, run_cmd_with_timeout @@ -155,6 +156,24 @@ def resolve_max_parallel(arg): return 16 +def parse_resource(s: str) -> tuple[str, int | None]: + """Parse a resource limit string from the argparse CLI.""" + unbounded_strs = ("all", "any", "inf", "infinite", "many", "max", "none", "null", "unlimited") + try: + key, val = "=".join(s.split()).split("=") + key, val = key.strip().upper(), val.strip() + if val.lower() in unbounded_strs: + return key, None + val = int(val) + if val <= 0: + msg = f"Resource values should be positive integers or 'INF' / 'NONE', not {val}." + raise argparse.ArgumentTypeError(msg) + return key, int(val) + except (ValueError, KeyError, RuntimeError) as e: + msg = f"Invalid resource format: {s}, expected RESOURCE=COUNT" + raise argparse.ArgumentTypeError(msg) from e + + def resolve_branch(branch): """Choose a branch name for output files. @@ -426,6 +445,31 @@ def parse_args(argv: list[str] | None = None): ), ) + resources = parser.add_argument_group("Resource management") + + resources.add_argument( + "-R", + "--resource", + metavar="RESOURCE=COUNT", + type=parse_resource, + dest="resource_limits", + action="append", + help="Set a limit for a resource (repeatable), e.g. --resource A=30 or -R B=unlimited.", + ) + + resources.add_argument( + "--on-missing-resource", + # TODO: when using Python 3.11+, make UnknownResourcePolicy a StrEnum instead and then + # just use the enum type directly. 
+ type=str.lower, + choices=[p.value for p in UnknownResourcePolicy], + default=UnknownResourcePolicy.IGNORE.value, + help=( + "Behaviour when a job requests a resource with no defined limit. " + "Defaults to %(default)s." + ), + ) + pathg = parser.add_argument_group("File management") pathg.add_argument( diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index cd1438d9..c11ac747 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -21,7 +21,12 @@ from dvsim.job.data import CompletedJobStatus, JobSpec, WorkspaceConfig from dvsim.job.status import JobStatus from dvsim.logging import log -from dvsim.scheduler.runner import build_default_scheduler_backend, run_scheduler +from dvsim.scheduler.resources import UnknownResourcePolicy +from dvsim.scheduler.runner import ( + build_default_scheduler_backend, + build_resource_manager, + run_scheduler, +) from dvsim.utils import ( find_and_substitute_wildcards, rm_path, @@ -471,12 +476,20 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]: fake_policy=self._fake_policy, ) + # TODO: For Python 3.11 make this a StrEnum, then this conversion is not needed. 
+ missing_policy = UnknownResourcePolicy(self.args.on_missing_resource) + resource_manager = build_resource_manager( + resource_limits=dict(self.args.resource_limits or ()), + missing_policy=missing_policy, + ) + return asyncio.run( run_scheduler( jobs=jobs, max_parallel=self.args.max_parallel, interactive=self.interactive, backend=backend, + resource_manager=resource_manager, ) ) diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py index 4dcffb75..bb269bd3 100644 --- a/src/dvsim/job/data.py +++ b/src/dvsim/job/data.py @@ -11,6 +11,7 @@ from collections.abc import Callable, Mapping, Sequence from pathlib import Path +from typing import TypeAlias from pydantic import BaseModel, ConfigDict @@ -41,6 +42,10 @@ class WorkspaceConfig(BaseModel): """Path within the scratch directory to use for this run.""" +# A mapping of resource names to the max number of that resource available (or None if unbounded). +ResourceMapping: TypeAlias = dict[str, int | None] + + class JobSpec(BaseModel): """Job specification.""" @@ -59,6 +64,12 @@ class JobSpec(BaseModel): indicates that whatever is configured as the 'default' backend should be used. """ + resources: ResourceMapping | None + """Resource requirements of the job. Maps the name of the resource to the amount + of that resource that is required to run the job. If the scheduler is instructed + to run with enforced resource limits, this limits per-resource parallelism. + """ + seed: int | None """Seed if there is one.""" diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 42aeb55a..9dfb149b 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -149,6 +149,7 @@ def get_job_spec(self) -> "JobSpec": # TODO: for now we always use the default configured backend, but it might be good # to allow different jobs to run on different backends in the future? 
backend=None, + resources=self.resources, seed=getattr(self, "seed", None), full_name=self.full_name, qual_name=self.qual_name, @@ -236,6 +237,12 @@ def _set_attrs(self) -> None: """ self._extract_attrs(self.sim_cfg.__dict__) + # Use the configured tool to determine the resources (licenses) that are required. + # For now, we just assume that the tool itself is the only resource needed. + self.resources = None + if hasattr(self.sim_cfg, "tool") and self.sim_cfg.tool: + self.resources = {self.sim_cfg.tool.upper(): 1} + # Enable GUI mode, also when GUI debug mode has been invoked. self.gui = self.sim_cfg.gui diff --git a/src/dvsim/scheduler/core.py b/src/dvsim/scheduler/core.py index fef9571a..026ddd0e 100644 --- a/src/dvsim/scheduler/core.py +++ b/src/dvsim/scheduler/core.py @@ -18,6 +18,7 @@ from dvsim.logging import log from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.data import JobCompletionEvent, JobHandle +from dvsim.scheduler.resources import ResourceManager __all__ = ( "JobPriorityFn", @@ -94,7 +95,8 @@ def __init__( # noqa: PLR0913 backends: Mapping[str, RuntimeBackend], default_backend: str, *, - max_parallelism: int = 0, + max_parallelism: int | None = None, + resource_manager: ResourceManager | None = None, priority_fn: JobPriorityFn | None = None, coalesce_window: float | None = 0.001, ) -> None: @@ -106,7 +108,9 @@ def __init__( # noqa: PLR0913 backends: The mapping (name -> backend) of backends available to the scheduler. default_backend: The name of the default backend to use if not specified by a job. max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch - at once, across all backends. The default value of `0` indicates no upper limit. + at once, across all backends. The default value of `None` indicates no upper limit. + resource_manager: The scheduler's resource manager, through which per-job resources + are allocated to enforce additional limits on scheduler parallelism. 
priority_fn: A function to calculate the priority of a given job. If no function is given, this defaults to using the job's weight. coalesce_window: If specified, the time in seconds to wait on receiving a job @@ -115,8 +119,8 @@ def __init__( # noqa: PLR0913 extra cost. Defaults to 1 millisecond, and can be disabled by giving `None`. """ - if max_parallelism < 0: - err = f"max_parallelism must be some non-negative integer, not {max_parallelism}" + if max_parallelism is not None and max_parallelism <= 0: + err = f"max_parallelism must be some positive integer or None, not {max_parallelism}" raise ValueError(err) if default_backend not in backends: err = f"Default backend '{default_backend}' is not in the mapping of given backends" @@ -128,6 +132,7 @@ def __init__( # noqa: PLR0913 self._backends = dict(backends) self._default_backend = default_backend self._max_parallelism = max_parallelism + self._resources = resource_manager self._priority_fn = priority_fn or self._default_priority self._coalesce_window = coalesce_window @@ -321,6 +326,8 @@ def _mark_job_completed( if job.spec.id in self._running: self._running.remove(job.spec.id) self._running_per_backend[job.backend_key] -= 1 + if self._resources and job.spec.resources: + self._resources.release(job.spec.resources) # Update dependents (jobs that depend on this job), propagating failures if needed. self._update_completed_job_deps(job) @@ -354,6 +361,13 @@ def _update_completed_job_deps(self, job: JobRecord) -> None: async def run(self) -> list[CompletedJobStatus]: """Run all scheduled jobs to completion (unless terminated) and return the results.""" + # Check if we know about all the resources defined by the given jobs, and whether + # initial resource availability can (independently) satisfy all jobs' needs. + # This is an error if we know we are using static resources, and a warning otherwise. 
+ if self._resources: + specs = [job.spec for job in self._jobs.values()] + await self._resources.validate_jobs(specs) + self._install_signal_handlers() for backend in self._backends.values(): @@ -536,17 +550,14 @@ async def _handle_exit_signal(self) -> None: if kill_tasks: await asyncio.gather(*kill_tasks, return_exceptions=True) - async def _schedule_ready_jobs(self) -> None: - """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" - # Find out how many jobs we can dispatch according to the scheduler's parallelism limit - available_slots = ( - self._max_parallelism - len(self._running) - if self._max_parallelism - else len(self._ready_heap) - ) - if available_slots <= 0: - return + async def _get_jobs_to_launch( + self, available_slots: int + ) -> dict[str, list[tuple[Priority, JobRecord]]]: + """Get the sets of jobs to try and launch at this moment. + Returns a mapping of backend names to the lists of jobs to launch for that backend, + where jobs are defined by their priority value and record. + """ # Collect jobs to launch in a dict, grouped per backend, for batched launching. to_launch: dict[str, list[tuple[Priority, JobRecord]]] = defaultdict(list) blocked: list[tuple[Priority, str]] = [] @@ -565,6 +576,15 @@ async def _schedule_ready_jobs(self) -> None: blocked.append((neg_priority, job_id)) continue + # Check we have the resources to run the job, and acquire them if so. + if ( + self._resources + and job.spec.resources + and not await self._resources.try_allocate(job.spec.resources) + ): + blocked.append((neg_priority, job_id)) + continue + to_launch[job.backend_key].append((neg_priority, job)) slots_used += 1 @@ -572,6 +592,29 @@ async def _schedule_ready_jobs(self) -> None: for entry in blocked: heapq.heappush(self._ready_heap, entry) + # If nothing is running and nothing was scheduled to run, there must not be + # enough resources to run any jobs. Warn the user. 
+ if blocked and not self._running and slots_used == 0: + log.warning( + "All queued jobs cannot be scheduled due to resource limits, despite no jobs " + "currently being executed." + ) + + return to_launch + + async def _schedule_ready_jobs(self) -> None: + """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" + # Find out how many jobs we can dispatch according to the scheduler's parallelism limit + available_slots = ( + self._max_parallelism - len(self._running) + if self._max_parallelism + else len(self._ready_heap) + ) + if available_slots <= 0: + return + + to_launch = await self._get_jobs_to_launch(available_slots) + # Launch the selected jobs in batches per backend launch_tasks = [] for backend_name, jobs in to_launch.items(): @@ -593,6 +636,8 @@ async def _schedule_ready_jobs(self) -> None: if handle is None: log.verbose("[%s]: Requeuing job '%s'", job.spec.target, job.spec.full_name) heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) + if self._resources and job.spec.resources: + self._resources.release(job.spec.resources) continue job.handle = handle diff --git a/src/dvsim/scheduler/resources.py b/src/dvsim/scheduler/resources.py new file mode 100644 index 00000000..8914c204 --- /dev/null +++ b/src/dvsim/scheduler/resources.py @@ -0,0 +1,203 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler resource (parallelism limitation) management.""" + +from collections import defaultdict +from collections.abc import Iterable +from enum import Enum +from typing import Protocol + +from dvsim.job.data import JobSpec, ResourceMapping +from dvsim.logging import log + +__all__ = ( + "ResourceManager", + "ResourceProvider", + "StaticResourceProvider", +) + + +class ResourceProvider(Protocol): + """An abstraction of something that provides resource availability info to the scheduler.""" + + is_dynamic: bool + """Whether the resources returned by the provider can change over time.""" + + async def get_capacity(self) -> ResourceMapping: + """Get the capacity of resources available from this provider. + + An integer capacity defines a strict limit for parallelism of that resource, whereas `None` + indicates no upper bound on parallelism. + """ + ... + + +class StaticResourceProvider: + """Provides information about static (unchanging) resource availability.""" + + is_dynamic = False + + def __init__(self, limits: ResourceMapping) -> None: + """Construct a ResourceProvider with defined static limits.""" + self._limits = limits + + async def get_capacity(self) -> ResourceMapping: + """Get the static capacity of the resources available from this provider. + + An integer capacity defines a strict limit for parallelism of that resource, whereas `None` + indicates no upper bound on parallelism. 
+ """ + return self._limits + + +class UnknownResourcePolicy(str, Enum): + """Behaviour upon a job requesting a resource without any defined limit.""" + + IGNORE = "ignore" + WARN = "warn" + ERROR = "error" + FATAL = "fatal" + + +class ResourceManager: + """Manages scheduler resources, limiting parallelism on a per-resource basis.""" + + def __init__( + self, + provider: ResourceProvider, + missing_policy: UnknownResourcePolicy = UnknownResourcePolicy.IGNORE, + ) -> None: + """Construct a ResourceManager instance.""" + self._provider = provider + self._missing_policy = missing_policy + self._usage = defaultdict(int) + + async def can_allocate(self, request: ResourceMapping) -> bool: + """Check if a given resource request can be allocated, given current usage and limits.""" + capacity = await self._provider.get_capacity() + for resource, needed in request.items(): + limit = capacity.get(resource) + if limit is None: + continue + if needed is None or self._usage[resource] + needed > limit: + return False + return True + + async def try_allocate(self, request: ResourceMapping) -> bool: + """Attempt to allocate the requested resources, recording their usage. + + Returns: + True if successfully allocated, false otherwise. + + """ + # Note: we use no lock here or in `ResourceManager.release()` because we assume the + # invariant that there is never an await between the check and the mutation. If the + # code is changed in this manner, then both of these methods should acquire an + # asyncio.Lock() first around their operation. 
+ if not await self.can_allocate(request): + return False + for resource, amount in request.items(): + if amount is not None: + self._usage[resource] += amount + return True + + def release(self, request: ResourceMapping) -> None: + """Release a set of allocated resources.""" + for resource, amount in request.items(): + if amount is not None: + self._usage[resource] -= amount + + def _log_usage(self, capacity: ResourceMapping, used: ResourceMapping) -> None: + """Debug log individual job resource usage aggregates.""" + if log.isEnabledFor(log.DEBUG): + for resource, usage in used.items(): + limit = capacity.get(resource) + usage_str = str(usage) if usage is not None else "unlimited" + limit_str = str(limit) if limit is not None else "unlimited" + log.debug( + "Total '%s' resources used: %s, limit: %s", resource, usage_str, limit_str + ) + + def _handle_missing_resource(self, job: JobSpec, resource: str, errors: list[str]) -> None: + """Handle a job using an undefined resource, according to the configured policy. + + Args: + job: the job with a missing resource + resource: the name of the missing resource + errors: the list of errors to append an error to if needed. + + """ + if not job.resources: + return + + needed = job.resources.get(resource) + message = f"Job '{job.full_name}' uses unknown resource '{resource}' ({needed} requested)" + match self._missing_policy: + case UnknownResourcePolicy.WARN: + log.warning(message) + case UnknownResourcePolicy.ERROR: + log.error(message) + case UnknownResourcePolicy.FATAL: + errors.append(message) + + def _emit_validation_errors( + self, missing_resources: list[str], limit_exceeded: list[str] + ) -> None: + """Emit aggregated job validation error messages according to the manager's configuration. + + Args: + missing_resources: error messages for any resources used that were not defined. + limit_exceeded: error messages for any job whose resources exceeded the defined limits. 
+ + """ + if missing_resources: + msg = "Job resources had errors:\n" + "\n".join(missing_resources + limit_exceeded) + raise ValueError(msg) + if limit_exceeded: + msg = "Invalid job resource requirements:\n" + "\n".join(limit_exceeded) + # If we know the available resources are static, this should be a fatal error. + if not self._provider.is_dynamic: + raise ValueError(msg) + log.warning("%s", msg) + + async def validate_jobs(self, jobs: Iterable[JobSpec]) -> None: + """Validate given jobs against known (initial) resource limits. + + Validate that the resources required by a list of jobs are less than those that are + initially available from some resource providers. Note that if resources are not static, + this is not a guarantee that resources will remain available. + """ + capacity = await self._provider.get_capacity() + aggregate: ResourceMapping = defaultdict(int) + # Collect all errors before reporting to give more detailed info + missing_resource_errors: list[str] = [] + limit_exceeded_errors: list[str] = [] + + for job in jobs: + if not job.resources: + continue + + for resource, needed in job.resources.items(): + used = aggregate[resource] + if needed is None: + aggregate[resource] = None + elif used is not None: + aggregate[resource] = used + needed + + if resource not in capacity: + self._handle_missing_resource(job, resource, missing_resource_errors) + continue + + limit = capacity[resource] + if limit is not None and (needed is None or needed > limit): + amount = "unlimited" if needed is None else f"{needed} of" + msg = ( + f"Job '{job.full_name}' requires {amount} '{resource}' " + f"but the max available is {limit}" + ) + limit_exceeded_errors.append(msg) + + self._log_usage(capacity, aggregate) + self._emit_validation_errors(missing_resource_errors, limit_exceeded_errors) diff --git a/src/dvsim/scheduler/runner.py b/src/dvsim/scheduler/runner.py index 6149e95c..4a49577c 100644 --- a/src/dvsim/scheduler/runner.py +++ b/src/dvsim/scheduler/runner.py 
@@ -13,6 +13,12 @@ from dvsim.runtime.registry import backend_registry from dvsim.scheduler.core import Scheduler from dvsim.scheduler.log_manager import LogManager +from dvsim.scheduler.resources import ( + ResourceManager, + ResourceMapping, + StaticResourceProvider, + UnknownResourcePolicy, +) from dvsim.scheduler.status_printer import create_status_printer __all__ = ( @@ -31,7 +37,7 @@ def build_default_scheduler_backend( fake_policy: policy for generating fake data if using the fake backend Returns: - Runtime backend to use with the scehduler. + Runtime backend to use with the scheduler. """ # Create the runtime backends. TODO: support multiple runtime backends at once @@ -44,12 +50,32 @@ def build_default_scheduler_backend( return default_backend +def build_resource_manager( + *, + resource_limits: ResourceMapping, + missing_policy: UnknownResourcePolicy, +) -> ResourceManager | None: + """Build a resource manager for use with the scheduler, or return None if none is needed. + + Args: + resource_limits: The mapping of static resource limits to impose on the scheduler. + missing_policy: How to handle requested job resources without any defined limits. + + """ + if not resource_limits and missing_policy == UnknownResourcePolicy.IGNORE: + return None + + provider = StaticResourceProvider(resource_limits) + return ResourceManager(provider, missing_policy) + + async def run_scheduler( *, jobs: Iterable[JobSpec], max_parallel: int, interactive: bool, backend: RuntimeBackend, + resource_manager: ResourceManager | None, ) -> list[CompletedJobStatus]: """Run the scheduler with the given set of job specifications. @@ -58,6 +84,7 @@ async def run_scheduler( max_parallel: number of max parallel jobs to run interactive: run the tool in interactive mode? backend: the scheduler backend to use + resource_manager: the scheduler resource manager to use, if any. Returns: List of completed job status objects. 
@@ -73,6 +100,7 @@ async def run_scheduler( backends={backend.name: backend}, default_backend=backend.name, max_parallelism=max_parallel, + resource_manager=resource_manager, # The scheduler prioritizes jobs in (lexicographically) decreasing order based on # the given `priority_fn`. We hence define a prioritization scheme that prioritizes # first by decreasing weight, then by decreasing timeout, and finally by the decreasing diff --git a/src/dvsim/sim/tool/xcelium.py b/src/dvsim/sim/tool/xcelium.py index 9dd1bd4a..ef0abd93 100644 --- a/src/dvsim/sim/tool/xcelium.py +++ b/src/dvsim/sim/tool/xcelium.py @@ -5,7 +5,7 @@ """EDA tool plugin providing Xcelium support to DVSim.""" import re -from collections import OrderedDict +from collections import defaultdict from collections.abc import Mapping, Sequence from pathlib import Path from typing import TYPE_CHECKING @@ -32,7 +32,7 @@ def get_cov_summary_table(cov_report_path: Path) -> tuple[Sequence[Sequence[str] tuple of, List of metrics and values, and final coverage total """ - with Path(cov_report_path).open() as buf: + with cov_report_path.open() as buf: for line in buf: if "name" in line: # Strip the line and remove the unwanted "* Covered" string. @@ -41,9 +41,8 @@ def get_cov_summary_table(cov_report_path: Path) -> tuple[Sequence[Sequence[str] metrics[0] = "Score" # Gather the list of metrics. - items = OrderedDict() + items: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for metric in metrics: - items[metric] = {} items[metric]["covered"] = 0 items[metric]["total"] = 0 @@ -75,8 +74,10 @@ def get_cov_summary_table(cov_report_path: Path) -> tuple[Sequence[Sequence[str] values.append(value) if metric == "Score": cov_total = value + if cov_total is None: + break - return [items.keys(), values], cov_total + return [list(items.keys()), values], cov_total # If we reached here, then we were unable to extract the coverage. msg = f"Coverage data not found in {buf.name}!" 
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 1497161b..b432cefd 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -25,6 +25,7 @@ from dvsim.report.data import IPMeta, ToolMeta from dvsim.runtime.legacy import LegacyLauncherAdapter from dvsim.scheduler.core import Scheduler +from dvsim.scheduler.resources import ResourceManager, StaticResourceProvider __all__ = () @@ -298,6 +299,7 @@ def job_spec_factory( "job_type": "mock_type", "target": "mock_target", "backend": None, + "resources": None, "seed": None, "dependencies": [], "needs_all_dependencies_passing": True, @@ -453,6 +455,16 @@ async def test_duplicate_jobs(fxt: Fxt) -> None: # Check names of all jobs are unique (i.e. no duplicates are returned). assert_that(len(names), equal_to(len(set(names)))) + @staticmethod + async def _parallelism_test_helper( + fxt: Fxt, scheduler: Scheduler, num_jobs: int, expected_parallelism: int + ) -> None: + """Test helper to check that scheduler parallelism reaches the expected level.""" + assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) + result = await scheduler.run() + _assert_result_status(result, num_jobs) + assert_that(fxt.mock_ctx.max_concurrent, equal_to(expected_parallelism)) + @staticmethod @pytest.mark.asyncio @pytest.mark.timeout(DEFAULT_TIMEOUT) @@ -461,10 +473,7 @@ async def test_parallel_dispatch(fxt: Fxt, num_jobs: int) -> None: """Test that many jobs can be dispatched in parallel.""" jobs = make_many_jobs(fxt.tmp_path, num_jobs) scheduler = Scheduler(jobs, fxt.backends, MOCK_BACKEND) - assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) - result = await scheduler.run() - _assert_result_status(result, num_jobs) - assert_that(fxt.mock_ctx.max_concurrent, equal_to(num_jobs)) + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, num_jobs) @staticmethod @pytest.mark.asyncio @@ -483,10 +492,46 @@ async def test_max_parallel( else: fxt.mock_legacy_backend.max_parallelism = max_parallel scheduler = 
Scheduler(jobs, fxt.backends, MOCK_BACKEND) - assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) - result = await scheduler.run() - _assert_result_status(result, num_jobs) - assert_that(fxt.mock_ctx.max_concurrent, equal_to(min(num_jobs, max_parallel))) + expected_parallel = min(num_jobs, max_parallel) + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, expected_parallel) + + @staticmethod + @pytest.mark.asyncio + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_a_jobs", [5, 10, 20]) + @pytest.mark.parametrize("num_b_jobs", [7, 13, 26]) + @pytest.mark.parametrize("limit", [2, 20, 35]) + async def test_resource_parallelism( + fxt: Fxt, num_a_jobs: int, num_b_jobs: int, limit: int + ) -> None: + """Test that the parallelism limits imposed via scheduler resources are respected.""" + num_jobs = num_a_jobs + num_b_jobs + resource = ["A" if i < num_a_jobs else "B" for i in range(num_jobs)] + jobs = make_many_jobs( + fxt.tmp_path, num_a_jobs + num_b_jobs, per_job=lambda i: {"resources": {resource[i]: 1}} + ) + # Ensure there are no parallelism limits in the launcher/backend. 
+ fxt.mock_legacy_backend.max_parallelism = 0 + resource_manager = ResourceManager(StaticResourceProvider({"A": limit, "B": limit})) + scheduler = Scheduler(jobs, fxt.backends, MOCK_BACKEND, resource_manager=resource_manager) + expected_parallel = min(num_a_jobs, limit) + min(num_b_jobs, limit) + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, expected_parallel) + + @staticmethod + @pytest.mark.asyncio + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_resources", [1, 2, 5]) + @pytest.mark.parametrize("limit", [5, 16, 33, None]) + async def test_resource_usage(fxt: Fxt, num_resources: int, limit: int | None) -> None: + """Test that job resource limits allow jobs to use multiples of resources.""" + num_jobs = limit * 2 if limit else num_resources * 2 + jobs = make_many_jobs(fxt.tmp_path, num_jobs, resources={"TEST": num_resources}) + # Ensure there are no parallelism limits in the launcher/backend. + fxt.mock_legacy_backend.max_parallelism = 0 + resource_manager = ResourceManager(StaticResourceProvider({"TEST": limit})) + scheduler = Scheduler(jobs, fxt.backends, MOCK_BACKEND, resource_manager=resource_manager) + expected_parallel = limit // num_resources if limit else num_jobs + await TestScheduling._parallelism_test_helper(fxt, scheduler, num_jobs, expected_parallel) @staticmethod @pytest.mark.asyncio