-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix handling of dead async processor #909
Conversation
neptune/new/internal/operation_processors/async_operation_processor.py
Outdated
Show resolved
Hide resolved
1e9a8a3
to
1fc4ffc
Compare
8b3e0e1
to
ee55f88
Compare
ee55f88
to
3b7df1b
Compare
if not self._consumer.is_running(): | ||
exception = NeptuneSynchronizationAlreadyStoppedException() | ||
logger.warning(str(exception)) | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to additionally print NeptuneSynchronizationAlreadyStoppedException()
here?
We're running flush
only from wait
, where we already check consumer for running and raise exception if necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it have to do with flush? It's in _wait_for_queue_empty
called on stop().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it have to do with flush?
Eh, sorry, I've analyzed the code in github...
try: | ||
super().run() | ||
except Exception: | ||
with self._processor._waiting_cond: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's the part responsible for faster dying when sync process is already dead, but I don't know how.
Some commet would be appreciated ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just wakes waiting threads when synchronization process is dying. If not for this they would be sleeping forever waiting for synchronization. Nothing magic here.
No description provided.