diff --git a/pympipool/flux/fluxtask.py b/pympipool/flux/fluxtask.py index 89925cea..b7a59331 100644 --- a/pympipool/flux/fluxtask.py +++ b/pympipool/flux/fluxtask.py @@ -73,10 +73,7 @@ def __init__( }, ) self._process.start() - if init_function is not None: - self._future_queue.put( - {"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) + self._set_init_function(init_function=init_function) cloudpickle_register(ind=3) diff --git a/pympipool/mpi/mpitask.py b/pympipool/mpi/mpitask.py index 61834f19..fdcc17b7 100644 --- a/pympipool/mpi/mpitask.py +++ b/pympipool/mpi/mpitask.py @@ -72,10 +72,7 @@ def __init__( }, ) self._process.start() - if init_function is not None: - self._future_queue.put( - {"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) + self._set_init_function(init_function=init_function) cloudpickle_register(ind=3) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index ae4b687b..c950eae1 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -58,6 +58,12 @@ def shutdown(self, wait=True, *, cancel_futures=False): def __len__(self): return self._future_queue.qsize() + def _set_init_function(self, init_function): + if init_function is not None: + self._future_queue.put( + {"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + def cancel_items_in_queue(que): """