From 5b4ffe3b2c328a0c655412e68869f2c745f71f42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 13 Nov 2025 07:11:17 +0100 Subject: [PATCH] fix documentation --- README.md | 1 + notebooks/1-single-node.ipynb | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 74d82670..558009bb 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,7 @@ as hierarchical job scheduler within the allocations. * [Basic Functionality](https://executorlib.readthedocs.io/en/latest/1-single-node.html#basic-functionality) * [Parallel Functions](https://executorlib.readthedocs.io/en/latest/1-single-node.html#parallel-functions) * [Performance Optimization](https://executorlib.readthedocs.io/en/latest/1-single-node.html#performance-optimization) + * [Advanced Scheduling](https://executorlib.readthedocs.io/en/latest/1-single-node.html#advanced-scheduling) * [Testing and Debugging](https://executorlib.readthedocs.io/en/latest/1-single-node.html#testing-and-debugging) * [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) * [SLURM](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#slurm) diff --git a/notebooks/1-single-node.ipynb b/notebooks/1-single-node.ipynb index b4f7e78c..110b5c2e 100644 --- a/notebooks/1-single-node.ipynb +++ b/notebooks/1-single-node.ipynb @@ -1 +1 @@ -{"metadata":{"kernelspec":{"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},"language_info":{"name":"python","version":"3.13.9","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"6915218e-7cf3-4bf4-9618-7e6942b4762f","cell_type":"markdown","source":"# Single Node Executor\nThe `SingleNodeExecutor` in executorlib, is primarily used to enable rapid prototyping on a workstation computer to test your parallel Python program with executorlib before transferring it to an high performance computer (HPC). With the added capability of executorlib it is typically 10% slower than the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) from the Python standard library on a single node, when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks. So the performance of executorlib improves when the individual Python function calls require extensive computations.\n\nAn advantage that executorlib has over the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) from the Python standard libary, is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) as serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions for example in the case of a Jupyter notebook. ","metadata":{}},{"id":"ccc686dd-8fc5-4755-8a19-f40010ebb1b8","cell_type":"markdown","source":"## Basic Functionality\nThe general functionality of executorlib follows the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library. 
You can import the `SingleNodeExecutor` class directly from executorlib and then just replace the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) with the `SingleNodeExecutor` class to start using executorlib.","metadata":{}},{"id":"b1907f12-7378-423b-9b83-1b65fc0a20f5","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"1654679f-38b3-4699-9bfe-b48cbde0b2db","cell_type":"markdown","source":"It is recommended to use the `SingleNodeExecutor` class in combination with a `with`-statement. This guarantees the processes created by the `SingleNodeExecutor` class to evaluate the Python functions are afterward closed and do not remain ghost processes. A function is then submitted using the `submit(fn, /, *args, **kwargs)` function which executes a given function `fn` as `fn(*args, **kwargs)`. The `submit()` function returns a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object, as defined by the Python Standard Library. As a first example we submit the function `sum()` to calculate the sum of the list `[1, 1]`:","metadata":{}},{"id":"16f7d138-ed77-45ea-a554-d329f7237500","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 45.6 ms, sys: 86.1 ms, total: 132 ms\nWall time: 285 ms\n"}],"execution_count":2},{"id":"a1109584-9db2-4f9d-b3ed-494d96241396","cell_type":"markdown","source":"As expected the result of the summation `sum([1, 1])` is `2`. The same result is retrieved from the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object received from the submission of the `sum()` as it is printed here `print(future.result())`. For most Python functions and especially the `sum()` function it is computationally not efficient to initialize the `SingleNodeExecutor` class only for the execution of a single function call, rather it is more computationally efficient to initialize the `SingleNodeExecutor` class once and then submit a number of functions. This can be achieved with a loop. 
For example the sum of the pairs `[2, 2]`, `[3, 3]` and `[4, 4]` can be achieved with a for-loop inside the context of the `SingleNodeExecutor()` class as provided by the `with`-statement.","metadata":{}},{"id":"cfccdf9a-b23b-4814-8c14-36703a8a5f9e","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[4, 6, 8]\nCPU times: user 23.9 ms, sys: 15.2 ms, total: 39.1 ms\nWall time: 622 ms\n"}],"execution_count":3},{"id":"7db58f70-8137-4f1c-a87b-0d282f2bc3c5","cell_type":"markdown","source":"If only the parameters change but the function, which is applied to these parameters, remains the same, like in the case above the `sum()` function is applied to three pairs of parameters, then the `map(fn, *iterables, timeout=None, chunksize=1)` function can be used to map the function to the different sets of parameters - as it is defined in the [Python standard library](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map). ","metadata":{}},{"id":"abd0beb7-471d-490e-bb9c-96755bd7aacf","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n print(list(results))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[10, 12, 14]\nCPU times: user 28.2 ms, sys: 12.8 ms, total: 41 ms\nWall time: 673 ms\n"}],"execution_count":4},{"id":"ac86bf47-4eb6-4d7c-acae-760b880803a8","cell_type":"markdown","source":"These three examples cover the general functionality of the `SingleNodeExecutor` class. Following the [Executor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) interface as it is defined in the Python standard library.","metadata":{}},{"id":"5de0f0f2-bf5c-46b3-8171-a3a206ce6775","cell_type":"markdown","source":"## Parallel Functions\nWriting parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function by function level. In this way individual functions can be replaced by parallel functions as needed without the need to modify the rest of the program. With the Local Mode executorlib supports two levels of parallel execution, parallel execution based on the Message Passing Interface (MPI) using the [mpi4py](https://mpi4py.readthedocs.io) package, or thread based parallel execution. Both levels of parallelism can be defined inside the function and do not require any modifications to the rest of the Python program. ","metadata":{}},{"id":"dc8e692f-bf6c-4838-bb82-6a6b8454a2e7","cell_type":"markdown","source":"### MPI Parallel Functions\nMPI is the default way to develop parallel programs for HPCs. Still it can be challenging to refactor a previously serial program to efficiently use MPI to achieve optimal computational efficiency for parallel execution, even with libraries like [mpi4py](https://mpi4py.readthedocs.io). 
To simplify the up-scaling of Python programs executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to an `SingleNodeExecutor` for evaluation.\n\nThe following `calc_mpi()` function imports the [mpi4py](https://mpi4py.readthedocs.io) package and then uses the internal functionality of MPI to get the total number of parallel CPU cores in the current MPI group `MPI.COMM_WORLD.Get_size()` and the index of the current processor in the MPI group `MPI.COMM_WORLD.Get_rank()`.\n\nThe [mpi4py](https://mpi4py.readthedocs.io) package is an optional dependency of executorlib. The installation of the [mpi4py](https://mpi4py.readthedocs.io) package is covered in the installation section.","metadata":{}},{"id":"a251d083-489e-41c1-9e49-c86093858006","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":5},{"id":"adbf8a10-04e1-4fd9-8768-4375bcba9ec3","cell_type":"markdown","source":"The computational resources for the execution of the `calc_mpi()` Python function are defined using the resource dictionary parameter `resource_dict={}`. The reseource dictionary can either be provided as additional parameter for the `submit()` function. It is important that the parameter name `resource_dict` is reserved exclusively for the `submit()` function and cannot be used in the function which is submitted, like the `calc_mpi()` function in this example:","metadata":{}},{"id":"266864f1-d29e-4934-9b5d-51f4ffb11f5c","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":6},{"id":"3a449e3f-d7a4-4056-a1d0-35dfca4dad22","cell_type":"markdown","source":"Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. In this case it is internally set for every call of the `submit()` function, without the need to specify it again.","metadata":{}},{"id":"cb4ad978-bdf2-47bb-a7df-846641a54ec2","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"cores\": 2}) as exe:\n fs = exe.submit(calc_mpi, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":7},{"id":"c1d1d7b1-64fa-4e47-bbde-2a16036568d6","cell_type":"markdown","source":"In addition, to the compute cores `cores`, the resource dictionary parameter `resource_dict` can also define the threads per core as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd`, the option to use the OpenMPI oversubscribe feature with `openmpi_oversubscribe` and finally for the [Simple Linux Utility for Resource \nManagement (SLURM)](https://slurm.schedmd.com) queuing system the option to provide additional command line arguments with the `slurm_cmd_args` parameter - [resource dictionary](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary).","metadata":{}},{"id":"4f5c5221-d99c-4614-82b1-9d6d3260c1bf","cell_type":"markdown","source":"### Thread Parallel Functions\nAn alternative option of parallelism is [thread based parallelism](https://docs.python.org/3/library/threading.html). 
executorlib supports thread based parallelism with the `threads_per_core` parameter in the resource dictionary `resource_dict`. Given the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock) in the cPython implementation a common application of thread based parallelism in Python is using additional threads in linked libraries. The number of threads is commonly controlled with environment variables like `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, `MKL_NUM_THREADS`, `VECLIB_MAXIMUM_THREADS` and `NUMEXPR_NUM_THREADS`. Specific libraries might require other environment variables. The environment variables can be set using the environment interface of the Python standard library `os.environ`.","metadata":{}},{"id":"7a7d21f6-9f1a-4f30-8024-9993e156dc75","cell_type":"code","source":"def calc_with_threads(i):\n import os\n\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":8},{"id":"82ed8f46-836c-402e-9363-be6e16c2a0b0","cell_type":"markdown","source":"Again the resource dictionary parameter `resource_dict` can be set either in the `submit()` function:","metadata":{}},{"id":"b8ed330d-ee77-44a0-a02f-670fa945b043","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":9},{"id":"63222cd5-664b-4aba-a80c-5814166b1239","cell_type":"markdown","source":"Or alternatively, the resource dictionary parameter `resource_dict` can also be set during the initialization of the `Executor` class:","metadata":{}},{"id":"31562f89-c01c-4e7a-bbdd-fa26ca99e68b","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"threads_per_core\": 2}) as exe:\n fs = exe.submit(calc_with_threads, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":10},{"id":"8b78a7b4-066e-4cbc-858e-606c8bbbbf0c","cell_type":"markdown","source":"For most cases MPI based parallelism leads to higher computational efficiency in comparison to thread based parallelism, still the choice of parallelism depends on the specific Python function which should be executed in parallel. Careful benchmarks are required to achieve the optimal performance for a given computational architecture. \n\nBeyond MPI based parallelism and thread based parallelism the [HPC Cluster Executors](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) and the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) also provide the option to assign GPUs to the execution of individual Python functions.","metadata":{}},{"id":"ca9bc450-2762-4d49-b7f8-48cc83e068fd","cell_type":"markdown","source":"## Performance Optimization\nThe default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. 
It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features.","metadata":{}},{"id":"e9b52ecf-3984-4695-98e7-315aa3712104","cell_type":"markdown","source":"### Block Allocation\nBy default each submitted Python function is executed in a dedicated process. This guarantees that the execution of the submitted Python function starts with a fresh process. Still the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `True`. To limit the number of parallel Python processes when using block allocation it is recommended to set the `max_workers` parameter to restrict the number of available computing resources. ","metadata":{}},{"id":"0da4c7d0-2268-4ea8-b62d-5d94c79ebc72","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 23.6 ms, sys: 9.69 ms, total: 33.3 ms\nWall time: 360 ms\n"}],"execution_count":11},{"id":"d38163b3-1c04-431c-964b-2bad4f823a4d","cell_type":"markdown","source":"The same functionality also applies to MPI parallel Python functions. The important part is that while it is possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is not possible to assign resources during the submission of the function with the `submit()` function. Starting again with the `calc_mpi()` function: ","metadata":{}},{"id":"cb8c4943-4c78-4203-95f2-1db758e588d9","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":12},{"id":"9e1212c4-e3fb-4e21-be43-0a4f0a08b856","cell_type":"markdown","source":"Still, the resource dictionary parameter can be set during the initialization of the `SingleNodeExecutor` class. Internally, this groups the created Python processes in fixed allocations and afterward submits Python functions to these allocations.","metadata":{}},{"id":"5ebf7195-58f9-40f2-8203-2d4b9f0e9602","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n fs = exe.submit(calc_mpi, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":13},{"id":"b75fb95f-f2f5-4be9-9f2a-9c2e9961c644","cell_type":"markdown","source":"The weakness that memory from a previous Python function remains in the Python process can at the same time be an advantage for working with large datasets. In executorlib this is achieved by introducing the `init_function` parameter. The `init_function` returns a dictionary of parameters which can afterwards be reused as keyword arguments `**kwargs` in the functions submitted to the `Executor`, as sketched below. 
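\n\nA minimal sketch of this pattern - assuming a hypothetical `load_large_dataset()` helper which is not part of executorlib - preloads a large dataset once per worker process and injects it into every submitted function:\n\n```python\ndef init_function():\n    # hypothetical helper - loads the large dataset once per worker process\n    dataset = load_large_dataset()\n    return {\"dataset\": dataset}\n\n\ndef analyze(index, dataset):\n    # `dataset` is injected as a keyword argument from the init_function return value\n    return dataset[index]\n```\n\n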
When block allocation `block_allocation` is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.","metadata":{}},{"id":"8aa754cc-eb1a-4fa1-bd72-272246df1d2f","cell_type":"code","source":"def init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}","metadata":{"trusted":true},"outputs":[],"execution_count":14},{"id":"1854895a-7239-4b30-b60d-cf1a89234464","cell_type":"code","source":"def calc_with_preload(i, j, k):\n return i + j + k","metadata":{"trusted":true},"outputs":[],"execution_count":15},{"id":"d07cf107-3627-4cb0-906c-647497d6e0d2","cell_type":"markdown","source":"The function `calc_with_preload()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only two inputs are provided: `fs = exe.submit(calc_with_preload, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the `SingleNodeExecutor` automatically checks the keys set in the `init_function()` function. In this case the returned dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc_with_preload()` function, `i` and `j` are already provided, so the `j=4` from the `init_function()` is not required, but `k=3` is used from the `init_function()`, and as the `calc_with_preload()` function does not define the `l` parameter it is ignored.","metadata":{}},{"id":"cc648799-a0c6-4878-a469-97457bce024f","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, init_function=init_function, block_allocation=True) as exe:\n fs = exe.submit(calc_with_preload, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"10\n"}],"execution_count":16},{"id":"1073b8ca-1492-46e9-8d1f-f52ad48d28a2","cell_type":"markdown","source":"The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` function.","metadata":{}},{"id":"24397d78-dff1-4834-830c-a8f390fe6b9c","cell_type":"markdown","source":"### Cache\nThe development of scientific workflows is commonly an interactive process, extending the functionality step by step. This led to the development of interactive environments like [Jupyter](https://jupyter.org) which is fully supported by executorlib. Still many of the computationally intensive Python functions can take on the order of minutes to hours or even longer to execute, so reusing an existing Python process is not feasible. To address this challenge executorlib provides a file based cache to store the results of previously computed [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects. The results are serialized using [cloudpickle](https://github.com/cloudpipe/cloudpickle) and stored in a user-defined cache directory `cache_directory` to be reloaded later on. Internally, the hierarchical data format (HDF5) is used via the [h5py](https://www.h5py.org) package. \n\nThe [h5py](https://www.h5py.org) package is an optional dependency of executorlib. The installation of the [h5py](https://www.h5py.org) package is covered in the installation section. 
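\n\nAs a quick sanity check - a minimal sketch, not part of the executorlib API - the availability of the optional [h5py](https://www.h5py.org) dependency can be verified before enabling the cache:\n\n```python\nimport importlib.util\n\n# the file based cache requires the optional h5py dependency - fail early with a clear message\nif importlib.util.find_spec(\"h5py\") is None:\n    raise ImportError(\"h5py is required for the executorlib cache, see the installation section\")\n```\n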
","metadata":{}},{"id":"ecdcef49-5c89-4538-b377-d53979673bf7","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 501 ms, sys: 52.8 ms, total: 554 ms\nWall time: 1.11 s\n"}],"execution_count":17},{"id":"32d0fb2e-5ac1-4249-b6c8-953c92fdfded","cell_type":"markdown","source":"When the same code is executed again, executorlib finds the existing results in the cache directory specified by the `cache_directory` parameter and reloads the result, accelerating the computation especially during the prototyping phase when similar computations are repeated frequently for testing. \n\nStill it is important to mention, that this cache is not designed to identify the submission of the same parameters within the context of one `with`-statement. It is the task of the user to minimize duplicate computations, the cache is only designed to restore previous calculation results when the Python process managing executorlib was stopped after the successful execution. ","metadata":{}},{"id":"c39babe8-4370-4d31-9520-9a7ce63378c8","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 42.1 ms, sys: 24.3 ms, total: 66.4 ms\nWall time: 691 ms\n"}],"execution_count":18},{"id":"5144a035-633e-4e60-a362-f3b15b28848b","cell_type":"markdown","source":"An additional advantage of the cache is the option to gather the results of previously submitted functions. Using the `get_cache_data()` function the results of each Python function is converted to a dictionary. This list of dictionaries can be converted to a `pandas.DataFrame` for further processing:","metadata":{}},{"id":"f574b9e1-de55-4e38-aef7-a4bed540e040","cell_type":"code","source":"import pandas\nfrom executorlib import get_cache_data\n\ndf = pandas.DataFrame(get_cache_data(cache_directory=\"./file\"))\ndf","metadata":{"trusted":true},"outputs":[{"execution_count":19,"output_type":"execute_result","data":{"text/plain":" function input_args input_kwargs output runtime \\\n0 ([2, 2],) {} 4 0.001219 \n1 ([3, 3],) {} 6 0.015866 \n2 ([1, 1],) {} 2 0.018141 \n\n filename \n0 /home/jovyan/file/sum89afbdf9da5eb1794f6976a3f... \n1 /home/jovyan/file/sum0f7710227cda6456e5d071877... \n2 /home/jovyan/file/sumf5ad27b855231a293ddd735a8... ","text/html":"
<table>\n<thead>\n<tr><th></th><th>function</th><th>input_args</th><th>input_kwargs</th><th>output</th><th>runtime</th><th>filename</th></tr>\n</thead>\n<tbody>\n<tr><th>0</th><td>&lt;built-in function sum&gt;</td><td>([2, 2],)</td><td>{}</td><td>4</td><td>0.001219</td><td>/home/jovyan/file/sum89afbdf9da5eb1794f6976a3f...</td></tr>\n<tr><th>1</th><td>&lt;built-in function sum&gt;</td><td>([3, 3],)</td><td>{}</td><td>6</td><td>0.015866</td><td>/home/jovyan/file/sum0f7710227cda6456e5d071877...</td></tr>\n<tr><th>2</th><td>&lt;built-in function sum&gt;</td><td>([1, 1],)</td><td>{}</td><td>2</td><td>0.018141</td><td>/home/jovyan/file/sumf5ad27b855231a293ddd735a8...</td></tr>\n</tbody>\n</table>
"},"metadata":{}}],"execution_count":19},{"id":"68092479-e846-494a-9ac9-d9638b102bd8","cell_type":"markdown","source":"After the development phase is concluded it is the task of the user to remove the cache directory defined with the `cache_directory` parameter. The cache directory is never removed by executorlib to prevent the repeation of expensive computations. Still as disk space on shared file systems in HPC environments is commonly limited it is recommended to remove the cache directory once the development process concluded. ","metadata":{}},{"id":"34a9316d-577f-4a63-af14-736fb4e6b219","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']\n"}],"execution_count":20},{"id":"1cea95b5-4110-444c-82af-fa6718bfa17f","cell_type":"markdown","source":"Typically the use of the cache is recommended for development processes only and for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and might change in future without further notice, rendering existing data in the cache unusable. Consequently, using the cache beyond the development process is not recommended. In addition the writing of the results to files might result in additional overhead for accessing the shared file system. ","metadata":{}},{"id":"71a8a0be-a933-4e83-9da5-50da35e9975b","cell_type":"markdown","source":"### Dependencies\nMany scientific Python programs consist of series of Python function calls with varying level of parallel computations or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced in a single function. To extend the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library to support this programming pattern, the `SingleNodeExecutor` class from executorlib supports submitting Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects to the `SingleNodeExecutor` which are resolved before submission. So the `SingleNodeExecutor` internally waits until all Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects are successfully executed before it triggers the execution of the submitted Python function.","metadata":{}},{"id":"d8b75a26-479d-405e-8895-a8d56b3f0f4b","cell_type":"code","source":"def calc_add(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":21},{"id":"36118ae0-c13c-4f7a-bcd3-3d7f4bb5a078","cell_type":"markdown","source":"For example the function which adds two numbers `calc_add()` is used in a loop which adds a counter to the previous numbers. In the first iteration the `future` parameter is set to `0` but already in the second iteration it is the Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object of the first iteration and so on. 
\n\nThe important part is that the user does not have to wait until the first function is executed but instead the waiting happens internally in the `SingleNodeExecutor`.","metadata":{}},{"id":"35fd5747-c57d-4926-8d83-d5c55a130ad6","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(calc_add, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"6\n"}],"execution_count":22},{"id":"38e1bbb3-1028-4f50-93c1-d2427f399a7d","cell_type":"markdown","source":"As the reuse of existing [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting `plot_dependency_graph=True` during the initialization of the `SingleNodeExecutor` class.\n\nNo computation is executed when `plot_dependency_graph=True` is set. This parameter is for debugging only. \n\nInternally, the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is used for the visualization of these dependency graphs. It is an optional dependency of executorlib. The installation of the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is covered in the installation section. ","metadata":{}},{"id":"f67470b5-af1d-4add-9de8-7f259ca67324","cell_type":"code","source":"with SingleNodeExecutor(plot_dependency_graph=True) as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(calc_add, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"None\n"},{"output_type":"display_data","data":{"text/plain":"","image/svg+xml":"[dependency graph figure: the constants 1 and 0 feed the first calc_add node, whose result is chained with the inputs 2 and 3 through two further calc_add nodes]"},"metadata":{}}],"execution_count":23},{"id":"4c952a9d-2b58-401d-b7ec-fda740db774a","cell_type":"markdown","source":"## Advanced Scheduling\nGoing beyond just directed acyclic graphs (DAG) with one-to-many and many-to-one relationships, executorlib provides a number of advanced scheduling patterns. These are briefly introduced in the following.","metadata":{}},{"id":"4db2c87c-9a7d-4074-82d0-24357fa4f0e6","cell_type":"markdown","source":"### Runtime-dependent Batching\nTo maximize the throughput of dependent calculation tasks it is important to identify all tasks which can be executed at a given moment. Unfortunately, some of these dependencies can only be determined at run time, which is challenging for most schedulers. To demonstrate the runtime-dependent batching in executorlib we discuss the following example: ten tasks are submitted and then combined into groups of three for processing. Still the order of the tasks, i.e. which tasks belong to which group, is only determined at run time. \n\nFor simplicity, we use a simple function which directly returns the input.","metadata":{}},{"id":"857a5e0f-50d6-45ec-aac7-a7151f36c19f","cell_type":"code","source":"def reply(i):\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":24},{"id":"544dacab-5601-4a9b-814f-88560eafb079","cell_type":"markdown","source":"After the group of ten tasks is submitted their future objects are stored in a list named `future_individual_lst`. 
This list is then provided to the `batched()` function of the `SingleNodeExecutor()` to generate batches of tasks which are then provided to the `sum` function for further processing. The results of this second step are stored in the `future_group_lst`. Finally, the results of these future objects are evaluated in the third step. ","metadata":{}},{"id":"17d1354a-0943-4b62-9b0d-7d39c8df23f2","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future_individual_lst = [\n exe.submit(reply, i) for i in range(10)\n ]\n future_group_lst = [\n exe.submit(sum, f) for f in exe.batched(future_individual_lst, n=3)\n ]\n print(sum([f.result() for f in future_group_lst]))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"45\n"}],"execution_count":25},{"id":"80e8b154-d659-4f3e-add2-701a454d770d","cell_type":"markdown","source":"### Split Future Objects\nIn analogy to the `batched()` function which combines multiple future objects in a batch for further processing, it can also be necessary to split a future object even before its evaluation is completed. Executorlib provides two utility functions, namely `split_future()` for tuples and lists and `get_item_from_future()` for dictionaries. \n\nStarting with a function which returns a tuple, named `get_a_tuple()`:","metadata":{}},{"id":"6ef541e3-a2be-4e05-9e77-65eaceff4248","cell_type":"code","source":"def get_a_tuple(i):\n return \"a\", \"b\", i","metadata":{"trusted":true},"outputs":[],"execution_count":26},{"id":"dd3e4312-1594-4b3b-baf3-5b3121ca8910","cell_type":"markdown","source":"This function is submitted to the `SingleNodeExecutor()` and while in this case it directly returns the tuple, the evaluation would commonly take much longer. By splitting the output of the future object using the `split_future()` function, a number of future objects is generated, one for each element of the tuple. This is enabled by providing the number of elements in the tuple as an additional parameter `n=3`. ","metadata":{}},{"id":"927c5931-d7fd-4bc1-ae0e-3dd2f4bc99ce","cell_type":"code","source":"from executorlib import split_future","metadata":{"trusted":true},"outputs":[],"execution_count":27},{"id":"64a9b46e-0de6-4abb-99d9-8aaa0c6cbccb","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future = exe.submit(get_a_tuple, 15)\n f1, f2, f3 = split_future(future=future, n=3)\n print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"a b 15\n"}],"execution_count":28},{"id":"a10d8f90-e790-4708-b193-90a7f014d699","cell_type":"markdown","source":"In analogy to the `split_future()` function for lists and tuples, the `get_item_from_future()` function returns one item of a dictionary which is returned by a function submitted to an `Executor()`. In this example the `get_a_dict()` function returns a dictionary; again the dictionary is returned directly while commonly the evaluation would take much longer. ","metadata":{}},{"id":"51ec4dbf-d009-41c9-bc04-cf1b16fe782f","cell_type":"code","source":"def get_a_dict(i):\n return {\"a\": 1, \"b\": 2, \"c\": i}","metadata":{"trusted":true},"outputs":[],"execution_count":29},{"id":"5c90ab75-2a6b-4fbe-a7ee-7f91b2b4af43","cell_type":"markdown","source":"The `get_a_dict()` function is submitted to the `SingleNodeExecutor()`, which returns a future object named `future_dict`. 
Still, as we know that the `result()` of this future object `future_dict` returns a dictionary, we can already access the items of this dictionary with the `get_item_from_future()` function. The `get_item_from_future()` function takes a future object as input in addition to the `key` of the dictionary which should be accessed; as a result the `get_item_from_future()` function returns a future object for the value related to the key. In this example these future objects are named `f1`, `f2` and `f3`. These future objects are evaluated afterwards.","metadata":{}},{"id":"a41251d2-a840-4c12-9625-c2e0585c489b","cell_type":"code","source":"from executorlib import get_item_from_future","metadata":{"trusted":true},"outputs":[],"execution_count":30},{"id":"a6e75c5e-ad0c-4723-99c5-8125b63fbeaf","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future_dict = exe.submit(get_a_dict, 15)\n f1 = get_item_from_future(future=future_dict, key=\"a\")\n f2 = get_item_from_future(future=future_dict, key=\"b\")\n f3 = get_item_from_future(future=future_dict, key=\"c\")\n print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"1 2 15\n"}],"execution_count":31},{"id":"a945c98d-dc73-4284-9d8c-52475a760f6b","cell_type":"markdown","source":"### Recursion\nIn addition to the `map()` function for parallel loops and the option to integrate `while` loops directly in the submission, it is sometimes helpful to use recursive algorithms to accelerate scientific simulation and analysis. To demonstrate the use of executorlib in combination with recursive algorithms, the quicksort algorithm is used as an example. \n\nThe quicksort algorithm is implemented in two parts: the `quick_sort()` function introduced here, which splits the input `sequence` into two sets, one lower and one larger than the pivot element of the `sequence`, and the recursive submission function introduced below. After the initial execution the `quick_sort()` function is again applied to both resulting sets until only a single element remains in the `sequence`. In this case the single element is returned. The pivot is defined as the last element of the `sequence`.\n\nTo simplify the submission of the `quick_sort()` function to the `SingleNodeExecutor()` dictionaries are used as return types: these dictionaries either contain only a single item with the key `\"result\"` when only a single element remains in the list, or three items, one `\"left\"` for a list lower than the pivot, one `\"right\"` for the items larger than the pivot and the `\"result\"` for the pivot itself. ","metadata":{}},{"id":"08b0c0a1-34ad-4b2a-847c-fe9c96956b27","cell_type":"code","source":"def quick_sort(sequence):\n length = len(sequence)\n if length <= 1:\n return {\"result\": sequence}\n else:\n pivot = sequence.pop()\n\n greater_items = []\n lesser_items = []\n\n for item in sequence:\n if item > pivot:\n greater_items.append(item)\n else:\n lesser_items.append(item)\n\n return {\"left\": lesser_items, \"right\": greater_items, \"result\": [pivot]}","metadata":{"trusted":true},"outputs":[],"execution_count":32},{"id":"54a8a765-0b9f-424a-86e5-8024c4d59018","cell_type":"markdown","source":"To enable the recursive submission to the `SingleNodeExecutor()` a `recusive_submit()` function is defined using the [asynchronous IO module](https://docs.python.org/3/library/asyncio.html) of the Python standard library, indicated by the `async` keyword ahead of the function definition. It takes a `function`, the `sequence` and the `executor` as inputs. 
As a first step the function is submitted to the `executor` and the execution is halted until the first result is available by wrapping the future object using the `wrap_future()` function of the `asyncio` package and applying the `await` keyword. Then the corresponding result dictionary `result_dict` is evaluated: if the `result_dict` contains more than one key, then the `recusive_submit()` function is evaluated for both the left side with lower values than the pivot element and the right side with elements higher than the pivot element. Again in both cases the `await` keyword is used to enable the parallel execution of both branches at the same time. Afterwards the result of both sides is combined with the pivot element. For the case that the dictionary only contains a single element, it is returned directly. ","metadata":{}},{"id":"4e142829-c9de-4969-ab3c-97f3e1ada90c","cell_type":"code","source":"import asyncio\n\nasync def recusive_submit(function, sequence, executor):\n result_dict = await asyncio.wrap_future(executor.submit(function, sequence))\n if len(result_dict) > 1:\n left = await recusive_submit(function=function, sequence=result_dict[\"left\"], executor=executor)\n right = await recusive_submit(function=function, sequence=result_dict[\"right\"], executor=executor)\n return left + result_dict[\"result\"] + right\n else:\n return result_dict[\"result\"]","metadata":{"trusted":true},"outputs":[],"execution_count":33},{"id":"531a8f7d-5b21-46d0-b632-32a1568571c8","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n loop = asyncio.get_event_loop()\n task = loop.create_task(recusive_submit(function=quick_sort, sequence=[0,9,3,8,2,7,5], executor=exe))\n print(await task)","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[0, 2, 3, 5, 7, 8, 9]\n"}],"execution_count":34},{"id":"725c8ac2-27e4-495b-b061-1fd6053f4c0b","cell_type":"markdown","source":"## Testing and Debugging\nThe up-scaling of Python functions from a single workstation to a High Performance Computer (HPC) can be challenging, so executorlib provides a number of debugging utilities to help you optimize your functions for execution with executorlib. ","metadata":{}},{"id":"2c96b460-ebd2-4c8a-a5e1-922a860808ab","cell_type":"markdown","source":"### Measure Data Transferred \nTransferring a large amount of data between two processes requires additional resources, so it is helpful to measure the data transferred between the frontend and backend process. This is achieved by setting the `log_obj_size` parameter to `True`:","metadata":{}},{"id":"71905acf-2fef-4738-82ae-89e35e2e3d2d","cell_type":"code","source":"from executorlib import SingleNodeExecutor\n\nwith SingleNodeExecutor(log_obj_size=True) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stderr","output_type":"stream","text":"Send dictionary of size: 101\nReceived dictionary of size: 59\nSend dictionary of size: 69\nReceived dictionary of size: 58\n"},{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":35},{"id":"0e9f7192-2b7d-4caf-b2b8-ae9cb060e90f","cell_type":"markdown","source":"### Write Log Files\nLibraries like executorlib are commonly used to sample a large parameter space; in this case it is possible that out of a large number of parameters one combination throws an error. This error can be logged in a file which also contains the function and input parameters using the `\"error_log_file\"` parameter in the `resource_dict`. 
This allows changing the log file on a per-function basis.","metadata":{}},{"id":"2810acd5-a46e-4d85-b447-945248ffca15","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":36},{"id":"ef6b87a7-361e-4a3f-bad3-72ad87968e8c","cell_type":"code","source":"def my_funct(i, j):\n if i == 2 and j == 2:\n raise ValueError()\n else:\n return i * j + i + j","metadata":{"trusted":true},"outputs":[],"execution_count":37},{"id":"209ff76b-913a-46f7-9fff-250e207898b2","cell_type":"markdown","source":"A try and except statement is added to prevent the Jupyter notebook from crashing:","metadata":{}},{"id":"30381a00-30cc-45a2-82ff-17371339f5c7","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"error_log_file\": \"error.out\"}) as exe:\n future_lst = []\n for i in range(4):\n for j in range(4):\n future_lst.append(exe.submit(my_funct, i=i, j=j))\n try:\n print([f.result() for f in future_lst])\n except ValueError:\n pass","metadata":{"trusted":true},"outputs":[],"execution_count":38},{"id":"36bfc556-aa86-4b50-972b-4b8f68ceec3d","cell_type":"markdown","source":"The log file is a basic text file, so it can be read with any kind of file utility. The important part is that the log file contains not only the error message but in addition also the function name and the input parameters, in this case `kwargs: {'i': 2, 'j': 2}`, which helps for future debugging of the sampling function.","metadata":{}},{"id":"a61c06fc-3029-48ed-895f-9f6c065b7c47","cell_type":"code","source":"with open(\"error.out\") as f:\n content = f.readlines()","metadata":{"trusted":true},"outputs":[],"execution_count":39},{"id":"6a6cd10f-6fe3-4a17-a8e6-2536feb9a11a","cell_type":"code","source":"content","metadata":{"trusted":true},"outputs":[{"execution_count":40,"output_type":"execute_result","data":{"text/plain":"['function: \\n',\n 'args: ()\\n',\n \"kwargs: {'i': 2, 'j': 2}\\n\",\n 'Traceback (most recent call last):\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/backend/interactive_serial.py\", line 56, in main\\n',\n ' output = call_funct(input_dict=input_dict, funct=None, memory=memory)\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 33, in call_funct\\n',\n ' return funct(input_dict[\"fn\"], *input_dict[\"args\"], **input_dict[\"kwargs\"])\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 22, in funct\\n',\n ' return args[0].__call__(*args[1:], **kwargs)\\n',\n ' ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^\\n',\n ' File \"/tmp/ipykernel_108/3167739528.py\", line 3, in my_funct\\n',\n 'ValueError\\n']"},"metadata":{}}],"execution_count":40},{"id":"5aff120a-317f-47d4-9639-8eccfb136117","cell_type":"code","source":"import os\n\nif os.path.exists(\"error.out\"):\n os.remove(\"error.out\")","metadata":{"trusted":true},"outputs":[],"execution_count":41},{"id":"44cd41d7-4417-429e-8481-f2a49e5f769c","cell_type":"markdown","source":"### TestClusterExecutor\nWhile the `SingleNodeExecutor` internally behaves very similarly to the `FluxJobExecutor` and `SlurmJobExecutor`, the `FluxClusterExecutor` and `SlurmClusterExecutor` behave very differently as they use the file system to exchange information rather than socket-based communication. This can lead to complications when it comes to debugging. 
To address this challenge executorlib provides the `TestClusterExecutor` which can be executed on a local workstation just like the `SingleNodeExecutor` but in the background it uses the same file based communication as the `SlurmClusterExecutor` and the `FluxClusterExecutor`:","metadata":{}},{"id":"2ae29fb0-54e5-468f-8727-a179b6ed363e","cell_type":"code","source":"from executorlib.api import TestClusterExecutor\n\nwith TestClusterExecutor(cache_directory=\"test\") as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":42}]} \ No newline at end of file +{"metadata":{"kernelspec":{"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},"language_info":{"name":"python","version":"3.13.9","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"6915218e-7cf3-4bf4-9618-7e6942b4762f","cell_type":"markdown","source":"# Single Node Executor\nThe `SingleNodeExecutor` in executorlib is primarily used to enable rapid prototyping on a workstation computer to test your parallel Python program with executorlib before transferring it to a high performance computer (HPC). With the added capability of executorlib it is typically 10% slower than the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) from the Python standard library on a single node, when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks. So the performance of executorlib improves when the individual Python function calls require extensive computations.\n\nAn advantage that executorlib has over the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) from the Python standard library is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) as the serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions, for example in the case of a Jupyter notebook. ","metadata":{}},{"id":"ccc686dd-8fc5-4755-8a19-f40010ebb1b8","cell_type":"markdown","source":"## Basic Functionality\nThe general functionality of executorlib follows the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library. You can import the `SingleNodeExecutor` class directly from executorlib and then just replace the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) with the `SingleNodeExecutor` class to start using executorlib.","metadata":{}},{"id":"b1907f12-7378-423b-9b83-1b65fc0a20f5","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"1654679f-38b3-4699-9bfe-b48cbde0b2db","cell_type":"markdown","source":"It is recommended to use the `SingleNodeExecutor` class in combination with a `with`-statement. 
This guarantees the processes created by the `SingleNodeExecutor` class to evaluate the Python functions are afterward closed and do not remain ghost processes. A function is then submitted using the `submit(fn, /, *args, **kwargs)` function which executes a given function `fn` as `fn(*args, **kwargs)`. The `submit()` function returns a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object, as defined by the Python Standard Library. As a first example we submit the function `sum()` to calculate the sum of the list `[1, 1]`:","metadata":{}},{"id":"16f7d138-ed77-45ea-a554-d329f7237500","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 45.6 ms, sys: 86.1 ms, total: 132 ms\nWall time: 285 ms\n"}],"execution_count":2},{"id":"a1109584-9db2-4f9d-b3ed-494d96241396","cell_type":"markdown","source":"As expected the result of the summation `sum([1, 1])` is `2`. The same result is retrieved from the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object received from the submission of the `sum()` as it is printed here `print(future.result())`. For most Python functions and especially the `sum()` function it is computationally not efficient to initialize the `SingleNodeExecutor` class only for the execution of a single function call, rather it is more computationally efficient to initialize the `SingleNodeExecutor` class once and then submit a number of functions. This can be achieved with a loop. For example the sum of the pairs `[2, 2]`, `[3, 3]` and `[4, 4]` can be achieved with a for-loop inside the context of the `SingleNodeExecutor()` class as provided by the `with`-statement.","metadata":{}},{"id":"cfccdf9a-b23b-4814-8c14-36703a8a5f9e","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[4, 6, 8]\nCPU times: user 23.9 ms, sys: 15.2 ms, total: 39.1 ms\nWall time: 622 ms\n"}],"execution_count":3},{"id":"7db58f70-8137-4f1c-a87b-0d282f2bc3c5","cell_type":"markdown","source":"If only the parameters change but the function, which is applied to these parameters, remains the same, like in the case above the `sum()` function is applied to three pairs of parameters, then the `map(fn, *iterables, timeout=None, chunksize=1)` function can be used to map the function to the different sets of parameters - as it is defined in the [Python standard library](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map). ","metadata":{}},{"id":"abd0beb7-471d-490e-bb9c-96755bd7aacf","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n print(list(results))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[10, 12, 14]\nCPU times: user 28.2 ms, sys: 12.8 ms, total: 41 ms\nWall time: 673 ms\n"}],"execution_count":4},{"id":"ac86bf47-4eb6-4d7c-acae-760b880803a8","cell_type":"markdown","source":"These three examples cover the general functionality of the `SingleNodeExecutor` class. 
The `SingleNodeExecutor` class follows the [Executor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) interface as it is defined in the Python standard library.","metadata":{}},{"id":"5de0f0f2-bf5c-46b3-8171-a3a206ce6775","cell_type":"markdown","source":"## Parallel Functions\nWriting parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function by function level. In this way individual functions can be replaced by parallel functions as needed without the need to modify the rest of the program. With the Local Mode, executorlib supports two levels of parallel execution: parallel execution based on the Message Passing Interface (MPI) using the [mpi4py](https://mpi4py.readthedocs.io) package, or thread based parallel execution. Both levels of parallelism can be defined inside the function and do not require any modifications to the rest of the Python program. ","metadata":{}},{"id":"dc8e692f-bf6c-4838-bb82-6a6b8454a2e7","cell_type":"markdown","source":"### MPI Parallel Functions\nMPI is the default way to develop parallel programs for HPCs. Still it can be challenging to refactor a previously serial program to efficiently use MPI to achieve optimal computational efficiency for parallel execution, even with libraries like [mpi4py](https://mpi4py.readthedocs.io). To simplify the up-scaling of Python programs executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to a `SingleNodeExecutor` for evaluation.\n\nThe following `calc_mpi()` function imports the [mpi4py](https://mpi4py.readthedocs.io) package and then uses the internal functionality of MPI to get the total number of parallel CPU cores in the current MPI group `MPI.COMM_WORLD.Get_size()` and the index of the current processor in the MPI group `MPI.COMM_WORLD.Get_rank()`.\n\nThe [mpi4py](https://mpi4py.readthedocs.io) package is an optional dependency of executorlib. The installation of the [mpi4py](https://mpi4py.readthedocs.io) package is covered in the installation section.","metadata":{}},{"id":"a251d083-489e-41c1-9e49-c86093858006","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":5},{"id":"adbf8a10-04e1-4fd9-8768-4375bcba9ec3","cell_type":"markdown","source":"The computational resources for the execution of the `calc_mpi()` Python function are defined using the resource dictionary parameter `resource_dict={}`. The resource dictionary can be provided as an additional parameter to the `submit()` function. It is important that the parameter name `resource_dict` is reserved exclusively for the `submit()` function and cannot be used in the function which is submitted, like the `calc_mpi()` function in this example:","metadata":{}},{"id":"266864f1-d29e-4934-9b5d-51f4ffb11f5c","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":6},{"id":"3a449e3f-d7a4-4056-a1d0-35dfca4dad22","cell_type":"markdown","source":"Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. 
{"id":"3a449e3f-d7a4-4056-a1d0-35dfca4dad22","cell_type":"markdown","source":"Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. In this case it is internally set for every call of the `submit()` function, without the need to specify it again.","metadata":{}},{"id":"cb4ad978-bdf2-47bb-a7df-846641a54ec2","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"cores\": 2}) as exe:\n    fs = exe.submit(calc_mpi, 3)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":7},{"id":"c1d1d7b1-64fa-4e47-bbde-2a16036568d6","cell_type":"markdown","source":"In addition to the compute cores `cores`, the resource dictionary parameter `resource_dict` can also define the threads per core as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd` and the option to use the OpenMPI oversubscribe feature with `openmpi_oversubscribe`. Finally, for the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) queuing system, it provides the option to specify additional command line arguments with the `slurm_cmd_args` parameter; see the [resource dictionary](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) documentation.","metadata":{}},
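{"id":"c1d1d7b1-insert-resdict-md","cell_type":"markdown","source":"As an illustrative sketch (the values here are hypothetical and not from the original notebook), several of these keys can be combined in a single resource dictionary:","metadata":{}},{"id":"c1d1d7b1-insert-resdict-code","cell_type":"code","source":"with SingleNodeExecutor(\n    resource_dict={\n        \"cores\": 2,  # MPI ranks assigned to each function call\n        \"threads_per_core\": 1,  # threads reserved per core\n        \"cwd\": \"/tmp\",  # working directory for the submitted function\n    }\n) as exe:\n    fs = exe.submit(calc_mpi, 3)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[],"execution_count":null},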
{"id":"4f5c5221-d99c-4614-82b1-9d6d3260c1bf","cell_type":"markdown","source":"### Thread Parallel Functions\nAn alternative form of parallelism is [thread-based parallelism](https://docs.python.org/3/library/threading.html). executorlib supports thread-based parallelism with the `threads_per_core` parameter in the resource dictionary `resource_dict`. Given the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock) in the CPython implementation, a common application of thread-based parallelism in Python is the use of additional threads in linked libraries. The number of threads is commonly controlled with environment variables like `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, `MKL_NUM_THREADS`, `VECLIB_MAXIMUM_THREADS` and `NUMEXPR_NUM_THREADS`. Specific libraries might require other environment variables. The environment variables can be set using `os.environ`, the environment interface of the Python standard library.","metadata":{}},{"id":"7a7d21f6-9f1a-4f30-8024-9993e156dc75","cell_type":"code","source":"def calc_with_threads(i):\n    import os\n\n    # set the thread counts before the threaded library is imported\n    os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n    os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n    os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n    os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n    os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n    import numpy as np  # the import picks up the thread settings above\n\n    return i","metadata":{"trusted":true},"outputs":[],"execution_count":8},{"id":"82ed8f46-836c-402e-9363-be6e16c2a0b0","cell_type":"markdown","source":"Again, the resource dictionary parameter `resource_dict` can either be set in the `submit()` function:","metadata":{}},{"id":"b8ed330d-ee77-44a0-a02f-670fa945b043","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":9},{"id":"63222cd5-664b-4aba-a80c-5814166b1239","cell_type":"markdown","source":"Or alternatively, the resource dictionary parameter `resource_dict` can also be set during the initialization of the `Executor` class:","metadata":{}},{"id":"31562f89-c01c-4e7a-bbdd-fa26ca99e68b","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"threads_per_core\": 2}) as exe:\n    fs = exe.submit(calc_with_threads, 3)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":10},{"id":"8b78a7b4-066e-4cbc-858e-606c8bbbbf0c","cell_type":"markdown","source":"In most cases MPI-based parallelism leads to higher computational efficiency than thread-based parallelism; still, the choice of parallelism depends on the specific Python function which should be executed in parallel. Careful benchmarks are required to achieve the optimal performance for a given computational architecture.\n\nBeyond MPI-based and thread-based parallelism, the [HPC Cluster Executors](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) and the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) also provide the option to assign GPUs to the execution of individual Python functions.","metadata":{}},{"id":"ca9bc450-2762-4d49-b7f8-48cc83e068fd","cell_type":"markdown","source":"## Performance Optimization\nThe default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features.","metadata":{}},{"id":"e9b52ecf-3984-4695-98e7-315aa3712104","cell_type":"markdown","source":"### Block Allocation\nBy default each submitted Python function is executed in a dedicated process. This guarantees that the execution of the submitted Python function starts with a fresh process. Still, the initialization of a Python process takes time. Especially when a Python function call requires only limited computational resources, it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `True`.
 To limit the number of parallel Python processes when using block allocation, it is recommended to set the `max_workers` parameter to restrict the number of available computing resources.","metadata":{}},{"id":"0da4c7d0-2268-4ea8-b62d-5d94c79ebc72","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:\n    future = exe.submit(sum, [1, 1])\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 23.6 ms, sys: 9.69 ms, total: 33.3 ms\nWall time: 360 ms\n"}],"execution_count":11},{"id":"d38163b3-1c04-431c-964b-2bad4f823a4d","cell_type":"markdown","source":"The same functionality also applies to MPI parallel Python functions. The important part is that while it is possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is not possible to assign resources during the submission of the function with the `submit()` function. Starting again with the `calc_mpi()` function:","metadata":{}},{"id":"cb8c4943-4c78-4203-95f2-1db758e588d9","cell_type":"code","source":"def calc_mpi(i):\n    from mpi4py import MPI\n\n    size = MPI.COMM_WORLD.Get_size()\n    rank = MPI.COMM_WORLD.Get_rank()\n    return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":12},{"id":"9e1212c4-e3fb-4e21-be43-0a4f0a08b856","cell_type":"markdown","source":"Instead, the resource dictionary parameter can be set during the initialisation of the `SingleNodeExecutor` class. Internally, this groups the created Python processes into fixed allocations and afterwards submits Python functions to these allocations.","metadata":{}},{"id":"5ebf7195-58f9-40f2-8203-2d4b9f0e9602","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n    fs = exe.submit(calc_mpi, 3)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":13},{"id":"b75fb95f-f2f5-4be9-9f2a-9c2e9961c644","cell_type":"markdown","source":"The drawback that memory from a previous Python function call remains in the Python process can at the same time be an advantage when working with large datasets. In executorlib this is achieved by introducing the `init_function` parameter. The `init_function` returns a dictionary of parameters which can afterwards be reused as keyword arguments `**kwargs` in the functions submitted to the `Executor`. When block allocation `block_allocation` is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.","metadata":{}},{"id":"8aa754cc-eb1a-4fa1-bd72-272246df1d2f","cell_type":"code","source":"def init_function():\n    return {\"j\": 4, \"k\": 3, \"l\": 2}","metadata":{"trusted":true},"outputs":[],"execution_count":14},{"id":"1854895a-7239-4b30-b60d-cf1a89234464","cell_type":"code","source":"def calc_with_preload(i, j, k):\n    return i + j + k","metadata":{"trusted":true},"outputs":[],"execution_count":15},{"id":"d07cf107-3627-4cb0-906c-647497d6e0d2","cell_type":"markdown","source":"The function `calc_with_preload()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor, only two inputs are provided: `fs = exe.submit(calc_with_preload, 2, j=5)`. In this case the first input parameter is mapped to `i=2` and the second input parameter is specified explicitly as `j=5`, but the third input parameter `k` is not provided. So the `SingleNodeExecutor` automatically checks the keys set in the `init_function()` function. In this case the returned dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc_with_preload()` function, `i` and `j` are already provided, so `j=4` from the `init_function()` is not used; `k=3` is taken from the `init_function()`, and as the `calc_with_preload()` function does not define an `l` parameter, `l=2` is ignored.","metadata":{}},{"id":"cc648799-a0c6-4878-a469-97457bce024f","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, init_function=init_function, block_allocation=True) as exe:\n    fs = exe.submit(calc_with_preload, 2, j=5)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"10\n"}],"execution_count":16},{"id":"1073b8ca-1492-46e9-8d1f-f52ad48d28a2","cell_type":"markdown","source":"The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` function.","metadata":{}},
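{"id":"1073b8ca-insert-preload-md","cell_type":"markdown","source":"As a sketch of the intended use case (hypothetical data, assuming [NumPy](https://numpy.org) is available), an `init_function` can preload a larger dataset once per worker process, so that subsequently submitted functions can access it without reloading:","metadata":{}},{"id":"1073b8ca-insert-preload-code","cell_type":"code","source":"import numpy as np\n\n\ndef init_function_data():\n    # executed once per worker process when the block allocation starts\n    return {\"data\": np.arange(1_000_000)}\n\n\ndef calc_with_data(i, data):\n    # the preloaded array is injected as the keyword argument data\n    return int(data[i])\n\n\nwith SingleNodeExecutor(max_workers=2, init_function=init_function_data, block_allocation=True) as exe:\n    print([exe.submit(calc_with_data, i).result() for i in range(3)])","metadata":{"trusted":true},"outputs":[],"execution_count":null},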
{"id":"24397d78-dff1-4834-830c-a8f390fe6b9c","cell_type":"markdown","source":"### Cache\nThe development of scientific workflows is commonly an interactive process, extending the functionality step by step. This led to the development of interactive environments like [Jupyter](https://jupyter.org), which is fully supported by executorlib. Still, many computationally intensive Python functions take on the order of minutes to hours or even longer to execute, so reusing an existing Python process is not feasible. To address this challenge executorlib provides a file-based cache to store the results of previously computed [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects. The results are serialized using [cloudpickle](https://github.com/cloudpipe/cloudpickle) and stored in a user-defined cache directory `cache_directory` to be reloaded later on. Internally, the hierarchical data format (HDF5) is used via the [h5py](https://www.h5py.org) package.\n\nThe [h5py](https://www.h5py.org) package is an optional dependency of executorlib. The installation of the [h5py](https://www.h5py.org) package is covered in the installation section.","metadata":{}},{"id":"ecdcef49-5c89-4538-b377-d53979673bf7","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n    print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 501 ms, sys: 52.8 ms, total: 554 ms\nWall time: 1.11 s\n"}],"execution_count":17},{"id":"32d0fb2e-5ac1-4249-b6c8-953c92fdfded","cell_type":"markdown","source":"When the same code is executed again, executorlib finds the existing results in the cache directory specified by the `cache_directory` parameter and reloads them, accelerating the computation especially during the prototyping phase, when similar computations are repeated frequently for testing.\n\nStill, it is important to mention that this cache is not designed to identify the submission of the same parameters within the context of one `with`-statement. It is the task of the user to minimize duplicate computations; the cache is only designed to restore previous calculation results when the Python process managing executorlib was stopped after the successful execution.","metadata":{}},
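{"id":"32d0fb2e-insert-dedup-md","cell_type":"markdown","source":"As a minimal sketch of such user-side deduplication (plain Python, not an executorlib feature), futures can be stored in a dictionary keyed by the input parameters, so that duplicate inputs within one `with`-statement are submitted only once:","metadata":{}},{"id":"32d0fb2e-insert-dedup-code","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    submitted = {}  # maps the input parameter to the corresponding future\n    for i in [1, 2, 1, 3, 2]:\n        if i not in submitted:\n            submitted[i] = exe.submit(sum, [i, i])\n    print({i: f.result() for i, f in submitted.items()})","metadata":{"trusted":true},"outputs":[],"execution_count":null},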
","metadata":{}},{"id":"c39babe8-4370-4d31-9520-9a7ce63378c8","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 42.1 ms, sys: 24.3 ms, total: 66.4 ms\nWall time: 691 ms\n"}],"execution_count":18},{"id":"5144a035-633e-4e60-a362-f3b15b28848b","cell_type":"markdown","source":"An additional advantage of the cache is the option to gather the results of previously submitted functions. Using the `get_cache_data()` function the results of each Python function is converted to a dictionary. This list of dictionaries can be converted to a `pandas.DataFrame` for further processing:","metadata":{}},{"id":"f574b9e1-de55-4e38-aef7-a4bed540e040","cell_type":"code","source":"import pandas\nfrom executorlib import get_cache_data\n\ndf = pandas.DataFrame(get_cache_data(cache_directory=\"./file\"))\ndf","metadata":{"trusted":true},"outputs":[{"execution_count":19,"output_type":"execute_result","data":{"text/plain":" function input_args input_kwargs output runtime \\\n0 ([2, 2],) {} 4 0.001219 \n1 ([3, 3],) {} 6 0.015866 \n2 ([1, 1],) {} 2 0.018141 \n\n filename \n0 /home/jovyan/file/sum89afbdf9da5eb1794f6976a3f... \n1 /home/jovyan/file/sum0f7710227cda6456e5d071877... \n2 /home/jovyan/file/sumf5ad27b855231a293ddd735a8... ","text/html":"
"},"metadata":{}}],"execution_count":19},{"id":"68092479-e846-494a-9ac9-d9638b102bd8","cell_type":"markdown","source":"After the development phase is concluded it is the task of the user to remove the cache directory defined with the `cache_directory` parameter. The cache directory is never removed by executorlib to prevent the repeation of expensive computations. Still as disk space on shared file systems in HPC environments is commonly limited it is recommended to remove the cache directory once the development process concluded. ","metadata":{}},{"id":"34a9316d-577f-4a63-af14-736fb4e6b219","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']\n"}],"execution_count":20},{"id":"1cea95b5-4110-444c-82af-fa6718bfa17f","cell_type":"markdown","source":"Typically the use of the cache is recommended for development processes only and for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and might change in future without further notice, rendering existing data in the cache unusable. Consequently, using the cache beyond the development process is not recommended. In addition the writing of the results to files might result in additional overhead for accessing the shared file system. ","metadata":{}},{"id":"71a8a0be-a933-4e83-9da5-50da35e9975b","cell_type":"markdown","source":"### Dependencies\nMany scientific Python programs consist of series of Python function calls with varying level of parallel computations or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced in a single function. To extend the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library to support this programming pattern, the `SingleNodeExecutor` class from executorlib supports submitting Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects to the `SingleNodeExecutor` which are resolved before submission. So the `SingleNodeExecutor` internally waits until all Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects are successfully executed before it triggers the execution of the submitted Python function.","metadata":{}},{"id":"d8b75a26-479d-405e-8895-a8d56b3f0f4b","cell_type":"code","source":"def calc_add(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":21},{"id":"36118ae0-c13c-4f7a-bcd3-3d7f4bb5a078","cell_type":"markdown","source":"For example the function which adds two numbers `calc_add()` is used in a loop which adds a counter to the previous numbers. In the first iteration the `future` parameter is set to `0` but already in the second iteration it is the Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object of the first iteration and so on. 
\n\nThe important part is that the user does not have to wait until the first function is executed; instead the waiting happens internally in the `SingleNodeExecutor`.","metadata":{}},{"id":"35fd5747-c57d-4926-8d83-d5c55a130ad6","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future = 0\n    for i in range(1, 4):\n        future = exe.submit(calc_add, i, future)\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"6\n"}],"execution_count":22},{"id":"38e1bbb3-1028-4f50-93c1-d2427f399a7d","cell_type":"markdown","source":"As the reuse of existing [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting `plot_dependency_graph=True` during the initialization of the `SingleNodeExecutor` class.\n\nNo computation is executed when `plot_dependency_graph=True` is set. This parameter is for debugging only.\n\nInternally, the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is used for the visualisation of these dependency graphs. It is an optional dependency of executorlib. The installation of the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is covered in the installation section.","metadata":{}},{"id":"f67470b5-af1d-4add-9de8-7f259ca67324","cell_type":"code","source":"with SingleNodeExecutor(plot_dependency_graph=True) as exe:\n    future = 0\n    for i in range(1, 4):\n        future = exe.submit(calc_add, i, future)\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"None\n"},{"output_type":"display_data","data":{"text/plain":"[dependency graph: the inputs 1, 2 and 3 feed into three chained calc_add nodes]"},"metadata":{}}],"execution_count":23},{"id":"4c952a9d-2b58-401d-b7ec-fda740db774a","cell_type":"markdown","source":"## Advanced Scheduling\nGoing beyond directed acyclic graphs (DAGs) with one-to-many and many-to-one relationships, executorlib provides a number of advanced scheduling patterns. These are briefly introduced in the following.","metadata":{}},{"id":"4db2c87c-9a7d-4074-82d0-24357fa4f0e6","cell_type":"markdown","source":"### Runtime-dependent Batching\nTo maximize the throughput of dependent calculation tasks it is important to identify all tasks which can be executed at a given moment. Unfortunately, some of these dependencies can only be determined at run time, which is challenging for most schedulers. To demonstrate the runtime-dependent batching in executorlib we discuss the following example: a group of ten tasks is submitted and then grouped into batches of three for processing. Still, which tasks belong to which group is only determined at run time.\n\nFor simplicity, we just use a simple function which directly returns the input.","metadata":{}},{"id":"857a5e0f-50d6-45ec-aac7-a7151f36c19f","cell_type":"code","source":"def reply(i):\n    return i","metadata":{"trusted":true},"outputs":[],"execution_count":24},{"id":"544dacab-5601-4a9b-814f-88560eafb079","cell_type":"markdown","source":"After the group of ten tasks is submitted, their future objects are stored in a list named `future_individual_lst`.
 This list is then provided to the `batched()` function of the `SingleNodeExecutor()` to generate batches of tasks, which are then provided to the `sum()` function for further processing. The results of this second step are stored in `future_group_lst`. Finally, the results of these future objects are evaluated in the third step.","metadata":{}},{"id":"17d1354a-0943-4b62-9b0d-7d39c8df23f2","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future_individual_lst = [\n        exe.submit(reply, i) for i in range(10)\n    ]\n    future_group_lst = [\n        exe.submit(sum, f) for f in exe.batched(future_individual_lst, n=3)\n    ]\n    print(sum([f.result() for f in future_group_lst]))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"45\n"}],"execution_count":25},{"id":"80e8b154-d659-4f3e-add2-701a454d770d","cell_type":"markdown","source":"### Split Future Objects\nIn analogy to the `batched()` function, which combines multiple future objects in a batch for further processing, it can also be necessary to split future objects even before their evaluation is completed. Executorlib provides two utility functions for this purpose, namely `split_future()` for tuples and lists and `get_item_from_future()` for dictionaries.\n\nStarting with a function which returns a tuple, named `get_a_tuple()`:","metadata":{}},{"id":"6ef541e3-a2be-4e05-9e77-65eaceff4248","cell_type":"code","source":"def get_a_tuple(i):\n    return \"a\", \"b\", i","metadata":{"trusted":true},"outputs":[],"execution_count":26},{"id":"dd3e4312-1594-4b3b-baf3-5b3121ca8910","cell_type":"markdown","source":"This function is submitted to the `SingleNodeExecutor()`, and while in this case it directly returns the tuple, the evaluation would commonly take much longer. By splitting the output of the future object using the `split_future()` function, a number of future objects is generated, one for each element of the tuple. This is enabled by providing the number of elements in the tuple as an additional parameter `n=3`.","metadata":{}},{"id":"927c5931-d7fd-4bc1-ae0e-3dd2f4bc99ce","cell_type":"code","source":"from executorlib import split_future","metadata":{"trusted":true},"outputs":[],"execution_count":27},{"id":"64a9b46e-0de6-4abb-99d9-8aaa0c6cbccb","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future = exe.submit(get_a_tuple, 15)\n    f1, f2, f3 = split_future(future=future, n=3)\n    print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"a b 15\n"}],"execution_count":28},{"id":"a10d8f90-e790-4708-b193-90a7f014d699","cell_type":"markdown","source":"In analogy to the `split_future()` function for lists and tuples, the `get_item_from_future()` function returns one item of a dictionary which is returned by a function submitted to an `Executor()`. In this example the `get_a_dict()` function returns a dictionary; again the dictionary is returned directly, while commonly the evaluation would take much longer.","metadata":{}},{"id":"51ec4dbf-d009-41c9-bc04-cf1b16fe782f","cell_type":"code","source":"def get_a_dict(i):\n    return {\"a\": 1, \"b\": 2, \"c\": i}","metadata":{"trusted":true},"outputs":[],"execution_count":29},{"id":"5c90ab75-2a6b-4fbe-a7ee-7f91b2b4af43","cell_type":"markdown","source":"The `get_a_dict()` function is submitted to the `SingleNodeExecutor()` and returns a future object named `future_dict`.
 As we know that the `result()` of this future object `future_dict` returns a dictionary, we can already access the items of this dictionary with the `get_item_from_future()` function. The `get_item_from_future()` function takes a future object as input, in addition to the `key` of the dictionary which should be accessed; as a result it returns a future object for the value related to the key. In this example these future objects are named `f1`, `f2` and `f3`. These future objects are evaluated afterwards.","metadata":{}},{"id":"a41251d2-a840-4c12-9625-c2e0585c489b","cell_type":"code","source":"from executorlib import get_item_from_future","metadata":{"trusted":true},"outputs":[],"execution_count":30},{"id":"a6e75c5e-ad0c-4723-99c5-8125b63fbeaf","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future_dict = exe.submit(get_a_dict, 15)\n    f1 = get_item_from_future(future=future_dict, key=\"a\")\n    f2 = get_item_from_future(future=future_dict, key=\"b\")\n    f3 = get_item_from_future(future=future_dict, key=\"c\")\n    print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"1 2 15\n"}],"execution_count":31},{"id":"a945c98d-dc73-4284-9d8c-52475a760f6b","cell_type":"markdown","source":"### Recursion\nIn addition to the `map()` function for parallel loops and the option to integrate `while` loops directly in the submission, it is sometimes helpful to use recursive algorithms to accelerate scientific simulation and analysis. To demonstrate the use of executorlib in combination with recursive algorithms, the quicksort algorithm is used as an example.\n\nThe quicksort algorithm is implemented in two parts: a `quick_sort()` function, which splits the input `sequence` into two sets, one with the items not larger and one with the items larger than the pivot element of the `sequence`, and a `recusive_submit()` function, introduced below, which handles the recursive submission. After the initial execution the `quick_sort()` function is again applied to both resulting sets until only a single element remains in the `sequence`; in this case the single element is returned. The pivot is defined as the last element of the `sequence`, removed with `sequence.pop()`.\n\nTo simplify the submission of the `quick_sort()` function to the `SingleNodeExecutor()`, dictionaries are used as return types. These dictionaries contain either only a single item with the key `\"result\"`, when only a single element remains in the list, or three items: `\"left\"` for the items not larger than the pivot, `\"right\"` for the items larger than the pivot and `\"result\"` for the pivot itself.","metadata":{}},{"id":"08b0c0a1-34ad-4b2a-847c-fe9c96956b27","cell_type":"code","source":"def quick_sort(sequence):\n    length = len(sequence)\n    if length <= 1:\n        return {\"result\": sequence}\n    else:\n        pivot = sequence.pop()\n\n        greater_items = []\n        lesser_items = []\n\n        for item in sequence:\n            if item > pivot:\n                greater_items.append(item)\n            else:\n                lesser_items.append(item)\n\n        return {\"left\": lesser_items, \"right\": greater_items, \"result\": [pivot]}","metadata":{"trusted":true},"outputs":[],"execution_count":32},{"id":"54a8a765-0b9f-424a-86e5-8024c4d59018","cell_type":"markdown","source":"To enable the recursive submission to the `SingleNodeExecutor()`, a `recusive_submit()` function is defined using the [asynchronous IO module](https://docs.python.org/3/library/asyncio.html) of the Python standard library, indicated by the `async` keyword ahead of the function definition. It takes a `function`, the `sequence` and the `executor` as inputs.
 As a first step the function is submitted to the `executor` and the execution is halted until the result is available, by wrapping the future object using the `wrap_future()` function of the `asyncio` package and applying the `await` keyword. Then the corresponding result dictionary `result_dict` is evaluated: if `result_dict` contains more than one key, then `recusive_submit()` is evaluated for both the left side, with the values not larger than the pivot element, and the right side, with the elements larger than the pivot element. In both cases the `await` keyword suspends the coroutine until the corresponding branch is completed, so the two branches are evaluated one after the other. Afterwards the results of both sides are combined with the pivot element. In the case that the dictionary only contains a single element, it is returned directly.","metadata":{}},{"id":"4e142829-c9de-4969-ab3c-97f3e1ada90c","cell_type":"code","source":"import asyncio\n\nasync def recusive_submit(function, sequence, executor):\n    # submit to the executor and suspend until the result is available\n    result_dict = await asyncio.wrap_future(executor.submit(function, sequence))\n    if len(result_dict) > 1:\n        left = await recusive_submit(function=function, sequence=result_dict[\"left\"], executor=executor)\n        right = await recusive_submit(function=function, sequence=result_dict[\"right\"], executor=executor)\n        return left + result_dict[\"result\"] + right\n    else:\n        return result_dict[\"result\"]","metadata":{"trusted":true},"outputs":[],"execution_count":33},{"id":"531a8f7d-5b21-46d0-b632-32a1568571c8","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    loop = asyncio.get_event_loop()\n    task = loop.create_task(recusive_submit(function=quick_sort, sequence=[0, 9, 3, 8, 2, 7, 5], executor=exe))\n    print(await task)","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[0, 2, 3, 5, 7, 8, 9]\n"}],"execution_count":34},
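{"id":"531a8f7d-insert-gather-md","cell_type":"markdown","source":"Note that the two recursive calls above are awaited one after the other, so the two branches are evaluated sequentially. As a sketch of a variation (not part of the original example), `asyncio.gather()` from the Python standard library could await both branches concurrently:","metadata":{}},{"id":"531a8f7d-insert-gather-code","cell_type":"code","source":"async def recusive_submit_gather(function, sequence, executor):\n    result_dict = await asyncio.wrap_future(executor.submit(function, sequence))\n    if len(result_dict) > 1:\n        # await both branches concurrently instead of one after the other\n        left, right = await asyncio.gather(\n            recusive_submit_gather(function=function, sequence=result_dict[\"left\"], executor=executor),\n            recusive_submit_gather(function=function, sequence=result_dict[\"right\"], executor=executor),\n        )\n        return left + result_dict[\"result\"] + right\n    else:\n        return result_dict[\"result\"]","metadata":{"trusted":true},"outputs":[],"execution_count":null},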
{"id":"725c8ac2-27e4-495b-b061-1fd6053f4c0b","cell_type":"markdown","source":"## Testing and Debugging\nThe up-scaling of Python functions from a single workstation to a High Performance Computer (HPC) can be challenging, so executorlib provides a number of debugging utilities to help you optimize your functions for execution with executorlib.","metadata":{}},{"id":"2c96b460-ebd2-4c8a-a5e1-922a860808ab","cell_type":"markdown","source":"### Measure Data Transferred\nTransferring a large amount of data between two processes requires additional resources, so it is helpful to measure the data transferred between the frontend and the backend process. This is achieved by setting the `log_obj_size` parameter to `True`:","metadata":{}},{"id":"71905acf-2fef-4738-82ae-89e35e2e3d2d","cell_type":"code","source":"from executorlib import SingleNodeExecutor\n\nwith SingleNodeExecutor(log_obj_size=True) as exe:\n    future = exe.submit(sum, [1, 1])\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stderr","output_type":"stream","text":"Send dictionary of size: 101\nReceived dictionary of size: 59\nSend dictionary of size: 69\nReceived dictionary of size: 58\n"},{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":35},{"id":"0e9f7192-2b7d-4caf-b2b8-ae9cb060e90f","cell_type":"markdown","source":"### Write Log Files\nLibraries like executorlib are commonly used to sample a large parameter space. In this case it is possible that out of a large number of parameter combinations a single one throws an error. This error can be logged in a file, which also contains the function and the input parameters, using the `\"error_log_file\"` parameter in the `resource_dict`. This makes it possible to change the log file on a per-function basis.","metadata":{}},{"id":"2810acd5-a46e-4d85-b447-945248ffca15","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":36},{"id":"ef6b87a7-361e-4a3f-bad3-72ad87968e8c","cell_type":"code","source":"def my_funct(i, j):\n    if i == 2 and j == 2:\n        raise ValueError()\n    else:\n        return i * j + i + j","metadata":{"trusted":true},"outputs":[],"execution_count":37},{"id":"209ff76b-913a-46f7-9fff-250e207898b2","cell_type":"markdown","source":"A `try` and `except` statement is added to prevent the Jupyter notebook from crashing:","metadata":{}},{"id":"30381a00-30cc-45a2-82ff-17371339f5c7","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"error_log_file\": \"error.out\"}) as exe:\n    future_lst = []\n    for i in range(4):\n        for j in range(4):\n            future_lst.append(exe.submit(my_funct, i=i, j=j))\n    try:\n        print([f.result() for f in future_lst])\n    except ValueError:\n        pass","metadata":{"trusted":true},"outputs":[],"execution_count":38},{"id":"36bfc556-aa86-4b50-972b-4b8f68ceec3d","cell_type":"markdown","source":"The log file is a basic text file, so it can be read with any kind of file utility. The important part is that the log file contains not only the error message but also the function name and the input parameters, in this case `kwargs: {'i': 2, 'j': 2}`, which helps with the future debugging of the sampling function.","metadata":{}},{"id":"a61c06fc-3029-48ed-895f-9f6c065b7c47","cell_type":"code","source":"with open(\"error.out\") as f:\n    content = f.readlines()","metadata":{"trusted":true},"outputs":[],"execution_count":39},{"id":"6a6cd10f-6fe3-4a17-a8e6-2536feb9a11a","cell_type":"code","source":"content","metadata":{"trusted":true},"outputs":[{"execution_count":40,"output_type":"execute_result","data":{"text/plain":"['function: \\n',\n 'args: ()\\n',\n \"kwargs: {'i': 2, 'j': 2}\\n\",\n 'Traceback (most recent call last):\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/backend/interactive_serial.py\", line 56, in main\\n',\n ' output = call_funct(input_dict=input_dict, funct=None, memory=memory)\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 33, in call_funct\\n',\n ' return funct(input_dict[\"fn\"], *input_dict[\"args\"], **input_dict[\"kwargs\"])\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 22, in funct\\n',\n ' return args[0].__call__(*args[1:], **kwargs)\\n',\n ' ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^\\n',\n ' File \"/tmp/ipykernel_108/3167739528.py\", line 3, in my_funct\\n',\n 'ValueError\\n']"},"metadata":{}}],"execution_count":40},{"id":"5aff120a-317f-47d4-9639-8eccfb136117","cell_type":"code","source":"import os\n\nif os.path.exists(\"error.out\"):\n    os.remove(\"error.out\")","metadata":{"trusted":true},"outputs":[],"execution_count":41},
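{"id":"5aff120a-insert-exception-md","cell_type":"markdown","source":"Independent of the log file, the standard [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) interface can be used to identify which submission failed. A small sketch (not part of the original example) using the `exception()` function, which waits for the future and returns the raised exception instead of re-raising it:","metadata":{}},{"id":"5aff120a-insert-exception-code","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future_lst = [exe.submit(my_funct, i=i, j=j) for i in range(4) for j in range(4)]\n    for f in future_lst:\n        if f.exception() is not None:  # blocks until this future has finished\n            print(\"failed with:\", repr(f.exception()))","metadata":{"trusted":true},"outputs":[],"execution_count":null},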
{"id":"44cd41d7-4417-429e-8481-f2a49e5f769c","cell_type":"markdown","source":"### TestClusterExecutor\nWhile the `SingleNodeExecutor` internally behaves very similarly to the `FluxJobExecutor` and the `SlurmJobExecutor`, the `FluxClusterExecutor` and the `SlurmClusterExecutor` behave very differently, as they use the file system to exchange information rather than socket-based communication. This can lead to complications when it comes to debugging. To address this challenge executorlib provides the `TestClusterExecutor`, which can be executed on a local workstation just like the `SingleNodeExecutor`, but in the background uses the same file-based communication as the `SlurmClusterExecutor` and the `FluxClusterExecutor`:","metadata":{}},{"id":"2ae29fb0-54e5-468f-8727-a179b6ed363e","cell_type":"code","source":"from executorlib.api import TestClusterExecutor\n\nwith TestClusterExecutor(cache_directory=\"test\") as exe:\n    future = exe.submit(sum, [1, 1])\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":42}]}