From 770d8a702aed28f5c46866a89d15de78199b3625 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 11 May 2024 11:34:37 +0900 Subject: [PATCH] Compression cpu thread pool (#4970) * Added a thread pool for ingest decompression * Running small but cpu intensive tasks in a thread pool. This PR removes usage of spawn_blocking for cpu intensive tasks. The problem with spawn_blocking is that these tasks get scheduled on an ever-growing thread pool. For instance, when the server is under load, the GZIP decompression of payloads could considerably increase the load factor of quickwit, possibly making it unresponsive to healthcheck. This PR isolates the thread pool used for the searcher, and instantiates a second generic thread pool dedicated to those short cpu-intensive tasks. * Apply suggestions from code review Co-authored-by: Adrien Guillo --------- Co-authored-by: Adrien Guillo --- quickwit/Cargo.lock | 33 ++- quickwit/Cargo.toml | 12 +- quickwit/quickwit-common/Cargo.toml | 1 + quickwit/quickwit-common/src/lib.rs | 1 + quickwit/quickwit-common/src/metrics.rs | 49 +++- quickwit/quickwit-common/src/stream_utils.rs | 2 +- quickwit/quickwit-common/src/thread_pool.rs | 215 ++++++++++++++++++ .../quickwit-indexing/src/actors/indexer.rs | 4 +- .../src/models/indexed_split.rs | 4 +- .../src/models/processed_doc.rs | 2 +- .../src/models/raw_doc_batch.rs | 2 +- quickwit/quickwit-indexing/src/source/mod.rs | 2 +- .../quickwit-metastore/src/metastore/mod.rs | 21 +- .../quickwit-opentelemetry/src/otlp/logs.rs | 3 +- .../quickwit-opentelemetry/src/otlp/traces.rs | 3 +- quickwit/quickwit-search/src/fetch_docs.rs | 8 +- quickwit/quickwit-search/src/leaf.rs | 20 +- quickwit/quickwit-search/src/lib.rs | 10 +- quickwit/quickwit-search/src/metrics.rs | 11 +- quickwit/quickwit-search/src/root.rs | 15 +- .../quickwit-search/src/search_stream/leaf.rs | 2 +- quickwit/quickwit-search/src/thread_pool.rs | 131 ----------- quickwit/quickwit-serve/src/decompression.rs | 8 +- 23 files changed, 
343 insertions(+), 216 deletions(-) create mode 100644 quickwit/quickwit-common/src/thread_pool.rs delete mode 100644 quickwit/quickwit-search/src/thread_pool.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6d96cd2a2f4..d743700dfbf 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4342,6 +4342,14 @@ dependencies = [ "loom", ] +[[package]] +name = "oneshot" +version = "0.1.6" +source = "git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" +dependencies = [ + "loom", +] + [[package]] name = "onig" version = "6.4.0" @@ -4667,7 +4675,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "stable_deref_trait", ] @@ -5690,6 +5698,7 @@ dependencies = [ "prometheus", "proptest", "rand 0.8.5", + "rayon", "regex", "serde", "serde_json", @@ -5874,7 +5883,7 @@ dependencies = [ "libz-sys", "mockall", "once_cell", - "oneshot", + "oneshot 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "openssl", "proptest", "prost", @@ -7998,8 +8007,8 @@ dependencies = [ [[package]] name = "tantivy" -version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +version = "0.23.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "aho-corasick", "arc-swap", @@ -8023,7 +8032,7 @@ dependencies = [ "measure_time", "memmap2", "once_cell", - "oneshot", + "oneshot 0.1.6 (git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba)", "rayon", "regex", "rust-stemmers", @@ -8051,7 +8060,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" 
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "bitpacking", ] @@ -8059,7 +8068,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "downcast-rs", "fastdivide", @@ -8074,7 +8083,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "async-trait", "byteorder", @@ -8097,7 +8106,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "nom", ] @@ -8105,7 +8114,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -8116,7 +8125,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = 
"git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "murmurhash32", "rand_distr", @@ -8126,7 +8135,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 616c19c6545..7d9dca7249b 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -187,7 +187,7 @@ pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04 quote = "1.0.23" rand = "0.8" rand_distr = "0.4" -rayon = "1" +rayon = "1.10" rdkafka = { version = "0.33", default-features = false, features = [ "cmake-build", "libz", @@ -269,16 +269,12 @@ wiremock = "0.5" zstd = "0.13.0" aws-config = "1.2" -aws-credential-types = { version = "1.2", features = [ - "hardcoded-credentials", -] } +aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] } aws-sdk-kinesis = "1.21" aws-sdk-s3 = "1.24" aws-smithy-async = "1.2" aws-smithy-runtime = "1.3" -aws-smithy-types = { version = "1.1", features = [ - "byte-stream-poll-next" -] } +aws-smithy-types = { version = "1.1", features = ["byte-stream-poll-next"] } aws-types = "1.2" azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] } @@ -321,7 +317,7 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1ee5f907", default-features = false, features = [ +tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "6181c1e", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", diff --git a/quickwit/quickwit-common/Cargo.toml 
b/quickwit/quickwit-common/Cargo.toml index d6041c3b7bb..9b5609b14e1 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -30,6 +30,7 @@ pin-project = { workspace = true } pnet = { workspace = true } prometheus = { workspace = true } rand = { workspace = true } +rayon = { workspace = true } regex = { workspace = true } serde = { workspace = true } siphasher = { workspace = true } diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 15bcd12ab85..6de4a337c0a 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -43,6 +43,7 @@ pub mod stream_utils; pub mod temp_dir; #[cfg(any(test, feature = "testsuite"))] pub mod test_utils; +pub mod thread_pool; pub mod tower; pub mod type_map; pub mod uri; diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 323362e65ed..96b42186f17 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -179,19 +179,19 @@ pub fn new_histogram_vec( HistogramVec { underlying } } -pub struct GaugeGuard { - gauge: &'static IntGauge, +pub struct GaugeGuard<'a> { + gauge: &'a IntGauge, delta: i64, } -impl std::fmt::Debug for GaugeGuard { +impl<'a> std::fmt::Debug for GaugeGuard<'a> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.delta.fmt(f) } } -impl GaugeGuard { - pub fn from_gauge(gauge: &'static IntGauge) -> Self { +impl<'a> GaugeGuard<'a> { + pub fn from_gauge(gauge: &'a IntGauge) -> Self { Self { gauge, delta: 0i64 } } @@ -210,7 +210,44 @@ impl GaugeGuard { } } -impl Drop for GaugeGuard { +impl<'a> Drop for GaugeGuard<'a> { + fn drop(&mut self) { + self.gauge.sub(self.delta) + } +} + +pub struct OwnedGaugeGuard { + gauge: IntGauge, + delta: i64, +} + +impl std::fmt::Debug for OwnedGaugeGuard { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.delta.fmt(f) + } +} + +impl OwnedGaugeGuard { + 
pub fn from_gauge(gauge: IntGauge) -> Self { + Self { gauge, delta: 0i64 } + } + + pub fn get(&self) -> i64 { + self.delta + } + + pub fn add(&mut self, delta: i64) { + self.gauge.add(delta); + self.delta += delta; + } + + pub fn sub(&mut self, delta: i64) { + self.gauge.sub(delta); + self.delta -= delta; + } +} + +impl Drop for OwnedGaugeGuard { fn drop(&mut self) { self.gauge.sub(self.delta) } diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index edca41a8b70..d3a591099fd 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -233,7 +233,7 @@ where T: RpcName } } -pub struct InFlightValue(T, #[allow(dead_code)] GaugeGuard); +pub struct InFlightValue(T, #[allow(dead_code)] GaugeGuard<'static>); impl fmt::Debug for InFlightValue where T: fmt::Debug diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs new file mode 100644 index 00000000000..f7469c24374 --- /dev/null +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -0,0 +1,215 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +use std::fmt; +use std::sync::Arc; + +use futures::{Future, TryFutureExt}; +use once_cell::sync::Lazy; +use prometheus::IntGauge; +use tokio::sync::oneshot; +use tracing::error; + +use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec, OwnedGaugeGuard}; + +/// An executor backed by a thread pool to run CPU-intensive tasks. +/// +/// tokio::spawn_blocking should only be used for IO-bound tasks, as it has no limit on its +/// thread count. +#[derive(Clone)] +pub struct ThreadPool { + thread_pool: Arc, + ongoing_tasks: IntGauge, + pending_tasks: IntGauge, +} + +impl ThreadPool { + pub fn new(name: &'static str, num_threads_opt: Option) -> ThreadPool { + let mut rayon_pool_builder = rayon::ThreadPoolBuilder::new() + .thread_name(move |thread_id| format!("quickwit-{name}-{thread_id}")) + .panic_handler(move |_my_panic| { + error!("task running in the quickwit {name} thread pool panicked"); + }); + if let Some(num_threads) = num_threads_opt { + rayon_pool_builder = rayon_pool_builder.num_threads(num_threads); + } + let thread_pool = rayon_pool_builder + .build() + .expect("failed to spawn the spawning pool"); + let ongoing_tasks = THREAD_POOL_METRICS.ongoing_tasks.with_label_values([name]); + let pending_tasks = THREAD_POOL_METRICS.pending_tasks.with_label_values([name]); + ThreadPool { + thread_pool: Arc::new(thread_pool), + ongoing_tasks, + pending_tasks, + } + } + + pub fn get_underlying_rayon_thread_pool(&self) -> Arc { + self.thread_pool.clone() + } + + /// Function similar to `tokio::spawn_blocking`. + /// + /// Here are two important differences however: + /// + /// 1) The task runs on a rayon thread pool managed by Quickwit. + /// This pool is specifically used only to run CPU-intensive work + /// and is configured to contain `num_cpus` cores. + /// + /// 2) Before the task is effectively scheduled, we check that + /// the spawner is still interested in its result. 
+ /// + /// It is therefore required to `await` the result of this + /// function to get any work done. + /// + /// This is nice because it makes work that has been scheduled + /// but is not running yet "cancellable". + pub fn run_cpu_intensive( + &self, + cpu_heavy_task: F, + ) -> impl Future> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let span = tracing::Span::current(); + let ongoing_tasks = self.ongoing_tasks.clone(); + let mut pending_tasks_guard: OwnedGaugeGuard = + OwnedGaugeGuard::from_gauge(self.pending_tasks.clone()); + pending_tasks_guard.add(1i64); + let (tx, rx) = oneshot::channel(); + self.thread_pool.spawn(move || { + drop(pending_tasks_guard); + if tx.is_closed() { + return; + } + let _guard = span.enter(); + let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks); + ongoing_task_guard.add(1i64); + let result = cpu_heavy_task(); + let _ = tx.send(result); + }); + rx.map_err(|_| Panicked) + } +} + +/// Run a small (<200ms) CPU-intensive task on a dedicated thread pool with a few threads. +/// +/// When running blocking io (or side-effects in general), prefer using `tokio::spawn_blocking` +/// instead. When running long tasks or a set of tasks that you expect to take more than 33% of +/// your vCPUs, use a dedicated thread/runtime or executor instead. +/// +/// Disclaimer: The function will not be executed if the Future is dropped. 
+#[must_use = "run_cpu_intensive will not run if the future it returns is dropped"] +pub fn run_cpu_intensive(cpu_heavy_task: F) -> impl Future> +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + static SMALL_TASK_EXECUTOR: std::sync::OnceLock = std::sync::OnceLock::new(); + SMALL_TASK_EXECUTOR + .get_or_init(|| { + let num_threads: usize = (crate::num_cpus() / 3).max(2); + ThreadPool::new("small_tasks", Some(num_threads)) + }) + .run_cpu_intensive(cpu_heavy_task) +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct Panicked; + +impl fmt::Display for Panicked { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "scheduled task panicked") + } +} + +impl std::error::Error for Panicked {} + +struct ThreadPoolMetrics { + ongoing_tasks: IntGaugeVec<1>, + pending_tasks: IntGaugeVec<1>, +} + +impl Default for ThreadPoolMetrics { + fn default() -> Self { + ThreadPoolMetrics { + ongoing_tasks: new_gauge_vec( + "ongoing_tasks", + "number of tasks being currently processed by threads in the thread pool", + "thread_pool", + &[], + ["pool"], + ), + pending_tasks: new_gauge_vec( + "pending_tasks", + "number of tasks waiting in the queue before being processed by the thread pool", + "thread_pool", + &[], + ["pool"], + ), + } + } +} + +static THREAD_POOL_METRICS: Lazy = Lazy::new(ThreadPoolMetrics::default); + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc; + use std::time::Duration; + + use super::*; + + #[tokio::test] + async fn test_run_cpu_intensive() { + assert_eq!(run_cpu_intensive(|| 1).await, Ok(1)); + } + + #[tokio::test] + async fn test_run_cpu_intensive_panicks() { + assert!(run_cpu_intensive(|| panic!("")).await.is_err()); + } + + #[tokio::test] + async fn test_run_cpu_intensive_panicks_do_not_shrink_thread_pool() { + for _ in 0..100 { + assert!(run_cpu_intensive(|| panic!("")).await.is_err()); + } + } + + #[tokio::test] + async fn test_run_cpu_intensive_abort() { + let 
counter: Arc = Default::default(); + let mut futures = Vec::new(); + for _ in 0..1_000 { + let counter_clone = counter.clone(); + let fut = run_cpu_intensive(move || { + std::thread::sleep(Duration::from_millis(5)); + counter_clone.fetch_add(1, Ordering::SeqCst) + }); + // The first few num_cores tasks should run, but the other should get cancelled. + futures.push(tokio::time::timeout(Duration::from_millis(1), fut)); + } + futures::future::join_all(futures).await; + assert!(counter.load(Ordering::SeqCst) < 100); + } +} diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 13143b8eb85..93b8bb47061 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -361,8 +361,8 @@ struct IndexingWorkbench { // We use this value to set the `delete_opstamp` of the workbench splits. last_delete_opstamp: u64, // Number of bytes declared as used by tantivy. - memory_usage: GaugeGuard, - split_builders_guard: GaugeGuard, + memory_usage: GaugeGuard<'static>, + split_builders_guard: GaugeGuard<'static>, cooperative_indexing_period: Option, } diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 0dff364c6f3..6236728d99c 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -184,8 +184,8 @@ pub struct IndexedSplitBatchBuilder { pub publish_token_opt: Option, pub commit_trigger: CommitTrigger, pub batch_parent_span: Span, - pub memory_usage: GaugeGuard, - pub _split_builders_guard: GaugeGuard, + pub memory_usage: GaugeGuard<'static>, + pub _split_builders_guard: GaugeGuard<'static>, } /// Sends notifications to the Publisher that the last batch of splits was emtpy. 
diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs index cd9d1df0e24..14a75298d29 100644 --- a/quickwit/quickwit-indexing/src/models/processed_doc.rs +++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs @@ -46,7 +46,7 @@ pub struct ProcessedDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard, + _gauge_guard: GaugeGuard<'static>, } impl ProcessedDocBatch { diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs index f9cf0720c0b..8882c51e961 100644 --- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs +++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs @@ -29,7 +29,7 @@ pub struct RawDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard, + _gauge_guard: GaugeGuard<'static>, } impl RawDocBatch { diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index f42fbd444a9..1803dc6a7f5 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -485,7 +485,7 @@ pub(super) struct BatchBuilder { num_bytes: u64, checkpoint_delta: SourceCheckpointDelta, force_commit: bool, - gauge_guard: GaugeGuard, + gauge_guard: GaugeGuard<'static>, } impl BatchBuilder { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 8fe8c173ca5..4cb6819ab08 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,6 +31,7 @@ use bytes::Bytes; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; +use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; use 
quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ @@ -286,7 +287,7 @@ impl IndexesMetadataResponseExt for IndexesMetadataResponse { indexes_metadata: Vec, failures: Vec, ) -> MetastoreResult { - let indexes_metadata_json_zstd = tokio::task::spawn_blocking(move || { + let indexes_metadata_json_zstd = run_cpu_intensive(move || { serde_utils::to_json_zstd(&indexes_metadata, 0).map(Bytes::from) }) .await @@ -302,14 +303,12 @@ impl IndexesMetadataResponseExt for IndexesMetadataResponse { } async fn deserialize_indexes_metadata(self) -> MetastoreResult> { - tokio::task::spawn_blocking(move || { - serde_utils::from_json_zstd(&self.indexes_metadata_json_zstd) - }) - .await - .map_err(|join_error| MetastoreError::Internal { - message: "failed to deserialize indexes metadata".to_string(), - cause: join_error.to_string(), - })? + run_cpu_intensive(move || serde_utils::from_json_zstd(&self.indexes_metadata_json_zstd)) + .await + .map_err(|join_error| MetastoreError::Internal { + message: "failed to deserialize indexes metadata".to_string(), + cause: join_error.to_string(), + })? 
} } @@ -338,7 +337,7 @@ impl ListIndexesMetadataResponseExt for ListIndexesMetadataResponse { async fn try_from_indexes_metadata( indexes_metadata: Vec, ) -> MetastoreResult { - let indexes_metadata_json_zstd = tokio::task::spawn_blocking(move || { + let indexes_metadata_json_zstd = run_cpu_intensive(move || { serde_utils::to_json_zstd(&indexes_metadata, 0).map(Bytes::from) }) .await @@ -354,7 +353,7 @@ impl ListIndexesMetadataResponseExt for ListIndexesMetadataResponse { } async fn deserialize_indexes_metadata(self) -> MetastoreResult> { - tokio::task::spawn_blocking(move || { + run_cpu_intensive(move || { if let Some(indexes_metadata_json) = &self.indexes_metadata_json_opt { return serde_utils::from_json_str(indexes_metadata_json); }; diff --git a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs index fdfa1a58b38..f803bcca1ad 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs @@ -23,6 +23,7 @@ use std::collections::{btree_set, BTreeSet, HashMap}; use async_trait::async_trait; use prost::Message; use quickwit_common::rate_limited_error; +use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_common::uri::Uri; use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig}; use quickwit_ingest::{ @@ -262,7 +263,7 @@ impl OtlpGrpcLogsService { num_log_records, num_parse_errors, error_message, - } = tokio::task::spawn_blocking({ + } = run_cpu_intensive({ let parent_span = RuntimeSpan::current(); || Self::parse_logs(request, parent_span, index_id) }) diff --git a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs index 40e211697b8..f4ed43fd0b0 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs @@ -23,6 +23,7 @@ use std::str::FromStr; use async_trait::async_trait; use prost::Message; +use 
quickwit_common::thread_pool::run_cpu_intensive; use quickwit_common::uri::Uri; use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig}; use quickwit_ingest::{ @@ -711,7 +712,7 @@ impl OtlpGrpcTracesService { num_spans, num_parse_errors, error_message, - } = tokio::task::spawn_blocking({ + } = run_cpu_intensive({ let parent_span = RuntimeSpan::current(); || Self::parse_spans(request, parent_span, index_id) }) diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index a3b4cc166b3..ed6da6347aa 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -36,7 +36,6 @@ use tracing::{error, Instrument}; use crate::leaf::open_index_with_caches; use crate::service::SearcherContext; -use crate::thread_pool::search_executor; use crate::{convert_document_to_json_string, GlobalDocAddress}; const SNIPPET_MAX_NUM_CHARS: usize = 150; @@ -185,9 +184,10 @@ async fn fetch_docs_in_split( .context("open-index-for-split")?; // we add an executor here, we could add it in open_index_with_caches, though we should verify // the side-effect before - index - .set_shared_multithread_executor(search_executor()) - .context("failed to set search pool")?; + let tantivy_executor = crate::search_thread_pool() + .get_underlying_rayon_thread_pool() + .into(); + index.set_executor(tantivy_executor); let index_reader = index .reader_builder() // the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index f839e206bb5..a983d3e9015 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -378,14 +378,15 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let span = info_span!("tantivy_search"); - let leaf_search_response = crate::run_cpu_intensive(move || { - let _span_guard = span.enter(); - 
searcher.search(&query, &quickwit_collector) - }) - .await - .map_err(|_| { - crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) - })??; + let leaf_search_response = crate::search_thread_pool() + .run_cpu_intensive(move || { + let _span_guard = span.enter(); + searcher.search(&query, &quickwit_collector) + }) + .await + .map_err(|_| { + crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) + })??; searcher_context .leaf_search_cache @@ -921,7 +922,8 @@ pub async fn leaf_search( } } - crate::run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into)) + crate::search_thread_pool() + .run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into)) .instrument(info_span!("incremental_merge_finalize")) .await .context("failed to merge split search responses")? diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 11618ac603e..c8f02a391c2 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -41,7 +41,6 @@ mod search_job_placer; mod search_response_rest; mod search_stream; mod service; -mod thread_pool; pub(crate) mod top_k_collector; mod metrics; @@ -51,6 +50,7 @@ mod tests; pub use collector::QuickwitAggregations; use metrics::SEARCH_METRICS; +use quickwit_common::thread_pool::ThreadPool; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; use quickwit_proto::metastore::{ @@ -62,7 +62,7 @@ use tantivy::schema::NamedFieldDocument; pub type Result = std::result::Result; use std::net::{Ipv4Addr, SocketAddr}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; pub use find_trace_ids_collector::FindTraceIdsCollector; use quickwit_config::SearcherConfig; @@ -92,11 +92,15 @@ pub use crate::search_job_placer::{Job, SearchJobPlacer}; pub use crate::search_response_rest::SearchResponseRest; pub use crate::search_stream::root_search_stream; pub use crate::service::{MockSearchService, 
SearchService, SearchServiceImpl}; -use crate::thread_pool::run_cpu_intensive; /// A pool of searcher clients identified by their gRPC socket address. pub type SearcherPool = Pool; +fn search_thread_pool() -> &'static ThreadPool { + static SEARCH_THREAD_POOL: OnceLock = OnceLock::new(); + SEARCH_THREAD_POOL.get_or_init(|| ThreadPool::new("search", None)) +} + /// GlobalDocAddress serves as a hit address. #[derive(Clone, Eq, Debug, PartialEq, Hash, Ord, PartialOrd)] pub struct GlobalDocAddress { diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index b1d89abff57..47e06129d61 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -20,14 +20,11 @@ // See https://prometheus.io/docs/practices/naming/ use once_cell::sync::Lazy; -use quickwit_common::metrics::{ - new_counter, new_gauge, new_histogram, Histogram, IntCounter, IntGauge, -}; +use quickwit_common::metrics::{new_counter, new_histogram, Histogram, IntCounter}; pub struct SearchMetrics { pub leaf_searches_splits_total: IntCounter, pub leaf_search_split_duration_secs: Histogram, - pub active_search_threads_count: IntGauge, } impl Default for SearchMetrics { @@ -44,12 +41,6 @@ impl Default for SearchMetrics { starts after the semaphore is obtained.", "search", ), - active_search_threads_count: new_gauge( - "active_search_threads_count", - "Number of threads in use in the CPU thread pool", - "search", - &[], - ), } } } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 9ae0e02aab3..b5c8a2c1e27 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -667,13 +667,14 @@ pub(crate) async fn search_partial_hits_phase( let leaf_search_responses: Vec> = leaf_search_responses.into_iter().map(Ok).collect_vec(); let span = info_span!("merge_fruits"); - let leaf_search_response = crate::run_cpu_intensive(move || { - let _span_guard = span.enter(); - 
merge_collector.merge_fruits(leaf_search_responses) - }) - .await - .context("failed to merge leaf search responses")? - .map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?; + let leaf_search_response = crate::search_thread_pool() + .run_cpu_intensive(move || { + let _span_guard = span.enter(); + merge_collector.merge_fruits(leaf_search_responses) + }) + .await + .context("failed to merge leaf search responses")? + .map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?; debug!( num_hits = leaf_search_response.num_hits, failed_splits = ?leaf_search_response.failed_splits, diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 53c3021652c..cca351ef4e2 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -196,7 +196,7 @@ async fn leaf_search_stream_single_split( let _ = span.enter(); let m_request_fields = request_fields.clone(); - let collect_handle = crate::run_cpu_intensive(move || { + let collect_handle = crate::search_thread_pool().run_cpu_intensive(move || { let mut buffer = Vec::new(); match m_request_fields.fast_field_types() { (Type::I64, None) => { diff --git a/quickwit/quickwit-search/src/thread_pool.rs b/quickwit/quickwit-search/src/thread_pool.rs deleted file mode 100644 index 3f4f9aba2b0..00000000000 --- a/quickwit/quickwit-search/src/thread_pool.rs +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. 
-// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::fmt; -use std::sync::Arc; - -use once_cell::sync::OnceCell; -use quickwit_common::metrics::GaugeGuard; -use tantivy::Executor; -use tracing::error; - -static SEARCH_THREAD_POOL: OnceCell> = OnceCell::new(); - -fn build_executor() -> Arc { - let rayon_pool = rayon::ThreadPoolBuilder::new() - .thread_name(|thread_id| format!("quickwit-search-{thread_id}")) - .panic_handler(|_my_panic| { - error!("task running in the quickwit search pool panicked"); - }) - .build() - .expect("Failed to spawn the spawning pool"); - Arc::new(Executor::ThreadPool(rayon_pool)) -} - -pub(crate) fn search_executor() -> Arc { - SEARCH_THREAD_POOL.get_or_init(build_executor).clone() -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct Panicked; - -impl fmt::Display for Panicked { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Scheduled job panicked") - } -} - -impl std::error::Error for Panicked {} - -/// Function similar to `tokio::spawn_blocking`. -/// -/// Here are two important differences however: -/// -/// 1) The task is running on a rayon thread pool managed by quickwit. -/// This pool is specifically used only to run CPU intensive work -/// and is configured to contain `num_cpus` cores. -/// -/// 2) Before the task is effectively scheduled, we check that -/// the spawner is still interested by its result. -/// -/// It is therefore required to `await` the result of this -/// function to get anywork done. -/// -/// This is nice, because it makes work that has been scheduled -/// but is not running yet "cancellable". 
-pub async fn run_cpu_intensive(cpu_heavy_task: F) -> Result -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let span = tracing::Span::current(); - search_executor() - .spawn_blocking(move || { - let _guard = span.enter(); - let mut active_thread_guard = - GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count); - active_thread_guard.add(1i64); - cpu_heavy_task() - }) - .await - .map_err(|_| Panicked) -} - -#[cfg(test)] -mod tests { - use std::sync::atomic::{AtomicU64, Ordering}; - use std::sync::Arc; - use std::time::Duration; - - use super::*; - - #[tokio::test] - async fn test_run_cpu_intensive() { - assert_eq!(run_cpu_intensive(|| 1).await, Ok(1)); - } - - #[tokio::test] - async fn test_run_cpu_intensive_panicks() { - assert!(run_cpu_intensive(|| panic!("")).await.is_err()); - } - - #[tokio::test] - async fn test_run_cpu_intensive_panicks_do_not_shrink_thread_pool() { - for _ in 0..100 { - assert!(run_cpu_intensive(|| panic!("")).await.is_err()); - } - } - - #[tokio::test] - async fn test_run_cpu_intensive_abort() { - let counter: Arc = Default::default(); - let mut futures = Vec::new(); - for _ in 0..1_000 { - let counter_clone = counter.clone(); - let fut = run_cpu_intensive(move || { - std::thread::sleep(Duration::from_millis(5)); - counter_clone.fetch_add(1, Ordering::SeqCst) - }); - // The first few num_cores tasks should run, but the other should get cancelled. 
- futures.push(tokio::time::timeout(Duration::from_millis(1), fut)); - } - futures::future::join_all(futures).await; - assert!(counter.load(Ordering::SeqCst) < 100); - } -} diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs index 37e68074949..7e6e2946b37 100644 --- a/quickwit/quickwit-serve/src/decompression.rs +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -22,8 +22,8 @@ use std::io::Read; use bytes::Bytes; use flate2::read::GzDecoder; use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::thread_pool::run_cpu_intensive; use thiserror::Error; -use tokio::task; use warp::reject::Reject; use warp::Filter; @@ -37,7 +37,7 @@ use warp::Filter; async fn decompress_body(encoding: Option, body: Bytes) -> Result { match encoding.as_deref() { Some("gzip" | "x-gzip") => { - let decompressed = task::spawn_blocking(move || { + let decompressed = run_cpu_intensive(move || { let mut decompressed = Vec::new(); let mut decoder = GzDecoder::new(body.as_ref()); decoder @@ -50,7 +50,7 @@ async fn decompress_body(encoding: Option, body: Bytes) -> Result { - let decompressed = task::spawn_blocking(move || { + let decompressed = run_cpu_intensive(move || { zstd::decode_all(body.as_ref()) .map(Bytes::from) .map_err(|_| warp::reject::custom(CorruptedData)) @@ -89,7 +89,7 @@ pub(crate) fn get_body_bytes() -> impl Filter, } impl From for Body {