Skip to content
This repository has been archived by the owner on Oct 28, 2019. It is now read-only.

Commit

Permalink
Merge pull request #3604 from gmbnomis/asyncio_stages_cancel
Browse files Browse the repository at this point in the history
DeclarativeVersion: Cancel all remaining stages if one stage fails with an exception
  • Loading branch information
bmbouter committed Aug 27, 2018
2 parents 50605b0 + d0f773c commit a175c11
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
16 changes: 14 additions & 2 deletions plugin/pulpcore/plugin/stages/api.py
Expand Up @@ -52,9 +52,21 @@ async def create_pipeline(stages, maxsize=100):
futures = []
for stage in stages:
out_q = asyncio.Queue(maxsize=maxsize)
futures.append(stage(in_q, out_q))
futures.append(asyncio.ensure_future(stage(in_q, out_q)))
in_q = out_q
await asyncio.gather(*futures)
try:
await asyncio.gather(*futures)
except Exception:
# One of the stages raised an exception, cancel all stages...
pending = []
for task in futures:
if not task.done():
task.cancel()
pending.append(task)
# ...and run until all Exceptions show up
if pending:
await asyncio.wait(pending, timeout=60)
raise


class EndStage(Stage):
Expand Down
28 changes: 17 additions & 11 deletions pulpcore/pulpcore/app/models/progress.py
Expand Up @@ -2,6 +2,8 @@
Django models related to progress reporting
"""
from gettext import gettext as _

from asyncio import CancelledError
import logging
import datetime

Expand Down Expand Up @@ -91,11 +93,14 @@ def __enter__(self):

def __exit__(self, type, value, traceback):
"""
Update the progress report state to COMPLETED or FAILED.
Update the progress report state to COMPLETED, CANCELED, or FAILED.
If an exception occurs the progress report state is saved as:
- CANCELED if the exception is `asyncio.CancelledError`
- FAILED otherwise.
If an exception occurs the progress report state is saved as FAILED and the exception is
not suppressed. If the context manager exited without exception the progress report state
is saved as COMPLETED.
The exception is not suppressed. If the context manager exited without
exception the progress report state is saved as COMPLETED.
See the context manager documentation for more info on __exit__ parameters
"""
Expand All @@ -104,10 +109,11 @@ def __exit__(self, type, value, traceback):
self.total = self.done
if type is None:
self.state = TASK_STATES.COMPLETED
self.save()
elif type is CancelledError:
self.state = TASK_STATES.CANCELED
else:
self.state = TASK_STATES.FAILED
self.save()
self.save()


class ProgressSpinner(ProgressReport):
Expand All @@ -128,8 +134,8 @@ class ProgressSpinner(ProgressReport):
>>> metadata_progress.save()
The ProgressSpinner() is a context manager that provides automatic state transitions and saving
for the RUNNING COMPLETED and FAILED states. When ProgressSpinner() is used as a context
manager progress reporting is rate limited to every 500 milliseconds.
for the RUNNING CANCELED COMPLETED and FAILED states. When ProgressSpinner() is used as a
context manager progress reporting is rate limited to every 500 milliseconds.
Use it as follows:
>>> spinner = ProgressSpinner(message='Publishing Metadata')
Expand Down Expand Up @@ -171,9 +177,9 @@ class ProgressBar(ProgressReport):
>>> progress_bar.save()
The ProgressBar() is a context manager that provides automatic state transitions and saving for
the RUNNING COMPLETED and FAILED states. The increment() method can be called in the loop as
work is completed. When ProgressBar() is used as a context manager progress reporting is rate
limited to every 500 milliseconds.
the RUNNING CANCELED COMPLETED and FAILED states. The increment() method can be called in the
loop as work is completed. When ProgressBar() is used as a context manager progress reporting
is rate limited to every 500 milliseconds.
Use it as follows:
>>> progress_bar = ProgressBar(message='Publishing files', total=len(files_iterator))
Expand Down

0 comments on commit a175c11

Please sign in to comment.