Skip to content

Commit

Permalink
Revert "*: make unified-pool use FuturePool (#15925)"
Browse files Browse the repository at this point in the history
This reverts commit 7be1b17.

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Nov 22, 2023
1 parent 86d4a49 commit c6cfecb
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 86 deletions.
34 changes: 20 additions & 14 deletions components/tikv_util/src/worker/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
future::Future,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Arc, Mutex,
},
time::{Duration, Instant},
};
Expand All @@ -20,13 +20,13 @@ use futures::{
stream::StreamExt,
};
use prometheus::IntGauge;
use yatp::Remote;
use yatp::{Remote, ThreadPool};

use super::metrics::*;
use crate::{
future::{block_on_timeout, poll_future_notify},
timer::GLOBAL_TIMER_HANDLE,
yatp_pool::{DefaultTicker, FuturePool, YatpPoolBuilder},
yatp_pool::{DefaultTicker, YatpPoolBuilder},
};

#[derive(PartialEq)]
Expand Down Expand Up @@ -222,7 +222,7 @@ impl<T: Display + Send + 'static> LazyWorker<T> {
}

pub fn remote(&self) -> Remote<yatp::task::future::TaskCell> {
self.worker.remote()
self.worker.remote.clone()
}
}

Expand Down Expand Up @@ -301,8 +301,11 @@ impl<S: Into<String>> Builder<S> {
let pool = YatpPoolBuilder::new(DefaultTicker::default())
.name_prefix(self.name)
.thread_count(self.thread_count, self.thread_count, self.thread_count)
.build_future_pool();
.build_single_level_pool();
let remote = pool.remote().clone();
let pool = Arc::new(Mutex::new(Some(pool)));
Worker {
remote,
stop: Arc::new(AtomicBool::new(false)),
pool,
counter: Arc::new(AtomicUsize::new(0)),
Expand All @@ -315,7 +318,8 @@ impl<S: Into<String>> Builder<S> {
/// A worker that can schedule time consuming tasks.
#[derive(Clone)]
pub struct Worker {
pool: FuturePool,
pool: Arc<Mutex<Option<ThreadPool<yatp::task::future::TaskCell>>>>,
remote: Remote<yatp::task::future::TaskCell>,
pending_capacity: usize,
counter: Arc<AtomicUsize>,
stop: Arc<AtomicBool>,
Expand Down Expand Up @@ -367,7 +371,7 @@ impl Worker {
.interval(std::time::Instant::now(), interval)
.compat();
let stop = self.stop.clone();
let _ = self.pool.spawn(async move {
self.remote.spawn(async move {
while !stop.load(Ordering::Relaxed)
&& let Some(Ok(_)) = interval.next().await
{
Expand All @@ -385,7 +389,7 @@ impl Worker {
.interval(std::time::Instant::now(), interval)
.compat();
let stop = self.stop.clone();
let _ = self.pool.spawn(async move {
self.remote.spawn(async move {
while !stop.load(Ordering::Relaxed)
&& let Some(Ok(_)) = interval.next().await
{
Expand All @@ -399,7 +403,7 @@ impl Worker {
where
F: Future<Output = ()> + Send + 'static,
{
let _ = self.pool.spawn(f);
self.remote.spawn(f);
}

fn delay_notify<T: Display + Send + 'static>(tx: UnboundedSender<Msg<T>>, timeout: Duration) {
Expand Down Expand Up @@ -434,8 +438,10 @@ impl Worker {

/// Stops the worker thread.
pub fn stop(&self) {
self.stop.store(true, Ordering::Release);
self.pool.shutdown();
if let Some(pool) = self.pool.lock().unwrap().take() {
self.stop.store(true, Ordering::Release);
pool.shutdown();
}
}

/// Checks if underlying worker can't handle task immediately.
Expand All @@ -445,7 +451,7 @@ impl Worker {
}

pub fn remote(&self) -> Remote<yatp::task::future::TaskCell> {
self.pool.remote().clone()
self.remote.clone()
}

fn start_impl<R: Runnable + 'static>(
Expand All @@ -455,7 +461,7 @@ impl Worker {
metrics_pending_task_count: IntGauge,
) {
let counter = self.counter.clone();
let _ = self.pool.spawn(async move {
self.remote.spawn(async move {
let mut handle = RunnableWrapper { inner: runner };
while let Some(msg) = receiver.next().await {
match msg {
Expand All @@ -482,7 +488,7 @@ impl Worker {
let counter = self.counter.clone();
let timeout = runner.get_interval();
Self::delay_notify(tx.clone(), timeout);
let _ = self.pool.spawn(async move {
self.remote.spawn(async move {
let mut handle = RunnableWrapper { inner: runner };
while let Some(msg) = receiver.next().await {
match msg {
Expand Down
23 changes: 5 additions & 18 deletions components/tikv_util/src/yatp_pool/future_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl FuturePool {
pool,
env,
pool_size: AtomicUsize::new(pool_size),
max_tasks: AtomicUsize::new(max_tasks),
max_tasks,
}),
}
}
Expand Down Expand Up @@ -119,32 +119,20 @@ impl FuturePool {
pub fn shutdown(&self) {
self.inner.pool.shutdown();
}

// Get a remote queue for spawning tasks without owning the thread pool.
pub fn remote(&self) -> &yatp::Remote<future::TaskCell> {
self.inner.pool.remote()
}
}

struct PoolInner {
pool: ThreadPool,
env: Env,
// for accessing pool_size config since yatp doesn't offer such getter.
pool_size: AtomicUsize,
max_tasks: AtomicUsize,
max_tasks: usize,
}

impl PoolInner {
#[inline]
fn scale_pool_size(&self, thread_count: usize) {
self.pool.scale_workers(thread_count);
let mut max_tasks = self.max_tasks.load(Ordering::Acquire);
if max_tasks != std::usize::MAX {
max_tasks = max_tasks
.saturating_div(self.pool_size.load(Ordering::Acquire))
.saturating_mul(thread_count);
self.max_tasks.store(max_tasks, Ordering::Release);
}
self.pool_size.store(thread_count, Ordering::Release);
}

Expand All @@ -160,16 +148,15 @@ impl PoolInner {
max_tasks: 100,
}));

let max_tasks = self.max_tasks.load(Ordering::Acquire);
if max_tasks == std::usize::MAX {
if self.max_tasks == std::usize::MAX {
return Ok(());
}

let current_tasks = self.get_running_task_count();
if current_tasks >= max_tasks {
if current_tasks >= self.max_tasks {
Err(Full {
current_tasks,
max_tasks,
max_tasks: self.max_tasks,
})
} else {
Ok(())
Expand Down
15 changes: 2 additions & 13 deletions components/tikv_util/src/yatp_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,26 +394,15 @@ impl<T: PoolTicker> YatpPoolBuilder<T> {
FuturePool::from_pool(pool, &name, size, task)
}

fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
pub fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
let (builder, runner) = self.create_builder();
builder.build_with_queue_and_runner(
yatp::queue::QueueType::SingleLevel,
yatp::pool::CloneRunnerBuilder(runner),
)
}

pub fn build_multi_level_future_pool(self) -> FuturePool {
let name = self
.name_prefix
.clone()
.unwrap_or_else(|| "yatp_pool".to_string());
let size = self.core_thread_count;
let task = self.max_tasks;
let pool = self.build_multi_level_pool();
FuturePool::from_pool(pool, &name, size, task)
}

fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
pub fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
let name = self
.name_prefix
.clone()
Expand Down

0 comments on commit c6cfecb

Please sign in to comment.