Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions executorlib/interactive/fluxspawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ class FluxPythonSpawner(BaseSpawner):
threads_per_core (int, optional): The number of threads per base. Defaults to 1.
gpus_per_core (int, optional): The number of GPUs per base. Defaults to 0.
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to False.
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to
False.
openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
priority (int, optional): job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15
are restricted to the instance owner.
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
Expand All @@ -46,6 +49,7 @@ def __init__(
gpus_per_core: int = 0,
num_nodes: Optional[int] = None,
exclusive: bool = False,
priority: Optional[int] = None,
openmpi_oversubscribe: bool = False,
flux_executor: Optional[flux.job.FluxExecutor] = None,
flux_executor_pmi_mode: Optional[str] = None,
Expand All @@ -65,6 +69,7 @@ def __init__(
self._flux_executor_pmi_mode = flux_executor_pmi_mode
self._flux_executor_nesting = flux_executor_nesting
self._flux_log_files = flux_log_files
self._priority = priority
self._future = None

def bootup(
Expand Down Expand Up @@ -114,7 +119,12 @@ def bootup(
elif self._flux_log_files:
jobspec.stderr = os.path.abspath("flux.err")
jobspec.stdout = os.path.abspath("flux.out")
self._future = self._flux_executor.submit(jobspec)
if self._priority is not None:
self._future = self._flux_executor.submit(
jobspec=jobspec, urgency=self._priority
)
else:
self._future = self._flux_executor.submit(jobspec=jobspec)

def shutdown(self, wait: bool = True):
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/test_flux_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def setUp(self):
def test_flux_executor_serial(self):
with BlockAllocationExecutor(
max_workers=2,
executor_kwargs={"flux_executor": self.flux_executor},
executor_kwargs={"flux_executor": self.flux_executor, "priority": 20},
spawner=FluxPythonSpawner,
) as exe:
fs_1 = exe.submit(calc, 1)
Expand Down
Loading