Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Adrien Guillo <adrien@quickwit.io>
  • Loading branch information
fulmicoton and guilload committed May 10, 2024
1 parent 006eb73 commit f9f2c88
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
24 changes: 12 additions & 12 deletions quickwit/quickwit-common/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tracing::error;

use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec};

/// An executor backed by a thread pool to run CPU intensive tasks.
/// An executor backed by a thread pool to run CPU-intensive tasks.
///
/// tokio::spawn_blocking should only used for IO-bound tasks, as it has not limit on its
/// thread count.
Expand All @@ -43,14 +43,14 @@ impl ThreadPool {
let mut rayon_pool_builder = rayon::ThreadPoolBuilder::new()
.thread_name(move |thread_id| format!("quickwit-{name}-{thread_id}"))
.panic_handler(|_my_panic| {
error!("task running in the quickwit search pool panicked");
error!("task running in the quickwit {} thread pool panicked", name);
});
if let Some(num_threads) = num_threads_opt {
rayon_pool_builder = rayon_pool_builder.num_threads(num_threads);
}
let thread_pool = rayon_pool_builder
.build()
.expect("Failed to spawn the spawning pool");
.expect("failed to spawn thread pool");
let active_threads_gauge = ACTIVE_THREAD_COUNT.with_label_values([name]);
ThreadPool {
thread_pool: Arc::new(thread_pool),
Expand All @@ -66,17 +66,17 @@ impl ThreadPool {
///
/// Here are two important differences however:
///
/// 1) The task is running on a rayon thread pool managed by quickwit.
/// This pool is specifically used only to run CPU intensive work
/// 1) The task runs on a rayon thread pool managed by Quickwit.
/// This pool is specifically used only to run CPU-intensive work
/// and is configured to contain `num_cpus` cores.
///
/// 2) Before the task is effectively scheduled, we check that
/// the spawner is still interested by its result.
/// the spawner is still interested in its result.
///
/// It is therefore required to `await` the result of this
/// function to get anywork done.
/// function to get any work done.
///
/// This is nice, because it makes work that has been scheduled
/// This is nice because it makes work that has been scheduled
/// but is not running yet "cancellable".
pub fn run_cpu_intensive<F, R>(
&self,
Expand All @@ -103,10 +103,10 @@ impl ThreadPool {
}
}

/// Run a small (<200ms) CPU intensive task on a dedicated thread pool with a few threads.
/// Run a small (<200ms) CPU-intensive task on a dedicated thread pool with a few threads.
///
/// When running blocking io (or side-effects in general), prefer use `tokio::spawn_blocking`
/// instead. When running long tasks or a set of tasks that you expect should take more than 33% of
/// When running blocking io (or side-effects in general), prefer using `tokio::spawn_blocking`
/// instead. When running long tasks or a set of tasks that you expect to take more than 33% of
/// your vCPUs, use a dedicated thread/runtime or executor instead.
///
/// Disclaimer: The function will no be executed if the Future is dropped.
Expand All @@ -130,7 +130,7 @@ pub struct Panicked;

impl fmt::Display for Panicked {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Scheduled job panicked")
write!(f, "scheduled task panicked")
}
}

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ async fn fetch_docs_in_split(
.context("open-index-for-split")?;
// we add an executor here, we could add it in open_index_with_caches, though we should verify
// the side-effect before
let executor_tantivy = crate::search_thread_pool()
let tantivy_executor = crate::search_thread_pool()
.get_underlying_rayon_thread_pool()
.into();
index.set_executor(executor_tantivy);
index.set_executor(tantivy_executor);
let index_reader = index
.reader_builder()
// the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};
pub type SearcherPool = Pool<SocketAddr, SearchServiceClient>;

fn search_thread_pool() -> &'static ThreadPool {
static SEARCH_EXECUTOR: OnceLock<ThreadPool> = OnceLock::new();
SEARCH_EXECUTOR.get_or_init(|| ThreadPool::new("search", None))
static SEARCH_THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
SEARCH_THREAD_POOL.get_or_init(|| ThreadPool::new("search", None))
}

/// GlobalDocAddress serves as a hit address.
Expand Down

0 comments on commit f9f2c88

Please sign in to comment.