Skip to content

Commit

Permalink
Add max-tasks-per-worker-xxx and remove end-point-max-tasks (tikv…
Browse files Browse the repository at this point in the history
  • Loading branch information
breezewish authored Jun 7, 2018
1 parent 6e1d040 commit 4363c06
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 102 deletions.
21 changes: 9 additions & 12 deletions etc/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
# normal-concurrency = 4
# size of thread pool for low-priority operations
# low-concurrency = 4
# max running high-priority operations, reject if exceed
# max-tasks-high = 8000
# max running normal-priority operations, reject if exceed
# max-tasks-normal = 8000
# max running low-priority operations, reject if exceed
# max-tasks-low = 8000
# max running high-priority operations of each worker, reject if exceed
# max-tasks-per-worker-high = 2000
# max running normal-priority operations of each worker, reject if exceed
# max-tasks-per-worker-normal = 2000
# max running low-priority operations of each worker, reject if exceed
# max-tasks-per-worker-low = 2000
# size of stack size for each thread pool
# stack-size = "10MB"

Expand All @@ -35,9 +35,9 @@
# high-concurrency = 8
# normal-concurrency = 8
# low-concurrency = 8
# max-tasks-high = 16000
# max-tasks-normal = 16000
# max-tasks-low = 16000
# max-tasks-per-worker-high = 2000
# max-tasks-per-worker-normal = 2000
# max-tasks-per-worker-low = 2000
# stack-size = "10MB"

[server]
Expand Down Expand Up @@ -66,9 +66,6 @@
# How many snapshots can be recv concurrently.
# concurrent-recv-snap-limit = 32

# max count of tasks being handled, new tasks will be rejected.
# end-point-max-tasks = 2000

# max recursion level allowed when decoding dag expression
# end-point-recursion-limit = 1000

Expand Down
99 changes: 50 additions & 49 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,9 @@ macro_rules! readpool_config {
pub high_concurrency: usize,
pub normal_concurrency: usize,
pub low_concurrency: usize,
pub max_tasks_high: usize,
pub max_tasks_normal: usize,
pub max_tasks_low: usize,
pub max_tasks_per_worker_high: usize,
pub max_tasks_per_worker_normal: usize,
pub max_tasks_per_worker_low: usize,
pub stack_size: ReadableSize,
}

Expand All @@ -738,9 +738,9 @@ macro_rules! readpool_config {
high_concurrency: self.high_concurrency,
normal_concurrency: self.normal_concurrency,
low_concurrency: self.low_concurrency,
max_tasks_high: self.max_tasks_high,
max_tasks_normal: self.max_tasks_normal,
max_tasks_low: self.max_tasks_low,
max_tasks_per_worker_high: self.max_tasks_per_worker_high,
max_tasks_per_worker_normal: self.max_tasks_per_worker_normal,
max_tasks_per_worker_low: self.max_tasks_per_worker_low,
stack_size: self.stack_size,
}
}
Expand Down Expand Up @@ -768,31 +768,22 @@ macro_rules! readpool_config {
format!("readpool.{}.stack-size should be >= 2mb", $display_name).into(),
);
}
if self.max_tasks_high
< self.high_concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE
{
if self.max_tasks_per_worker_high <= 1 {
return Err(format!(
"readpool.{}.max-tasks-high should be >= {}",
$display_name,
self.high_concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE
"readpool.{}.max-tasks-per-worker-high should be > 1",
$display_name
).into());
}
if self.max_tasks_normal
< self.normal_concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE
{
if self.max_tasks_per_worker_normal <= 1 {
return Err(format!(
"readpool.{}.max-tasks-normal should be >= {}",
$display_name,
self.normal_concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE
"readpool.{}.max-tasks-per-worker-normal should be > 1",
$display_name
).into());
}
if self.max_tasks_low
< self.low_concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE
{
if self.max_tasks_per_worker_low <= 1 {
return Err(format!(
"readpool.{}.max-tasks-low should be >= {}",
$display_name,
self.low_concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE
"readpool.{}.max-tasks-per-worker-low should be > 1",
$display_name
).into());
}

Expand Down Expand Up @@ -826,24 +817,27 @@ macro_rules! readpool_config {
assert!(invalid_cfg.validate().is_err());

let mut invalid_cfg = cfg.clone();
invalid_cfg.high_concurrency = 5;
invalid_cfg.max_tasks_high = 100;
invalid_cfg.max_tasks_per_worker_high = 0;
assert!(invalid_cfg.validate().is_err());
invalid_cfg.max_tasks_high = 10000;
invalid_cfg.max_tasks_per_worker_high = 1;
assert!(invalid_cfg.validate().is_err());
invalid_cfg.max_tasks_per_worker_high = 100;
assert!(cfg.validate().is_ok());

let mut invalid_cfg = cfg.clone();
invalid_cfg.normal_concurrency = 2;
invalid_cfg.max_tasks_normal = 2000;
invalid_cfg.max_tasks_per_worker_normal = 0;
assert!(invalid_cfg.validate().is_err());
invalid_cfg.max_tasks_per_worker_normal = 1;
assert!(invalid_cfg.validate().is_err());
invalid_cfg.max_tasks_normal = 4000;
invalid_cfg.max_tasks_per_worker_normal = 100;
assert!(cfg.validate().is_ok());

let mut invalid_cfg = cfg.clone();
invalid_cfg.low_concurrency = 2;
invalid_cfg.max_tasks_low = 123;
invalid_cfg.max_tasks_per_worker_low = 0;
assert!(invalid_cfg.validate().is_err());
invalid_cfg.max_tasks_low = 5000;
invalid_cfg.max_tasks_per_worker_low = 1;
assert!(invalid_cfg.validate().is_err());
invalid_cfg.max_tasks_per_worker_low = 100;
assert!(cfg.validate().is_ok());
}
}
Expand All @@ -860,12 +854,9 @@ impl Default for StorageReadPoolConfig {
high_concurrency: DEFAULT_STORAGE_READPOOL_CONCURRENCY,
normal_concurrency: DEFAULT_STORAGE_READPOOL_CONCURRENCY,
low_concurrency: DEFAULT_STORAGE_READPOOL_CONCURRENCY,
max_tasks_high: DEFAULT_STORAGE_READPOOL_CONCURRENCY
* readpool::config::DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_normal: DEFAULT_STORAGE_READPOOL_CONCURRENCY
* readpool::config::DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_low: DEFAULT_STORAGE_READPOOL_CONCURRENCY
* readpool::config::DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_per_worker_high: readpool::config::DEFAULT_MAX_TASKS_PER_WORKER,
max_tasks_per_worker_normal: readpool::config::DEFAULT_MAX_TASKS_PER_WORKER,
max_tasks_per_worker_low: readpool::config::DEFAULT_MAX_TASKS_PER_WORKER,
stack_size: ReadableSize::mb(readpool::config::DEFAULT_STACK_SIZE_MB),
}
}
Expand All @@ -891,9 +882,9 @@ impl Default for CoprocessorReadPoolConfig {
high_concurrency: concurrency,
normal_concurrency: concurrency,
low_concurrency: concurrency,
max_tasks_high: concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_normal: concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_low: concurrency * readpool::config::DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_per_worker_high: readpool::config::DEFAULT_MAX_TASKS_PER_WORKER,
max_tasks_per_worker_normal: readpool::config::DEFAULT_MAX_TASKS_PER_WORKER,
max_tasks_per_worker_low: readpool::config::DEFAULT_MAX_TASKS_PER_WORKER,
stack_size: ReadableSize::mb(readpool::config::DEFAULT_STACK_SIZE_MB),
}
}
Expand Down Expand Up @@ -1031,35 +1022,45 @@ impl TiKvConfig {
}
self.raft_store.region_split_size = default_raft_store.region_split_size;
}
if self.server.end_point_concurrency != None {
if self.server.end_point_concurrency.is_some() {
warn!(
"deprecated configuration, {} has been moved to {}",
"server.end_point_concurrency", "readpool.coprocessor.xxx-concurrency",
"server.end-point-concurrency", "readpool.coprocessor.xxx-concurrency",
);
warn!(
"override {} with {}, {:?}",
"readpool.coprocessor.xxx-concurrency",
"server.end_point_concurrency",
"server.end-point-concurrency",
self.server.end_point_concurrency
);
let concurrency = self.server.end_point_concurrency.unwrap();
self.readpool.coprocessor.high_concurrency = concurrency;
self.readpool.coprocessor.normal_concurrency = concurrency;
self.readpool.coprocessor.low_concurrency = concurrency;
}
if self.server.end_point_stack_size != None {
if self.server.end_point_stack_size.is_some() {
warn!(
"deprecated configuration, {} has been moved to {}",
"server.end_point_stack_size", "readpool.coprocessor.stack_size",
"server.end-point-stack-size", "readpool.coprocessor.stack-size",
);
warn!(
"override {} with {}, {:?}",
"readpool.coprocessor.stack_size",
"server.end_point_stack_size",
"readpool.coprocessor.stack-size",
"server.end-point-stack-size",
self.server.end_point_stack_size
);
self.readpool.coprocessor.stack_size = self.server.end_point_stack_size.unwrap();
}
if self.server.end_point_max_tasks.is_some() {
warn!(
"deprecated configuration, {} is no longer used and ignored, please use {}.",
"server.end-point-max-tasks", "readpool.coprocessor.max-tasks-per-worker-xxx",
);
// Note:
// Our `end_point_max_tasks` is mostly mistakenly configured, so we don't override
// new configuration using old values.
self.server.end_point_max_tasks = None;
}
if self.raft_store.clean_stale_peer_delay.as_secs() > 0 {
let delay_secs = self.raft_store.clean_stale_peer_delay.as_secs()
+ self.server.end_point_request_max_handle_duration.as_secs();
Expand Down
9 changes: 8 additions & 1 deletion src/coprocessor/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ pub struct Host {
last_req_id: u64,
pool: ReadPool<ReadPoolContext>,
basic_local_metrics: BasicLocalMetrics,
// TODO: Deprecate after totally switching to read pool
max_running_task_count: usize,
// TODO: Deprecate after totally switching to read pool
running_task_count: Arc<AtomicUsize>,
batch_row_limit: usize,
stream_batch_row_limit: usize,
Expand All @@ -79,22 +81,27 @@ impl Host {
cfg: &Config,
pool: ReadPool<ReadPoolContext>,
) -> Host {
// Use read pool's max task config
let max_running_task_count = pool.get_max_tasks();
Host {
engine,
sched,
reqs: HashMap::default(),
last_req_id: 0,
pool,
basic_local_metrics: BasicLocalMetrics::default(),
max_running_task_count: cfg.end_point_max_tasks,
// TODO: Deprecate after totally switching to read pool
max_running_task_count,
batch_row_limit: cfg.end_point_batch_row_limit,
stream_batch_row_limit: cfg.end_point_stream_batch_row_limit,
request_max_handle_duration: cfg.end_point_request_max_handle_duration.0,
// TODO: Deprecate after totally switching to read pool
running_task_count: Arc::new(AtomicUsize::new(0)),
}
}

#[inline]
// TODO: Deprecate after totally switching to read pool
fn running_task_count(&self) -> usize {
self.running_task_count.load(Ordering::Acquire)
}
Expand Down
21 changes: 7 additions & 14 deletions src/server/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ const DEFAULT_GRPC_RAFT_CONN_NUM: usize = 10;
const DEFAULT_GRPC_STREAM_INITIAL_WINDOW_SIZE: u64 = 2 * 1024 * 1024;
const DEFAULT_MESSAGES_PER_TICK: usize = 4096;

// Assume a request can be finished in 1ms, a request at position x will wait about
// 0.001 * x secs to be actual started. A server-is-busy error will trigger 2 seconds
// backoff. So when it needs to wait for more than 2 seconds, return error won't causse
// larger latency.
pub const DEFAULT_MAX_RUNNING_TASK_COUNT: usize = 2 as usize * 1000;

// Number of rows in each chunk.
pub const DEFAULT_ENDPOINT_BATCH_ROW_LIMIT: usize = 64;

Expand Down Expand Up @@ -79,7 +73,6 @@ pub struct Config {
pub concurrent_send_snap_limit: usize,
/// How many snapshots can be recv concurrently.
pub concurrent_recv_snap_limit: usize,
pub end_point_max_tasks: usize,
pub end_point_recursion_limit: u32,
pub end_point_stream_channel_size: usize,
pub end_point_batch_row_limit: usize,
Expand All @@ -100,6 +93,11 @@ pub struct Config {
#[doc(hidden)]
#[serde(skip_serializing)]
pub end_point_stack_size: Option<ReadableSize>,

// deprecated. use readpool.coprocessor.max_tasks_per_worker_xx.
#[doc(hidden)]
#[serde(skip_serializing)]
pub end_point_max_tasks: Option<usize>,
}

impl Default for Config {
Expand All @@ -123,8 +121,8 @@ impl Default for Config {
concurrent_send_snap_limit: 32,
concurrent_recv_snap_limit: 32,
end_point_concurrency: None, // deprecated
end_point_max_tasks: DEFAULT_MAX_RUNNING_TASK_COUNT,
end_point_stack_size: None, // deprecated
end_point_max_tasks: None, // deprecated
end_point_stack_size: None, // deprecated
end_point_recursion_limit: 1000,
end_point_stream_channel_size: 8,
end_point_batch_row_limit: DEFAULT_ENDPOINT_BATCH_ROW_LIMIT,
Expand Down Expand Up @@ -163,7 +161,6 @@ impl Config {
"concurrent-recv-snap-limit",
self.concurrent_recv_snap_limit,
),
("end-point-max-tasks", self.end_point_max_tasks),
];
for (label, value) in non_zero_entries {
if value == 0 {
Expand Down Expand Up @@ -249,10 +246,6 @@ mod tests {
invalid_cfg.concurrent_recv_snap_limit = 0;
assert!(invalid_cfg.validate().is_err());

let mut invalid_cfg = cfg.clone();
invalid_cfg.end_point_max_tasks = 0;
assert!(invalid_cfg.validate().is_err());

let mut invalid_cfg = cfg.clone();
invalid_cfg.end_point_recursion_limit = 0;
assert!(invalid_cfg.validate().is_err());
Expand Down
14 changes: 7 additions & 7 deletions src/server/readpool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use util::config::ReadableSize;
// 0.001 * x secs to be actual started. A server-is-busy error will trigger 2 seconds
// backoff. So when it needs to wait for more than 2 seconds, return error won't causse
// larger latency.
pub const DEFAULT_MAX_TASKS_PER_CORE: usize = 2 as usize * 1000;
pub const DEFAULT_MAX_TASKS_PER_WORKER: usize = 2 as usize * 1000;

pub const DEFAULT_STACK_SIZE_MB: u64 = 10;

Expand All @@ -26,9 +26,9 @@ pub struct Config {
pub high_concurrency: usize,
pub normal_concurrency: usize,
pub low_concurrency: usize,
pub max_tasks_high: usize,
pub max_tasks_normal: usize,
pub max_tasks_low: usize,
pub max_tasks_per_worker_high: usize,
pub max_tasks_per_worker_normal: usize,
pub max_tasks_per_worker_low: usize,
pub stack_size: ReadableSize,
}

Expand All @@ -39,9 +39,9 @@ impl Config {
high_concurrency: concurrency,
normal_concurrency: concurrency,
low_concurrency: concurrency,
max_tasks_high: concurrency * DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_normal: concurrency * DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_low: concurrency * DEFAULT_MAX_TASKS_PER_CORE,
max_tasks_per_worker_high: DEFAULT_MAX_TASKS_PER_WORKER,
max_tasks_per_worker_normal: DEFAULT_MAX_TASKS_PER_WORKER,
max_tasks_per_worker_low: DEFAULT_MAX_TASKS_PER_WORKER,
stack_size: ReadableSize::mb(DEFAULT_STACK_SIZE_MB),
}
}
Expand Down
Loading

0 comments on commit 4363c06

Please sign in to comment.