diff --git a/.ci_support/environment-docs.yml b/.ci_support/environment-docs.yml index ec953769..b07fcece 100644 --- a/.ci_support/environment-docs.yml +++ b/.ci_support/environment-docs.yml @@ -12,3 +12,4 @@ dependencies: - tqdm =4.66.2 - pyzmq =26.0.2 - flux-core +- jupyter-book =1.0.0 diff --git a/.readthedocs.yml b/.readthedocs.yml index 38c2541e..bee0cd7a 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -11,16 +11,20 @@ build: python: "mambaforge-22.9" jobs: pre_build: + # Generate the Sphinx configuration for this Jupyter Book so it builds. - pip install versioneer[toml]==0.29 - pip install . --no-deps --no-build-isolation + - "cp README.md docs" + - "cp notebooks/*.ipynb docs" + - "jupyter-book config sphinx docs/" # Build documentation in the docs/ directory with Sphinx sphinx: - configuration: docs/source/conf.py + builder: html # Optionally build your docs in additional formats such as PDF and ePub formats: [] # Install pyiron from conda conda: - environment: .ci_support/environment-docs.yml \ No newline at end of file + environment: .ci_support/environment-docs.yml diff --git a/binder/postBuild b/binder/postBuild index b5542e1b..460fa15e 100644 --- a/binder/postBuild +++ b/binder/postBuild @@ -4,3 +4,9 @@ cp binder/kernel.json /home/jovyan/.local/share/jupyter/kernels/flux # install pympipool pip install . --no-deps --no-build-isolation + +# copy notebooks +mv notebooks/*.ipynb . + +# clean up +rm -rf .ci_support .github binder docs notebooks pympipool pympipool.egg-info tests .coveralls.yml .gitignore .readthedocs.yml LICENSE MANIFEST.in README.md pyproject.toml setup.py build \ No newline at end of file diff --git a/docs/Makefile b/docs/Makefile deleted file mode 100644 index d0c3cbf1..00000000 --- a/docs/Makefile +++ /dev/null @@ -1,20 +0,0 @@ -# Minimal makefile for Sphinx documentation -# - -# You can set these variables from the command line, and also -# from the environment for the first two. -SPHINXOPTS ?= -SPHINXBUILD ?= sphinx-build -SOURCEDIR = source -BUILDDIR = build - -# Put it first so that "make" without argument is like "make help". -help: - @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) - -.PHONY: help Makefile - -# Catch-all target: route all unknown targets to Sphinx using the new -# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). -%: Makefile - @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/_config.yml b/docs/_config.yml new file mode 100644 index 00000000..638a7c17 --- /dev/null +++ b/docs/_config.yml @@ -0,0 +1,14 @@ +title: pympipool +author: Jan Janssen +logo: images/pyiron-logo.png + +execute: + execute_notebooks : off + +repository: + url : https://github.com/pyiron/pympipool + path_to_book : "" + +launch_buttons: + notebook_interface : jupyterlab + binderhub_url : https://mybinder.org \ No newline at end of file diff --git a/docs/_static/pyiron_logo.ico b/docs/_static/pyiron_logo.ico deleted file mode 100644 index f5270a0c..00000000 Binary files a/docs/_static/pyiron_logo.ico and /dev/null differ diff --git a/docs/_toc.yml b/docs/_toc.yml new file mode 100644 index 00000000..f7324a0d --- /dev/null +++ b/docs/_toc.yml @@ -0,0 +1,6 @@ +format: jb-book +root: README +chapters: +- file: installation.md +- file: examples.ipynb +- file: development.md \ No newline at end of file diff --git a/docs/source/development.md b/docs/development.md similarity index 100% rename from docs/source/development.md rename to docs/development.md diff --git a/docs/_static/pyiron-logo.png b/docs/images/pyiron-logo.png similarity index 100% rename from docs/_static/pyiron-logo.png rename to docs/images/pyiron-logo.png diff --git a/docs/source/installation.md b/docs/installation.md similarity index 100% rename from docs/source/installation.md rename to docs/installation.md diff --git a/docs/make.bat b/docs/make.bat deleted file mode 100644 index dc1312ab..00000000 --- a/docs/make.bat +++ /dev/null @@ -1,35 +0,0 @@ -@ECHO OFF - -pushd %~dp0 - -REM Command file for Sphinx documentation - -if "%SPHINXBUILD%" == "" ( - set SPHINXBUILD=sphinx-build -) -set SOURCEDIR=source -set BUILDDIR=build - -%SPHINXBUILD% >NUL 2>NUL -if errorlevel 9009 ( - echo. - echo.The 'sphinx-build' command was not found. Make sure you have Sphinx - echo.installed, then set the SPHINXBUILD environment variable to point - echo.to the full path of the 'sphinx-build' executable. Alternatively you - echo.may add the Sphinx directory to PATH. - echo. - echo.If you don't have Sphinx installed, grab it from - echo.https://www.sphinx-doc.org/ - exit /b 1 -) - -if "%1" == "" goto help - -%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% -goto end - -:help -%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% - -:end -popd diff --git a/docs/source/conf.py b/docs/source/conf.py deleted file mode 100644 index 20816f54..00000000 --- a/docs/source/conf.py +++ /dev/null @@ -1,41 +0,0 @@ -# Configuration file for the Sphinx documentation builder. -# -# For the full list of built-in configuration values, see the documentation: -# https://www.sphinx-doc.org/en/master/usage/configuration.html - -# -- Project information ----------------------------------------------------- -# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information - -project = 'pympipool' -copyright = '2023, Jan Janssen' -author = 'Jan Janssen' - -# -- General configuration --------------------------------------------------- -# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration - -extensions = ["myst_parser", 'sphinx.ext.autodoc', 'sphinx.ext.napoleon'] - -templates_path = ['_templates'] -exclude_patterns = [] - - - -# -- Options for HTML output ------------------------------------------------- -# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output - -try: - import sphinx_rtd_theme - html_theme = 'sphinx_rtd_theme' - html_logo = "../_static/pyiron-logo.png" - html_favicon = "../_static/pyiron_logo.ico" -except ImportError: - html_theme = 'alabaster' - -html_static_path = ['_static'] - - -# -- Generate API documentation ---------------------------------------------- -# https://www.sphinx-doc.org/en/master/man/sphinx-apidoc.html - -from sphinx.ext.apidoc import main -main(['-e', '-o', 'apidoc', '../../pympipool/', '--force']) diff --git a/docs/source/examples.md b/docs/source/examples.md deleted file mode 100644 index 4d8461ec..00000000 --- a/docs/source/examples.md +++ /dev/null @@ -1,467 +0,0 @@ -# Examples -The `pympipool.Executor` extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -to simplify the up-scaling of individual functions in a given workflow. - -## Compatibility -Starting with the basic example of `1+1=2`. With the `ThreadPoolExecutor` from the [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -standard library this can be written as - `test_thread.py`: -```python -from concurrent.futures import ThreadPoolExecutor - -with ThreadPoolExecutor(max_workers=1) as exe: - future = exe.submit(sum, [1, 1]) - print(future.result()) -``` -In this case `max_workers=1` limits the number of threads used by the `ThreadPoolExecutor` to one. Then the `sum()` -function is submitted to the executor with a list with two ones `[1, 1]` as input. A [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -object is returned. The `Future` object allows to check the status of the execution with the `done()` method which -returns `True` or `False` depending on the state of the execution. Or the main process can wait until the execution is -completed by calling `result()`. - -This example stored in a python file named `test_thread.py` can be executed using the python interpreter: -``` -python test_thread.py ->>> 2 -``` -The result of the calculation is `1+1=2`. - -The `pympipool.Executor` class extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -class by providing more parameters to specify the level of parallelism. In addition, to specifying the maximum number -of workers `max_workers` the user can also specify the number of cores per worker `cores_per_worker` for MPI based -parallelism, the number of threads per core `threads_per_core` for thread based parallelism and the number of GPUs per -worker `gpus_per_worker`. Finally, for those backends which support over-subscribing this can also be enabled using the -`oversubscribe` parameter. All these parameters are optional, so the `pympipool.Executor` can be used as a drop-in -replacement for the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures). - -The previous example is rewritten for the `pympipool.Executor` in - `test_sum.py`: -```python -import flux.job -from pympipool import Executor - -with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=1, executor=flux_exe) as exe: - future = exe.submit(sum, [1,1]) - print(future.result()) -``` -Again this example can be executed with the python interpreter: -``` -python test_sum.py ->>> 2 -``` -The result of the calculation is again `1+1=2`. - -Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined -functions. In the `test_serial.py` example a custom summation function is defined: -```python -import flux.job -from pympipool import Executor - -def calc(*args): - return sum(*args) - -with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=2, executor=flux_exe) as exe: - fs_1 = exe.submit(calc, [2, 1]) - fs_2 = exe.submit(calc, [2, 2]) - fs_3 = exe.submit(calc, [2, 3]) - fs_4 = exe.submit(calc, [2, 4]) - print([ - fs_1.result(), - fs_2.result(), - fs_3.result(), - fs_4.result(), - ]) -``` -In contrast to the previous example where just a single function was submitted to a single worker, in this case a total -of four functions is submitted to a group of two workers `max_workers=2`. Consequently, the functions are executed as a -set of two pairs. - -The script can be executed with any python interpreter: -``` -python test_serial.py ->>> [3, 4, 5, 6] -``` -It returns the corresponding sums as expected. The same can be achieved with the built-in [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -classes. Still one advantage of using the `pympipool.Executor` rather than the built-in ones, is the ability to execute -the same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using -[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than -the regular pickle package. - -For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) -class the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -also implements the `map()` function to map a series of inputs to a function. The same `map()` function is also -available in the `pympipool.Executor` - `test_map.py`: -```python -import flux.job -from pympipool import Executor - -def calc(*args): - return sum(*args) - -with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=2, executor=flux_exe) as exe: - print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]]))) -``` -Again the script can be executed with any python interpreter: -``` -python test_map.py ->>> [3, 4, 5, 6] -``` -The results remain the same. - -## Resource Assignment -By default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. -This is sufficient for function calls which take several minutes or longer to execute. For python functions with shorter -run-time `pympipool` provides block allocation (enabled by the `block_allocation=True` parameter) to execute multiple -python functions with similar resource requirements in the same flux job (or SLURM job step). - -The following example illustrates the resource definition on both level. This is redundant. For block allocations the -resources have to be configured on the **Executor level**, otherwise it can either be defined on the **Executor level** -or on the **Submission level**. The resource defined on the **Submission level** overwrite the resources defined on the -**Executor level**. - -```python -import flux.job -from pympipool import Executor - - -def calc_function(parameter_a, parameter_b): - return parameter_a + parameter_b - - -with flux.job.FluxExecutor() as flux_exe: - with Executor( - # Resource definition on the executor level - max_cores=2, # total number of cores available to the Executor - # Optional resource definition - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, # not available with flux - cwd="/home/jovyan/notebooks", - executor=flux_exe, - hostname_localhost=False, # only required on MacOS - backend="flux", # optional in case the backend is not recognized - block_allocation=False, - init_function=None, # only available with block_allocation=True - command_line_argument_lst=[], # additional command line arguments for SLURM - disable_dependencies=False, # option to disable input dependency resolution - refresh_rate=0.01, # refresh frequency in seconds for the dependency resolution - ) as exe: - future_obj = exe.submit( - calc_function, - 1, # parameter_a - parameter_b=2, - # Resource definition on the submission level - resource_dict={ - "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, # here it is gpus_per_core rather than gpus_per_worker - "oversubscribe": False, # not available with flux - "cwd": "/home/jovyan/notebooks", - "executor": flux_exe, - "hostname_localhost": False, # only required on MacOS - # "command_line_argument_lst": [], # additional command line arguments for SLURM - }, - ) - print(future_obj.result()) -``` -The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other -resource parameters are optional. If none of the submitted Python function uses [mpi4py](https://mpi4py.readthedocs.io) -or any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` -and `gpus_per_worker=0`. These are defaults, so they do even have to be specified. In this case it also makes sense to -enable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python -process for each submission. In this case the above example can be reduced to: -```python -import flux.job -from pympipool import Executor - - -def calc_function(parameter_a, parameter_b): - return parameter_a + parameter_b - - -with flux.job.FluxExecutor() as flux_exe: - with Executor( - # Resource definition on the executor level - max_cores=2, # total number of cores available to the Executor - block_allocation=True, # reuse python processes - executor=flux_exe, - ) as exe: - future_obj = exe.submit( - calc_function, - 1, # parameter_a - parameter_b=2, - ) - print(future_obj.result()) -``` -The working directory parameter `cwd` can be helpful for tasks which interact with the file system to define which task -is executed in which folder, but for most python functions it is not required. - -## Data Handling -A limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of -reading the same dataset repetitively, the `pympipool.Executor` in block allocation mode (`block_allocation=True`) -loads the dataset only once per worker and afterward, each function submitted to this worker has access to the dataset, -as it is already loaded in memory. To achieve this the user defines an initialization function `init_function` which -returns a dictionary with one key per dataset. The keys of the dictionary can then be used as additional input -parameters in each function submitted to the `pympipool.Executor`. When block allocation is disabled this functionality -is not available, as each function is executed in a separate process, so no data can be preloaded. - -This functionality is illustrated in the `test_data.py` example: -```python -import flux.job -from pympipool import Executor - -def calc(i, j, k): - return i + j + k - -def init_function(): - return {"j": 4, "k": 3, "l": 2} - -with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=1, init_function=init_function, executor=flux_exe, block_allocation=True) as exe: - fs = exe.submit(calc, 2, j=5) - print(fs.result()) -``` -The function `calc()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only -two inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the -second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the -`pympipool.Executor` automatically checks the keys set in the `init_function()` function. In this case the returned -dictionary `{"j": 4, "k": 3, "l": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc()` function, -`i` and `j` are already provided so `j` is not required, but `k=3` is used from the `init_function()` and as the `calc()` -function does not define the `l` parameter this one is also ignored. - -Again the script can be executed with any python interpreter: -``` -python test_data.py ->>> 10 -``` -The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` -function. - -## Up-Scaling -[flux](https://flux-framework.org) provides fine-grained resource assigment via `libhwloc` and `pmi`. - -### Thread-based Parallelism -The number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the -`pympipool.Executor`. Unfortunately, there is no uniform way to control the number of cores a given underlying library -uses for thread based parallelism, so it might be necessary to set certain environment variables manually: - -* `OMP_NUM_THREADS`: for openmp -* `OPENBLAS_NUM_THREADS`: for openblas -* `MKL_NUM_THREADS`: for mkl -* `VECLIB_MAXIMUM_THREADS`: for accelerate on Mac Os X -* `NUMEXPR_NUM_THREADS`: for numexpr - -At the current stage `pympipool.Executor` does not set these parameters itself, so you have to add them in the function -you submit before importing the corresponding library: - -```python -def calc(i): - import os - os.environ["OMP_NUM_THREADS"] = "2" - os.environ["OPENBLAS_NUM_THREADS"] = "2" - os.environ["MKL_NUM_THREADS"] = "2" - os.environ["VECLIB_MAXIMUM_THREADS"] = "2" - os.environ["NUMEXPR_NUM_THREADS"] = "2" - import numpy as np - return i -``` - -Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to -the number of physical cores available. So unless this functionality is disabled `threads_per_core=2` is a reasonable -default. Just be careful if the number of threads is not specified it is possible that all workers try to access all -cores at the same time which can lead to poor performance. So it is typically a good idea to monitor the CPU utilization -with increasing number of workers. - -Specific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ration and require -a higher number of threads per core for optimal performance. - -### MPI Parallel Python Functions -Beyond thread based parallelism, the message passing interface (MPI) is the de facto standard parallel execution in -scientific computing and the [`mpi4py`](https://mpi4py.readthedocs.io) bindings to the MPI libraries are commonly used -to parallelize existing workflows. The limitation of this approach is that it requires the whole code to adopt the MPI -communication standards to coordinate the way how information is distributed. Just like the `pympipool.Executor` the -[`mpi4py.futures.MPIPoolExecutor`](https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpipoolexecutor) -implements the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -interface. Still in this case eah python function submitted to the executor is still limited to serial execution. The -novel approach of the `pympipool.Executor` is mixing these two types of parallelism. Individual functions can use -the [`mpi4py`](https://mpi4py.readthedocs.io) library to handle the parallel execution within the context of this -function while these functions can still me submitted to the `pympipool.Executor` just like any other function. The -advantage of this approach is that the users can parallelize their workflows one function at the time. - -The example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: -```python -import flux.job -from pympipool import Executor - -def calc(i): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return i, size, rank - -with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe: - fs = exe.submit(calc, 3) - print(fs.result()) -``` -The `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the -allocation and the rank of the current process within the MPI allocation. This function is then submitted to an -`pympipool.Executor` which is initialized with a single worker with two cores `cores_per_worker=2`. So each function -call is going to have access to two cores. - -Just like before the script can be called with any python interpreter even though it is using the [`mpi4py`](https://mpi4py.readthedocs.io) -library in the background it is not necessary to execute the script with `mpiexec` or `mpirun`: -``` -python test_mpi.py ->>> [(3, 2, 0), (3, 2, 1)] -``` -The response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple -being the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call `cores_per_worker=2` -and finally the index of the specific process `0` or `1`. - -### GPU Assignment -With the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular. -Consequently, it is essential to have full control over the assignment of GPUs to specific python functions. In the -`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: -```python -import socket -import flux.job -from pympipool import Executor -from tensorflow.python.client import device_lib - -def get_available_gpus(): - local_device_protos = device_lib.list_local_devices() - return [ - (x.name, x.physical_device_desc, socket.gethostname()) - for x in local_device_protos if x.device_type == 'GPU' - ] - -with flux.job.FluxExecutor() as flux_exe: - with Executor( - max_workers=2, - gpus_per_worker=1, - executor=flux_exe, - ) as exe: - fs_1 = exe.submit(get_available_gpus) - fs_2 = exe.submit(get_available_gpus) - print(fs_1.result(), fs_2.result()) -``` -The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality -requires `pympipool` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com) -or preferably the [flux framework](https://flux-framework.org). The rest of the script follows the previous examples, -as two functions are submitted and the results are printed. - -To clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com) -the submission script is given below: -```shell -#!/bin/bash -#SBATCH --nodes=2 -#SBATCH --gpus-per-node=1 -#SBATCH --get-user-env=L - -python test_gpu.py -``` -The important part is that for using the `pympipool.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not -need to be executed with `srun` but rather it is sufficient to just execute it with the python interpreter. `pympipool` -internally calls `srun` to assign the individual resources to a given worker. - -For the more complex setup of running the [flux framework](https://flux-framework.org) as a secondary resource scheduler -within the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the -[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). This is achieved -by calling `srun flux start` in the submission script: -```shell -#!/bin/bash -#SBATCH --nodes=2 -#SBATCH --gpus-per-node=1 -#SBATCH --get-user-env=L - -srun flux start python test_gpu.py -``` -As a result the GPUs available on the two compute nodes are reported: -``` ->>> [('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn138'), ->>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')] -``` -In this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`. - -## Coupled Functions -For submitting two functions with rather different computing resource requirements it is essential to represent this -dependence during the submission process. In `pympipool` this can be achieved by leveraging the separate submission of -individual python functions and including the `concurrent.futures.Future` object of the first submitted function as -input for the second function during the submission. Consequently, this functionality can be used for directed acyclic -graphs, still it does not enable cyclic graphs. As a simple example we can add one to the result of the addition of one -and two: -```python -import flux.job -from pympipool import Executor - -def calc_function(parameter_a, parameter_b): - return parameter_a + parameter_b - -with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=2, executor=flux_exe) as exe: - future_1 = exe.submit( - calc_function, - 1, - parameter_b=2, - resource_dict={"cores": 1}, - ) - future_2 = exe.submit( - calc_function, - 1, - parameter_b=future_1, - resource_dict={"cores": 1}, - ) - print(future_2.result()) -``` -Here the first addition `1+2` is computed and the output `3` is returned as the result of `future_1.result()`. Still -before the computation of this addition is completed already the next addition is submitted which uses the future object -as an input `future_1` and adds `1`. The result of both additions is `4` as `1+2+1=4`. - -To disable this functionality the parameter `disable_dependencies=True` can be set on the executor level. Still at the -current stage the performance improvement of disabling this functionality seem to be minimal. Furthermore, this -functionality introduces the `refresh_rate=0.01` parameter, it defines the refresh rate in seconds how frequently the -queue of submitted functions is queried. Typically, there is no need to change these default parameters. - -## SLURM Job Scheduler -Using `pympipool` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in -`block_allocation=True` mode and one `srun` call per submitted function in `block_allocation=False` mode. As each `srun` -call represents a request to the central database of SLURM this can drastically reduce the performance, especially for -large numbers of small python functions. That is why the hierarchical job scheduler [flux framework](https://flux-framework.org) -is recommended as secondary job scheduler even within the context of the SLURM job manager. - -Still the general usage of `pympipool` remains similar even with SLURM as backend: -```python -from pympipool import Executor - -with Executor(max_cores=1, backend="slurm") as exe: - future = exe.submit(sum, [1,1]) - print(future.result()) -``` -The `backend="slurm"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) -or SLURM are available. - -In addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide -a list of command line arguments for the `srun` command. - -## Workstation Support -While the high performance computing (HPC) setup is limited to the Linux operating system, `pympipool` can also be used -in combination with MacOS and Windows. These setups are limited to a single compute node. - -Still the general usage of `pympipool` remains similar: -```python -from pympipool import Executor - -with Executor(max_cores=1, backend="mpi") as exe: - future = exe.submit(sum, [1,1], resource_dict={"cores": 1}) - print(future.result()) -``` -The `backend="mpi"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) -or SLURM are available. - -Workstations, especially workstations with MacOs can have rather strict firewall settings. This includes limiting the -look up of hostnames and communicating with itself via their own hostname. To directly connect to `localhost` rather -than using the hostname which is the default for distributed systems, the `hostname_localhost=True` parameter is -introduced. diff --git a/docs/source/index.rst b/docs/source/index.rst deleted file mode 100644 index 6ed15bee..00000000 --- a/docs/source/index.rst +++ /dev/null @@ -1,124 +0,0 @@ -==================================================================== -pympipool - up-scale python functions for high performance computing -==================================================================== - -:Author: Jan Janssen -:Contact: janssen@lanl.gov - -Challenges ----------- -In high performance computing (HPC) the Python programming language is commonly used as high-level language to -orchestrate the coupling of scientific applications. Still the efficient usage of highly parallel HPC clusters remains -challenging, in primarily three aspects: - -* **Communication**: Distributing python function calls over hundreds of compute node and gathering the results on a shared file system is technically possible, but highly inefficient. A socket-based communication approach is preferable. -* **Resource Management**: Assigning Python functions to GPUs or executing Python functions on multiple CPUs using the message passing interface (MPI) requires major modifications to the python workflow. -* **Integration**: Existing workflow libraries implement a secondary job management infrastructure on the Python level rather than leveraging the existing infrastructure provided by the job scheduler of the HPC. - -pympipool is ... -^^^^^^^^^^^^^^^^ -In a given HPC allocation the :code:`pympipool` library addresses these challenges by extending the Executor interface -of the standard Python library to support the resource assignment in the HPC context. Computing resources can either be -assigned on a per function call basis or as a block allocation on a per Executor basis. The :code:`pympipool` library -is built on top of the `flux-framework `_ to enable fine-grained resource assignment. In -addition `Simple Linux Utility for Resource Management (SLURM) `_ is supported as alternative -queuing system and for workstation installations :code:`pympipool` can be installed without a job scheduler. - -pympipool is not ... -^^^^^^^^^^^^^^^^^^^^ -The pympipool library is not designed to request an allocation from the job scheduler of an HPC. Instead within a given -allocation from the job scheduler the :code:`pympipool` library can be employed to distribute a series of python -function calls over the available computing resources to achieve maximum computing resource utilization. - -Example -------- -The following examples illustrates how :code:`pympipool` can be used to distribute an MPI parallel function within a -queuing system allocation using the `flux-framework `_. :code:`example.py`:: - - import flux.job - from pympipool import Executor - - def calc(i): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return i, size, rank - - with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe: - fs = exe.submit(calc, 3) - print(fs.result()) - -This example can be executed using:: - - python example.py - -Which returns:: - - [(0, 2, 0), (0, 2, 1)], [(1, 2, 0), (1, 2, 1)] - -The important part in this example is that `mpi4py `_ is only used in the :code:`calc()` -function, not in the python script, consequently it is not necessary to call the script with :code:`mpiexec` but instead -a call with the regular python interpreter is sufficient. This highlights how :code:`pympipool` allows the users to -parallelize one function at a time and not having to convert their whole workflow to use `mpi4py `_. -At the same time the function can be distributed over all compute nodes in a given allocation of the job scheduler, in -contrast to the standard `concurrent.futures.Executor `_ -which only supports distribution within one compute node. - -The interface of the standard `concurrent.futures.Executor `_ -is extended by adding the option :code:`cores_per_worker=2` to assign multiple MPI ranks to each function call. To -create two workers the maximum number of cores can be increased to :code:`max_cores=4`. In this case each worker -receives two cores resulting in a total of four CPU cores being utilized. - -After submitting the function :code:`calc()` with the corresponding parameter to the executor :code:`exe.submit(calc, 0)` -a python `concurrent.futures.Future `_ is -returned. Consequently, the :code:`pympipool.Executor` can be used as a drop-in replacement for the -`concurrent.futures.Executor `_ -which allows the user to add parallelism to their workflow one function at a time. - -Disclaimer ----------- -While we try to develop a stable and reliable software library, the development remains a opensource project under the -BSD 3-Clause License without any warranties:: - - BSD 3-Clause License - - Copyright (c) 2022, Jan Janssen - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are met: - - * Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - - * Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE - FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, - OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Documentation -------------- - -.. toctree:: - :maxdepth: 2 - - installation - examples - development - -* :ref:`modindex` diff --git a/notebooks/examples.ipynb b/notebooks/examples.ipynb index 2a5a7c10..7216c8d8 100644 --- a/notebooks/examples.ipynb +++ b/notebooks/examples.ipynb @@ -1 +1 @@ -{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.12.3","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"c31c95fe-9af4-42fd-be2c-713afa380e09","cell_type":"markdown","source":"# Examples\nThe `pympipool.Executor` extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nto simplify the up-scaling of individual functions in a given workflow.","metadata":{}},{"id":"a1c6370e-7c8a-4da2-ac7d-42a36e12b27c","cell_type":"markdown","source":"## Compatibility\nStarting with the basic example of `1+1=2`. With the `ThreadPoolExecutor` from the [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nstandard library this can be written as: ","metadata":{}},{"id":"8b663009-60af-4d71-8ef3-2e9c6cd79cce","cell_type":"code","source":"from concurrent.futures import ThreadPoolExecutor\n\nwith ThreadPoolExecutor(max_workers=1) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":1},{"id":"56192fa7-bbd6-43fe-8598-ff764addfbac","cell_type":"markdown","source":"In this case `max_workers=1` limits the number of threads used by the `ThreadPoolExecutor` to one. Then the `sum()`\nfunction is submitted to the executor with a list with two ones `[1, 1]` as input. A [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nobject is returned. The `Future` object allows to check the status of the execution with the `done()` method which \nreturns `True` or `False` depending on the state of the execution. Or the main process can wait until the execution is \ncompleted by calling `result()`. \n\nThe result of the calculation is `1+1=2`. ","metadata":{}},{"id":"99aba5f3-5667-450c-b31f-2b53918b1896","cell_type":"markdown","source":"The `pympipool.Executor` class extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) \nclass by providing more parameters to specify the level of parallelism. In addition, to specifying the maximum number \nof workers `max_workers` the user can also specify the number of cores per worker `cores_per_worker` for MPI based \nparallelism, the number of threads per core `threads_per_core` for thread based parallelism and the number of GPUs per\nworker `gpus_per_worker`. Finally, for those backends which support over-subscribing this can also be enabled using the \n`oversubscribe` parameter. All these parameters are optional, so the `pympipool.Executor` can be used as a drop-in \nreplacement for the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures).\n\nThe previous example is rewritten for the `pympipool.Executor` in:","metadata":{}},{"id":"559f59cf-f074-4399-846d-a5706797ff64","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, executor=flux_exe) as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":2},{"id":"cbe445ae-9f52-4449-a936-a4ca1acc4500","cell_type":"markdown","source":"The result of the calculation is again `1+1=2`.","metadata":{}},{"id":"eb838571-24c6-4516-ab13-66f5943325b9","cell_type":"markdown","source":"Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined \nfunctions. In the next example a custom summation function is defined:","metadata":{}},{"id":"e80ca2d6-4308-4e39-bec7-b55cfb024e79","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n fs_1 = exe.submit(calc, [2, 1])\n fs_2 = exe.submit(calc, [2, 2])\n fs_3 = exe.submit(calc, [2, 3])\n fs_4 = exe.submit(calc, [2, 4])\n print([\n fs_1.result(), \n fs_2.result(), \n fs_3.result(), \n fs_4.result(),\n ])\n","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"execution_count":3},{"id":"4d97551b-f7c0-416b-bcc3-55392e938ee8","cell_type":"markdown","source":"In contrast to the previous example where just a single function was submitted to a single worker, in this case a total\nof four functions is submitted to a group of two workers `max_cores=2`. Consequently, the functions are executed as a\nset of two pairs.\n\nIt returns the corresponding sums as expected. The same can be achieved with the built-in [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nclasses. Still one advantage of using the `pympipool.Executor` rather than the built-in ones, is the ability to execute \nthe same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using \n[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than\nthe regular pickle package. ","metadata":{}},{"id":"a97edc41-1396-48a0-8fb5-98d691a69e90","cell_type":"markdown","source":"For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) \nclass the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nalso implements the `map()` function to map a series of inputs to a function. The same `map()` function is also \navailable in the `pympipool.Executor`:","metadata":{}},{"id":"3362afef-265f-4432-88ad-e051e6318c77","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"execution_count":4},{"id":"27af5cc1-8514-4735-8bba-b4b32444901f","cell_type":"markdown","source":"The results remain the same. ","metadata":{}},{"id":"59747b38-64f8-4342-82ad-a771aaf7c4eb","cell_type":"markdown","source":"## Resource Assignment\nBy default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. \nThis is sufficient for function calls which take several minutes or longer to execute. For python functions with shorter \nrun-time `pympipool` provides block allocation (enabled by the `block_allocation=True` parameter) to execute multiple \npython functions with similar resource requirements in the same flux job (or SLURM job step). \n\nThe following example illustrates the resource definition on both level. This is redundant. For block allocations the \nresources have to be configured on the **Executor level**, otherwise it can either be defined on the **Executor level**\nor on the **Submission level**. The resource defined on the **Submission level** overwrite the resources defined on the \n**Executor level**.","metadata":{}},{"id":"d29280d4-c085-47b1-b7fa-602732d60832","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n # Optional resource definition \n cores_per_worker=1,\n threads_per_core=1,\n gpus_per_worker=0,\n oversubscribe=False, # not available with flux\n cwd=\"/home/jovyan/notebooks\",\n executor=flux_exe,\n hostname_localhost=False, # only required on MacOS\n backend=\"flux\", # optional in case the backend is not recognized\n block_allocation=False, \n init_function=None, # only available with block_allocation=True\n command_line_argument_lst=[], # additional command line arguments for SLURM\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n # Resource definition on the submission level\n resource_dict={\n \"cores\": 1,\n \"threads_per_core\": 1,\n \"gpus_per_core\": 0, # here it is gpus_per_core rather than gpus_per_worker\n \"oversubscribe\": False, # not available with flux\n \"cwd\": \"/home/jovyan/notebooks\",\n \"executor\": flux_exe,\n \"hostname_localhost\": False, # only required on MacOS\n # \"command_line_argument_lst\": [], # additional command line arguments for SLURM\n },\n )\n print(future_obj.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"execution_count":5},{"id":"5c7055ad-d84d-4afc-9023-b53643c4138a","cell_type":"markdown","source":"The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other\nresource parameters are optional. If none of the submitted Python function uses [mpi4py](https://mpi4py.readthedocs.io)\nor any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` \nand `gpus_per_worker=0`. These are defaults, so they do even have to be specified. In this case it also makes sense to \nenable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\nprocess for each submission. In this case the above example can be reduced to: ","metadata":{}},{"id":"cd8f883f-5faf-43bc-b971-354aa9dcbecb","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n block_allocation=True, # reuse python processes\n executor=flux_exe,\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n )\n print(future_obj.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"execution_count":6},{"id":"ea6a2ef1-c5bc-49c2-adb1-60f9f6cc71f3","cell_type":"markdown","source":"The working directory parameter `cwd` can be helpful for tasks which interact with the file system to define which task\nis executed in which folder, but for most python functions it is not required.","metadata":{}},{"id":"d6be1cc6-f47b-4b85-a0bc-00f9ccd8e2fd","cell_type":"markdown","source":"## Data Handling\nA limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of\nreading the same dataset repetitively, the `pympipool.Executor` in block allocation mode (`block_allocation=True`) loads the dataset only once per worker and afterwards \neach function submitted to this worker has access to the dataset, as it is already loaded in memory. To achieve this\nthe user defines an initialization function `init_function` which returns a dictionary with one key per dataset. The \nkeys of the dictionary can then be used as additional input parameters in each function submitted to the `pympipool.Executor`. When block allocation is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded. \n\nThis functionality is illustrated below: ","metadata":{}},{"id":"050c2781-0c8c-436b-949c-580cabf5c63c","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i, j, k):\n return i + j + k\n\ndef init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, init_function=init_function, executor=flux_exe, block_allocation=True) as exe:\n fs = exe.submit(calc, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"10\n","output_type":"stream"}],"execution_count":7},{"id":"8386b4e6-290f-4733-8c50-4312f9ba07e4","cell_type":"markdown","source":"The function `calc()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only \ntwo inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the\nsecond input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the \n`pympipool.Executor` automatically checks the keys set in the `init_function()` function. In this case the returned \ndictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc()` function,\n`i` and `j` are already provided so `j` is not required, but `k=3` is used from the `init_function()` and as the `calc()`\nfunction does not define the `l` parameter this one is also ignored. \n\nThe result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()`\nfunction.","metadata":{}},{"id":"0d623365-1b84-4c69-97ee-f6718be8ab39","cell_type":"markdown","source":"## Up-Scaling \n[flux](https://flux-framework.org) provides fine-grained resource assigment via `libhwloc` and `pmi`.","metadata":{}},{"id":"33f9eee3-e327-43e4-8f15-3cf709f3975c","cell_type":"markdown","source":"### Thread-based Parallelism\nThe number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the \n`pympipool.Executor`. Unfortunately, there is no uniform way to control the number of cores a given underlying library \nuses for thread based parallelism, so it might be necessary to set certain environment variables manually: \n\n* `OMP_NUM_THREADS`: for openmp\n* `OPENBLAS_NUM_THREADS`: for openblas\n* `MKL_NUM_THREADS`: for mkl\n* `VECLIB_MAXIMUM_THREADS`: for accelerate on Mac Os X\n* `NUMEXPR_NUM_THREADS`: for numexpr\n\nAt the current stage `pympipool.Executor` does not set these parameters itself, so you have to add them in the function\nyou submit before importing the corresponding library: \n","metadata":{}},{"id":"a9799f38-b9b8-411e-945d-dae951151d26","cell_type":"code","source":"def calc(i):\n import os\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":8},{"id":"4d2af8e0-8b49-40cc-a9ed-298d6c68870c","cell_type":"markdown","source":"Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to\nthe number of physical cores available. So unless this functionality is disabled `threads_per_core=2` is a reasonable \ndefault. Just be careful if the number of threads is not specified it is possible that all workers try to access all \ncores at the same time which can lead to poor performance. So it is typically a good idea to monitor the CPU utilization\nwith increasing number of workers. \n\nSpecific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ration and require\na higher number of threads per core for optimal performance. \n","metadata":{}},{"id":"2faf6399-0230-4cdd-b4d2-2508dee66d47","cell_type":"markdown","source":"### MPI Parallel Python Functions\nBeyond thread based parallelism, the message passing interface (MPI) is the de facto standard parallel execution in \nscientific computing and the [`mpi4py`](https://mpi4py.readthedocs.io) bindings to the MPI libraries are commonly used\nto parallelize existing workflows. The limitation of this approach is that it requires the whole code to adopt the MPI\ncommunication standards to coordinate the way how information is distributed. Just like the `pympipool.Executor` the \n[`mpi4py.futures.MPIPoolExecutor`](https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpipoolexecutor) \nimplements the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\ninterface. Still in this case eah python function submitted to the executor is still limited to serial execution. The\nnovel approach of the `pympipool.Executor` is mixing these two types of parallelism. Individual functions can use\nthe [`mpi4py`](https://mpi4py.readthedocs.io) library to handle the parallel execution within the context of this \nfunction while these functions can still me submitted to the `pympipool.Executor` just like any other function. The\nadvantage of this approach is that the users can parallelize their workflows one function at the time. \n\nThe example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: ","metadata":{}},{"id":"44e510fc-8897-46a8-bef7-f1a5c47e4fbf","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i):\n from mpi4py import MPI\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe:\n fs = exe.submit(calc, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"[(3, 2, 0), (3, 2, 1)]\n","output_type":"stream"}],"execution_count":9},{"id":"4fa03544-1dfc-465a-b352-0458b710cbcd","cell_type":"markdown","source":"The `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the \nallocation and the rank of the current process within the MPI allocation. This function is then submitted to an \n`pympipool.Executor` which is initialized with a single worker with two cores `cores_per_worker=2`. So each function\ncall is going to have access to two cores. \n\nJust like before the script can be called with any python interpreter even though it is using the [`mpi4py`](https://mpi4py.readthedocs.io)\nlibrary in the background it is not necessary to execute the script with `mpiexec` or `mpirun`.\n\nThe response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple \nbeing the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call `cores_per_worker=2`\nand finally the index of the specific process `0` or `1`. ","metadata":{}},{"id":"581e948b-8c66-42fb-b4b2-279cc9e1c1f3","cell_type":"markdown","source":"### GPU Assignment\nWith the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular.\nConsequently, it is essential to have full control over the assignment of GPUs to specific python functions. In the \n`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: ","metadata":{}},{"id":"a72f5ac7-d4d2-4143-b299-ae1496f5bda8","cell_type":"raw","source":"import socket\nimport flux.job\nfrom pympipool import Executor\nfrom tensorflow.python.client import device_lib\n\ndef get_available_gpus():\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(\n max_workers=2, \n gpus_per_worker=1,\n executor=flux_exe,\n ) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())","metadata":{}},{"id":"23794ff4-916f-4b03-a18a-c232bab68dfa","cell_type":"markdown","source":"The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality \nrequires `pympipool` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com)\nor preferably the [flux framework](https://flux-framework.org). The rest of the script follows the previous examples, \nas two functions are submitted and the results are printed. \n\nTo clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com)\nthe submission script is given below: ","metadata":{}},{"id":"cc992f32-6224-49ce-b331-c7d58abb7738","cell_type":"raw","source":"#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\npython test_gpu.py","metadata":{}},{"id":"5f77c45c-7077-4edf-ace7-1922faecd380","cell_type":"markdown","source":"The important part is that for using the `pympipool.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not \nneed to be executed with `srun` but rather it is sufficient to just execute it with the python interpreter. `pympipool`\ninternally calls `srun` to assign the individual resources to a given worker. \n\nFor the more complex setup of running the [flux framework](https://flux-framework.org) as a secondary resource scheduler\nwithin the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the \n[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). This is achieved\nby calling `srun flux start` in the submission script: ","metadata":{}},{"id":"28612f20-164d-47ff-8789-3a3965031cfe","cell_type":"raw","source":"#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\nsrun flux start python test_gpu.py","metadata":{}},{"id":"87529bb6-1fbb-4416-8edb-0b3124f4dec2","cell_type":"markdown","source":"As a result the GPUs available on the two compute nodes are reported: \n```\n>>> [('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn138'),\n>>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]\n```\nIn this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.\n","metadata":{}},{"id":"0e4a6e73-38b1-4a6c-b567-d6b079a58886","cell_type":"markdown","source":"## Coupled Functions \nFor submitting two functions with rather different computing resource requirements it is essential to represent this \ndependence during the submission process. In `pympipool` this can be achieved by leveraging the separate submission of \nindividual python functions and including the `concurrent.futures.Future` object of the first submitted function as \ninput for the second function during the submission. Consequently, this functionality can be used for directed acyclic \ngraphs, still it does not enable cyclic graphs. As a simple example we can add one to the result of the addition of one\nand two:","metadata":{}},{"id":"c84442ee-68e4-4065-97e7-bfad7582acfc","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n future_1 = exe.submit(\n calc_function, \n 1,\n parameter_b=2,\n resource_dict={\"cores\": 1},\n )\n future_2 = exe.submit(\n calc_function, \n 1,\n parameter_b=future_1,\n resource_dict={\"cores\": 1},\n )\n print(future_2.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"4\n","output_type":"stream"}],"execution_count":10},{"id":"bd3e6eea-3a77-49ec-8fec-d88274aeeda5","cell_type":"markdown","source":"Here the first addition `1+2` is computed and the output `3` is returned as the result of `future_1.result()`. Still \nbefore the computation of this addition is completed already the next addition is submitted which uses the future object\nas an input `future_1` and adds `1`. The result of both additions is `4` as `1+2+1=4`. \n\nTo disable this functionality the parameter `disable_dependencies=True` can be set on the executor level. Still at the\ncurrent stage the performance improvement of disabling this functionality seem to be minimal. Furthermore, this \nfunctionality introduces the `refresh_rate=0.01` parameter, it defines the refresh rate in seconds how frequently the \nqueue of submitted functions is queried. Typically, there is no need to change these default parameters. ","metadata":{}},{"id":"d1086337-5291-4e06-96d1-a6e162d28c58","cell_type":"markdown","source":"## SLURM Job Scheduler\nUsing `pympipool` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in \n`block_allocation=True` mode and one `srun` call per submitted function in `block_allocation=False` mode. As each `srun`\ncall represents a request to the central database of SLURM this can drastically reduce the performance, especially for\nlarge numbers of small python functions. That is why the hierarchical job scheduler [flux framework](https://flux-framework.org)\nis recommended as secondary job scheduler even within the context of the SLURM job manager. \n\nStill the general usage of `pympipool` remains similar even with SLURM as backend:","metadata":{}},{"id":"976b8c05-4b23-42e6-be00-48ab3cfb3203","cell_type":"raw","source":"from pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"slurm\") as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())","metadata":{}},{"id":"ae8dd860-f90f-47b4-b3e5-664f5c949350","cell_type":"markdown","source":"The `backend=\"slurm\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nIn addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\na list of command line arguments for the `srun` command. ","metadata":{}},{"id":"449d2c7a-67ba-449e-8e0b-98a228707e1c","cell_type":"markdown","source":"## Workstation Support\nWhile the high performance computing (HPC) setup is limited to the Linux operating system, `pympipool` can also be used\nin combination with MacOS and Windows. These setups are limited to a single compute node. \n\nStill the general usage of `pympipool` remains similar:","metadata":{}},{"id":"fa147b3b-61df-4884-b90c-544362bc95d9","cell_type":"code","source":"from pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"mpi\") as exe:\n future = exe.submit(sum, [1,1], resource_dict={\"cores\": 1})\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":11},{"id":"0370b42d-237b-4169-862a-b0bac4bb858b","cell_type":"markdown","source":"The `backend=\"mpi\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nWorkstations, especially workstations with MacOs can have rather strict firewall settings. This includes limiting the\nlook up of hostnames and communicating with itself via their own hostname. To directly connect to `localhost` rather\nthan using the hostname which is the default for distributed systems, the `hostname_localhost=True` parameter is \nintroduced. ","metadata":{}}]} \ No newline at end of file +{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.12.3","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"c31c95fe-9af4-42fd-be2c-713afa380e09","cell_type":"markdown","source":"# Examples\nThe `pympipool.Executor` extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nto simplify the up-scaling of individual functions in a given workflow.","metadata":{}},{"id":"a1c6370e-7c8a-4da2-ac7d-42a36e12b27c","cell_type":"markdown","source":"## Compatibility\nStarting with the basic example of `1+1=2`. With the `ThreadPoolExecutor` from the [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nstandard library this can be written as: ","metadata":{}},{"id":"8b663009-60af-4d71-8ef3-2e9c6cd79cce","cell_type":"code","source":"from concurrent.futures import ThreadPoolExecutor\n\nwith ThreadPoolExecutor(max_workers=1) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":1},{"id":"56192fa7-bbd6-43fe-8598-ff764addfbac","cell_type":"markdown","source":"In this case `max_workers=1` limits the number of threads used by the `ThreadPoolExecutor` to one. Then the `sum()`\nfunction is submitted to the executor with a list with two ones `[1, 1]` as input. A [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nobject is returned. The `Future` object allows to check the status of the execution with the `done()` method which \nreturns `True` or `False` depending on the state of the execution. Or the main process can wait until the execution is \ncompleted by calling `result()`. \n\nThe result of the calculation is `1+1=2`. ","metadata":{}},{"id":"99aba5f3-5667-450c-b31f-2b53918b1896","cell_type":"markdown","source":"The `pympipool.Executor` class extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) \nclass by providing more parameters to specify the level of parallelism. In addition, to specifying the maximum number \nof workers `max_workers` the user can also specify the number of cores per worker `cores_per_worker` for MPI based \nparallelism, the number of threads per core `threads_per_core` for thread based parallelism and the number of GPUs per\nworker `gpus_per_worker`. Finally, for those backends which support over-subscribing this can also be enabled using the \n`oversubscribe` parameter. All these parameters are optional, so the `pympipool.Executor` can be used as a drop-in \nreplacement for the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures).\n\nThe previous example is rewritten for the `pympipool.Executor` in:","metadata":{}},{"id":"559f59cf-f074-4399-846d-a5706797ff64","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, executor=flux_exe) as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":2},{"id":"cbe445ae-9f52-4449-a936-a4ca1acc4500","cell_type":"markdown","source":"The result of the calculation is again `1+1=2`.","metadata":{}},{"id":"eb838571-24c6-4516-ab13-66f5943325b9","cell_type":"markdown","source":"Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined \nfunctions. In the next example a custom summation function is defined:","metadata":{}},{"id":"e80ca2d6-4308-4e39-bec7-b55cfb024e79","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n fs_1 = exe.submit(calc, [2, 1])\n fs_2 = exe.submit(calc, [2, 2])\n fs_3 = exe.submit(calc, [2, 3])\n fs_4 = exe.submit(calc, [2, 4])\n print([\n fs_1.result(), \n fs_2.result(), \n fs_3.result(), \n fs_4.result(),\n ])\n","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"execution_count":3},{"id":"4d97551b-f7c0-416b-bcc3-55392e938ee8","cell_type":"markdown","source":"In contrast to the previous example where just a single function was submitted to a single worker, in this case a total\nof four functions is submitted to a group of two workers `max_cores=2`. Consequently, the functions are executed as a\nset of two pairs.\n\nIt returns the corresponding sums as expected. The same can be achieved with the built-in [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nclasses. Still one advantage of using the `pympipool.Executor` rather than the built-in ones, is the ability to execute \nthe same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using \n[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than\nthe regular pickle package. ","metadata":{}},{"id":"a97edc41-1396-48a0-8fb5-98d691a69e90","cell_type":"markdown","source":"For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) \nclass the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nalso implements the `map()` function to map a series of inputs to a function. The same `map()` function is also \navailable in the `pympipool.Executor`:","metadata":{}},{"id":"3362afef-265f-4432-88ad-e051e6318c77","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"execution_count":4},{"id":"27af5cc1-8514-4735-8bba-b4b32444901f","cell_type":"markdown","source":"The results remain the same. ","metadata":{}},{"id":"59747b38-64f8-4342-82ad-a771aaf7c4eb","cell_type":"markdown","source":"## Resource Assignment\nBy default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. \nThis is sufficient for function calls which take several minutes or longer to execute. For python functions with shorter \nrun-time `pympipool` provides block allocation (enabled by the `block_allocation=True` parameter) to execute multiple \npython functions with similar resource requirements in the same flux job (or SLURM job step). \n\nThe following example illustrates the resource definition on both level. This is redundant. For block allocations the \nresources have to be configured on the **Executor level**, otherwise it can either be defined on the **Executor level**\nor on the **Submission level**. The resource defined on the **Submission level** overwrite the resources defined on the \n**Executor level**.","metadata":{}},{"id":"d29280d4-c085-47b1-b7fa-602732d60832","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n # Optional resource definition \n cores_per_worker=1,\n threads_per_core=1,\n gpus_per_worker=0,\n oversubscribe=False, # not available with flux\n cwd=\"/home/jovyan/notebooks\",\n executor=flux_exe,\n hostname_localhost=False, # only required on MacOS\n backend=\"flux\", # optional in case the backend is not recognized\n block_allocation=False, \n init_function=None, # only available with block_allocation=True\n command_line_argument_lst=[], # additional command line arguments for SLURM\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n # Resource definition on the submission level\n resource_dict={\n \"cores\": 1,\n \"threads_per_core\": 1,\n \"gpus_per_core\": 0, # here it is gpus_per_core rather than gpus_per_worker\n \"oversubscribe\": False, # not available with flux\n \"cwd\": \"/home/jovyan/notebooks\",\n \"executor\": flux_exe,\n \"hostname_localhost\": False, # only required on MacOS\n # \"command_line_argument_lst\": [], # additional command line arguments for SLURM\n },\n )\n print(future_obj.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"execution_count":5},{"id":"5c7055ad-d84d-4afc-9023-b53643c4138a","cell_type":"markdown","source":"The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other\nresource parameters are optional. If none of the submitted Python function uses [mpi4py](https://mpi4py.readthedocs.io)\nor any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` \nand `gpus_per_worker=0`. These are defaults, so they do even have to be specified. In this case it also makes sense to \nenable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\nprocess for each submission. In this case the above example can be reduced to: ","metadata":{}},{"id":"cd8f883f-5faf-43bc-b971-354aa9dcbecb","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n block_allocation=True, # reuse python processes\n executor=flux_exe,\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n )\n print(future_obj.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"execution_count":6},{"id":"ea6a2ef1-c5bc-49c2-adb1-60f9f6cc71f3","cell_type":"markdown","source":"The working directory parameter `cwd` can be helpful for tasks which interact with the file system to define which task\nis executed in which folder, but for most python functions it is not required.","metadata":{}},{"id":"d6be1cc6-f47b-4b85-a0bc-00f9ccd8e2fd","cell_type":"markdown","source":"## Data Handling\nA limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of\nreading the same dataset repetitively, the `pympipool.Executor` in block allocation mode (`block_allocation=True`) loads the dataset only once per worker and afterwards \neach function submitted to this worker has access to the dataset, as it is already loaded in memory. To achieve this\nthe user defines an initialization function `init_function` which returns a dictionary with one key per dataset. The \nkeys of the dictionary can then be used as additional input parameters in each function submitted to the `pympipool.Executor`. When block allocation is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded. \n\nThis functionality is illustrated below: ","metadata":{}},{"id":"050c2781-0c8c-436b-949c-580cabf5c63c","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i, j, k):\n return i + j + k\n\ndef init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, init_function=init_function, executor=flux_exe, block_allocation=True) as exe:\n fs = exe.submit(calc, 2, j=5)\n print(fs.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"10\n","output_type":"stream"}],"execution_count":7},{"id":"8386b4e6-290f-4733-8c50-4312f9ba07e4","cell_type":"markdown","source":"The function `calc()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only \ntwo inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the\nsecond input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the \n`pympipool.Executor` automatically checks the keys set in the `init_function()` function. In this case the returned \ndictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc()` function,\n`i` and `j` are already provided so `j` is not required, but `k=3` is used from the `init_function()` and as the `calc()`\nfunction does not define the `l` parameter this one is also ignored. \n\nThe result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()`\nfunction.","metadata":{}},{"id":"0d623365-1b84-4c69-97ee-f6718be8ab39","cell_type":"markdown","source":"## Up-Scaling \n[flux](https://flux-framework.org) provides fine-grained resource assigment via `libhwloc` and `pmi`.","metadata":{}},{"id":"33f9eee3-e327-43e4-8f15-3cf709f3975c","cell_type":"markdown","source":"### Thread-based Parallelism\nThe number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the \n`pympipool.Executor`. Unfortunately, there is no uniform way to control the number of cores a given underlying library \nuses for thread based parallelism, so it might be necessary to set certain environment variables manually: \n\n* `OMP_NUM_THREADS`: for openmp\n* `OPENBLAS_NUM_THREADS`: for openblas\n* `MKL_NUM_THREADS`: for mkl\n* `VECLIB_MAXIMUM_THREADS`: for accelerate on Mac Os X\n* `NUMEXPR_NUM_THREADS`: for numexpr\n\nAt the current stage `pympipool.Executor` does not set these parameters itself, so you have to add them in the function\nyou submit before importing the corresponding library: \n","metadata":{}},{"id":"a9799f38-b9b8-411e-945d-dae951151d26","cell_type":"code","source":"def calc(i):\n import os\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n return i","metadata":{"trusted":false},"outputs":[],"execution_count":8},{"id":"4d2af8e0-8b49-40cc-a9ed-298d6c68870c","cell_type":"markdown","source":"Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to\nthe number of physical cores available. So unless this functionality is disabled `threads_per_core=2` is a reasonable \ndefault. Just be careful if the number of threads is not specified it is possible that all workers try to access all \ncores at the same time which can lead to poor performance. So it is typically a good idea to monitor the CPU utilization\nwith increasing number of workers. \n\nSpecific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ration and require\na higher number of threads per core for optimal performance. \n","metadata":{}},{"id":"2faf6399-0230-4cdd-b4d2-2508dee66d47","cell_type":"markdown","source":"### MPI Parallel Python Functions\nBeyond thread based parallelism, the message passing interface (MPI) is the de facto standard parallel execution in \nscientific computing and the [`mpi4py`](https://mpi4py.readthedocs.io) bindings to the MPI libraries are commonly used\nto parallelize existing workflows. The limitation of this approach is that it requires the whole code to adopt the MPI\ncommunication standards to coordinate the way how information is distributed. Just like the `pympipool.Executor` the \n[`mpi4py.futures.MPIPoolExecutor`](https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpipoolexecutor) \nimplements the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\ninterface. Still in this case eah python function submitted to the executor is still limited to serial execution. The\nnovel approach of the `pympipool.Executor` is mixing these two types of parallelism. Individual functions can use\nthe [`mpi4py`](https://mpi4py.readthedocs.io) library to handle the parallel execution within the context of this \nfunction while these functions can still me submitted to the `pympipool.Executor` just like any other function. The\nadvantage of this approach is that the users can parallelize their workflows one function at the time. \n\nThe example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: ","metadata":{}},{"id":"44e510fc-8897-46a8-bef7-f1a5c47e4fbf","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i):\n from mpi4py import MPI\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe:\n fs = exe.submit(calc, 3)\n print(fs.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"[(3, 2, 0), (3, 2, 1)]\n","output_type":"stream"}],"execution_count":9},{"id":"4fa03544-1dfc-465a-b352-0458b710cbcd","cell_type":"markdown","source":"The `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the \nallocation and the rank of the current process within the MPI allocation. This function is then submitted to an \n`pympipool.Executor` which is initialized with a single worker with two cores `cores_per_worker=2`. So each function\ncall is going to have access to two cores. \n\nJust like before the script can be called with any python interpreter even though it is using the [`mpi4py`](https://mpi4py.readthedocs.io)\nlibrary in the background it is not necessary to execute the script with `mpiexec` or `mpirun`.\n\nThe response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple \nbeing the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call `cores_per_worker=2`\nand finally the index of the specific process `0` or `1`. ","metadata":{}},{"id":"581e948b-8c66-42fb-b4b2-279cc9e1c1f3","cell_type":"markdown","source":"### GPU Assignment\nWith the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular.\nConsequently, it is essential to have full control over the assignment of GPUs to specific python functions. In the \n`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: ","metadata":{}},{"id":"7d1bca64-14fb-4d40-997d-b58a011508bf","cell_type":"markdown","source":"```\nimport socket\nimport flux.job\nfrom pympipool import Executor\nfrom tensorflow.python.client import device_lib\n\ndef get_available_gpus():\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(\n max_workers=2, \n gpus_per_worker=1,\n executor=flux_exe,\n ) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"id":"23794ff4-916f-4b03-a18a-c232bab68dfa","cell_type":"markdown","source":"The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality \nrequires `pympipool` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com)\nor preferably the [flux framework](https://flux-framework.org). The rest of the script follows the previous examples, \nas two functions are submitted and the results are printed. \n\nTo clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com)\nthe submission script is given below: ","metadata":{}},{"id":"6dea0b84-65fd-4785-b78d-0ad3ff5aaa95","cell_type":"markdown","source":"```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\npython test_gpu.py\n```","metadata":{}},{"id":"5f77c45c-7077-4edf-ace7-1922faecd380","cell_type":"markdown","source":"The important part is that for using the `pympipool.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not \nneed to be executed with `srun` but rather it is sufficient to just execute it with the python interpreter. `pympipool`\ninternally calls `srun` to assign the individual resources to a given worker. \n\nFor the more complex setup of running the [flux framework](https://flux-framework.org) as a secondary resource scheduler\nwithin the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the \n[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). This is achieved\nby calling `srun flux start` in the submission script: ","metadata":{}},{"id":"e2cd51d8-8991-42bc-943a-050b6d7c74c3","cell_type":"markdown","source":"```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\nsrun flux start python test_gpu.py\n````","metadata":{}},{"id":"87529bb6-1fbb-4416-8edb-0b3124f4dec2","cell_type":"markdown","source":"As a result the GPUs available on the two compute nodes are reported: \n```\n>>> [('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn138'),\n>>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]\n```\nIn this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.\n","metadata":{}},{"id":"0e4a6e73-38b1-4a6c-b567-d6b079a58886","cell_type":"markdown","source":"## Coupled Functions \nFor submitting two functions with rather different computing resource requirements it is essential to represent this \ndependence during the submission process. In `pympipool` this can be achieved by leveraging the separate submission of \nindividual python functions and including the `concurrent.futures.Future` object of the first submitted function as \ninput for the second function during the submission. Consequently, this functionality can be used for directed acyclic \ngraphs, still it does not enable cyclic graphs. As a simple example we can add one to the result of the addition of one\nand two:","metadata":{}},{"id":"c84442ee-68e4-4065-97e7-bfad7582acfc","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n future_1 = exe.submit(\n calc_function, \n 1,\n parameter_b=2,\n resource_dict={\"cores\": 1},\n )\n future_2 = exe.submit(\n calc_function, \n 1,\n parameter_b=future_1,\n resource_dict={\"cores\": 1},\n )\n print(future_2.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"4\n","output_type":"stream"}],"execution_count":10},{"id":"bd3e6eea-3a77-49ec-8fec-d88274aeeda5","cell_type":"markdown","source":"Here the first addition `1+2` is computed and the output `3` is returned as the result of `future_1.result()`. Still \nbefore the computation of this addition is completed already the next addition is submitted which uses the future object\nas an input `future_1` and adds `1`. The result of both additions is `4` as `1+2+1=4`. \n\nTo disable this functionality the parameter `disable_dependencies=True` can be set on the executor level. Still at the\ncurrent stage the performance improvement of disabling this functionality seem to be minimal. Furthermore, this \nfunctionality introduces the `refresh_rate=0.01` parameter, it defines the refresh rate in seconds how frequently the \nqueue of submitted functions is queried. Typically, there is no need to change these default parameters. ","metadata":{}},{"id":"d1086337-5291-4e06-96d1-a6e162d28c58","cell_type":"markdown","source":"## SLURM Job Scheduler\nUsing `pympipool` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in \n`block_allocation=True` mode and one `srun` call per submitted function in `block_allocation=False` mode. As each `srun`\ncall represents a request to the central database of SLURM this can drastically reduce the performance, especially for\nlarge numbers of small python functions. That is why the hierarchical job scheduler [flux framework](https://flux-framework.org)\nis recommended as secondary job scheduler even within the context of the SLURM job manager. \n\nStill the general usage of `pympipool` remains similar even with SLURM as backend:","metadata":{}},{"id":"27569937-7d99-4697-b3ee-f68c43b95a10","cell_type":"markdown","source":"```\nfrom pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"slurm\") as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())\n```","metadata":{}},{"id":"ae8dd860-f90f-47b4-b3e5-664f5c949350","cell_type":"markdown","source":"The `backend=\"slurm\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nIn addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\na list of command line arguments for the `srun` command. ","metadata":{}},{"id":"449d2c7a-67ba-449e-8e0b-98a228707e1c","cell_type":"markdown","source":"## Workstation Support\nWhile the high performance computing (HPC) setup is limited to the Linux operating system, `pympipool` can also be used\nin combination with MacOS and Windows. These setups are limited to a single compute node. \n\nStill the general usage of `pympipool` remains similar:","metadata":{}},{"id":"fa147b3b-61df-4884-b90c-544362bc95d9","cell_type":"code","source":"from pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"mpi\") as exe:\n future = exe.submit(sum, [1,1], resource_dict={\"cores\": 1})\n print(future.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":11},{"id":"0370b42d-237b-4169-862a-b0bac4bb858b","cell_type":"markdown","source":"The `backend=\"mpi\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nWorkstations, especially workstations with MacOs can have rather strict firewall settings. This includes limiting the\nlook up of hostnames and communicating with itself via their own hostname. To directly connect to `localhost` rather\nthan using the hostname which is the default for distributed systems, the `hostname_localhost=True` parameter is \nintroduced. ","metadata":{}}]} \ No newline at end of file