# Execution options

One of the key design features of Pydra is the separation between the parameterisation of
the task to be executed and the parameterisation of where and how the task should be
executed (e.g. on the cloud, on an HPC cluster, ...). This tutorial steps you through
some of the available options for executing a task.

[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/nipype/pydra-tutorial/develop/notebooks/tutorial/advanced_execution.ipynb)

In [1]:
import nest_asyncio
nest_asyncio.apply()

## Workers

Pydra supports several workers with which to execute tasks:

- `debug` (default)
- `cf`
- `slurm`
- `sge`
- `psij`
- `dask` (experimental)

By default, the *debug* worker is used, which runs tasks serially in a single process
without use of the `asyncio` module. This makes it easier to debug errors in workflows
and Python tasks. However, when using Pydra in production you will typically want to
parallelise the execution for efficiency.

If running on a local workstation, then the `cf` (*ConcurrentFutures*) worker is a good
option because it is able to spread the tasks to be run over multiple processes and
maximise CPU usage.
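
To illustrate the idea behind spreading tasks over a fixed number of workers, here is a minimal standard-library sketch. It is not Pydra's implementation: `ten_to_the_power` and `run_split` are hypothetical names, and the demo call swaps in a thread pool (rather than the process pool described above) so that the snippet runs unchanged inside a notebook.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def ten_to_the_power(p: int) -> int:
    """Plain-Python stand-in for the body of a task (hypothetical)."""
    return 10 ** p


def run_split(values, n_procs, executor_cls=ProcessPoolExecutor):
    """Apply ten_to_the_power to each value using at most n_procs workers."""
    with executor_cls(max_workers=n_procs) as pool:
        # Executor.map returns results in input order, whichever finishes first
        return list(pool.map(ten_to_the_power, values))


# Assumption: threads are used here only to keep the demo portable
results = run_split([1, 2, 3, 4, 5], n_procs=3, executor_cls=ThreadPoolExecutor)
print(results)
```

To use separate processes instead, leave `executor_cls` at its default and call `run_split` from a script under an `if __name__ == "__main__":` guard, as process pools generally require.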

If you have access to a high-performance computing (HPC) cluster, then
the [SLURM](https://slurm.schedmd.com/documentation.html),
[SGE](https://www.metagenomics.wiki/tools/hpc-sge) and [PSI/J](https://exaworks.org/psij)
workers can be used to submit each workflow node as a separate job to the HPC scheduler.
There is also an experimental [Dask](https://www.dask.org/) worker, which provides a
range of execution backends to choose from.

To specify a worker, pass either its abbreviation as a string or the worker class
itself. Additional parameters can be passed to the worker initialisation as keyword
arguments to the execution call. For example, if we wanted to run five tasks using the
ConcurrentFutures worker but only use three CPUs, we can pass `n_procs=3` to the execution
call.

In [2]:

from pydra.design import python

if __name__ == "__main__":

    @python.define
    def TenToThePower(p: int) -> int:
        return 10 ** p

    ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])

    # Run the 5 tasks in parallel split across 3 processes
    outputs = ten_to_the_power(worker="cf", n_procs=3)

    p1, p2, p3, p4, p5 = outputs.out

    print(f"10^5 = {p5}")






Alternatively, the worker object can be initialised in the calling code and passed
directly to the execution call:

In [None]:
from pydra.engine.workers import ConcurrentFuturesWorker

ten_to_the_power = TenToThePower().split(p=[6, 7, 8, 9, 10])

# Run the 5 tasks in parallel split across 3 processes
outputs = ten_to_the_power(worker=ConcurrentFuturesWorker(n_procs=3))

p6, p7, p8, p9, p10 = outputs.out

print(f"10^10 = {p10}")

## Cache locations

When a task runs, a unique hash is generated by the combination of all the inputs to the
task and the operation to be performed. This hash is used to name the output directory for
the task within the specified cache directory. Therefore, if you use the same cache
directory between runs and in a subsequent run the same task is executed with the same
inputs then the location of its output directory will also be the same, and the outputs
generated by the previous run are reused.
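
The naming scheme described above can be sketched with the standard library. This is only an illustration under simplifying assumptions, not Pydra's actual hashing algorithm: `task_hash` and `output_dir` are hypothetical helpers, and the inputs are assumed to be JSON-serialisable.

```python
import hashlib
import json
from pathlib import Path


def task_hash(operation: str, inputs: dict) -> str:
    """Derive a deterministic hash from the operation name and its inputs."""
    # sort_keys makes the hash independent of the order the inputs were given in
    payload = json.dumps({"operation": operation, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


def output_dir(cache_dir: Path, operation: str, inputs: dict) -> Path:
    """Name the output directory after the task hash (hypothetical layout)."""
    return cache_dir / f"{operation}-{task_hash(operation, inputs)}"


cache = Path("cache")
first = output_dir(cache, "regrid", {"voxel": [0.5, 0.5, 0.5]})
again = output_dir(cache, "regrid", {"voxel": [0.5, 0.5, 0.5]})
changed = output_dir(cache, "regrid", {"voxel": [0.25, 0.25, 0.25]})

print(first == again)    # True: same inputs map to the same directory
print(first == changed)  # False: different inputs map to a new directory
```

Because the directory name is a pure function of the operation and its inputs, a later run only needs to check whether that directory already holds results.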

For example, using the MrGrid example from the [Getting Started Tutorial](./1-getting-started.html):


In [None]:
from pathlib import Path
import tempfile
from fileformats.medimage import Nifti1
from pydra.engine.submitter import Submitter
from pydra.tasks.mrtrix3.v3_0 import MrGrid

# Make directory filled with nifti files
test_dir = Path(tempfile.mkdtemp())
nifti_dir = test_dir / "nifti"
nifti_dir.mkdir()
for i in range(10):
    Nifti1.sample(nifti_dir, seed=i)

# Instantiate the task definition, splitting the "in_file" input field over all
# NIfTI files in the test directory
mrgrid = MrGrid(operation="regrid", voxel=(0.5, 0.5, 0.5)).split(
    in_file=nifti_dir.iterdir()
)

# Run the task to resample all NIfTI files
outputs = mrgrid()

# Create a new custom directory
cache_dir = test_dir / "cache"
cache_dir.mkdir()

submitter = Submitter(cache_dir=cache_dir)

# Run the task to resample all NIfTI files, storing results in the custom cache directory
with submitter:
    result1 = submitter(mrgrid)

print(result1)


If we attempt to run the same task with the same parameterisation, the cache directory
will point to the same location and the results will be reused:

In [None]:
# Re-create the task with the same parameterisation as the first run
mrgrid_varying_vox_sizes2 = MrGrid(operation="regrid", voxel=(0.5, 0.5, 0.5)).split(
    in_file=nifti_dir.iterdir()
)

submitter = Submitter(cache_dir=test_dir / "cache")

# Result from previous run is reused as the task and inputs are identical
with submitter:
    result2 = submitter(mrgrid_varying_vox_sizes2)


# Check that the output directory is the same for both runs
assert result2.output_dir == result1.output_dir

# Re-create the task with a different voxel size to resample the NIfTI files to
mrgrid_varying_vox_sizes2 = MrGrid(operation="regrid", voxel=(0.25, 0.25, 0.25)).split(
    in_file=nifti_dir.iterdir()
)

# The task is rerun because its inputs differ from those of the previous run
with submitter:
    result3 = submitter(mrgrid_varying_vox_sizes2)

# The output directory will be different as the inputs are now different
assert result3.output_dir != result1.output_dir

Note that for file objects, the contents of the files, not their paths, are used to
calculate the hash. Therefore, when inputting large files there might be some additional
overhead on the first run (the file hashes themselves are cached by path and mtime, so
they shouldn't need to be recalculated unless the files are modified). However, this makes
the hashes invariant to file-system movement. For example, changing the name of one of the
files in the nifti directory won't invalidate the hash.

In [None]:
# Rename a NIfTI file within the test directory
first_file = next(nifti_dir.iterdir())
first_file.rename(first_file.with_name("first.nii.gz"))

mrgrid_varying_vox_sizes3 = MrGrid(operation="regrid", voxel=(0.5, 0.5, 0.5)).split(
    in_file=nifti_dir.iterdir()
)

# Result from the first run is reused as the file contents and other inputs are identical
with submitter:
    result4 = submitter(mrgrid_varying_vox_sizes3)

# Check that the output directory is the same for both runs
assert result4.output_dir == result1.output_dir
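
The path-independent file hashing described above, together with the path-and-mtime memoisation, can be sketched as follows. This is a simplified illustration rather than Pydra's own code: `content_hash` and `_hash_cache` are hypothetical names, and an in-memory dictionary stands in for whatever persistent store a real implementation would use.

```python
import hashlib
import tempfile
from pathlib import Path

# Memo of content hashes keyed by (path, modification time in nanoseconds)
_hash_cache: dict[tuple[str, int], str] = {}


def content_hash(path: Path) -> str:
    """Hash a file's bytes, reusing the stored value while path and mtime match."""
    key = (str(path), path.stat().st_mtime_ns)
    if key not in _hash_cache:
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            # Read in 1 MiB chunks so large files are not loaded into memory at once
            for chunk in iter(lambda: f.read(1 << 20), b""):
                digest.update(chunk)
        _hash_cache[key] = digest.hexdigest()
    return _hash_cache[key]


with tempfile.TemporaryDirectory() as tmp:
    original = Path(tmp) / "scan.nii.gz"
    original.write_bytes(b"example image data")
    before = content_hash(original)

    # Renaming changes the path but not the bytes, so the hash is unchanged
    renamed = original.rename(original.with_name("first.nii.gz"))
    after = content_hash(renamed)

print(before == after)  # True
```

In this sketch the rename causes one extra read of the file, since the memo is keyed by path, but the resulting hash is identical, which is what keeps the task's cache entry valid.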

See [Caches and hashes](../explanation/hashing-caching.html) for more details on how inputs
are hashed for caching and issues to consider.

## Environments

Work in progress...

See [Containers and Environments](../explanation/environments.rst) for more details.

## Provenance and auditing

Work in progress...