Revert "*: make unified-pool use FuturePool (#15925)" #16050

Merged (2 commits, merged Dec 21, 2023)

Changes from all commits
components/tikv_util/src/yatp_pool/mod.rs (15 changes: 2 additions & 13 deletions)

@@ -391,26 +391,15 @@ impl<T: PoolTicker> YatpPoolBuilder<T> {
         FuturePool::from_pool(pool, &name, size, task)
     }

-    fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
+    pub fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
         let (builder, runner) = self.create_builder();
         builder.build_with_queue_and_runner(
             yatp::queue::QueueType::SingleLevel,
             yatp::pool::CloneRunnerBuilder(runner),
         )
     }

-    pub fn build_multi_level_future_pool(self) -> FuturePool {
-        let name = self
-            .name_prefix
-            .clone()
-            .unwrap_or_else(|| "yatp_pool".to_string());
-        let size = self.core_thread_count;
-        let task = self.max_tasks;
-        let pool = self.build_multi_level_pool();
-        FuturePool::from_pool(pool, &name, size, task)
-    }
-
-    fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
+    pub fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
         let name = self
             .name_prefix
             .clone()
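With `build_multi_level_future_pool` removed and the raw builders made public, callers now construct a `yatp::ThreadPool<TaskCell>` directly. A minimal usage sketch, assuming `DefaultTicker` and the builder methods shown above keep the signatures in this diff (the pool name and task body are illustrative):

```rust
use tikv_util::yatp_pool::{DefaultTicker, YatpPoolBuilder};
use yatp::{queue::Extras, task::future::TaskCell};

fn demo() {
    // Build a raw multi-level pool; previously this was wrapped in a FuturePool.
    let pool = YatpPoolBuilder::new(DefaultTicker::default())
        .name_prefix("demo-pool")
        .build_multi_level_pool();

    // Multilevel extras carry a task id and an optional fixed priority level.
    let extras = Extras::new_multilevel(1, None);

    // Spawning takes a TaskCell (future + extras) instead of a bare future.
    pool.spawn(TaskCell::new(async { /* read work */ }, extras));

    pool.shutdown();
}
```

Spawning always goes through a `TaskCell` now, which is what lets read_pool.rs below attach `Extras` and tracking to every task.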
src/read_pool.rs (155 changes: 95 additions & 60 deletions)

@@ -27,7 +27,10 @@ use tikv_util::{
     worker::{Runnable, RunnableWithTimer, Scheduler, Worker},
     yatp_pool::{self, CleanupMethod, FuturePool, PoolTicker, YatpPoolBuilder},
 };
-use yatp::{metrics::MULTILEVEL_LEVEL_ELAPSED, queue::Extras};
+use tracker::TrackedFuture;
+use yatp::{
+    metrics::MULTILEVEL_LEVEL_ELAPSED, pool::Remote, queue::Extras, task::future::TaskCell,
+};

 use self::metrics::*;
 use crate::{

@@ -53,9 +56,11 @@ pub enum ReadPool {
         read_pool_low: FuturePool,
     },
     Yatp {
-        pool: FuturePool,
+        // deprecated. will remove in the v8.x.
+        pool: yatp::ThreadPool<TaskCell>,
         running_tasks: IntGauge,
+        running_threads: IntGauge,
+        max_tasks: usize,
+        pool_size: usize,
         resource_ctl: Option<Arc<ResourceController>>,
         time_slice_inspector: Arc<TimeSliceInspector>,
     },

@@ -76,11 +81,17 @@ impl ReadPool {
             ReadPool::Yatp {
                 pool,
                 running_tasks,
+                running_threads,
+                max_tasks,
+                pool_size,
                 resource_ctl,
                 time_slice_inspector,
             } => ReadPoolHandle::Yatp {
-                remote: pool.clone(),
+                remote: pool.remote().clone(),
                 running_tasks: running_tasks.clone(),
+                running_threads: running_threads.clone(),
+                max_tasks: *max_tasks,
+                pool_size: *pool_size,
                 resource_ctl: resource_ctl.clone(),
                 time_slice_inspector: time_slice_inspector.clone(),
             },
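The handle now carries a `Remote<TaskCell>` cloned off the pool rather than a `FuturePool`. A rough sketch of what a `Remote` provides, assuming yatp's `ThreadPool::remote()` and `Remote::spawn` APIs that this diff relies on:

```rust
use yatp::{pool::Remote, queue::Extras, task::future::TaskCell};

// A Remote is a cheap, cloneable spawner detached from the pool value itself,
// so the handle no longer needs the FuturePool wrapper to submit work.
fn spawn_via_remote(remote: &Remote<TaskCell>) {
    let extras = Extras::new_multilevel(7, None); // illustrative task id
    remote.spawn(TaskCell::new(async { /* work */ }, extras));
}
```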
@@ -96,8 +107,11 @@ pub enum ReadPoolHandle {
         read_pool_low: FuturePool,
     },
     Yatp {
-        remote: FuturePool,
+        remote: Remote<TaskCell>,
         running_tasks: IntGauge,
+        running_threads: IntGauge,
+        max_tasks: usize,
+        pool_size: usize,
         resource_ctl: Option<Arc<ResourceController>>,
         time_slice_inspector: Arc<TimeSliceInspector>,
     },

@@ -132,10 +146,19 @@ impl ReadPoolHandle {
             ReadPoolHandle::Yatp {
                 remote,
                 running_tasks,
+                max_tasks,
                 resource_ctl,
                 ..
             } => {
                 let running_tasks = running_tasks.clone();
+                // Note that the running task number limit is not strict.
+                // If several tasks are spawned at the same time while the running task number
+                // is close to the limit, they may all pass this check and the number of running
+                // tasks may exceed the limit.
+                if running_tasks.get() as usize >= *max_tasks {
+                    return Err(ReadPoolError::UnifiedReadPoolFull);
+                }
+
                 running_tasks.inc();
                 let fixed_level = match priority {
                     CommandPri::High => Some(0),
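The admission check above is deliberately non-strict, as its comment says. For contrast, a strict cap would have to reserve a slot atomically before spawning; a sketch of that alternative (not in the PR, with `AtomicUsize` standing in for the metrics gauge):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Reserve a slot only if the count is below the cap; fetch_update retries the
// closure on contention, so the cap can never be overshot.
fn try_reserve(running: &AtomicUsize, max_tasks: usize) -> bool {
    running
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
            (n < max_tasks).then_some(n + 1)
        })
        .is_ok()
}
```

The PR keeps the cheaper gauge read and accepts a small overshoot when spawns race.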
@@ -145,33 +168,31 @@
                 let group_name = metadata.group_name().to_owned();
                 let mut extras = Extras::new_multilevel(task_id, fixed_level);
                 extras.set_metadata(metadata.to_vec());
-                let running_tasks1 = running_tasks.clone();
-                if let Some(resource_ctl) = resource_ctl {
-                    let fut = with_resource_limiter(
-                        ControlledFuture::new(
-                            async move {
-                                f.await;
-                                running_tasks.dec();
-                            },
-                            resource_ctl.clone(),
-                            group_name,
-                        ),
-                        resource_limiter,
-                    );
-                    remote.spawn_with_extras(fut, extras).map_err(|e| {
-                        running_tasks1.dec();
-                        e
-                    })?;
+                let task_cell = if let Some(resource_ctl) = resource_ctl {
+                    TaskCell::new(
+                        TrackedFuture::new(with_resource_limiter(
+                            ControlledFuture::new(
+                                async move {
+                                    f.await;
+                                    running_tasks.dec();
+                                },
+                                resource_ctl.clone(),
+                                group_name,
+                            ),
+                            resource_limiter,
+                        )),
+                        extras,
+                    )
                 } else {
-                    let fut = async move {
-                        f.await;
-                        running_tasks.dec();
-                    };
-                    remote.spawn_with_extras(fut, extras).map_err(|e| {
-                        running_tasks1.dec();
-                        e
-                    })?;
-                }
+                    TaskCell::new(
+                        TrackedFuture::new(async move {
+                            f.await;
+                            running_tasks.dec();
+                        }),
+                        extras,
+                    )
+                };
+                remote.spawn(task_cell);
             }
         }
         Ok(())
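The spawn path keeps its external signature; only the saturation error changes from `FuturePoolFull` to `UnifiedReadPoolFull`, since the raw `Remote::spawn` is infallible and the limit is now enforced up front. A hypothetical call site, mirroring the tests further down:

```rust
// Illustrative only: `handle` is a ReadPoolHandle::Yatp.
match handle.spawn(
    async { /* execute the read request */ },
    CommandPri::Normal,
    42, // task id, illustrative
    TaskMetadata::default(),
    None, // no resource limiter
) {
    Ok(()) => {}
    Err(ReadPoolError::UnifiedReadPoolFull) => { /* shed load, retry later */ }
    Err(e) => panic!("unexpected: {:?}", e),
}
```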
@@ -211,7 +232,7 @@ impl ReadPoolHandle {
             ReadPoolHandle::FuturePools {
                 read_pool_normal, ..
             } => read_pool_normal.get_pool_size(),
-            ReadPoolHandle::Yatp { remote, .. } => remote.get_pool_size(),
+            ReadPoolHandle::Yatp { pool_size, .. } => *pool_size,
         }
     }

@@ -221,10 +242,10 @@
                 read_pool_normal, ..
             } => read_pool_normal.get_running_task_count() / read_pool_normal.get_pool_size(),
             ReadPoolHandle::Yatp {
-                remote,
                 running_tasks,
+                pool_size,
                 ..
-            } => running_tasks.get() as usize / remote.get_pool_size(),
+            } => running_tasks.get() as usize / *pool_size,
         }
     }

@@ -233,19 +254,34 @@
             ReadPoolHandle::FuturePools { .. } => {
                 unreachable!()
             }
-            ReadPoolHandle::Yatp { remote, .. } => {
-                remote.scale_pool_size(max_thread_count);
+            ReadPoolHandle::Yatp {
+                remote,
+                running_threads,
+                max_tasks,
+                pool_size,
+                ..
+            } => {
+                remote.scale_workers(max_thread_count);
+                *max_tasks = max_tasks
+                    .saturating_div(*pool_size)
+                    .saturating_mul(max_thread_count);
+                running_threads.set(max_thread_count as i64);
+                *pool_size = max_thread_count;
             }
         }
     }

-    pub fn set_max_tasks_per_worker(&self, tasks_per_thread: usize) {
+    pub fn set_max_tasks_per_worker(&mut self, tasks_per_thread: usize) {
         match self {
             ReadPoolHandle::FuturePools { .. } => {
                 unreachable!()
             }
-            ReadPoolHandle::Yatp { remote, .. } => {
-                remote.set_max_tasks_per_worker(tasks_per_thread);
+            ReadPoolHandle::Yatp {
+                max_tasks,
+                pool_size,
+                ..
+            } => {
+                *max_tasks = tasks_per_thread.saturating_mul(*pool_size);
             }
         }
     }
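The rescaling in `scale_pool_size` preserves the per-worker quota rather than the absolute cap. A worked example with illustrative numbers:

```rust
// 2000 tasks across 8 workers is a per-worker quota of 250; after scaling
// to 4 workers the cap becomes 250 * 4 = 1000.
let (max_tasks, pool_size, new_thread_count) = (2000usize, 8usize, 4usize);
let rescaled = max_tasks
    .saturating_div(pool_size)
    .saturating_mul(new_thread_count);
assert_eq!(rescaled, 1000);
```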
@@ -452,11 +488,6 @@ pub fn build_yatp_read_pool_with_name<E: Engine, R: FlowStatsReporter>(
                 config.max_thread_count,
             ),
         )
-        .max_tasks(
-            config
-                .max_tasks_per_worker
-                .saturating_mul(config.max_thread_count),
-        )
         .after_start(move || {
             let engine = raftkv.lock().unwrap().clone();
             set_tls_engine(engine);
@@ -468,15 +499,21 @@
         .enable_task_wait_metrics(enable_task_wait_metrics);

     let pool = if let Some(ref r) = resource_ctl {
-        builder.build_priority_future_pool(r.clone())
+        builder.build_priority_pool(r.clone())
     } else {
-        builder.build_multi_level_future_pool()
+        builder.build_multi_level_pool()
     };
     let time_slice_inspector = Arc::new(TimeSliceInspector::new(&unified_read_pool_name));
     ReadPool::Yatp {
         pool,
         running_tasks: UNIFIED_READ_POOL_RUNNING_TASKS
             .with_label_values(&[&unified_read_pool_name]),
+        running_threads: UNIFIED_READ_POOL_RUNNING_THREADS
+            .with_label_values(&[&unified_read_pool_name]),
+        max_tasks: config
+            .max_tasks_per_worker
+            .saturating_mul(config.max_thread_count),
+        pool_size: config.max_thread_count,
         resource_ctl,
         time_slice_inspector,
     }
@@ -754,6 +791,12 @@ mod metrics {
&["name"]
)
.unwrap();
pub static ref UNIFIED_READ_POOL_RUNNING_THREADS: IntGaugeVec = register_int_gauge_vec!(
"tikv_unified_read_pool_thread_count",
"The number of running threads in the unified read pool",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"The number of running threads in the unified read pool",
"The number of running tasks in the unified read pool",

maybe pending task is more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its threads in here.

&["name"]
)
.unwrap();
}
}

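As the author notes, the gauge tracks worker threads, not tasks. A hedged sketch of reading it, say from a test; the label value must match the name the pool was built with, and "unified-read-pool" here is illustrative:

```rust
// The gauge is updated via running_threads.set(...) when the pool is rescaled.
let threads = UNIFIED_READ_POOL_RUNNING_THREADS
    .with_label_values(&["unified-read-pool"])
    .get();
assert!(threads >= 0);
```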
@@ -762,8 +805,6 @@ mod tests {
     use std::{thread, time::Duration};

     use futures::channel::oneshot;
-    use futures_executor::block_on;
-    use kvproto::kvrpcpb::ResourceControlContext;
     use raftstore::store::{ReadStats, WriteStats};
     use resource_control::ResourceGroupManager;

@@ -823,7 +864,7 @@

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }
         tx1.send(()).unwrap();
@@ -884,7 +925,7 @@

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }

@@ -897,7 +938,7 @@

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task5, CommandPri::Normal, 5, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }
     }
@@ -946,18 +987,12 @@

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }

-        // TODO: move running task by priority to read_pool.
-        // spawn a high-priority task, should not return Full error.
-        let (task_high, tx_h) = gen_task();
-        let mut ctx = ResourceControlContext::default();
-        ctx.override_priority = 16; // high priority
-        let metadata = TaskMetadata::from_ctx(&ctx);
-        let f = handle.spawn_handle(task_high, CommandPri::Normal, 6, metadata, None);
-        tx_h.send(()).unwrap();
-        block_on(f).unwrap();
-
         tx1.send(()).unwrap();
         tx2.send(()).unwrap();
@@ -972,7 +1007,7 @@

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task5, CommandPri::Normal, 5, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }
     }
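The tests use a `gen_task` helper defined outside this diff. A plausible shape, offered only for readability (a guess, not the actual helper):

```rust
use std::future::Future;

use futures::channel::oneshot;

// Hypothetical: each task holds its pool slot until the paired sender fires,
// which is how the tests fill the pool and then drain it via tx.send(()).
fn gen_task() -> (impl Future<Output = ()>, oneshot::Sender<()>) {
    let (tx, rx) = oneshot::channel::<()>();
    let task = async move {
        let _ = rx.await;
    };
    (task, tx)
}
```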