From f5b7dc8c9321b13274fced3aaa2b6801a715f2ae Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 17 Feb 2020 15:58:26 +0800 Subject: [PATCH] *: add stack size and max task number limit to unified read pool (#6597) Signed-off-by: Yilin Chen --- cmd/src/server.rs | 24 +-- components/test_coprocessor/src/fixture.rs | 7 +- components/test_raftstore/src/server.rs | 13 +- components/test_raftstore/src/util.rs | 5 +- etc/config-template.toml | 7 + src/config.rs | 45 ++++- src/coprocessor/endpoint.rs | 122 ++++++++----- src/read_pool.rs | 193 +++++++++++++++++++-- src/server/node.rs | 4 +- src/server/server.rs | 12 +- src/storage/mod.rs | 15 +- tests/integrations/config/mod.rs | 2 + tests/integrations/config/test-custom.toml | 2 + 13 files changed, 349 insertions(+), 102 deletions(-) diff --git a/cmd/src/server.rs b/cmd/src/server.rs index 074933914f4..502dfa7d065 100644 --- a/cmd/src/server.rs +++ b/cmd/src/server.rs @@ -418,21 +418,21 @@ impl TiKVServer { None }; - let storage_read_pool = if self.config.readpool.unify_read_pool { - ReadPool::from(unified_read_pool.as_ref().unwrap().remote().clone()) + let storage_read_pool_handle = if self.config.readpool.unify_read_pool { + unified_read_pool.as_ref().unwrap().handle() } else { - let storage_read_pools = storage::build_read_pool( + let storage_read_pools = ReadPool::from(storage::build_read_pool( &self.config.readpool.storage, pd_sender.clone(), engines.engine.clone(), - ); - ReadPool::from(storage_read_pools) + )); + storage_read_pools.handle() }; let storage = create_raft_storage( engines.engine.clone(), &self.config.storage, - storage_read_pool, + storage_read_pool_handle, lock_mgr.clone(), ) .unwrap_or_else(|e| fatal!("failed to create raft storage: {}", e)); @@ -454,15 +454,15 @@ impl TiKVServer { .build(snap_path, Some(self.router.clone())); // Create coprocessor endpoint. - let cop_read_pool = if self.config.readpool.unify_read_pool { - ReadPool::from(unified_read_pool.as_ref().unwrap().remote().clone()) + let cop_read_pool_handle = if self.config.readpool.unify_read_pool { + unified_read_pool.as_ref().unwrap().handle() } else { - let cop_read_pools = coprocessor::readpool_impl::build_read_pool( + let cop_read_pools = ReadPool::from(coprocessor::readpool_impl::build_read_pool( &self.config.readpool.coprocessor, pd_sender.clone(), engines.engine.clone(), - ); - ReadPool::from(cop_read_pools) + )); + cop_read_pools.handle() }; let server_config = Arc::new(self.config.server.clone()); @@ -472,7 +472,7 @@ impl TiKVServer { &server_config, &self.security_mgr, storage, - coprocessor::Endpoint::new(&server_config, cop_read_pool), + coprocessor::Endpoint::new(&server_config, cop_read_pool_handle), engines.raft_router.clone(), self.resolver.clone(), snap_mgr.clone(), diff --git a/components/test_coprocessor/src/fixture.rs b/components/test_coprocessor/src/fixture.rs index bb21d67f40b..97a19b85d3b 100644 --- a/components/test_coprocessor/src/fixture.rs +++ b/components/test_coprocessor/src/fixture.rs @@ -7,6 +7,7 @@ use kvproto::kvrpcpb::Context; use tidb_query::codec::Datum; use tikv::config::CoprReadPoolConfig; use tikv::coprocessor::{readpool_impl, Endpoint}; +use tikv::read_pool::ReadPool; use tikv::server::Config; use tikv::storage::kv::RocksEngine; use tikv::storage::{Engine, TestEngineBuilder}; @@ -79,11 +80,11 @@ pub fn init_data_with_details( store.commit_with_ctx(ctx); } - let pool = readpool_impl::build_read_pool_for_test( + let pool = ReadPool::from(readpool_impl::build_read_pool_for_test( &CoprReadPoolConfig::default_for_test(), store.get_engine(), - ); - let cop = Endpoint::new(cfg, pool.into()); + )); + let cop = Endpoint::new(cfg, pool.handle()); (store, cop) } diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index 4ad5964ac19..8c5525b886c 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -28,6 +28,7 @@ use raftstore::Result; use tikv::config::{ConfigController, ConfigHandler, Module, TiKvConfig}; use tikv::coprocessor; use tikv::import::{ImportSSTService, SSTImporter}; +use tikv::read_pool::ReadPool; use tikv::server::gc_worker::GcWorker; use tikv::server::load_statistics::ThreadLoad; use tikv::server::lock_manager::LockManager; @@ -145,10 +146,10 @@ impl Simulator for ServerCluster { // Create storage. let pd_worker = FutureWorker::new("test-pd-worker"); - let storage_read_pool = storage::build_read_pool_for_test( + let storage_read_pool = ReadPool::from(storage::build_read_pool_for_test( &tikv::config::StorageReadPoolConfig::default_for_test(), raft_engine.clone(), - ); + )); let engine = RaftKv::new(sim_router.clone()); @@ -165,7 +166,7 @@ impl Simulator for ServerCluster { let store = create_raft_storage( engine, &cfg.storage, - storage_read_pool.into(), + storage_read_pool.handle(), Some(lock_mgr.clone()), )?; self.storages.insert(node_id, raft_engine); @@ -203,11 +204,11 @@ impl Simulator for ServerCluster { let snap_mgr = SnapManager::new(tmp_str, Some(router.clone())); let server_cfg = Arc::new(cfg.server.clone()); let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap()); - let cop_read_pool = coprocessor::readpool_impl::build_read_pool_for_test( + let cop_read_pool = ReadPool::from(coprocessor::readpool_impl::build_read_pool_for_test( &tikv::config::CoprReadPoolConfig::default_for_test(), store.get_engine(), - ); - let cop = coprocessor::Endpoint::new(&server_cfg, cop_read_pool.into()); + )); + let cop = coprocessor::Endpoint::new(&server_cfg, cop_read_pool.handle()); let mut server = None; for _ in 0..100 { let mut svr = Server::new( diff --git a/components/test_raftstore/src/util.rs b/components/test_raftstore/src/util.rs index c4196a57d5e..627c3927c82 100644 --- a/components/test_raftstore/src/util.rs +++ b/components/test_raftstore/src/util.rs @@ -157,8 +157,9 @@ pub fn new_readpool_cfg() -> ReadPoolConfig { ReadPoolConfig { unify_read_pool: false, unified: UnifiedReadPoolConfig { - min_thread_count: 0, - max_thread_count: 0, + min_thread_count: 1, + max_thread_count: 1, + ..UnifiedReadPoolConfig::default() }, storage: StorageReadPoolConfig { high_concurrency: 1, diff --git a/etc/config-template.toml b/etc/config-template.toml index 5b1d89d49a8..16b667991e4 100644 --- a/etc/config-template.toml +++ b/etc/config-template.toml @@ -38,10 +38,17 @@ [readpool.unified] ## The minimal working thread count of the thread pool. # min-thread-count = 1 + ## The maximum working thread count of the thread pool. ## The default value is the max(4, LOGICAL_CPU_NUM * 0.8). # max-thread-count = 8 +## Size of the stack for each thread in the thread pool. +# stack-size = "10MB" + +## Max running tasks of each worker, reject if exceeded. +# max-tasks-per-worker = 2000 + [readpool.storage] ## Size of the thread pool for high-priority operations. # high-concurrency = 4 diff --git a/src/config.rs b/src/config.rs index f842de493f7..37f389476b6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1326,6 +1326,8 @@ pub mod log_level_serde { pub struct UnifiedReadPoolConfig { pub min_thread_count: usize, pub max_thread_count: usize, + pub stack_size: ReadableSize, + pub max_tasks_per_worker: usize, // FIXME: Add more configs when they are effective in yatp } @@ -1343,6 +1345,16 @@ impl UnifiedReadPoolConfig { .into(), ); } + if self.stack_size.0 < ReadableSize::mb(2).0 { + return Err("readpool.unified.stack-size should be >= 2mb" + .to_string() + .into()); + } + if self.max_tasks_per_worker <= 1 { + return Err("readpool.unified.max-tasks-per-worker should be > 1" + .to_string() + .into()); + } Ok(()) } } @@ -1358,6 +1370,8 @@ impl Default for UnifiedReadPoolConfig { Self { min_thread_count: 1, max_thread_count: concurrency, + stack_size: ReadableSize::mb(DEFAULT_READPOOL_STACK_SIZE_MB), + max_tasks_per_worker: DEFAULT_READPOOL_MAX_TASKS_PER_WORKER, } } } @@ -1371,6 +1385,8 @@ mod unified_read_pool_tests { let cfg = UnifiedReadPoolConfig { min_thread_count: 1, max_thread_count: 2, + stack_size: ReadableSize::mb(2), + max_tasks_per_worker: 2000, }; assert!(cfg.validate().is_ok()); @@ -1383,6 +1399,19 @@ mod unified_read_pool_tests { let invalid_cfg = UnifiedReadPoolConfig { min_thread_count: 2, max_thread_count: 1, + ..cfg + }; + assert!(invalid_cfg.validate().is_err()); + + let invalid_cfg = UnifiedReadPoolConfig { + stack_size: ReadableSize::mb(1), + ..cfg + }; + assert!(invalid_cfg.validate().is_err()); + + let invalid_cfg = UnifiedReadPoolConfig { + max_tasks_per_worker: 1, + ..cfg }; assert!(invalid_cfg.validate().is_err()); } @@ -1459,10 +1488,12 @@ macro_rules! readpool_config { ) .into()); } - if self.stack_size.0 < ReadableSize::mb(2).0 { - return Err( - format!("readpool.{}.stack-size should be >= 2mb", $display_name).into(), - ); + if self.stack_size.0 < ReadableSize::mb(MIN_READPOOL_STACK_SIZE_MB).0 { + return Err(format!( + "readpool.{}.stack-size should be >= {}mb", + $display_name, MIN_READPOOL_STACK_SIZE_MB + ) + .into()); } if self.max_tasks_per_worker_high <= 1 { return Err(format!( @@ -1550,8 +1581,9 @@ const DEFAULT_STORAGE_READPOOL_MAX_CONCURRENCY: usize = 8; // 0.001 * x secs to be actual started. A server-is-busy error will trigger 2 seconds // backoff. So when it needs to wait for more than 2 seconds, return error won't causse // larger latency. -const DEFAULT_READPOOL_MAX_TASKS_PER_WORKER: usize = 2 as usize * 1000; +const DEFAULT_READPOOL_MAX_TASKS_PER_WORKER: usize = 2 * 1000; +const MIN_READPOOL_STACK_SIZE_MB: u64 = 2; const DEFAULT_READPOOL_STACK_SIZE_MB: u64 = 10; readpool_config!(StorageReadPoolConfig, storage_read_pool_test, "storage"); @@ -1642,6 +1674,8 @@ mod readpool_tests { let unified = UnifiedReadPoolConfig { min_thread_count: 0, max_thread_count: 0, + stack_size: ReadableSize::mb(0), + max_tasks_per_worker: 0, }; assert!(unified.validate().is_err()); let storage = StorageReadPoolConfig::default(); @@ -1701,6 +1735,7 @@ mod readpool_tests { let unified = UnifiedReadPoolConfig { min_thread_count: 0, max_thread_count: 0, + ..Default::default() }; assert!(unified.validate().is_err()); let storage = StorageReadPoolConfig::default(); diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs index 718737ee189..7e64090d076 100644 --- a/src/coprocessor/endpoint.rs +++ b/src/coprocessor/endpoint.rs @@ -19,7 +19,7 @@ use tipb::{AnalyzeReq, AnalyzeType}; use tipb::{ChecksumRequest, ChecksumScanOn}; use tipb::{DagRequest, ExecType}; -use crate::read_pool::ReadPool; +use crate::read_pool::ReadPoolHandle; use crate::server::Config; use crate::storage::kv::with_tls_engine; use crate::storage::kv::{Error as KvError, ErrorInner as KvErrorInner}; @@ -33,7 +33,7 @@ use crate::coprocessor::*; /// A pool to build and run Coprocessor request handlers. pub struct Endpoint { /// The thread pool to run Coprocessor requests. - read_pool: ReadPool, + read_pool: ReadPoolHandle, /// The concurrency limiter of the coprocessor. semaphore: Option>, @@ -67,12 +67,14 @@ impl Clone for Endpoint { impl tikv_util::AssertSend for Endpoint {} impl Endpoint { - pub fn new(cfg: &Config, read_pool: ReadPool) -> Self { + pub fn new(cfg: &Config, read_pool: ReadPoolHandle) -> Self { // FIXME: When yatp is used, we need to limit coprocessor requests in progress to avoid // using too much memory. However, if there are a number of large requests, small requests // will still be blocked. This needs to be improved. let semaphore = match &read_pool { - ReadPool::Yatp(_) => Some(Arc::new(Semaphore::new(cfg.end_point_max_concurrency))), + ReadPoolHandle::Yatp { .. } => { + Some(Arc::new(Semaphore::new(cfg.end_point_max_concurrency))) + } _ => None, }; Self { @@ -578,6 +580,7 @@ mod tests { use crate::config::CoprReadPoolConfig; use crate::coprocessor::readpool_impl::build_read_pool_for_test; + use crate::read_pool::ReadPool; use crate::storage::kv::RocksEngine; use crate::storage::TestEngineBuilder; use protobuf::Message; @@ -704,8 +707,11 @@ mod tests { #[test] fn test_outdated_request() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); // a normal request let handler_builder = @@ -738,8 +744,11 @@ mod tests { #[test] fn test_stack_guard() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let mut cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let mut cop = Endpoint::::new(&Config::default(), read_pool.handle()); cop.recursion_limit = 100; let req = { @@ -771,8 +780,11 @@ mod tests { #[test] fn test_invalid_req_type() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); let mut req = coppb::Request::default(); req.set_tp(9999); @@ -787,8 +799,11 @@ mod tests { #[test] fn test_invalid_req_body() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); let mut req = coppb::Request::default(); req.set_tp(REQ_TYPE_DAG); @@ -805,29 +820,31 @@ mod tests { fn test_full() { use crate::storage::kv::{destroy_tls_engine, set_tls_engine}; use std::sync::Mutex; - use tikv_util::future_pool::{Builder, FuturePool}; + use tikv_util::future_pool::Builder; let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool: Vec = CoprReadPoolConfig { - normal_concurrency: 1, - max_tasks_per_worker_normal: 2, - ..CoprReadPoolConfig::default_for_test() - } - .to_future_pool_configs() - .into_iter() - .map(|config| { - let engine = Arc::new(Mutex::new(engine.clone())); - Builder::from_config(config) - .name_prefix("coprocessor_endpoint_test_full") - .after_start(move || set_tls_engine(engine.lock().unwrap().clone())) - // Safety: we call `set_` and `destroy_` with the same engine type. - .before_stop(|| unsafe { destroy_tls_engine::() }) - .build() - }) - .collect(); - - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from( + CoprReadPoolConfig { + normal_concurrency: 1, + max_tasks_per_worker_normal: 2, + ..CoprReadPoolConfig::default_for_test() + } + .to_future_pool_configs() + .into_iter() + .map(|config| { + let engine = Arc::new(Mutex::new(engine.clone())); + Builder::from_config(config) + .name_prefix("coprocessor_endpoint_test_full") + .after_start(move || set_tls_engine(engine.lock().unwrap().clone())) + // Safety: we call `set_` and `destroy_` with the same engine type. + .before_stop(|| unsafe { destroy_tls_engine::() }) + .build() + }) + .collect::>(), + ); + + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); let (tx, rx) = mpsc::channel(); @@ -864,8 +881,11 @@ mod tests { #[test] fn test_error_unary_response() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); let handler_builder = Box::new(|_, _: &_| Ok(UnaryFixture::new(Err(box_err!("foo"))).into_boxed())); @@ -880,8 +900,11 @@ mod tests { #[test] fn test_error_streaming_response() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); // Fail immediately let handler_builder = @@ -923,8 +946,11 @@ mod tests { #[test] fn test_empty_streaming_response() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed())); let resp_vec = block_on_stream( @@ -941,8 +967,11 @@ mod tests { #[test] fn test_special_streaming_handlers() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); - let cop = Endpoint::::new(&Config::default(), read_pool.into()); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); + let cop = Endpoint::::new(&Config::default(), read_pool.handle()); // handler returns `finished == true` should not be called again. let counter = Arc::new(atomic::AtomicIsize::new(0)); @@ -1027,13 +1056,16 @@ mod tests { #[test] fn test_channel_size() { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test(&CoprReadPoolConfig::default_for_test(), engine); + let read_pool = ReadPool::from(build_read_pool_for_test( + &CoprReadPoolConfig::default_for_test(), + engine, + )); let cop = Endpoint::::new( &Config { end_point_stream_channel_size: 3, ..Config::default() }, - read_pool.into(), + read_pool.handle(), ); let counter = Arc::new(atomic::AtomicIsize::new(0)); @@ -1078,7 +1110,7 @@ mod tests { let engine = TestEngineBuilder::new().build().unwrap(); - let read_pool = build_read_pool_for_test( + let read_pool = ReadPool::from(build_read_pool_for_test( &CoprReadPoolConfig { low_concurrency: 1, normal_concurrency: 1, @@ -1086,13 +1118,13 @@ mod tests { ..CoprReadPoolConfig::default_for_test() }, engine, - ); + )); let mut config = Config::default(); config.end_point_request_max_handle_duration = ReadableDuration::millis((PAYLOAD_SMALL + PAYLOAD_LARGE) as u64 * 2); - let cop = Endpoint::::new(&config, read_pool.into()); + let cop = Endpoint::::new(&config, read_pool.handle()); let (tx, rx) = std::sync::mpsc::channel(); diff --git a/src/read_pool.rs b/src/read_pool.rs index 121ccf9b61f..47ae6acf6bd 100644 --- a/src/read_pool.rs +++ b/src/read_pool.rs @@ -1,3 +1,5 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + use futures::sync::oneshot; use futures::{future, Future}; use futures03::prelude::*; @@ -12,26 +14,70 @@ use yatp::queue::{multilevel, Extras, QueueType}; use yatp::task::future::{Runner as FutureRunner, TaskCell}; use yatp::Remote; +use self::metrics::*; use crate::config::UnifiedReadPoolConfig; use crate::storage::kv::{destroy_tls_engine, set_tls_engine, Engine, FlowStatsReporter}; +use prometheus::IntGauge; -#[derive(Clone)] pub enum ReadPool { FuturePools { read_pool_high: FuturePool, read_pool_normal: FuturePool, read_pool_low: FuturePool, }, - Yatp(Remote), + Yatp { + pool: yatp::ThreadPool, + running_tasks: IntGauge, + max_tasks: usize, + }, } impl ReadPool { + pub fn handle(&self) -> ReadPoolHandle { + match self { + ReadPool::FuturePools { + read_pool_high, + read_pool_normal, + read_pool_low, + } => ReadPoolHandle::FuturePools { + read_pool_high: read_pool_high.clone(), + read_pool_normal: read_pool_normal.clone(), + read_pool_low: read_pool_low.clone(), + }, + ReadPool::Yatp { + pool, + running_tasks, + max_tasks, + } => ReadPoolHandle::Yatp { + remote: pool.remote().clone(), + running_tasks: running_tasks.clone(), + max_tasks: *max_tasks, + }, + } + } +} + +#[derive(Clone)] +pub enum ReadPoolHandle { + FuturePools { + read_pool_high: FuturePool, + read_pool_normal: FuturePool, + read_pool_low: FuturePool, + }, + Yatp { + remote: Remote, + running_tasks: IntGauge, + max_tasks: usize, + }, +} + +impl ReadPoolHandle { pub fn spawn(&self, f: F, priority: CommandPri, task_id: u64) -> Result<(), ReadPoolError> where F: StdFuture + Send + 'static, { match self { - ReadPool::FuturePools { + ReadPoolHandle::FuturePools { read_pool_high, read_pool_normal, read_pool_low, @@ -44,14 +90,34 @@ impl ReadPool { pool.spawn(move || Box::pin(f.never_error()).compat())?; } - ReadPool::Yatp(remote) => { + ReadPoolHandle::Yatp { + remote, + running_tasks, + max_tasks, + } => { + let running_tasks = running_tasks.clone(); + // Note that the running task number limit is not strict. + // If several tasks are spawned at the same time while the running task number + // is close to the limit, they may all pass this check and the number of running + // tasks may exceed the limit. + if running_tasks.get() as usize >= *max_tasks { + return Err(ReadPoolError::UnifiedReadPoolFull); + } + let fixed_level = match priority { CommandPri::High => Some(0), CommandPri::Normal => None, CommandPri::Low => Some(2), }; let extras = Extras::new_multilevel(task_id, fixed_level); - let task_cell = TaskCell::new(f, extras); + let task_cell = TaskCell::new( + async move { + running_tasks.inc(); + f.await; + running_tasks.dec(); + }, + extras, + ); remote.spawn(task_cell); } } @@ -156,21 +222,48 @@ impl ReadPoolRunner { } } +#[cfg(test)] +fn get_unified_read_pool_name() -> String { + use std::sync::atomic::{AtomicU64, Ordering}; + + static COUNTER: AtomicU64 = AtomicU64::new(0); + format!( + "unified-read-pool-test-{}", + COUNTER.fetch_add(1, Ordering::Relaxed) + ) +} + +#[cfg(not(test))] +fn get_unified_read_pool_name() -> String { + "unified-read-pool".to_string() +} + pub fn build_yatp_read_pool( config: &UnifiedReadPoolConfig, reporter: R, engine: E, -) -> yatp::ThreadPool { - let pool_name = "unified-read-pool"; - let mut builder = yatp::Builder::new(pool_name); +) -> ReadPool { + let unified_read_pool_name = get_unified_read_pool_name(); + + let mut builder = yatp::Builder::new(&unified_read_pool_name); builder + .stack_size(config.stack_size.0 as usize) .min_thread_count(config.min_thread_count) .max_thread_count(config.max_thread_count); let multilevel_builder = - multilevel::Builder::new(multilevel::Config::default().name(Some(pool_name))); + multilevel::Builder::new(multilevel::Config::default().name(Some(&unified_read_pool_name))); let read_pool_runner = ReadPoolRunner::new(engine, Default::default(), reporter); let runner_builder = multilevel_builder.runner_builder(CloneRunnerBuilder(read_pool_runner)); - builder.build_with_queue_and_runner(QueueType::Multilevel(multilevel_builder), runner_builder) + let pool = builder + .build_with_queue_and_runner(QueueType::Multilevel(multilevel_builder), runner_builder); + ReadPool::Yatp { + pool, + running_tasks: UNIFIED_READ_POOL_RUNNING_TASKS + .with_label_values(&[&unified_read_pool_name]), + max_tasks: config + .max_tasks_per_worker + .saturating_mul(config.max_thread_count), + } } impl From> for ReadPool { @@ -187,12 +280,6 @@ impl From> for ReadPool { } } -impl From> for ReadPool { - fn from(yatp_remote: Remote) -> Self { - ReadPool::Yatp(yatp_remote) - } -} - quick_error! { #[derive(Debug)] pub enum ReadPoolError { @@ -201,6 +288,9 @@ quick_error! { cause(err) description(err.description()) } + UnifiedReadPoolFull { + description("Unified read pool is full") + } Canceled(err: oneshot::Canceled) { from() cause(err) @@ -208,3 +298,74 @@ quick_error! { } } } + +mod metrics { + use prometheus::*; + + lazy_static! { + pub static ref UNIFIED_READ_POOL_RUNNING_TASKS: IntGaugeVec = register_int_gauge_vec!( + "tikv_unified_read_pool_running_tasks", + "The number of running tasks in the unified read pool", + &["name"] + ) + .unwrap(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::TestEngineBuilder; + use futures03::channel::oneshot; + use raftstore::store::FlowStatistics; + use std::thread; + use tikv_util::collections::HashMap; + + #[derive(Clone)] + struct DummyReporter; + + impl FlowStatsReporter for DummyReporter { + fn report_read_stats(&self, _read_stats: HashMap) {} + } + + #[test] + fn test_yatp_full() { + let config = UnifiedReadPoolConfig { + min_thread_count: 1, + max_thread_count: 2, + max_tasks_per_worker: 1, + ..Default::default() + }; + // max running tasks number should be 2*1 = 2 + + let engine = TestEngineBuilder::new().build().unwrap(); + let pool = build_yatp_read_pool(&config, DummyReporter, engine); + + let gen_task = || { + let (tx, rx) = oneshot::channel::<()>(); + let task = async move { + let _ = rx.await; + }; + (task, tx) + }; + + let handle = pool.handle(); + let (task1, tx1) = gen_task(); + let (task2, _tx2) = gen_task(); + let (task3, _tx3) = gen_task(); + let (task4, _tx4) = gen_task(); + + assert!(handle.spawn(task1, CommandPri::Normal, 1).is_ok()); + assert!(handle.spawn(task2, CommandPri::Normal, 2).is_ok()); + + thread::sleep(Duration::from_millis(300)); + match handle.spawn(task3, CommandPri::Normal, 3) { + Err(ReadPoolError::UnifiedReadPoolFull) => {} + _ => panic!("should return full error"), + } + tx1.send(()).unwrap(); + + thread::sleep(Duration::from_millis(300)); + assert!(handle.spawn(task4, CommandPri::Normal, 4).is_ok()); + } +} diff --git a/src/server/node.rs b/src/server/node.rs index e5b5ab5d0ed..a748126eef8 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -7,7 +7,7 @@ use std::time::Duration; use super::RaftKv; use super::Result; use crate::import::SSTImporter; -use crate::read_pool::ReadPool; +use crate::read_pool::ReadPoolHandle; use crate::server::lock_manager::LockManager; use crate::server::Config as ServerConfig; use crate::storage::{config::Config as StorageConfig, Storage}; @@ -35,7 +35,7 @@ const CHECK_CLUSTER_BOOTSTRAPPED_RETRY_SECONDS: u64 = 3; pub fn create_raft_storage( engine: RaftKv, cfg: &StorageConfig, - read_pool: ReadPool, + read_pool: ReadPoolHandle, lock_mgr: Option, ) -> Result, LockManager>> where diff --git a/src/server/server.rs b/src/server/server.rs index f80c735a55d..21c86e80aca 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -13,7 +13,6 @@ use grpcio::{ use kvproto::tikvpb::*; use tokio_threadpool::{Builder as ThreadPoolBuilder, ThreadPool}; use tokio_timer::timer::Handle; -use yatp::task::future::TaskCell; use crate::coprocessor::Endpoint; use crate::server::gc_worker::GcWorker; @@ -33,6 +32,7 @@ use super::service::*; use super::snap::{Runner as SnapHandler, Task as SnapTask}; use super::transport::ServerTransport; use super::{Config, Result}; +use crate::read_pool::ReadPool; const LOAD_STATISTICS_SLOTS: usize = 4; const LOAD_STATISTICS_INTERVAL: Duration = Duration::from_millis(100); @@ -61,7 +61,7 @@ pub struct Server // Currently load statistics is done in the thread. stats_pool: Option, grpc_thread_load: Arc, - yatp_read_pool: Option>, + yatp_read_pool: Option, readpool_normal_concurrency: usize, readpool_normal_thread_load: Arc, timer: Handle, @@ -78,7 +78,7 @@ impl Server { resolver: S, snap_mgr: SnapManager, gc_worker: GcWorker, - yatp_read_pool: Option>, + yatp_read_pool: Option, ) -> Result { // A helper thread (or pool) for transport layer. let stats_pool = ThreadPoolBuilder::new() @@ -376,11 +376,11 @@ mod tests { let cfg = Arc::new(cfg); let security_mgr = Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap()); - let cop_read_pool = readpool_impl::build_read_pool_for_test( + let cop_read_pool = ReadPool::from(readpool_impl::build_read_pool_for_test( &CoprReadPoolConfig::default_for_test(), storage.get_engine(), - ); - let cop = coprocessor::Endpoint::new(&cfg, cop_read_pool.into()); + )); + let cop = coprocessor::Endpoint::new(&cfg, cop_read_pool.handle()); let addr = Arc::new(Mutex::new(None)); let mut server = Server::new( diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f3a2e649c2f..14564043f95 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -31,7 +31,7 @@ pub use self::{ types::{StorageCallback, TxnStatus}, }; -use crate::read_pool::ReadPool; +use crate::read_pool::{ReadPool, ReadPoolHandle}; use crate::storage::{ config::Config, kv::{with_tls_engine, Error as EngineError, ErrorInner as EngineErrorInner, Modify}, @@ -81,7 +81,7 @@ pub struct Storage { sched: TxnScheduler, /// The thread pool used to run most read operations. - read_pool: ReadPool, + read_pool: ReadPoolHandle, /// How many strong references. Thread pool and workers will be stopped /// once there are no more references. @@ -149,7 +149,7 @@ macro_rules! check_key_size { impl Storage { /// Get concurrency of normal readpool. pub fn readpool_normal_concurrency(&self) -> usize { - if let ReadPool::FuturePools { + if let ReadPoolHandle::FuturePools { read_pool_normal, .. } = &self.read_pool { @@ -163,7 +163,7 @@ impl Storage { pub fn from_engine( engine: E, config: &Config, - read_pool: ReadPool, + read_pool: ReadPoolHandle, lock_mgr: Option, ) -> Result { let pessimistic_txn_enabled = lock_mgr.is_some(); @@ -1155,7 +1155,12 @@ impl TestStorageBuilder { &crate::config::StorageReadPoolConfig::default_for_test(), self.engine.clone(), ); - Storage::from_engine(self.engine, &self.config, read_pool.into(), None) + Storage::from_engine( + self.engine, + &self.config, + ReadPool::from(read_pool).handle(), + None, + ) } } diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 3c31d1a5a13..a7a91b805b9 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -91,6 +91,8 @@ fn test_serde_custom_tikv_config() { unified: UnifiedReadPoolConfig { min_thread_count: 5, max_thread_count: 10, + stack_size: ReadableSize::mb(20), + max_tasks_per_worker: 2200, }, storage: StorageReadPoolConfig { high_concurrency: 1, diff --git a/tests/integrations/config/test-custom.toml b/tests/integrations/config/test-custom.toml index 4f28e610141..0db5f17908b 100644 --- a/tests/integrations/config/test-custom.toml +++ b/tests/integrations/config/test-custom.toml @@ -9,6 +9,8 @@ unify-read-pool = true [readpool.unified] min-thread-count = 5 max-thread-count = 10 +stack-size = "20MB" +max-tasks-per-worker = 2200 [readpool.storage] high-concurrency = 1