-
Notifications
You must be signed in to change notification settings - Fork 194
Register for disconnected event notification by pubsub #340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Register for disconnected event notification by pubsub #340
Conversation
| await stream.reset() | ||
| # Force context switch for stream handlers to process the stream reset event we just emit | ||
| # before we cancel the stream handler tasks. | ||
| await asyncio.sleep(0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a code smell
I would suggest looking into a mechanism to allow you to directly await this type of thing. For example, await stream.reset() could return an object that could then be awaited and wouldn't return until the underlying handlers have processed the event. Otherwise you're likely to continue to have to letter random sleeps into your code and I can say from experience in trinity that we don't want this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
await stream.reset() along with await self.muxed_conn.close() on L40 will set the events for Mplex and Mplex_stream. This signals that Mplex and its Mplex_streams are shutting down. Then when stream handler try to read from stream it will catch StreamReset error and properly finishes.
So the problem is that changing the state of the stream does not guarantee that the stream handler task that's reading from the stream will detect the state change and finish right after. So in this case if we cancel tasks right after await stream.reset() the stream handler task will be cancelled instead of catching StreamReset error and properly finishes.
Maybe we can cancel other tasks and wait for stream handler task to finish?
- fix await stream close/reset - make `_handle_dead_peer` a sync function
1097b41 to
b4a3724
Compare
3350571 to
b8c7f0c
Compare
mhchia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally looks good to me. I left some comments about disconnecting.
| :param network: network the connection was opened on | ||
| :param conn: connection that was opened | ||
| """ | ||
| await self.dead_peers_queue.put(conn.muxed_conn.peer_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I understand handle_dead_peer_queue is called twice when a peer disconnects us?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, _handle_dead_peer might be called twice. Once when we disconnect from peer(network notify pubsub) and once when pubsub detect that the connection has closed.
Implement
disconnected()inPubsubNotifeeso pubsub can be notified of peer who has disconnected from us and remove the peer's info from pubsub records.I believe this can close #306 (or it's already solved?).