Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert #1409 partially #1603

Merged
merged 8 commits into from
Sep 18, 2023
64 changes: 9 additions & 55 deletions polkadot/node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use super::*;

use always_assert::never;
use bytes::Bytes;
use futures::{
future::BoxFuture,
stream::{BoxStream, FuturesUnordered, StreamExt},
};
use futures::stream::{BoxStream, StreamExt};
use parity_scale_codec::{Decode, DecodeAll};

use sc_network::Event as NetworkEvent;
Expand Down Expand Up @@ -748,6 +745,7 @@ where

futures::pin_mut!(orchestra_signal_handler);

// TODO: use `select_with_strategy` here to prefer signals processing
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
futures::future::select(orchestra_signal_handler, network_event_handler)
.await
.factor_first()
Expand Down Expand Up @@ -1044,65 +1042,21 @@ fn dispatch_collation_event_to_all_unbounded(
}
}

fn send_or_queue_validation_event<E, Sender>(
event: E,
sender: &mut Sender,
delayed_queue: &FuturesUnordered<BoxFuture<'static, ()>>,
) where
E: Send + 'static,
Sender: overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender<E>,
{
match sender.try_send_message(event) {
Ok(()) => {},
Err(overseer::TrySendError::Full(event)) => {
let mut sender = sender.clone();
delayed_queue.push(Box::pin(async move {
sender.send_message(event).await;
}));
},
Err(overseer::TrySendError::Closed(_)) => {
panic!(
"NetworkBridgeRxSender is closed when trying to send event of type: {}",
std::any::type_name::<E>()
);
},
}
}

async fn dispatch_validation_events_to_all<I>(
events: I,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
metrics: &Metrics,
_metrics: &Metrics,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
let delayed_messages: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();

// Fast path for sending events to subsystems, if any subsystem's queue is full, we hold
// the slow path future in the `delayed_messages` queue.
for event in events {
if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(GossipSupportMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
}

let delayed_messages_count = delayed_messages.len();
metrics.on_delayed_rx_queue(delayed_messages_count);

if delayed_messages_count > 0 {
// Here we wait for all the delayed messages to be sent.
let _timer = metrics.time_delayed_rx_events(); // Dropped after `await` is completed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric is useful, why not keep it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR just reverts the "bad things", so I've decided to leave it as simple and quick as possible. I'm working on a better version in the meantime on top of this one.

let _: Vec<()> = delayed_messages.collect().await;
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
}
}

Expand Down