diff --git a/notebooks/1-single-node.ipynb b/notebooks/1-single-node.ipynb
index 60689144..03da88b0 100644
--- a/notebooks/1-single-node.ipynb
+++ b/notebooks/1-single-node.ipynb
@@ -1,924 +1 @@
-{
- "cells": [
- {
- "cell_type": "markdown",
- "id": "6915218e-7cf3-4bf4-9618-7e6942b4762f",
- "metadata": {},
- "source": [
- "# Single Node Executor\n",
- "The `SingleNodeExecutor` in executorlib is primarily used to enable rapid prototyping on a workstation computer, so that you can test your parallel Python program with executorlib before transferring it to a high performance computer (HPC). With the added capability of executorlib it is typically 10% slower than the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) from the Python standard library on a single node, when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks, so the performance of executorlib improves when the individual Python function calls require extensive computations.\n",
- "\n",
- "An advantage that executorlib has over the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) from the Python standard library is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) as serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions, for example in a Jupyter notebook."
- ]
- },
- {
- "cell_type": "markdown",
- "id": "ccc686dd-8fc5-4755-8a19-f40010ebb1b8",
- "metadata": {},
- "source": [
- "## Basic Functionality\n",
- "The general functionality of executorlib follows the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library. You can import the `SingleNodeExecutor` class directly from executorlib and then just replace the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) with the `SingleNodeExecutor` class to start using executorlib."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 1,
- "id": "b1907f12-7378-423b-9b83-1b65fc0a20f5",
- "metadata": {},
- "outputs": [],
- "source": [
- "from executorlib import SingleNodeExecutor"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "1654679f-38b3-4699-9bfe-b48cbde0b2db",
- "metadata": {},
- "source": [
- "It is recommended to use the `SingleNodeExecutor` class in combination with a `with`-statement. This guarantees the processes created by the `SingleNodeExecutor` class to evaluate the Python functions are afterward closed and do not remain ghost processes. A function is then submitted using the `submit(fn, /, *args, **kwargs)` function which executes a given function `fn` as `fn(*args, **kwargs)`. The `submit()` function returns a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object, as defined by the Python Standard Library. As a first example we submit the function `sum()` to calculate the sum of the list `[1, 1]`:"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 2,
- "id": "16f7d138-ed77-45ea-a554-d329f7237500",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "2\n",
- "CPU times: user 84.4 ms, sys: 59.3 ms, total: 144 ms\n",
- "Wall time: 482 ms\n"
- ]
- }
- ],
- "source": [
- "%%time\n",
- "with SingleNodeExecutor() as exe:\n",
- "    future = exe.submit(sum, [1, 1])\n",
- "    print(future.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1109584-9db2-4f9d-b3ed-494d96241396",
- "metadata": {},
- "source": [
- "As expected the result of the summation `sum([1, 1])` is `2`. The same result is retrieved from the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object received from the submission of the `sum()` as it is printed here `print(future.result())`. For most Python functions and especially the `sum()` function it is computationally not efficient to initialize the `SingleNodeExecutor` class only for the execution of a single function call, rather it is more computationally efficient to initialize the `SingleNodeExecutor` class once and then submit a number of functions. This can be achieved with a loop. For example the sum of the pairs `[2, 2]`, `[3, 3]` and `[4, 4]` can be achieved with a for-loop inside the context of the `SingleNodeExecutor()` class as provided by the `with`-statement."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 3,
- "id": "cfccdf9a-b23b-4814-8c14-36703a8a5f9e",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[4, 6, 8]\n",
- "CPU times: user 39.7 ms, sys: 26.8 ms, total: 66.5 ms\n",
- "Wall time: 524 ms\n"
- ]
- }
- ],
- "source": [
- "%%time\n",
- "with SingleNodeExecutor() as exe:\n",
- "    future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n",
- "    print([f.result() for f in future_lst])"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "7db58f70-8137-4f1c-a87b-0d282f2bc3c5",
- "metadata": {},
- "source": [
- "If only the parameters change while the function applied to them remains the same, as in the case above where the `sum()` function is applied to three pairs of parameters, then the `map(fn, *iterables, timeout=None, chunksize=1)` function can be used to map the function to the different sets of parameters, as defined in the [Python standard library](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map)."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 4,
- "id": "abd0beb7-471d-490e-bb9c-96755bd7aacf",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[10, 12, 14]\n",
- "CPU times: user 28 ms, sys: 23.1 ms, total: 51.1 ms\n",
- "Wall time: 517 ms\n"
- ]
- }
- ],
- "source": [
- "%%time\n",
- "with SingleNodeExecutor() as exe:\n",
- "    results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n",
- "    print(list(results))"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "ac86bf47-4eb6-4d7c-acae-760b880803a8",
- "metadata": {},
- "source": [
- "These three examples cover the general functionality of the `SingleNodeExecutor` class, which follows the [Executor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) interface as it is defined in the Python standard library."
- ]
- },
- {
- "cell_type": "markdown",
- "id": "5de0f0f2-bf5c-46b3-8171-a3a206ce6775",
- "metadata": {},
- "source": [
- "## Parallel Functions\n",
- "Writing parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function by function level. In this way individual functions can be replaced by parallel functions as needed without the need to modify the rest of the program. With the Local Mode executorlib supports two levels of parallel execution, parallel execution based on the Message Passing Interface (MPI) using the [mpi4py](https://mpi4py.readthedocs.io) package, or thread based parallel execution. Both levels of parallelism can be defined inside the function and do not require any modifications to the rest of the Python program. "
- ]
- },
- {
- "cell_type": "markdown",
- "id": "dc8e692f-bf6c-4838-bb82-6a6b8454a2e7",
- "metadata": {},
- "source": [
- "### MPI Parallel Functions\n",
- "MPI is the default way to develop parallel programs for HPCs. Still it can be challenging to refactor a previously serial program to use MPI efficiently for parallel execution, even with libraries like [mpi4py](https://mpi4py.readthedocs.io). To simplify the up-scaling of Python programs, executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to a `SingleNodeExecutor` for evaluation.\n",
- "\n",
- "The following `calc_mpi()` function imports the [mpi4py](https://mpi4py.readthedocs.io) package and then uses the internal functionality of MPI to get the total number of parallel CPU cores in the current MPI group `MPI.COMM_WORLD.Get_size()` and the index of the current processor in the MPI group `MPI.COMM_WORLD.Get_rank()`.\n",
- "\n",
- "The [mpi4py](https://mpi4py.readthedocs.io) package is an optional dependency of executorlib. The installation of the [mpi4py](https://mpi4py.readthedocs.io) package is covered in the installation section."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 5,
- "id": "a251d083-489e-41c1-9e49-c86093858006",
- "metadata": {},
- "outputs": [],
- "source": [
- "def calc_mpi(i):\n",
- "    from mpi4py import MPI\n",
- "\n",
- "    size = MPI.COMM_WORLD.Get_size()\n",
- "    rank = MPI.COMM_WORLD.Get_rank()\n",
- "    return i, size, rank"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "adbf8a10-04e1-4fd9-8768-4375bcba9ec3",
- "metadata": {},
- "source": [
- "The computational resources for the execution of the `calc_mpi()` Python function are defined using the resource dictionary parameter `resource_dict={}`. The resource dictionary can be provided as an additional parameter for the `submit()` function. It is important to note that the parameter name `resource_dict` is reserved exclusively for the `submit()` function and cannot be used in the function which is submitted, like the `calc_mpi()` function in this example:"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 6,
- "id": "266864f1-d29e-4934-9b5d-51f4ffb11f5c",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[(3, 2, 0), (3, 2, 1)]\n"
- ]
- }
- ],
- "source": [
- "with SingleNodeExecutor() as exe:\n",
- "    fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n",
- "    print(fs.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "3a449e3f-d7a4-4056-a1d0-35dfca4dad22",
- "metadata": {},
- "source": [
- "Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. In this case it is internally set for every call of the `submit()` function, without the need to specify it again."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 7,
- "id": "cb4ad978-bdf2-47bb-a7df-846641a54ec2",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[(3, 2, 0), (3, 2, 1)]\n"
- ]
- }
- ],
- "source": [
- "with SingleNodeExecutor(resource_dict={\"cores\": 2}) as exe:\n",
- "    fs = exe.submit(calc_mpi, 3)\n",
- "    print(fs.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "c1d1d7b1-64fa-4e47-bbde-2a16036568d6",
- "metadata": {},
- "source": [
- "In addition to the compute cores `cores`, the resource dictionary parameter `resource_dict` can also define the threads per core as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd`, the option to use the OpenMPI oversubscribe feature with `openmpi_oversubscribe` and finally for the [Simple Linux Utility for Resource \n",
- "Management (SLURM)](https://slurm.schedmd.com) queuing system the option to provide additional command line arguments with the `slurm_cmd_args` parameter - [resource dictionary](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary)."
- ]
- },
- {
- "cell_type": "markdown",
- "id": "4f5c5221-d99c-4614-82b1-9d6d3260c1bf",
- "metadata": {},
- "source": [
- "### Thread Parallel Functions\n",
- "An alternative option of parallelism is [thread based parallelism](https://docs.python.org/3/library/threading.html). executorlib supports thread based parallelism with the `threads_per_core` parameter in the resource dictionary `resource_dict`. Given the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock) in the CPython implementation, a common application of thread based parallelism in Python is using additional threads in linked libraries. The number of threads is commonly controlled with environment variables like `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, `MKL_NUM_THREADS`, `VECLIB_MAXIMUM_THREADS` and `NUMEXPR_NUM_THREADS`. Specific libraries might require other environment variables. The environment variables can be set using the environment interface of the Python standard library `os.environ`."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 8,
- "id": "7a7d21f6-9f1a-4f30-8024-9993e156dc75",
- "metadata": {},
- "outputs": [],
- "source": [
- "def calc_with_threads(i):\n",
- "    import os\n",
- "\n",
- "    os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n",
- "    os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n",
- "    os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n",
- "    os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n",
- "    os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n",
- "    import numpy as np\n",
- "\n",
- "    return i"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "82ed8f46-836c-402e-9363-be6e16c2a0b0",
- "metadata": {},
- "source": [
- "Again the resource dictionary parameter `resource_dict` can be set either in the `submit()` function:"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 9,
- "id": "b8ed330d-ee77-44a0-a02f-670fa945b043",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "3\n"
- ]
- }
- ],
- "source": [
- "with SingleNodeExecutor() as exe:\n",
- "    fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n",
- "    print(fs.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "63222cd5-664b-4aba-a80c-5814166b1239",
- "metadata": {},
- "source": [
- "Or alternatively, the resource dictionary parameter `resource_dict` can also be set during the initialization of the `Executor` class:"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 10,
- "id": "31562f89-c01c-4e7a-bbdd-fa26ca99e68b",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "3\n"
- ]
- }
- ],
- "source": [
- "with SingleNodeExecutor(resource_dict={\"threads_per_core\": 2}) as exe:\n",
- "    fs = exe.submit(calc_with_threads, 3)\n",
- "    print(fs.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "8b78a7b4-066e-4cbc-858e-606c8bbbbf0c",
- "metadata": {},
- "source": [
- "For most cases MPI based parallelism leads to higher computational efficiency in comparison to thread based parallelism, still the choice of parallelism depends on the specific Python function which should be executed in parallel. Careful benchmarks are required to achieve the optimal performance for a given computational architecture. \n",
- "\n",
- "Beyond MPI based parallelism and thread based parallelism the [HPC Cluster Executors](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) and the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) also provide the option to assign GPUs to the execution of individual Python functions."
- ]
- },
- {
- "cell_type": "markdown",
- "id": "ca9bc450-2762-4d49-b7f8-48cc83e068fd",
- "metadata": {},
- "source": [
- "## Performance Optimization\n",
- "The default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features."
- ]
- },
- {
- "cell_type": "markdown",
- "id": "e9b52ecf-3984-4695-98e7-315aa3712104",
- "metadata": {},
- "source": [
- "### Block Allocation\n",
- "By default each submitted Python function is executed in a dedicated process. This guarantees that the execution of the submitted Python function starts with a fresh process. Still the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources, it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `True`. To limit the number of parallel Python processes when using block allocation it is recommended to set the `max_workers` parameter to restrict the number of available computing resources."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 11,
- "id": "0da4c7d0-2268-4ea8-b62d-5d94c79ebc72",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "2\n",
- "CPU times: user 31.1 ms, sys: 19.1 ms, total: 50.1 ms\n",
- "Wall time: 394 ms\n"
- ]
- }
- ],
- "source": [
- "%%time\n",
- "with SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:\n",
- "    future = exe.submit(sum, [1, 1])\n",
- "    print(future.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "d38163b3-1c04-431c-964b-2bad4f823a4d",
- "metadata": {},
- "source": [
- "The same functionality also applies to MPI parallel Python functions. The important part is that while it is possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is not possible to assign resources during the submission of the function with the `submit()` function. Starting again with the `calc_mpi()` function: "
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 12,
- "id": "cb8c4943-4c78-4203-95f2-1db758e588d9",
- "metadata": {},
- "outputs": [],
- "source": [
- "def calc_mpi(i):\n",
- "    from mpi4py import MPI\n",
- "\n",
- "    size = MPI.COMM_WORLD.Get_size()\n",
- "    rank = MPI.COMM_WORLD.Get_rank()\n",
- "    return i, size, rank"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "9e1212c4-e3fb-4e21-be43-0a4f0a08b856",
- "metadata": {},
- "source": [
- "The resource dictionary parameter can still be set during the initialisation of the `SingleNodeExecutor` class. Internally, this groups the created Python processes in fixed allocations and afterwards submits Python functions to these allocations."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 13,
- "id": "5ebf7195-58f9-40f2-8203-2d4b9f0e9602",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[(3, 2, 0), (3, 2, 1)]\n"
- ]
- },
- {
- "name": "stderr",
- "output_type": "stream",
- "text": [
- "--------------------------------------------------------------------------\n",
- "A system call failed during shared memory initialization that should\n",
- "not have. It is likely that your MPI job will now either abort or\n",
- "experience performance degradation.\n",
- "\n",
- " Local host: MacBook-Pro.local\n",
- " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.55070/1/vader_segment.MacBook-Pro.501.96730001.1\n",
- " Error: No such file or directory (errno 2)\n",
- "--------------------------------------------------------------------------\n"
- ]
- }
- ],
- "source": [
- "with SingleNodeExecutor(resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n",
- "    fs = exe.submit(calc_mpi, 3)\n",
- "    print(fs.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "b75fb95f-f2f5-4be9-9f2a-9c2e9961c644",
- "metadata": {},
- "source": [
- "The weakness of memory from a previous Python function remaining in the Python process can at the same time be an advantage for working with large datasets. In executorlib this is achieved by introducing the `init_function` parameter. The `init_function` returns a dictionary of parameters which can afterwards be reused as keyword arguments `**kwargs` in the functions submitted to the `Executor`. When block allocation `block_allocation` is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 14,
- "id": "8aa754cc-eb1a-4fa1-bd72-272246df1d2f",
- "metadata": {},
- "outputs": [],
- "source": [
- "def init_function():\n",
- "    return {\"j\": 4, \"k\": 3, \"l\": 2}"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 15,
- "id": "1854895a-7239-4b30-b60d-cf1a89234464",
- "metadata": {},
- "outputs": [],
- "source": [
- "def calc_with_preload(i, j, k):\n",
- "    return i + j + k"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "d07cf107-3627-4cb0-906c-647497d6e0d2",
- "metadata": {},
- "source": [
- "The function `calc_with_preload()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only two inputs are provided `fs = exe.submit(calc_with_preload, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the `SingleNodeExecutor` automatically checks the keys set in the `init_function()` function. In this case the returned dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc_with_preload()` function, `i` and `j` are already provided, so the value `j=4` from the `init_function()` is not used. Only `k=3` is taken from the `init_function()`, and as the `calc_with_preload()` function does not define the `l` parameter, this one is ignored as well."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 16,
- "id": "cc648799-a0c6-4878-a469-97457bce024f",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "10\n"
- ]
- }
- ],
- "source": [
- "with SingleNodeExecutor(init_function=init_function, block_allocation=True) as exe:\n",
- "    fs = exe.submit(calc_with_preload, 2, j=5)\n",
- "    print(fs.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "1073b8ca-1492-46e9-8d1f-f52ad48d28a2",
- "metadata": {},
- "source": [
- "The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` function."
- ]
- },
- {
- "cell_type": "markdown",
- "id": "24397d78-dff1-4834-830c-a8f390fe6b9c",
- "metadata": {},
- "source": [
- "### Cache\n",
- "The development of scientific workflows is commonly an interactive process, extending the functionality step by step. This led to the development of interactive environments like [Jupyter](https://jupyter.org) which is fully supported by executorlib. Still many of the computationally intensive Python functions can take in the order of minutes to hours or even longer to execute, so reusing an existing Python process is not feasible. To address this challenge executorlib provides a file based cache to store the results of previously computed [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects. The results are serialized using [cloudpickle](https://github.com/cloudpipe/cloudpickle) and stored in a user-defined cache directory `cache_directory` to be reloaded later on. Internally, the hierarchical data format (HDF5) is used via the [h5py](https://www.h5py.org) package. \n",
- "\n",
- "The [h5py](https://www.h5py.org) package is an optional dependency of executorlib. The installation of the [h5py](https://www.h5py.org) package is covered in the installation section. "
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 17,
- "id": "ecdcef49-5c89-4538-b377-d53979673bf7",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[2, 4, 6]\n",
- "CPU times: user 512 ms, sys: 138 ms, total: 650 ms\n",
- "Wall time: 865 ms\n"
- ]
- }
- ],
- "source": [
- "%%time\n",
- "with SingleNodeExecutor(cache_directory=\"./file\") as exe:\n",
- "    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n",
- "    print([f.result() for f in future_lst])"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "32d0fb2e-5ac1-4249-b6c8-953c92fdfded",
- "metadata": {},
- "source": [
- "When the same code is executed again, executorlib finds the existing results in the cache directory specified by the `cache_directory` parameter and reloads the result, accelerating the computation especially during the prototyping phase when similar computations are repeated frequently for testing. \n",
- "\n",
- "Still it is important to mention that this cache is not designed to identify the submission of the same parameters within the context of one `with`-statement. It is the task of the user to minimize duplicate computations. The cache is only designed to restore previous calculation results when the Python process managing executorlib was stopped after the successful execution."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 18,
- "id": "c39babe8-4370-4d31-9520-9a7ce63378c8",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[2, 4, 6]\n",
- "CPU times: user 56.7 ms, sys: 32.5 ms, total: 89.2 ms\n",
- "Wall time: 620 ms\n"
- ]
- }
- ],
- "source": [
- "%%time\n",
- "with SingleNodeExecutor(cache_directory=\"./file\") as exe:\n",
- "    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n",
- "    print([f.result() for f in future_lst])"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "5144a035-633e-4e60-a362-f3b15b28848b",
- "metadata": {},
- "source": [
- "An additional advantage of the cache is the option to gather the results of previously submitted functions. Using the `get_cache_data()` function, the result of each Python function is converted to a dictionary. This list of dictionaries can be converted to a `pandas.DataFrame` for further processing:"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 19,
- "id": "f574b9e1-de55-4e38-aef7-a4bed540e040",
- "metadata": {},
- "outputs": [
- {
- "data": {
- "text/html": [
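- "<div>\n",
- "<table border=\"1\" class=\"dataframe\">\n",
- "  <thead>\n",
- "    <tr style=\"text-align: right;\">\n",
- "      <th></th>\n",
- "      <th>function</th>\n",
- "      <th>input_args</th>\n",
- "      <th>input_kwargs</th>\n",
- "      <th>output</th>\n",
- "      <th>runtime</th>\n",
- "      <th>filename</th>\n",
- "    </tr>\n",
- "  </thead>\n",
- "  <tbody>\n",
- "    <tr>\n",
- "      <th>0</th>\n",
- "      <td>&lt;built-in function sum&gt;</td>\n",
- "      <td>([1, 1],)</td>\n",
- "      <td>{}</td>\n",
- "      <td>2</td>\n",
- "      <td>0.001686</td>\n",
- "      <td>sum0d968285d17368d1c34ea7392309bcc5.h5out</td>\n",
- "    </tr>\n",
- "    <tr>\n",
- "      <th>1</th>\n",
- "      <td>&lt;built-in function sum&gt;</td>\n",
- "      <td>([3, 3],)</td>\n",
- "      <td>{}</td>\n",
- "      <td>6</td>\n",
- "      <td>0.136151</td>\n",
- "      <td>sum0102e33bb2921ae07a3bbe3db5d3dec9.h5out</td>\n",
- "    </tr>\n",
- "    <tr>\n",
- "      <th>2</th>\n",
- "      <td>&lt;built-in function sum&gt;</td>\n",
- "      <td>([2, 2],)</td>\n",
- "      <td>{}</td>\n",
- "      <td>4</td>\n",
- "      <td>0.136006</td>\n",
- "      <td>sum6270955d7c8022a0c1027aafaee64439.h5out</td>\n",
- "    </tr>\n",
- "  </tbody>\n",
- "</table>\n",
- "</div>"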
- ],
- "text/plain": [
- "                  function input_args input_kwargs  output   runtime  \\\n",
- "0  <built-in function sum>  ([1, 1],)           {}       2  0.001686   \n",
- "1  <built-in function sum>  ([3, 3],)           {}       6  0.136151   \n",
- "2  <built-in function sum>  ([2, 2],)           {}       4  0.136006   \n",
- "\n",
- "                                    filename  \n",
- "0  sum0d968285d17368d1c34ea7392309bcc5.h5out  \n",
- "1  sum0102e33bb2921ae07a3bbe3db5d3dec9.h5out  \n",
- "2  sum6270955d7c8022a0c1027aafaee64439.h5out  "
- ]
- },
- "execution_count": 19,
- "metadata": {},
- "output_type": "execute_result"
- }
- ],
- "source": [
- "import pandas\n",
- "from executorlib import get_cache_data\n",
- "\n",
- "df = pandas.DataFrame(get_cache_data(cache_directory=\"./file\"))\n",
- "df"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "68092479-e846-494a-9ac9-d9638b102bd8",
- "metadata": {},
- "source": [
- "After the development phase is concluded it is the task of the user to remove the cache directory defined with the `cache_directory` parameter. The cache directory is never removed by executorlib to prevent the repetition of expensive computations. Still, as disk space on shared file systems in HPC environments is commonly limited, it is recommended to remove the cache directory once the development process has concluded."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 20,
- "id": "34a9316d-577f-4a63-af14-736fb4e6b219",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "['sum0d968285d17368d1c34ea7392309bcc5.h5out', 'sum0102e33bb2921ae07a3bbe3db5d3dec9.h5out', 'sum6270955d7c8022a0c1027aafaee64439.h5out']\n"
- ]
- }
- ],
- "source": [
- "import os\n",
- "import shutil\n",
- "\n",
- "cache_dir = \"./file\"\n",
- "if os.path.exists(cache_dir):\n",
- "    print(os.listdir(cache_dir))\n",
- "    try:\n",
- "        shutil.rmtree(cache_dir)\n",
- "    except OSError:\n",
- "        pass"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "1cea95b5-4110-444c-82af-fa6718bfa17f",
- "metadata": {},
- "source": [
- "Typically the use of the cache is recommended for development processes only and for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and might change in future without further notice, rendering existing data in the cache unusable. Consequently, using the cache beyond the development process is not recommended. In addition the writing of the results to files might result in additional overhead for accessing the shared file system. "
- ]
- },
- {
- "cell_type": "markdown",
- "id": "71a8a0be-a933-4e83-9da5-50da35e9975b",
- "metadata": {},
- "source": [
- "### Dependencies\n",
- "Many scientific Python programs consist of series of Python function calls with varying level of parallel computations or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced in a single function. To extend the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library to support this programming pattern, the `SingleNodeExecutor` class from executorlib supports submitting Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects to the `SingleNodeExecutor` which are resolved before submission. So the `SingleNodeExecutor` internally waits until all Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects are successfully executed before it triggers the execution of the submitted Python function."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 21,
- "id": "d8b75a26-479d-405e-8895-a8d56b3f0f4b",
- "metadata": {},
- "outputs": [],
- "source": [
- "def calc_add(a, b):\n",
- "    return a + b"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "36118ae0-c13c-4f7a-bcd3-3d7f4bb5a078",
- "metadata": {},
- "source": [
- "For example the function which adds two numbers `calc_add()` is used in a loop which adds a counter to the previous numbers. In the first iteration the `future` parameter is set to `0` but already in the second iteration it is the Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object of the first iteration and so on. \n",
- "\n",
- "The important part is that the user does not have to wait until the first function is executed but instead the waiting happens internally in the `SingleNodeExecutor`."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 22,
- "id": "35fd5747-c57d-4926-8d83-d5c55a130ad6",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "6\n"
- ]
- }
- ],
- "source": [
- "with SingleNodeExecutor() as exe:\n",
- "    future = 0\n",
- "    for i in range(1, 4):\n",
- "        future = exe.submit(calc_add, i, future)\n",
- "    print(future.result())"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "38e1bbb3-1028-4f50-93c1-d2427f399a7d",
- "metadata": {},
- "source": [
- "As the reuse of existing [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting `plot_dependency_graph=True` during the initialization of the `SingleNodeExecutor` class.\n",
- "\n",
- "No computation is executed when the `plot_dependency_graph=True` is set. This parameter is for debugging only. \n",
- "\n",
- "Internally, the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is used for the visualisation of these dependency graphs. It is an optional dependency of executorlib. The installation of the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is covered in the installation section. "
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 23,
- "id": "f67470b5-af1d-4add-9de8-7f259ca67324",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "None\n"
- ]
- },
- {
- "data": {
- "image/svg+xml": [
- ""
- ],
- "text/plain": [
- ""
- ]
- },
- "metadata": {},
- "output_type": "display_data"
- }
- ],
- "source": [
- "with SingleNodeExecutor(plot_dependency_graph=True) as exe:\n",
- " future = 0\n",
- " for i in range(1, 4):\n",
- " future = exe.submit(calc_add, i, future)\n",
- " print(future.result())"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "67b52bd0-bd51-4538-a089-2776b8034547",
- "metadata": {},
- "outputs": [],
- "source": []
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "Python 3 (ipykernel)",
- "language": "python",
- "name": "python3"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.12.5"
- }
- },
- "nbformat": 4,
- "nbformat_minor": 5
-}
+{"metadata":{"kernelspec":{"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},"language_info":{"name":"python","version":"3.12.11","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"6915218e-7cf3-4bf4-9618-7e6942b4762f","cell_type":"markdown","source":"# Single Node Executor\nThe `SingleNodeExecutor` in executorlib is primarily used to enable rapid prototyping on a workstation computer to test your parallel Python program with executorlib before transferring it to a high performance computer (HPC). With the added capability of executorlib it is typically 10% slower than the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) from the Python standard library on a single node, when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks. So the performance of executorlib improves when the individual Python function calls require extensive computations.\n\nAn advantage that executorlib has over the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) from the Python standard library is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) as serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions, for example in the case of a Jupyter notebook. ","metadata":{}},{"id":"ccc686dd-8fc5-4755-8a19-f40010ebb1b8","cell_type":"markdown","source":"## Basic Functionality\nThe general functionality of executorlib follows the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library. 
You can import the `SingleNodeExecutor` class directly from executorlib and then just replace the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) with the `SingleNodeExecutor` class to start using executorlib.","metadata":{}},{"id":"b1907f12-7378-423b-9b83-1b65fc0a20f5","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"1654679f-38b3-4699-9bfe-b48cbde0b2db","cell_type":"markdown","source":"It is recommended to use the `SingleNodeExecutor` class in combination with a `with`-statement. This guarantees the processes created by the `SingleNodeExecutor` class to evaluate the Python functions are afterward closed and do not remain ghost processes. A function is then submitted using the `submit(fn, /, *args, **kwargs)` function which executes a given function `fn` as `fn(*args, **kwargs)`. The `submit()` function returns a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object, as defined by the Python Standard Library. As a first example we submit the function `sum()` to calculate the sum of the list `[1, 1]`:","metadata":{}},{"id":"16f7d138-ed77-45ea-a554-d329f7237500","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 47.3 ms, sys: 60.3 ms, total: 108 ms\nWall time: 969 ms\n"}],"execution_count":2},{"id":"a1109584-9db2-4f9d-b3ed-494d96241396","cell_type":"markdown","source":"As expected the result of the summation `sum([1, 1])` is `2`. 
The same result is retrieved from the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object received from the submission of the `sum()` as it is printed here `print(future.result())`. For most Python functions and especially the `sum()` function it is computationally not efficient to initialize the `SingleNodeExecutor` class only for the execution of a single function call, rather it is more computationally efficient to initialize the `SingleNodeExecutor` class once and then submit a number of functions. This can be achieved with a loop. For example the sum of the pairs `[2, 2]`, `[3, 3]` and `[4, 4]` can be achieved with a for-loop inside the context of the `SingleNodeExecutor()` class as provided by the `with`-statement.","metadata":{}},{"id":"cfccdf9a-b23b-4814-8c14-36703a8a5f9e","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[4, 6, 8]\nCPU times: user 39.9 ms, sys: 16.2 ms, total: 56.1 ms\nWall time: 3.77 s\n"}],"execution_count":3},{"id":"7db58f70-8137-4f1c-a87b-0d282f2bc3c5","cell_type":"markdown","source":"If only the parameters change but the function, which is applied to these parameters, remains the same, like in the case above the `sum()` function is applied to three pairs of parameters, then the `map(fn, *iterables, timeout=None, chunksize=1)` function can be used to map the function to the different sets of parameters - as it is defined in the [Python standard library](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map). 
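
A minimal sketch of how `submit()` in a loop and `map()` relate. It assumes the `ThreadPoolExecutor` from the Python standard library as a stand-in, since it exposes the same `submit()` and `map()` methods of the Executor interface; the variable names are illustrative only.

```python
from concurrent.futures import ThreadPoolExecutor

pairs = [[5, 5], [6, 6], [7, 7]]

# Stand-in executor (assumption): any class following the Executor interface
# of the standard library is used in the same way.
with ThreadPoolExecutor() as exe:
    # submit() returns one Future per call, the results are collected explicitly
    future_lst = [exe.submit(sum, pair) for pair in pairs]
    submitted = [f.result() for f in future_lst]
    # map() applies the same function to every item and yields results in input order
    mapped = list(exe.map(sum, pairs))
```

Both variants give the same list of results, so `map()` is mainly a shorter way to write the loop when only the parameters change.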
","metadata":{}},{"id":"abd0beb7-471d-490e-bb9c-96755bd7aacf","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n print(list(results))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[10, 12, 14]\nCPU times: user 41.7 ms, sys: 17.4 ms, total: 59.1 ms\nWall time: 3.81 s\n"}],"execution_count":4},{"id":"ac86bf47-4eb6-4d7c-acae-760b880803a8","cell_type":"markdown","source":"These three examples cover the general functionality of the `SingleNodeExecutor` class. Following the [Executor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) interface as it is defined in the Python standard library.","metadata":{}},{"id":"5de0f0f2-bf5c-46b3-8171-a3a206ce6775","cell_type":"markdown","source":"## Parallel Functions\nWriting parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function by function level. In this way individual functions can be replaced by parallel functions as needed without the need to modify the rest of the program. With the Local Mode executorlib supports two levels of parallel execution, parallel execution based on the Message Passing Interface (MPI) using the [mpi4py](https://mpi4py.readthedocs.io) package, or thread based parallel execution. Both levels of parallelism can be defined inside the function and do not require any modifications to the rest of the Python program. ","metadata":{}},{"id":"dc8e692f-bf6c-4838-bb82-6a6b8454a2e7","cell_type":"markdown","source":"### MPI Parallel Functions\nMPI is the default way to develop parallel programs for HPCs. Still it can be challenging to refactor a previously serial program to efficiently use MPI to achieve optimal computational efficiency for parallel execution, even with libraries like [mpi4py](https://mpi4py.readthedocs.io). 
To simplify the up-scaling of Python programs executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to a `SingleNodeExecutor` for evaluation.\n\nThe following `calc_mpi()` function imports the [mpi4py](https://mpi4py.readthedocs.io) package and then uses the internal functionality of MPI to get the total number of parallel CPU cores in the current MPI group `MPI.COMM_WORLD.Get_size()` and the index of the current processor in the MPI group `MPI.COMM_WORLD.Get_rank()`.\n\nThe [mpi4py](https://mpi4py.readthedocs.io) package is an optional dependency of executorlib. The installation of the [mpi4py](https://mpi4py.readthedocs.io) package is covered in the installation section.","metadata":{}},{"id":"a251d083-489e-41c1-9e49-c86093858006","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":5},{"id":"adbf8a10-04e1-4fd9-8768-4375bcba9ec3","cell_type":"markdown","source":"The computational resources for the execution of the `calc_mpi()` Python function are defined using the resource dictionary parameter `resource_dict={}`. The resource dictionary can be provided as an additional parameter for the `submit()` function. 
It is important that the parameter name `resource_dict` is reserved exclusively for the `submit()` function and cannot be used in the function which is submitted, like the `calc_mpi()` function in this example:","metadata":{}},{"id":"266864f1-d29e-4934-9b5d-51f4ffb11f5c","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":6},{"id":"3a449e3f-d7a4-4056-a1d0-35dfca4dad22","cell_type":"markdown","source":"Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. In this case it is internally set for every call of the `submit()` function, without the need to specify it again.","metadata":{}},{"id":"cb4ad978-bdf2-47bb-a7df-846641a54ec2","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"cores\": 2}) as exe:\n fs = exe.submit(calc_mpi, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":7},{"id":"c1d1d7b1-64fa-4e47-bbde-2a16036568d6","cell_type":"markdown","source":"In addition to the compute cores `cores`, the resource dictionary parameter `resource_dict` can also define the threads per core as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd`, the option to use the OpenMPI oversubscribe feature with `openmpi_oversubscribe` and finally for the [Simple Linux Utility for Resource \nManagement (SLURM)](https://slurm.schedmd.com) queuing system the option to provide additional command line arguments with the `slurm_cmd_args` parameter - [resource 
dictionary](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary).","metadata":{}},{"id":"4f5c5221-d99c-4614-82b1-9d6d3260c1bf","cell_type":"markdown","source":"### Thread Parallel Functions\nAn alternative option of parallelism is [thread based parallelism](https://docs.python.org/3/library/threading.html). executorlib supports thread based parallelism with the `threads_per_core` parameter in the resource dictionary `resource_dict`. Given the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock) in the cPython implementation a common application of thread based parallelism in Python is using additional threads in linked libraries. The number of threads is commonly controlled with environment variables like `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, `MKL_NUM_THREADS`, `VECLIB_MAXIMUM_THREADS` and `NUMEXPR_NUM_THREADS`. Specific libraries might require other environment variables. The environment variables can be set using the environment interface of the Python standard library `os.environ`.","metadata":{}},{"id":"7a7d21f6-9f1a-4f30-8024-9993e156dc75","cell_type":"code","source":"def calc_with_threads(i):\n import os\n\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":8},{"id":"82ed8f46-836c-402e-9363-be6e16c2a0b0","cell_type":"markdown","source":"Again the resource dictionary parameter `resource_dict` can be set either in the `submit()` function:","metadata":{}},{"id":"b8ed330d-ee77-44a0-a02f-670fa945b043","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n 
print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":9},{"id":"63222cd5-664b-4aba-a80c-5814166b1239","cell_type":"markdown","source":"Or alternatively, the resource dictionary parameter `resource_dict` can also be set during the initialization of the `Executor` class:","metadata":{}},{"id":"31562f89-c01c-4e7a-bbdd-fa26ca99e68b","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"threads_per_core\": 2}) as exe:\n fs = exe.submit(calc_with_threads, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":10},{"id":"8b78a7b4-066e-4cbc-858e-606c8bbbbf0c","cell_type":"markdown","source":"For most cases MPI based parallelism leads to higher computational efficiency in comparison to thread based parallelism, still the choice of parallelism depends on the specific Python function which should be executed in parallel. Careful benchmarks are required to achieve the optimal performance for a given computational architecture. \n\nBeyond MPI based parallelism and thread based parallelism the [HPC Cluster Executors](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) and the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) also provide the option to assign GPUs to the execution of individual Python functions.","metadata":{}},{"id":"ca9bc450-2762-4d49-b7f8-48cc83e068fd","cell_type":"markdown","source":"## Performance Optimization\nThe default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. 
It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features.","metadata":{}},{"id":"e9b52ecf-3984-4695-98e7-315aa3712104","cell_type":"markdown","source":"### Block Allocation\nBy default each submitted Python function is executed in a dedicated process. This guarantees that the execution of the submitted Python function starts with a fresh process. Still the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `True`. To limit the number of parallel Python processes when using block allocation it is recommended to set the `max_workers` parameter to restrict the number of available computing resources. ","metadata":{}},{"id":"0da4c7d0-2268-4ea8-b62d-5d94c79ebc72","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 36.5 ms, sys: 9.69 ms, total: 46.2 ms\nWall time: 2 s\n"}],"execution_count":11},{"id":"d38163b3-1c04-431c-964b-2bad4f823a4d","cell_type":"markdown","source":"The same functionality also applies to MPI parallel Python functions. The important part is that while it is possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is not possible to assign resources during the submission of the function with the `submit()` function. 
Starting again with the `calc_mpi()` function: ","metadata":{}},{"id":"cb8c4943-4c78-4203-95f2-1db758e588d9","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":12},{"id":"9e1212c4-e3fb-4e21-be43-0a4f0a08b856","cell_type":"markdown","source":"The resource dictionary parameter can still be set during the initialisation of the `SingleNodeExecutor` class. Internally, this groups the created Python processes in fixed allocations and afterwards submits Python functions to these allocations.","metadata":{}},{"id":"5ebf7195-58f9-40f2-8203-2d4b9f0e9602","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n fs = exe.submit(calc_mpi, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":13},{"id":"b75fb95f-f2f5-4be9-9f2a-9c2e9961c644","cell_type":"markdown","source":"Memory from a previous Python function remaining in the Python process is a weakness of block allocation, but it can at the same time be an advantage for working with large datasets. In executorlib this is achieved by introducing the `init_function` parameter. The `init_function` returns a dictionary of parameters which can afterwards be reused as keyword arguments `**kwargs` in the functions submitted to the `Executor`. 
When block allocation `block_allocation` is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.","metadata":{}},{"id":"8aa754cc-eb1a-4fa1-bd72-272246df1d2f","cell_type":"code","source":"def init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}","metadata":{"trusted":true},"outputs":[],"execution_count":14},{"id":"1854895a-7239-4b30-b60d-cf1a89234464","cell_type":"code","source":"def calc_with_preload(i, j, k):\n return i + j + k","metadata":{"trusted":true},"outputs":[],"execution_count":15},{"id":"d07cf107-3627-4cb0-906c-647497d6e0d2","cell_type":"markdown","source":"The function `calc_with_preload()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only two inputs are provided `fs = exe.submit(calc_with_preload, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the `SingleNodeExecutor` automatically checks the keys set in the `init_function()` function. In this case the returned dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. 
For this specific call of the `calc_with_preload()` function, `i` and `j` are already provided so the `j=4` from the `init_function()` is not required, but `k=3` is used from the `init_function()` and as the `calc_with_preload()` function does not define the `l` parameter this one is also ignored.","metadata":{}},{"id":"cc648799-a0c6-4878-a469-97457bce024f","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, init_function=init_function, block_allocation=True) as exe:\n fs = exe.submit(calc_with_preload, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"10\n"}],"execution_count":16},{"id":"1073b8ca-1492-46e9-8d1f-f52ad48d28a2","cell_type":"markdown","source":"The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` function.","metadata":{}},{"id":"24397d78-dff1-4834-830c-a8f390fe6b9c","cell_type":"markdown","source":"### Cache\nThe development of scientific workflows is commonly an interactive process, extending the functionality step by step. This led to the development of interactive environments like [Jupyter](https://jupyter.org) which is fully supported by executorlib. Still many of the computationally intensive Python functions can take in the order of minutes to hours or even longer to execute, so reusing an existing Python process is not feasible. To address this challenge executorlib provides a file based cache to store the results of previously computed [concurrent future Futures](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects. The results are serialized using [cloudpickle](https://github.com/cloudpipe/cloudpickle) and stored in a user-defined cache directory `cache_directory` to be reloaded later on. Internally, the hierarchical data format (HDF5) is used via the [h5py](https://www.h5py.org) package, which is an optional dependency for executorlib. 
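
The idea of a file based cache can be sketched with the standard library alone. This is only an illustration and not the executorlib implementation: it assumes `pickle` instead of cloudpickle and HDF5, and the helper names `cache_file_name()` and `cached_call()` as well as the file naming scheme are hypothetical.

```python
import hashlib
import os
import pickle
import tempfile


def cache_file_name(fn, args, kwargs):
    # Hypothetical naming scheme: function name plus a hash of the pickled inputs
    payload = pickle.dumps((args, sorted(kwargs.items())))
    return fn.__name__ + hashlib.md5(payload).hexdigest() + '.pkl'


def cached_call(cache_directory, fn, *args, **kwargs):
    # Return (result, True) when the result was reloaded from the cache directory,
    # otherwise compute it, store it and return (result, False)
    os.makedirs(cache_directory, exist_ok=True)
    path = os.path.join(cache_directory, cache_file_name(fn, args, kwargs))
    if os.path.exists(path):
        with open(path, 'rb') as f:
            return pickle.load(f), True
    result = fn(*args, **kwargs)
    with open(path, 'wb') as f:
        pickle.dump(result, f)
    return result, False


# A temporary directory stands in for the user-defined cache directory
with tempfile.TemporaryDirectory() as cache_dir:
    first, first_hit = cached_call(cache_dir, sum, [1, 1])
    second, second_hit = cached_call(cache_dir, sum, [1, 1])
```

The first call computes and stores the result, the second call with the same function and inputs reloads it from the file.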
\n\nThe [h5py](https://www.h5py.org) package is an optional dependency of executorlib. The installation of the [h5py](https://www.h5py.org) package is covered in the installation section. ","metadata":{}},{"id":"ecdcef49-5c89-4538-b377-d53979673bf7","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 50 ms, sys: 12.8 ms, total: 62.8 ms\nWall time: 3.19 s\n"}],"execution_count":17},{"id":"32d0fb2e-5ac1-4249-b6c8-953c92fdfded","cell_type":"markdown","source":"When the same code is executed again, executorlib finds the existing results in the cache directory specified by the `cache_directory` parameter and reloads the result, accelerating the computation especially during the prototyping phase when similar computations are repeated frequently for testing. \n\nStill it is important to mention, that this cache is not designed to identify the submission of the same parameters within the context of one `with`-statement. It is the task of the user to minimize duplicate computations, the cache is only designed to restore previous calculation results when the Python process managing executorlib was stopped after the successful execution. 
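
One way for the user to avoid duplicate submissions within a single `with`-statement is to keep one future per distinct parameter. This is a minimal sketch, assuming the `ThreadPoolExecutor` from the Python standard library as a stand-in for any class following the Executor interface; the `futures` dictionary is an illustrative helper and not part of executorlib.

```python
from concurrent.futures import ThreadPoolExecutor

parameters = [1, 2, 1, 3, 2]

with ThreadPoolExecutor() as exe:
    futures = {}  # maps each distinct parameter to its Future
    for value in parameters:
        if value not in futures:
            # submit only the first occurrence of each parameter
            futures[value] = exe.submit(sum, [value, value])
    # repeated parameters reuse the Future of the first submission
    results = [futures[value].result() for value in parameters]
```

Five results are returned while only three function calls are submitted.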
","metadata":{}},{"id":"c39babe8-4370-4d31-9520-9a7ce63378c8","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 25.6 ms, sys: 6.75 ms, total: 32.3 ms\nWall time: 3.46 s\n"}],"execution_count":18},{"id":"5144a035-633e-4e60-a362-f3b15b28848b","cell_type":"markdown","source":"An additional advantage of the cache is the option to gather the results of previously submitted functions. Using the `get_cache_data()` function the results of each Python function is converted to a dictionary. This list of dictionaries can be converted to a `pandas.DataFrame` for further processing:","metadata":{}},{"id":"f574b9e1-de55-4e38-aef7-a4bed540e040","cell_type":"code","source":"import pandas\nfrom executorlib import get_cache_data\n\ndf = pandas.DataFrame(get_cache_data(cache_directory=\"./file\"))\ndf","metadata":{"trusted":true},"outputs":[{"execution_count":19,"output_type":"execute_result","data":{"text/plain":" function input_args input_kwargs output runtime \\\n0 ([2, 2],) {} 4 2.977758 \n1 ([3, 3],) {} 6 2.778058 \n2 ([1, 1],) {} 2 2.700404 \n\n filename \n0 /home/jovyan/file/sum89afbdf9da5eb1794f6976a3f... \n1 /home/jovyan/file/sum0f7710227cda6456e5d071877... \n2 /home/jovyan/file/sumf5ad27b855231a293ddd735a8... ","text/html":"
<table><thead><tr><th></th><th>function</th><th>input_args</th><th>input_kwargs</th><th>output</th><th>runtime</th><th>filename</th></tr></thead><tbody><tr><th>0</th><td>&lt;built-in function sum&gt;</td><td>([2, 2],)</td><td>{}</td><td>4</td><td>2.977758</td><td>/home/jovyan/file/sum89afbdf9da5eb1794f6976a3f...</td></tr><tr><th>1</th><td>&lt;built-in function sum&gt;</td><td>([3, 3],)</td><td>{}</td><td>6</td><td>2.778058</td><td>/home/jovyan/file/sum0f7710227cda6456e5d071877...</td></tr><tr><th>2</th><td>&lt;built-in function sum&gt;</td><td>([1, 1],)</td><td>{}</td><td>2</td><td>2.700404</td><td>/home/jovyan/file/sumf5ad27b855231a293ddd735a8...</td></tr></tbody></table>
"},"metadata":{}}],"execution_count":19},{"id":"68092479-e846-494a-9ac9-d9638b102bd8","cell_type":"markdown","source":"After the development phase is concluded it is the task of the user to remove the cache directory defined with the `cache_directory` parameter. The cache directory is never removed by executorlib to prevent the repeation of expensive computations. Still as disk space on shared file systems in HPC environments is commonly limited it is recommended to remove the cache directory once the development process concluded. ","metadata":{}},{"id":"34a9316d-577f-4a63-af14-736fb4e6b219","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']\n"}],"execution_count":20},{"id":"1cea95b5-4110-444c-82af-fa6718bfa17f","cell_type":"markdown","source":"Typically the use of the cache is recommended for development processes only and for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and might change in future without further notice, rendering existing data in the cache unusable. Consequently, using the cache beyond the development process is not recommended. In addition the writing of the results to files might result in additional overhead for accessing the shared file system. 
","metadata":{}},{"id":"71a8a0be-a933-4e83-9da5-50da35e9975b","cell_type":"markdown","source":"### Dependencies\nMany scientific Python programs consist of series of Python function calls with varying level of parallel computations or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced in a single function. To extend the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library to support this programming pattern, the `SingleNodeExecutor` class from executorlib supports submitting Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects to the `SingleNodeExecutor` which are resolved before submission. So the `SingleNodeExecutor` internally waits until all Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects are successfully executed before it triggers the execution of the submitted Python function.","metadata":{}},{"id":"d8b75a26-479d-405e-8895-a8d56b3f0f4b","cell_type":"code","source":"def calc_add(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":21},{"id":"36118ae0-c13c-4f7a-bcd3-3d7f4bb5a078","cell_type":"markdown","source":"For example the function which adds two numbers `calc_add()` is used in a loop which adds a counter to the previous numbers. In the first iteration the `future` parameter is set to `0` but already in the second iteration it is the Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object of the first iteration and so on. 
\n\nThe important part is that the user does not have to wait until the first function is executed but instead the waiting happens internally in the `SingleNodeExecutor`.","metadata":{}},{"id":"35fd5747-c57d-4926-8d83-d5c55a130ad6","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(calc_add, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"6\n"}],"execution_count":22},{"id":"38e1bbb3-1028-4f50-93c1-d2427f399a7d","cell_type":"markdown","source":"As the reusing of existing [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting the `plot_dependency_graph=True` during the initialization of the `SingleNodeExecutor` class.\n\nNo computation is executed when the `plot_dependency_graph=True` is set. This parameter is for debugging only. \n\nInternally, the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is used for the visualisation of these dependency graphs. It is an optional dependency of executorlib. The installation of the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is covered in the installation section. 
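
How a dependency graph can be recorded without executing anything can be sketched as follows. This is a hypothetical illustration, not the executorlib implementation: the `Node` and `GraphRecorder` classes are assumptions, and the graph is returned as lines of DOT text instead of being rendered with pygraphviz.

```python
from dataclasses import dataclass


@dataclass
class Node:
    # Placeholder returned instead of a Future, it only remembers its position
    index: int


class GraphRecorder:
    '''Hypothetical stand-in which records submissions instead of running them.'''

    def __init__(self):
        self.nodes = []  # one function name per submitted call
        self.edges = []  # (source index, target index) pairs

    def submit(self, fn, *args):
        index = len(self.nodes)
        self.nodes.append(fn.__name__)
        for arg in args:
            # an argument which is itself a placeholder marks a dependency
            if isinstance(arg, Node):
                self.edges.append((arg.index, index))
        return Node(index)

    def to_dot(self):
        lines = ['digraph dependencies {']
        lines += [f'  n{i} [label={name}];' for i, name in enumerate(self.nodes)]
        lines += [f'  n{a} -> n{b};' for a, b in self.edges]
        lines.append('}')
        return lines


def calc_add(a, b):
    return a + b


recorder = GraphRecorder()
future = 0
for i in range(1, 4):
    future = recorder.submit(calc_add, i, future)
dot_lines = recorder.to_dot()
```

For the loop used in this section the recorder holds three nodes connected as a chain, and `calc_add()` is never called.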
","metadata":{}},{"id":"f67470b5-af1d-4add-9de8-7f259ca67324","cell_type":"code","source":"with SingleNodeExecutor(plot_dependency_graph=True) as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(calc_add, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"None\n"},{"output_type":"display_data","data":{"text/plain":"","image/svg+xml":""},"metadata":{}}],"execution_count":23},{"id":"67b52bd0-bd51-4538-a089-2776b8034547","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]}
\ No newline at end of file