Skip to content

Commit

Permalink
Allow to broadcast network messages in parallel (#1409)
Browse files Browse the repository at this point in the history
This PR addresses several pending issues:

* [x] Update orchestra to the recent version and test how the node
performs
* [x] Add some useful metrics for outbound network bridge
* [x] Try to send incoming network requests to all subsystems without
blocking on any particular subsystem in that loop
* [x] Fix all incompatibilities between orchestra and polkadot code
(e.g. malus node)
  • Loading branch information
vstakhov committed Sep 11, 2023
1 parent 2c8021f commit 44dbb73
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 56 deletions.
41 changes: 21 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cumulus/client/relay-chain-inprocess-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" }
# Polkadot
polkadot-primitives = { path = "../../../polkadot/primitives" }
polkadot-test-client = { path = "../../../polkadot/node/test/client" }
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }

# Cumulus
cumulus-test-service = { path = "../../test/service" }
Expand Down
62 changes: 53 additions & 9 deletions polkadot/node/malus/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,20 @@ where
Some(msg)
}

/// Decides whether an outgoing message must be replaced before it is sent.
///
/// When this returns `true`, the intercepting sender asks `intercept_outgoing` for the
/// message to send instead (potentially none at all). When it returns `false`, the
/// original message is sent unchanged. The default implementation never intercepts.
fn need_intercept_outgoing(
&self,
_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
) -> bool {
false
}
/// Produces the message to send instead of the original outgoing message.
///
/// Returning `None` means that nothing is sent in place of the original message.
fn intercept_outgoing(
&self,
msg: <Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
) -> Option<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages> {
Some(msg)
None
}
}

Expand All @@ -66,7 +74,7 @@ pub struct InterceptedSender<Sender, Fil> {
#[async_trait::async_trait]
impl<OutgoingMessage, Sender, Fil> overseer::SubsystemSender<OutgoingMessage> for InterceptedSender<Sender, Fil>
where
OutgoingMessage: overseer::AssociateOutgoing + Send + 'static,
OutgoingMessage: overseer::AssociateOutgoing + Send + 'static + TryFrom<overseer::AllMessages>,
Sender: overseer::SubsystemSender<OutgoingMessage>
+ overseer::SubsystemSender<
<
Expand All @@ -78,17 +86,48 @@ where
<
<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages:
From<OutgoingMessage>,
From<OutgoingMessage> + Send + Sync,
<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
{
/// Sends `msg` through the wrapped sender, letting the message filter replace it first.
///
/// The message is converted into the interceptor's `OutgoingMessages` type. If the filter
/// reports that it needs to intercept, only the replacement it returns (if any) is sent;
/// otherwise the converted message is forwarded as is.
async fn send_message(&mut self, msg: OutgoingMessage) {
// Convert into the outgoing message type the filter operates on.
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
if self.message_filter.need_intercept_outgoing(&msg) {
// A `None` replacement drops the message entirely.
if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
self.inner.send_message(msg).await;
}
}
else {
// Not intercepted: forward the message unchanged.
self.inner.send_message(msg).await;
}
}

/// Non-async variant of `send_message`: tries to hand `msg` to the wrapped sender,
/// letting the message filter replace it first.
///
/// * If the filter does not need to intercept, the original message is recovered from its
///   converted form and passed to the inner sender.
/// * If the filter intercepts and yields a replacement, the replacement is sent; on
///   failure the returned error carries the caller's original message, not the replacement.
/// * If the filter intercepts and yields nothing, no message is sent and `Ok(())` is returned.
///
/// # Panics
///
/// Panics if the converted message cannot be turned back into `OutgoingMessage`.
fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
    let wrapped = <
        <<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
    >::OutgoingMessages as From<OutgoingMessage>>::from(msg);

    if !self.message_filter.need_intercept_outgoing(&wrapped) {
        // Nothing to intercept: undo the conversion and forward the original message.
        let original: OutgoingMessage = wrapped
            .into()
            .try_into()
            .expect("must be able to recover the original message");
        return self.inner.try_send_message(original);
    }

    match self.message_filter.intercept_outgoing(&wrapped) {
        // No message to send after intercepting
        None => Ok(()),
        Some(replacement) => {
            // Recovered up front so that a failed send can give the caller its own message back.
            let original: OutgoingMessage = wrapped
                .into()
                .try_into()
                .expect("must be able to recover the original message");
            self.inner.try_send_message(replacement).map_err(|err| match err {
                TrySendError::Full(_) => TrySendError::Full(original),
                TrySendError::Closed(_) => TrySendError::Closed(original),
            })
        },
    }
}

async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = OutgoingMessage> + Send,
Expand All @@ -101,9 +140,14 @@ where

/// Sends `msg` through the wrapped sender's unbounded path, letting the message filter
/// replace it first.
///
/// If the filter reports that it needs to intercept, only the replacement it returns
/// (if any) is sent; otherwise the converted message is forwarded as is.
fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
// Convert into the outgoing message type the filter operates on.
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if self.message_filter.need_intercept_outgoing(&msg) {
// A `None` replacement drops the message entirely.
if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
self.inner.send_unbounded_message(msg);
}
}
else {
// Not intercepted: forward the message unchanged.
self.inner.send_unbounded_message(msg);
}
}
Expand Down
7 changes: 0 additions & 7 deletions polkadot/node/malus/src/variants/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,4 @@ where
msg => Some(msg),
}
}

/// Returns the outgoing message as is, wrapped in `Some`.
fn intercept_outgoing(
&self,
msg: overseer::CandidateValidationOutgoingMessages,
) -> Option<overseer::CandidateValidationOutgoingMessages> {
Some(msg)
}
}
3 changes: 1 addition & 2 deletions polkadot/node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../gum" }

metered = { package = "prioritized-metered-channel", version = "0.2.0" }

metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
# Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
sc-service = { path = "../../../substrate/client/service" }
sc-cli = { path = "../../../substrate/client/cli" }
Expand Down
53 changes: 53 additions & 0 deletions polkadot/node/network/bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,27 @@ impl Metrics {

/// Records a report event: counts a `"report_peer"` message and bumps the
/// `report_events` counter. Does nothing when the inner metrics are `None`.
pub fn on_report_event(&self) {
    if let Some(inner) = &self.0 {
        self.on_message("report_peer");
        inner.report_events.inc();
    }
}

/// Increments the `messages_sent` counter labelled with `message_type`.
/// Does nothing when the inner metrics are `None`.
pub fn on_message(&self, message_type: &'static str) {
    if let Some(inner) = &self.0 {
        let counter = inner.messages_sent.with_label_values(&[message_type]);
        counter.inc();
    }
}

/// Observes `queue_size` in the `rx_delayed_processing` histogram.
/// Does nothing when the inner metrics are `None`.
pub fn on_delayed_rx_queue(&self, queue_size: usize) {
    if let Some(inner) = &self.0 {
        let observed = queue_size as f64;
        inner.rx_delayed_processing.observe(observed);
    }
}
/// Starts a timer on the `rx_delayed_processing_time` histogram.
///
/// Returns `None` when the inner metrics are `None`, otherwise the started timer.
pub fn time_delayed_rx_events(
    &self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    let inner = self.0.as_ref()?;
    Some(inner.rx_delayed_processing_time.start_timer())
}
}

#[derive(Clone)]
Expand All @@ -123,6 +141,13 @@ pub(crate) struct MetricsInner {

bytes_received: prometheus::CounterVec<prometheus::U64>,
bytes_sent: prometheus::CounterVec<prometheus::U64>,

messages_sent: prometheus::CounterVec<prometheus::U64>,
// A `Histogram` is used to track the queue size because we need not only
// the average size of the queue (which will normally be 0), but also
// how the queue size changes over time when messages are delayed.
rx_delayed_processing: prometheus::Histogram,
rx_delayed_processing_time: prometheus::Histogram,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -217,6 +242,34 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
messages_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_messages_sent_total",
"The number of messages sent via network bridge",
),
&["type"]
)?,
registry,
)?,
rx_delayed_processing: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_network_bridge_rx_delayed",
"Number of events being delayed while broadcasting from the network bridge",
).buckets(vec![0.0, 1.0, 2.0, 8.0, 16.0]),
)?,
registry,
)?,
rx_delayed_processing_time: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_network_bridge_rx_delayed_time",
"Time spent for waiting of the delayed events",
),
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub(crate) fn send_message<M>(
let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
metrics.on_message(std::any::type_name::<M>());
encoded
};

Expand Down
Loading

0 comments on commit 44dbb73

Please sign in to comment.