diff --git a/executorlib/interactive/fluxspawner.py b/executorlib/interactive/fluxspawner.py index e8af304c..9cb4ed55 100644 --- a/executorlib/interactive/fluxspawner.py +++ b/executorlib/interactive/fluxspawner.py @@ -30,8 +30,11 @@ class FluxPythonSpawner(BaseSpawner): threads_per_core (int, optional): The number of threads per base. Defaults to 1. gpus_per_core (int, optional): The number of GPUs per base. Defaults to 0. num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None. - exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to False. + exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to + False. openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False. + priority (int, optional): job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15 + are restricted to the instance owner. flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None. flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None. flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False. @@ -46,6 +49,7 @@ def __init__( gpus_per_core: int = 0, num_nodes: Optional[int] = None, exclusive: bool = False, + priority: Optional[int] = None, openmpi_oversubscribe: bool = False, flux_executor: Optional[flux.job.FluxExecutor] = None, flux_executor_pmi_mode: Optional[str] = None, @@ -65,6 +69,7 @@ def __init__( self._flux_executor_pmi_mode = flux_executor_pmi_mode self._flux_executor_nesting = flux_executor_nesting self._flux_log_files = flux_log_files + self._priority = priority self._future = None def bootup( @@ -114,7 +119,12 @@ def bootup( elif self._flux_log_files: jobspec.stderr = os.path.abspath("flux.err") jobspec.stdout = os.path.abspath("flux.out") - self._future = self._flux_executor.submit(jobspec) + if self._priority is not None: + self._future = self._flux_executor.submit( + jobspec=jobspec, urgency=self._priority + ) + else: + self._future = self._flux_executor.submit(jobspec=jobspec) def shutdown(self, wait: bool = True): """ diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 95e7bfe9..7cf0ad89 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -50,7 +50,7 @@ def setUp(self): def test_flux_executor_serial(self): with BlockAllocationExecutor( max_workers=2, - executor_kwargs={"flux_executor": self.flux_executor}, + executor_kwargs={"flux_executor": self.flux_executor, "priority": 20}, spawner=FluxPythonSpawner, ) as exe: fs_1 = exe.submit(calc, 1)