diff --git a/comms/dht/src/store_forward/store.rs b/comms/dht/src/store_forward/store.rs index 32144df2a7..4393f36518 100644 --- a/comms/dht/src/store_forward/store.rs +++ b/comms/dht/src/store_forward/store.rs @@ -226,7 +226,7 @@ where S: Service + Se } if message.dht_header.message_type.is_saf_message() { - log_not_eligible("it is a SAF message"); + log_not_eligible("it is a SAF protocol message"); return Ok(None); } diff --git a/comms/src/bounded_executor.rs b/comms/src/bounded_executor.rs index 239b938b2e..f82b4a9f84 100644 --- a/comms/src/bounded_executor.rs +++ b/comms/src/bounded_executor.rs @@ -40,6 +40,7 @@ pub struct TrySpawnError; pub struct BoundedExecutor { inner: runtime::Handle, semaphore: Arc, + max_available: usize, } impl BoundedExecutor { @@ -47,6 +48,7 @@ impl BoundedExecutor { Self { inner: executor, semaphore: Arc::new(Semaphore::new(num_permits)), + max_available: num_permits, } } @@ -70,12 +72,18 @@ impl BoundedExecutor { self.num_available() > 0 } - /// Returns the number tasks that can be spawned on this executor without blocking. + /// Returns the remaining number of tasks that can be spawned on this executor without waiting. #[inline] pub fn num_available(&self) -> usize { self.semaphore.available_permits() } + /// Returns the maximum number of concurrent tasks that can be spawned on this executor without waiting. + #[inline] + pub fn max_available(&self) -> usize { + self.max_available + } + pub fn try_spawn(&self, future: F) -> Result, TrySpawnError> where F: Future + Send + 'static, diff --git a/comms/src/connection_manager/manager.rs b/comms/src/connection_manager/manager.rs index a1a9bffe9d..d339af71ea 100644 --- a/comms/src/connection_manager/manager.rs +++ b/comms/src/connection_manager/manager.rs @@ -392,15 +392,25 @@ where node_id.short_str(), proto_str ); - if let Err(err) = self + let notify_fut = self .protocols - .notify(&protocol, ProtocolEvent::NewInboundSubstream(*node_id, stream)) - .await - { - error!( - target: LOG_TARGET, - "Error sending NewSubstream notification for protocol '{}' because '{:?}'", proto_str, err - ); + .notify(&protocol, ProtocolEvent::NewInboundSubstream(*node_id, stream)); + match time::timeout(Duration::from_secs(10), notify_fut).await { + Ok(Err(err)) => { + error!( + target: LOG_TARGET, + "Error sending NewSubstream notification for protocol '{}' because '{:?}'", proto_str, err + ); + }, + Err(err) => { + error!( + target: LOG_TARGET, + "Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err + ); + }, + _ => { + debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str); + }, } }, diff --git a/comms/src/pipeline/inbound.rs b/comms/src/pipeline/inbound.rs index c2035cf9f0..0b2116bc37 100644 --- a/comms/src/pipeline/inbound.rs +++ b/comms/src/pipeline/inbound.rs @@ -71,6 +71,18 @@ where return; } let service = self.service.clone(); + + let num_available = self.executor.num_available(); + let max_available = self.executor.max_available(); + // Only emit this message if there is any concurrent usage + if num_available < max_available { + debug!( + target: LOG_TARGET, + "Inbound pipeline usage: {}/{}", + max_available - num_available, + max_available + ); + } // Call the service in it's own spawned task self.executor .spawn(async move { @@ -80,6 +92,7 @@ where }) .await; } + info!(target: LOG_TARGET, "Inbound pipeline terminated: the stream completed"); } } diff --git a/comms/src/protocol/messaging/consts.rs b/comms/src/protocol/messaging/consts.rs deleted file mode 100644 index 483512a39f..0000000000 --- a/comms/src/protocol/messaging/consts.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2020, The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -/// Buffer size for inbound messages from _all_ peers. This should be large enough to buffer quite a few incoming -/// messages before creating backpressure on peers speaking the messaging protocol. -pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 100; -/// Buffer size notifications that a peer wants to speak /tari/messaging. This buffer is used for all peers, but a low -/// value is ok because this events happen once (or less) per connecting peer. For e.g. a value of 10 would allow 10 -/// peers to concurrently request to speak /tari/messaging. -pub const MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE: usize = 10; - -/// Buffer size for requests to the messaging protocol. All outbound messages will be sent along this channel. Some -/// buffering may be required if the node needs to send many messages out at the same time. -pub const MESSAGING_REQUEST_BUFFER_SIZE: usize = 50; diff --git a/comms/src/protocol/messaging/extension.rs b/comms/src/protocol/messaging/extension.rs index 58b8a67248..241a152a5b 100644 --- a/comms/src/protocol/messaging/extension.rs +++ b/comms/src/protocol/messaging/extension.rs @@ -26,7 +26,7 @@ use crate::{ message::InboundMessage, pipeline, protocol::{ - messaging::{consts, protocol::MESSAGING_PROTOCOL, MessagingEventSender}, + messaging::{protocol::MESSAGING_PROTOCOL, MessagingEventSender}, ProtocolExtension, ProtocolExtensionContext, ProtocolExtensionError, @@ -38,6 +38,18 @@ use futures::channel::mpsc; use std::fmt; use tower::Service; +/// Buffer size for inbound messages from _all_ peers. This should be large enough to buffer quite a few incoming +/// messages before creating backpressure on peers speaking the messaging protocol. +pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 100; +/// Buffer size notifications that a peer wants to speak /tari/messaging. This buffer is used for all peers, but a low +/// value is ok because this events happen once (or less) per connecting peer. For e.g. a value of 10 would allow 10 +/// peers to concurrently request to speak /tari/messaging. +pub const MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE: usize = 30; + +/// Buffer size for requests to the messaging protocol. All outbound messages will be sent along this channel. Some +/// buffering may be required if the node needs to send many messages out at the same time. +pub const MESSAGING_REQUEST_BUFFER_SIZE: usize = 50; + pub struct MessagingProtocolExtension { event_tx: MessagingEventSender, pipeline: pipeline::Config, @@ -60,11 +72,11 @@ where TOutReq: Send + 'static, { fn install(self: Box, context: &mut ProtocolExtensionContext) -> Result<(), ProtocolExtensionError> { - let (proto_tx, proto_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE); + let (proto_tx, proto_rx) = mpsc::channel(MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE); context.add_protocol(&[MESSAGING_PROTOCOL.clone()], proto_tx); - let (messaging_request_tx, messaging_request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE); - let (inbound_message_tx, inbound_message_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE); + let (messaging_request_tx, messaging_request_rx) = mpsc::channel(MESSAGING_REQUEST_BUFFER_SIZE); + let (inbound_message_tx, inbound_message_rx) = mpsc::channel(INBOUND_MESSAGE_BUFFER_SIZE); let messaging = MessagingProtocol::new( Default::default(), diff --git a/comms/src/protocol/messaging/mod.rs b/comms/src/protocol/messaging/mod.rs index 732bc73c0c..88fca6af05 100644 --- a/comms/src/protocol/messaging/mod.rs +++ b/comms/src/protocol/messaging/mod.rs @@ -26,8 +26,6 @@ pub use config::MessagingConfig; mod extension; pub use extension::MessagingProtocolExtension; -mod consts; - mod error; mod inbound; mod outbound; diff --git a/comms/src/protocol/rpc/server/router.rs b/comms/src/protocol/rpc/server/router.rs index 1e15434b69..9d03c6535d 100644 --- a/comms/src/protocol/rpc/server/router.rs +++ b/comms/src/protocol/rpc/server/router.rs @@ -208,7 +208,7 @@ where >>::Future: Send + 'static, { fn install(self: Box, context: &mut ProtocolExtensionContext) -> Result<(), ProtocolExtensionError> { - let (proto_notif_tx, proto_notif_rx) = mpsc::channel(10); + let (proto_notif_tx, proto_notif_rx) = mpsc::channel(20); context.add_protocol(&self.protocol_names, proto_notif_tx); let rpc_context = RpcCommsBackend::new(context.peer_manager(), context.connectivity()); task::spawn(self.serve(proto_notif_rx, rpc_context)); diff --git a/integration_tests/features/WalletBaseNodeSwitch.feature b/integration_tests/features/WalletBaseNodeSwitch.feature index 403b35f674..cf18bd9cd9 100644 --- a/integration_tests/features/WalletBaseNodeSwitch.feature +++ b/integration_tests/features/WalletBaseNodeSwitch.feature @@ -1,6 +1,5 @@ Feature: Wallet Base Node Switch - @doit Scenario: As a user I want to change base node for a wallet Given I have a base node Node1 connected to all seed nodes And I have a base node Node2 connected to all seed nodes