diff --git a/Cargo.lock b/Cargo.lock index eefa2f873ab4a..7bffd9e169159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9038,6 +9038,7 @@ name = "sc-consensus-beefy" version = "4.0.0-dev" dependencies = [ "array-bytes 4.2.0", + "async-channel", "async-trait", "fnv", "futures", @@ -9426,6 +9427,7 @@ dependencies = [ name = "sc-network-bitswap" version = "0.10.0-dev" dependencies = [ + "async-channel", "cid", "futures", "libp2p-identity", @@ -9502,6 +9504,7 @@ name = "sc-network-light" version = "0.10.0-dev" dependencies = [ "array-bytes 4.2.0", + "async-channel", "futures", "libp2p-identity", "log", @@ -9543,6 +9546,7 @@ name = "sc-network-sync" version = "0.10.0-dev" dependencies = [ "array-bytes 4.2.0", + "async-channel", "async-trait", "fork-tree", "futures", diff --git a/client/consensus/beefy/Cargo.toml b/client/consensus/beefy/Cargo.toml index 161d53777ebc1..7e3ddf688fe5e 100644 --- a/client/consensus/beefy/Cargo.toml +++ b/client/consensus/beefy/Cargo.toml @@ -10,6 +10,7 @@ homepage = "https://substrate.io" [dependencies] array-bytes = "4.1" +async-channel = "1.8.0" async-trait = "0.1.57" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } fnv = "1.0.6" diff --git a/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs b/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs index d4f4b59f0195e..6ed954da57d87 100644 --- a/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs +++ b/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs @@ -17,10 +17,7 @@ //! Helper for handling (i.e. answering) BEEFY justifications requests from a remote peer. use codec::Decode; -use futures::{ - channel::{mpsc, oneshot}, - StreamExt, -}; +use futures::{channel::oneshot, StreamExt}; use log::{debug, trace}; use sc_client_api::BlockBackend; use sc_network::{ @@ -102,11 +99,11 @@ impl IncomingRequest { /// /// Takes care of decoding and handling of invalid encoded requests. pub(crate) struct IncomingRequestReceiver { - raw: mpsc::Receiver, + raw: async_channel::Receiver, } impl IncomingRequestReceiver { - pub fn new(inner: mpsc::Receiver) -> Self { + pub fn new(inner: async_channel::Receiver) -> Self { Self { raw: inner } } diff --git a/client/consensus/beefy/src/communication/request_response/mod.rs b/client/consensus/beefy/src/communication/request_response/mod.rs index 545ab18cf1d34..1801512fa5421 100644 --- a/client/consensus/beefy/src/communication/request_response/mod.rs +++ b/client/consensus/beefy/src/communication/request_response/mod.rs @@ -23,7 +23,6 @@ pub(crate) mod outgoing_requests_engine; pub use incoming_requests_handler::BeefyJustifsRequestHandler; -use futures::channel::mpsc; use std::time::Duration; use codec::{Decode, Encode, Error as CodecError}; @@ -54,7 +53,7 @@ pub(crate) fn on_demand_justifications_protocol_config>( ) -> (IncomingRequestReceiver, RequestResponseConfig) { let name = justifications_protocol_name(genesis_hash, fork_id); let fallback_names = vec![]; - let (tx, rx) = mpsc::channel(JUSTIF_CHANNEL_SIZE); + let (tx, rx) = async_channel::bounded(JUSTIF_CHANNEL_SIZE); let rx = IncomingRequestReceiver::new(rx); let cfg = RequestResponseConfig { name, diff --git a/client/network/bitswap/Cargo.toml b/client/network/bitswap/Cargo.toml index a953676ec160e..0a3c569708531 100644 --- a/client/network/bitswap/Cargo.toml +++ b/client/network/bitswap/Cargo.toml @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.11" [dependencies] +async-channel = "1.8.0" cid = "0.8.6" futures = "0.3.21" libp2p-identity = { version = "0.1.2", features = ["peerid"] } diff --git a/client/network/bitswap/src/lib.rs b/client/network/bitswap/src/lib.rs index a7857f6eec362..beaaa8fd0fdec 100644 --- a/client/network/bitswap/src/lib.rs +++ b/client/network/bitswap/src/lib.rs @@ -21,7 +21,7 @@ //! CID is expected to reference 256-bit Blake2b transaction hash. use cid::{self, Version}; -use futures::{channel::mpsc, StreamExt}; +use futures::StreamExt; use libp2p_identity::PeerId; use log::{debug, error, trace}; use prost::Message; @@ -93,13 +93,13 @@ impl Prefix { /// Bitswap request handler pub struct BitswapRequestHandler { client: Arc + Send + Sync>, - request_receiver: mpsc::Receiver, + request_receiver: async_channel::Receiver, } impl BitswapRequestHandler { /// Create a new [`BitswapRequestHandler`]. pub fn new(client: Arc + Send + Sync>) -> (Self, ProtocolConfig) { - let (tx, request_receiver) = mpsc::channel(MAX_REQUEST_QUEUE); + let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE); let config = ProtocolConfig { name: ProtocolName::from(PROTOCOL_NAME), @@ -289,7 +289,7 @@ pub enum BitswapError { #[cfg(test)] mod tests { use super::*; - use futures::{channel::oneshot, SinkExt}; + use futures::channel::oneshot; use sc_block_builder::BlockBuilderProvider; use schema::bitswap::{ message::{wantlist::Entry, Wantlist}, diff --git a/client/network/light/Cargo.toml b/client/network/light/Cargo.toml index cd0dfbca50d2a..ec2c2c077fc8c 100644 --- a/client/network/light/Cargo.toml +++ b/client/network/light/Cargo.toml @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.11" [dependencies] +async-channel = "1.8.0" array-bytes = "4.1" codec = { package = "parity-scale-codec", version = "3.2.2", features = [ "derive", diff --git a/client/network/light/src/light_client_requests/handler.rs b/client/network/light/src/light_client_requests/handler.rs index 2a68ebe9c2b23..2a0047b40e784 100644 --- a/client/network/light/src/light_client_requests/handler.rs +++ b/client/network/light/src/light_client_requests/handler.rs @@ -24,7 +24,7 @@ use crate::schema; use codec::{self, Decode, Encode}; -use futures::{channel::mpsc, prelude::*}; +use futures::prelude::*; use libp2p_identity::PeerId; use log::{debug, trace}; use prost::Message; @@ -43,9 +43,13 @@ use std::{marker::PhantomData, sync::Arc}; const LOG_TARGET: &str = "light-client-request-handler"; +/// Incoming requests bounded queue size. For now due to lack of data on light client request +/// handling in production systems, this value is chosen to match the block request limit. +const MAX_LIGHT_REQUEST_QUEUE: usize = 20; + /// Handler for incoming light client requests from a remote peer. pub struct LightClientRequestHandler { - request_receiver: mpsc::Receiver, + request_receiver: async_channel::Receiver, /// Blockchain client. client: Arc, _block: PhantomData, @@ -62,9 +66,7 @@ where fork_id: Option<&str>, client: Arc, ) -> (Self, ProtocolConfig) { - // For now due to lack of data on light client request handling in production systems, this - // value is chosen to match the block request limit. - let (tx, request_receiver) = mpsc::channel(20); + let (tx, request_receiver) = async_channel::bounded(MAX_LIGHT_REQUEST_QUEUE); let mut protocol_config = super::generate_protocol_config( protocol_id, diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index e0f4074e0a22e..e21ff3a3412d9 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -36,10 +36,7 @@ use crate::{types::ProtocolName, ReputationChange}; -use futures::{ - channel::{mpsc, oneshot}, - prelude::*, -}; +use futures::{channel::oneshot, prelude::*}; use libp2p::{ core::{Endpoint, Multiaddr}, request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel}, @@ -126,7 +123,7 @@ pub struct ProtocolConfig { /// other peers. If this is `Some` but the channel is closed, then the local node will /// advertise support for this protocol, but any incoming request will lead to an error being /// sent back. - pub inbound_queue: Option>, + pub inbound_queue: Option>, } /// A single request received by a peer on a request-response protocol. @@ -259,8 +256,10 @@ pub struct RequestResponsesBehaviour { /// /// Contains the underlying libp2p request-response [`Behaviour`], plus an optional /// "response builder" used to build responses for incoming requests. - protocols: - HashMap, Option>)>, + protocols: HashMap< + ProtocolName, + (Behaviour, Option>), + >, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. pending_requests: @@ -295,7 +294,10 @@ struct MessageRequest { request: Vec, channel: ResponseChannel, ()>>, protocol: ProtocolName, - resp_builder: Option>, + // A builder used for building responses for incoming requests. Note that we use + // `async_channel` and not `mpsc` on purpose, because `mpsc::channel` allocates an extra + // message slot for every cloned `Sender` and this breaks a back-pressure mechanism. + resp_builder: Option>, // Once we get incoming request we save all params, create an async call to Peerset // to get the reputation of the peer. get_peer_reputation: Pin> + Send>>, @@ -618,10 +620,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Submit the request to the "response builder" passed by the user at // initialization. - if let Some(mut resp_builder) = resp_builder { + if let Some(resp_builder) = resp_builder { // If the response builder is too busy, silently drop `tx`. This // will be reported by the corresponding request-response [`Behaviour`] // through an `InboundFailure::Omission` event. + // Note that we use `async_channel::bounded` and not `mpsc::channel` + // because the latter allocates an extra slot for every cloned sender. let _ = resp_builder.try_send(IncomingRequest { peer, payload: request, @@ -1036,11 +1040,7 @@ impl Codec for GenericCodec { mod tests { use super::*; - use futures::{ - channel::{mpsc, oneshot}, - executor::LocalPool, - task::Spawn, - }; + use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; use libp2p::{ core::{ transport::{MemoryTransport, Transport}, @@ -1112,7 +1112,7 @@ mod tests { // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { - let (tx, mut rx) = mpsc::channel::(64); + let (tx, mut rx) = async_channel::bounded::(64); pool.spawner() .spawn_obj( @@ -1215,7 +1215,7 @@ mod tests { // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { - let (tx, mut rx) = mpsc::channel::(64); + let (tx, mut rx) = async_channel::bounded::(64); pool.spawner() .spawn_obj( @@ -1353,8 +1353,8 @@ mod tests { }; let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2, peerset) = { - let (tx_1, rx_1) = mpsc::channel(64); - let (tx_2, rx_2) = mpsc::channel(64); + let (tx_1, rx_1) = async_channel::bounded(64); + let (tx_2, rx_2) = async_channel::bounded(64); let protocol_configs = vec![ ProtocolConfig { diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index ce713596011fb..81d39d87f54f3 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -17,6 +17,7 @@ prost-build = "0.11" [dependencies] array-bytes = "4.1" +async-channel = "1.8.0" async-trait = "0.1.58" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } futures = "0.3.21" diff --git a/client/network/sync/src/block_request_handler.rs b/client/network/sync/src/block_request_handler.rs index 256c0ad382b92..df17d56ba59f7 100644 --- a/client/network/sync/src/block_request_handler.rs +++ b/client/network/sync/src/block_request_handler.rs @@ -23,10 +23,7 @@ use crate::{ }; use codec::{Decode, Encode}; -use futures::{ - channel::{mpsc, oneshot}, - stream::StreamExt, -}; +use futures::{channel::oneshot, stream::StreamExt}; use libp2p::PeerId; use log::debug; use lru::LruCache; @@ -136,7 +133,7 @@ enum SeenRequestsValue { /// Handler for incoming block requests from a remote peer. pub struct BlockRequestHandler { client: Arc, - request_receiver: mpsc::Receiver, + request_receiver: async_channel::Receiver, /// Maps from request to number of times we have seen this request. /// /// This is used to check if a peer is spamming us with the same request. @@ -157,7 +154,7 @@ where ) -> (Self, ProtocolConfig) { // Reserve enough request slots for one request per peer when we are at the maximum // number of peers. - let (tx, request_receiver) = mpsc::channel(num_peer_hint); + let (tx, request_receiver) = async_channel::bounded(num_peer_hint); let mut protocol_config = generate_protocol_config( protocol_id, diff --git a/client/network/sync/src/state_request_handler.rs b/client/network/sync/src/state_request_handler.rs index 93597453aa8a2..e5b0da6ce1501 100644 --- a/client/network/sync/src/state_request_handler.rs +++ b/client/network/sync/src/state_request_handler.rs @@ -20,10 +20,7 @@ use crate::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse}; use codec::{Decode, Encode}; -use futures::{ - channel::{mpsc, oneshot}, - stream::StreamExt, -}; +use futures::{channel::oneshot, stream::StreamExt}; use libp2p::PeerId; use log::{debug, trace}; use lru::LruCache; @@ -114,7 +111,7 @@ enum SeenRequestsValue { /// Handler for incoming block requests from a remote peer. pub struct StateRequestHandler { client: Arc, - request_receiver: mpsc::Receiver, + request_receiver: async_channel::Receiver, /// Maps from request to number of times we have seen this request. /// /// This is used to check if a peer is spamming us with the same request. @@ -135,7 +132,7 @@ where ) -> (Self, ProtocolConfig) { // Reserve enough request slots for one request per peer when we are at the maximum // number of peers. - let (tx, request_receiver) = mpsc::channel(num_peer_hint); + let (tx, request_receiver) = async_channel::bounded(num_peer_hint); let mut protocol_config = generate_protocol_config( protocol_id, diff --git a/client/network/sync/src/warp_request_handler.rs b/client/network/sync/src/warp_request_handler.rs index 7061d6485d092..a49a65af51d0b 100644 --- a/client/network/sync/src/warp_request_handler.rs +++ b/client/network/sync/src/warp_request_handler.rs @@ -17,10 +17,7 @@ //! Helper for handling (i.e. answering) grandpa warp sync requests from a remote peer. use codec::Decode; -use futures::{ - channel::{mpsc, oneshot}, - stream::StreamExt, -}; +use futures::{channel::oneshot, stream::StreamExt}; use log::debug; use sc_network::{ @@ -36,6 +33,9 @@ use std::{sync::Arc, time::Duration}; const MAX_RESPONSE_SIZE: u64 = 16 * 1024 * 1024; +/// Incoming warp requests bounded queue size. +const MAX_WARP_REQUEST_QUEUE: usize = 20; + /// Generates a [`RequestResponseConfig`] for the grandpa warp sync request protocol, refusing /// incoming requests. pub fn generate_request_response_config>( @@ -72,7 +72,7 @@ fn generate_legacy_protocol_name(protocol_id: ProtocolId) -> String { /// Handler for incoming grandpa warp sync requests from a remote peer. pub struct RequestHandler { backend: Arc>, - request_receiver: mpsc::Receiver, + request_receiver: async_channel::Receiver, } impl RequestHandler { @@ -83,7 +83,7 @@ impl RequestHandler { fork_id: Option<&str>, backend: Arc>, ) -> (Self, RequestResponseConfig) { - let (tx, request_receiver) = mpsc::channel(20); + let (tx, request_receiver) = async_channel::bounded(MAX_WARP_REQUEST_QUEUE); let mut request_response_config = generate_request_response_config(protocol_id, genesis_hash, fork_id);