diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml
index 160402fc..501bf88c 100644
--- a/.ci_support/environment-openmpi.yml
+++ b/.ci_support/environment-openmpi.yml
@@ -2,8 +2,6 @@ channels:
 - conda-forge
 dependencies:
 - python
-- coverage
-- coveralls =3.3.1
 - numpy
 - openmpi
 - cloudpickle =2.2.1
diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml
new file mode 100644
index 00000000..9b1a096f
--- /dev/null
+++ b/.github/workflows/unittest-flux.yml
@@ -0,0 +1,78 @@
+# This workflow is used to run the unittests of pympipool
+
+name: Unittests-flux
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+jobs:
+  build:
+
+    runs-on: ${{ matrix.operating-system }}
+    strategy:
+      matrix:
+        include:
+        - operating-system: ubuntu-latest
+          python-version: '3.11'
+          label: linux-64-py-3-11-openmpi
+          prefix: /usr/share/miniconda3/envs/my-env
+          environment-file: .ci_support/environment-openmpi.yml
+
+        - operating-system: ubuntu-latest
+          python-version: '3.11'
+          label: linux-64-py-3-11-mpich
+          prefix: /usr/share/miniconda3/envs/my-env
+          environment-file: .ci_support/environment-mpich.yml
+
+    steps:
+    - uses: actions/checkout@v2
+    - uses: conda-incubator/setup-miniconda@v2.2.0
+      with:
+        python-version: ${{ matrix.python-version }}
+        mamba-version: "*"
+        channels: conda-forge
+        miniforge-variant: Mambaforge
+        channel-priority: strict
+        auto-update-conda: true
+        environment-file: ${{ matrix.environment-file }}
+    - name: Install flux
+      shell: bash -l {0}
+      run: mamba install -y flux-core coverage coveralls=3.3.1
+    - name: Setup
+      shell: bash -l {0}
+      run: pip install --no-deps .
+    - name: Test
+      shell: bash -l {0}
+      timeout-minutes: 5
+      run: >
+        for f in $(ls tests/test_*.py); do
+          echo $f;
+          if [ $f != "tests/test_flux.py" ]; then
+            coverage run --omit pympipool/_version.py -m unittest $f;
+          fi
+        done
+      env:
+        OMPI_MCA_plm: 'isolated'
+        OMPI_MCA_rmaps_base_oversubscribe: 'yes'
+        OMPI_MCA_btl_vader_single_copy_mechanism: 'none'
+    - name: Test Flux
+      shell: bash -l {0}
+      timeout-minutes: 5
+      run: >
+        flux start
+        coverage run --omit pympipool/_version.py -m unittest tests/test_flux.py;
+      env:
+        OMPI_MCA_plm: 'isolated'
+        OMPI_MCA_rmaps_base_oversubscribe: 'yes'
+        OMPI_MCA_btl_vader_single_copy_mechanism: 'none'
+    - name: Coverage
+      if: matrix.label == 'linux-64-py-3-11-openmpi'
+      continue-on-error: True
+      shell: bash -l {0}
+      run: |
+        coverage combine
+        coveralls
+        coverage xml
diff --git a/.github/workflows/unittest-openmpi.yml b/.github/workflows/unittest-openmpi.yml
index 35063cf5..7a99f59d 100644
--- a/.github/workflows/unittest-openmpi.yml
+++ b/.github/workflows/unittest-openmpi.yml
@@ -57,16 +57,8 @@ jobs:
     - name: Test
       shell: bash -l {0}
       timeout-minutes: 5
-      run: for f in $(ls tests/test_*.py); do echo $f; coverage run --omit pympipool/_version.py -m unittest $f; done
+      run: for f in $(ls tests/test_*.py); do echo $f; python -m unittest $f; done
       env:
         OMPI_MCA_plm: 'isolated'
         OMPI_MCA_rmaps_base_oversubscribe: 'yes'
         OMPI_MCA_btl_vader_single_copy_mechanism: 'none'
-    - name: Coverage
-      if: matrix.label == 'linux-64-py-3-10-openmpi'
-      continue-on-error: True
-      shell: bash -l {0}
-      run: |
-        coverage combine
-        coveralls
-        coverage xml
diff --git a/pympipool/__init__.py b/pympipool/__init__.py
index 6a6ff0aa..fb39ce25 100644
--- a/pympipool/__init__.py
+++ b/pympipool/__init__.py
@@ -7,6 +7,7 @@
     interface_receive,
 )
 from pympipool.interfaces.taskbroker import HPCExecutor
+from pympipool.interfaces.fluxbroker import PyFluxExecutor
 from pympipool.interfaces.taskexecutor import Executor
 from pympipool.legacy.interfaces.executor import PoolExecutor
 from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool
diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py
new file mode 100644
index 00000000..06d68ec7
--- /dev/null
+++ b/pympipool/interfaces/fluxbroker.py
@@ -0,0 +1,227 @@
+import os
+import queue
+import socket
+import sys
+from time import sleep
+
+from pympipool.shared.broker import (
+    MetaExecutorFuture,
+    _get_future_done,
+    _execute_task_dict,
+)
+from pympipool.interfaces.base import ExecutorBase
+from pympipool.shared.thread import RaisingThread
+from pympipool.shared.taskexecutor import (
+    cloudpickle_register,
+    _execute_parallel_tasks_loop,
+)
+from pympipool.shared.connections import FluxPythonInterface
+from pympipool.shared.communication import SocketInterface
+
+
+class SingleTaskExecutor(ExecutorBase):
+    """
+    The pympipool.interfaces.fluxbroker.SingleTaskExecutor behaves like the concurrent.futures.Executor but it uses
+    mpi4py to execute parallel tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the SingleTaskExecutor can be
+    executed in a serial python process and does not require the python script to be executed with MPI. Internally
+    the worker process is started via the flux-framework, either through an existing flux.job.FluxExecutor provided
+    as executor or through a newly created one.
+
+    Args:
+        cores (int): defines the number of MPI ranks to use for each function call
+        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        cwd (str/None): current working directory where the parallel python task is executed
+        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
+
+    Simple example:
+    ```
+    import numpy as np
+    from pympipool.interfaces.fluxbroker import SingleTaskExecutor
+
+    def calc(i, j, k):
+        from mpi4py import MPI
+        size = MPI.COMM_WORLD.Get_size()
+        rank = MPI.COMM_WORLD.Get_rank()
+        return np.array([i, j, k]), size, rank
+
+    def init_k():
+        return {"k": 3}
+
+    with SingleTaskExecutor(cores=2, init_function=init_k) as p:
+        fs = p.submit(calc, 2, j=4)
+        print(fs.result())
+
+    >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+    ```
+    """
+
+    def __init__(
+        self,
+        cores,
+        gpus_per_task=0,
+        init_function=None,
+        cwd=None,
+        executor=None,
+    ):
+        super().__init__()
+        self._process = RaisingThread(
+            target=execute_parallel_tasks,
+            kwargs={
+                "future_queue": self._future_queue,
+                "cores": cores,
+                "gpus_per_task": gpus_per_task,
+                "cwd": cwd,
+                "executor": executor,
+            },
+        )
+        self._process.start()
+        if init_function is not None:
+            self._future_queue.put(
+                {"init": True, "fn": init_function, "args": (), "kwargs": {}}
+            )
+        cloudpickle_register(ind=3)
+
+
+class PyFluxExecutor(ExecutorBase):
+    """
+    The pympipool.interfaces.fluxbroker.PyFluxExecutor distributes the submitted functions over a pool of
+    SingleTaskExecutor workers, where each worker reserves cores_per_worker MPI ranks.
+
+    Args:
+        max_workers (int): defines the number of SingleTaskExecutor workers which execute tasks in parallel
+        cores_per_worker (int): defines the number of MPI ranks per worker - defaults to 1
+        gpus_per_worker (int): number of GPUs per worker - defaults to 0
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        cwd (str/None): current working directory where the parallel python task is executed
+        sleep_interval (float): time in seconds the broker waits before checking the task queue again - defaults to 0.1
+        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
+    """
+
+    def __init__(
+        self,
+        max_workers,
+        cores_per_worker=1,
+        gpus_per_worker=0,
+        init_function=None,
+        cwd=None,
+        sleep_interval=0.1,
+        executor=None,
+    ):
+        super().__init__()
+        self._process = RaisingThread(
+            target=executor_broker,
+            kwargs={
+                "future_queue": self._future_queue,
+                "max_workers": max_workers,
+                "cores_per_worker": cores_per_worker,
+                "gpus_per_worker": gpus_per_worker,
+                "init_function": init_function,
+                "cwd": cwd,
+                "sleep_interval": sleep_interval,
+                "executor": executor,
+            },
+        )
+        self._process.start()
+
+
+def execute_parallel_tasks(
+    future_queue,
+    cores,
+    gpus_per_task=0,
+    cwd=None,
+    executor=None,
+):
+    """
+    Execute a single task in parallel using the message passing interface (MPI).
+
+    Args:
+        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
+        cores (int): defines the total number of MPI ranks to use
+        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
+        cwd (str/None): current working directory where the parallel python task is executed
+        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
+    """
+    command_lst = [
+        sys.executable,
+        os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")),
+    ]
+    interface = interface_bootup(
+        command_lst=command_lst,
+        cwd=cwd,
+        cores=cores,
+        gpus_per_core=gpus_per_task,
+        executor=executor,
+    )
+    _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)
+
+
+def interface_bootup(
+    command_lst,
+    cwd=None,
+    cores=1,
+    gpus_per_core=0,
+    executor=None,
+):
+    command_lst += [
+        "--host",
+        socket.gethostname(),
+    ]
+    connections = FluxPythonInterface(
+        cwd=cwd,
+        cores=cores,
+        gpus_per_core=gpus_per_core,
+        oversubscribe=False,
+        executor=executor,
+    )
+    interface = SocketInterface(interface=connections)
+    command_lst += [
+        "--zmqport",
+        str(interface.bind_to_random_port()),
+    ]
+    interface.bootup(command_lst=command_lst)
+    return interface
+
+
+def executor_broker(
+    future_queue,
+    max_workers,
+    cores_per_worker=1,
+    gpus_per_worker=0,
+    init_function=None,
+    cwd=None,
+    sleep_interval=0.1,
+    executor=None,
+):
+    meta_future_lst = _get_executor_list(
+        max_workers=max_workers,
+        cores_per_worker=cores_per_worker,
+        gpus_per_worker=gpus_per_worker,
+        init_function=init_function,
+        cwd=cwd,
+        executor=executor,
+    )
+    while True:
+        try:
+            task_dict = future_queue.get_nowait()
+        except queue.Empty:
+            sleep(sleep_interval)
+        else:
+            if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
+                future_queue.task_done()
+            else:
+                future_queue.task_done()
+                break
+
+
+def _get_executor_list(
+    max_workers,
+    cores_per_worker=1,
+    gpus_per_worker=0,
+    init_function=None,
+    cwd=None,
+    executor=None,
+):
+    return [
+        MetaExecutorFuture(
+            future=_get_future_done(),
+            executor=SingleTaskExecutor(
+                cores=cores_per_worker,
+                gpus_per_task=int(gpus_per_worker / cores_per_worker),
+                init_function=init_function,
+                cwd=cwd,
+                executor=executor,
+            ),
+        )
+        for _ in range(max_workers)
+    ]
diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py
index 8ed61982..ac0dd7dc 100644
--- a/pympipool/shared/connections.py
+++ b/pympipool/shared/connections.py
@@ -1,4 +1,5 @@
 from abc import ABC
+import os
 import subprocess
 
 
@@ -171,21 +172,28 @@     def bootup(self, command_lst):
         if self._executor is None:
             self._executor = flux.job.FluxExecutor()
         jobspec = flux.job.JobspecV1.from_command(
-            command=" ".join(command_lst),
-            num_tasks=1,
-            cores_per_task=self._cores,
+            command=command_lst,
+            num_tasks=self._cores,
+            cores_per_task=1,
             gpus_per_task=self._gpus_per_core,
             num_nodes=None,
             exclusive=False,
         )
-        jobspec.cwd = self._cwd
+        jobspec.environment = dict(os.environ)
+        if self._cwd is not None:
+            jobspec.cwd = self._cwd
         self._future = self._executor.submit(jobspec)
 
     def shutdown(self, wait=True):
-        self._executor.shutdown(wait=wait)
+        if self.poll():
+            self._future.cancel()
+        # The flux future objects are not updated instantly; they still show
+        # as running after cancel() was called, so we wait here until the
+        # execution has actually completed.
+        self._future.result()
 
     def poll(self):
-        return self._executor is not None
+        return self._future is not None and not self._future.done()
 
 
 def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False):
diff --git a/tests/test_flux.py b/tests/test_flux.py
new file mode 100644
index 00000000..837893e6
--- /dev/null
+++ b/tests/test_flux.py
@@ -0,0 +1,85 @@
+from concurrent.futures import Future
+from queue import Queue
+
+import numpy as np
+import unittest
+
+from pympipool.shared.taskexecutor import cloudpickle_register
+from pympipool.interfaces.fluxbroker import SingleTaskExecutor, PyFluxExecutor, execute_parallel_tasks, executor_broker
+
+
+try:
+    from flux.job import FluxExecutor
+    skip_flux_test = False
+except ImportError:
+    skip_flux_test = True
+
+
+def calc(i):
+    return i
+
+
+def mpi_funct(i):
+    from mpi4py import MPI
+    size = MPI.COMM_WORLD.Get_size()
+    rank = MPI.COMM_WORLD.Get_rank()
+    return i, size, rank
+
+
+def get_global(memory=None):
+    return memory
+
+
+def set_global():
+    return {"memory": np.array([5])}
+
+
+@unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped.")
+class TestFlux(unittest.TestCase):
+    def setUp(self):
+        self.executor = FluxExecutor()
+
+    def test_flux_executor(self):
+        with PyFluxExecutor(max_workers=2, executor=self.executor) as exe:
+            fs_1 = exe.submit(calc, 1)
+            fs_2 = exe.submit(calc, 2)
+            self.assertEqual(fs_1.result(), 1)
+            self.assertEqual(fs_2.result(), 2)
+            self.assertTrue(fs_1.done())
+            self.assertTrue(fs_2.done())
+
+    def test_single_task(self):
+        with SingleTaskExecutor(cores=2, executor=self.executor) as p:
+            output = p.map(mpi_funct, [1, 2, 3])
+            self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]])
+
+    def test_execute_task(self):
+        f = Future()
+        q = Queue()
+        q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
+        q.put({"shutdown": True, "wait": True})
+        cloudpickle_register(ind=1)
+        execute_parallel_tasks(
+            future_queue=q,
+            cores=1,
+            executor=self.executor
+        )
+        self.assertEqual(f.result(), 2)
+        q.join()
+
+    def test_internal_memory(self):
+        with SingleTaskExecutor(cores=1, init_function=set_global, executor=self.executor) as p:
+            f = p.submit(get_global)
+            self.assertFalse(f.done())
+            self.assertEqual(f.result(), np.array([5]))
+            self.assertTrue(f.done())
+
+    def test_executor_broker(self):
+        q = Queue()
+        f = Future()
+        q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
+        q.put({"shutdown": True, "wait": True})
+        executor_broker(future_queue=q, max_workers=1, executor=self.executor)
+        self.assertTrue(f.done())
+        self.assertEqual(f.result(), 1)
+        q.join()
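Usage note (not part of the patch): based on the new tests in tests/test_flux.py and the import added to pympipool/__init__.py, a minimal driver for the PyFluxExecutor could look like the sketch below. It assumes flux-core is installed and the script runs inside a flux instance (e.g. `flux start python example.py`, mirroring the new workflow); the file name example.py is only illustrative.

```
# Minimal sketch of the PyFluxExecutor API introduced in this patch.
# Assumes a running flux instance, e.g. started via: flux start python example.py
import flux.job

from pympipool import PyFluxExecutor


def calc(i):
    return i


if __name__ == "__main__":
    # Reusing one flux.job.FluxExecutor for all workers is optional (executor defaults to None).
    with PyFluxExecutor(max_workers=2, executor=flux.job.FluxExecutor()) as exe:
        fs_1 = exe.submit(calc, 1)
        fs_2 = exe.submit(calc, 2)
        print(fs_1.result(), fs_2.result())  # -> 1 2
```

Each submitted function is routed by executor_broker() to one of the max_workers SingleTaskExecutor workers, so cores_per_worker MPI ranks are reserved per worker rather than per call.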