Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to broadcast network messages in parallel #1409

Merged
merged 15 commits into from
Sep 11, 2023
Merged
41 changes: 21 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cumulus/client/relay-chain-inprocess-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" }
# Polkadot
polkadot-primitives = { path = "../../../polkadot/primitives" }
polkadot-test-client = { path = "../../../polkadot/node/test/client" }
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }

# Cumulus
cumulus-test-service = { path = "../../test/service" }
Expand Down
62 changes: 53 additions & 9 deletions polkadot/node/malus/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,20 @@ where
Some(msg)
}

/// Modify outgoing messages.
/// Specifies if we need to replace some outgoing message with another (potentially empty)
/// message
fn need_intercept_outgoing(
&self,
_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
) -> bool {
false
}
/// Send modified message instead of the original one
fn intercept_outgoing(
&self,
msg: <Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
) -> Option<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages> {
Some(msg)
None
}
}

Expand All @@ -66,7 +74,7 @@ pub struct InterceptedSender<Sender, Fil> {
#[async_trait::async_trait]
impl<OutgoingMessage, Sender, Fil> overseer::SubsystemSender<OutgoingMessage> for InterceptedSender<Sender, Fil>
where
OutgoingMessage: overseer::AssociateOutgoing + Send + 'static,
OutgoingMessage: overseer::AssociateOutgoing + Send + 'static + TryFrom<overseer::AllMessages>,
Sender: overseer::SubsystemSender<OutgoingMessage>
+ overseer::SubsystemSender<
<
Expand All @@ -78,17 +86,48 @@ where
<
<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages:
From<OutgoingMessage>,
From<OutgoingMessage> + Send + Sync,
<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
{
async fn send_message(&mut self, msg: OutgoingMessage) {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
if self.message_filter.need_intercept_outgoing(&msg) {
if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
self.inner.send_message(msg).await;
}
}
else {
self.inner.send_message(msg).await;
}
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if self.message_filter.need_intercept_outgoing(&msg) {
if let Some(real_msg) = self.message_filter.intercept_outgoing(&msg) {
let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
self.inner.try_send_message(real_msg).map_err(|e| {
match e {
TrySendError::Full(_) => TrySendError::Full(orig_msg),
TrySendError::Closed(_) => TrySendError::Closed(orig_msg),
}
})
}
else {
// No message to send after intercepting
Ok(())
}
}
else {
let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
self.inner.try_send_message(orig_msg)
}
}

async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = OutgoingMessage> + Send,
Expand All @@ -101,9 +140,14 @@ where

fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
if self.message_filter.need_intercept_outgoing(&msg) {
if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
self.inner.send_unbounded_message(msg);
}
}
else {
self.inner.send_unbounded_message(msg);
}
}
Expand Down
7 changes: 0 additions & 7 deletions polkadot/node/malus/src/variants/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,4 @@ where
msg => Some(msg),
}
}

fn intercept_outgoing(
&self,
msg: overseer::CandidateValidationOutgoingMessages,
) -> Option<overseer::CandidateValidationOutgoingMessages> {
Some(msg)
}
}
3 changes: 1 addition & 2 deletions polkadot/node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../gum" }

metered = { package = "prioritized-metered-channel", version = "0.2.0" }

metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
# Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
sc-service = { path = "../../../substrate/client/service" }
sc-cli = { path = "../../../substrate/client/cli" }
Expand Down
19 changes: 19 additions & 0 deletions polkadot/node/network/bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,16 @@ impl Metrics {

pub fn on_report_event(&self) {
if let Some(metrics) = self.0.as_ref() {
self.on_message("report_peer");
metrics.report_events.inc()
}
}

pub fn on_message(&self, message_type: &'static str) {
if let Some(metrics) = self.0.as_ref() {
metrics.messages_sent.with_label_values(&[message_type]).inc()
}
}
}

#[derive(Clone)]
Expand All @@ -123,6 +130,8 @@ pub(crate) struct MetricsInner {

bytes_received: prometheus::CounterVec<prometheus::U64>,
bytes_sent: prometheus::CounterVec<prometheus::U64>,

messages_sent: prometheus::CounterVec<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -217,6 +226,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
messages_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_messages_sent_total",
"The number of messages sent via network bridge",
),
&["type"]
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub(crate) fn send_message<M>(
let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
metrics.on_message(std::any::type_name::<M>());
encoded
};

Expand Down
54 changes: 47 additions & 7 deletions polkadot/node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use super::*;

use always_assert::never;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{
future::BoxFuture,
stream::{BoxStream, FuturesUnordered, StreamExt},
};
use parity_scale_codec::{Decode, DecodeAll};

use sc_network::Event as NetworkEvent;
Expand Down Expand Up @@ -1038,21 +1041,58 @@ fn dispatch_collation_event_to_all_unbounded(
}
}

fn try_send_validation_event<E>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the name a bit misleading. The try suggest that we don't wait when full - which is true, but we also don't give up. Maybe send_or_queue_validation_event?

event: E,
sender: &mut (impl overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender<E>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add the trait bound to the where clause

delayed_queue: &FuturesUnordered<BoxFuture<'static, ()>>,
) where
E: Send + 'static,
{
match sender.try_send_message(event) {
Ok(()) => {},
Err(overseer::TrySendError::Full(event)) => {
let mut sender = sender.clone();
delayed_queue.push(Box::pin(async move {
sender.send_message(event).await;
}));
},
Err(overseer::TrySendError::Closed(_)) => {
panic!(
"NetworkBridgeRxSender is closed when trying to send event of type: {}",
std::any::type_name::<E>()
);
},
}
}

async fn dispatch_validation_events_to_all<I>(
events: I,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
let delayed_messages: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();

// Fast path for sending events to subsystems, if any subsystem's queue is full, we hold
// the slow path future in the `delayed_messages` queue.
for event in events {
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) {
try_send_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) {
try_send_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) {
try_send_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(GossipSupportMessage::from) {
try_send_validation_event(msg, sender, &delayed_messages);
}
}

// Here we wait for all the delayed messages to be sent.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to add a metric that tells us how much we wait here. This will be helpful to measure the level of back pressure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the best metric type here will be a Histogram, on the other hand it will be quite expensive in terms of the metrics space. WDYT about adding a histogram with low amount of buckets (like 0, 1, 5, 10)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say it would be sufficient to just have a counter that measure how much time is spent there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we still need a histogram for time I suppose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also add a metric for the size of the delayed_messages queue

let _: Vec<()> = delayed_messages.collect().await;
}

async fn dispatch_collation_events_to_all<I>(
Expand Down
15 changes: 15 additions & 0 deletions polkadot/node/network/bridge/src/tx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use polkadot_node_subsystem::{
///
/// To be passed to [`FullNetworkConfiguration::add_notification_protocol`]().
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::request_response::Requests;
use sc_network::ReputationChange;

use crate::validator_discovery;
Expand Down Expand Up @@ -290,6 +291,20 @@ where
);

for req in reqs {
match req {
Requests::ChunkFetchingV1(_) => metrics.on_message("chunk_fetching_v1"),
Requests::AvailableDataFetchingV1(_) =>
metrics.on_message("available_data_fetching_v1"),
Requests::CollationFetchingV1(_) => metrics.on_message("collation_fetching_v1"),
Requests::CollationFetchingVStaging(_) =>
metrics.on_message("collation_fetching_vstaging"),
Requests::PoVFetchingV1(_) => metrics.on_message("pov_fetching_v1"),
Requests::DisputeSendingV1(_) => metrics.on_message("dispute_sending_v1"),
Requests::StatementFetchingV1(_) => metrics.on_message("statement_fetching_v1"),
Requests::AttestedCandidateVStaging(_) =>
metrics.on_message("attested_candidate_vstaging"),
}

network_service
.start_request(
&mut authority_discovery_service,
Expand Down
9 changes: 5 additions & 4 deletions polkadot/node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem-types = { path = "../subsystem-types" }
polkadot-node-metrics = { path = "../metrics" }
polkadot-primitives = { path = "../../primitives" }
orchestra = "0.0.5"
orchestra = { version = "0.3.3", default-features = false, features=["futures_channel"] }
gum = { package = "tracing-gum", path = "../gum" }
schnellru = "0.2.1"
sp-core = { path = "../../../substrate/primitives/core" }
async-trait = "0.1.57"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }

[dev-dependencies]
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
sp-core = { path = "../../../substrate/primitives/core" }
futures = { version = "0.3.21", features = ["thread-pool"] }
femme = "2.2.1"
Expand All @@ -36,7 +36,8 @@ node-test-helpers = { package = "polkadot-node-subsystem-test-helpers", path = "
tikv-jemalloc-ctl = "0.5.0"

[features]
default = []
expand = [ "orchestra/expand" ]
default = [ "futures_channel" ]
dotgraph = [ "orchestra/dotgraph" ]
expand = [ "orchestra/expand" ]
futures_channel = [ "metered/futures_channel", "orchestra/futures_channel" ]
jemalloc-allocator = [ "dep:tikv-jemalloc-ctl" ]
Loading