From 2902b6c5faa5594aea682c9fca752330186b7853 Mon Sep 17 00:00:00 2001 From: Tim Fuller Date: Thu, 2 Oct 2025 14:24:54 -0600 Subject: [PATCH 1/4] add default machine:resource scope --- src/hpc_connect/config.py | 229 +++++++------------------------ src/hpc_connect/discover.py | 31 +++++ src/hpc_connect/hookspec.py | 6 + src/hpc_connect/pluginmanager.py | 2 + src/hpc_connect/schemas.py | 141 +++++++++++++++++++ src/hpcc_flux/__init__.py | 13 ++ src/hpcc_flux/discover.py | 61 ++++++++ src/hpcc_flux/submit_api.py | 62 ++------- src/hpcc_pbs/__init__.py | 12 ++ src/hpcc_pbs/discover.py | 52 +++++++ src/hpcc_pbs/submit.py | 49 +------ src/hpcc_slurm/__init__.py | 13 ++ src/hpcc_slurm/discover.py | 101 ++++++++++++++ src/hpcc_slurm/submit.py | 98 +------------ 14 files changed, 504 insertions(+), 366 deletions(-) create mode 100644 src/hpc_connect/discover.py create mode 100644 src/hpc_connect/schemas.py create mode 100644 src/hpcc_flux/discover.py create mode 100644 src/hpcc_pbs/discover.py create mode 100644 src/hpcc_slurm/discover.py diff --git a/src/hpc_connect/config.py b/src/hpc_connect/config.py index a4ef0ad..9770a4b 100644 --- a/src/hpc_connect/config.py +++ b/src/hpc_connect/config.py @@ -5,8 +5,6 @@ import logging import math import os -import shlex -import shutil import sys from collections.abc import ValuesView from functools import cached_property @@ -14,130 +12,36 @@ from typing import Any import pluggy -import psutil +import schema import yaml -from schema import Optional -from schema import Or -from schema import Schema -from schema import Use from .pluginmanager import HPCConnectPluginManager +from .schemas import config_schema +from .schemas import launch_schema +from .schemas import machine_schema +from .schemas import submit_schema from .util import collections from .util import safe_loads from .util.string import strip_quotes logger = logging.getLogger("hpc_connect") - -def flag_splitter(arg: list[str] | str) -> list[str]: - if isinstance(arg, str): - return shlex.split(arg) - elif not isinstance(arg, list) and not all(isinstance(str, _) for _ in arg): - raise ValueError("expected list[str]") - return arg - - -def dict_str_str(arg: Any) -> bool: - f = isinstance - return f(arg, dict) and all([f(_, str) for k, v in arg.items() for _ in (k, v)]) - - -class choose_from: - def __init__(self, *choices: str | None): - self.choices = set(choices) - - def __call__(self, arg: str | None) -> str | None: - if arg not in self.choices: - raise ValueError(f"Invalid choice {arg!r}, choose from {self.choices!r}") - return arg - - -def which(arg: str) -> str: - if path := shutil.which(arg): - return path - logger.debug(f"{arg} not found on PATH") - return arg - - -# Resource spec have the following form: -# machine: -# resources: -# - type: node -# count: node_count -# resources: -# - type: socket -# count: sockets_per_node -# resources: -# - type: resource_name (like cpus) -# count: type_per_socket -# additional_properties: (optional) -# - type: slots -# count: 1 - -resource_spec = { - "type": "node", - "count": int, - Optional("additional_properties"): Or(dict, None), - "resources": [ - { - "type": str, - "count": int, - Optional("additional_properties"): Or(dict, None), - Optional("resources"): [ - { - "type": str, - "count": int, - Optional("additional_properties"): Or(dict, None), - }, - ], - }, - ], +section_schemas: dict[str, schema.Schema] = { + "config": config_schema, + "machine": machine_schema, + "submit": submit_schema, + "launch": launch_schema, } -launch_spec = { - 
Optional("numproc_flag"): str, - Optional("default_options"): Use(flag_splitter), - Optional("local_options"): Use(flag_splitter), - Optional("pre_options"): Use(flag_splitter), - Optional("mappings"): dict_str_str, -} - -schema = Schema( - { - "hpc_connect": { - Optional("config"): { - Optional("debug"): bool, - }, - Optional("submit"): { - Optional("backend"): Use( - choose_from(None, "shell", "slurm", "sbatch", "pbs", "qsub", "flux") - ), - Optional("default_options"): Use(flag_splitter), - Optional(str): { - Optional("default_options"): Use(flag_splitter), - }, - }, - Optional("machine"): { - Optional("resources"): Or([resource_spec], None), - }, - Optional("launch"): { - Optional("exec"): Use(which), - **launch_spec, - Optional(str): launch_spec, - }, - } - }, - ignore_extra_keys=True, - description="HPC connect configuration schema", -) - - class ConfigScope: def __init__(self, name: str, file: str | None, data: dict[str, Any]) -> None: self.name = name self.file = file - self.data = schema.validate({"hpc_connect": data})["hpc_connect"] + self.data: dict[str, Any] = {} + for section, data in data.items(): + schema = section_schemas[section] + self.data[section] = schema.validate(data) def __repr__(self): file = self.file or "" @@ -151,6 +55,9 @@ def __eq__(self, other): def __iter__(self): return iter(self.data) + def __contains__(self, section: str) -> bool: + return section in self.data + def get_section(self, section: str) -> Any: return self.data.get(section) @@ -161,34 +68,33 @@ def dump(self) -> None: yaml.dump({"hpc_connect": self.data}, fh, default_flow_style=False) -config_defaults = { - "config": { - "debug": False, - }, - "machine": { - "resources": None, - }, - "submit": { - "backend": None, - "default_options": [], - }, - "launch": { - "exec": "mpiexec", - "numproc_flag": "-n", - "default_options": [], - "local_options": [], - "pre_options": [], - "mappings": {}, - }, -} - - class Config: def __init__(self) -> None: self.pluginmanager: pluggy.PluginManager = HPCConnectPluginManager() - self.scopes: dict[str, ConfigScope] = { - "defaults": ConfigScope("defaults", None, config_defaults) + rspec = self.pluginmanager.hook.hpc_connect_discover_resources() + defaults = { + "config": { + "debug": False, + }, + "machine": { + "resources": rspec, + }, + "submit": { + "backend": None, + "default_options": [], + }, + "launch": { + "exec": "mpiexec", + "numproc_flag": "-n", + "default_options": [], + "local_options": [], + "pre_options": [], + "mappings": {}, + }, } + self.scopes: dict[str, ConfigScope] = {} + default_scope = ConfigScope("defaults", None, defaults) + self.push_scope(default_scope) for scope in ("site", "global", "local"): config_scope = read_config_scope(scope) self.push_scope(config_scope) @@ -223,6 +129,8 @@ def get_config(self, section: str, scope: str | None = None) -> Any: return merged_section[section] def get(self, path: str, default: Any = None, scope: str | None = None) -> Any: + if path == "machine:resources": + return self.resource_specs parts = process_config_path(path) section = parts.pop(0) value = self.get_config(section, scope=scope) @@ -336,18 +244,12 @@ def set_main_options(self, args: argparse.Namespace) -> None: @property def resource_specs(self) -> list[dict]: - from .submit import factory - - if resource_specs := self.get("machine:resources"): - return resource_specs - if self.get("submit:backend"): - # backend may set resources - factory(config=self) - if resource_specs := self.get("machine:resources"): - return resource_specs - resource_specs 
= default_resource_spec() - self.set("machine:resources", resource_specs, scope="defaults") - return resource_specs + scope: ConfigScope + for scope in reversed(self.scopes.values()): + if "machine" in scope: + if resources := scope.data["machine"].get("resources"): + return resources + raise ValueError("No machine resources set") def resource_types(self) -> list[str]: """Return the types of resources available""" @@ -486,20 +388,15 @@ def compute_required_resources( return reqd_resources def dump(self, stream: IO[Any], scope: str | None = None, **kwargs: Any) -> None: - from .submit import factory - - # initialize the resource spec - if self.get("machine:resources") is None: - if self.get("submit:backend"): - factory(self) - if not self.get("machine:resources"): - self.set("machine:resources", default_resource_spec(), scope="defaults") data: dict[str, Any] = {} for section in self.scopes["defaults"]: + if section == "machine": + continue section_data = self.get_config(section, scope=scope) if not section_data and scope is not None: continue data[section] = section_data + data.setdefault("machine", {})["resources"] = self.resource_specs yaml.dump({"hpc_connect": data}, stream, **kwargs) @@ -548,7 +445,7 @@ def load_mappings(arg: str) -> dict[str, str]: key = "_".join(parts) except ValueError: continue - if section not in config_defaults: + if section not in section_schemas: continue value: Any if key == "mappings": @@ -609,25 +506,3 @@ def set_logging_level(levelname: str) -> None: for h in logger.handlers: h.setLevel(level) logger.setLevel(level) - - -def default_resource_spec() -> list[dict]: - resource_spec: list[dict] = [ - { - "type": "node", - "count": 1, - "resources": [ - { - "type": "socket", - "count": 1, - "resources": [ - { - "type": "cpu", - "count": psutil.cpu_count(), - }, - ], - }, - ], - } - ] - return resource_spec diff --git a/src/hpc_connect/discover.py b/src/hpc_connect/discover.py new file mode 100644 index 0000000..5632849 --- /dev/null +++ b/src/hpc_connect/discover.py @@ -0,0 +1,31 @@ +# Copyright NTESS. See COPYRIGHT file for details. 
+# +# SPDX-License-Identifier: MIT + +import fnmatch +import json +import os +from typing import Any + +import psutil + +from .hookspec import hookimpl + + +@hookimpl(trylast=True, specname="hpc_connect_discover_resources") +def default_resource_set() -> list[dict[str, Any]]: + local_resource = {"type": "cpu", "count": psutil.cpu_count()} + socket_resource = {"type": "socket", "count": 1, "resources": [local_resource]} + return [{"type": "node", "count": 1, "resources": [socket_resource]}] + + +@hookimpl(specname="hpc_connect_discover_resources") +def read_resources_from_hostfile() -> dict[str, list] | None: + if file := os.getenv("HPC_CONNECT_HOSTFILE"): + with open(file) as fh: + data = json.load(fh) + nodename = os.uname().nodename + for pattern, rspec in data.items(): + if fnmatch.fnmatch(nodename, pattern): + return rspec + return None diff --git a/src/hpc_connect/hookspec.py b/src/hpc_connect/hookspec.py index 87d81c0..22bfc23 100644 --- a/src/hpc_connect/hookspec.py +++ b/src/hpc_connect/hookspec.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: MIT from typing import TYPE_CHECKING +from typing import Any import pluggy @@ -25,3 +26,8 @@ def hpc_connect_submission_manager(config: "Config") -> "HPCSubmissionManager": def hpc_connect_launcher(config: "Config") -> "HPCLauncher": """HPC scheduler implementation""" raise NotImplementedError + + +@hookspec(firstresult=True) +def hpc_connect_discover_resources() -> list[dict[str, Any]]: + raise NotImplementedError diff --git a/src/hpc_connect/pluginmanager.py b/src/hpc_connect/pluginmanager.py index a7ec43e..575bc2e 100644 --- a/src/hpc_connect/pluginmanager.py +++ b/src/hpc_connect/pluginmanager.py @@ -6,8 +6,10 @@ class HPCConnectPluginManager(pluggy.PluginManager): def __init__(self): + from . import discover from . import hookspec super().__init__("hpc_connect") self.add_hookspecs(hookspec) + self.register(discover) self.load_setuptools_entrypoints("hpc_connect") diff --git a/src/hpc_connect/schemas.py b/src/hpc_connect/schemas.py new file mode 100644 index 0000000..e88acac --- /dev/null +++ b/src/hpc_connect/schemas.py @@ -0,0 +1,141 @@ +# Copyright NTESS. See COPYRIGHT file for details. 
+#
+# SPDX-License-Identifier: MIT
+
+import logging
+import shlex
+import shutil
+from typing import Any
+
+from schema import Optional
+from schema import Or
+from schema import Schema
+from schema import Use
+
+logger = logging.getLogger(__name__)
+
+
+def flag_splitter(arg: list[str] | str) -> list[str]:
+    if isinstance(arg, str):
+        return shlex.split(arg)
+    elif not isinstance(arg, list) or not all(isinstance(_, str) for _ in arg):
+        raise ValueError("expected list[str]")
+    return arg
+
+
+def dict_str_str(arg: Any) -> bool:
+    f = isinstance
+    return f(arg, dict) and all([f(_, str) for k, v in arg.items() for _ in (k, v)])
+
+
+class choose_from:
+    def __init__(self, *choices: str | None):
+        self.choices = set(choices)
+
+    def __call__(self, arg: str | None) -> str | None:
+        if arg not in self.choices:
+            raise ValueError(f"Invalid choice {arg!r}, choose from {self.choices!r}")
+        return arg
+
+
+def which(arg: str) -> str:
+    if path := shutil.which(arg):
+        return path
+    logger.debug(f"{arg} not found on PATH")
+    return arg
+
+
+# Resource specs have the following form:
+# machine:
+#   resources:
+#   - type: node
+#     count: node_count
+#     resources:
+#     - type: socket
+#       count: sockets_per_node
+#       resources:
+#       - type: resource_name (like cpus)
+#         count: type_per_socket
+#         additional_properties: (optional)
+#         - type: slots
+#           count: 1
+
+resource_spec = {
+    "type": "node",
+    "count": int,
+    Optional("additional_properties"): Or(dict, None),
+    "resources": [
+        {
+            "type": str,
+            "count": int,
+            Optional("additional_properties"): Or(dict, None),
+            Optional("resources"): [
+                {
+                    "type": str,
+                    "count": int,
+                    Optional("additional_properties"): Or(dict, None),
+                },
+            ],
+        },
+    ],
+}
+
+
+config_schema = Schema({Optional("debug"): bool})
+launch_spec = {
+    Optional("numproc_flag"): str,
+    Optional("default_options"): Use(flag_splitter),
+    Optional("local_options"): Use(flag_splitter),
+    Optional("pre_options"): Use(flag_splitter),
+    Optional("mappings"): dict_str_str,
+}
+launch_schema = Schema(
+    {
+        Optional("exec"): Use(which),
+        Optional(str): launch_spec,
+        **launch_spec,
+    }
+)
+machine_schema = Schema({"resources": [resource_spec]})
+submit_schema = Schema(
+    {
+        Optional("backend"): Use(
+            choose_from(None, "shell", "slurm", "sbatch", "pbs", "qsub", "flux")
+        ),
+        Optional("default_options"): Use(flag_splitter),
+        Optional(str): {
+            Optional("default_options"): Use(flag_splitter),
+        },
+    },
+)
+
+hpc_connect_schema = Schema(
+    {
+        "hpc_connect": {
+            Optional("config"): {
+                Optional("debug"): bool,
+            },
+            Optional("submit"): {
+                Optional("backend"): Use(
+                    choose_from(None, "shell", "slurm", "sbatch", "pbs", "qsub", "flux")
+                ),
+                Optional("default_options"): Use(flag_splitter),
+                Optional(str): {
+                    Optional("default_options"): Use(flag_splitter),
+                },
+            },
+            Optional("machine"): {
+                Optional("resources"): Or([resource_spec], None),
+            },
+            Optional("launch"): {
+                Optional("exec"): Use(which),
+                **launch_spec,
+                Optional(str): launch_spec,
+            },
+        }
+    },
+    ignore_extra_keys=True,
+    description="HPC connect configuration schema",
+)
+
+resource_schema = Schema({"resources": [resource_spec]})
diff --git a/src/hpcc_flux/__init__.py b/src/hpcc_flux/__init__.py
index c2b2e90..4c2f961 100644
--- a/src/hpcc_flux/__init__.py
+++ b/src/hpcc_flux/__init__.py
@@ -1,7 +1,13 @@
+# Copyright NTESS. See COPYRIGHT file for details.
+# +# SPDX-License-Identifier: MIT + from typing import TYPE_CHECKING +from typing import Any from hpc_connect.hookspec import hookimpl +from .discover import read_resource_info from .submit_hl import FluxSubmissionManager if TYPE_CHECKING: @@ -13,3 +19,10 @@ def hpc_connect_submission_manager(config) -> "HPCSubmissionManager | None": if FluxSubmissionManager.matches(config.get("submit:backend")): return FluxSubmissionManager(config=config) return None + + +@hookimpl +def hpc_connect_discover_resources() -> list[dict[str, Any]] | None: + if info := read_resource_info(): + return [info] + return None diff --git a/src/hpcc_flux/discover.py b/src/hpcc_flux/discover.py new file mode 100644 index 0000000..7227a31 --- /dev/null +++ b/src/hpcc_flux/discover.py @@ -0,0 +1,61 @@ +# Copyright NTESS. See COPYRIGHT file for details. +# +# SPDX-License-Identifier: MIT + +import logging +import shutil +import subprocess +from typing import Any + +logger = logging.getLogger("hpc_connect") + + +def parse_resource_info(output: str) -> dict[str, int] | None: + """Parses the output from `flux resource info` and returns a dictionary of resource values. + + The expected output format is "1 Nodes, 32 Cores, 1 GPUs". + + Returns: + dict: A dictionary containing the resource values with the following keys: + - nodes (int): The number of nodes. + - cpu (int): The number of CPU cores. + - gpu (int): The number of GPU devices. + """ + parts = output.split(", ") + vals = [int(p.split()[0]) for p in parts] + if len(vals) != 3: + return None + return {"nodes": vals[0], "cpu": vals[1], "gpu": vals[2]} + + +def read_resource_info() -> dict[str, Any] | None: + if flux := shutil.which("flux"): + try: + output = subprocess.check_output([flux, "resource", "info"], encoding="utf-8") + except subprocess.CalledProcessError: + return None + if totals := parse_resource_info(output): + # assume homogenous resources + nodes = totals["nodes"] + info: dict = { + "type": "node", + "count": nodes, + "resources": [ + { + "type": "socket", + "count": 1, + "resources": [ + { + "type": "cpu", + "count": int(totals["cpu"] / nodes), + }, + { + "type": "gpu", + "count": int(totals["gpu"] / nodes), + }, + ], + } + ], + } + return info + return None diff --git a/src/hpcc_flux/submit_api.py b/src/hpcc_flux/submit_api.py index 6e66025..d1b5de4 100644 --- a/src/hpcc_flux/submit_api.py +++ b/src/hpcc_flux/submit_api.py @@ -8,7 +8,6 @@ import multiprocessing import multiprocessing.synchronize import os -import subprocess import time from concurrent.futures import CancelledError from datetime import timedelta @@ -22,11 +21,14 @@ from flux.job import JobspecV1 # type: ignore from hpc_connect.config import Config +from hpc_connect.config import ConfigScope from hpc_connect.submit import HPCProcess from hpc_connect.submit import HPCSubmissionFailedError from hpc_connect.submit import HPCSubmissionManager from hpc_connect.util import time_in_seconds +from .discover import read_resource_info + logger = logging.getLogger("hpc_connect") @@ -129,56 +131,6 @@ def poll(self) -> int | None: return max(stat) # type: ignore -def parse_resource_info(output: str) -> dict[str, int] | None: - """Parses the output from `flux resource info` and returns a dictionary of resource values. - - The expected output format is "1 Nodes, 32 Cores, 1 GPUs". - - Returns: - dict: A dictionary containing the resource values with the following keys: - - nodes (int): The number of nodes. - - cpu (int): The number of CPU cores. - - gpu (int): The number of GPU devices. 
- """ - parts = output.split(", ") - vals = [int(p.split()[0]) for p in parts] - if len(vals) != 3: - return None - return {"nodes": vals[0], "cpu": vals[1], "gpu": vals[2]} - - -def read_resource_info() -> dict[str, Any] | None: - try: - output = subprocess.check_output(["flux", "resource", "info"], encoding="utf-8") - except subprocess.CalledProcessError: - return None - if totals := parse_resource_info(output): - # assume homogenous resources - nodes = totals["nodes"] - info: dict = { - "type": "node", - "count": nodes, - "resources": [ - { - "type": "socket", - "count": 1, - "resources": [ - { - "type": "cpu", - "count": int(totals["cpu"] / nodes), - }, - { - "type": "gpu", - "count": int(totals["gpu"] / nodes), - }, - ], - } - ], - } - return info - return None - - class FluxSubmissionManager(HPCSubmissionManager): """Setup and submit jobs to the Flux scheduler""" @@ -189,9 +141,11 @@ def __init__(self, config: Config | None = None) -> None: super().__init__(config=config) self.flux: FluxExecutor | None = FluxExecutor() self.fh = Flux() - if self.config.get("machine:resources") is None: - if info := read_resource_info(): - self.config.set("machine:resources", [info], scope="defaults") + if info := read_resource_info(): + scope = ConfigScope("flux", None, {"machine": {"resources": [info]}}) + self.config.push_scope(scope) + else: + logger.warning("Unable to determine system configuration from flux, using default") @property def supports_subscheduling(self) -> bool: diff --git a/src/hpcc_pbs/__init__.py b/src/hpcc_pbs/__init__.py index e695fee..c5b7492 100644 --- a/src/hpcc_pbs/__init__.py +++ b/src/hpcc_pbs/__init__.py @@ -1,7 +1,12 @@ +# Copyright NTESS. See COPYRIGHT file for details. +# +# SPDX-License-Identifier: MIT + from typing import TYPE_CHECKING from hpc_connect.hookspec import hookimpl +from .discover import read_pbsnodes from .submit import PBSSubmissionManager if TYPE_CHECKING: @@ -13,3 +18,10 @@ def hpc_connect_submission_manager(config) -> "HPCSubmissionManager | None": if PBSSubmissionManager.matches(config.get("submit:backend")): return PBSSubmissionManager(config=config) return None + + +@hookimpl +def hpc_connect_discover_resources() -> dict[str, list] | None: + if info := read_pbsnodes(): + return {"resources": info} + return None diff --git a/src/hpcc_pbs/discover.py b/src/hpcc_pbs/discover.py new file mode 100644 index 0000000..63d1783 --- /dev/null +++ b/src/hpcc_pbs/discover.py @@ -0,0 +1,52 @@ +# Copyright NTESS. See COPYRIGHT file for details. 
+# +# SPDX-License-Identifier: MIT + +import json +import logging +import os +import shutil +import subprocess +from typing import Any + +logger = logging.getLogger(__name__) + + +def read_pbsnodes() -> list[dict[str, Any]] | None: + if pbsnodes := shutil.which("pbsnodes"): + args = [pbsnodes, "-a", "-F", "json"] + allocated_nodes: list[str] | None = None + if var := os.getenv("PBS_NODEFILE"): + with open(var) as fh: + allocated_nodes = [line.strip() for line in fh if line.split()] + try: + proc = subprocess.run(args, check=True, encoding="utf-8", capture_output=True) + except subprocess.CalledProcessError: + return None + else: + resources: list[dict[str, Any]] = [] + data = json.loads(proc.stdout) + for nodename, nodeinfo in data["nodes"].items(): + if allocated_nodes is not None and nodename not in allocated_nodes: + continue + cpus_on_node = nodeinfo["pcpus"] + resource: dict[str, Any] = { + "type": "node", + "count": 1, + "additional_properties": {"name": nodename}, + "resources": [ + { + "type": "socket", + "count": 1, + "resources": [ + { + "type": "cpu", + "count": cpus_on_node, + }, + ], + }, + ], + } + resources.append(resource) + return resources + return None diff --git a/src/hpcc_pbs/submit.py b/src/hpcc_pbs/submit.py index 7385db7..9cb9276 100644 --- a/src/hpcc_pbs/submit.py +++ b/src/hpcc_pbs/submit.py @@ -12,10 +12,13 @@ from typing import Any from hpc_connect.config import Config +from hpc_connect.config import ConfigScope from hpc_connect.submit import HPCProcess from hpc_connect.submit import HPCSubmissionFailedError from hpc_connect.submit import HPCSubmissionManager +from .discover import read_pbsnodes + logger = logging.getLogger(__name__) @@ -112,9 +115,9 @@ def __init__(self, config: Config | None = None) -> None: qdel = shutil.which("qdel") if qdel is None: raise ValueError("qdel not found on PATH") - if self.config.get("machine:resources") is None: - if resources := read_pbsnodes(): - self.config.set("machine:resources", resources, scope="defaults") + if resources := read_pbsnodes(): + scope = ConfigScope("pbs", None, {"machine": {"resources": resources}}) + self.config.push_scope(scope) else: logger.warning("Unable to determine system configuration from pbsnodes, using default") @@ -161,43 +164,3 @@ def submit( ) assert script is not None return PBSProcess(script) - - -def read_pbsnodes() -> list[dict[str, Any]] | None: - if pbsnodes := shutil.which("pbsnodes"): - args = [pbsnodes, "-a", "-F", "json"] - allocated_nodes: list[str] | None = None - if var := os.getenv("PBS_NODEFILE"): - with open(var) as fh: - allocated_nodes = [line.strip() for line in fh if line.split()] - try: - proc = subprocess.run(args, check=True, encoding="utf-8", capture_output=True) - except subprocess.CalledProcessError: - return None - else: - resources: list[dict[str, Any]] = [] - data = json.loads(proc.stdout) - for nodename, nodeinfo in data["nodes"].items(): - if allocated_nodes is not None and nodename not in allocated_nodes: - continue - cpus_on_node = nodeinfo["pcpus"] - resource: dict[str, Any] = { - "type": "node", - "count": 1, - "additional_properties": {"name": nodename}, - "resources": [ - { - "type": "socket", - "count": 1, - "resources": [ - { - "type": "cpu", - "count": cpus_on_node, - }, - ], - }, - ], - } - resources.append(resource) - return resources - return None diff --git a/src/hpcc_slurm/__init__.py b/src/hpcc_slurm/__init__.py index e280c37..39c67b9 100644 --- a/src/hpcc_slurm/__init__.py +++ b/src/hpcc_slurm/__init__.py @@ -1,7 +1,13 @@ +# Copyright NTESS. 
See COPYRIGHT file for details. +# +# SPDX-License-Identifier: MIT + from typing import TYPE_CHECKING +from typing import Any from hpc_connect.hookspec import hookimpl +from .discover import read_sinfo from .launch import SrunLauncher from .submit import SlurmSubmissionManager @@ -23,3 +29,10 @@ def hpc_connect_launcher(config: "Config") -> "HPCLauncher | None": if SrunLauncher.matches(config.get("launch:exec")): return SrunLauncher(config=config) return None + + +@hookimpl +def hpc_connect_discover_resources() -> list[dict[str, Any]] | None: + if info := read_sinfo(): + return [info] + return None diff --git a/src/hpcc_slurm/discover.py b/src/hpcc_slurm/discover.py new file mode 100644 index 0000000..f881421 --- /dev/null +++ b/src/hpcc_slurm/discover.py @@ -0,0 +1,101 @@ +# Copyright NTESS. See COPYRIGHT file for details. +# +# SPDX-License-Identifier: MIT + +import json +import logging +import os +import shutil +import subprocess +from typing import Any + +logger = logging.getLogger(__name__) + + +def read_sinfo() -> dict[str, Any] | None: + if sinfo := shutil.which("sinfo"): + opts = [ + "%X", # Number of sockets per node + "%Y", # Number of cores per socket + "%Z", # Number of threads per core + "%c", # Number of CPUs per node + "%D", # Number of nodes + "%G", # General resources + ] + format = " ".join(opts) + args = [sinfo, "-o", format] + try: + proc = subprocess.run(args, check=True, encoding="utf-8", capture_output=True) + except subprocess.CalledProcessError: + return None + else: + sockets_per_node: int + cores_per_socket: int + threads_per_core: int + cpus_per_node: int + node_count: int + for line in proc.stdout.split("\n"): + parts = line.split() + if not parts: + continue + elif parts and parts[0].startswith("SOCKETS"): + continue + data = [safe_loads(part) for part in parts] + sockets_per_node = data[0] + cores_per_socket = data[1] + threads_per_core = data[2] + cpus_per_node = data[3] + node_count = data[4] + gres = data[5:] + break + else: + raise ValueError(f"Unable to read sinfo output:\n{proc.stdout}") + if var := os.getenv("SLURM_NNODES"): + node_count = int(var) + info: dict[str, Any] = { + "type": "node", + "count": node_count, + "resources": [ + { + "type": "socket", + "count": sockets_per_node, + "resources": [ + { + "type": "cpu", + "count": int(cpus_per_node / sockets_per_node), + }, + ], + } + ], + "additional_properties": { + "sockets_per_node": sockets_per_node, + "cores_per_socket": cores_per_socket, + "threads_per_core": threads_per_core, + "cpus_per_node": cpus_per_node, + "gres": " ".join(str(_) for _ in gres), + }, + } + for res in gres: + if not res: + continue + parts = res.split(":") + resource: dict[str, Any] = { + "type": parts[0], + "count": safe_loads(parts[-1]), + } + if len(parts) > 2: + resource["gres"] = ":".join(parts[1:-1]) + info["resources"].append(resource) + return info + return None + + +def safe_loads(arg: str) -> Any: + if arg == "(null)": + return None + if arg.endswith("+"): + return safe_loads(arg[:-1]) + try: + return json.loads(arg) + except json.JSONDecodeError: + return arg diff --git a/src/hpcc_slurm/submit.py b/src/hpcc_slurm/submit.py index f069c46..b68d1a8 100644 --- a/src/hpcc_slurm/submit.py +++ b/src/hpcc_slurm/submit.py @@ -15,10 +15,13 @@ from typing import Any from hpc_connect.config import Config +from hpc_connect.config import ConfigScope from hpc_connect.submit import HPCProcess from hpc_connect.submit import HPCSubmissionFailedError from hpc_connect.submit import HPCSubmissionManager +from .discover import 
read_sinfo + logger = logging.getLogger(__name__) @@ -153,9 +156,9 @@ def __init__(self, config: Config | None = None) -> None: sacct = shutil.which("sacct") if sacct is None: raise ValueError("sacct not found on PATH") - if self.config.get("machine:resources") is None: - if sinfo := read_sinfo(): - self.config.set("machine:resources", [sinfo], scope="defaults") + if sinfo := read_sinfo(): + scope = ConfigScope("slurm", None, {"machine": {"resources": [sinfo]}}) + self.config.push_scope(scope) else: logger.warning("Unable to determine system configuration from sinfo, using default") @@ -202,92 +205,3 @@ def submit( ) assert script is not None return SlurmProcess(script) - - -def read_sinfo() -> dict[str, Any] | None: - if sinfo := shutil.which("sinfo"): - opts = [ - "%X", # Number of sockets per node - "%Y", # Number of cores per socket - "%Z", # Number of threads per core - "%c", # Number of CPUs per node - "%D", # Number of nodes - "%G", # General resources - ] - format = " ".join(opts) - args = [sinfo, "-o", format] - try: - proc = subprocess.run(args, check=True, encoding="utf-8", capture_output=True) - except subprocess.CalledProcessError: - return None - else: - sockets_per_node: int - cores_per_socket: int - threads_per_core: int - cpus_per_node: int - node_count: int - for line in proc.stdout.split("\n"): - parts = line.split() - if not parts: - continue - elif parts and parts[0].startswith("SOCKETS"): - continue - data = [safe_loads(part) for part in parts] - sockets_per_node = data[0] - cores_per_socket = data[1] - threads_per_core = data[2] - cpus_per_node = data[3] - node_count = data[4] - gres = data[5:] - break - else: - raise ValueError(f"Unable to read sinfo output:\n{proc.stdout}") - if var := os.getenv("SLURM_NNODES"): - node_count = int(var) - info: dict[str, Any] = { - "type": "node", - "count": node_count, - "resources": [ - { - "type": "socket", - "count": sockets_per_node, - "resources": [ - { - "type": "cpu", - "count": int(cpus_per_node / sockets_per_node), - }, - ], - } - ], - "additional_properties": { - "sockets_per_node": sockets_per_node, - "cores_per_socket": cores_per_socket, - "threads_per_core": threads_per_core, - "cpus_per_node": cpus_per_node, - "gres": " ".join(str(_) for _ in gres), - }, - } - for res in gres: - if not res: - continue - parts = res.split(":") - resource: dict[str, Any] = { - "type": parts[0], - "count": safe_loads(parts[-1]), - } - if len(parts) > 2: - resource["gres"] = ":".join(parts[1:-1]) - info["resources"].append(resource) - return info - return None - - -def safe_loads(arg: str) -> Any: - if arg == "(null)": - return None - if arg.endswith("+"): - return safe_loads(arg[:-1]) - try: - return json.loads(arg) - except json.JSONDecodeError: - return arg From 95a05d4113ed2605d11b2a5e95a9e951a79016eb Mon Sep 17 00:00:00 2001 From: Tim Fuller Date: Fri, 3 Oct 2025 08:13:23 -0600 Subject: [PATCH 2/4] update schema for environment variables --- src/hpc_connect/config.py | 48 ++++++++---------- src/hpc_connect/hookspec.py | 5 +- src/hpc_connect/pluginmanager.py | 46 ++++++++++++++++-- src/hpc_connect/schemas.py | 83 ++++++++++++++++++++------------ src/hpcc_pbs/discover.py | 7 ++- tests/launch.py | 12 ++--- 6 files changed, 128 insertions(+), 73 deletions(-) diff --git a/src/hpc_connect/config.py b/src/hpc_connect/config.py index 9770a4b..cd9af1e 100644 --- a/src/hpc_connect/config.py +++ b/src/hpc_connect/config.py @@ -11,12 +11,12 @@ from typing import IO from typing import Any -import pluggy import schema import yaml from 
.pluginmanager import HPCConnectPluginManager from .schemas import config_schema +from .schemas import environment_variable_schema from .schemas import launch_schema from .schemas import machine_schema from .schemas import submit_schema @@ -61,6 +61,9 @@ def __contains__(self, section: str) -> bool: def get_section(self, section: str) -> Any: return self.data.get(section) + def pop_section(self, section: str) -> Any: + return self.data.pop(section, None) + def dump(self) -> None: if self.file is None: return @@ -70,11 +73,12 @@ def dump(self) -> None: class Config: def __init__(self) -> None: - self.pluginmanager: pluggy.PluginManager = HPCConnectPluginManager() + self.pluginmanager: HPCConnectPluginManager = HPCConnectPluginManager() rspec = self.pluginmanager.hook.hpc_connect_discover_resources() defaults = { "config": { "debug": False, + "plugins": [], }, "machine": { "resources": rspec, @@ -108,6 +112,10 @@ def read_only_scope(self, scope: str) -> bool: def push_scope(self, scope: ConfigScope) -> None: self.scopes[scope.name] = scope + if cfg := scope.get_section("config"): + if plugins := cfg.get("plugins"): + for f in plugins: + self.pluginmanager.consider_plugin(f) def pop_scope(self, scope: ConfigScope) -> ConfigScope | None: return self.scopes.pop(scope.name, None) @@ -143,6 +151,14 @@ def get(self, path: str, default: Any = None, scope: str | None = None) -> Any: value = value[key] return value + def get_highest_priority(self, path: str, default: Any = None) -> tuple[Any, str]: + sentinel = object() + for scope in reversed(self.scopes.keys()): + value = self.get(path, default=sentinel, scope=scope) + if value is not sentinel: + return value, scope + return default, "none" + def set(self, path: str, value: Any, scope: str | None = None) -> None: parts = process_config_path(path) section = parts.pop(0) @@ -429,32 +445,10 @@ def get_scope_filename(scope: str) -> str | None: def read_env_config() -> ConfigScope | None: - def load_mappings(arg: str) -> dict[str, str]: - mappings: dict[str, str] = {} - for kv in arg.split(","): - k, v = [_.strip() for _ in kv.split(":") if _.split()] - mappings[k] = v - return mappings - - data: dict[str, Any] = {} - for var in os.environ: - if not var.startswith("HPCC_"): - continue - try: - section, *parts = var[5:].lower().split("_") - key = "_".join(parts) - except ValueError: - continue - if section not in section_schemas: - continue - value: Any - if key == "mappings": - value = load_mappings(os.environ[var]) - else: - value = safe_loads(os.environ[var]) - data.setdefault(section, {}).update({key: value}) - if not data: + variables = {key: var for key, var in os.environ.items() if key.startswith("HPC_CONNECT_")} + if not variables: return None + data = environment_variable_schema.validate(variables) return ConfigScope("environment", None, data) diff --git a/src/hpc_connect/hookspec.py b/src/hpc_connect/hookspec.py index 22bfc23..49bb506 100644 --- a/src/hpc_connect/hookspec.py +++ b/src/hpc_connect/hookspec.py @@ -11,9 +11,10 @@ from .launch import HPCLauncher from .submit import HPCSubmissionManager +project_name = "hpc_connect" -hookspec = pluggy.HookspecMarker("hpc_connect") -hookimpl = pluggy.HookimplMarker("hpc_connect") +hookspec = pluggy.HookspecMarker(project_name) +hookimpl = pluggy.HookimplMarker(project_name) @hookspec(firstresult=True) diff --git a/src/hpc_connect/pluginmanager.py b/src/hpc_connect/pluginmanager.py index 575bc2e..08421ad 100644 --- a/src/hpc_connect/pluginmanager.py +++ b/src/hpc_connect/pluginmanager.py @@ -1,15 
+1,51 @@ # Copyright NTESS. See COPYRIGHT file for details. # # SPDX-License-Identifier: MIT +import sys +import warnings + import pluggy +from . import discover +from . import hookspec + +warnings.simplefilter("once", DeprecationWarning) + class HPCConnectPluginManager(pluggy.PluginManager): def __init__(self): - from . import discover - from . import hookspec - - super().__init__("hpc_connect") + super().__init__(hookspec.project_name) self.add_hookspecs(hookspec) self.register(discover) - self.load_setuptools_entrypoints("hpc_connect") + self.load_setuptools_entrypoints(hookspec.project_name) + + def consider_plugin(self, name: str) -> None: + assert isinstance(name, str), f"module name as text required, got {name!r}" + if name.startswith("no:"): + self.unregister(name=name[3:]) + self.set_blocked(name[3:]) + else: + self.import_plugin(name) + + def import_plugin(self, name: str) -> None: + """Import a plugin with ``name``.""" + assert isinstance(name, str), f"module name as text required, got {name!r}" + + if self.is_blocked(name) or self.get_plugin(name) is not None: + return + + try: + __import__(name) + except ImportError as e: + msg = f"Error importing plugin {name!r}: {e.args[0]}" + raise ImportError(msg).with_traceback(e.__traceback__) from e + else: + mod = sys.modules[name] + if mod in self._name2plugin.values(): + other = next(k for k, v in self._name2plugin.items() if v == mod) + msg = f"Plugin {name} already registered under the name {other}" + raise PluginAlreadyImportedError(msg) + self.register(mod, name) + + +class PluginAlreadyImportedError(Exception): ... diff --git a/src/hpc_connect/schemas.py b/src/hpc_connect/schemas.py index e88acac..3ba4673 100644 --- a/src/hpc_connect/schemas.py +++ b/src/hpc_connect/schemas.py @@ -5,7 +5,7 @@ import logging import shlex import shutil -from typing import Any +import typing from schema import Optional from schema import Or @@ -23,11 +23,15 @@ def flag_splitter(arg: list[str] | str) -> list[str]: return arg -def dict_str_str(arg: Any) -> bool: +def dict_str_str(arg: typing.Any) -> bool: f = isinstance return f(arg, dict) and all([f(_, str) for k, v in arg.items() for _ in (k, v)]) +def list_of_str(arg: typing.Any) -> bool: + return isinstance(arg, list) and all([isinstance(_, str) for _ in arg]) + + class choose_from: def __init__(self, *choices: str | None): self.choices = set(choices) @@ -45,6 +49,20 @@ def which(arg: str) -> str: return arg +def boolean(arg: typing.Any) -> bool: + if isinstance(arg, str): + return arg.lower() not in ("0", "off", "false", "no") + return bool(arg) + + +def load_mappings(arg: str) -> dict[str, str]: + mappings: dict[str, str] = {} + for kv in arg.split(","): + k, v = [_.strip() for _ in kv.split(":") if _.split()] + mappings[k] = v + return mappings + + # Resource spec have the following form: # machine: # resources: @@ -79,9 +97,8 @@ def which(arg: str) -> str: }, ], } - - -config_schema = Schema({Optional("debug"): bool}) +resource_schema = Schema({"resources": [resource_spec]}) +config_schema = Schema({Optional("debug"): bool, Optional("plugins"): list_of_str}) launch_spec = { Optional("numproc_flag"): str, Optional("default_options"): Use(flag_splitter), @@ -109,33 +126,37 @@ def which(arg: str) -> str: }, ) -hpc_connect_schema = Schema( + +class EnvarSchema(Schema): + def validate(self, data, is_root_eval=True): + data = super().validate(data, is_root_eval=False) + if is_root_eval: + final = {} + for key, value in data.items(): + name = key[12:].lower() + if name.startswith(("launch_", 
"submit_")): + section, _, field = name.partition("_") + final.setdefault(section, {})[field] = value + else: + final.setdefault("config", {})[name] = value + return final + return data + + +environment_variable_schema = EnvarSchema( { - "hpc_connect": { - Optional("config"): { - Optional("debug"): bool, - }, - Optional("submit"): { - Optional("backend"): Use( - choose_from(None, "shell", "slurm", "sbatch", "pbs", "qsub", "flux") - ), - Optional("default_options"): Use(flag_splitter), - Optional(str): { - Optional("default_options"): Use(flag_splitter), - }, - }, - Optional("machine"): { - Optional("resources"): Or([resource_spec], None), - }, - Optional("launch"): { - Optional("exec"): Use(which), - **launch_spec, - Optional(str): launch_spec, - }, - } + Optional("HPC_CONNECT_DEBUG"): Use(boolean), + Optional("HPC_CONNECT_PLUGINS"): Use( + lambda x: [_.strip() for _ in x.split(",") if _.split()] + ), + Optional("HPC_CONNECT_LAUNCH_EXEC"): Use(which), + Optional("HPC_CONNECT_LAUNCH_NUMPROC_FLAG"): Use(str), + Optional("HPC_CONNECT_LAUNCH_DEFAULT_OPTIONS"): Use(flag_splitter), + Optional("HPC_CONNECT_LAUNCH_LOCAL_OPTIONS"): Use(flag_splitter), + Optional("HPC_CONNECT_LAUNCH_PRE_OPTIONS"): Use(flag_splitter), + Optional("HPC_CONNECT_LAUNCH_MAPPINGS"): Use(load_mappings), + Optional("HPC_CONNECT_SUBMIT_BACKEND"): Use(str), + Optional("HPC_CONNECT_SUBMIT_DEFAULT_OPTIONS"): Use(flag_splitter), }, ignore_extra_keys=True, - description="HPC connect configuration schema", ) - -resource_schema = Schema({"resources": [resource_spec]}) diff --git a/src/hpcc_pbs/discover.py b/src/hpcc_pbs/discover.py index 63d1783..8de4389 100644 --- a/src/hpcc_pbs/discover.py +++ b/src/hpcc_pbs/discover.py @@ -26,14 +26,17 @@ def read_pbsnodes() -> list[dict[str, Any]] | None: else: resources: list[dict[str, Any]] = [] data = json.loads(proc.stdout) + config: dict[int, list[str]] = {} for nodename, nodeinfo in data["nodes"].items(): if allocated_nodes is not None and nodename not in allocated_nodes: continue cpus_on_node = nodeinfo["pcpus"] + config.setdefault(cpus_on_node, []).append(nodename) + for cpus_on_node, nodenames in config.items(): resource: dict[str, Any] = { "type": "node", - "count": 1, - "additional_properties": {"name": nodename}, + "count": len(nodenames), + "additional_properties": {"nodes": nodenames}, "resources": [ { "type": "socket", diff --git a/tests/launch.py b/tests/launch.py index 45880f9..256a0a9 100644 --- a/tests/launch.py +++ b/tests/launch.py @@ -22,7 +22,7 @@ def envmods(**kwargs): try: save_env = os.environ.copy() for key in os.environ: - if key.startswith("HPCC_"): + if key.startswith("HPC_CONNECT_"): os.environ.pop(key) os.environ.update(kwargs) yield @@ -40,16 +40,16 @@ def launch(args, **kwargs): def test_envar_config(capfd): - env = {"HPCC_LAUNCH_EXEC": "srun", "HPCC_LAUNCH_NUMPROC_FLAG": "-np"} + env = {"HPC_CONNECT_LAUNCH_EXEC": "srun", "HPC_CONNECT_LAUNCH_NUMPROC_FLAG": "-np"} with envmods(**env): launch(["-n", "4", "-flag", "file", "executable", "--option"]) captured = capfd.readouterr() out = captured.out.strip() assert out == f"{mock_bin}/srun -np 4 -flag file executable --option" env = { - "HPCC_LAUNCH_EXEC": "mpiexec", - "HPCC_LAUNCH_NUMPROC_FLAG": "-np", - "HPCC_LAUNCH_LOCAL_OPTIONS": "--map-by ppr:%(np)d:cores", + "HPC_CONNECT_LAUNCH_EXEC": "mpiexec", + "HPC_CONNECT_LAUNCH_NUMPROC_FLAG": "-np", + "HPC_CONNECT_LAUNCH_LOCAL_OPTIONS": "--map-by ppr:%(np)d:cores", } with envmods(**env): launch(["-n", "4", "-flag", "file", "executable", "--option"]) @@ -116,7 +116,7 @@ def 
test_default(capfd):
 
 
 def test_envar_mappings(capfd):
-    with envmods(HPCC_LAUNCH_MAPPINGS="-spam:-ham,-eggs:-bacon"):
+    with envmods(HPC_CONNECT_LAUNCH_MAPPINGS="-spam:-ham,-eggs:-bacon"):
         launch(["-n", "4", "-spam", "ham", "-eggs", "bacon", "executable", "--option"])
         captured = capfd.readouterr()
         out = captured.out.strip()

From ea2ec66a48b6e1e649e49ec8855193b00bd55c40 Mon Sep 17 00:00:00 2001
From: Tim Fuller
Date: Fri, 3 Oct 2025 08:34:06 -0600
Subject: [PATCH 3/4] don't discover the default resource spec so that it can be loaded lazily

---
 src/hpc_connect/config.py | 15 +++++++--------
 src/hpc_connect/discover.py | 3 +--
 src/hpc_connect/schemas.py | 2 +-
 src/hpcc_flux/submit_api.py | 11 ++++++-----
 src/hpcc_pbs/submit.py | 13 ++++++++-----
 src/hpcc_slurm/submit.py | 13 +++++++------
 6 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/src/hpc_connect/config.py b/src/hpc_connect/config.py
index cd9af1e..431bb2e 100644
--- a/src/hpc_connect/config.py
+++ b/src/hpc_connect/config.py
@@ -14,6 +14,7 @@
 import schema
 import yaml
 
+from .discover import default_resource_set
 from .pluginmanager import HPCConnectPluginManager
 from .schemas import config_schema
 from .schemas import environment_variable_schema
@@ -137,8 +138,6 @@ def get_config(self, section: str, scope: str | None = None) -> Any:
         return merged_section[section]
 
     def get(self, path: str, default: Any = None, scope: str | None = None) -> Any:
-        if path == "machine:resources":
-            return self.resource_specs
         parts = process_config_path(path)
         section = parts.pop(0)
         value = self.get_config(section, scope=scope)
@@ -260,12 +259,12 @@ def set_main_options(self, args: argparse.Namespace) -> None:
 
     @property
     def resource_specs(self) -> list[dict]:
-        scope: ConfigScope
-        for scope in reversed(self.scopes.values()):
-            if "machine" in scope:
-                if resources := scope.data["machine"].get("resources"):
-                    return resources
-        raise ValueError("No machine resources set")
+        specs, _ = self.get_highest_priority("machine:resources")
+        if specs is not None:
+            return specs
+        resources = default_resource_set()
+        self.set("machine:resources", resources, scope="defaults")
+        return resources
 
     def resource_types(self) -> list[str]:
         """Return the types of resources available"""
diff --git a/src/hpc_connect/discover.py b/src/hpc_connect/discover.py
index 5632849..97bf818 100644
--- a/src/hpc_connect/discover.py
+++ b/src/hpc_connect/discover.py
@@ -12,14 +12,13 @@
 from .hookspec import hookimpl
 
 
-@hookimpl(trylast=True, specname="hpc_connect_discover_resources")
 def default_resource_set() -> list[dict[str, Any]]:
     local_resource = {"type": "cpu", "count": psutil.cpu_count()}
     socket_resource = {"type": "socket", "count": 1, "resources": [local_resource]}
     return [{"type": "node", "count": 1, "resources": [socket_resource]}]
 
 
-@hookimpl(specname="hpc_connect_discover_resources")
+@hookimpl(tryfirst=True, specname="hpc_connect_discover_resources")
 def read_resources_from_hostfile() -> dict[str, list] | None:
     if file := os.getenv("HPC_CONNECT_HOSTFILE"):
         with open(file) as fh:
             data = json.load(fh)
diff --git a/src/hpc_connect/schemas.py b/src/hpc_connect/schemas.py
index 3ba4673..c35b926 100644
--- a/src/hpc_connect/schemas.py
+++ b/src/hpc_connect/schemas.py
@@ -113,7 +113,7 @@ def load_mappings(arg: str) -> dict[str, str]:
         **launch_spec,
     }
 )
-machine_schema = Schema({"resources": [resource_spec]})
+machine_schema = Schema({"resources": Or([resource_spec], None)})
 submit_schema = Schema(
     {
         Optional("backend"): Use(
             choose_from(None, "shell", "slurm", "sbatch", "pbs", "qsub", "flux")
         ),
         Optional("default_options"): Use(flag_splitter),
         Optional(str): {
             Optional("default_options"): Use(flag_splitter),
         },
     },
 )
diff --git a/src/hpcc_flux/submit_api.py b/src/hpcc_flux/submit_api.py
index 
d1b5de4..c7453b4 100644 --- a/src/hpcc_flux/submit_api.py +++ b/src/hpcc_flux/submit_api.py @@ -141,11 +141,12 @@ def __init__(self, config: Config | None = None) -> None: super().__init__(config=config) self.flux: FluxExecutor | None = FluxExecutor() self.fh = Flux() - if info := read_resource_info(): - scope = ConfigScope("flux", None, {"machine": {"resources": [info]}}) - self.config.push_scope(scope) - else: - logger.warning("Unable to determine system configuration from flux, using default") + if self.config.get("machine:resources") is None: + if info := read_resource_info(): + scope = ConfigScope("flux", None, {"machine": {"resources": [info]}}) + self.config.push_scope(scope) + else: + logger.warning("Unable to determine system configuration from flux, using default") @property def supports_subscheduling(self) -> bool: diff --git a/src/hpcc_pbs/submit.py b/src/hpcc_pbs/submit.py index 9cb9276..7391b00 100644 --- a/src/hpcc_pbs/submit.py +++ b/src/hpcc_pbs/submit.py @@ -115,11 +115,14 @@ def __init__(self, config: Config | None = None) -> None: qdel = shutil.which("qdel") if qdel is None: raise ValueError("qdel not found on PATH") - if resources := read_pbsnodes(): - scope = ConfigScope("pbs", None, {"machine": {"resources": resources}}) - self.config.push_scope(scope) - else: - logger.warning("Unable to determine system configuration from pbsnodes, using default") + if self.config.get("machine:resources") is None: + if resources := read_pbsnodes(): + scope = ConfigScope("pbs", None, {"machine": {"resources": resources}}) + self.config.push_scope(scope) + else: + logger.warning( + "Unable to determine system configuration from pbsnodes, using default" + ) @property def submission_template(self) -> str: diff --git a/src/hpcc_slurm/submit.py b/src/hpcc_slurm/submit.py index b68d1a8..2d89a02 100644 --- a/src/hpcc_slurm/submit.py +++ b/src/hpcc_slurm/submit.py @@ -67,7 +67,7 @@ def parse_script_args(script: str) -> argparse.Namespace: args = [] with open(script, "r") as file: for line in file: - if match := re.search("^#SBATCH\s+(.*)$", line): + if match := re.search(r"^#SBATCH\s+(.*)$", line): args.append(match.group(1).strip()) p = argparse.ArgumentParser() p.add_argument("-M", "--cluster", "--clusters", dest="clusters") @@ -156,11 +156,12 @@ def __init__(self, config: Config | None = None) -> None: sacct = shutil.which("sacct") if sacct is None: raise ValueError("sacct not found on PATH") - if sinfo := read_sinfo(): - scope = ConfigScope("slurm", None, {"machine": {"resources": [sinfo]}}) - self.config.push_scope(scope) - else: - logger.warning("Unable to determine system configuration from sinfo, using default") + if self.config.get("machine:resources") is None: + if sinfo := read_sinfo(): + scope = ConfigScope("slurm", None, {"machine": {"resources": [sinfo]}}) + self.config.push_scope(scope) + else: + logger.warning("Unable to determine system configuration from sinfo, using default") @property def submission_template(self) -> str: From bb4c9c0356ceb26f20fb3cc0fafe17777801fdff Mon Sep 17 00:00:00 2001 From: Tim Fuller Date: Fri, 3 Oct 2025 08:59:46 -0600 Subject: [PATCH 4/4] Update discover.py --- src/hpc_connect/discover.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hpc_connect/discover.py b/src/hpc_connect/discover.py index 97bf818..22a8cde 100644 --- a/src/hpc_connect/discover.py +++ b/src/hpc_connect/discover.py @@ -23,8 +23,8 @@ def read_resources_from_hostfile() -> dict[str, list] | None: if file := os.getenv("HPC_CONNECT_HOSTFILE"): with 
open(file) as fh: data = json.load(fh) - nodename = os.uname().nodename + host: str = os.getenv("HPC_CONNECT_HOSTNAME") or os.uname().nodename for pattern, rspec in data.items(): - if fnmatch.fnmatch(nodename, pattern): + if fnmatch.fnmatch(host, pattern): return rspec return None
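
Usage sketch for the new hostfile-based discovery (PATCH 1 adds the
HPC_CONNECT_HOSTFILE hook; PATCH 4 adds the HPC_CONNECT_HOSTNAME override).
The hostfile is a JSON object mapping fnmatch patterns, matched against the
node name, to a machine:resources list. Everything below is illustrative:
the cluster name, counts, and temp-file path are made up, not taken from
the patches.

    import json
    import os
    import tempfile

    # A hypothetical resource spec in the node -> socket -> cpu shape that
    # resource_spec in schemas.py validates, keyed by a hostname pattern.
    hostfile = {
        "mycluster-node-*": [
            {
                "type": "node",
                "count": 4,
                "resources": [
                    {
                        "type": "socket",
                        "count": 2,
                        "resources": [{"type": "cpu", "count": 16}],
                    },
                ],
            },
        ],
    }

    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
        json.dump(hostfile, fh)

    os.environ["HPC_CONNECT_HOSTFILE"] = fh.name
    # PATCH 4: pin the hostname instead of relying on os.uname().nodename
    os.environ["HPC_CONNECT_HOSTNAME"] = "mycluster-node-01"

    from hpc_connect.discover import read_resources_from_hostfile

    assert read_resources_from_hostfile() == hostfile["mycluster-node-*"]

Because the hostfile hook is registered tryfirst (PATCH 3) and the hookspec
is firstresult, a matching hostfile entry takes precedence over
scheduler-based discovery from the slurm, pbs, and flux plugins.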