From 7f27436a74c84a172f8a5a63d8d6df9b413b264a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 10:44:44 -0600 Subject: [PATCH 01/28] Clean up CI --- .github/workflows/unittest-mpich.yml | 2 -- .github/workflows/unittest-openmpi.yml | 2 -- 2 files changed, 4 deletions(-) diff --git a/.github/workflows/unittest-mpich.yml b/.github/workflows/unittest-mpich.yml index 5a0e20d0..5268df86 100644 --- a/.github/workflows/unittest-mpich.yml +++ b/.github/workflows/unittest-mpich.yml @@ -14,8 +14,6 @@ jobs: runs-on: ${{ matrix.operating-system }} strategy: matrix: - operating-system: [ubuntu-latest, macos-latest] - python-version: ['3.11'] include: - operating-system: macos-latest python-version: '3.11' diff --git a/.github/workflows/unittest-openmpi.yml b/.github/workflows/unittest-openmpi.yml index d5837386..35063cf5 100644 --- a/.github/workflows/unittest-openmpi.yml +++ b/.github/workflows/unittest-openmpi.yml @@ -14,8 +14,6 @@ jobs: runs-on: ${{ matrix.operating-system }} strategy: matrix: - operating-system: [ubuntu-latest, macos-latest] - python-version: ['3.11'] include: - operating-system: macos-latest python-version: '3.11' From cb195d91149ef44a5b69fb111ddf0a6d85784f4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 10:48:34 -0600 Subject: [PATCH 02/28] Add flux specific Executor --- .github/workflows/unittest-flux.yml | 62 ++++++++ pympipool/__init__.py | 1 + pympipool/interfaces/fluxbroker.py | 222 ++++++++++++++++++++++++++++ 3 files changed, 285 insertions(+) create mode 100644 .github/workflows/unittest-flux.yml create mode 100644 pympipool/interfaces/fluxbroker.py diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml new file mode 100644 index 00000000..a1278dc8 --- /dev/null +++ b/.github/workflows/unittest-flux.yml @@ -0,0 +1,62 @@ +# This workflow is used to run the unittest of pyiron + +name: Unittests-openmpi + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + build: + + runs-on: ${{ matrix.operating-system }} + strategy: + matrix: + include: + - operating-system: ubuntu-latest + python-version: '3.11' + label: linux-64-py-3-11-openmpi + prefix: /Users/runner/miniconda3/envs/my-env + environment-file: .ci_support/environment-openmpi.yml + + - operating-system: ubuntu-latest + python-version: '3.11' + label: linux-64-py-3-11-mich + prefix: /usr/share/miniconda3/envs/my-env + environment-file: .ci_support/environment-mpich.yml + + steps: + - uses: actions/checkout@v2 + - uses: conda-incubator/setup-miniconda@v2.2.0 + with: + python-version: ${{ matrix.python-version }} + mamba-version: "*" + channels: conda-forge + miniforge-variant: Mambaforge + channel-priority: strict + auto-update-conda: true + environment-file: ${{ matrix.environment-file }} + - name: Install flux + shell: bash -l {0} + run: mamba install -y flux-core + - name: Setup + shell: bash -l {0} + run: pip install --no-deps . 
+ - name: Test + shell: bash -l {0} + timeout-minutes: 5 + run: for f in $(ls tests/test_*.py); do echo $f; coverage run --omit pympipool/_version.py -m unittest $f; done + env: + OMPI_MCA_plm: 'isolated' + OMPI_MCA_rmaps_base_oversubscribe: 'yes' + OMPI_MCA_btl_vader_single_copy_mechanism: 'none' + - name: Coverage + if: matrix.label == 'linux-64-py-3-11-openmpi' + continue-on-error: True + shell: bash -l {0} + run: | + coverage combine + coveralls + coverage xml diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 6a6ff0aa..fb39ce25 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -7,6 +7,7 @@ interface_receive, ) from pympipool.interfaces.taskbroker import HPCExecutor +from pympipool.interfaces.fluxbroker import PyFluxExecutor from pympipool.interfaces.taskexecutor import Executor from pympipool.legacy.interfaces.executor import PoolExecutor from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py new file mode 100644 index 00000000..93f1be21 --- /dev/null +++ b/pympipool/interfaces/fluxbroker.py @@ -0,0 +1,222 @@ +import os +import queue +import socket +from time import sleep + +from pympipool.shared.broker import MetaExecutorFuture, _get_future_done, _execute_task_dict +from pympipool.interfaces.base import ExecutorBase +from pympipool.shared.thread import RaisingThread +from pympipool.shared.taskexecutor import ( + cloudpickle_register, + _execute_parallel_tasks_loop +) +from pympipool.shared.connections import FluxPythonInterface +from pympipool.shared.communication import SocketInterface + + +class SingleTaskExecutor(ExecutorBase): + """ + The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. + In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process + and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the + mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the + usability in particular when used in combination with Jupyter notebooks. 
+ + Args: + cores (int): defines the number of MPI ranks to use for each function call + gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems + queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter + + Simple example: + ``` + import numpy as np + from pympipool import Executor + + def calc(i, j, k): + from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return np.array([i, j, k]), size, rank + + def init_k(): + return {"k": 3} + + with Executor(cores=2, init_function=init_k) as p: + fs = p.submit(calc, 2, j=4) + print(fs.result()) + + >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + ``` + """ + + def __init__( + self, + cores, + gpus_per_task=0, + init_function=None, + cwd=None, + executor=None, + ): + super().__init__() + self._process = RaisingThread( + target=execute_parallel_tasks, + kwargs={ + "future_queue": self._future_queue, + "cores": cores, + "gpus_per_task": gpus_per_task, + "cwd": cwd, + "executor": executor, + }, + ) + self._process.start() + if init_function is not None: + self._future_queue.put( + {"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + cloudpickle_register(ind=3) + + +class PyFluxExecutor(ExecutorBase): + def __init__( + self, + max_workers, + cores_per_worker=1, + gpus_per_worker=0, + init_function=None, + cwd=None, + sleep_interval=0.1, + executor=None, + ): + super().__init__() + self._process = RaisingThread( + target=executor_broker, + kwargs={ + "future_queue": self._future_queue, + "max_workers": max_workers, + "cores_per_worker": cores_per_worker, + "gpus_per_worker": gpus_per_worker, + "init_function": init_function, + "cwd": cwd, + "sleep_interval": sleep_interval, + "executor": executor, + }, + ) + self._process.start() + + +def execute_parallel_tasks( + future_queue, + cores, + gpus_per_task=0, + cwd=None, + executor=None, +): + """ + Execute a single tasks in parallel using the message passing interface (MPI). 
+ + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 + cwd (str/None): current working directory where the parallel python task is executed + executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional + """ + command_lst = [ + "python", + os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")), + ] + interface = interface_bootup( + command_lst=command_lst, + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_task, + executor=executor, + ) + _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + + +def interface_bootup( + command_lst, + cwd=None, + cores=1, + gpus_per_core=0, + executor=None, +): + command_lst += [ + "--host", + socket.gethostname(), + ] + connections = FluxPythonInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=False, + executor=executor, + ) + interface = SocketInterface(interface=connections) + command_lst += [ + "--zmqport", + str(interface.bind_to_random_port()), + ] + interface.bootup(command_lst=command_lst) + return interface + + +def executor_broker( + future_queue, + max_workers, + cores_per_worker=1, + gpus_per_worker=0, + init_function=None, + cwd=None, + sleep_interval=0.1, + executor=None, +): + meta_future_lst = _get_executor_list( + max_workers=max_workers, + cores_per_worker=cores_per_worker, + gpus_per_worker=gpus_per_worker, + init_function=init_function, + cwd=cwd, + executor=executor, + ) + while True: + try: + task_dict = future_queue.get_nowait() + except queue.Empty: + sleep(sleep_interval) + else: + if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + future_queue.task_done() + else: + future_queue.task_done() + break + + +def _get_executor_list( + max_workers, + cores_per_worker=1, + gpus_per_worker=0, + init_function=None, + cwd=None, + executor=None, +): + return [ + MetaExecutorFuture( + future=_get_future_done(), + executor=SingleTaskExecutor( + cores=cores_per_worker, + gpus_per_task=int(gpus_per_worker / cores_per_worker), + init_function=init_function, + cwd=cwd, + executor=executor, + ), + ) + for _ in range(max_workers) + ] \ No newline at end of file From 06c9a1aa662888a3de8fe6fc9afcec5a433c2a6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:19:01 -0600 Subject: [PATCH 03/28] push coverage only on flux --- .github/workflows/unittest-openmpi.yml | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/.github/workflows/unittest-openmpi.yml b/.github/workflows/unittest-openmpi.yml index 35063cf5..7a99f59d 100644 --- a/.github/workflows/unittest-openmpi.yml +++ b/.github/workflows/unittest-openmpi.yml @@ -57,16 +57,8 @@ jobs: - name: Test shell: bash -l {0} timeout-minutes: 5 - run: for f in $(ls tests/test_*.py); do echo $f; coverage run --omit pympipool/_version.py -m unittest $f; done + run: for f in $(ls tests/test_*.py); do echo $f; python -m unittest $f; done env: OMPI_MCA_plm: 'isolated' OMPI_MCA_rmaps_base_oversubscribe: 'yes' OMPI_MCA_btl_vader_single_copy_mechanism: 'none' - - name: Coverage - if: matrix.label == 'linux-64-py-3-10-openmpi' - continue-on-error: True - shell: bash -l {0} - run: | - coverage combine - coveralls - coverage xml From b3ed7f4368e0ffa2901d2b3944ad0d0deb86ed72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 
2023 11:22:10 -0600 Subject: [PATCH 04/28] install coverage only for flux --- .ci_support/environment-openmpi.yml | 2 -- .github/workflows/unittest-flux.yml | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml index 160402fc..501bf88c 100644 --- a/.ci_support/environment-openmpi.yml +++ b/.ci_support/environment-openmpi.yml @@ -2,8 +2,6 @@ channels: - conda-forge dependencies: - python -- coverage -- coveralls =3.3.1 - numpy - openmpi - cloudpickle =2.2.1 diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index a1278dc8..4be690d4 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -40,7 +40,7 @@ jobs: environment-file: ${{ matrix.environment-file }} - name: Install flux shell: bash -l {0} - run: mamba install -y flux-core + run: mamba install -y flux-core coverage coveralls =3.3.1 - name: Setup shell: bash -l {0} run: pip install --no-deps . From b8567d5b199e3a7e93eef8d810f2f8e65989434e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:24:43 -0600 Subject: [PATCH 05/28] fix flux CI --- .github/workflows/unittest-flux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 4be690d4..f304d17d 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -40,7 +40,7 @@ jobs: environment-file: ${{ matrix.environment-file }} - name: Install flux shell: bash -l {0} - run: mamba install -y flux-core coverage coveralls =3.3.1 + run: mamba install -y flux-core coverage coveralls=3.3.1 - name: Setup shell: bash -l {0} run: pip install --no-deps . 
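The broker added in [PATCH 02/28] drives its workers through plain dictionaries on a `queue.Queue`: one dictionary per submitted function (carrying the `Future` that will receive the result) and a final shutdown dictionary that ends the worker loop. Below is a minimal sketch of that protocol, calling `execute_parallel_tasks` from `pympipool.interfaces.fluxbroker` directly; it assumes flux-core and this pympipool branch are installed and that the script is started inside a Flux instance (e.g. `flux start python broker_demo.py`), and the `calc` helper plus the single-core worker are illustrative choices, not part of the patch itself.

```python
from concurrent.futures import Future
from queue import Queue

from flux.job import FluxExecutor

from pympipool.shared.taskexecutor import cloudpickle_register
from pympipool.interfaces.fluxbroker import execute_parallel_tasks


def calc(i):
    # illustrative task function
    return i


f = Future()
q = Queue()
# one work item: the callable, its arguments and the Future for the result
q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
# the shutdown request ends the worker loop once the task above is finished
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)

with FluxExecutor() as flux_exe:
    # boot one single-core worker through flux and work off the queue
    execute_parallel_tasks(future_queue=q, cores=1, executor=flux_exe)

print(f.result())  # -> 2
q.join()
```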
From 20e6d5fdde887d6918445453ae219647dc008296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:30:21 -0600 Subject: [PATCH 06/28] black formatting --- .github/workflows/unittest-flux.yml | 2 +- pympipool/interfaces/fluxbroker.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index f304d17d..85044370 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -1,6 +1,6 @@ # This workflow is used to run the unittest of pyiron -name: Unittests-openmpi +name: Unittests-flux on: push: diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 93f1be21..051564ed 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -3,12 +3,16 @@ import socket from time import sleep -from pympipool.shared.broker import MetaExecutorFuture, _get_future_done, _execute_task_dict +from pympipool.shared.broker import ( + MetaExecutorFuture, + _get_future_done, + _execute_task_dict, +) from pympipool.interfaces.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import ( cloudpickle_register, - _execute_parallel_tasks_loop + _execute_parallel_tasks_loop, ) from pympipool.shared.connections import FluxPythonInterface from pympipool.shared.communication import SocketInterface @@ -219,4 +223,4 @@ def _get_executor_list( ), ) for _ in range(max_workers) - ] \ No newline at end of file + ] From 7f4f46b9220186bd5e960599eec98bdebfda0a12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:38:05 -0600 Subject: [PATCH 07/28] Add the first set of flux specific tests --- .github/workflows/unittest-flux.yml | 2 +- tests/test_flux.py | 62 +++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 tests/test_flux.py diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 85044370..e776a474 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -47,7 +47,7 @@ jobs: - name: Test shell: bash -l {0} timeout-minutes: 5 - run: for f in $(ls tests/test_*.py); do echo $f; coverage run --omit pympipool/_version.py -m unittest $f; done + run: flux start for f in $(ls tests/test_*.py); do echo $f; coverage run --omit pympipool/_version.py -m unittest $f; done env: OMPI_MCA_plm: 'isolated' OMPI_MCA_rmaps_base_oversubscribe: 'yes' diff --git a/tests/test_flux.py b/tests/test_flux.py new file mode 100644 index 00000000..6e409085 --- /dev/null +++ b/tests/test_flux.py @@ -0,0 +1,62 @@ +from concurrent.futures import Future +from queue import Queue + +import numpy as np +import unittest +from flux.job import FluxExecutor +from pympipool.shared.taskexecutor import cloudpickle_register +from pympipool.interfaces.fluxbroker import SingleTaskExecutor, PyFluxExecutor, execute_parallel_tasks, executor_broker + + +def calc(i): + return i + + +def mpi_funct(i): + from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank + + +class TestFlux(unittest.TestCase): + def setUp(self): + self.executor = FluxExecutor() + + def test_flux_executor(self): + with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + 
self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_single_task(self): + with SingleTaskExecutor(cores=2, executor=self.executor) as p: + output = p.map(mpi_funct, [1, 2, 3]) + self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]]) + + def test_execute_task(self): + f = Future() + q = Queue() + q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) + q.put({"shutdown": True, "wait": True}) + cloudpickle_register(ind=1) + execute_parallel_tasks( + future_queue=q, + cores=1, + executor=self.executor + ) + self.assertEqual(f.result(), np.array(4)) + q.join() + + def test_executor_broker(self): + q = Queue() + f = Future() + q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) + q.put({"shutdown": True, "wait": True}) + executor_broker(future_queue=q, max_workers=1, executor=self.executor) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 1) + q.join() From e86c50c829929822836dca6121306779d95f82d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:44:23 -0600 Subject: [PATCH 08/28] update CI to only use flux for the flux tests --- .github/workflows/unittest-flux.yml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index e776a474..b6e6e064 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -47,7 +47,15 @@ jobs: - name: Test shell: bash -l {0} timeout-minutes: 5 - run: flux start for f in $(ls tests/test_*.py); do echo $f; coverage run --omit pympipool/_version.py -m unittest $f; done + run: > + for f in $(ls tests/test_*.py); do + echo $f; + if [[ $f -eq tests/test_flux.py ]]; then + flux start coverage run --omit pympipool/_version.py -m unittest $f; + else; + coverage run --omit pympipool/_version.py -m unittest $f; + fi + done env: OMPI_MCA_plm: 'isolated' OMPI_MCA_rmaps_base_oversubscribe: 'yes' From 95865e26c6e670b1935efed3d4f28147c673e958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:47:48 -0600 Subject: [PATCH 09/28] skip flux tests when flux is not installed --- tests/test_flux.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index 6e409085..7cabec2f 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -3,11 +3,18 @@ import numpy as np import unittest -from flux.job import FluxExecutor + from pympipool.shared.taskexecutor import cloudpickle_register from pympipool.interfaces.fluxbroker import SingleTaskExecutor, PyFluxExecutor, execute_parallel_tasks, executor_broker +try: + from flux.job import FluxExecutor + skip_flux_test = False +except ImportError: + skip_flux_test = True + + def calc(i): return i @@ -19,6 +26,7 @@ def mpi_funct(i): return i, size, rank +@unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped.") class TestFlux(unittest.TestCase): def setUp(self): self.executor = FluxExecutor() From 5e8cff29af0fd886fdccfd887736bae1f9b8016b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:49:00 -0600 Subject: [PATCH 10/28] fix syntax error --- .github/workflows/unittest-flux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index b6e6e064..14e1d0f3 100644 --- a/.github/workflows/unittest-flux.yml +++ 
b/.github/workflows/unittest-flux.yml @@ -52,7 +52,7 @@ jobs: echo $f; if [[ $f -eq tests/test_flux.py ]]; then flux start coverage run --omit pympipool/_version.py -m unittest $f; - else; + else coverage run --omit pympipool/_version.py -m unittest $f; fi done From 4f6d1ebcf276854c5c3a4ed386eec62aa663cbb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 11:55:13 -0600 Subject: [PATCH 11/28] more fixes --- .github/workflows/unittest-flux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 14e1d0f3..ffc7ccb5 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -50,7 +50,7 @@ jobs: run: > for f in $(ls tests/test_*.py); do echo $f; - if [[ $f -eq tests/test_flux.py ]]; then + if [ $f == "tests/test_flux.py" ]; then flux start coverage run --omit pympipool/_version.py -m unittest $f; else coverage run --omit pympipool/_version.py -m unittest $f; From d61e60fa04b9d70a430c01803c9dc07a594cb2b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 12:16:25 -0600 Subject: [PATCH 12/28] separate flux test --- .github/workflows/unittest-flux.yml | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index ffc7ccb5..7b54b1ae 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -50,9 +50,7 @@ jobs: run: > for f in $(ls tests/test_*.py); do echo $f; - if [ $f == "tests/test_flux.py" ]; then - flux start coverage run --omit pympipool/_version.py -m unittest $f; - else + if [ $f != "tests/test_flux.py" ]; then coverage run --omit pympipool/_version.py -m unittest $f; fi done @@ -60,6 +58,16 @@ jobs: OMPI_MCA_plm: 'isolated' OMPI_MCA_rmaps_base_oversubscribe: 'yes' OMPI_MCA_btl_vader_single_copy_mechanism: 'none' + - name: Test + shell: bash -l {0} + timeout-minutes: 5 + run: | + flux start + coverage run --omit pympipool/_version.py -m unittest $f; + env: + OMPI_MCA_plm: 'isolated' + OMPI_MCA_rmaps_base_oversubscribe: 'yes' + OMPI_MCA_btl_vader_single_copy_mechanism: 'none' - name: Coverage if: matrix.label == 'linux-64-py-3-11-openmpi' continue-on-error: True From 3d5f9f86c1c1db994dbc2a3e52f521cd1b3dcfbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 12:21:46 -0600 Subject: [PATCH 13/28] Enable tty --- .github/workflows/unittest-flux.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 7b54b1ae..22adb1bb 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -58,8 +58,8 @@ jobs: OMPI_MCA_plm: 'isolated' OMPI_MCA_rmaps_base_oversubscribe: 'yes' OMPI_MCA_btl_vader_single_copy_mechanism: 'none' - - name: Test - shell: bash -l {0} + - name: Test Flux + shell: 'script -q -e -c "bash {0}"' timeout-minutes: 5 run: | flux start From 2419009666f07769608f218d97304289c7ee3b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 12:27:26 -0600 Subject: [PATCH 14/28] try bash -l --- .github/workflows/unittest-flux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 22adb1bb..5e0e7166 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ 
-59,7 +59,7 @@ jobs: OMPI_MCA_rmaps_base_oversubscribe: 'yes' OMPI_MCA_btl_vader_single_copy_mechanism: 'none' - name: Test Flux - shell: 'script -q -e -c "bash {0}"' + shell: 'script -q -e -c "bash -l {0}"' timeout-minutes: 5 run: | flux start From 0f70684101f4cf2e0a869856ea9269a7267a0cf0 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 30 Jul 2023 15:44:12 -0600 Subject: [PATCH 15/28] Update unittest-flux.yml --- .github/workflows/unittest-flux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 5e0e7166..45881613 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -61,7 +61,7 @@ jobs: - name: Test Flux shell: 'script -q -e -c "bash -l {0}"' timeout-minutes: 5 - run: | + run: > flux start coverage run --omit pympipool/_version.py -m unittest $f; env: From dcbd6eff65c08c9702ef453340a32420ca446d68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 15:53:30 -0600 Subject: [PATCH 16/28] fix flux --- .github/workflows/unittest-flux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 45881613..d1d492d7 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -63,7 +63,7 @@ jobs: timeout-minutes: 5 run: > flux start - coverage run --omit pympipool/_version.py -m unittest $f; + coverage run --omit pympipool/_version.py -m unittest tests/test_flux.py; env: OMPI_MCA_plm: 'isolated' OMPI_MCA_rmaps_base_oversubscribe: 'yes' From 032d6c2dcc0580f0238f607da721e399e79c603c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 16:16:17 -0600 Subject: [PATCH 17/28] try commands as list of strings --- pympipool/shared/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index 8ed61982..b7d63c10 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -171,7 +171,7 @@ def bootup(self, command_lst): if self._executor is None: self._executor = flux.job.FluxExecutor() jobspec = flux.job.JobspecV1.from_command( - command=" ".join(command_lst), + command=command_lst, num_tasks=1, cores_per_task=self._cores, gpus_per_task=self._gpus_per_core, From b6de76ee8c0db9017f9112ca1a7405b1c5cf3402 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 16:24:02 -0600 Subject: [PATCH 18/28] Try debugging --- .github/workflows/unittest-flux.yml | 2 +- pympipool/shared/connections.py | 1 + tests/test_flux.py | 42 ++++++++++++++--------------- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index d1d492d7..9b1a096f 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -59,7 +59,7 @@ jobs: OMPI_MCA_rmaps_base_oversubscribe: 'yes' OMPI_MCA_btl_vader_single_copy_mechanism: 'none' - name: Test Flux - shell: 'script -q -e -c "bash -l {0}"' + shell: bash -l {0} timeout-minutes: 5 run: > flux start diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index b7d63c10..5be6f235 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -170,6 +170,7 @@ def bootup(self, command_lst): ) if self._executor is None: self._executor = flux.job.FluxExecutor() + print("Debug flux 
command:", command_lst) jobspec = flux.job.JobspecV1.from_command( command=command_lst, num_tasks=1, diff --git a/tests/test_flux.py b/tests/test_flux.py index 7cabec2f..d792e67f 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -31,19 +31,19 @@ class TestFlux(unittest.TestCase): def setUp(self): self.executor = FluxExecutor() - def test_flux_executor(self): - with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: - fs_1 = exe.submit(calc, 1) - fs_2 = exe.submit(calc, 2) - self.assertEqual(fs_1.result(), 1) - self.assertEqual(fs_2.result(), 2) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) + # def test_flux_executor(self): + # with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: + # fs_1 = exe.submit(calc, 1) + # fs_2 = exe.submit(calc, 2) + # self.assertEqual(fs_1.result(), 1) + # self.assertEqual(fs_2.result(), 2) + # self.assertTrue(fs_1.done()) + # self.assertTrue(fs_2.done()) - def test_single_task(self): - with SingleTaskExecutor(cores=2, executor=self.executor) as p: - output = p.map(mpi_funct, [1, 2, 3]) - self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]]) + # def test_single_task(self): + # with SingleTaskExecutor(cores=2, executor=self.executor) as p: + # output = p.map(mpi_funct, [1, 2, 3]) + # self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]]) def test_execute_task(self): f = Future() @@ -59,12 +59,12 @@ def test_execute_task(self): self.assertEqual(f.result(), np.array(4)) q.join() - def test_executor_broker(self): - q = Queue() - f = Future() - q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) - q.put({"shutdown": True, "wait": True}) - executor_broker(future_queue=q, max_workers=1, executor=self.executor) - self.assertTrue(f.done()) - self.assertEqual(f.result(), 1) - q.join() + # def test_executor_broker(self): + # q = Queue() + # f = Future() + # q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) + # q.put({"shutdown": True, "wait": True}) + # executor_broker(future_queue=q, max_workers=1, executor=self.executor) + # self.assertTrue(f.done()) + # self.assertEqual(f.result(), 1) + # q.join() From 975de1093d1b2fcd9af8b696cb9379c4d23cd1de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 16:54:57 -0600 Subject: [PATCH 19/28] Link python executable as sys.executable rather than python --- pympipool/interfaces/fluxbroker.py | 3 ++- tests/test_flux.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 051564ed..06d68ec7 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -1,6 +1,7 @@ import os import queue import socket +import sys from time import sleep from pympipool.shared.broker import ( @@ -132,7 +133,7 @@ def execute_parallel_tasks( executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional """ command_lst = [ - "python", + sys.executable, os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")), ] interface = interface_bootup( diff --git a/tests/test_flux.py b/tests/test_flux.py index d792e67f..056e30af 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -26,10 +26,11 @@ def mpi_funct(i): return i, size, rank -@unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped.") +# @unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests 
are skipped.") class TestFlux(unittest.TestCase): def setUp(self): - self.executor = FluxExecutor() + # self.executor = FluxExecutor() + self.executor = None # def test_flux_executor(self): # with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: From de1554f93e273fc69cca4859ccea520b7ce9a88c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 16:55:31 -0600 Subject: [PATCH 20/28] skip flux test when flux is not installed --- tests/test_flux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index 056e30af..f16180e3 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -26,7 +26,7 @@ def mpi_funct(i): return i, size, rank -# @unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped.") +@unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped.") class TestFlux(unittest.TestCase): def setUp(self): # self.executor = FluxExecutor() From f257e6e4fbeb243cdb00d539938050688a20924c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 17:06:51 -0600 Subject: [PATCH 21/28] Enable all tests again --- pympipool/shared/connections.py | 1 - tests/test_flux.py | 45 ++++++++++++++++----------------- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index 5be6f235..b7d63c10 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -170,7 +170,6 @@ def bootup(self, command_lst): ) if self._executor is None: self._executor = flux.job.FluxExecutor() - print("Debug flux command:", command_lst) jobspec = flux.job.JobspecV1.from_command( command=command_lst, num_tasks=1, diff --git a/tests/test_flux.py b/tests/test_flux.py index f16180e3..7cabec2f 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -29,22 +29,21 @@ def mpi_funct(i): @unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped.") class TestFlux(unittest.TestCase): def setUp(self): - # self.executor = FluxExecutor() - self.executor = None + self.executor = FluxExecutor() - # def test_flux_executor(self): - # with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: - # fs_1 = exe.submit(calc, 1) - # fs_2 = exe.submit(calc, 2) - # self.assertEqual(fs_1.result(), 1) - # self.assertEqual(fs_2.result(), 2) - # self.assertTrue(fs_1.done()) - # self.assertTrue(fs_2.done()) + def test_flux_executor(self): + with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) - # def test_single_task(self): - # with SingleTaskExecutor(cores=2, executor=self.executor) as p: - # output = p.map(mpi_funct, [1, 2, 3]) - # self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]]) + def test_single_task(self): + with SingleTaskExecutor(cores=2, executor=self.executor) as p: + output = p.map(mpi_funct, [1, 2, 3]) + self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]]) def test_execute_task(self): f = Future() @@ -60,12 +59,12 @@ def test_execute_task(self): self.assertEqual(f.result(), np.array(4)) q.join() - # def test_executor_broker(self): - # q = Queue() - # f = Future() - # q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) 
- # q.put({"shutdown": True, "wait": True}) - # executor_broker(future_queue=q, max_workers=1, executor=self.executor) - # self.assertTrue(f.done()) - # self.assertEqual(f.result(), 1) - # q.join() + def test_executor_broker(self): + q = Queue() + f = Future() + q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) + q.put({"shutdown": True, "wait": True}) + executor_broker(future_queue=q, max_workers=1, executor=self.executor) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 1) + q.join() From ff6d943bab34a7f753ba9d0bb903f542b4e45279 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 17:35:31 -0600 Subject: [PATCH 22/28] fix working directory is none --- pympipool/shared/connections.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index b7d63c10..ad34a6ed 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -178,7 +178,8 @@ def bootup(self, command_lst): num_nodes=None, exclusive=False, ) - jobspec.cwd = self._cwd + if self._cwd is not None: + jobspec.cwd = self._cwd self._future = self._executor.submit(jobspec) def shutdown(self, wait=True): From 2829030caba08824e31488ca2d7a3ffe0c6f2ac9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 17:39:10 -0600 Subject: [PATCH 23/28] fix environment --- pympipool/shared/connections.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index ad34a6ed..8ff968b1 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -1,4 +1,5 @@ from abc import ABC +import os import subprocess @@ -178,6 +179,7 @@ def bootup(self, command_lst): num_nodes=None, exclusive=False, ) + jobspec.environment = dict(os.environ) if self._cwd is not None: jobspec.cwd = self._cwd self._future = self._executor.submit(jobspec) From 77e266ad9b159d3420f33c7d7a2036fc628e635f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 17:52:19 -0600 Subject: [PATCH 24/28] some fixes --- pympipool/shared/connections.py | 4 ++-- tests/test_flux.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index 8ff968b1..befa8fc6 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -173,8 +173,8 @@ def bootup(self, command_lst): self._executor = flux.job.FluxExecutor() jobspec = flux.job.JobspecV1.from_command( command=command_lst, - num_tasks=1, - cores_per_task=self._cores, + num_tasks=self._cores, + cores_per_task=1, gpus_per_task=self._gpus_per_core, num_nodes=None, exclusive=False, diff --git a/tests/test_flux.py b/tests/test_flux.py index 7cabec2f..6a954b69 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -56,7 +56,7 @@ def test_execute_task(self): cores=1, executor=self.executor ) - self.assertEqual(f.result(), np.array(4)) + self.assertEqual(f.result(), 2) q.join() def test_executor_broker(self): From 84c45efbc83f7f3819734b6ec981e69d74425403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 17:55:53 -0600 Subject: [PATCH 25/28] fix poll function --- pympipool/shared/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index befa8fc6..4e2bc4e6 100644 --- a/pympipool/shared/connections.py +++ 
b/pympipool/shared/connections.py @@ -188,7 +188,7 @@ def shutdown(self, wait=True): self._executor.shutdown(wait=wait) def poll(self): - return self._executor is not None + return self._future is not None and not self._future.done() def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False): From c431b8802e9c750fa30b37fc8741c2bcfb44b3ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 17:56:48 -0600 Subject: [PATCH 26/28] disable full test --- tests/test_flux.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index 6a954b69..4dfaefe5 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -31,14 +31,14 @@ class TestFlux(unittest.TestCase): def setUp(self): self.executor = FluxExecutor() - def test_flux_executor(self): - with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: - fs_1 = exe.submit(calc, 1) - fs_2 = exe.submit(calc, 2) - self.assertEqual(fs_1.result(), 1) - self.assertEqual(fs_2.result(), 2) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) + # def test_flux_executor(self): + # with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: + # fs_1 = exe.submit(calc, 1) + # fs_2 = exe.submit(calc, 2) + # self.assertEqual(fs_1.result(), 1) + # self.assertEqual(fs_2.result(), 2) + # self.assertTrue(fs_1.done()) + # self.assertTrue(fs_2.done()) def test_single_task(self): with SingleTaskExecutor(cores=2, executor=self.executor) as p: From c7610fd8334fd4d26d72dbdda08ff318c2a4874c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 18:23:31 -0600 Subject: [PATCH 27/28] fix last test --- pympipool/shared/connections.py | 7 ++++++- tests/test_flux.py | 16 ++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index 4e2bc4e6..ac0dd7dc 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -185,7 +185,12 @@ def bootup(self, command_lst): self._future = self._executor.submit(jobspec) def shutdown(self, wait=True): - self._executor.shutdown(wait=wait) + if self.poll(): + self._future.cancel() + # The flux future objects are not instantly updated, + # still showing running after cancel was called, + # so we wait until the execution is completed. 
+ self._future.result() def poll(self): return self._future is not None and not self._future.done() diff --git a/tests/test_flux.py b/tests/test_flux.py index 4dfaefe5..6a954b69 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -31,14 +31,14 @@ class TestFlux(unittest.TestCase): def setUp(self): self.executor = FluxExecutor() - # def test_flux_executor(self): - # with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: - # fs_1 = exe.submit(calc, 1) - # fs_2 = exe.submit(calc, 2) - # self.assertEqual(fs_1.result(), 1) - # self.assertEqual(fs_2.result(), 2) - # self.assertTrue(fs_1.done()) - # self.assertTrue(fs_2.done()) + def test_flux_executor(self): + with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) def test_single_task(self): with SingleTaskExecutor(cores=2, executor=self.executor) as p: From 6efcc9d6f24db14a86fbe576f3c7eb8dd54468ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 18:36:09 -0600 Subject: [PATCH 28/28] add test for internal memory --- tests/test_flux.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_flux.py b/tests/test_flux.py index 6a954b69..837893e6 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -26,6 +26,14 @@ def mpi_funct(i): return i, size, rank +def get_global(memory=None): + return memory + + +def set_global(): + return {"memory": np.array([5])} + + @unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped.") class TestFlux(unittest.TestCase): def setUp(self): @@ -59,6 +67,13 @@ def test_execute_task(self): self.assertEqual(f.result(), 2) q.join() + def test_internal_memory(self): + with SingleTaskExecutor(cores=1, init_function=set_global, executor=self.executor) as p: + f = p.submit(get_global) + self.assertFalse(f.done()) + self.assertEqual(f.result(), np.array([5])) + self.assertTrue(f.done()) + def test_executor_broker(self): q = Queue() f = Future()
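Taken together, these patches expose the flux backend through `PyFluxExecutor`, which spawns `max_workers` single-task workers via a shared `flux.job.FluxExecutor` and supports the per-worker `init_function` covered by the test in [PATCH 28/28]. A short usage sketch follows; it assumes flux-core and this pympipool branch are installed and that the script runs inside a Flux instance (e.g. `flux start python demo.py`), and `calc`, `init_j` and the worker counts are illustrative, not taken from the patches.

```python
from flux.job import FluxExecutor
from pympipool.interfaces.fluxbroker import PyFluxExecutor


def calc(i, j):
    return i + j


def init_j():
    # runs once per worker; the returned dict presets missing keyword
    # arguments of functions submitted later ("internal memory")
    return {"j": 10}


with FluxExecutor() as flux_exe:
    with PyFluxExecutor(
        max_workers=2,        # two single-task workers managed by the broker
        cores_per_worker=1,   # one MPI rank per worker
        init_function=init_j,
        executor=flux_exe,
    ) as exe:
        fs_1 = exe.submit(calc, 5)
        fs_2 = exe.submit(calc, 7)
        print(fs_1.result(), fs_2.result())  # -> 15 17
```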