diff --git a/notebooks/2-hpc-cluster.ipynb b/notebooks/2-hpc-cluster.ipynb index ee140a0c..abfbda8c 100644 --- a/notebooks/2-hpc-cluster.ipynb +++ b/notebooks/2-hpc-cluster.ipynb @@ -1,274 +1 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "ddf66f38-dc4a-4306-8b1c-b923fdb76922", - "metadata": {}, - "source": [ - "# HPC Cluster Executor\n", - "In contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) the HPC Submission Executors do not communicate via the [zero message queue](https://zeromq.org) but instead store the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n", - "\n", - "Internally the HPC submission mode is using the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependency of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package are covered in the installation section. " - ] - }, - { - "cell_type": "markdown", - "id": "d56862a6-8279-421d-a090-7ca2a3c4d416", - "metadata": {}, - "source": [ - "## SLURM\n", - "The [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command this is in contrast to the [HPC allocatiom mode] which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n", - "\n", - "The connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io)." - ] - }, - { - "cell_type": "markdown", - "id": "db7760e8-35a6-4a1c-8b0f-410b536c3835", - "metadata": {}, - "source": [ - "```python\n", - "from executorlib import SlurmClusterExecutor\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "b20913f3-59e4-418c-a399-866124f8e497", - "metadata": {}, - "source": "In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows." - }, - { - "cell_type": "markdown", - "id": "0b8f3b77-6199-4736-9f28-3058c5230777", - "metadata": {}, - "source": [ - "```python\n", - "with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n", - " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", - " print([f.result() for f in future_lst])\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "37bef7ac-ce3e-4d8a-b848-b1474c370bca", - "metadata": {}, - "source": "Specific parameters for `SlurmClusterExecutor` like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `SlurmClusterExecutor`." - }, - { - "cell_type": "markdown", - "id": "658781de-f222-4235-8c26-b0f77a0831b3", - "metadata": {}, - "source": [ - "```python\n", - "submission_template = \"\"\"\\\n", - "#!/bin/bash\n", - "#SBATCH --output=time.out\n", - "#SBATCH --job-name={{job_name}}\n", - "#SBATCH --chdir={{working_directory}}\n", - "#SBATCH --get-user-env=L\n", - "#SBATCH --partition={{partition}}\n", - "{%- if run_time_max %}\n", - "#SBATCH --time={{ [1, run_time_max // 60]|max }}\n", - "{%- endif %}\n", - "{%- if dependency %}\n", - "#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n", - "{%- endif %}\n", - "{%- if memory_max %}\n", - "#SBATCH --mem={{memory_max}}G\n", - "{%- endif %}\n", - "#SBATCH --cpus-per-task={{cores}}\n", - "\n", - "{{command}}\n", - "\"\"\"\n", - "\n", - "with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n", - " future = exe.submit(\n", - " sum, [4, 4], \n", - " resource_dict={\n", - " \"submission_template\": submission_template, \n", - " \"run_time_max\": 180, # in seconds \n", - " })\n", - " print([f.result() for f in future_lst])\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "f7ad9c97-7743-4f87-9344-4299b2b31a56", - "metadata": {}, - "source": [ - "With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. " - ] - }, - { - "cell_type": "markdown", - "id": "2a814efb-2fbc-41ba-98df-cf121d19ea66", - "metadata": {}, - "source": [ - "## Flux\n", - "While most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above." - ] - }, - { - "cell_type": "markdown", - "id": "29d7aa18-357e-416e-805c-1322b59abec1", - "metadata": {}, - "source": [ - "### Dependencies\n", - "As already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter." - ] - }, - { - "cell_type": "markdown", - "id": "3d55176a-facc-4ff5-91cd-690d480bd5b8", - "metadata": {}, - "source": [ - "```python\n", - "def add_funct(a, b):\n", - " return a + b\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "77125681-9344-43c4-8904-46d48cb90104", - "metadata": {}, - "source": [ - "```python\n", - "from executorlib import FluxClusterExecutor\n", - "\n", - "with FluxClusterExecutor(cache_directory=\"./cache\") as exe:\n", - " future = 0\n", - " for i in range(4, 8):\n", - " future = exe.submit(add_funct, i, future)\n", - " print(future.result())\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "ca75cb6c-c50f-4bee-9b09-d8d29d6c263b", - "metadata": {}, - "source": [ - "### Resource Assignment\n", - "In analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the resource assignment for the `FluxClusterExecutor` is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n", - "\n", - "Below this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package." - ] - }, - { - "cell_type": "markdown", - "id": "ea800f9a-6915-4b5a-bc57-2e072cc95437", - "metadata": {}, - "source": [ - "```python\n", - "def calc(i):\n", - " from mpi4py import MPI\n", - "\n", - " size = MPI.COMM_WORLD.Get_size()\n", - " rank = MPI.COMM_WORLD.Get_rank()\n", - " return i, size, rank\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "8bebb1b4-25fc-4f57-8633-a2677b712a87", - "metadata": {}, - "source": [ - "```python\n", - "with FluxClusterExecutor(cache_directory=\"./cache\") as exe:\n", - " fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n", - " print(fs.result())\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8", - "metadata": {}, - "source": [ - "Beyond CPU cores and threads which were previously also introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n", - "```python\n", - "def get_available_gpus():\n", - " import socket\n", - " from tensorflow.python.client import device_lib\n", - " local_device_protos = device_lib.list_local_devices()\n", - " return [\n", - " (x.name, x.physical_device_desc, socket.gethostname()) \n", - " for x in local_device_protos if x.device_type == 'GPU'\n", - " ]\n", - "```\n", - "\n", - "```python\n", - "with FluxClusterExecutor(\n", - " cache_directory=\"./cache\",\n", - " resource_dict={\"gpus_per_core\": 1}\n", - ") as exe:\n", - " fs_1 = exe.submit(get_available_gpus)\n", - " fs_2 = exe.submit(get_available_gpus)\n", - " print(fs_1.result(), fs_2.result())\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "3f47fd34-04d1-42a7-bb06-6821dc99a648", - "metadata": {}, - "source": [ - "### Cleaning Cache\n", - "Finally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is design for caching but not for long-term storage. The user is responsible for the long-term storage of their data." - ] - }, - { - "cell_type": "markdown", - "id": "481eeb82-9240-4fdf-84ab-87e39681d201", - "metadata": {}, - "source": [ - "```python\n", - "import os\n", - "import shutil\n", - "\n", - "cache_dir = \"./cache\"\n", - "if os.path.exists(cache_dir):\n", - " print(os.listdir(cache_dir))\n", - " try:\n", - " shutil.rmtree(cache_dir)\n", - " except OSError:\n", - " pass\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1de93586-d302-4aa6-878a-51acfb1d3009", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.5" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} +{"metadata":{"kernelspec":{"display_name":"Flux","language":"python","name":"flux"},"language_info":{"codemirror_mode":{"name":"ipython","version":3},"file_extension":".py","mimetype":"text/x-python","name":"python","nbconvert_exporter":"python","pygments_lexer":"ipython3","version":"3.12.9"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"ddf66f38-dc4a-4306-8b1c-b923fdb76922","cell_type":"markdown","source":"# HPC Cluster Executor\nIn contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) the HPC Submission Executors do not communicate via the [zero message queue](https://zeromq.org) but instead store the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n\nInternally the HPC submission mode is using the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependency of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package are covered in the installation section. ","metadata":{}},{"id":"d56862a6-8279-421d-a090-7ca2a3c4d416","cell_type":"markdown","source":"## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command this is in contrast to the [HPC allocatiom mode] which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n\nThe connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io).","metadata":{}},{"id":"db7760e8-35a6-4a1c-8b0f-410b536c3835","cell_type":"markdown","source":"```python\nfrom executorlib import SlurmClusterExecutor\n```","metadata":{}},{"id":"b20913f3-59e4-418c-a399-866124f8e497","cell_type":"markdown","source":"In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows.","metadata":{}},{"id":"0b8f3b77-6199-4736-9f28-3058c5230777","cell_type":"markdown","source":"```python\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"37bef7ac-ce3e-4d8a-b848-b1474c370bca","cell_type":"markdown","source":"Specific parameters for `SlurmClusterExecutor` like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `SlurmClusterExecutor`.","metadata":{}},{"id":"658781de-f222-4235-8c26-b0f77a0831b3","cell_type":"markdown","source":"```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --cpus-per-task={{cores}}\n\n{{command}}\n\"\"\"\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future = exe.submit(\n sum, [4, 4], \n resource_dict={\n \"submission_template\": submission_template, \n \"run_time_max\": 180, # in seconds \n })\n print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"f7ad9c97-7743-4f87-9344-4299b2b31a56","cell_type":"markdown","source":"With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. ","metadata":{}},{"id":"2a814efb-2fbc-41ba-98df-cf121d19ea66","cell_type":"markdown","source":"## Flux\nWhile most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above.","metadata":{}},{"id":"29d7aa18-357e-416e-805c-1322b59abec1","cell_type":"markdown","source":"### Dependencies\nAs already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter.","metadata":{}},{"id":"0f7fc37a-1248-492d-91ab-9db1d737eaee","cell_type":"code","source":"def add_funct(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"ae308683-6083-4e78-afc2-bff6c6dc297b","cell_type":"code","source":"from executorlib import FluxClusterExecutor\n\nwith FluxClusterExecutor(cache_directory=\"./cache\") as exe:\n future = 0\n for i in range(4, 8):\n future = exe.submit(add_funct, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"22\n"}],"execution_count":2},{"id":"ca75cb6c-c50f-4bee-9b09-d8d29d6c263b","cell_type":"markdown","source":"### Resource Assignment\nIn analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the resource assignment for the `FluxClusterExecutor` is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n\nBelow this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package.","metadata":{}},{"id":"eded3a0f-e54f-44f6-962f-eedde4bd2158","cell_type":"code","source":"def calc(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank\n","metadata":{"trusted":true},"outputs":[],"execution_count":3},{"id":"669b05df-3cb2-4f69-9d94-8b2442745ebb","cell_type":"code","source":"with FluxClusterExecutor(cache_directory=\"./cache\") as exe:\n fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":4},{"id":"d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8","cell_type":"markdown","source":"Beyond CPU cores and threads which were previously also introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith FluxClusterExecutor(\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"id":"3f47fd34-04d1-42a7-bb06-6821dc99a648","cell_type":"markdown","source":"### Cleaning Cache\nFinally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is design for caching but not for long-term storage. The user is responsible for the long-term storage of their data.","metadata":{}},{"id":"f537b4f6-cc98-43da-8aca-94a823bcbcbd","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./cache\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['add_functdce32a0e7f6eac9e4e19fec335b79726', 'calc76234667eef65c770fecf54645ef8ada', 'add_functee0545e0d3edb8a4a6ceb6d5ae712d39', 'add_funct3263a1038c0d088677685b6eccd9f7b7', 'add_funct6034ded02bdb3ff97695f3a94455ca4d']\n"}],"execution_count":5}]} \ No newline at end of file diff --git a/notebooks/3-hpc-job.ipynb b/notebooks/3-hpc-job.ipynb index 486405d8..e21fdaf8 100644 --- a/notebooks/3-hpc-job.ipynb +++ b/notebooks/3-hpc-job.ipynb @@ -1,436 +1 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "87c3425d-5abe-4e0b-a948-e371808c322c", - "metadata": {}, - "source": [ - "# HPC Job Executor\n", - "In contrast to the [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) which submits individual Python functions to HPC job schedulers, the HPC Job Executors take a given job allocation of the HPC job scheduler and executes Python functions with the resources available in this job allocation. In this regard it is similar to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) as it communicates with the individual Python processes using the [zero message queue](https://zeromq.org/), still it is more advanced as it can access the computational resources of all compute nodes of the given HPC job allocation and also provides the option to assign GPUs as accelerators for parallel execution.\n", - "\n", - "Available Functionality: \n", - "* Submit Python functions with the [submit() function or the map() function](https://executorlib.readthedocs.io/en/latest/1-single-node.html#basic-functionality).\n", - "* Support for parallel execution, either using the [message passing interface (MPI)](https://executorlib.readthedocs.io/en/latest/1-single-node.html#mpi-parallel-functions), [thread based parallelism](https://executorlib.readthedocs.io/en/latest/1-single-node.html#thread-parallel-functions) or by [assigning dedicated GPUs](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#resource-assignment) to selected Python functions. All these resources assignments are handled via the [resource dictionary parameter resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary).\n", - "* Performance optimization features, like [block allocation](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation), [dependency resolution](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) and [caching](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache).\n", - "\n", - "The only parameter the user has to change is the `backend` parameter. " - ] - }, - { - "cell_type": "markdown", - "id": "8c788b9f-6b54-4ce0-a864-4526b7f6f170", - "metadata": {}, - "source": [ - "## SLURM\n", - "With the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com/) currently being the most commonly used job scheduler, executorlib provides an interface to submit Python functions to SLURM. Internally, this is based on the [srun](https://slurm.schedmd.com/srun.html) command of the SLURM scheduler, which creates job steps in a given allocation. Given that all resource requests in SLURM are communicated via a central database a large number of submitted Python functions and resulting job steps can slow down the performance of SLURM. To address this limitation it is recommended to install the hierarchical job scheduler [flux](https://flux-framework.org/) in addition to SLURM, to use flux for distributing the resources within a given allocation. This configuration is discussed in more detail below in the section [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux)." - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "133b751f-0925-4d11-99f0-3f8dd9360b54", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import SlurmJobExecutor" - ] - }, - { - "cell_type": "markdown", - "id": "9b74944e-2ccd-4cb0-860a-d876310ea870", - "metadata": {}, - "source": [ - "```python\n", - "with SlurmAllocationExecutor() as exe:\n", - " future = exe.submit(sum, [1, 1])\n", - " print(future.result())\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "36e2d68a-f093-4082-933a-d95bfe7a60c6", - "metadata": {}, - "source": [ - "## SLURM with Flux \n", - "As discussed in the installation section it is important to select the [flux](https://flux-framework.org/) version compatible to the installation of a given HPC cluster. Which GPUs are available? Who manufactured these GPUs? Does the HPC use [mpich](https://www.mpich.org/) or [OpenMPI](https://www.open-mpi.org/) or one of their commercial counter parts like cray MPI or intel MPI? Depending on the configuration different installation options can be choosen, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#hpc-job-executor).\n", - "\n", - "Afterwards flux can be started in an [sbatch](https://slurm.schedmd.com/sbatch.html) submission script using:\n", - "```\n", - "srun flux start python \n", - "```\n", - "In this Python script `` the `\"flux_allocation\"` backend can be used." - ] - }, - { - "cell_type": "markdown", - "id": "68be70c3-af18-4165-862d-7022d35bf9e4", - "metadata": {}, - "source": [ - "### Resource Assignment\n", - "Independent of the selected Executor [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) or HPC job executor the assignment of the computational resources remains the same. They can either be specified in the `submit()` function by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) or alternatively during the initialization of the `Executor` class by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) there.\n", - "\n", - "This functionality of executorlib is commonly used to rewrite individual Python functions to use MPI while the rest of the Python program remains serial." - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "8a2c08df-cfea-4783-ace6-68fcd8ebd330", - "metadata": {}, - "outputs": [], - "source": [ - "def calc_mpi(i):\n", - " from mpi4py import MPI\n", - "\n", - " size = MPI.COMM_WORLD.Get_size()\n", - " rank = MPI.COMM_WORLD.Get_rank()\n", - " return i, size, rank" - ] - }, - { - "cell_type": "markdown", - "id": "715e0c00-7b17-40bb-bd55-b0e097bfef07", - "metadata": {}, - "source": [ - "Depending on the choice of MPI version, it is recommended to specify the pmi standard which [flux](https://flux-framework.org/) should use internally for the resource assignment. For example for OpenMPI >=5 `\"pmix\"` is the recommended pmi standard." - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "5802c7d7-9560-4909-9d30-a915a91ac0a1", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[(3, 2, 0), (3, 2, 1)]\n" - ] - } - ], - "source": [ - "from executorlib import FluxJobExecutor\n", - "\n", - "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n", - " fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n", - " print(fs.result())" - ] - }, - { - "cell_type": "markdown", - "id": "da862425-08b6-4ced-999f-89a74e85f410", - "metadata": {}, - "source": [ - "### Block Allocation\n", - "The block allocation for the HPC allocation mode follows the same implementation as the [block allocation for the Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation). It starts by defining the initialization function `init_function()` which returns a dictionary which is internally used to look up input parameters for Python functions submitted to the `FluxJobExecutor` class. Commonly this functionality is used to store large data objects inside the Python process created for the block allocation, rather than reloading these Python objects for each submitted function." - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "cdc742c0-35f7-47ff-88c0-1b0dbeabe51b", - "metadata": {}, - "outputs": [], - "source": [ - "def init_function():\n", - " return {\"j\": 4, \"k\": 3, \"l\": 2}" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "5ddf8343-ab2c-4469-ac9f-ee568823d4ad", - "metadata": {}, - "outputs": [], - "source": [ - "def calc_with_preload(i, j, k):\n", - " return i + j + k" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "0da13efa-1941-416f-b9e6-bba15b5cdfa2", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "10\n" - ] - } - ], - "source": [ - "with FluxJobExecutor(\n", - " flux_executor_pmi_mode=\"pmix\",\n", - " max_workers=2,\n", - " init_function=init_function,\n", - " block_allocation=True,\n", - ") as exe:\n", - " fs = exe.submit(calc_with_preload, 2, j=5)\n", - " print(fs.result())" - ] - }, - { - "cell_type": "markdown", - "id": "82f3b947-e662-4a0d-b590-9475e0b4f7dd", - "metadata": {}, - "source": [ - "In this example the parameter `k` is used from the dataset created by the initialization function while the parameters `i` and `j` are specified by the call of the `submit()` function. \n", - "\n", - "When using the block allocation mode, it is recommended to set either the maxium number of workers using the `max_workers` parameter or the maximum number of CPU cores using the `max_cores` parameter to prevent oversubscribing the available resources. " - ] - }, - { - "cell_type": "markdown", - "id": "8ced8359-8ecb-480b-966b-b85d8446d85c", - "metadata": {}, - "source": [ - "### Dependencies\n", - "Python functions with rather different computational resource requirements should not be merged into a single function. So to able to execute a series of Python functions which each depend on the output of the previous Python function executorlib internally handles the dependencies based on the [concurrent futures future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects from the Python standard library. This implementation is independent of the selected backend and works for HPC allocation mode just like explained in the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) section." - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "id": "bd26d97b-46fd-4786-9ad1-1e534b31bf36", - "metadata": {}, - "outputs": [], - "source": [ - "def add_funct(a, b):\n", - " return a + b" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "id": "1a2d440f-3cfc-4ff2-b74d-e21823c65f69", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "6\n" - ] - } - ], - "source": [ - "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n", - " future = 0\n", - " for i in range(1, 4):\n", - " future = exe.submit(add_funct, i, future)\n", - " print(future.result())" - ] - }, - { - "cell_type": "markdown", - "id": "f526c2bf-fdf5-463b-a955-020753138415", - "metadata": {}, - "source": [ - "### Caching\n", - "Finally, also the caching is available for HPC allocation mode, in analogy to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache). Again this functionality is not designed to identify function calls with the same parameters, but rather provides the option to reload previously cached results even after the Python processes which contained the executorlib `Executor` class is closed. As the cache is stored on the file system, this option can decrease the performance of executorlib. Consequently the caching option should primarily be used during the prototyping phase." - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "id": "dcba63e0-72f5-49d1-ab04-2092fccc1c47", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[2, 4, 6]\n" - ] - } - ], - "source": [ - "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", cache_directory=\"./cache\") as exe:\n", - " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", - " print([f.result() for f in future_lst])" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "id": "c3958a14-075b-4c10-9729-d1c559a9231c", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "['sumd1bf4ee658f1ac42924a2e4690e797f4.h5out', 'sum5171356dfe527405c606081cfbd2dffe.h5out', 'sumb6a5053f96b7031239c2e8d0e7563ce4.h5out']\n" - ] - } - ], - "source": [ - "import os\n", - "import shutil\n", - "\n", - "cache_dir = \"./cache\"\n", - "if os.path.exists(cache_dir):\n", - " print(os.listdir(cache_dir))\n", - " try:\n", - " shutil.rmtree(cache_dir)\n", - " except OSError:\n", - " pass" - ] - }, - { - "cell_type": "markdown", - "id": "c24ca82d-60bd-4fb9-a082-bf9a81e838bf", - "metadata": {}, - "source": [ - "### Nested executors\n", - "The hierarchical nature of the [flux](https://flux-framework.org/) job scheduler allows the creation of additional executorlib Executors inside the functions submitted to the Executor. This hierarchy can be beneficial to separate the logic to saturate the available computational resources. " - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "06fb2d1f-65fc-4df6-9402-5e9837835484", - "metadata": {}, - "outputs": [], - "source": [ - "def calc_nested():\n", - " from executorlib import FluxJobExecutor\n", - "\n", - " with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n", - " fs = exe.submit(sum, [1, 1])\n", - " return fs.result()" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "89b7d0fd-5978-4913-a79a-f26cc8047445", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "2\n" - ] - } - ], - "source": [ - "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", flux_executor_nesting=True) as exe:\n", - " fs = exe.submit(calc_nested)\n", - " print(fs.result())" - ] - }, - { - "cell_type": "markdown", - "id": "34a8c690-ca5a-41d1-b38f-c67eff085750", - "metadata": {}, - "source": [ - "### Resource Monitoring\n", - "For debugging it is commonly helpful to keep track of the computational resources. [flux](https://flux-framework.org/) provides a number of features to analyse the resource utilization, so here only the two most commonly used ones are introduced. Starting with the option to list all the resources available in a given allocation with the `flux resource list` command:" - ] - }, - { - "cell_type": "code", - "execution_count": 13, - "id": "7481eb0a-a41b-4d46-bb48-b4db299fcd86", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - " STATE NNODES NCORES NGPUS NODELIST\n", - " free 1 2 0 fedora\n", - " allocated 0 0 0 \n", - " down 0 0 0 \n" - ] - } - ], - "source": [ - "! flux resource list" - ] - }, - { - "cell_type": "markdown", - "id": "08d98134-a0e0-4841-be82-e09e1af29e7f", - "metadata": {}, - "source": [ - "Followed by the list of jobs which were executed in a given flux session. This can be retrieved using the `flux jobs -a` command:" - ] - }, - { - "cell_type": "code", - "execution_count": 14, - "id": "1ee6e147-f53a-4526-8ed0-fd036f2ee6bf", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - " JOBID USER NAME ST NTASKS NNODES TIME INFO\n", - "\u001B[01;32m ƒDqBpVYK jan python CD 1 1 0.695s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒDxdEtYf jan python CD 1 1 0.225s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒDVahzPq jan python CD 1 1 0.254s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒDSsZJXH jan python CD 1 1 0.316s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒDSu3Hod jan python CD 1 1 0.277s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒDFbkmFD jan python CD 1 1 0.247s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒD9eKeas jan python CD 1 1 0.227s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒD3iNXCs jan python CD 1 1 0.224s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒCoZ3P5q jan python CD 1 1 0.261s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒCoXZPoV jan python CD 1 1 0.261s fedora\n", - "\u001B[0;0m\u001B[01;32m ƒCZ1URjd jan python CD 2 1 0.360s fedora\n", - "\u001B[0;0m" - ] - } - ], - "source": [ - "! flux jobs -a" - ] - }, - { - "cell_type": "markdown", - "id": "021f165b-27cc-4676-968b-cbcfd1f0210a", - "metadata": {}, - "source": [ - "## Flux\n", - "While the number of HPC clusters which use [flux](https://flux-framework.org/) as primary job scheduler is currently still limited the setup and functionality provided by executorlib for running [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux) also applies to HPCs which use [flux](https://flux-framework.org/) as primary job scheduler." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "04f03ebb-3f9e-4738-b9d2-5cb0db9b63c3", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.5" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} +{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.12.9","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"87c3425d-5abe-4e0b-a948-e371808c322c","cell_type":"markdown","source":"# HPC Job Executor\nIn contrast to the [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) which submits individual Python functions to HPC job schedulers, the HPC Job Executors take a given job allocation of the HPC job scheduler and executes Python functions with the resources available in this job allocation. In this regard it is similar to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) as it communicates with the individual Python processes using the [zero message queue](https://zeromq.org/), still it is more advanced as it can access the computational resources of all compute nodes of the given HPC job allocation and also provides the option to assign GPUs as accelerators for parallel execution.\n\nAvailable Functionality: \n* Submit Python functions with the [submit() function or the map() function](https://executorlib.readthedocs.io/en/latest/1-single-node.html#basic-functionality).\n* Support for parallel execution, either using the [message passing interface (MPI)](https://executorlib.readthedocs.io/en/latest/1-single-node.html#mpi-parallel-functions), [thread based parallelism](https://executorlib.readthedocs.io/en/latest/1-single-node.html#thread-parallel-functions) or by [assigning dedicated GPUs](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#resource-assignment) to selected Python functions. All these resources assignments are handled via the [resource dictionary parameter resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary).\n* Performance optimization features, like [block allocation](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation), [dependency resolution](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) and [caching](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache).\n\nThe only parameter the user has to change is the `backend` parameter. ","metadata":{}},{"id":"8c788b9f-6b54-4ce0-a864-4526b7f6f170","cell_type":"markdown","source":"## SLURM\nWith the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com/) currently being the most commonly used job scheduler, executorlib provides an interface to submit Python functions to SLURM. Internally, this is based on the [srun](https://slurm.schedmd.com/srun.html) command of the SLURM scheduler, which creates job steps in a given allocation. Given that all resource requests in SLURM are communicated via a central database a large number of submitted Python functions and resulting job steps can slow down the performance of SLURM. To address this limitation it is recommended to install the hierarchical job scheduler [flux](https://flux-framework.org/) in addition to SLURM, to use flux for distributing the resources within a given allocation. This configuration is discussed in more detail below in the section [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux).","metadata":{}},{"id":"133b751f-0925-4d11-99f0-3f8dd9360b54","cell_type":"code","source":"from executorlib import SlurmJobExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"9b74944e-2ccd-4cb0-860a-d876310ea870","cell_type":"markdown","source":"```python\nwith SlurmAllocationExecutor() as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())\n```","metadata":{}},{"id":"36e2d68a-f093-4082-933a-d95bfe7a60c6","cell_type":"markdown","source":"## SLURM with Flux \nAs discussed in the installation section it is important to select the [flux](https://flux-framework.org/) version compatible to the installation of a given HPC cluster. Which GPUs are available? Who manufactured these GPUs? Does the HPC use [mpich](https://www.mpich.org/) or [OpenMPI](https://www.open-mpi.org/) or one of their commercial counter parts like cray MPI or intel MPI? Depending on the configuration different installation options can be choosen, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#hpc-job-executor).\n\nAfterwards flux can be started in an [sbatch](https://slurm.schedmd.com/sbatch.html) submission script using:\n```\nsrun flux start python \n```\nIn this Python script `` the `\"flux_allocation\"` backend can be used.","metadata":{}},{"id":"68be70c3-af18-4165-862d-7022d35bf9e4","cell_type":"markdown","source":"### Resource Assignment\nIndependent of the selected Executor [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) or HPC job executor the assignment of the computational resources remains the same. They can either be specified in the `submit()` function by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) or alternatively during the initialization of the `Executor` class by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) there.\n\nThis functionality of executorlib is commonly used to rewrite individual Python functions to use MPI while the rest of the Python program remains serial.","metadata":{}},{"id":"8a2c08df-cfea-4783-ace6-68fcd8ebd330","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":2},{"id":"715e0c00-7b17-40bb-bd55-b0e097bfef07","cell_type":"markdown","source":"Depending on the choice of MPI version, it is recommended to specify the pmi standard which [flux](https://flux-framework.org/) should use internally for the resource assignment. For example for OpenMPI >=5 `\"pmix\"` is the recommended pmi standard.","metadata":{}},{"id":"5802c7d7-9560-4909-9d30-a915a91ac0a1","cell_type":"code","source":"from executorlib import FluxJobExecutor\n\nwith FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":3},{"id":"da862425-08b6-4ced-999f-89a74e85f410","cell_type":"markdown","source":"### Block Allocation\nThe block allocation for the HPC allocation mode follows the same implementation as the [block allocation for the Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation). It starts by defining the initialization function `init_function()` which returns a dictionary which is internally used to look up input parameters for Python functions submitted to the `FluxJobExecutor` class. Commonly this functionality is used to store large data objects inside the Python process created for the block allocation, rather than reloading these Python objects for each submitted function.","metadata":{}},{"id":"cdc742c0-35f7-47ff-88c0-1b0dbeabe51b","cell_type":"code","source":"def init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}","metadata":{"trusted":true},"outputs":[],"execution_count":4},{"id":"5ddf8343-ab2c-4469-ac9f-ee568823d4ad","cell_type":"code","source":"def calc_with_preload(i, j, k):\n return i + j + k","metadata":{"trusted":true},"outputs":[],"execution_count":5},{"id":"0da13efa-1941-416f-b9e6-bba15b5cdfa2","cell_type":"code","source":"with FluxJobExecutor(\n flux_executor_pmi_mode=\"pmix\",\n max_workers=2,\n init_function=init_function,\n block_allocation=True,\n) as exe:\n fs = exe.submit(calc_with_preload, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"10\n"}],"execution_count":6},{"id":"82f3b947-e662-4a0d-b590-9475e0b4f7dd","cell_type":"markdown","source":"In this example the parameter `k` is used from the dataset created by the initialization function while the parameters `i` and `j` are specified by the call of the `submit()` function. \n\nWhen using the block allocation mode, it is recommended to set either the maxium number of workers using the `max_workers` parameter or the maximum number of CPU cores using the `max_cores` parameter to prevent oversubscribing the available resources. ","metadata":{}},{"id":"8ced8359-8ecb-480b-966b-b85d8446d85c","cell_type":"markdown","source":"### Dependencies\nPython functions with rather different computational resource requirements should not be merged into a single function. So to able to execute a series of Python functions which each depend on the output of the previous Python function executorlib internally handles the dependencies based on the [concurrent futures future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects from the Python standard library. This implementation is independent of the selected backend and works for HPC allocation mode just like explained in the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) section.","metadata":{}},{"id":"bd26d97b-46fd-4786-9ad1-1e534b31bf36","cell_type":"code","source":"def add_funct(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":7},{"id":"1a2d440f-3cfc-4ff2-b74d-e21823c65f69","cell_type":"code","source":"with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(add_funct, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"6\n"}],"execution_count":8},{"id":"f526c2bf-fdf5-463b-a955-020753138415","cell_type":"markdown","source":"### Caching\nFinally, also the caching is available for HPC allocation mode, in analogy to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache). Again this functionality is not designed to identify function calls with the same parameters, but rather provides the option to reload previously cached results even after the Python processes which contained the executorlib `Executor` class is closed. As the cache is stored on the file system, this option can decrease the performance of executorlib. Consequently the caching option should primarily be used during the prototyping phase.","metadata":{}},{"id":"dcba63e0-72f5-49d1-ab04-2092fccc1c47","cell_type":"code","source":"with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\n"}],"execution_count":9},{"id":"c3958a14-075b-4c10-9729-d1c559a9231c","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./cache\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['sum0d968285d17368d1c34ea7392309bcc5', 'sum6270955d7c8022a0c1027aafaee64439', 'sum0102e33bb2921ae07a3bbe3db5d3dec9']\n"}],"execution_count":10},{"id":"c24ca82d-60bd-4fb9-a082-bf9a81e838bf","cell_type":"markdown","source":"### Nested executors\nThe hierarchical nature of the [flux](https://flux-framework.org/) job scheduler allows the creation of additional executorlib Executors inside the functions submitted to the Executor. This hierarchy can be beneficial to separate the logic to saturate the available computational resources. ","metadata":{}},{"id":"06fb2d1f-65fc-4df6-9402-5e9837835484","cell_type":"code","source":"def calc_nested():\n from executorlib import FluxJobExecutor\n\n with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n fs = exe.submit(sum, [1, 1])\n return fs.result()","metadata":{"trusted":true},"outputs":[],"execution_count":11},{"id":"89b7d0fd-5978-4913-a79a-f26cc8047445","cell_type":"code","source":"with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", flux_executor_nesting=True) as exe:\n fs = exe.submit(calc_nested)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":12},{"id":"34a8c690-ca5a-41d1-b38f-c67eff085750","cell_type":"markdown","source":"### Resource Monitoring\nFor debugging it is commonly helpful to keep track of the computational resources. [flux](https://flux-framework.org/) provides a number of features to analyse the resource utilization, so here only the two most commonly used ones are introduced. Starting with the option to list all the resources available in a given allocation with the `flux resource list` command:","metadata":{}},{"id":"7481eb0a-a41b-4d46-bb48-b4db299fcd86","cell_type":"code","source":"! flux resource list","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":" STATE NNODES NCORES NGPUS NODELIST\n free 1 24 0 jupyter-pyiron-executorlib-slqpe5j5\n allocated 0 0 0 \n down 0 0 0 \n"}],"execution_count":13},{"id":"08d98134-a0e0-4841-be82-e09e1af29e7f","cell_type":"markdown","source":"Followed by the list of jobs which were executed in a given flux session. This can be retrieved using the `flux jobs -a` command:","metadata":{}},{"id":"1ee6e147-f53a-4526-8ed0-fd036f2ee6bf","cell_type":"code","source":"! flux jobs -a","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":" JOBID USER NAME ST NTASKS NNODES TIME INFO\n\u001b[01;32m ƒ5c7bbtT jovyan flux CD 1 1 4.227s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ47tyNMM jovyan python CD 1 1 2.982s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ47sVP51 jovyan python CD 1 1 2.902s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ427vAfR jovyan python CD 1 1 2.986s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ3jUnECw jovyan python CD 1 1 0.455s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ3P1G9Uj jovyan python CD 1 1 0.643s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ38sQze3 jovyan python CD 1 1 0.606s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ2HHH1w5 jovyan python CD 1 1 1.665s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒ2EvtA1M jovyan python CD 1 1 1.734s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m\u001b[01;32m ƒV4qQRd jovyan python CD 2 1 1.463s jupyter-pyiron-executorlib-slqpe5j5\n\u001b[0;0m"}],"execution_count":14},{"id":"021f165b-27cc-4676-968b-cbcfd1f0210a","cell_type":"markdown","source":"## Flux\nWhile the number of HPC clusters which use [flux](https://flux-framework.org/) as primary job scheduler is currently still limited the setup and functionality provided by executorlib for running [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux) also applies to HPCs which use [flux](https://flux-framework.org/) as primary job scheduler.","metadata":{}},{"id":"04f03ebb-3f9e-4738-b9d2-5cb0db9b63c3","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]} \ No newline at end of file