# Use pipefunc in combination with executorlib
See the pipefunc documentation on execution and parallelism: https://pipefunc.readthedocs.io/en/latest/concepts/execution-and-parallelism/

In [1]:
import time
from executorlib import SingleNodeExecutor
import numpy as np
from pipefunc import Pipeline, pipefunc
import threading
import multiprocessing

In [2]:
@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def f(x):
    """Pause for one second, then report the name of the executing thread.

    Args:
        x: The input element; it is not used by the body.

    Returns:
        The ``name`` attribute of ``threading.current_thread()``.
    """
    time.sleep(1)  # Stand-in for I/O-bound work
    worker_thread = threading.current_thread()
    return worker_thread.name

In [3]:
@pipefunc(output_name="z", mapspec="x[i] -> z[i]")
def g(x):
    """Do a small eigen-decomposition, then report the executing process name.

    Args:
        x: The input element; it is not used by the body.

    Returns:
        The ``name`` attribute of ``multiprocessing.current_process()``.
    """
    random_matrix = np.random.rand(10, 10)
    # CPU-bound work; the decomposition result itself is discarded.
    np.linalg.eig(random_matrix)
    worker_process = multiprocessing.current_process()
    return worker_process.name

In [4]:
# Values supplied for the pipeline's "x" input.
inputs = dict(x=[1, 2, 3])
# Bundle both decorated functions into a single pipeline.
pipeline = Pipeline([f, g])

In [5]:
# Storage backend per output name.
storage = {
    "z": "file_array",
    "": "dict",  # empty string means default storage
}
# Create the executors in `with` blocks so they are shut down once the map
# call has returned (or raised), instead of being left open for the rest of
# the session.
with SingleNodeExecutor(max_workers=2) as y_executor:
    with SingleNodeExecutor(max_workers=2) as default_executor:
        # Executor per output name.
        executor = {
            "y": y_executor,
            "": default_executor,  # empty string means default executor
        }
        results = pipeline.map(
            inputs,
            run_folder="run_folder",
            executor=executor,
            storage=storage,
        )

  return _get_optimal_chunk_size(num_iterations, executor)


In [6]:
# Convert the stored "y" and "z" outputs to plain lists so the recorded
# thread and process names can be inspected.
thread_names, process_names = (
    results[output_name].output.tolist() for output_name in ("y", "z")
)
print(f"thread_names: {thread_names}")
print(f"process_names: {process_names}")

thread_names: ['MainThread', 'MainThread', 'MainThread']
process_names: ['MainProcess', 'MainProcess', 'MainProcess']
