Revert "*: make unified-pool use FuturePool (#15925)"
This reverts commit 7be1b17.

Signed-off-by: nolouch <nolouch@gmail.com>
nolouch committed Dec 20, 2023
1 parent d7959b8 · commit 55fa2a5
Showing 3 changed files with 98 additions and 66 deletions.
components/tikv_util/src/worker/pool.rs: 4 changes (2 additions, 2 deletions)
@@ -471,7 +471,7 @@ impl Worker {
metrics_pending_task_count: IntGauge,
) {
let counter = self.counter.clone();
let _ = self.pool.spawn(async move {
self.pool.spawn(async move {
let mut handle = RunnableWrapper { inner: runner };
while let Some(msg) = receiver.next().await {
match msg {
@@ -498,7 +498,7 @@ impl Worker {
let counter = self.counter.clone();
let timeout = runner.get_interval();
Self::delay_notify(tx.clone(), timeout);
let _ = self.pool.spawn(async move {
self.pool.spawn(async move {
let mut handle = RunnableWrapper { inner: runner };
while let Some(msg) = receiver.next().await {
match msg {
components/tikv_util/src/yatp_pool/mod.rs: 15 changes (2 additions, 13 deletions)
@@ -391,26 +391,15 @@ impl<T: PoolTicker> YatpPoolBuilder<T> {
FuturePool::from_pool(pool, &name, size, task)
}

fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
pub fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
let (builder, runner) = self.create_builder();
builder.build_with_queue_and_runner(
yatp::queue::QueueType::SingleLevel,
yatp::pool::CloneRunnerBuilder(runner),
)
}

pub fn build_multi_level_future_pool(self) -> FuturePool {
let name = self
.name_prefix
.clone()
.unwrap_or_else(|| "yatp_pool".to_string());
let size = self.core_thread_count;
let task = self.max_tasks;
let pool = self.build_multi_level_pool();
FuturePool::from_pool(pool, &name, size, task)
}

fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
pub fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
let name = self
.name_prefix
.clone()
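
With the FuturePool-wrapping helper removed, build_single_level_pool and build_multi_level_pool become pub and return the raw yatp ThreadPool<TaskCell>; callers clone the pool's Remote handle for spawning, as src/read_pool.rs does below. A minimal sketch under that assumption follows; the function and its name are illustrative and not part of this diff, while the builder methods and types come from the code above.

use tikv_util::yatp_pool::{PoolTicker, YatpPoolBuilder};
use yatp::{pool::Remote, task::future::TaskCell, ThreadPool};

// Sketch: build the raw multilevel pool and keep a Remote<TaskCell> handle
// for spawning. The builder is assumed to be configured by the caller.
fn build_unified_pool<T: PoolTicker>(
    builder: YatpPoolBuilder<T>,
) -> (ThreadPool<TaskCell>, Remote<TaskCell>) {
    let pool = builder.build_multi_level_pool();
    let remote = pool.remote().clone();
    (pool, remote)
}
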
src/read_pool.rs: 145 changes (94 additions, 51 deletions)
@@ -27,7 +27,10 @@ use tikv_util::{
worker::{Runnable, RunnableWithTimer, Scheduler, Worker},
yatp_pool::{self, CleanupMethod, FuturePool, PoolTicker, YatpPoolBuilder},
};
use yatp::{metrics::MULTILEVEL_LEVEL_ELAPSED, queue::Extras};
use tracker::TrackedFuture;
use yatp::{
metrics::MULTILEVEL_LEVEL_ELAPSED, pool::Remote, queue::Extras, task::future::TaskCell,
};

use self::metrics::*;
use crate::{
@@ -53,9 +56,11 @@ pub enum ReadPool {
read_pool_low: FuturePool,
},
Yatp {
pool: FuturePool,
// deprecated. will remove in the v8.x.
pool: yatp::ThreadPool<TaskCell>,
running_tasks: IntGauge,
running_threads: IntGauge,
max_tasks: usize,
pool_size: usize,
resource_ctl: Option<Arc<ResourceController>>,
time_slice_inspector: Arc<TimeSliceInspector>,
},
@@ -76,11 +81,17 @@ impl ReadPool {
ReadPool::Yatp {
pool,
running_tasks,
running_threads,
max_tasks,
pool_size,
resource_ctl,
time_slice_inspector,
} => ReadPoolHandle::Yatp {
remote: pool.clone(),
remote: pool.remote().clone(),
running_tasks: running_tasks.clone(),
running_threads: running_threads.clone(),
max_tasks: *max_tasks,
pool_size: *pool_size,
resource_ctl: resource_ctl.clone(),
time_slice_inspector: time_slice_inspector.clone(),
},
@@ -96,8 +107,11 @@ pub enum ReadPoolHandle {
read_pool_low: FuturePool,
},
Yatp {
remote: FuturePool,
remote: Remote<TaskCell>,
running_tasks: IntGauge,
running_threads: IntGauge,
max_tasks: usize,
pool_size: usize,
resource_ctl: Option<Arc<ResourceController>>,
time_slice_inspector: Arc<TimeSliceInspector>,
},
@@ -132,10 +146,19 @@ impl ReadPoolHandle {
ReadPoolHandle::Yatp {
remote,
running_tasks,
max_tasks,
resource_ctl,
..
} => {
let running_tasks = running_tasks.clone();
// Note that the running task number limit is not strict.
// If several tasks are spawned at the same time while the running task number
// is close to the limit, they may all pass this check and the number of running
// tasks may exceed the limit.
if running_tasks.get() as usize >= *max_tasks {
return Err(ReadPoolError::UnifiedReadPoolFull);
}

running_tasks.inc();
let fixed_level = match priority {
CommandPri::High => Some(0),
@@ -145,33 +168,31 @@
let group_name = metadata.group_name().to_owned();
let mut extras = Extras::new_multilevel(task_id, fixed_level);
extras.set_metadata(metadata.to_vec());
let running_tasks1 = running_tasks.clone();
if let Some(resource_ctl) = resource_ctl {
let fut = with_resource_limiter(
ControlledFuture::new(
async move {
f.await;
running_tasks.dec();
},
resource_ctl.clone(),
group_name,
),
resource_limiter,
);
remote.spawn_with_extras(fut, extras).map_err(|e| {
running_tasks1.dec();
e
})?;
let task_cell = if let Some(resource_ctl) = resource_ctl {
TaskCell::new(
TrackedFuture::new(with_resource_limiter(
ControlledFuture::new(
async move {
f.await;
running_tasks.dec();
},
resource_ctl.clone(),
group_name,
),
resource_limiter,
)),
extras,
)
} else {
let fut = async move {
f.await;
running_tasks.dec();
};
remote.spawn_with_extras(fut, extras).map_err(|e| {
running_tasks1.dec();
e
})?;
}
TaskCell::new(
TrackedFuture::new(async move {
f.await;
running_tasks.dec();
}),
extras,
)
};
remote.spawn(task_cell);
}
}
Ok(())
@@ -211,7 +232,7 @@ impl ReadPoolHandle {
ReadPoolHandle::FuturePools {
read_pool_normal, ..
} => read_pool_normal.get_pool_size(),
ReadPoolHandle::Yatp { remote, .. } => remote.get_pool_size(),
ReadPoolHandle::Yatp { pool_size, .. } => *pool_size,
}
}

@@ -221,10 +242,10 @@
read_pool_normal, ..
} => read_pool_normal.get_running_task_count() / read_pool_normal.get_pool_size(),
ReadPoolHandle::Yatp {
remote,
running_tasks,
pool_size,
..
} => running_tasks.get() as usize / remote.get_pool_size(),
} => running_tasks.get() as usize / *pool_size,
}
}

@@ -233,19 +254,34 @@
ReadPoolHandle::FuturePools { .. } => {
unreachable!()
}
ReadPoolHandle::Yatp { remote, .. } => {
remote.scale_pool_size(max_thread_count);
ReadPoolHandle::Yatp {
remote,
running_threads,
max_tasks,
pool_size,
..
} => {
remote.scale_workers(max_thread_count);
*max_tasks = max_tasks
.saturating_div(*pool_size)
.saturating_mul(max_thread_count);
running_threads.set(max_thread_count as i64);
*pool_size = max_thread_count;
}
}
}

pub fn set_max_tasks_per_worker(&self, tasks_per_thread: usize) {
pub fn set_max_tasks_per_worker(&mut self, tasks_per_thread: usize) {
match self {
ReadPoolHandle::FuturePools { .. } => {
unreachable!()
}
ReadPoolHandle::Yatp { remote, .. } => {
remote.set_max_tasks_per_worker(tasks_per_thread);
ReadPoolHandle::Yatp {
max_tasks,
pool_size,
..
} => {
*max_tasks = tasks_per_thread.saturating_mul(*pool_size);
}
}
}
@@ -452,11 +488,6 @@ pub fn build_yatp_read_pool_with_name<E: Engine, R: FlowStatsReporter>(
config.max_thread_count,
),
)
.max_tasks(
config
.max_tasks_per_worker
.saturating_mul(config.max_thread_count),
)
.after_start(move || {
let engine = raftkv.lock().unwrap().clone();
set_tls_engine(engine);
@@ -468,15 +499,21 @@
.enable_task_wait_metrics(enable_task_wait_metrics);

let pool = if let Some(ref r) = resource_ctl {
builder.build_priority_future_pool(r.clone())
builder.build_priority_pool(r.clone())
} else {
builder.build_multi_level_future_pool()
builder.build_multi_level_pool()
};
let time_slice_inspector = Arc::new(TimeSliceInspector::new(&unified_read_pool_name));
ReadPool::Yatp {
pool,
running_tasks: UNIFIED_READ_POOL_RUNNING_TASKS
.with_label_values(&[&unified_read_pool_name]),
running_threads: UNIFIED_READ_POOL_RUNNING_THREADS
.with_label_values(&[&unified_read_pool_name]),
max_tasks: config
.max_tasks_per_worker
.saturating_mul(config.max_thread_count),
pool_size: config.max_thread_count,
resource_ctl,
time_slice_inspector,
}
@@ -754,6 +791,12 @@ mod metrics {
&["name"]
)
.unwrap();
pub static ref UNIFIED_READ_POOL_RUNNING_THREADS: IntGaugeVec = register_int_gauge_vec!(
"tikv_unified_read_pool_thread_count",
"The number of running threads in the unified read pool",
&["name"]
)
.unwrap();
}
}

@@ -823,7 +866,7 @@ mod tests {

thread::sleep(Duration::from_millis(300));
match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
Err(ReadPoolError::FuturePoolFull(..)) => {}
Err(ReadPoolError::UnifiedReadPoolFull) => {}
_ => panic!("should return full error"),
}
tx1.send(()).unwrap();
@@ -884,7 +927,7 @@ mod tests {

thread::sleep(Duration::from_millis(300));
match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
Err(ReadPoolError::FuturePoolFull(..)) => {}
Err(ReadPoolError::UnifiedReadPoolFull) => {}
_ => panic!("should return full error"),
}

@@ -897,7 +940,7 @@

thread::sleep(Duration::from_millis(300));
match handle.spawn(task5, CommandPri::Normal, 5, TaskMetadata::default(), None) {
Err(ReadPoolError::FuturePoolFull(..)) => {}
Err(ReadPoolError::UnifiedReadPoolFull) => {}
_ => panic!("should return full error"),
}
}
@@ -946,7 +989,7 @@ mod tests {

thread::sleep(Duration::from_millis(300));
match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
Err(ReadPoolError::FuturePoolFull(..)) => {}
Err(ReadPoolError::UnifiedReadPoolFull) => {}
_ => panic!("should return full error"),
}

@@ -972,7 +1015,7 @@

thread::sleep(Duration::from_millis(300));
match handle.spawn(task5, CommandPri::Normal, 5, TaskMetadata::default(), None) {
Err(ReadPoolError::FuturePoolFull(..)) => {}
Err(ReadPoolError::UnifiedReadPoolFull) => {}
_ => panic!("should return full error"),
}
}
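
The spawn path above is the core of this revert: admission is gated by a soft check of the running_tasks gauge against max_tasks (as the in-code note says, several concurrent spawns near the limit can all slip through), the future is wrapped in a TrackedFuture inside a TaskCell carrying multilevel Extras, and the cell is handed to the yatp Remote. A condensed sketch of that flow follows; it is not the file's actual function, the resource-control branch and task metadata are omitted, and the function and parameter names merely stand in for the ReadPoolHandle::Yatp fields.

use std::future::Future;

use prometheus::IntGauge;
use tracker::TrackedFuture;
use yatp::{pool::Remote, queue::Extras, task::future::TaskCell};

use crate::read_pool::ReadPoolError;

// Condensed sketch of the spawn path restored by this revert. The limit is
// intentionally non-strict: checks racing near the limit may all pass.
fn spawn_sketch(
    remote: &Remote<TaskCell>,
    running_tasks: &IntGauge,
    max_tasks: usize,
    task_id: u64,
    f: impl Future<Output = ()> + Send + 'static,
) -> Result<(), ReadPoolError> {
    if running_tasks.get() as usize >= max_tasks {
        return Err(ReadPoolError::UnifiedReadPoolFull);
    }
    running_tasks.inc();
    let running_tasks = running_tasks.clone();
    // No fixed level here: let the multilevel queue schedule by elapsed time.
    let extras = Extras::new_multilevel(task_id, None);
    let task_cell = TaskCell::new(
        TrackedFuture::new(async move {
            f.await;
            running_tasks.dec();
        }),
        extras,
    );
    remote.spawn(task_cell);
    Ok(())
}
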
