Skip to content

Commit

Permalink
Revert #1409 partially
Browse files Browse the repository at this point in the history
  • Loading branch information
vstakhov committed Sep 17, 2023
1 parent 11d1a39 commit 9b3b7ca
Showing 1 changed file with 9 additions and 53 deletions.
62 changes: 9 additions & 53 deletions polkadot/node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use super::*;
use always_assert::never;
use bytes::Bytes;
use futures::{
future::BoxFuture,
stream::{BoxStream, FuturesUnordered, StreamExt},
stream::{BoxStream, StreamExt},
};
use parity_scale_codec::{Decode, DecodeAll};

Expand Down Expand Up @@ -748,6 +747,7 @@ where

futures::pin_mut!(orchestra_signal_handler);

// TODO: use `select_with_strategy` here to prefer signals processing
futures::future::select(orchestra_signal_handler, network_event_handler)
.await
.factor_first()
Expand Down Expand Up @@ -1044,65 +1044,21 @@ fn dispatch_collation_event_to_all_unbounded(
}
}

fn send_or_queue_validation_event<E, Sender>(
event: E,
sender: &mut Sender,
delayed_queue: &FuturesUnordered<BoxFuture<'static, ()>>,
) where
E: Send + 'static,
Sender: overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender<E>,
{
match sender.try_send_message(event) {
Ok(()) => {},
Err(overseer::TrySendError::Full(event)) => {
let mut sender = sender.clone();
delayed_queue.push(Box::pin(async move {
sender.send_message(event).await;
}));
},
Err(overseer::TrySendError::Closed(_)) => {
panic!(
"NetworkBridgeRxSender is closed when trying to send event of type: {}",
std::any::type_name::<E>()
);
},
}
}

async fn dispatch_validation_events_to_all<I>(
events: I,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
metrics: &Metrics,
_metrics: &Metrics,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
let delayed_messages: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();

// Fast path for sending events to subsystems, if any subsystem's queue is full, we hold
// the slow path future in the `delayed_messages` queue.
for event in events {
if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(GossipSupportMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
}

let delayed_messages_count = delayed_messages.len();
metrics.on_delayed_rx_queue(delayed_messages_count);

if delayed_messages_count > 0 {
// Here we wait for all the delayed messages to be sent.
let _timer = metrics.time_delayed_rx_events(); // Dropped after `await` is completed
let _: Vec<()> = delayed_messages.collect().await;
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
}
}

Expand Down

0 comments on commit 9b3b7ca

Please sign in to comment.