Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Grab stream of networking events earlier (#3025)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed May 14, 2021
1 parent 13af232 commit d8f6170
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use parity_scale_codec::{Encode, Decode};
use parking_lot::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;

Expand Down Expand Up @@ -277,10 +278,14 @@ impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
AD: validator_discovery::AuthorityDiscovery,
Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
fn start(mut self, ctx: Context) -> SpawnedSubsystem {
// The stream of networking events has to be created at initialization, otherwise the
// networking might open connections before the stream of events has been grabbed.
let network_stream = self.network_service.event_stream();

// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
let future = run_network(self, ctx)
let future = run_network(self, ctx, network_stream)
.map_err(|e| {
SubsystemError::with_origin("network-bridge", e)
})
Expand Down Expand Up @@ -535,13 +540,12 @@ where
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
mut sender: impl SubsystemSender,
mut network_service: impl Network,
mut network_stream: BoxStream<'static, NetworkEvent>,
mut authority_discovery_service: AD,
mut request_multiplexer: RequestMultiplexer,
metrics: Metrics,
shared: Shared,
) -> Result<(), UnexpectedAbort> {
let mut network_stream = network_service.event_stream();

loop {
futures::select! {
network_event = network_stream.next().fuse() => match network_event {
Expand Down Expand Up @@ -798,10 +802,11 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
/// #fn is_send<T: Send>();
/// #is_send::<parking_lot::MutexGuard<'static, ()>();
/// ```
#[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(bridge, ctx, network_stream), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>(
bridge: NetworkBridge<N, AD>,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
network_stream: BoxStream<'static, NetworkEvent>,
) -> SubsystemResult<()>
where
N: Network,
Expand All @@ -824,6 +829,7 @@ where
let (remote, network_event_handler) = handle_network_messages(
ctx.sender().clone(),
network_service.clone(),
network_stream,
authority_discovery_service.clone(),
request_multiplexer,
metrics.clone(),
Expand Down Expand Up @@ -1351,8 +1357,9 @@ mod tests {
) {
let pool = sp_core::testing::TaskExecutor::new();
let (request_multiplexer, req_configs) = RequestMultiplexer::new();
let (network, network_handle, discovery) = new_test_network(req_configs);
let (mut network, network_handle, discovery) = new_test_network(req_configs);
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let network_stream = network.event_stream();

let bridge = NetworkBridge {
network_service: network,
Expand All @@ -1365,6 +1372,7 @@ mod tests {
let network_bridge = run_network(
bridge,
context,
network_stream,
)
.map_err(|_| panic!("subsystem execution failed"))
.map(|_| ());
Expand Down

0 comments on commit d8f6170

Please sign in to comment.