
Backpressure in Gossipsub #4667

Open

mxinden opened this issue Oct 16, 2023 · 21 comments
Labels
difficulty:moderate, getting-started, help wanted, priority:important

Comments

@mxinden
Member

mxinden commented Oct 16, 2023

Description

See #3078 for tracking issue on backpressure in rust-libp2p in general.

Today, EnabledHandler::send_queue can grow without bound when the Gossipsub NetworkBehaviour sends messages faster than the Gossipsub EnabledHandler can process them.

/// Queue of values that we want to send to the remote.
send_queue: SmallVec<[proto::RPC; 16]>,

I see two solutions:

  1. Front or tail drop in EnabledHandler once send_queue reaches a certain size.
  2. Implement backpressure between Gossipsub's NetworkBehaviour and ConnectionHandler and only forward from the NetworkBehaviour to the ConnectionHandler when the latter can handle another message, i.e. drop messages for a specific peer in the NetworkBehaviour instead of in the ConnectionHandler as suggested in (1). (See the sketch below.)
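
A minimal sketch of what (2) could look like, assuming one bounded futures::channel::mpsc channel per connection; the Rpc enum and the capacity of 16 are placeholders for illustration, not the actual libp2p-gossipsub types:

use futures::channel::mpsc;

#[derive(Debug)]
enum Rpc {
    Publish(Vec<u8>),
    Forward(Vec<u8>),
}

fn main() {
    // Behaviour side: one bounded sender per ConnectionHandler.
    let (mut to_handler, _rx_held_by_handler) = mpsc::channel::<Rpc>(16);

    // When forwarding, drop the message for this peer if its channel is full,
    // instead of letting an unbounded send_queue in the handler grow.
    match to_handler.try_send(Rpc::Forward(vec![1, 2, 3])) {
        Ok(()) => println!("queued for this peer"),
        Err(e) if e.is_full() => println!("channel full, dropping forward message for this peer"),
        Err(e) => println!("handler gone: {e}"),
    }
}

try_send fails immediately when the channel is full, which is exactly the point where the behaviour can decide to drop the message for this peer rather than queueing it.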

Motivation

See #3078. Prevents past failures like #4572.

Requirements

Bound send_queue, i.e. make it impossible for it to grow without bound.

Open questions

No response

Are you planning to do it yourself in a pull request?

No

@mxinden mxinden added the priority:important, difficulty:moderate, help wanted and getting-started labels on Oct 16, 2023
@0xcrust
Contributor

0xcrust commented Oct 26, 2023

Hello. I'd love to take this on

@thomaseizinger
Contributor

Hello. I'd love to take this on

That would be awesome! I'll assign you :)
Let us know if you have any questions!

@mxinden
Member Author

mxinden commented Nov 1, 2023

#4756 implements solution (1), more specifically tail dropping in the ConnectionHandler.

@divagant-martian raised concerns with this approach. After syncing with @divagant-martian, @jxs and @thomaseizinger, here is a rough summary.

Concerns with tail drop implemented in #4756

  • Not all messages are created equal.
    • E.g. ephemeral heartbeats vs. publish messages. The former are fine to drop; dropping the latter can cause trouble.
    • Subscription and control messages are likely very small, thus not worth dropping in the first place.
    • Dropping subscription and control messages can lead to an inconsistent view of the world between two directly connected peers.
    • Dropping forward messages is less severe than dropping publish messages. The former are likely propagated by other nodes as well; for the latter, the local node might (currently) be the only source.
  • Dropping a publish message should at least surface as an error to the user, ideally through Behaviour::publish.

(Above, a publish message is one authored by the local node, whereas a forward message is one published by another node and forwarded by the local node.)

Alternative solutions

There are many solutions to fulfill these requirements. Above all, I would like to keep added complexity low.

Smarter dropping in ConnectionHandler

  • Differentiate the ConnectionHandler HandlerIn by the various types of messages. Roughly along the lines of:
    @@ -61,8 +61,10 @@ pub enum HandlerEvent {
    /// A message sent from the behaviour to the handler.
    #[derive(Debug)]
    pub enum HandlerIn {
    -    /// A gossipsub message to send.
    -    Message(proto::RPC),
    +    Publish(proto::Message),
    +    Forward(proto::Message),
    +    Subscriptions(proto::SubOpts),
    +    Control(proto::ControlMessage),
        /// The peer has joined the mesh.
        JoinedMesh,
        /// The peer has left the mesh.
  • Have a queue for Subscriptions, Control and Publish. For now, this can be unbounded. At least memory footprint wise, Subscription and Control likely don't play a big role.
  • Have a separate queue for Forward. This queue is bounded. Forward messages are delivered on a best-effort basis. Tail dropping messages on reaching the limit.
  • When sending out messages, the first queue would be prioritized over the second queue.
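
A rough sketch of how these two queues could fit together inside the handler; the names and the limit of 16 are assumptions for illustration, not the actual implementation:

use std::collections::VecDeque;

/// Assumed limit for the best-effort forward queue.
const MAX_FORWARD_QUEUE: usize = 16;

struct Queues<T> {
    /// Subscriptions, Control and Publish: unbounded for now.
    priority: VecDeque<T>,
    /// Forward messages: bounded, best effort.
    forward: VecDeque<T>,
}

impl<T> Queues<T> {
    fn push_priority(&mut self, msg: T) {
        self.priority.push_back(msg);
    }

    fn push_forward(&mut self, msg: T) {
        // Tail drop: once the limit is reached, new forward messages are discarded.
        if self.forward.len() < MAX_FORWARD_QUEUE {
            self.forward.push_back(msg);
        }
    }

    /// Called when the outbound stream can take another message: the priority
    /// queue is always drained before the forward queue.
    fn next_to_send(&mut self) -> Option<T> {
        self.priority.pop_front().or_else(|| self.forward.pop_front())
    }
}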

Concerns with this solution:

  • Publish, forward, control and subscribe would end up in separate Protobuf messages. I doubt this will have a performance impact though.
  • Order between messages of the two queues is no longer guaranteed. E.g. the local node might unsubscribe from a topic and then forward a message for that same topic.
  • Publish messages might still grow unboundedly in the send queue.

Backpressure between ConnectionHandler and NetworkBehaviour, dropping in NetworkBehaviour.

Builds on the initially proposed solution (2) in the GitHub issue description.

Implement backpressure between Gossipsub's NetworkBehaviour and ConnectionHandler and only forward from the NetworkBehaviour to the ConnectionHandler when the latter can handle another message, i.e. drop messages for a specific peer in the NetworkBehaviour instead of in the ConnectionHandler as suggested in (1).

  • Add a bounded channel between NetworkBehaviour and ConnectionHandler.
  • In Behaviour::publish, check whether enough channels to the various ConnectionHandlers have capacity. If not, return an error to the user, ideally taking a Context to wake the user up once there is more capacity.
  • In Behaviour::forward_msg, only forward to ConnectionHandlers that have capacity in the channel.

Curious to hear other people's thoughts. @divagant-martian, @jxs, @thomaseizinger, @0xcrust

@thomaseizinger
Contributor

Thanks for writing this up @mxinden!

I recorded a related issue in #4781 on how we would go about reducing memory usage when encoding protobuf messages.

There is some overlap in the solutions. Most importantly, both require us to pass down different messages from the behaviour to the handler.

I think a good starting point would be to do that first. We could then fan-out (no pun intended) the work and tackle message dropping and memory-efficient encoding separately.

@AgeManning
Contributor

AgeManning commented Nov 2, 2023

I agree with the current solution. I think we shouldn't be dropping certain control messages like GRAFT, PRUNE, SUBSCRIBE and UNSUBSCRIBE, for the reasons that have been listed.

I know that we want to keep the complexity low here, but my initial thought was to time-bound the forward messages and maybe the publish messages. I think if there are forward messages hanging around for > 300ms it's probably not worth forwarding them anyway (and we should prioritize newer messages rather than trying to get old ones out the door, which are by now likely duplicates/waste).

There is also probably an optimisation we could do, but this one would probably add too much complexity. When we forward messages, we check to make sure we haven't seen the message from the peers we want to forward to (this happens in the behaviour), because if we send it to those peers it's a waste; it just gets registered as a duplicate. Often there is a race between us receiving the message from multiple peers and trying to forward that message on. If a node waits a bit in validation, it often doesn't have to forward the message on at all, because it receives it from all of its mesh peers. If a forward message is sitting in the ConnectionHandler, it's very likely that we've already seen it from the peer and we could just remove it from the send_queue.

One solution could be that in the ConnectionHandler if we receive a message then we check the send_queue and remove it if it exists from the forward or publish queues.
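
A tiny sketch of that check, with hypothetical types rather than the real handler state: when a message arrives from the peer, any identical message still queued for forwarding to that peer is dropped.

use std::collections::VecDeque;

/// Placeholder for gossipsub's message id.
type MessageId = u64;

struct PendingForwards {
    queue: VecDeque<(MessageId, Vec<u8>)>,
}

impl PendingForwards {
    /// The peer just sent us this message, so forwarding it back would only be
    /// registered as a duplicate: remove it from the queue if it is still there.
    fn on_message_received(&mut self, id: MessageId) {
        self.queue.retain(|(queued_id, _)| *queued_id != id);
    }
}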

Anyway. This has become a pretty high priority for us, so I'm happy to help out and contribute here, but don't want to step on anyone's toes.

Is someone already planning on implementing the solution proposed by @mxinden?

@thomaseizinger
Contributor

Great, thanks for weighing in @AgeManning !

Regardless of which optimisations we implement, I think it is safe to say that we will need to change the interface between gossipsub's NetworkBehaviour and ConnectionHandler from the current single proto::RPC to differently typed messages for control, publish etc.

That allows the ConnectionHandler to reason in a more fine-grained way about how to handle different messages. Do we agree on this? If yes, then that could be a safe first step whilst we work out more details on what we want to optimise.

(and we should prioritize newer messages rather than trying to get old ones out the door which are now likely duplicates/waste)

This should be easy to do once we have separate queues. I'd probably hold off the time-bound optimisation for now and see what effects the others have.

One solution could be that in the ConnectionHandler if we receive a message then we check the send_queue and remove it if it exists from the forward or publish queues.

This also seems very reasonable and easy to implement because we just have to check local state.

Is someone already planning on implementing the solution proposed by @mxinden?

I don't know of anybody actively working on it. There is a related effort to reduce memory allocations in gossipsub which will need a similar refactoring in terms of changing the interface between behaviour and handler. cc @joshuef for visibility here.

Would it be safe to say that we should always prioritize control messages over publishes? Then we could do the following: Use the regular channel between behaviour and handler for control messages (i.e. ToSwarm::NotifyHandler). Those then go into a separate queue and are always processed first inside ConnectionHandler::poll.

Then, in a separate channel (per ConnectionHandler), we can send the publish messages from behaviour to handler. Separating these would allow control messages to always "jump the queue".

@AgeManning
Contributor

Yeah nice.

Others may want to weigh in here, but it's not clear cut which messages should get prioritised. There is a control message called IHAVE, and I think that one can be prioritised lower than anything else. As we are not going to be this fine-grained (i.e. individual control messages), my gut feeling would be the following prioritisation:

  1. Publish messages
  2. Subscription
  3. Control messages
  4. Forward messages

However, if we split up the GRAFT/PRUNE from the IHAVE/IWANT. I think it would go like:

  1. Publish
  2. Subscribe/Unsubscribe
  3. Graft/Prune
  4. Forward Message
  5. IHAVE/IWANT
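
For illustration, this ordering could be encoded as a type whose derived Ord matches the list above (purely a sketch; nothing like this exists in gossipsub today). A higher variant is sent earlier:

// Derived Ord follows the declaration order, so Publish ranks highest.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum SendPriority {
    IhaveIwant,
    Forward,
    GraftPrune,
    SubscribeUnsubscribe,
    Publish,
}

fn main() {
    assert!(SendPriority::Publish > SendPriority::SubscribeUnsubscribe);
    assert!(SendPriority::Forward > SendPriority::IhaveIwant);
}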

@mxinden
Member Author

mxinden commented Nov 5, 2023

  • Publish messages
  • Subscription
  • Control messages
  • Forward messages

The goal here is to reduce the memory footprint. Subscribe and control messages are tiny in size (correct me if I am wrong). What would be the benefit of prioritizing publish messages over subscribe and control messages? Why not always put subscribe and control first? They don't take up much memory. Given their size, they don't delay either publish or forward messages significantly.

@AgeManning
Contributor

My thinking was that the most important thing, the one whose latency we want to reduce the most, is published messages. We want to send those with very high priority. I agree that control messages are tiny, and if they get queued we can probably group them in the handler into a single RPC message.
Perhaps a better grouping is:

  1. Control (a single RPC message that groups subscribe/unsubscribe and graft/prune)
  2. Publish
  3. IHAVE/IWANT (Single RPC with IHAVE/IWANT)
  4. Forward

The handler pulls from the queues in this order and groups them if we can into a single RPC.

I think the most important thing we need to address is the dropping strategy. I think we probably just need to drop forward messages and maybe have some cap on the published messages.

The forward messages are time-sensitive, so I still think we should have some time-based component to it. Time-LRU cache or something.
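
A small sketch of the time-bound idea, where the 300ms cutoff and the types are assumptions for illustration: forward messages are stamped when queued, and anything older than the cutoff is dropped instead of sent.

use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Assumed cutoff, following the ~300ms figure above.
const MAX_FORWARD_MSG_AGE: Duration = Duration::from_millis(300);

struct ForwardQueue {
    queue: VecDeque<(Instant, Vec<u8>)>,
}

impl ForwardQueue {
    fn push(&mut self, msg: Vec<u8>) {
        self.queue.push_back((Instant::now(), msg));
    }

    /// Next message that is still fresh enough to be worth forwarding; older
    /// entries are dropped, as they are likely duplicates by now.
    fn pop_fresh(&mut self) -> Option<Vec<u8>> {
        while let Some((queued_at, msg)) = self.queue.pop_front() {
            if queued_at.elapsed() <= MAX_FORWARD_MSG_AGE {
                return Some(msg);
            }
        }
        None
    }
}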

@jxs
Member

jxs commented Nov 13, 2023

After #4811, @thomaseizinger and I discussed how we could then move to implement what has been discussed in this issue. If everybody agrees, I plan on submitting a PR that adds a new unidirectional direct communication channel between the NetworkBehaviour and the ConnectionHandler to follow @mxinden's initial suggestion (2).

This communication could be something like a bounded async-priority-channel: we implement a priority list based on the one @AgeManning defined above, and ConnectionHandlers pick and send messages accordingly. This way Control and Publish messages will always be transferred before Forward and IHAVE/IWANT, prioritizing the flow of new data into the network.

If the communication channel queue between the NetworkBehaviour and the ConnectionHandler becomes full, the NetworkBehaviour stops sending messages to that ConnectionHandler and eventually (if it isn't able to make progress) removes that ConnectionHandler from its list, clearing its queue and the associated message allocations.
Pausing the sending of messages to a ConnectionHandler matching a slow peer might not be enough, as its message queue may be exclusive, which will still keep the allocated memory. In Lighthouse's case we feel that a single peer not consuming its queue is enough for it to grow to 10-18 GB in size (we have also seen cases of 50 GB of allocated memory on a node). The network transfer of messages from the ConnectionHandler to the peer is sequential; even with the prioritization, if a peer doesn't make progress reading, it's not being useful.

This won't affect other communication protocols, as the handler will only be removed from the list of channels in the NetworkBehaviour. This also won't affect received messages, as the ConnectionHandlers still bubble them up via HandlerEvent.

CC @AgeManning @thomaseizinger @mxinden @joshuef feel free to correct/add/address anything I might have missed/miswritten.

@thomaseizinger
Contributor

thomaseizinger commented Nov 14, 2023

Thanks for the write-up @jxs!

If the communication channel queue between the NetworkBehaviour and the ConnectionHandler becomes full, the NetworkBehaviour stops sending messages to that ConnectionHandler and eventually (if it isn't able to make progress) removes that ConnectionHandler from its list clearing its queue and the associated message allocations.

I am not familiar enough with gossipsub to say this for sure, but my take would have been: swap the handler to DisabledHandler::TooSlow or something like that immediately once the channel is full. I don't see any reason why we should keep this peer around and wait for it to catch up. To the remote peer, it will look as if we don't support the gossipsub protocol anymore on this connection, i.e. protocol negotiation will fail if they try to reestablish the stream. If the connection gets closed and reestablished, we'd start from a clean state again.

I think that is a sensible behaviour. If the remote node is genuinely slow, closing the gossipsub streams will free up resources on both ends. If no other protocol is active as a result, the keep-alive algorithm will close the connection. If the remote node is malicious and purposely doesn't read from the stream, we'll close it eventually and refuse to speak to it going forward (on this connection).

Handler and behaviour run in different tasks, meaning the only reason the handler would not be reading from the channel is that the stream is too slow. The idea @jxs and I were exploring is to directly hook up the Receiver of async-priority-channel with the stream and make it a contained task that is essentially just a loop of reading from the channel and writing to the stream.

The channel already acts as a buffer, so there isn't any need for another buffer within the ConnectionHandler that we write messages to before they get sent into the stream. All we need to do is pick a channel size that we are comfortable filling without running out of memory. Assuming a max message size of 65k, buffering 100 messages would mean a memory consumption of at most 6.5 MB per connection. Assuming 100 connections, that would result in a 650 MB max buffer size.
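
The arithmetic, spelled out (the constants are the assumptions from the paragraph above):

// Back-of-the-envelope check of the numbers above.
const MAX_MESSAGE_SIZE: usize = 65_000; // assumed maximum message size (~65 kB)
const CHANNEL_CAPACITY: usize = 100; // messages buffered per connection
const CONNECTIONS: usize = 100;

fn main() {
    let per_connection = MAX_MESSAGE_SIZE * CHANNEL_CAPACITY; // 6_500_000 bytes ≈ 6.5 MB
    let total = per_connection * CONNECTIONS; // 650_000_000 bytes ≈ 650 MB
    println!("{per_connection} bytes per connection, {total} bytes total");
}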

@divagant-martian
Contributor

divagant-martian commented Nov 14, 2023

A couple of questions/notes about this:

  • What's the benefit of a priority queue in this case? The priority here has been defined mostly in terms of "how much can we afford to drop this message", which is the reason I raised concerns about dropping publish messages without a clear application error in the first place.
    However, a priority queue is being used here to effectively send messages in a different order, which helps in

    prioritizing the flow of new data into the network.

    but it won't help in preventing the queue from exploding. So I don't think this particular benefit (which is potentially debatable, since there might be unforeseen effects of changing the message order) is directly helping with the issue at hand.

  • Regarding what needs to be done when the queue is full, I agree that, most likely, the best approach is to drop the peer. If the peer is dropped while having a publish message in its queue, this is still a far better approach than #4756 (feat(gossipsub): bound send-queue), since a peer won't be able to remain connected while silently forcing us to drop these important messages. Also, we might or might not want to look into how to bubble this error up to the application. For some gossipsub users not having an error is perhaps acceptable, for others it might not be. It's not clear to me what the answer is here, but I suspect it to be relatively involved for what it is (understanding whether the message was sent to at least one peer), so it's potentially not worth doing until the need arises.

@thomaseizinger
Contributor

What's the benefit of a priority queue in this case?

Great question! I think the benefit is that we end up sending the most important messages first, meaning if it does fill up, the messages being dropped should only be unimportant ones.

@mxinden
Member Author

mxinden commented Nov 14, 2023

I don't think a priority queue, e.g. https://github.com/rmcgibbo/async-priority-channel, is the simplest solution to the problem at hand, namely to prevent send_queue from becoming very large.

Say that the priority queue is full. It has a limit of 10 messages. 5 of these messages are forward messages. Say that the local node wants to send a publish message. In such case the forward messages would prevent the publish message from entering the queue to the ConnectionHandler.

Instead I suggest the following:

Have two channels from NetworkBehaviour to ConnectionHandler.

Control and publish message channel (aka. priority channel)

  • bounded channel
  • prioritized by the ConnectionHandler, i.e. when the outbound stream can handle more data, it first checks whether something is in this channel
  • NetworkBehaviour uses it to send control and publish messages
  • for control messages, NetworkBehaviour can simply do sender.clone().try_send(control_msg).expect("new sender to always have capacity"), they are not large and thus are fine to exceed the bound
  • for publish messages, NetworkBehaviour checks if channel has capacity
    • if so, adds the publish message to the channel
    • if not, and none of the other connections have capacity, returns NoSufficientPeers on the Behaviour::publish call
  • nice property here is that control and publish messages are still ordered

forward message channel (aka. non-priority channel)

  • bounded channel
  • not prioritized by the ConnectionHandler, i.e. when the outbound stream can handle more data, it first checks whether a message from the first channel can be sent
  • NetworkBehaviour uses it to send forward messages
  • in case channel has no capacity, NetworkBehaviour does not forward the message to the ConnectionHandler and thus does not forward the message to the remote
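
A sketch of the handler side of these two channels; names, message types and the wiring are placeholders rather than the real libp2p-gossipsub implementation, and a real handler would poll the receivers with the task's Context (so it is woken when new messages arrive) instead of calling try_next:

use futures::channel::mpsc;

enum PriorityMsg {
    Control(Vec<u8>),
    Publish(Vec<u8>),
}

struct ForwardMsg(Vec<u8>);

struct OutboundQueues {
    /// Bounded channel for control and publish messages.
    priority: mpsc::Receiver<PriorityMsg>,
    /// Bounded channel for forward messages, best effort.
    non_priority: mpsc::Receiver<ForwardMsg>,
}

impl OutboundQueues {
    /// Whenever the outbound stream can take another message, the priority
    /// channel is checked first, so forward messages can never starve control
    /// or publish messages.
    fn next_to_send(&mut self) -> Option<Vec<u8>> {
        if let Ok(Some(msg)) = self.priority.try_next() {
            return Some(match msg {
                PriorityMsg::Control(bytes) | PriorityMsg::Publish(bytes) => bytes,
            });
        }
        match self.non_priority.try_next() {
            Ok(Some(ForwardMsg(bytes))) => Some(bytes),
            _ => None,
        }
    }
}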

time sensitive forward messages

The forward messages are time-sensitive, so I still think we should have some time-based component to it. Time-LRU cache or something.

We can add an if forward_msg.age() > MAX_FORWARD_MSG_AGE check in the ConnectionHandler before sending messages out through the outbound stream. Alternatively, instead of a FIFO channel, we could just as well use a FILO data structure.

I recommend doing this as a follow-up. If I understand correctly the most pressing issue is to bound the send_queue, i.e. prevent unbounded memory growth (this GitHub issue). Second priority is the timely delivery of messages, especially forward messages.

@thomaseizinger
Contributor

Thanks for the comments @divagant-martian @mxinden! I now see that the priority-channel was a detour 😅

The solution sketched above looks good, thank you @mxinden !

@jxs
Member

jxs commented Nov 14, 2023

Hi Max, and thanks for the input!

Say that the priority queue is full. It has a limit of 10 messages. 5 of these messages are forward messages. Say that the local node wants to send a publish message. In such case the forward messages would prevent the publish message from entering the queue to the ConnectionHandler.

Yeah, but that will also happen if we use two channels. If the peer doesn't make progress reading the bytes of a Forward message, the ConnectionHandler won't check for messages in any of the channels, so both queues will just fill up, right? And in that case we drop the peer.
The difference I see is that with the two queues we have a bigger buffer for Publish messages, as they have their own queue, but if we follow your suggestion (which I think makes total sense) we could discard the > MAX_FORWARD_MSG_AGE messages and clear the queue of no-longer-useful Forward messages.

nice property here is that control and publish messages are still ordered

we can also achieve this with the async-priority-channel by setting the same priority for Control and Publish messages. AFAIU, async-priority-channel uses a BinaryHeap underneath and implements Ord for its Item, so when comparing, one won't take priority over the other.

if not, and none of the other connections have capacity, returns NoSufficientPeers on the Behaviour::publish call

yeah, that also makes sense to me. I take it you agree that we also drop the peer and its queue in that case, right?

@mxinden
Member Author

mxinden commented Nov 16, 2023

Say that the priority queue is full. It has a limit of 10 messages. 5 of these messages are forward messages. Say that the local node wants to send a publish message. In such case the forward messages would prevent the publish message from entering the queue to the ConnectionHandler.

Yeah but that will also happen if we use two channels.

Two channels guarantee that forward messages don't take away capacity from publish messages. E.g. in the case of a single (priority) queue with a capacity of 10, one might have 10 forward messages, in which case an additional publish message would need to be dropped. In the case of two channels, each with a capacity of 5, forward messages cannot starve publish messages.

The difference I see is that with the two queues we have a bigger buffer for Publish messages, as they have their own queue, but if we follow your suggestion (which I think makes total sense) we could discard the > MAX_FORWARD_MSG_AGE messages and clear the queue of no-longer-useful Forward messages.

But forward messages might still starve publish messages. Say that the queue contains 10 forward messages, each < MAX_FORWARD_MSG_AGE; an additional publish message would need to be dropped.

if not, and none of the other connections have capacity, returns NoSufficientPeers on the Behaviour::publish call

yeah also makes sense to me, I take it you agree that we also drop the peer and its queue in that case right?

I am undecided here and thus suggest not dropping the peer as part of a first iteration of this GitHub issue. Networks might want to keep supporting slow peers, as long as they don't exhaust the local node's memory (e.g. by filling up send_queue).

Under the assumption that the Gossipsub scoring mechanism is functional (which I think is not a safe assumption to make), I would expect slow peers to eventually be scored low and thus disconnected.

Under the assumption that the Gossipsub scoring mechanism is not functional, I suggest simply emitting an event, leaving it up to the user whether to disconnect or not.

@jxs
Member

jxs commented Nov 16, 2023

Meanwhile, while trying to implement the channels solution, the smoke tests fail with a timeout. This seems to be because, as the NetworkBehaviour doesn't return NotifyHandler when polled and instead sends the message directly to the ConnectionHandler, the handler is polled much less regularly since the Behaviour takes precedence. So it probably makes sense to communicate via NotifyHandler and not use channels directly.

Update: my bad, I wasn't registering the channel to be woken on a new message, as I was not polling it but using try_recv().
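
For reference, a stripped-down illustration of that fix with a hypothetical handler struct: polling the receiver with the Context registers the task's waker, which try_recv()/try_next() alone does not.

use std::task::{Context, Poll};

use futures::channel::mpsc;
use futures::StreamExt;

struct Handler {
    from_behaviour: mpsc::Receiver<Vec<u8>>,
}

impl Handler {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        match self.from_behaviour.poll_next_unpin(cx) {
            // A message is ready to be written to the outbound stream.
            Poll::Ready(Some(msg)) => Poll::Ready(msg),
            // Channel empty (the waker registered by poll_next_unpin will wake
            // this task when the behaviour sends more) or closed.
            _ => Poll::Pending,
        }
    }
}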

I also discovered that the Behaviour establishes its own message priority by adding some Control messages to a pool that is only processed on the next heartbeat, as #4875 shows.

@mxinden
Member Author

mxinden commented Nov 18, 2023

Meanwhile, while trying to implement the channels solution, the smoke tests fail with a timeout. This seems to be because, as the NetworkBehaviour doesn't return NotifyHandler when polled and instead sends the message directly to the ConnectionHandler, the handler is polled much less regularly since the Behaviour takes precedence.

That sounds like a bug in either libp2p-swarm or libp2p-gossipsub. The ConnectionHandler runs on a separate task and should only be polled less frequently if the channel to the NetworkBehaviour is blocked.

So probably makes sense to communicate via NotifyHandler and not use channels directly.

Note that NotifyHandler has no backpressure. There is no way for the ConnectionHandler to signal to the NetworkBehaviour that its send_queue is full.

@AgeManning
Contributor

I just wanted to throw into the mix the possibility that it could be our node that is slow. I think so far the assumption has been slow peers.
If our node has saturated bandwidth, we will likely hit this scenario as well, and it may be that gossipsub is just using more bandwidth than our node supports. We probably don't want to disconnect all our peers in this case, so we might need some heuristic to distinguish this situation from a single peer (or a few) reading from the stream slowly.

@thomaseizinger
Contributor

I just wanted to throw into the mix the possibility that it could be our node that is slow. I think so far the assumption has been slow peers. If our node has saturated bandwidth, we will likely hit this scenario as well, and it may be that gossipsub is just using more bandwidth than our node supports. We probably don't want to disconnect all our peers in this case, so we might need some heuristic to distinguish this situation from a single peer (or a few) reading from the stream slowly.

Right. So all things considered, it appears that we can solve the OOM issue by using two bounded channels.

Initially, we can leave out the optimisation of dropping old forward messages; that can be added with a timer later. Initially, we can also not drop peers when the channel is full but simply return InsufficientPeers if we can't publish to any peers.

I think that is a basic but coherent solution. Perhaps we can add some metrics as well to track the sizes of these channels.

We could even go with a single channel to start with and only introduce the 2nd channel once we drop old forward messages.

mergify bot pushed a commit that referenced this issue Nov 22, 2023
Previously, the `NetworkBehaviour` constructed a `proto::RPC` type and sent that into an unbounded queue to the `ConnectionHandler`. With such a broad type, the `ConnectionHandler` cannot do any prioritization on which messages to drop if a connection slows down.

To enable a more fine-granular handling of messages, we make the interface between `NetworkBehaviour` and `ConnectionHandler` more specific by introducing an `RpcOut` type that differentiates between `Publish`, `Forward`, `Subscribe`, etc.

Related: #4667.

Pull-Request: #4811.