Skip to content
44 changes: 44 additions & 0 deletions src/dvsim/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from dvsim.logging import LOG_LEVELS, configure_logging, log
from dvsim.runtime.backend import RuntimeBackend
from dvsim.runtime.registry import BackendType, backend_registry
from dvsim.scheduler.resources import UnknownResourcePolicy
from dvsim.scheduler.status_printer import StatusPrinter, get_status_printer
from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, rm_path, run_cmd_with_timeout

Expand Down Expand Up @@ -155,6 +156,24 @@ def resolve_max_parallel(arg):
return 16


def parse_resource(s: str) -> tuple[str, int | None]:
"""Parse a resource limit string from the argparse CLI."""
unbounded_strs = ("all", "any", "inf", "infinite", "many", "max", "none", "null", "unlimited")
try:
key, val = "=".join(s.split()).split("=")
key, val = key.strip().upper(), val.strip()
if val.lower() in unbounded_strs:
return key, None
val = int(val)
if val <= 0:
msg = f"Resource values should be positive integers or 'INF' / 'NONE', not {val}."
raise argparse.ArgumentTypeError(msg)
return key, int(val)
except (ValueError, KeyError, RuntimeError) as e:
msg = f"Invalid resource format: {s}, expected RESOURCE=COUNT"
raise argparse.ArgumentTypeError(msg) from e


def resolve_branch(branch):
"""Choose a branch name for output files.

Expand Down Expand Up @@ -426,6 +445,31 @@ def parse_args(argv: list[str] | None = None):
),
)

resources = parser.add_argument_group("Resource management")

resources.add_argument(
"-R",
"--resource",
metavar="RESOURCE=COUNT",
type=parse_resource,
dest="resource_limits",
action="append",
help="Set a limit for a resource (repeatable), e.g. --resource A=30 or -R B=unlimited.",
)

resources.add_argument(
"--on-missing-resource",
# TODO: when using Python 3.11+, make UnknownResourcePolicy a StrEnum instead and then
# just use the enum type directly.
type=str.lower,
choices=[p.value for p in UnknownResourcePolicy],
default=UnknownResourcePolicy.IGNORE.value,
help=(
"Behaviour when a job requests a resource with no defined limit. "
"Defaults to %(default)s."
),
)

pathg = parser.add_argument_group("File management")

pathg.add_argument(
Expand Down
15 changes: 14 additions & 1 deletion src/dvsim/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
from dvsim.job.data import CompletedJobStatus, JobSpec, WorkspaceConfig
from dvsim.job.status import JobStatus
from dvsim.logging import log
from dvsim.scheduler.runner import build_default_scheduler_backend, run_scheduler
from dvsim.scheduler.resources import UnknownResourcePolicy
from dvsim.scheduler.runner import (
build_default_scheduler_backend,
build_resource_manager,
run_scheduler,
)
from dvsim.utils import (
find_and_substitute_wildcards,
rm_path,
Expand Down Expand Up @@ -471,12 +476,20 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]:
fake_policy=self._fake_policy,
)

# TODO: For Python 3.11 make this a StrEnum, then this conversion is not needed.
missing_policy = UnknownResourcePolicy(self.args.on_missing_resource)
resource_manager = build_resource_manager(
resource_limits=dict(self.args.resource_limits or ()),
missing_policy=missing_policy,
)

return asyncio.run(
run_scheduler(
jobs=jobs,
max_parallel=self.args.max_parallel,
interactive=self.interactive,
backend=backend,
resource_manager=resource_manager,
)
)

Expand Down
11 changes: 11 additions & 0 deletions src/dvsim/job/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from collections.abc import Callable, Mapping, Sequence
from pathlib import Path
from typing import TypeAlias

from pydantic import BaseModel, ConfigDict

Expand Down Expand Up @@ -41,6 +42,10 @@ class WorkspaceConfig(BaseModel):
"""Path within the scratch directory to use for this run."""


# A mapping of resource names to the max number of that resource available (or None if unbounded).
ResourceMapping: TypeAlias = dict[str, int | None]


class JobSpec(BaseModel):
"""Job specification."""

Expand All @@ -59,6 +64,12 @@ class JobSpec(BaseModel):
indicates that whatever is configured as the 'default' backend should be used.
"""

resources: ResourceMapping | None
"""Resource requirements of the job. Maps the name of the resource to the amount
of that resource that is required to run the job. If the scheduler is instructed
to run with enforced resource limits, this limits per-resource parallelism.
"""

seed: int | None
"""Seed if there is one."""

Expand Down
7 changes: 7 additions & 0 deletions src/dvsim/job/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def get_job_spec(self) -> "JobSpec":
# TODO: for now we always use the default configured backend, but it might be good
# to allow different jobs to run on different backends in the future?
backend=None,
resources=self.resources,
seed=getattr(self, "seed", None),
full_name=self.full_name,
qual_name=self.qual_name,
Expand Down Expand Up @@ -236,6 +237,12 @@ def _set_attrs(self) -> None:
"""
self._extract_attrs(self.sim_cfg.__dict__)

# Use the configured tool to determine the resources (licenses) that are required.
# For now, we just assume that the tool itself is the only resource needed.
self.resources = None
if hasattr(self.sim_cfg, "tool") and self.sim_cfg.tool:
self.resources = {self.sim_cfg.tool.upper(): 1}

# Enable GUI mode, also when GUI debug mode has been invoked.
self.gui = self.sim_cfg.gui

Expand Down
73 changes: 59 additions & 14 deletions src/dvsim/scheduler/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dvsim.logging import log
from dvsim.runtime.backend import RuntimeBackend
from dvsim.runtime.data import JobCompletionEvent, JobHandle
from dvsim.scheduler.resources import ResourceManager

__all__ = (
"JobPriorityFn",
Expand Down Expand Up @@ -94,7 +95,8 @@ def __init__( # noqa: PLR0913
backends: Mapping[str, RuntimeBackend],
default_backend: str,
*,
max_parallelism: int = 0,
max_parallelism: int | None = None,
resource_manager: ResourceManager | None = None,
priority_fn: JobPriorityFn | None = None,
coalesce_window: float | None = 0.001,
) -> None:
Expand All @@ -106,7 +108,9 @@ def __init__( # noqa: PLR0913
backends: The mapping (name -> backend) of backends available to the scheduler.
default_backend: The name of the default backend to use if not specified by a job.
max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch
at once, across all backends. The default value of `0` indicates no upper limit.
at once, across all backends. The default value of `None` indicates no upper limit.
resource_manager: The scheduler's resource manager, through which per-job resources
are allocated to enforce additional limits on scheduler parallelism.
priority_fn: A function to calculate the priority of a given job. If no function is
given, this defaults to using the job's weight.
coalesce_window: If specified, the time in seconds to wait on receiving a job
Expand All @@ -115,8 +119,8 @@ def __init__( # noqa: PLR0913
extra cost. Defaults to 1 millisecond, and can be disabled by giving `None`.

"""
if max_parallelism < 0:
err = f"max_parallelism must be some non-negative integer, not {max_parallelism}"
if max_parallelism is not None and max_parallelism <= 0:
err = f"max_parallelism must be some positive integer or None, not {max_parallelism}"
raise ValueError(err)
if default_backend not in backends:
err = f"Default backend '{default_backend}' is not in the mapping of given backends"
Expand All @@ -128,6 +132,7 @@ def __init__( # noqa: PLR0913
self._backends = dict(backends)
self._default_backend = default_backend
self._max_parallelism = max_parallelism
self._resources = resource_manager
self._priority_fn = priority_fn or self._default_priority
self._coalesce_window = coalesce_window

Expand Down Expand Up @@ -321,6 +326,8 @@ def _mark_job_completed(
if job.spec.id in self._running:
self._running.remove(job.spec.id)
self._running_per_backend[job.backend_key] -= 1
if self._resources and job.spec.resources:
self._resources.release(job.spec.resources)

# Update dependents (jobs that depend on this job), propagating failures if needed.
self._update_completed_job_deps(job)
Expand Down Expand Up @@ -354,6 +361,13 @@ def _update_completed_job_deps(self, job: JobRecord) -> None:

async def run(self) -> list[CompletedJobStatus]:
"""Run all scheduled jobs to completion (unless terminated) and return the results."""
# Check if we know about all the resources defined by the given jobs, and whether
# initial resource availability can (independently) satisfy all jobs' needs.
# This is an error if we know we are using static resources, and a warning otherwise.
if self._resources:
specs = [job.spec for job in self._jobs.values()]
await self._resources.validate_jobs(specs)

self._install_signal_handlers()

for backend in self._backends.values():
Expand Down Expand Up @@ -536,17 +550,14 @@ async def _handle_exit_signal(self) -> None:
if kill_tasks:
await asyncio.gather(*kill_tasks, return_exceptions=True)

async def _schedule_ready_jobs(self) -> None:
"""Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism."""
# Find out how many jobs we can dispatch according to the scheduler's parallelism limit
available_slots = (
self._max_parallelism - len(self._running)
if self._max_parallelism
else len(self._ready_heap)
)
if available_slots <= 0:
return
async def _get_jobs_to_launch(
self, available_slots: int
) -> dict[str, list[tuple[Priority, JobRecord]]]:
"""Get the sets of jobs to try and launch at this moment.

Returns a mapping of backend names to the lists of jobs to launch for that backend,
where jobs are defined by their priority value and record.
"""
# Collect jobs to launch in a dict, grouped per backend, for batched launching.
to_launch: dict[str, list[tuple[Priority, JobRecord]]] = defaultdict(list)
blocked: list[tuple[Priority, str]] = []
Expand All @@ -565,13 +576,45 @@ async def _schedule_ready_jobs(self) -> None:
blocked.append((neg_priority, job_id))
continue

# Check we have the resources to run the job, and acquire them if so.
if (
self._resources
and job.spec.resources
and not await self._resources.try_allocate(job.spec.resources)
):
blocked.append((neg_priority, job_id))
continue

to_launch[job.backend_key].append((neg_priority, job))
slots_used += 1

# Requeue any blocked jobs.
for entry in blocked:
heapq.heappush(self._ready_heap, entry)

# If nothing is running and nothing was scheduled to run, there must not be
# enough resources to run any jobs. Warn the user.
if blocked and not self._running and slots_used == 0:
log.warning(
"All queued jobs cannot be scheduled due to resource limits, despite no jobs "
"currently being executed."
)

return to_launch

async def _schedule_ready_jobs(self) -> None:
"""Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism."""
# Find out how many jobs we can dispatch according to the scheduler's parallelism limit
available_slots = (
self._max_parallelism - len(self._running)
if self._max_parallelism
else len(self._ready_heap)
)
if available_slots <= 0:
return

to_launch = await self._get_jobs_to_launch(available_slots)

# Launch the selected jobs in batches per backend
launch_tasks = []
for backend_name, jobs in to_launch.items():
Expand All @@ -593,6 +636,8 @@ async def _schedule_ready_jobs(self) -> None:
if handle is None:
log.verbose("[%s]: Requeuing job '%s'", job.spec.target, job.spec.full_name)
heapq.heappush(self._ready_heap, (neg_priority, job.spec.id))
if self._resources and job.spec.resources:
self._resources.release(job.spec.resources)
continue

job.handle = handle
Expand Down
Loading
Loading