Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): introduce backpressure #5595

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

jxs
Copy link
Member

@jxs jxs commented Sep 10, 2024

Description

superseeds #4914 with some changes and improvements, namely:

  • introduce a Delay for Forward and Publish messages, messages that take more than the configured delay to be sent are discarded
  • introduce scoring and penalize slow peers
  • remove control pool
  • report slow peers with the number of failed messages

@jxs jxs force-pushed the impl-gossipsub-backpressure-2 branch from 6d406e1 to 5e349f5 Compare September 10, 2024 22:44
@jxs jxs force-pushed the impl-gossipsub-backpressure-2 branch from 5e349f5 to 00cde64 Compare September 10, 2024 22:52
Copy link
Contributor

@ackintosh ackintosh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve left a few small comments.

// before we add all the gossip from this heartbeat in order to gain a true measure of
// steady-state size of the queues.
if let Some(m) = &mut self.metrics {
for sender_queue in self.connected_peers.values_mut().map(|v| &v.sender) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for sender_queue in self.connected_peers.values_mut().map(|v| &v.sender) {
for sender_queue in self.connected_peers.values().map(|v| &v.sender) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks Akihito, updated!

Comment on lines 345 to 349
// Slow peer weighting
if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
score += excess * self.params.slow_peer_weight;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move this nested if statement (Slow peer weighting) outside of P7: behavioural pattern penalty?

https://github.com/sigp/lighthouse/blob/bbe9242811b5dc14d991506bc4d076f0ce98b3cb/beacon_node/lighthouse_network/gossipsub/src/peer_score.rs#L339-L350

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, thanks Akihito. Updated!

Copy link
Contributor

@guillaumemichel guillaumemichel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to add new tests for the introduced backpressure

Comment on lines +173 to +175
slow_peer_weight: -0.2,
slow_peer_threshold: 0.0,
slow_peer_decay: 0.2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do these numbers come from?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are made up as suggestions.

These are rough estimates from what we expect on real networks. I.e They are minor penalties relative to the behaviour penalty.

We didn't want to penalize peers too much, so having 50 failures to 1 behaviour penalty seemed reasonable.

Note that these are not on by default and it is generally recommended that users set their own scoring based on their own networks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't want to penalize peers too much, so having 50 failures to 1 behaviour penalty seemed reasonable.

OK, makes sense to me! Since this is defining the default for PeerScoreParams, it should be a safe and documented value (even if not enabled by default). IMO a comment saying that the slow_peer_weight is 50x lower than behaviour_penalty_weight would help.

protocols/gossipsub/src/config.rs Show resolved Hide resolved
protocols/gossipsub/src/config.rs Show resolved Hide resolved
protocols/gossipsub/src/config.rs Show resolved Hide resolved
Comment on lines 603 to 608
pub(crate) fn new(cap: usize) -> RpcSender {
let (priority_sender, priority_receiver) = async_channel::unbounded();
let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
let len = Arc::new(AtomicUsize::new(0));
RpcSender {
cap: cap / 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reasoning of using cap / 2 instead of cap?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cause we have two channels priority and non_priority left a comment elaborating on that, thanks Gui

protocols/gossipsub/src/types.rs Show resolved Hide resolved
Comment on lines +684 to +685
if self.len.load(Ordering::Relaxed) >= self.cap {
return Err(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.cap is only used in publish. why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because publish calls to self.priority_sender which is unbounded, and we need it unbounded to send control messages such as GRAFT, PRUNE, SUBSCRIBE, and UNSUBSCRIBE. Left a comment explaining this, thanks Gui!

protocols/gossipsub/src/types.rs Outdated Show resolved Hide resolved
protocols/gossipsub/src/peer_score.rs Show resolved Hide resolved
protocols/gossipsub/src/behaviour.rs Show resolved Hide resolved
Copy link
Member Author

@jxs jxs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reviews Akihito and Gui!

// before we add all the gossip from this heartbeat in order to gain a true measure of
// steady-state size of the queues.
if let Some(m) = &mut self.metrics {
for sender_queue in self.connected_peers.values_mut().map(|v| &v.sender) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks Akihito, updated!

Comment on lines 345 to 349
// Slow peer weighting
if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
score += excess * self.params.slow_peer_weight;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, thanks Akihito. Updated!

protocols/gossipsub/src/behaviour.rs Show resolved Hide resolved
protocols/gossipsub/src/config.rs Show resolved Hide resolved
protocols/gossipsub/src/types.rs Show resolved Hide resolved
Comment on lines 603 to 608
pub(crate) fn new(cap: usize) -> RpcSender {
let (priority_sender, priority_receiver) = async_channel::unbounded();
let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
let len = Arc::new(AtomicUsize::new(0));
RpcSender {
cap: cap / 2,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cause we have two channels priority and non_priority left a comment elaborating on that, thanks Gui

Comment on lines +684 to +685
if self.len.load(Ordering::Relaxed) >= self.cap {
return Err(());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because publish calls to self.priority_sender which is unbounded, and we need it unbounded to send control messages such as GRAFT, PRUNE, SUBSCRIBE, and UNSUBSCRIBE. Left a comment explaining this, thanks Gui!

protocols/gossipsub/src/types.rs Outdated Show resolved Hide resolved
protocols/gossipsub/src/types.rs Outdated Show resolved Hide resolved
protocols/gossipsub/src/types.rs Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants