Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Remove: (#8748)
Browse files Browse the repository at this point in the history
* `NetworkStatusSinks`
* `sc_service::SpawnTasksParams::network_status_sinks`

Also:
* `sc_service::build_network()` does not return `network_status_sinks`
  • Loading branch information
kpp committed May 27, 2021
1 parent 1cbf0e1 commit 14fcad9
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 132 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions bin/node-template/node/src/service.rs
Expand Up @@ -149,7 +149,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>

config.network.extra_sets.push(sc_finality_grandpa::grandpa_peers_set_config());

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -199,7 +199,6 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
on_demand: None,
remote_blockchain: None,
backend,
network_status_sinks,
system_rpc_tx,
config,
telemetry: telemetry.as_mut(),
Expand Down Expand Up @@ -370,7 +369,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
},
)?;

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -418,7 +417,6 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
keystore: keystore_container.sync_keystore(),
backend,
network,
network_status_sinks,
system_rpc_tx,
telemetry: telemetry.as_mut(),
})?;
Expand Down
9 changes: 3 additions & 6 deletions bin/node/cli/src/service.rs
Expand Up @@ -204,7 +204,6 @@ pub struct NewFullBase {
pub task_manager: TaskManager,
pub client: Arc<FullClient>,
pub network: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
pub network_status_sinks: sc_service::NetworkStatusSinks<Block>,
pub transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient>>,
}

Expand Down Expand Up @@ -242,7 +241,7 @@ pub fn new_full_base(
)
);

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -279,7 +278,6 @@ pub fn new_full_base(
task_manager: &mut task_manager,
on_demand: None,
remote_blockchain: None,
network_status_sinks: network_status_sinks.clone(),
system_rpc_tx,
telemetry: telemetry.as_mut(),
},
Expand Down Expand Up @@ -415,7 +413,6 @@ pub fn new_full_base(
task_manager,
client,
network,
network_status_sinks,
transaction_pool,
})
}
Expand Down Expand Up @@ -519,7 +516,7 @@ pub fn new_light_base(
telemetry.as_ref().map(|x| x.handle()),
)?;

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -576,7 +573,7 @@ pub fn new_light_base(
client: client.clone(),
transaction_pool: transaction_pool.clone(),
keystore: keystore_container.sync_keystore(),
config, backend, network_status_sinks, system_rpc_tx,
config, backend, system_rpc_tx,
network: network.clone(),
task_manager: &mut task_manager,
telemetry: telemetry.as_mut(),
Expand Down
2 changes: 1 addition & 1 deletion client/informant/Cargo.toml
Expand Up @@ -15,12 +15,12 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
ansi_term = "0.12.1"
futures = "0.3.9"
futures-timer = "3.0.1"
log = "0.4.8"
parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] }
sc-client-api = { version = "3.0.0", path = "../api" }
sc-network = { version = "0.9.0", path = "../network" }
sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" }
sp-runtime = { version = "3.0.0", path = "../../primitives/runtime" }
sp-utils = { version = "3.0.0", path = "../../primitives/utils" }
sp-transaction-pool = { version = "3.0.0", path = "../../primitives/transaction-pool" }
wasm-timer = "0.2"
31 changes: 19 additions & 12 deletions client/informant/src/lib.rs
Expand Up @@ -20,18 +20,23 @@

use ansi_term::Colour;
use futures::prelude::*;
use futures_timer::Delay;
use log::{info, trace, warn};
use parity_util_mem::MallocSizeOf;
use sc_client_api::{BlockchainEvents, UsageProvider};
use sc_network::NetworkStatus;
use sc_network::NetworkService;
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use sp_transaction_pool::TransactionPool;
use sp_utils::{status_sinks, mpsc::tracing_unbounded};
use std::{fmt::Display, sync::Arc, time::Duration, collections::VecDeque};

mod display;

/// Creates a stream that returns a new value every `duration`.
fn interval(duration: Duration) -> impl Stream<Item = ()> + Unpin {
futures::stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
}

/// The format to print telemetry output in.
#[derive(Clone, Debug)]
pub struct OutputFormat {
Expand Down Expand Up @@ -64,23 +69,25 @@ impl<T: TransactionPool> TransactionPoolAndMaybeMallogSizeOf for T {}
impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for T {}

/// Builds the informant and returns a `Future` that drives the informant.
pub fn build<B: BlockT, C>(
pub async fn build<B: BlockT, C>(
client: Arc<C>,
network_status_sinks: Arc<status_sinks::StatusSinks<NetworkStatus<B>>>,
network: Arc<NetworkService<B, <B as BlockT>::Hash>>,
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
format: OutputFormat,
) -> impl futures::Future<Output = ()>
)
where
C: UsageProvider<B> + HeaderMetadata<B> + BlockchainEvents<B>,
<C as HeaderMetadata<B>>::Error: Display,
{
let mut display = display::InformantDisplay::new(format.clone());

let client_1 = client.clone();
let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status");
network_status_sinks.push(Duration::from_millis(5000), network_status_sink);

let display_notifications = network_status_stream
let display_notifications = interval(Duration::from_millis(5000))
.filter_map(|_| async {
let status = network.status().await;
status.ok()
})
.for_each(move |net_status| {
let info = client_1.usage_info();
if let Some(ref usage) = info.usage {
Expand All @@ -101,10 +108,10 @@ where
future::ready(())
});

future::join(
display_notifications,
display_block_import(client),
).map(|_| ())
futures::select! {
() = display_notifications.fuse() => (),
() = display_block_import(client).fuse() => (),
};
}

fn display_block_import<B: BlockT, C>(client: Arc<C>) -> impl Future<Output = ()>
Expand Down
49 changes: 49 additions & 0 deletions client/network/src/service.rs
Expand Up @@ -888,6 +888,43 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
});
}

/// High-level network status information.
///
/// Returns an error if the `NetworkWorker` is no longer running.
pub async fn status(&self) -> Result<NetworkStatus<B>, ()> {
let (tx, rx) = oneshot::channel();

let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::NetworkStatus {
pending_response: tx,
});

match rx.await {
Ok(v) => v.map_err(|_| ()),
// The channel can only be closed if the network worker no longer exists.
Err(_) => Err(()),
}
}

/// Get network state.
///
/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
/// everywhere about this. Please don't use this function to retrieve actual information.
///
/// Returns an error if the `NetworkWorker` is no longer running.
pub async fn network_state(&self) -> Result<NetworkState, ()> {
let (tx, rx) = oneshot::channel();

let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::NetworkState {
pending_response: tx,
});

match rx.await {
Ok(v) => v.map_err(|_| ()),
// The channel can only be closed if the network worker no longer exists.
Err(_) => Err(()),
}
}

/// You may call this when new transactons are imported by the transaction pool.
///
/// All transactions will be fetched from the `TransactionPool` that was passed at
Expand Down Expand Up @@ -1307,6 +1344,12 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
},
NetworkStatus {
pending_response: oneshot::Sender<Result<NetworkStatus<B>, RequestFailure>>,
},
NetworkState {
pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
},
DisconnectPeer(PeerId, Cow<'static, str>),
NewBestBlockImported(B::Hash, NumberFor<B>),
}
Expand Down Expand Up @@ -1434,6 +1477,12 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
ServiceToWorkerMsg::Request { target, protocol, request, pending_response, connect } => {
this.network_service.behaviour_mut().send_request(&target, &protocol, request, pending_response, connect);
},
ServiceToWorkerMsg::NetworkStatus { pending_response } => {
let _ = pending_response.send(Ok(this.status()));
},
ServiceToWorkerMsg::NetworkState { pending_response } => {
let _ = pending_response.send(Ok(this.network_state()));
},
ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) =>
this.network_service.behaviour_mut().user_protocol_mut().disconnect_peer(&who, &protocol_name),
ServiceToWorkerMsg::NewBestBlockImported(hash, number) =>
Expand Down
14 changes: 4 additions & 10 deletions client/service/src/builder.rs
Expand Up @@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
error::Error, MallocSizeOfWasm, RpcHandlers, NetworkStatusSinks,
error::Error, MallocSizeOfWasm, RpcHandlers,
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
metrics::MetricsService,
client::{light, Client, ClientConfig},
Expand Down Expand Up @@ -519,8 +519,6 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
/// A shared network instance.
pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
/// Sinks to propagate network status updates.
pub network_status_sinks: NetworkStatusSinks<TBl>,
/// A Sender for RPC requests.
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
/// Telemetry instance for this node.
Expand Down Expand Up @@ -590,7 +588,6 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
rpc_extensions_builder,
remote_blockchain,
network,
network_status_sinks,
system_rpc_tx,
telemetry,
} = params;
Expand Down Expand Up @@ -654,7 +651,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
metrics_service.run(
client.clone(),
transaction_pool.clone(),
network_status_sinks.clone()
network.clone(),
)
);

Expand All @@ -679,7 +676,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
// Spawn informant task
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
network_status_sinks.status.clone(),
network.clone(),
transaction_pool.clone(),
config.informant_output_format,
));
Expand Down Expand Up @@ -865,7 +862,6 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
NetworkStatusSinks<TBl>,
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
NetworkStarter,
),
Expand Down Expand Up @@ -959,15 +955,13 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = NetworkStatusSinks::new();

let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");

let future = build_network_future(
config.role.clone(),
network_mut,
client,
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
config.announce_block,
Expand Down Expand Up @@ -1010,7 +1004,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
future.await
});

Ok((network, network_status_sinks, system_rpc_tx, NetworkStarter(network_start_tx)))
Ok((network, system_rpc_tx, NetworkStarter(network_start_tx)))
}

/// Object used to start the network.
Expand Down

0 comments on commit 14fcad9

Please sign in to comment.