Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Use SpawnTaskHandles for spawning tasks in the tx pool #8958

Merged
5 commits merged into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{sync::Arc, time::Duration};
use sc_client_api::{ExecutorProvider, RemoteBackend};
use node_template_runtime::{self, opaque::Block, RuntimeApi};
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_service::{error::Error as ServiceError, Configuration, TaskManager, SpawnTaskHandle};
use sc_executor::native_executor_instance;
pub use sc_executor::NativeExecutor;
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
Expand All @@ -28,7 +28,7 @@ type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponents<
FullClient, FullBackend, FullSelectChain,
sp_consensus::DefaultImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient, SpawnTaskHandle>,
(
sc_finality_grandpa::GrandpaBlockImport<FullBackend, Block, FullClient, FullSelectChain>,
sc_finality_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
Expand Down
5 changes: 3 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use node_primitives::Block;
use node_runtime::RuntimeApi;
use sc_service::{
config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager,
SpawnTaskHandle,
};
use sc_network::{Event, NetworkService};
use sp_runtime::traits::Block as BlockT;
Expand All @@ -47,7 +48,7 @@ pub fn new_partial(
) -> Result<sc_service::PartialComponents<
FullClient, FullBackend, FullSelectChain,
sp_consensus::DefaultImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient, SpawnTaskHandle>,
expenses marked this conversation as resolved.
Show resolved Hide resolved
(
impl Fn(
node_rpc::DenyUnsafe,
Expand Down Expand Up @@ -204,7 +205,7 @@ pub struct NewFullBase {
pub task_manager: TaskManager,
pub client: Arc<FullClient>,
pub network: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
pub transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient>>,
pub transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient, SpawnTaskHandle>>,
}

/// Creates a full service from the configuration.
Expand Down
1 change: 0 additions & 1 deletion client/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { package = "parity-scale-codec", version = "2.0.0" }
thiserror = "1.0.21"
futures = { version = "0.3.1", features = ["compat"] }
futures-diagnose = "1.0"
intervalier = "0.4.0"
log = "0.4.8"
parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] }
Expand Down
34 changes: 17 additions & 17 deletions client/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use std::{marker::PhantomData, pin::Pin, sync::Arc};
use codec::{Decode, Encode};
use futures::{
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
channel::oneshot, future::{Future, FutureExt, ready, Ready},
};

use sc_client_api::{
Expand All @@ -38,18 +38,19 @@ use prometheus_endpoint::Registry as PrometheusRegistry;
use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}};

/// The transaction pool logic for full client.
pub struct FullChainApi<Client, Block> {
pub struct FullChainApi<Client, Block, Spawner> {
client: Arc<Client>,
pool: ThreadPool,
spawner: Spawner,
_marker: PhantomData<Block>,
metrics: Option<Arc<ApiMetrics>>,
}

impl<Client, Block> FullChainApi<Client, Block> {
impl<Client, Block, Spawner> FullChainApi<Client, Block, Spawner> {
/// Create new transaction pool logic.
pub fn new(
client: Arc<Client>,
prometheus: Option<&PrometheusRegistry>,
spawner: Spawner,
) -> Self {
let metrics = prometheus.map(ApiMetrics::register).and_then(|r| {
match r {
Expand All @@ -67,23 +68,20 @@ impl<Client, Block> FullChainApi<Client, Block> {

FullChainApi {
client,
pool: ThreadPoolBuilder::new()
.pool_size(2)
.name_prefix("txpool-verifier")
.create()
.expect("Failed to spawn verifier threads, that are critical for node operation."),
Comment on lines -70 to -74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we spawned 2 threads and now you spawn unlimited of them...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still going to be limited by the number of blocking threads, no?
If it's not limited then we might be generating quite huge load on the entire machine due to revalidation (which is not bounded currently) and that might interfere with other tasks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit is quite high, I don't remember it, but it is way more than 2

_marker: Default::default(),
metrics,
spawner,
}
}
}

impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block>
impl<Client, Block, Spawner> sc_transaction_graph::ChainApi for FullChainApi<Client, Block, Spawner>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
Spawner: sp_core::traits::SpawnNamed,
{
type Block = Block;
type Error = error::Error;
Expand All @@ -109,10 +107,10 @@ where
let metrics = self.metrics.clone();
metrics.report(|m| m.validations_scheduled.inc());

self.pool.spawn_ok(futures_diagnose::diagnose(
self.spawner.spawn_blocking(
"validate-transaction",
async move {
let res = validate_transaction_blocking(&*client, &at, source, uxt);
Box::pin(async move {
let res = validate_transaction_blocking::<_, _, Spawner>(&*client, &at, source, uxt);
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
Expand Down Expand Up @@ -154,17 +152,18 @@ where

/// Helper function to validate a transaction using a full chain API.
/// This method will call into the runtime to perform the validation.
fn validate_transaction_blocking<Client, Block>(
fn validate_transaction_blocking<Client, Block, Spawner>(
client: &Client,
at: &BlockId<Block>,
source: TransactionSource,
uxt: sc_transaction_graph::ExtrinsicFor<FullChainApi<Client, Block>>,
uxt: sc_transaction_graph::ExtrinsicFor<FullChainApi<Client, Block, Spawner>>,
) -> error::Result<TransactionValidity>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
Spawner: sp_core::traits::SpawnNamed,
{
sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
{
Expand All @@ -190,12 +189,13 @@ where
})
}

impl<Client, Block> FullChainApi<Client, Block>
impl<Client, Block, Spawner> FullChainApi<Client, Block, Spawner>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
Spawner: sp_core::traits::SpawnNamed,
{
/// Validates a transaction by calling into the runtime, same as
/// `validate_transaction` but blocks the current thread when performing
Expand All @@ -207,7 +207,7 @@ where
source: TransactionSource,
uxt: sc_transaction_graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
validate_transaction_blocking(&*self.client, at, source, uxt)
validate_transaction_blocking::<_, _, Spawner>(&*self.client, at, source, uxt)
}
}

Expand Down
18 changes: 10 additions & 8 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<
type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output=ReadyIteratorFor<PoolApi>> + Send>>;

/// A transaction pool for a full node.
pub type FullPool<Block, Client> = BasicPool<FullChainApi<Client, Block>, Block>;
pub type FullPool<Block, Client, Spawner> = BasicPool<FullChainApi<Client, Block, Spawner>, Block>;
/// A transaction pool for a light node.
pub type LightPool<Block, Client, Fetcher> = BasicPool<LightChainApi<Client, Fetcher, Block>, Block>;

Expand Down Expand Up @@ -352,24 +352,25 @@ where
}
}

impl<Block, Client> FullPool<Block, Client>
impl<Block, Client, Spawner> FullPool<Block, Client, Spawner>
where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: sc_client_api::ExecutorProvider<Block> + Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
Spawner: SpawnNamed + Clone + 'static,
{
/// Create new basic transaction pool for a full node with the provided api.
pub fn new_full(
options: sc_transaction_graph::Options,
is_validator: txpool::IsValidator,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnNamed,
spawner: Spawner,
client: Arc<Client>,
) -> Arc<Self> {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, spawner.clone()));
let pool = Arc::new(Self::with_revalidation_type(
options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner
));
Expand All @@ -381,19 +382,20 @@ where
}
}

impl<Block, Client> sp_transaction_pool::LocalTransactionPool
for BasicPool<FullChainApi<Client, Block>, Block>
impl<Block, Client, Spawner> sp_transaction_pool::LocalTransactionPool
for BasicPool<FullChainApi<Client, Block, Spawner>, Block>
where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
Spawner: SpawnNamed,
{
type Block = Block;
type Hash = sc_transaction_graph::ExtrinsicHash<FullChainApi<Client, Block>>;
type Error = <FullChainApi<Client, Block> as ChainApi>::Error;
type Hash = sc_transaction_graph::ExtrinsicHash<FullChainApi<Client, Block, Spawner>>;
type Error = <FullChainApi<Client, Block, Spawner> as ChainApi>::Error;

fn submit_local(
&self,
Expand Down