New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exit longlived greenlets if any of them fails #2208
Exit longlived greenlets if any of them fails #2208
Conversation
raiden/network/transport/matrix.py
Outdated
@@ -280,6 +280,11 @@ def stop_and_wait(self): | |||
self._client.greenlets.clear() | |||
self._client.logout() | |||
|
|||
@property | |||
def greenlet(self) -> gevent.Greenlet: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a bad approach, why don't you make the RaidenService
and Transport
classes greenlets, and let them manage their child tasks instead of exposing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hackaugusto I thought about it too, but these classes aren't greenlets on theirselves, as there's no obvious loop inside (MatrixTransport
depends on the underlying _client.sync_thread
greenlet, RaidenService
doesn't actually have a loop/long-lasting _run
method or alike, APIServer
depends on the WSGIServer
instance, which isn't even a greenlet on itself nor have one, but use low-level gevent calls to asynchronously dispatch request handling, that greenlet inside it just wraps the busy wait).
Rather, they control one or more subtasks/greenlets, like AlarmTask
, GMatrixClient
and WSGIServer
, and are the responsible for starting/stopping them and being a point of control if the underlying task fails unexpectedly. How about a RaidenTask
abstract class, with start
, stop
, possibly join
methods and a long_greenlets() -> List[Greenlet]
property? This would be only for these few long-lived tasks, as the other ones should fit nicely on the standard greenlet contract
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An option would also to make a small StoppableGreenlet
abstract class (greenlet with synchronous stop
method), and make those classes inherint from it, making them effectivelly greenlets (as you asked), but in those cases, the _run
method would be a simple get
on the children tasks, which would make it re-raise underlying exceptions, if any, and cleanup and exit cleanly when stop
is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, having a standard method to stop all these classes is good, I just want to avoid the introspection used here:
Lines 1015 to 1025 in d8839ae
def stop_task(task, timeout=None): | |
try: | |
if callable(getattr(task, 'stop', None)): | |
task.stop() | |
finally: | |
if isinstance(task, gevent.Greenlet): | |
task.kill() | |
task.get(timeout=timeout) | |
elif hasattr(task, 'greenlet'): | |
task.greenlet.kill() | |
task.greenlet.get(timeout=timeout) |
raiden/tests/utils/network.py
Outdated
@@ -313,7 +313,7 @@ def create_apps( | |||
'server': 'matrix.local.raiden', | |||
}, | |||
'retries_before_backoff': retries_before_backoff, | |||
'retry_interval': retry_interval, | |||
'retry_interval': retry_interval * 5, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not the right place for this change, please change it on raiden.tests.fixtures.variables
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problem is that that fixture doesn't have yet access to the transport. Will look on how to inject that there
raiden/tasks.py
Outdated
self.stop_event.set(True) | ||
self._stop_event.set(True) | ||
|
||
def stop(self, timeout=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please don't add timeouts for the inner layers, only use it at the outer-most layers of the code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just to comply with the interface. Inner layers may leave then as None
as well
raiden/raiden_service.py
Outdated
self.blockchain_events.uninstall_all_event_listeners() | ||
except (gevent.timeout.Timeout, RaidenShuttingDown): | ||
pass | ||
log.exception('Exception on RaidenService stop') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RaidenShuttingDown
exceptions are expcted durring shutting down and should not be logged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope RaidenShuttingDown
won't be needed anymore after proper stop
and exception handling here in this PR, but no hard feelings for it too
@@ -237,9 +229,6 @@ def block_number(self): | |||
""" Return the most recent block. """ | |||
return self.web3.eth.blockNumber | |||
|
|||
def inject_stop_event(self, event): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's in the commit's comment: the correct approach both for exception handling and stop time should remove the need for injecting/raising an exception in web3. Instead, whatever uses it should stop and exit gracefully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, could you please remove the exception RaidenShuttingDown
? This only exists because we were not relying on GreenletExit
, and it is only raised from the web3 middleware.
raiden/network/transport/matrix.py
Outdated
for delay in timeout_generator: | ||
state = self._address_to_presence.get(receiver_address) | ||
if state in reachable: | ||
self._send_raw(receiver_address, data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a blocking call, correct?
The if self._stop_event.wait(delay):
bellow will not be sufficient to exit the task on a timely manner. Is the kill introduced in raiden-libs used to exit this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, _send_raw
blocks, but should be very quick (only request send time), so why _stop_event.wait(delay)
isn't enough? The timeout on shutdown should ensure it won't block foreverTimeout
on stop
waiting for these greenlets
raiden/network/transport/matrix.py
Outdated
else: | ||
self.log.error( | ||
'Invalid message', | ||
message=data, | ||
) | ||
|
||
def _receive_delivered(self, delivered: Delivered): | ||
# FIXME: The signature doesn't seem to be verified - check in UDPTransport as well | ||
# FIXME: check if UDPTransport also checks Delivered sender and message presence |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sender must be checked, otherwise a malicious node could forge a delivered message, which would clear the queue and stop retries.
This is only necessary because UDP is not an encrypted transport, so we need to authenticate using signatures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I know.. the point is that I did that there for Matrix
, so it's just updating the FIXME
to remember to do this check also for UDP
eventually.
raiden/network/transport/matrix.py
Outdated
) | ||
discovery_room = self._client.create_room(self._discovery_room_alias, is_public=True) | ||
last_ex = None | ||
for _ in range(10): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These nested exceptions are hard to read, cant you split this in two loops?
while not room_exists and retry():
try:
create_room()
except: ...
if not room_exists:
raise
while not joined and retry():
try:
join()
except: ...
if not joined:
raise
raiden/network/transport/matrix.py
Outdated
return | ||
|
||
message_id = getattr(message, 'message_identifier') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use message.message_identifier
raiden/network/transport/matrix.py
Outdated
self._client.set_presence_state(UserPresence.OFFLINE.value) | ||
|
||
try: | ||
with gevent.Timeout(timeout, 'Logging out despite unjoined greenlets.'): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we already have a timeout on the raiden service, no need for this here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All Runnable
have the timeout
parameter for stop
, I'm just sticking to the interface/behavior, it was already there, just updated for the new interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't add the timeout here, we have had multiple discussions about this and timeouts don't compose.
You should rebase this on latest master. Lots of conflicting files @andrevmatos |
Yes, I know, but a lot of fixes which weren't directly related but needed for this PR were split on #2268, I may look into merging that first then this one on top of that. Hope to get that ready soon, only a single test failing in a non locally reproducible and weird way (gas exceeds limit error on teardown?) |
raiden/utils/runnable.py
Outdated
def __init__(self, run=None, *args, **kwargs): | ||
if run is not None: | ||
self.run = run | ||
self.args = tuple(args) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this call necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to ensure it's a tuple, not really needed, but painless as it's only on __init__
time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this value is a tuple
https://docs.python.org/3/reference/compound_stmts.html#function-definitions
If the form “*identifier” is present, it is initialized to a tuple receiving any excess positional parameters, defaulting to the empty tuple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uh.. not sure why I was thinking it was a list
, ok, I'll fix, thank you
raiden/utils/runnable.py
Outdated
def stop(self): | ||
""" Synchronous stop, gracefully tells run() to exit | ||
|
||
May wait subtasks to finish. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's not waiting then it's not synchronous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, it should be made synchronous
raiden/raiden_service.py
Outdated
with gevent.Timeout(self.shutdown_timeout, 'Substasks didn\'t stopped on time'): | ||
self.transport.stop() | ||
self.alarm.stop() | ||
self.transport.get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the point of this call if stop
is synchronous?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stop
isn't intended to raise run-time exceptions, only stop-time/cleanup ones (as well as start
raise startup-time exceptions). This get
here ensures run-time exceptions are re-raised, if any.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand what you mean by runtime/stoptime/cleanup exceptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imagine the life of a Runnable: start
-> run
-> stop
start
is synchorous, and may raise if something during startup didn't go as expected. Think on a port binding when the port is already in use, and it should prevent the task from being actually started (as some pre-start condition failed)runtime
exceptions are raised during "normal" operation, insiderun
blocking call, either by itself (in case its a loop) or by any subtask (throughkill
orget
)stop
should also be synchronous, and it should fail on cleanup exceptions (imagine acommit
on a DB which failed). No need to re-raise run-time exceptions, they should be re-raised by parent at due time withget
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to stop
then get
still don't make sense.
If you want to call get
because there could have been a "runtime" exception, then the task is dead already and calling stop
doesn't make sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Runtime exceptions may be raised because of stop
. It shouldn't be common, but can happen that stop
tells the greenlet to exit gracefully, but then an exception is raised inside it. The interface still holds.
raiden/utils/runnable.py
Outdated
self.greenlet.name = f'{self.__class__.__name__}|{self.greenlet.name}' | ||
self.greenlet.start() | ||
|
||
def run(self, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to stick to the greenlet interface, this should be _run
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'd do that, just made it without _
to tell its part of the public interface (or at least, the interface children must care), but can make it protected too.
|
||
def stop(self): | ||
if self.stop_event.ready(): | ||
return # double call, happens on normal stop, ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since start
raises, I think calling this on a non started task should raise too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this simple return was from a left-over change where stop
could be called twice (once for graceful exit, other on run
teardown), but since run
now only calls stop
when raising, this shouldn't be the case anymore and can successfuly raise if called on non-started Runnable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just needs to remove the unused exception, that can go into another PR.
This also fixed #1149 ? |
@LefterisJP yes, most of it. |
@@ -514,15 +558,15 @@ def _handle_message(self, room, event) -> bool: | |||
"Can't parse message binary data", | |||
message_data=data, | |||
peer_address=pex(peer_address), | |||
exception=ex, | |||
_exc=ex, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was this changed? what is the point of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exception
is used internally by structlog
. This threw a lot of warnings in the unit tests @LefterisJP created as part of #2300 . This fixes it by using a non-conflicting parameter name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems the wrong fix IMO, either use log.exception
or log.<level>(exc_info=exc)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would include the whole stacktrace of the exceptions, which isn't required here. Current approach only logs the repr
of the exception, which is enough for an entry which can spam the logs if invalid messages are being received.
Its run method will wait on any sub-task (send_with_retry greenlets and GMatrixClient loop), and re-raise if any exception happens.
- RaidenService.start is now a synchronous method, re-raises startup-time exceptions. start_async spawns a greenlet for it, which should be checked for errors - get rid of RaidenService.start_event, uses stop_event where needed - no need to check if running in web3 middleware, children will be gracefully stopped and cleaned.
[ci integration]
[ci integration]
[ci integration]
[ci integration]
[ci integration]
Implements 3rd point of #2201