From 5c42ade7781dd829cbb4a43efd286a940bb18deb Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 19 Feb 2025 10:18:57 +0100 Subject: [PATCH 1/3] [Feature] Implement flux job scheduling priorities --- executorlib/interactive/fluxspawner.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/executorlib/interactive/fluxspawner.py b/executorlib/interactive/fluxspawner.py index e8af304c..09743738 100644 --- a/executorlib/interactive/fluxspawner.py +++ b/executorlib/interactive/fluxspawner.py @@ -30,8 +30,11 @@ class FluxPythonSpawner(BaseSpawner): threads_per_core (int, optional): The number of threads per base. Defaults to 1. gpus_per_core (int, optional): The number of GPUs per base. Defaults to 0. num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None. - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to False. + exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to + False. openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False. + priority (int, optional): job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15 + are restricted to the instance owner. flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None. flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None. flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False. @@ -46,6 +49,7 @@ def __init__( gpus_per_core: int = 0, num_nodes: Optional[int] = None, exclusive: bool = False, + priority: Optional[int] = None, openmpi_oversubscribe: bool = False, flux_executor: Optional[flux.job.FluxExecutor] = None, flux_executor_pmi_mode: Optional[str] = None, @@ -65,6 +69,7 @@ def __init__( self._flux_executor_pmi_mode = flux_executor_pmi_mode self._flux_executor_nesting = flux_executor_nesting self._flux_log_files = flux_log_files + self._priority = priority self._future = None def bootup( @@ -114,7 +119,10 @@ def bootup( elif self._flux_log_files: jobspec.stderr = os.path.abspath("flux.err") jobspec.stdout = os.path.abspath("flux.out") - self._future = self._flux_executor.submit(jobspec) + if self._priority is not None: + self._future = self._flux_executor.submit(jobspec=jobspec, urgency=self._priority) + else: + self._future = self._flux_executor.submit(jobspec=jobspec) def shutdown(self, wait: bool = True): """ From f3bc549e8ccc56a30e9b82ba23c2c9a285b7ef7e Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Wed, 19 Feb 2025 09:20:10 +0000 Subject: [PATCH 2/3] Format black --- executorlib/interactive/fluxspawner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executorlib/interactive/fluxspawner.py b/executorlib/interactive/fluxspawner.py index 09743738..9cb4ed55 100644 --- a/executorlib/interactive/fluxspawner.py +++ b/executorlib/interactive/fluxspawner.py @@ -120,7 +120,9 @@ def bootup( jobspec.stderr = os.path.abspath("flux.err") jobspec.stdout = os.path.abspath("flux.out") if self._priority is not None: - self._future = self._flux_executor.submit(jobspec=jobspec, urgency=self._priority) + self._future = self._flux_executor.submit( + jobspec=jobspec, urgency=self._priority + ) else: self._future = self._flux_executor.submit(jobspec=jobspec) From c1977362ca6c80ffa96e66ef48346820f3f77dae Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 19 Feb 2025 10:26:14 +0100 Subject: [PATCH 3/3] Add test for priority in flux --- tests/test_flux_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 95e7bfe9..7cf0ad89 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -50,7 +50,7 @@ def setUp(self): def test_flux_executor_serial(self): with BlockAllocationExecutor( max_workers=2, - executor_kwargs={"flux_executor": self.flux_executor}, + executor_kwargs={"flux_executor": self.flux_executor, "priority": 20}, spawner=FluxPythonSpawner, ) as exe: fs_1 = exe.submit(calc, 1)