-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor and cleanup gossipsub #373
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leave some notes regarding the TODO
s removed.
@@ -106,7 +109,6 @@ def attach(self, pubsub: Pubsub) -> None: | |||
logger.debug("attached to pusub") | |||
|
|||
# Start heartbeat now that we have a pubsub instance | |||
# TODO: Start after delay |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fixed by adding heartbeat_initial_delay
.
@@ -127,8 +129,9 @@ def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: | |||
# instance in multistream-select, but it is not the protocol that gossipsub supports. | |||
# In this case, probably we registered gossipsub to a wrong `protocol_id` | |||
# in multistream-select, or wrong versions. | |||
# TODO: Better handling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much to be handled here. This code base should not be reached.
@@ -218,15 +235,9 @@ def _get_peers_to_send( | |||
|
|||
# gossipsub peers | |||
in_topic_gossipsub_peers: List[ID] = None | |||
# TODO: Do we need to check `topic in self.pubsub.my_topics`? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked in L226
# Note: the comments here are the exact pseudocode from the spec | ||
for topic in self.fanout: | ||
# If time since last published > ttl | ||
# Delete topic entry if it's not in `pubsub.peer_topics` | ||
# or if it's time-since-last-published > ttl | ||
# TODO: there's no way time_since_last_publish gets set anywhere yet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be handled in another PR.
for topic in self.mesh: | ||
msg_ids = self.mcache.window(topic) | ||
if msg_ids: | ||
# TODO: Make more efficient, possibly using a generator? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems irrelevant now IMO.
@@ -78,7 +78,6 @@ class Pubsub: | |||
|
|||
topic_validators: Dict[str, TopicValidator] | |||
|
|||
# TODO: Be sure it is increased atomically everytime. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to be the case.
@@ -300,7 +299,6 @@ def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...] | |||
logger.debug("Fail to add new peer %s: stream closed", peer_id) | |||
del self.peers[peer_id] | |||
return | |||
# TODO: Check EOF of this stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's checked right above.
""" | ||
Continuously read from peer queue and each time a new peer is found, | ||
open a stream to the peer using a supported pubsub protocol | ||
TODO: Handle failure for when the peer does not support any of the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to the unreachable code path mentioned above.
fdcd481
to
8e59122
Compare
This should be ready for review. Tasks like |
ca0ce52
to
67f02c5
Compare
Co-Authored-By: Chih Cheng Liang <chihchengliang@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kept this pretty high-level but overall looks good!
for topic in self.mesh: | ||
if peer_id in self.mesh[topic]: | ||
# Delete the entry if no other peers left | ||
self.mesh[topic].remove(peer_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: I'd think we should change List
s to Set
for better performance some time.
ihave_msgs: List[rpc_pb2.ControlIHave], | ||
graft_msgs: List[rpc_pb2.ControlGraft], | ||
prune_msgs: List[rpc_pb2.ControlPrune], | ||
) -> rpc_pb2.ControlMessage: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I understanding it correctly that this change is for piggybacking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say it's just packing the messages together and send them at once. Will open an issue for piggybacking.
use defaultdict and init control message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Nice work.
Co-Authored-By: Kevin Mai-Husan Chia <mhchia@users.noreply.github.com>
): | ||
if topic in self.pubsub.peer_topics: | ||
# Combine fanout peers with selected peers | ||
fanout_peers += self._get_in_topic_gossipsub_peers_from_minus( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: one thing I'm thinking is to have methods like _get_fanout_peers(topic)
, _get_mesh_peers(topic)
, and _get_ peers_to_emit_ihave_to
, to make the code more readable.
Very nice! I used gossipsub in my implementation before... TL;DR, what should I change? |
@moshemalawach i'm not sure a user of the code in this PR will need to update anything beyond any API changes introduced. note that we will be migrating to |
How was it fixed?
Mainly refactoring the Gossipsub
heartbeat
.mesh_heartbeat
andgossip_heartbeat
And some other bug fixes and remove outdated
TODO
s.Cute Animal Picture
(source: https://www.maxpixels.net/Mammals-Cute-Pet-Dog-Animal-Kingdom-Portrait-3071189)