Skip to content

Commit

Permalink
*: add stack size and max task number limit to unified read pool (#6597)
Browse files Browse the repository at this point in the history
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf committed Feb 17, 2020
1 parent b0241e8 commit f5b7dc8
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 102 deletions.
24 changes: 12 additions & 12 deletions cmd/src/server.rs
Expand Up @@ -418,21 +418,21 @@ impl TiKVServer {
None
};

let storage_read_pool = if self.config.readpool.unify_read_pool {
ReadPool::from(unified_read_pool.as_ref().unwrap().remote().clone())
let storage_read_pool_handle = if self.config.readpool.unify_read_pool {
unified_read_pool.as_ref().unwrap().handle()
} else {
let storage_read_pools = storage::build_read_pool(
let storage_read_pools = ReadPool::from(storage::build_read_pool(
&self.config.readpool.storage,
pd_sender.clone(),
engines.engine.clone(),
);
ReadPool::from(storage_read_pools)
));
storage_read_pools.handle()
};

let storage = create_raft_storage(
engines.engine.clone(),
&self.config.storage,
storage_read_pool,
storage_read_pool_handle,
lock_mgr.clone(),
)
.unwrap_or_else(|e| fatal!("failed to create raft storage: {}", e));
Expand All @@ -454,15 +454,15 @@ impl TiKVServer {
.build(snap_path, Some(self.router.clone()));

// Create coprocessor endpoint.
let cop_read_pool = if self.config.readpool.unify_read_pool {
ReadPool::from(unified_read_pool.as_ref().unwrap().remote().clone())
let cop_read_pool_handle = if self.config.readpool.unify_read_pool {
unified_read_pool.as_ref().unwrap().handle()
} else {
let cop_read_pools = coprocessor::readpool_impl::build_read_pool(
let cop_read_pools = ReadPool::from(coprocessor::readpool_impl::build_read_pool(
&self.config.readpool.coprocessor,
pd_sender.clone(),
engines.engine.clone(),
);
ReadPool::from(cop_read_pools)
));
cop_read_pools.handle()
};

let server_config = Arc::new(self.config.server.clone());
Expand All @@ -472,7 +472,7 @@ impl TiKVServer {
&server_config,
&self.security_mgr,
storage,
coprocessor::Endpoint::new(&server_config, cop_read_pool),
coprocessor::Endpoint::new(&server_config, cop_read_pool_handle),
engines.raft_router.clone(),
self.resolver.clone(),
snap_mgr.clone(),
Expand Down
7 changes: 4 additions & 3 deletions components/test_coprocessor/src/fixture.rs
Expand Up @@ -7,6 +7,7 @@ use kvproto::kvrpcpb::Context;
use tidb_query::codec::Datum;
use tikv::config::CoprReadPoolConfig;
use tikv::coprocessor::{readpool_impl, Endpoint};
use tikv::read_pool::ReadPool;
use tikv::server::Config;
use tikv::storage::kv::RocksEngine;
use tikv::storage::{Engine, TestEngineBuilder};
Expand Down Expand Up @@ -79,11 +80,11 @@ pub fn init_data_with_details<E: Engine>(
store.commit_with_ctx(ctx);
}

let pool = readpool_impl::build_read_pool_for_test(
let pool = ReadPool::from(readpool_impl::build_read_pool_for_test(
&CoprReadPoolConfig::default_for_test(),
store.get_engine(),
);
let cop = Endpoint::new(cfg, pool.into());
));
let cop = Endpoint::new(cfg, pool.handle());
(store, cop)
}

Expand Down
13 changes: 7 additions & 6 deletions components/test_raftstore/src/server.rs
Expand Up @@ -28,6 +28,7 @@ use raftstore::Result;
use tikv::config::{ConfigController, ConfigHandler, Module, TiKvConfig};
use tikv::coprocessor;
use tikv::import::{ImportSSTService, SSTImporter};
use tikv::read_pool::ReadPool;
use tikv::server::gc_worker::GcWorker;
use tikv::server::load_statistics::ThreadLoad;
use tikv::server::lock_manager::LockManager;
Expand Down Expand Up @@ -145,10 +146,10 @@ impl Simulator for ServerCluster {

// Create storage.
let pd_worker = FutureWorker::new("test-pd-worker");
let storage_read_pool = storage::build_read_pool_for_test(
let storage_read_pool = ReadPool::from(storage::build_read_pool_for_test(
&tikv::config::StorageReadPoolConfig::default_for_test(),
raft_engine.clone(),
);
));

let engine = RaftKv::new(sim_router.clone());

Expand All @@ -165,7 +166,7 @@ impl Simulator for ServerCluster {
let store = create_raft_storage(
engine,
&cfg.storage,
storage_read_pool.into(),
storage_read_pool.handle(),
Some(lock_mgr.clone()),
)?;
self.storages.insert(node_id, raft_engine);
Expand Down Expand Up @@ -203,11 +204,11 @@ impl Simulator for ServerCluster {
let snap_mgr = SnapManager::new(tmp_str, Some(router.clone()));
let server_cfg = Arc::new(cfg.server.clone());
let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
let cop_read_pool = coprocessor::readpool_impl::build_read_pool_for_test(
let cop_read_pool = ReadPool::from(coprocessor::readpool_impl::build_read_pool_for_test(
&tikv::config::CoprReadPoolConfig::default_for_test(),
store.get_engine(),
);
let cop = coprocessor::Endpoint::new(&server_cfg, cop_read_pool.into());
));
let cop = coprocessor::Endpoint::new(&server_cfg, cop_read_pool.handle());
let mut server = None;
for _ in 0..100 {
let mut svr = Server::new(
Expand Down
5 changes: 3 additions & 2 deletions components/test_raftstore/src/util.rs
Expand Up @@ -157,8 +157,9 @@ pub fn new_readpool_cfg() -> ReadPoolConfig {
ReadPoolConfig {
unify_read_pool: false,
unified: UnifiedReadPoolConfig {
min_thread_count: 0,
max_thread_count: 0,
min_thread_count: 1,
max_thread_count: 1,
..UnifiedReadPoolConfig::default()
},
storage: StorageReadPoolConfig {
high_concurrency: 1,
Expand Down
7 changes: 7 additions & 0 deletions etc/config-template.toml
Expand Up @@ -38,10 +38,17 @@
[readpool.unified]
## The minimal working thread count of the thread pool.
# min-thread-count = 1

## The maximum working thread count of the thread pool.
## The default value is max(4, LOGICAL_CPU_NUM * 0.8).
# max-thread-count = 8

## Size of the stack for each thread in the thread pool.
# stack-size = "10MB"

## The maximum number of running tasks for each worker; tasks beyond this limit are rejected.
# max-tasks-per-worker = 2000

[readpool.storage]
## Size of the thread pool for high-priority operations.
# high-concurrency = 4
Expand Down
45 changes: 40 additions & 5 deletions src/config.rs
Expand Up @@ -1326,6 +1326,8 @@ pub mod log_level_serde {
pub struct UnifiedReadPoolConfig {
pub min_thread_count: usize,
pub max_thread_count: usize,
pub stack_size: ReadableSize,
pub max_tasks_per_worker: usize,
// FIXME: Add more configs when they are effective in yatp
}

Expand All @@ -1343,6 +1345,16 @@ impl UnifiedReadPoolConfig {
.into(),
);
}
if self.stack_size.0 < ReadableSize::mb(2).0 {
return Err("readpool.unified.stack-size should be >= 2mb"
.to_string()
.into());
}
if self.max_tasks_per_worker <= 1 {
return Err("readpool.unified.max-tasks-per-worker should be > 1"
.to_string()
.into());
}
Ok(())
}
}
Expand All @@ -1358,6 +1370,8 @@ impl Default for UnifiedReadPoolConfig {
Self {
min_thread_count: 1,
max_thread_count: concurrency,
stack_size: ReadableSize::mb(DEFAULT_READPOOL_STACK_SIZE_MB),
max_tasks_per_worker: DEFAULT_READPOOL_MAX_TASKS_PER_WORKER,
}
}
}
Expand All @@ -1371,6 +1385,8 @@ mod unified_read_pool_tests {
let cfg = UnifiedReadPoolConfig {
min_thread_count: 1,
max_thread_count: 2,
stack_size: ReadableSize::mb(2),
max_tasks_per_worker: 2000,
};
assert!(cfg.validate().is_ok());

Expand All @@ -1383,6 +1399,19 @@ mod unified_read_pool_tests {
let invalid_cfg = UnifiedReadPoolConfig {
min_thread_count: 2,
max_thread_count: 1,
..cfg
};
assert!(invalid_cfg.validate().is_err());

let invalid_cfg = UnifiedReadPoolConfig {
stack_size: ReadableSize::mb(1),
..cfg
};
assert!(invalid_cfg.validate().is_err());

let invalid_cfg = UnifiedReadPoolConfig {
max_tasks_per_worker: 1,
..cfg
};
assert!(invalid_cfg.validate().is_err());
}
Expand Down Expand Up @@ -1459,10 +1488,12 @@ macro_rules! readpool_config {
)
.into());
}
if self.stack_size.0 < ReadableSize::mb(2).0 {
return Err(
format!("readpool.{}.stack-size should be >= 2mb", $display_name).into(),
);
if self.stack_size.0 < ReadableSize::mb(MIN_READPOOL_STACK_SIZE_MB).0 {
return Err(format!(
"readpool.{}.stack-size should be >= {}mb",
$display_name, MIN_READPOOL_STACK_SIZE_MB
)
.into());
}
if self.max_tasks_per_worker_high <= 1 {
return Err(format!(
Expand Down Expand Up @@ -1550,8 +1581,9 @@ const DEFAULT_STORAGE_READPOOL_MAX_CONCURRENCY: usize = 8;
// 0.001 * x secs to actually start. A server-is-busy error will trigger 2 seconds of
// backoff. So when it needs to wait for more than 2 seconds, returning an error won't cause
// larger latency.
const DEFAULT_READPOOL_MAX_TASKS_PER_WORKER: usize = 2 as usize * 1000;
const DEFAULT_READPOOL_MAX_TASKS_PER_WORKER: usize = 2 * 1000;

const MIN_READPOOL_STACK_SIZE_MB: u64 = 2;
const DEFAULT_READPOOL_STACK_SIZE_MB: u64 = 10;

readpool_config!(StorageReadPoolConfig, storage_read_pool_test, "storage");
Expand Down Expand Up @@ -1642,6 +1674,8 @@ mod readpool_tests {
let unified = UnifiedReadPoolConfig {
min_thread_count: 0,
max_thread_count: 0,
stack_size: ReadableSize::mb(0),
max_tasks_per_worker: 0,
};
assert!(unified.validate().is_err());
let storage = StorageReadPoolConfig::default();
Expand Down Expand Up @@ -1701,6 +1735,7 @@ mod readpool_tests {
let unified = UnifiedReadPoolConfig {
min_thread_count: 0,
max_thread_count: 0,
..Default::default()
};
assert!(unified.validate().is_err());
let storage = StorageReadPoolConfig::default();
Expand Down

0 comments on commit f5b7dc8

Please sign in to comment.