Skip to content

Commit

Permalink
fix: add timeout to protocol notifications + log improvements (#3143)
Browse files Browse the repository at this point in the history
<!--- Provide a general summary of your changes in the Title above -->

<!--- Describe your changes in detail -->
- protocol notifications now have a set "safety" timeout.
- add log for inbound comms pipeline concurrency usage

<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->
There have been reports of inbound messaging being blocked; these changes add logging to shed light on the cause.
The logs show that a single client node is excessively trying to create multiple
`t/bn-wallet//1` (rpc) sessions.

<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
Log observed in base node

<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
stringhandler committed Aug 3, 2021
2 parents ee9a225 + a98a698 commit 7701846
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 51 deletions.
2 changes: 1 addition & 1 deletion comms/dht/src/store_forward/store.rs
Expand Up @@ -226,7 +226,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError> + Se
}

if message.dht_header.message_type.is_saf_message() {
log_not_eligible("it is a SAF message");
log_not_eligible("it is a SAF protocol message");
return Ok(None);
}

Expand Down
10 changes: 9 additions & 1 deletion comms/src/bounded_executor.rs
Expand Up @@ -40,13 +40,15 @@ pub struct TrySpawnError;
/// An executor that bounds how many tasks may be spawned concurrently,
/// using a semaphore to gate spawns.
pub struct BoundedExecutor {
    // Handle to the tokio runtime on which tasks are spawned.
    inner: runtime::Handle,
    // Gates spawning: `available_permits()` is reported by `num_available()`.
    semaphore: Arc<Semaphore>,
    // The total number of permits given at construction (`num_permits`);
    // reported by `max_available()` so callers can compute current usage.
    max_available: usize,
}

impl BoundedExecutor {
/// Creates a bounded executor that runs tasks on `executor`, allowing at
/// most `num_permits` tasks to be in flight at once.
pub fn new(executor: runtime::Handle, num_permits: usize) -> Self {
    let semaphore = Arc::new(Semaphore::new(num_permits));
    Self {
        inner: executor,
        semaphore,
        max_available: num_permits,
    }
}

Expand All @@ -70,12 +72,18 @@ impl BoundedExecutor {
self.num_available() > 0
}

/// Returns the remaining number of tasks that can be spawned on this executor without waiting.
#[inline]
pub fn num_available(&self) -> usize {
    let permits = &self.semaphore;
    permits.available_permits()
}

/// Returns the maximum number of concurrent tasks that can be spawned on this executor without waiting.
#[inline]
pub fn max_available(&self) -> usize {
self.max_available
}

pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, TrySpawnError>
where
F: Future + Send + 'static,
Expand Down
26 changes: 18 additions & 8 deletions comms/src/connection_manager/manager.rs
Expand Up @@ -392,15 +392,25 @@ where
node_id.short_str(),
proto_str
);
if let Err(err) = self
let notify_fut = self
.protocols
.notify(&protocol, ProtocolEvent::NewInboundSubstream(*node_id, stream))
.await
{
error!(
target: LOG_TARGET,
"Error sending NewSubstream notification for protocol '{}' because '{:?}'", proto_str, err
);
.notify(&protocol, ProtocolEvent::NewInboundSubstream(*node_id, stream));
match time::timeout(Duration::from_secs(10), notify_fut).await {
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
"Error sending NewSubstream notification for protocol '{}' because '{:?}'", proto_str, err
);
},
Err(err) => {
error!(
target: LOG_TARGET,
"Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err
);
},
_ => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
}
},

Expand Down
13 changes: 13 additions & 0 deletions comms/src/pipeline/inbound.rs
Expand Up @@ -71,6 +71,18 @@ where
return;
}
let service = self.service.clone();

let num_available = self.executor.num_available();
let max_available = self.executor.max_available();
// Only emit this message if there is any concurrent usage
if num_available < max_available {
debug!(
target: LOG_TARGET,
"Inbound pipeline usage: {}/{}",
max_available - num_available,
max_available
);
}
// Call the service in its own spawned task
self.executor
.spawn(async move {
Expand All @@ -80,6 +92,7 @@ where
})
.await;
}
info!(target: LOG_TARGET, "Inbound pipeline terminated: the stream completed");
}
}

Expand Down
33 changes: 0 additions & 33 deletions comms/src/protocol/messaging/consts.rs

This file was deleted.

20 changes: 16 additions & 4 deletions comms/src/protocol/messaging/extension.rs
Expand Up @@ -26,7 +26,7 @@ use crate::{
message::InboundMessage,
pipeline,
protocol::{
messaging::{consts, protocol::MESSAGING_PROTOCOL, MessagingEventSender},
messaging::{protocol::MESSAGING_PROTOCOL, MessagingEventSender},
ProtocolExtension,
ProtocolExtensionContext,
ProtocolExtensionError,
Expand All @@ -38,6 +38,18 @@ use futures::channel::mpsc;
use std::fmt;
use tower::Service;

/// Buffer size for inbound messages from _all_ peers. This should be large enough to buffer quite a few incoming
/// messages before creating backpressure on peers speaking the messaging protocol.
pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 100;
/// Buffer size for notifications that a peer wants to speak /tari/messaging. This buffer is shared by all peers,
/// but a low value is ok because these events happen once (or less) per connecting peer. For e.g. a value of 10
/// would allow 10 peers to concurrently request to speak /tari/messaging.
pub const MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE: usize = 30;

/// Buffer size for requests to the messaging protocol. All outbound messages will be sent along this channel. Some
/// buffering may be required if the node needs to send many messages out at the same time.
pub const MESSAGING_REQUEST_BUFFER_SIZE: usize = 50;

pub struct MessagingProtocolExtension<TInPipe, TOutPipe, TOutReq> {
event_tx: MessagingEventSender,
pipeline: pipeline::Config<TInPipe, TOutPipe, TOutReq>,
Expand All @@ -60,11 +72,11 @@ where
TOutReq: Send + 'static,
{
fn install(self: Box<Self>, context: &mut ProtocolExtensionContext) -> Result<(), ProtocolExtensionError> {
let (proto_tx, proto_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
let (proto_tx, proto_rx) = mpsc::channel(MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
context.add_protocol(&[MESSAGING_PROTOCOL.clone()], proto_tx);

let (messaging_request_tx, messaging_request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE);
let (messaging_request_tx, messaging_request_rx) = mpsc::channel(MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(INBOUND_MESSAGE_BUFFER_SIZE);

let messaging = MessagingProtocol::new(
Default::default(),
Expand Down
2 changes: 0 additions & 2 deletions comms/src/protocol/messaging/mod.rs
Expand Up @@ -26,8 +26,6 @@ pub use config::MessagingConfig;
mod extension;
pub use extension::MessagingProtocolExtension;

mod consts;

mod error;
mod inbound;
mod outbound;
Expand Down
2 changes: 1 addition & 1 deletion comms/src/protocol/rpc/server/router.rs
Expand Up @@ -208,7 +208,7 @@ where
<B::Service as Service<Request<Bytes>>>::Future: Send + 'static,
{
fn install(self: Box<Self>, context: &mut ProtocolExtensionContext) -> Result<(), ProtocolExtensionError> {
let (proto_notif_tx, proto_notif_rx) = mpsc::channel(10);
let (proto_notif_tx, proto_notif_rx) = mpsc::channel(20);
context.add_protocol(&self.protocol_names, proto_notif_tx);
let rpc_context = RpcCommsBackend::new(context.peer_manager(), context.connectivity());
task::spawn(self.serve(proto_notif_rx, rpc_context));
Expand Down
1 change: 0 additions & 1 deletion integration_tests/features/WalletBaseNodeSwitch.feature
@@ -1,6 +1,5 @@
Feature: Wallet Base Node Switch

@doit
Scenario: As a user I want to change base node for a wallet
Given I have a base node Node1 connected to all seed nodes
And I have a base node Node2 connected to all seed nodes
Expand Down

0 comments on commit 7701846

Please sign in to comment.