5117 disable per index metrics (#5125)
- Avoid an unnecessary hashmap lookup on every single doc.
- Make it possible to disable per-index metrics with an environment
variable.

By default, they are enabled.
If the `QW_DISABLE_PER_INDEX_METRICS` environment variable is set to true,
all per-index metrics are grouped under the index_id `__any__`.
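
Note that the flag is read once and cached in a `OnceLock`, so it must be set before the process records its first metric; flipping it at runtime has no effect. Below is a minimal, self-contained sketch of the projection behavior. The env var name and the `__any__` label come from this PR; the `index_label` re-implementation, the boolean parsing, and the `main` harness are illustrative, and Quickwit's actual `get_bool_from_env` may parse values differently.

use std::sync::OnceLock;

// Illustrative re-implementation of `index_label` from this PR.
fn index_label(index_name: &str) -> &str {
    static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
    let per_index_metrics_enabled = *PER_INDEX_METRICS_ENABLED.get_or_init(|| {
        // Rough stand-in for `get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false)`.
        std::env::var("QW_DISABLE_PER_INDEX_METRICS")
            .map(|value| !value.trim().eq_ignore_ascii_case("true"))
            .unwrap_or(true)
    });
    if per_index_metrics_enabled {
        index_name
    } else {
        "__any__"
    }
}

fn main() {
    // Must happen before the first call: the OnceLock caches the first read.
    std::env::set_var("QW_DISABLE_PER_INDEX_METRICS", "true");
    assert_eq!(index_label("my-index"), "__any__");
    assert_eq!(index_label("other-index"), "__any__");
}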

In addition, this PR slightly refactors the way we handle the
doc processor metrics. We now cache the doc processor counters,
saving one hash lookup per document.
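
The hash lookup in question is the label-resolution step inside the metric vector: `with_label_values` hashes the label set to find (or create) the child counter. Here is a minimal sketch of the before/after pattern, using the prometheus crate directly rather than Quickwit's `quickwit_common::metrics` wrappers; the metric and label names mirror this PR, the harness is illustrative.

use prometheus::{IntCounterVec, Opts};

fn main() {
    let processed_docs_total = IntCounterVec::new(
        Opts::new("processed_docs_total", "Number of processed docs."),
        &["index", "docs_processed_status"],
    )
    .unwrap();

    // Before: every document pays a labels -> child-counter hashmap lookup.
    for _ in 0..3 {
        processed_docs_total
            .with_label_values(&["my-index", "valid"])
            .inc();
    }

    // After: resolve the labeled child once, cache the handle, and bump it.
    // `inc()` on the cached IntCounter is just an atomic add.
    let valid_docs = processed_docs_total.with_label_values(&["my-index", "valid"]);
    for _ in 0..3 {
        valid_docs.inc();
    }

    assert_eq!(valid_docs.get(), 6);
}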

Apply suggestions from code review

Co-authored-by: Adrien Guillo <adrien@quickwit.io>
fulmicoton and guilload committed Jun 17, 2024
1 parent 1c2ec26 commit 0555978
Showing 4 changed files with 122 additions and 68 deletions.
12 changes: 12 additions & 0 deletions quickwit/quickwit-common/src/metrics.rs
@@ -449,4 +449,16 @@ impl InFlightDataGauges {
     }
 }
 
+/// This function returns `index_name` or projects it to `__any__` if per-index metrics are disabled.
+pub fn index_label(index_name: &str) -> &str {
+    static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
+    let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED
+        .get_or_init(|| !crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false));
+    if per_index_metrics_enabled {
+        index_name
+    } else {
+        "__any__"
+    }
+}
+
 pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
4 changes: 3 additions & 1 deletion quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -435,9 +435,11 @@ impl ShardTable {
         } else {
             0
         };
+        let index_label =
+            quickwit_common::metrics::index_label(source_uid.index_uid.index_id.as_str());
         crate::metrics::CONTROL_PLANE_METRICS
             .open_shards_total
-            .with_label_values([source_uid.index_uid.index_id.as_str()])
+            .with_label_values([index_label])
             .set(num_open_shards as i64);
     }

170 changes: 104 additions & 66 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -25,6 +25,7 @@ use anyhow::{bail, Context};
 use async_trait::async_trait;
 use bytes::Bytes;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::metrics::IntCounter;
 use quickwit_common::rate_limited_tracing::rate_limited_warn;
 use quickwit_common::runtimes::RuntimeType;
 use quickwit_config::{SourceInputFormat, TransformConfig};
@@ -271,20 +272,65 @@ impl From<Result<JsonSpanIterator, OtlpTracesError>> for JsonDocIterator {
     }
 }
 
+#[derive(Debug)]
+pub struct DocProcessorCounter {
+    pub num_docs: AtomicU64,
+    pub num_docs_metric: IntCounter,
+    pub num_bytes_metric: IntCounter,
+}
+
+impl Serialize for DocProcessorCounter {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where S: serde::Serializer {
+        serializer.serialize_u64(self.get_num_docs())
+    }
+}
+
+impl DocProcessorCounter {
+    fn for_index_and_doc_processor_outcome(index: &str, outcome: &str) -> DocProcessorCounter {
+        let index_label = quickwit_common::metrics::index_label(index);
+        let labels = [index_label, outcome];
+        DocProcessorCounter {
+            num_docs: Default::default(),
+            num_docs_metric: crate::metrics::INDEXER_METRICS
+                .processed_docs_total
+                .with_label_values(labels),
+            num_bytes_metric: crate::metrics::INDEXER_METRICS
+                .processed_bytes
+                .with_label_values(labels),
+        }
+    }
+
+    #[inline(always)]
+    fn get_num_docs(&self) -> u64 {
+        self.num_docs.load(Ordering::Relaxed)
+    }
+
+    fn record_doc(&self, num_bytes: u64) {
+        self.num_docs.fetch_add(1, Ordering::Relaxed);
+        self.num_docs_metric.inc();
+        self.num_bytes_metric.inc_by(num_bytes);
+    }
+}
+
 #[derive(Debug, Serialize)]
 pub struct DocProcessorCounters {
     index_id: IndexId,
     source_id: SourceId,
 
     /// Overall number of documents received, partitioned
-    /// into 4 categories:
+    /// into 5 categories:
-    /// - valid documents
     /// - number of docs that could not be parsed.
     /// - number of docs that were not valid json.
     /// - number of docs that could not be transformed.
+    /// - number of docs for which the doc mapper returned an error.
+    /// - number of valid docs.
-    pub num_doc_parse_errors: AtomicU64,
-    pub num_transform_errors: AtomicU64,
-    pub num_oltp_parse_errors: AtomicU64,
-    pub num_valid_docs: AtomicU64,
+    pub valid: DocProcessorCounter,
+    pub doc_mapper_errors: DocProcessorCounter,
+    pub transform_errors: DocProcessorCounter,
+    pub json_parse_errors: DocProcessorCounter,
+    pub otlp_parse_errors: DocProcessorCounter,
 
     /// Number of bytes that went through the indexer
     /// during its entire lifetime.
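
A detail worth noting in the hunk above: `DocProcessorCounter` implements `Serialize` by hand so that each counter appears in the actor's observable state as a bare document count, the same shape the old `AtomicU64` fields serialized to. Below is a small sketch of that effect, assuming serde (with the derive feature) and serde_json; the trimmed-down `Counter` type is illustrative.

use std::sync::atomic::{AtomicU64, Ordering};

use serde::{Serialize, Serializer};

struct Counter {
    num_docs: AtomicU64,
}

impl Serialize for Counter {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // Serialize as a plain u64, not as a nested {"num_docs": ...} object.
        serializer.serialize_u64(self.num_docs.load(Ordering::Relaxed))
    }
}

#[derive(Serialize)]
struct Counters {
    valid: Counter,
}

fn main() {
    let counters = Counters {
        valid: Counter { num_docs: AtomicU64::new(2) },
    };
    // Prints {"valid":2}, the same shape as a bare AtomicU64 field.
    println!("{}", serde_json::to_string(&counters).unwrap());
}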
@@ -295,79 +341,70 @@ pub struct DocProcessorCounters {

 impl DocProcessorCounters {
     pub fn new(index_id: IndexId, source_id: SourceId) -> Self {
-        Self {
+        let valid_docs =
+            DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "valid");
+        let doc_mapper_errors =
+            DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "doc_mapper_error");
+        let transform_errors =
+            DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "transform_error");
+        let json_parse_errors =
+            DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "json_parse_error");
+        let otlp_parse_errors =
+            DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "otlp_parse_error");
+        DocProcessorCounters {
             index_id,
             source_id,
-            num_doc_parse_errors: Default::default(),
-            num_transform_errors: Default::default(),
-            num_oltp_parse_errors: Default::default(),
-            num_valid_docs: Default::default(),
 
+            valid: valid_docs,
+            doc_mapper_errors,
+            transform_errors,
+            json_parse_errors,
+            otlp_parse_errors,
             num_bytes_total: Default::default(),
         }
     }
 
     /// Returns the overall number of docs that went through the indexer (valid or not).
     pub fn num_processed_docs(&self) -> u64 {
-        self.num_valid_docs.load(Ordering::Relaxed)
-            + self.num_doc_parse_errors.load(Ordering::Relaxed)
-            + self.num_oltp_parse_errors.load(Ordering::Relaxed)
-            + self.num_transform_errors.load(Ordering::Relaxed)
+        self.valid.get_num_docs()
+            + self.doc_mapper_errors.get_num_docs()
+            + self.json_parse_errors.get_num_docs()
+            + self.otlp_parse_errors.get_num_docs()
+            + self.transform_errors.get_num_docs()
     }
 
     /// Returns the overall number of docs that were sent to the indexer but were invalid
     /// (for instance, because they were missing a required field or because their format
     /// was invalid).
     pub fn num_invalid_docs(&self) -> u64 {
-        self.num_doc_parse_errors.load(Ordering::Relaxed)
-            + self.num_oltp_parse_errors.load(Ordering::Relaxed)
-            + self.num_transform_errors.load(Ordering::Relaxed)
+        self.doc_mapper_errors.get_num_docs()
+            + self.json_parse_errors.get_num_docs()
+            + self.otlp_parse_errors.get_num_docs()
+            + self.transform_errors.get_num_docs()
     }
 
     pub fn record_valid(&self, num_bytes: u64) {
-        self.num_valid_docs.fetch_add(1, Ordering::Relaxed);
         self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
-
-        crate::metrics::INDEXER_METRICS
-            .processed_docs_total
-            .with_label_values([&self.index_id, "valid"])
-            .inc();
-        crate::metrics::INDEXER_METRICS
-            .processed_bytes
-            .with_label_values([&self.index_id, "valid"])
-            .inc_by(num_bytes);
+        self.valid.record_doc(num_bytes);
     }
 
     pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
-        let label = match error {
+        self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
+        match error {
             DocProcessorError::DocMapperParsing(_) => {
-                self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
-                "doc_mapper_error"
+                self.doc_mapper_errors.record_doc(num_bytes);
             }
             DocProcessorError::JsonParsing(_) => {
-                self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
-                "json_parse_error"
+                self.json_parse_errors.record_doc(num_bytes);
             }
             DocProcessorError::OltpLogsParsing(_) | DocProcessorError::OltpTracesParsing(_) => {
-                self.num_oltp_parse_errors.fetch_add(1, Ordering::Relaxed);
-                "otlp_parse_error"
+                self.otlp_parse_errors.record_doc(num_bytes);
            }
             #[cfg(feature = "vrl")]
             DocProcessorError::Transform(_) => {
-                self.num_transform_errors.fetch_add(1, Ordering::Relaxed);
-                "transform_error"
+                self.transform_errors.record_doc(num_bytes);
             }
         };
-        crate::metrics::INDEXER_METRICS
-            .processed_docs_total
-            .with_label_values([&self.index_id, label])
-            .inc();
-
-        self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
-
-        crate::metrics::INDEXER_METRICS
-            .processed_bytes
-            .with_label_values([&self.index_id, label])
-            .inc_by(num_bytes);
     }
 }

@@ -395,7 +432,7 @@ impl DocProcessor {
         if cfg!(not(feature = "vrl")) && transform_config_opt.is_some() {
             bail!("VRL is not enabled: please recompile with the `vrl` feature")
         }
-        let doc_processor = Self {
+        Ok(DocProcessor {
             doc_mapper,
             indexer_mailbox,
             timestamp_field_opt,
Expand All @@ -406,8 +443,7 @@ impl DocProcessor {
                 .map(VrlProgram::try_from_transform_config)
                 .transpose()?,
             input_format,
-        };
-        Ok(doc_processor)
+        })
     }
 
     // Extract a timestamp from a tantivy document.
@@ -650,10 +686,11 @@ mod tests {
             .state;
         assert_eq!(counters.index_id, index_id);
         assert_eq!(counters.source_id, source_id);
-        assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2);
-        assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
-        assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0);
-        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1);
+        assert_eq!(counters.json_parse_errors.get_num_docs(), 1);
+        assert_eq!(counters.transform_errors.get_num_docs(), 0);
+        assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0);
+        assert_eq!(counters.valid.get_num_docs(), 2);
         assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387);
 
         let output_messages = indexer_inbox.drain_for_test();
@@ -890,7 +927,7 @@ mod tests {
             .process_pending_and_observe()
             .await
             .state;
-        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.valid.get_num_docs(), 2);
 
         let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
         assert_eq!(batch.len(), 1);
@@ -969,7 +1006,7 @@ mod tests {
             .process_pending_and_observe()
             .await
             .state;
-        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.valid.get_num_docs(), 2);
 
         let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
         assert_eq!(batch.len(), 1);
@@ -1042,7 +1079,7 @@ mod tests {
             .process_pending_and_observe()
             .await
             .state;
-        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.valid.get_num_docs(), 2);
 
         let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
         assert_eq!(batch.len(), 1);
@@ -1117,7 +1154,7 @@ mod tests {
             .process_pending_and_observe()
             .await
             .state;
-        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.valid.get_num_docs(), 2);
 
         let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
         assert_eq!(batch.len(), 1);
@@ -1176,10 +1213,11 @@ mod tests_vrl {
             .state;
         assert_eq!(counters.index_id, index_id.to_string());
         assert_eq!(counters.source_id, source_id.to_string());
-        assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2);
-        assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
-        assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0);
-        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1);
+        assert_eq!(counters.json_parse_errors.get_num_docs(), 1);
+        assert_eq!(counters.transform_errors.get_num_docs(), 0);
+        assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0);
+        assert_eq!(counters.valid.get_num_docs(), 2);
         assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397);
 
         let output_messages = indexer_inbox.drain_for_test();
@@ -1266,10 +1304,10 @@ mod tests_vrl {
             .state;
         assert_eq!(counters.index_id, index_id);
         assert_eq!(counters.source_id, source_id);
-        assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 0,);
-        assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,);
-        assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0,);
-        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,);
+        assert_eq!(counters.doc_mapper_errors.get_num_docs(), 0,);
+        assert_eq!(counters.transform_errors.get_num_docs(), 1,);
+        assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0,);
+        assert_eq!(counters.valid.get_num_docs(), 2,);
         assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,);
 
         let output_messages = indexer_inbox.drain_for_test();
4 changes: 3 additions & 1 deletion quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
@@ -209,9 +209,11 @@ impl DeleteTaskPlanner {
                 self.merge_split_downloader_mailbox.clone(),
             )
             .await?;
+            let index_label =
+                quickwit_common::metrics::index_label(self.index_uid.index_id.as_str());
             JANITOR_METRICS
                 .ongoing_num_delete_operations_total
-                .with_label_values([&self.index_uid.index_id])
+                .with_label_values([index_label])
                 .set(self.ongoing_delete_operations_inventory.list().len() as i64);
         }
     }
