Skip to content

Commit

Permalink
exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
zh217 committed Sep 9, 2018
1 parent 1765305 commit 34b93af
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions aiochan/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ async def work():

def parallel_pipe(self, n, f, out=None, buffer=None, buffer_size=None, close=True, flatten=False,
in_q_size=0, out_q_size=0, mode='thread', mp_module=multiprocessing, pool_args=None,
pool_kwargs=None):
pool_kwargs=None, error_cb=None):

"""
Apply the plain function `f` to each value in the channel, and pipe the results to `out`.
Expand Down Expand Up @@ -569,6 +569,7 @@ def parallel_pipe(self, n, f, out=None, buffer=None, buffer_size=None, close=Tru
(for example, `torch.multiprocessing` from pytorch).
:param pool_args: additional arguments when creating pool
:param pool_kwargs: additional keyword arguments when creating pool
:param error_cb: callback in case there is an error
:return: the output channel.
"""
if out is None:
Expand Down Expand Up @@ -615,7 +616,7 @@ def process_worker(in_q, out_q):
break
else:
data, async_ft = next_item
pool.apply_async(f, (data,), callback=wrap_p_result(out_q, async_ft))
pool.apply_async(f, (data,), callback=wrap_p_result(out_q, async_ft), error_callback=error_cb)

if mode == 'thread':
threading.Thread(target=thread_worker, args=(in_q.sync_q, out_q.sync_q)).start()
Expand Down Expand Up @@ -664,7 +665,7 @@ async def order_out_worker():

def parallel_pipe_unordered(self, n, f, out=None, buffer=None, buffer_size=None, close=True, flatten=False,
in_q_size=0, out_q_size=0, mode='thread', mp_module=multiprocessing, pool_args=None,
pool_kwargs=None):
pool_kwargs=None, error_cb=None):

"""
Apply the plain function `f` to each value in the channel, and pipe the results to `out`.
Expand Down Expand Up @@ -692,6 +693,7 @@ def parallel_pipe_unordered(self, n, f, out=None, buffer=None, buffer_size=None,
(for example, `torch.multiprocessing` from pytorch).
:param pool_args: additional arguments when creating pool
:param pool_kwargs: additional keyword arguments when creating pool
:param error_cb: callback in case there is an error
:return: the output channel.
"""
if out is None:
Expand Down Expand Up @@ -728,7 +730,7 @@ def process_worker(in_q, out_q):
out_q.put(None)
break
else:
pool.apply_async(f, (next_item,), callback=lambda r: out_q.put(r))
pool.apply_async(f, (next_item,), callback=lambda r: out_q.put(r), error_callback=error_cb)

if mode == 'thread':
threading.Thread(target=thread_worker, args=(in_q.sync_q, out_q.sync_q)).start()
Expand Down

0 comments on commit 34b93af

Please sign in to comment.