Skip to content

Commit

Permalink
Provide explicit exception handling behaviours, and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dangercrow committed Sep 29, 2020
1 parent 40cf767 commit 63a56dd
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 21 deletions.
17 changes: 14 additions & 3 deletions pqdm/_base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import copy
from concurrent.futures import Executor, as_completed
from typing import Any, Callable, Iterable
from typing import Any, Callable, Iterable, Union

from tqdm import tqdm as tqdm_cli
from tqdm.notebook import tqdm as tqdm_notebook
from typing_extensions import Literal

from pqdm.constants import ArgumentPassing
from pqdm.constants import ArgumentPassing, ExceptionBehaviour
from pqdm.utils import _inside_jupyter, _divide_kwargs

TQDM = tqdm_notebook if _inside_jupyter() else tqdm_cli
Expand All @@ -26,6 +27,7 @@ def _parallel_process(
n_jobs: int,
executor: Executor,
argument_type: str = 'direct',
exception_behaviour: Union[Literal['ignore'], Literal['immediate'], Literal['deferred']] = 'ignore',
**kwargs
):
executor_opts, tqdm_opts = _divide_kwargs(kwargs, executor)
Expand Down Expand Up @@ -69,10 +71,19 @@ def _parallel_process(
collecting_opts['total'] = len(futures)

results = []
exceptions = []
for i, future in TQDM(enumerate(futures), **collecting_opts):
try:
results.append(future.result())
except Exception as e:
results.append(e)
if exception_behaviour == ExceptionBehaviour.IMMEDIATE:
raise e
if exception_behaviour == ExceptionBehaviour.IGNORE:
results.append(e)
if exception_behaviour == ExceptionBehaviour.DEFERRED:
exceptions.append(e)

if exceptions:
raise Exception(*exceptions)

return results
6 changes: 6 additions & 0 deletions pqdm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@
class ArgumentPassing(NamedTuple):
AS_ARGS = 'args'
AS_KWARGS = 'kwargs'


class ExceptionBehaviour(NamedTuple):
IGNORE = 'ignore'
IMMEDIATE = 'immediate'
DEFERRED = 'deferred'
2 changes: 2 additions & 0 deletions pqdm/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def pqdm(
n_jobs: int,
argument_type: Optional[Union[Literal['kwargs'], Literal['args']]] = None,
bounded: bool = False,
exception_behaviour: Union[Literal['ignore'], Literal['immediate'], Literal['deferred']] = 'ignore',
**kwargs
):
return _parallel_process(
Expand All @@ -21,5 +22,6 @@ def pqdm(
argument_type=argument_type,
n_jobs=n_jobs,
executor=BoundedProcessPoolExecutor if bounded else ProcessPoolExecutor,
exception_behaviour=exception_behaviour,
**kwargs
)
2 changes: 2 additions & 0 deletions pqdm/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def pqdm(
n_jobs: int,
argument_type: Optional[Union[Literal['kwargs'], Literal['args']]] = None,
bounded: bool = False,
exception_behaviour: Union[Literal['ignore'], Literal['immediate'], Literal['deferred']] = 'ignore',
**kwargs
):
return _parallel_process(
Expand All @@ -21,5 +22,6 @@ def pqdm(
argument_type=argument_type,
n_jobs=n_jobs,
executor=BoundedThreadPoolExecutor if bounded else ThreadPoolExecutor,
exception_behaviour=exception_behaviour,
**kwargs
)
62 changes: 44 additions & 18 deletions tests/test_pqdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@
from pqdm.threads import pqdm as pqdm_threads


run_for_threads_and_processes = pytest.mark.parametrize("pqdm_method", [pqdm_threads, pqdm_processes], ids=["threads", "processes"])


class ExceptionWithValueEquality(Exception):
""" Value equality is required for comparisons when processes are involved. """
def __eq__(self, other):
return type(self) is type(other) and self.args == other.args


def multiply_args(a, b):
return a * b

Expand All @@ -17,6 +26,13 @@ def multiply_list(x):
return x[0] * x[1]


def raises_exceptions(obj):
if isinstance(obj, BaseException):
raise obj
else:
return obj


RESULT = [1 * 2, 2 * 3, 3 * 4, 4 * 5]

TEST_DATA = [
Expand Down Expand Up @@ -51,42 +67,52 @@ def multiply_list(x):

]


@pytest.mark.parametrize("function, input_list, kwargs", TEST_DATA)
def test_pqdm_threads_work_with_argument_types(function, input_list, kwargs):
result = pqdm_threads(input_list, function, **kwargs)
assert result == RESULT
TEST_DATA_WITH_EXCEPTIONS = [
ExceptionWithValueEquality(1),
"SomeObjectWithValueEquality",
ExceptionWithValueEquality(2),
]


@run_for_threads_and_processes
@pytest.mark.parametrize("function, input_list, kwargs", TEST_DATA)
def test_pqdm_processes_work_with_argument_types(function, input_list, kwargs):
result = pqdm_processes(input_list, function, **kwargs)
def test_argument_types(pqdm_method, function, input_list, kwargs):
result = pqdm_method(input_list, function, **kwargs)
assert result == RESULT


@run_for_threads_and_processes
@pytest.mark.parametrize("function, input_list, kwargs", TEST_DATA)
def test_pqdm_processes_pushes_argument_to_tqdm(function, input_list, kwargs):
def test_pqdm_processes_pushes_argument_to_tqdm(pqdm_method, function, input_list, kwargs):
output = io.StringIO("")

kwargs['desc'] = 'Testing'
kwargs['file'] = output

result = pqdm_processes(input_list, function, **kwargs)
result = pqdm_method(input_list, function, **kwargs)

text = output.getvalue()
assert 'Testing:' in text
assert result == RESULT


@pytest.mark.parametrize("function, input_list, kwargs", TEST_DATA)
def test_pqdm_threads_pushes_argument_to_tqdm(function, input_list, kwargs):
output = io.StringIO("")
@run_for_threads_and_processes
def test_exceptions_ignored(pqdm_method):
results = pqdm_method(TEST_DATA_WITH_EXCEPTIONS, raises_exceptions, n_jobs=2, exception_behaviour='ignore')
assert results == TEST_DATA_WITH_EXCEPTIONS

kwargs['desc'] = 'Testing'
kwargs['file'] = output

result = pqdm_threads(input_list, function, **kwargs)
@run_for_threads_and_processes
def test_exceptions_immediately(pqdm_method):
with pytest.raises(Exception) as exc:
pqdm_method(TEST_DATA_WITH_EXCEPTIONS, raises_exceptions, n_jobs=2, exception_behaviour='immediate')

text = output.getvalue()
assert 'Testing:' in text
assert result == RESULT
assert exc.value == TEST_DATA_WITH_EXCEPTIONS[0]


@run_for_threads_and_processes
def test_exceptions_deferred(pqdm_method):
with pytest.raises(Exception) as exc:
pqdm_method(TEST_DATA_WITH_EXCEPTIONS, raises_exceptions, n_jobs=2, exception_behaviour='deferred')

assert exc.value.args == (TEST_DATA_WITH_EXCEPTIONS[0], TEST_DATA_WITH_EXCEPTIONS[2])

0 comments on commit 63a56dd

Please sign in to comment.