Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow to broadcast network messages in parallel #7542

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,099 changes: 1,602 additions & 1,497 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../gum" }

metered = { package = "prioritized-metered-channel", version = "0.2.0" }

metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features=["futures_channel"] }
# Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
19 changes: 19 additions & 0 deletions node/network/bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,16 @@ impl Metrics {

/// Records one peer-report event.
///
/// Increments the `report_peer` series of the `messages_sent` counter vector and then the
/// `report_events` counter. Does nothing when no inner metrics are present (`self.0` is
/// `None`).
pub fn on_report_event(&self) {
    // Guard: without an inner metrics object there is nothing to update.
    let inner = match &self.0 {
        Some(inner) => inner,
        None => return,
    };

    let report_peer_sent = inner.messages_sent.with_label_values(&["report_peer"]);
    report_peer_sent.inc();
    inner.report_events.inc();
}

/// Counts one sent message of the given kind.
///
/// `message_type` is used verbatim as the single label value of the `messages_sent`
/// counter vector. Does nothing when no inner metrics are present (`self.0` is `None`).
pub fn on_message(&self, message_type: &'static str) {
    // Guard: without an inner metrics object there is nothing to update.
    let inner = match &self.0 {
        Some(inner) => inner,
        None => return,
    };

    let labels = [message_type];
    inner.messages_sent.with_label_values(&labels).inc();
}
}

#[derive(Clone)]
Expand All @@ -123,6 +130,8 @@ pub(crate) struct MetricsInner {

bytes_received: prometheus::CounterVec<prometheus::U64>,
bytes_sent: prometheus::CounterVec<prometheus::U64>,

messages_sent: prometheus::CounterVec<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -217,6 +226,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
messages_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_messages_sent_total",
"The number of messages sent via network bridge",
),
&["type"]
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down
1 change: 1 addition & 0 deletions node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub(crate) fn send_message<M>(
let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
metrics.on_message(std::any::type_name::<M>());
encoded
};

Expand Down
52 changes: 45 additions & 7 deletions node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use super::*;

use always_assert::never;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{
stream::{BoxStream, FuturesUnordered, StreamExt},
Future,
};
use parity_scale_codec::{Decode, DecodeAll};

use sc_network::Event as NetworkEvent;
Expand Down Expand Up @@ -55,6 +58,7 @@ pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use std::{
collections::{hash_map, HashMap},
iter::ExactSizeIterator,
pin::Pin,
};

use super::validator_discovery;
Expand Down Expand Up @@ -890,21 +894,55 @@ fn dispatch_collation_event_to_all_unbounded(
}
}

/// Offers `event` to `sender` without waiting, deferring the send if the channel is full.
///
/// * If `try_send_message` accepts the event, nothing else happens.
/// * If it reports `Full`, the sender is cloned and moved, together with the returned
///   event, into a boxed future that performs the awaiting `send_message`. That future is
///   pushed onto `delayed_queue`; it is the caller's job to poll the queue.
///
/// # Panics
///
/// Panics if `try_send_message` reports `Closed`; the message names the event type `E`.
fn try_send_validation_event<E>(
    event: E,
    sender: &mut (impl overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender<E>),
    delayed_queue: &FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
) where
    E: Send + 'static,
{
    // Fast path: the event was accepted immediately.
    let failure = match sender.try_send_message(event) {
        Ok(()) => return,
        Err(failure) => failure,
    };

    match failure {
        overseer::TrySendError::Closed(_) => panic!(
            "NetworkBridgeRxSender is closed when trying to send event of type: {}",
            std::any::type_name::<E>()
        ),
        overseer::TrySendError::Full(pending) => {
            // The deferred future must own its sender, hence the clone.
            let mut owned_sender = sender.clone();
            let deferred = async move {
                owned_sender.send_message(pending).await;
            };
            delayed_queue.push(Box::pin(deferred));
        },
    }
}

/// Fans every validation-protocol network event out as a `StatementDistributionMessage`,
/// `BitfieldDistributionMessage`, `ApprovalDistributionMessage` and `GossipSupportMessage`.
///
/// On the new side of this diff each message is first offered through the non-blocking
/// `try_send_validation_event`. Sends that hit a full channel are parked in
/// `delayed_messages` and awaited only after all events have been offered, so a full
/// channel for one message type does not delay offering the remaining messages.
///
/// NOTE(review): parked sends are not polled until the loop has finished, so a later event
/// whose non-blocking send succeeds can be delivered before an earlier, parked event of the
/// same message type. Confirm that receivers tolerate this reordering.
/// NOTE(review): an `Err` from `event.focus()` is skipped silently. Presumably the removed
/// `send_messages(..)` calls behaved the same way by iterating the `Result` (TODO confirm).
async fn dispatch_validation_events_to_all<I>(
events: I,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
// Holds the sends that could not complete immediately; nothing in it runs until it is
// polled at the end of this function.
let delayed_messages: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
FuturesUnordered::new();
for event in events {
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to ignore a possible error?

try_send_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) {
try_send_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) {
try_send_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(GossipSupportMessage::from) {
try_send_validation_event(msg, sender, &delayed_messages);
}
}

// Drive every parked send to completion; the function does not return before all of them
// have finished.
let _: Vec<()> = delayed_messages.collect().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically the bridge is still stopped until all futures complete here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but other subsystems have a chance to receive updates if some other subsystem is slow. That's the goal of this PR technically.

}

async fn dispatch_collation_events_to_all<I>(
Expand Down
11 changes: 11 additions & 0 deletions node/network/bridge/src/tx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use polkadot_node_subsystem::{
///
/// To be passed to [`FullNetworkConfiguration::add_notification_protocol`]().
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::request_response::Requests;
use sc_network::ReputationChange;

use crate::validator_discovery;
Expand Down Expand Up @@ -261,6 +262,16 @@ where
);

for req in reqs {
match req {
Requests::ChunkFetchingV1(_) => metrics.on_message("chunk_fetching_v1"),
Requests::AvailableDataFetchingV1(_) =>
metrics.on_message("available_data_fetching_v1"),
Requests::CollationFetchingV1(_) => metrics.on_message("collation_fetching_v1"),
Requests::PoVFetchingV1(_) => metrics.on_message("pov_fetching_v1"),
Requests::DisputeSendingV1(_) => metrics.on_message("dispute_sending_v1"),
Requests::StatementFetchingV1(_) => metrics.on_message("statement_fetching_v1"),
}

network_service
.start_request(
&mut authority_discovery_service,
Expand Down
7 changes: 4 additions & 3 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem-types = { path = "../subsystem-types" }
polkadot-node-metrics = { path = "../metrics" }
polkadot-primitives = { path = "../../primitives" }
orchestra = "0.0.5"
orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false }
gum = { package = "tracing-gum", path = "../gum" }
lru = "0.9"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
async-trait = "0.1.57"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }

[dev-dependencies]
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features=["futures_channel"] }

sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.21", features = ["thread-pool"] }
Expand All @@ -35,7 +35,8 @@ test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../pri
tikv-jemalloc-ctl = "0.5.0"

[features]
default = []
default = ["futures_channel"]
expand = ["orchestra/expand"]
dotgraph = ["orchestra/dotgraph"]
futures_channel = ["orchestra/futures_channel", "metered/futures_channel"]
jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
1 change: 1 addition & 0 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub use orchestra::{
OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
SubsystemSender, TimeoutExt, ToOrchestra,
TrySendError,
};

/// Store 2 days worth of blocks, not accounting for forks,
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ polkadot-node-primitives = { path = "../primitives" }
polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-statement-table = { path = "../../statement-table" }
polkadot-node-jaeger = { path = "../jaeger" }
orchestra = "0.0.5"
orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-primitives = { path = "../../primitives" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-overseer = { path = "../overseer" }
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features=["futures_channel"] }

sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down