Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelization fixes #2280

Merged
merged 10 commits into from
Jan 2, 2024
1 change: 1 addition & 0 deletions doc/changes/2280.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved behavior of `parallel_map` and `loky_pmap` in the case of timeouts, errors or keyboard interrupts
6 changes: 0 additions & 6 deletions qutip/solver/mcsolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ def mcsolve(H, state, tlist, c_ops=(), e_ops=None, ntraj=500, *,
| How to run the trajectories. "parallel" uses the concurrent module to
run in parallel while "loky" uses the module of the same name to do
so.
- | job_timeout : int
| Maximum time to compute one trajectory.
- | num_cpus : int
| Number of cpus to use when running in parallel. ``None`` detects the
number of available cpus.
Expand Down Expand Up @@ -416,7 +414,6 @@ class MCSolver(MultiTrajSolver):
"keep_runs_results": False,
"method": "adams",
"map": "serial",
"job_timeout": None,
"num_cpus": None,
"bitgenerator": None,
"mc_corr_eps": 1e-10,
Expand Down Expand Up @@ -603,9 +600,6 @@ def options(self):
run in parallel while "loky" uses the module of the same name to do
so.

job_timeout: None, int
Maximum time to compute one trajectory.

num_cpus: None, int
Number of cpus to use when running in parallel. ``None`` detects the
number of available cpus.
Expand Down
2 changes: 0 additions & 2 deletions qutip/solver/multitraj.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class MultiTrajSolver(Solver):
"normalize_output": False,
"method": "",
"map": "serial",
"job_timeout": None,
"num_cpus": None,
"bitgenerator": None,
}
Expand Down Expand Up @@ -146,7 +145,6 @@ def _initialize_run(self, state, ntraj=1, args=None, e_ops=(),
map_func = _get_map[self.options['map']]
map_kw = {
'timeout': timeout,
'job_timeout': self.options['job_timeout'],
'num_cpus': self.options['num_cpus'],
}
state0 = self._prepare_state(state)
Expand Down
5 changes: 0 additions & 5 deletions qutip/solver/nm_mcsolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ def nm_mcsolve(H, state, tlist, ops_and_rates=(), e_ops=None, ntraj=500, *,
| How to run the trajectories. "parallel" uses the concurrent module to
run in parallel while "loky" uses the module of the same name to do
so.
- | job_timeout : int
| Maximum time to compute one trajectory.
- | num_cpus : int
| Number of cpus to use when running in parallel. ``None`` detects the
number of available cpus.
Expand Down Expand Up @@ -559,9 +557,6 @@ def options(self):
run in parallel while "loky" uses the module of the same name to do
so.

job_timeout: None, int
Maximum time to compute one trajectory.

num_cpus: None, int
Number of cpus to use when running in parallel. ``None`` detects the
number of available cpus.
Expand Down
88 changes: 50 additions & 38 deletions qutip/solver/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@


default_map_kw = {
'job_timeout': threading.TIMEOUT_MAX,
'timeout': threading.TIMEOUT_MAX,
'num_cpus': available_cpu_count(),
'fail_fast': True,
Expand Down Expand Up @@ -149,25 +148,24 @@ def parallel_map(task, values, task_args=None, task_kwargs=None,
The list or array of values for which the ``task`` function is to be
evaluated.
task_args : list, optional
The optional additional argument to the ``task`` function.
The optional additional arguments to the ``task`` function.
task_kwargs : dictionary, optional
The optional additional keyword argument to the ``task`` function.
The optional additional keyword arguments to the ``task`` function.
reduce_func : func, optional
If provided, it will be called with the output of each tasks instead of
storing a them in a list. Note that the order in which results are
If provided, it will be called with the output of each task instead of
storing them in a list. Note that the order in which results are
passed to ``reduce_func`` is not defined. It should return None or a
number. When returning a number, it represent the estimation of the
number of task left. On a return <= 0, the map will end early.
number. When returning a number, it represents the estimation of the
number of tasks left. On a return <= 0, the map will end early.
progress_bar : str, optional
Progress bar option string for showing progress.
progress_bar_kwargs : dict, optional
Options for the progress bar.
map_kw: dict, optional
Dictionary containing entry for:
- timeout: float, Maximum time (sec) for the whole map.
- job_timeout: float, Maximum time (sec) for each job in the map.
- num_cpus: int, Number of job to run at once.
- fail_fast: bool, Raise an error at the first.
- num_cpus: int, Number of jobs to run at once.
- fail_fast: bool, Abort at the first error.

Returns
-------
Expand All @@ -184,7 +182,6 @@ def parallel_map(task, values, task_args=None, task_kwargs=None,
task_kwargs = {}
map_kw = _read_map_kw(map_kw)
end_time = map_kw['timeout'] + time.time()
job_time = map_kw['job_timeout']

progress_bar = progress_bars[progress_bar](
len(values), **progress_bar_kwargs
Expand All @@ -194,20 +191,33 @@ def parallel_map(task, values, task_args=None, task_kwargs=None,
finished = []
if reduce_func is not None:
results = None
result_func = lambda i, value: reduce_func(value)
def result_func(_, value):
return reduce_func(value)
else:
results = [None] * len(values)
result_func = lambda i, value: results.__setitem__(i, value)
result_func = results.__setitem__

def _done_callback(future):
if not future.cancelled():
try:
result = future.result()
remaining_ntraj = result_func(future._i, result)
if remaining_ntraj is not None and remaining_ntraj <= 0:
finished.append(True)
except Exception as e:
errors[future._i] = e
remaining_ntraj = result_func(future._i, result)
if remaining_ntraj is not None and remaining_ntraj <= 0:
finished.append(True)
except KeyboardInterrupt:
hodgestar marked this conversation as resolved.
Show resolved Hide resolved
# When a keyboard interrupt happens, it is raised in the main
# thread and in all worker threads. The worker threads have
# already returned and the main thread is only waiting for the
# ProcessPoolExecutor to shutdown before exiting. If the call
# to `future.result()` in this callback function raises the
# KeyboardInterrupt again, it makes the system enter a kind of
# deadlock state, where the user has to press CTRL+C a second
# time to actually end the program. For that reason, we
# silently ignore the KeyboardInterrupt here, avoiding the
# deadlock and allowing the main thread to exit.
pass
progress_bar.update()

if sys.version_info >= (3, 7):
Expand Down Expand Up @@ -294,24 +304,24 @@ def loky_pmap(task, values, task_args=None, task_kwargs=None,
The list or array of values for which the ``task`` function is to be
evaluated.
task_args : list, optional
The optional additional argument to the ``task`` function.
The optional additional arguments to the ``task`` function.
task_kwargs : dictionary, optional
The optional additional keyword argument to the ``task`` function.
The optional additional keyword arguments to the ``task`` function.
reduce_func : func, optional
If provided, it will be called with the output of each tasks instead of
storing a them in a list. It should return None or a number. When
returning a number, it represent the estimation of the number of task
left. On a return <= 0, the map will end early.
If provided, it will be called with the output of each task instead of
storing them in a list. Note that the results are passed to
``reduce_func`` in the original order. It should return None or a
number. When returning a number, it represents the estimation of the
number of tasks left. On a return <= 0, the map will end early.
progress_bar : str, optional
Progress bar option string for showing progress.
progress_bar_kwargs : dict, optional
Options for the progress bar.
map_kw: dict, optional
Dictionary containing entry for:
- timeout: float, Maximum time (sec) for the whole map.
- job_timeout: float, Maximum time (sec) for each job in the map.
- num_cpus: int, Number of job to run at once.
- fail_fast: bool, Raise an error at the first.
- num_cpus: int, Number of jobs to run at once.
- fail_fast: bool, Abort at the first error.

Returns
-------
Expand All @@ -327,30 +337,34 @@ def loky_pmap(task, values, task_args=None, task_kwargs=None,
if task_kwargs is None:
task_kwargs = {}
map_kw = _read_map_kw(map_kw)
os.environ['QUTIP_IN_PARALLEL'] = 'TRUE'
from loky import get_reusable_executor, TimeoutError
end_time = map_kw['timeout'] + time.time()

progress_bar = progress_bars[progress_bar](
len(values), **progress_bar_kwargs
)

executor = get_reusable_executor(max_workers=map_kw['num_cpus'])
end_time = map_kw['timeout'] + time.time()
job_time = map_kw['job_timeout']
results = None
remaining_ntraj = None
errors = {}
remaining_ntraj = None
if reduce_func is None:
results = [None] * len(values)
else:
results = None

os.environ['QUTIP_IN_PARALLEL'] = 'TRUE'
from loky import get_reusable_executor, TimeoutError
try:
executor = get_reusable_executor(max_workers=map_kw['num_cpus'])

jobs = [executor.submit(task, value, *task_args, **task_kwargs)
for value in values]

for n, job in enumerate(jobs):
remaining_time = min(end_time - time.time(), job_time)
remaining_time = max(end_time - time.time(), 0)
try:
result = job.result(remaining_time)
except TimeoutError:
[job.cancel() for job in jobs]
break
except Exception as err:
if map_kw["fail_fast"]:
raise err
Expand All @@ -369,13 +383,11 @@ def loky_pmap(task, values, task_args=None, task_kwargs=None,
[job.cancel() for job in jobs]
raise e

except TimeoutError:
[job.cancel() for job in jobs]

finally:
executor.shutdown()
os.environ['QUTIP_IN_PARALLEL'] = 'FALSE'
pmenczel marked this conversation as resolved.
Show resolved Hide resolved
executor.shutdown(kill_workers=True)
hodgestar marked this conversation as resolved.
Show resolved Hide resolved

progress_bar.finished()
os.environ['QUTIP_IN_PARALLEL'] = 'FALSE'
if errors:
raise MapExceptions(
f"{len(errors)} iterations failed in loky_pmap",
Expand Down
2 changes: 1 addition & 1 deletion qutip/solver/solver_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def _solver_deprecation(kwargs, options, solver="me"):
if "map_kwargs" in kwargs and solver in ["mc", "stoc"]:
warnings.warn(
'"map_kwargs" are now included in options:\n'
'Use `options={"num_cpus": N, "job_timeout": Nsec}`',
'Use `options={"num_cpus": N}`',
FutureWarning
)
del kwargs["map_kwargs"]
Expand Down
10 changes: 0 additions & 10 deletions qutip/solver/stochastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,6 @@ def smesolve(
| How to run the trajectories. "parallel" uses the concurrent module to
run in parallel while "loky" uses the module of the same name to do
so.
- | job_timeout : NoneType, int
| Maximum time to compute one trajectory.
- | num_cpus : NoneType, int
| Number of cpus to use when running in parallel. ``None`` detects the
number of available cpus.
Expand Down Expand Up @@ -459,8 +457,6 @@ def ssesolve(
How to run the trajectories. "parallel" uses the concurrent module to
run in parallel while "loky" uses the module of the same name to do
so.
- | job_timeout : NoneType, int
| Maximum time to compute one trajectory.
- | num_cpus : NoneType, int
| Number of cpus to use when running in parallel. ``None`` detects the
number of available cpus.
Expand Down Expand Up @@ -507,7 +503,6 @@ class StochasticSolver(MultiTrajSolver):
"normalize_output": False,
"method": "taylor1.5",
"map": "serial",
"job_timeout": None,
"num_cpus": None,
"bitgenerator": None,
}
Expand Down Expand Up @@ -684,9 +679,6 @@ def options(self):
run in parallel while "loky" uses the module of the same name to do
so.

job_timeout: None, int, default: None
Maximum time to compute one trajectory.

num_cpus: None, int, default: None
Number of cpus to use when running in parallel. ``None`` detects the
number of available cpus.
Expand Down Expand Up @@ -796,7 +788,6 @@ class SMESolver(StochasticSolver):
"normalize_output": False,
"method": "platen",
"map": "serial",
"job_timeout": None,
"num_cpus": None,
"bitgenerator": None,
}
Expand Down Expand Up @@ -840,7 +831,6 @@ class SSESolver(StochasticSolver):
"normalize_output": False,
"method": "platen",
"map": "serial",
"job_timeout": None,
"num_cpus": None,
"bitgenerator": None,
}
2 changes: 0 additions & 2 deletions qutip/tests/solver/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def test_map(map, num_cpus):
args = (1, 2, 3)
kwargs = {'d': 4, 'e': 5, 'f': 6}
map_kw = {
'job_timeout': threading.TIMEOUT_MAX,
'timeout': threading.TIMEOUT_MAX,
'num_cpus': num_cpus,
}
Expand All @@ -60,7 +59,6 @@ def test_map_accumulator(map, num_cpus):
args = (1, 2, 3)
kwargs = {'d': 4, 'e': 5, 'f': 6}
map_kw = {
'job_timeout': threading.TIMEOUT_MAX,
'timeout': threading.TIMEOUT_MAX,
'num_cpus': num_cpus,
}
Expand Down