Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to broadcast network messages in parallel #1409

Merged
merged 15 commits into from
Sep 11, 2023
Merged
41 changes: 21 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cumulus/client/relay-chain-inprocess-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" }
# Polkadot
polkadot-primitives = { path = "../../../polkadot/primitives" }
polkadot-test-client = { path = "../../../polkadot/node/test/client" }
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }

# Cumulus
cumulus-test-service = { path = "../../test/service" }
Expand Down
62 changes: 53 additions & 9 deletions polkadot/node/malus/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,20 @@ where
Some(msg)
}

/// Modify outgoing messages.
/// Specifies if we need to replace some outgoing message with another (potentially empty)
/// message
fn need_intercept_outgoing(
&self,
_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
) -> bool {
false
}
/// Send modified message instead of the original one
fn intercept_outgoing(
&self,
msg: <Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
) -> Option<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages> {
Some(msg)
None
}
}

Expand All @@ -66,7 +74,7 @@ pub struct InterceptedSender<Sender, Fil> {
#[async_trait::async_trait]
impl<OutgoingMessage, Sender, Fil> overseer::SubsystemSender<OutgoingMessage> for InterceptedSender<Sender, Fil>
where
OutgoingMessage: overseer::AssociateOutgoing + Send + 'static,
OutgoingMessage: overseer::AssociateOutgoing + Send + 'static + TryFrom<overseer::AllMessages>,
Sender: overseer::SubsystemSender<OutgoingMessage>
+ overseer::SubsystemSender<
<
Expand All @@ -78,17 +86,48 @@ where
<
<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages:
From<OutgoingMessage>,
From<OutgoingMessage> + Send + Sync,
<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
{
async fn send_message(&mut self, msg: OutgoingMessage) {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
if self.message_filter.need_intercept_outgoing(&msg) {
if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
self.inner.send_message(msg).await;
}
}
else {
self.inner.send_message(msg).await;
}
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if self.message_filter.need_intercept_outgoing(&msg) {
if let Some(real_msg) = self.message_filter.intercept_outgoing(&msg) {
let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
self.inner.try_send_message(real_msg).map_err(|e| {
match e {
TrySendError::Full(_) => TrySendError::Full(orig_msg),
TrySendError::Closed(_) => TrySendError::Closed(orig_msg),
}
})
}
else {
// No message to send after intercepting
Ok(())
}
}
else {
let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
self.inner.try_send_message(orig_msg)
}
}

async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = OutgoingMessage> + Send,
Expand All @@ -101,9 +140,14 @@ where

fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if self.message_filter.need_intercept_outgoing(&msg) {
if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
self.inner.send_unbounded_message(msg);
}
}
else {
self.inner.send_unbounded_message(msg);
}
}
Expand Down
7 changes: 0 additions & 7 deletions polkadot/node/malus/src/variants/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,4 @@ where
msg => Some(msg),
}
}

fn intercept_outgoing(
&self,
msg: overseer::CandidateValidationOutgoingMessages,
) -> Option<overseer::CandidateValidationOutgoingMessages> {
Some(msg)
}
}
3 changes: 1 addition & 2 deletions polkadot/node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../gum" }

metered = { package = "prioritized-metered-channel", version = "0.2.0" }

metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
# Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
sc-service = { path = "../../../substrate/client/service" }
sc-cli = { path = "../../../substrate/client/cli" }
Expand Down
53 changes: 53 additions & 0 deletions polkadot/node/network/bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,27 @@ impl Metrics {

pub fn on_report_event(&self) {
if let Some(metrics) = self.0.as_ref() {
self.on_message("report_peer");
metrics.report_events.inc()
}
}

pub fn on_message(&self, message_type: &'static str) {
if let Some(metrics) = self.0.as_ref() {
metrics.messages_sent.with_label_values(&[message_type]).inc()
}
}

pub fn on_delayed_rx_queue(&self, queue_size: usize) {
if let Some(metrics) = self.0.as_ref() {
metrics.rx_delayed_processing.observe(queue_size as f64);
}
}
pub fn time_delayed_rx_events(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.rx_delayed_processing_time.start_timer())
}
}

#[derive(Clone)]
Expand All @@ -123,6 +141,13 @@ pub(crate) struct MetricsInner {

bytes_received: prometheus::CounterVec<prometheus::U64>,
bytes_sent: prometheus::CounterVec<prometheus::U64>,

messages_sent: prometheus::CounterVec<prometheus::U64>,
// The reason why a `Histogram` is used to track a queue size is that
// we need not only an average size of the queue (that will be 0 normally), but
// we also need a dynamics for this queue size in case of messages delays.
rx_delayed_processing: prometheus::Histogram,
rx_delayed_processing_time: prometheus::Histogram,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -217,6 +242,34 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
messages_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_messages_sent_total",
"The number of messages sent via network bridge",
),
&["type"]
)?,
registry,
)?,
rx_delayed_processing: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_network_bridge_rx_delayed",
"Number of events being delayed while broadcasting from the network bridge",
).buckets(vec![0.0, 1.0, 2.0, 8.0, 16.0]),
)?,
registry,
)?,
rx_delayed_processing_time: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_network_bridge_rx_delayed_time",
"Time spent for waiting of the delayed events",
),
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub(crate) fn send_message<M>(
let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
metrics.on_message(std::any::type_name::<M>());
encoded
};

Expand Down