Skip to content

add series hash in metrics ingestion, bounded streaming merge#1648

Merged
nitisht merged 7 commits into
parseablehq:mainfrom
nikhilsinhaparseable:streaming-fix
May 21, 2026
Merged

add series hash in metrics ingestion, bounded streaming merge#1648
nitisht merged 7 commits into
parseablehq:mainfrom
nikhilsinhaparseable:streaming-fix

Conversation

@nikhilsinhaparseable
Copy link
Copy Markdown
Contributor

@nikhilsinhaparseable nikhilsinhaparseable commented May 20, 2026

Summary by CodeRabbit

  • New Features

    • Deterministic per-series hashing added to OTEL metrics, producing a stable series identifier in flattened metric outputs.
    • OTEL-metrics–aware per-batch sorting by metric name to improve Parquet pruning and read performance.
  • Refactor

    • Streaming query merge redesigned for more efficient, stable multi-partition result delivery.
  • Chores

    • Updated dependency on rustc-hash to v2.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 20, 2026

Important

Review skipped

This PR was authored by the user configured for CodeRabbit reviews. CodeRabbit does not review PRs authored by this user. It's recommended to use a dedicated user account to post CodeRabbit review feedback.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 2f999d31-1574-4fad-b0c3-07d34ab218e4

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • ✅ Review completed - (🔄 Check again to review again)

Walkthrough

Adds deterministic per-series hashing for OTEL metrics, refactors partitioned query streaming to use a bounded Tokio mpsc channel with per-partition forwarders, and conditionally sorts parquet batches for OTel metrics by metric_name then time_partition.

Changes

Metrics, Query Streaming, and Parquet Sorting

Layer / File(s) Summary
Query Result Streaming Refactor
src/query/mod.rs
Replaces futures::stream::select_all merging with a bounded Tokio mpsc channel and one spawned forwarder task per partition; wraps the receiver with ReceiverStream and RecordBatchStreamAdapter to return a single SendableRecordBatchStream. Simplifies BoxedBatchStream alias and updates imports.
OTEL Series Hash Implementation
Cargo.toml, src/otel/metrics.rs
Adds rustc-hash dependency. Implements compute_series_hash() using FxHasher to build a deterministic u64 from metric_name plus sorted label key/value pairs (excluding OTEL_METRICS_KNOWN_FIELD_LIST), inserts _series_hash into flattened data points, updates the known-field list, and adds unit tests covering stability, ordering independence, and sensitivity to label/metric changes.
Parquet Sorting for OTel Metrics
src/parseable/streams.rs
Adds ArrayRef import, conditionally advertises metric_name sorting for OTel streams, and introduces is_otel_metrics() and sort_batch_for_metric_pruning() helpers. When writing parquet part-files for OTel streams, buffers/concats batches and sorts merged RecordBatches by (metric_name ASC, time_partition DESC) when possible before writing.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • parseablehq/parseable#1391: Refactors/extends OTEL metric flattening to support protobuf; touches the same flattening code paths affected by _series_hash.
  • parseablehq/parseable#1298: Changes OTEL attribute stringification/merging, which can affect the label fields used as input to _series_hash.

Poem

🐇 I hash the metrics, neat and true,
Fx's little tumbler hums anew.
Channels rumble, tasks set free,
Batches sort in tidy glee.
A rabbit nods at structured dew.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Description check ⚠️ Warning No pull request description was provided by the author, but the template requires at least a description section and testing/documentation checklist. Add a comprehensive description including the goal, rationale for the chosen approach, and key changes. Complete the testing and documentation checklist items.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the two main changes: adding series hash in metrics ingestion and implementing bounded streaming merge.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
src/otel/metrics.rs (1)

89-91: 💤 Low value

Consider caching the known-fields set to avoid per-call allocation.

OTEL_METRICS_KNOWN_FIELD_LIST is a compile-time constant, but a new HashSet is built on every compute_series_hash invocation. Under high ingestion throughput (many data points per second), this repeated allocation adds overhead.

♻️ Suggested optimization using `LazyLock`
+use std::sync::LazyLock;
+
+static KNOWN_FIELDS: LazyLock<std::collections::HashSet<&'static str>> = LazyLock::new(|| {
+    OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect()
+});
+
 fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
-    let known: std::collections::HashSet<&str> =
-        OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect();
     let mut label_pairs: Vec<(&str, String)> = dp
         .iter()
-        .filter(|(k, _)| !known.contains(k.as_str()))
+        .filter(|(k, _)| !KNOWN_FIELDS.contains(k.as_str()))
         .map(|(k, v)| {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/otel/metrics.rs` around lines 89 - 91, The function compute_series_hash
repeatedly builds a HashSet from the compile-time OTEL_METRICS_KNOWN_FIELD_LIST
on every call; replace that per-call allocation by creating a cached,
lazily-initialized HashSet (e.g., via std::sync::OnceLock/OnceCell/LazyLock or
once_cell::sync::Lazy) and use the static cached set inside compute_series_hash
so the HashSet is constructed once and reused across invocations.
src/query/mod.rs (1)

322-333: ⚡ Quick win

Spawned task panics would be silently absorbed, potentially losing partition data.

The JoinHandle returned by tokio::spawn is dropped immediately, meaning if a task panics (e.g., due to an unexpected error in the stream), that partition's remaining data is silently lost with no indication to the consumer.

Consider either:

  1. Storing JoinHandles and awaiting them (complex, changes return semantics), or
  2. Using .abort_handle() and adding panic=abort logging, or
  3. At minimum, wrapping the async block body with catch_unwind to send an error through the channel before exiting.
Option: Catch panics and convert to DataFusion errors
 tokio::spawn(
     async move {
+        let result = std::panic::AssertUnwindSafe(async {
             let mut stream: SendableRecordBatchStream = Box::pin(wrapped);
             use futures::StreamExt;
             while let Some(batch) = stream.next().await {
                 if tx.send(batch).await.is_err() {
                     break;
                 }
             }
+        });
+        if let Err(e) = futures::FutureExt::catch_unwind(result).await {
+            let msg = e.downcast_ref::<&str>().map(|s| s.to_string())
+                .or_else(|| e.downcast_ref::<String>().cloned())
+                .unwrap_or_else(|| "partition task panicked".to_string());
+            let _ = tx.send(Err(datafusion::error::DataFusionError::Execution(msg))).await;
         }
     }
     .instrument(span),
 );
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/otel/metrics.rs`:
- Around line 89-91: The function compute_series_hash repeatedly builds a
HashSet from the compile-time OTEL_METRICS_KNOWN_FIELD_LIST on every call;
replace that per-call allocation by creating a cached, lazily-initialized
HashSet (e.g., via std::sync::OnceLock/OnceCell/LazyLock or
once_cell::sync::Lazy) and use the static cached set inside compute_series_hash
so the HashSet is constructed once and reused across invocations.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f5871794-e60a-465b-9cd2-470e7a2f13c7

📥 Commits

Reviewing files that changed from the base of the PR and between 1452b4a and 6f0eb2b.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • Cargo.toml
  • src/otel/metrics.rs
  • src/query/mod.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes May 20, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/otel/metrics.rs (1)

89-91: 💤 Low value

Consider caching the known-fields HashSet to avoid per-call allocation.

OTEL_METRICS_KNOWN_FIELD_LIST is a compile-time constant, but the HashSet is rebuilt on every compute_series_hash call. For high-throughput ingestion this adds allocation overhead per data point.

♻️ Proposed optimization using `std::sync::LazyLock`
+use std::sync::LazyLock;
+use std::collections::HashSet;

+static KNOWN_FIELDS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
+    OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect()
+});
+
 fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
-    let known: std::collections::HashSet<&str> =
-        OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect();
     let mut label_pairs: Vec<(&str, String)> = dp
         .iter()
-        .filter(|(k, _)| !known.contains(k.as_str()))
+        .filter(|(k, _)| !KNOWN_FIELDS.contains(k.as_str()))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/otel/metrics.rs` around lines 89 - 91, compute_series_hash currently
rebuilds a HashSet from OTEL_METRICS_KNOWN_FIELD_LIST on every call; change this
to use a lazily-initialized static cached HashSet (e.g. std::sync::LazyLock or
once_cell::sync::Lazy) so the set is constructed once and reused; update
compute_series_hash to reference the static (e.g. KNOWN_FIELDS_SET) instead of
creating a local HashSet, keeping the rest of the hashing logic unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/otel/metrics.rs`:
- Around line 89-91: compute_series_hash currently rebuilds a HashSet from
OTEL_METRICS_KNOWN_FIELD_LIST on every call; change this to use a
lazily-initialized static cached HashSet (e.g. std::sync::LazyLock or
once_cell::sync::Lazy) so the set is constructed once and reused; update
compute_series_hash to reference the static (e.g. KNOWN_FIELDS_SET) instead of
creating a local HashSet, keeping the rest of the hashing logic unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b498a4b0-daf0-4a9d-ad17-143d62d561e0

📥 Commits

Reviewing files that changed from the base of the PR and between 6f0eb2b and c41db80.

📒 Files selected for processing (1)
  • src/otel/metrics.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes May 20, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/otel/metrics.rs (1)

93-96: ⚡ Quick win

Consider caching the known-fields HashSet to avoid per-call allocation.

compute_series_hash is invoked for every data point in the ingestion hot path. Rebuilding the 37-element HashSet on each call adds unnecessary allocation overhead.

♻️ Suggested refactor using LazyLock
+use std::sync::LazyLock;
+use std::collections::HashSet;
+
+static KNOWN_FIELDS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
+    OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect()
+});
+
 fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
-    let known: std::collections::HashSet<&str> =
-        OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect();
     let mut label_pairs: Vec<(&str, String)> = dp
         .iter()
-        .filter(|(k, _)| !known.contains(k.as_str()))
+        .filter(|(k, _)| !KNOWN_FIELDS.contains(k.as_str()))
         .map(|(k, v)| {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/otel/metrics.rs` around lines 93 - 96, compute_series_hash currently
builds a HashSet from OTEL_METRICS_KNOWN_FIELD_LIST on every call causing
allocation overhead; instead create a single static cached HashSet (e.g., via
std::sync::LazyLock/OnceLock or once_cell::sync::Lazy) that is initialized once
and reused, then update compute_series_hash to reference that static (named
e.g., KNOWN_FIELDS_SET) rather than recreating the set each invocation so the
per-datapoint hot path avoids repeated allocations.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/otel/metrics.rs`:
- Around line 93-96: compute_series_hash currently builds a HashSet from
OTEL_METRICS_KNOWN_FIELD_LIST on every call causing allocation overhead; instead
create a single static cached HashSet (e.g., via std::sync::LazyLock/OnceLock or
once_cell::sync::Lazy) that is initialized once and reused, then update
compute_series_hash to reference that static (named e.g., KNOWN_FIELDS_SET)
rather than recreating the set each invocation so the per-datapoint hot path
avoids repeated allocations.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 3170432a-509f-4b7e-8848-55c6c1e9857d

📥 Commits

Reviewing files that changed from the base of the PR and between c41db80 and e77d25b.

📒 Files selected for processing (1)
  • src/otel/metrics.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes May 20, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/parseable/streams.rs`:
- Around line 676-682: The SortOptions for the time partition currently sets
nulls_first: false while the parquet_writer_props SortingColumn metadata
advertises nulls_first: true, causing a mismatch; update the SortOptions
construction used for time_partition (the SortColumn with values:
batch.column(time_idx)) to set nulls_first: true so it matches the
parquet_writer_props/SortingColumn metadata and keep SortColumn/SortOptions and
parquet_writer_props consistent.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 15ca5235-135d-432e-a3c6-ac7f121f410b

📥 Commits

Reviewing files that changed from the base of the PR and between e77d25b and bc54939.

📒 Files selected for processing (1)
  • src/parseable/streams.rs

Comment thread src/parseable/streams.rs
@nitisht nitisht merged commit 138fea3 into parseablehq:main May 21, 2026
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants