
fix: Surface ingestion errors#1652

Merged
nitisht merged 3 commits into
parseablehq:mainfrom
parmesant:surface-ingestion-error
May 27, 2026

Conversation

@parmesant
Contributor

@parmesant parmesant commented May 26, 2026

  • Replaced `expect` and `unwrap` calls in DiskWriter and MemWriter with propagated errors
  • New CLI env var `P_DATAFUSION_TARGET_PARTITIONS` to control the number of partitions (default: number of CPUs / 2)
  • Streaming response now uses an unbounded channel
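
As a rough illustration of the default described in the second bullet, here is a minimal sketch that resolves a partition count from an optional raw value and falls back to half the CPU count. The helper names `target_partitions` and `target_partitions_from_env` are hypothetical, and the clamp to at least 1 and the fallback on unparsable input are assumptions, not behavior confirmed by this PR. The real option is presumably declared through the project's CLI parser rather than read by hand.

```rust
use std::thread;

/// Hypothetical helper: resolve the partition count from an optional raw
/// value, falling back to half the CPU count.
/// Assumption: zero or unparsable values fall back to the default, and the
/// default never drops below 1.
fn target_partitions(raw: Option<&str>, num_cpus: usize) -> usize {
    raw.and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or_else(|| (num_cpus / 2).max(1))
}

/// Hypothetical wrapper that reads the env var named in the PR description.
fn target_partitions_from_env() -> usize {
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let raw = std::env::var("P_DATAFUSION_TARGET_PARTITIONS").ok();
    target_partitions(raw.as_deref(), cpus)
}
```

With 8 CPUs and the variable unset, this sketch resolves to 4 partitions; an explicit value such as `6` overrides the default.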

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • CLI option to configure target partitions for query parallelism.
  • Improvements

    • Staging pipeline made fully fallible: in-memory writers and related operations now return and propagate errors instead of panicking.
    • Streaming execution switched to an unbounded producer stream for more resilient streaming behavior.
    • Query/session tuning for Parquet filtering and execution batch sizing.
    • Staging cleared only when an event is produced.
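
To make the "return and propagate errors instead of panicking" point concrete, here is a minimal sketch of a fallible in-memory writer. The `MemWriter` and `StagingError` below are toy stand-ins that share names with the PR's types but are otherwise hypothetical: the real writer works on Arrow record batches and schemas, whereas this one only checks a column count.

```rust
use std::fmt;

/// Hypothetical error type standing in for the PR's StagingError.
#[derive(Debug, PartialEq)]
enum StagingError {
    SchemaMismatch { expected: usize, got: usize },
}

impl fmt::Display for StagingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StagingError::SchemaMismatch { expected, got } => {
                write!(f, "expected {} columns, got {}", expected, got)
            }
        }
    }
}

impl std::error::Error for StagingError {}

/// Toy in-memory writer: every row must have a fixed number of columns.
struct MemWriter {
    columns: usize,
    rows: Vec<Vec<i64>>,
}

impl MemWriter {
    fn new(columns: usize) -> Self {
        Self { columns, rows: Vec::new() }
    }

    /// Returns an error on a mismatched row instead of panicking.
    fn push(&mut self, row: Vec<i64>) -> Result<(), StagingError> {
        if row.len() != self.columns {
            return Err(StagingError::SchemaMismatch {
                expected: self.columns,
                got: row.len(),
            });
        }
        self.rows.push(row);
        Ok(())
    }
}

/// The caller propagates writer errors with `?` and returns the row count.
fn ingest(writer: &mut MemWriter, rows: Vec<Vec<i64>>) -> Result<usize, StagingError> {
    for row in rows {
        writer.push(row)?;
    }
    Ok(writer.rows.len())
}
```

The design point is that a bad event surfaces as an `Err` to the ingestion caller, which can report it, rather than aborting the task that holds the writer.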


@coderabbitai
Contributor

coderabbitai Bot commented May 26, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: fc5547a2-4a95-4457-988d-46d77021023b

📥 Commits

Reviewing files that changed from the base of the PR and between 30f3583 and bdbd8ce.

📒 Files selected for processing (1)
  • src/parseable/streams.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/parseable/streams.rs

Walkthrough

Refactors staging writer and Stream to return Results, adds a CLI target_partitions option and applies it to DataFusion session config, switches streaming fan-in to unbounded channels with non-awaiting sends, and adjusts ParquetSource scan and staging error mapping.

Changes

Streaming error propagation and query optimization

| Layer / File(s) | Summary |
| --- | --- |
| CLI target_partitions configuration (src/cli.rs) | New target_partitions field on Options with environment variable binding and CPU-count-derived default. |
| Staging writer error handling refactor (src/parseable/staging/writer.rs) | MemWriter::push, recordbatch_cloned, and concat_records converted to return Result to propagate schema merge and concat errors. |
| Stream fallible APIs, DiskWriter init, and flush adaptation (src/parseable/streams.rs) | Stream::push uses DiskWriter::try_new(...) ? and propagates guard.mem.push(...) ?; Stream::recordbatches_cloned, Stream::clear, and Stream::flush now return Result and map lock poison into StagingError; call sites and tests updated. |
| Query execution config and streaming fan-in (src/query/mod.rs) | Sets DataFusion target_partitions, updates Parquet execution options, replaces bounded mpsc::channel + ReceiverStream with unbounded_channel + UnboundedReceiverStream, and uses non-awaiting tx.send(...) in producers. |
| ParquetSource and staging error mapping (src/query/stream_schema_provider.rs) | ParquetSource enables pushdown and reordering in both branches, uses PARSEABLE.options.execution_batch_size for scan batch size, and maps staging recordbatches_cloned errors into DataFusionError::Internal. |
| Airplane handler staging clear call (src/handlers/airplane.rs) | Calls clear() inside the event.is_some() branch and explicitly ignores its Result via let _ = .... |

Sequence Diagram(s)

sequenceDiagram
  participant ProducerTask as ProducerTask
  participant UnboundedSender as UnboundedSender
  participant UnboundedReceiverStream as UnboundedReceiverStream
  participant RecordBatchAdapter as RecordBatchStreamAdapter
  participant QueryExecutor as QueryExecutor

  ProducerTask->>UnboundedSender: tx.send(batch)
  UnboundedSender-->>UnboundedReceiverStream: deliver batch
  UnboundedReceiverStream->>RecordBatchAdapter: stream next batch
  RecordBatchAdapter->>QueryExecutor: supply RecordBatch
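
The fan-in shown in the diagram can be sketched with the standard library alone. This is an analogy under stated assumptions: it uses `std::sync::mpsc::channel` (which is unbounded) and OS threads in place of tokio's `unbounded_channel` and async tasks, and the function name `fan_in` and the `u32` "batches" are hypothetical stand-ins for the PR's producers and record batches.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Each producer thread sends its partition's items into one shared channel;
/// a single consumer drains the receiving end.
fn fan_in(partitions: Vec<Vec<u32>>) -> Vec<u32> {
    let (tx, rx) = channel::<u32>();
    let mut handles = Vec::new();

    for batches in partitions {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for batch in batches {
                // Non-blocking send; an error only means the receiver is gone.
                if tx.send(batch).is_err() {
                    break;
                }
            }
        }));
    }

    // Drop the original sender so the receiver sees end-of-stream once
    // every producer clone has been dropped.
    drop(tx);

    let mut out: Vec<u32> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    // Arrival order across producers is nondeterministic, so sort for
    // a stable result.
    out.sort_unstable();
    out
}
```

Without the explicit `drop(tx)`, the receiver would wait forever, because one sender would still be alive after all producers finish.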

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable
  • nitisht

Poem

🐰 I stitched Results where panics used to play,
Unbounded streams now carry batches away.
CPUs decide partitions, tidy and neat,
Parquet filters dance to a reordering beat.
Hop, hop — the rabbit approves this feat!

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The description lists key changes but lacks essential detail: no issue number in Fixes section, missing rationale for changes, no explanation of why unbounded channels are needed, and testing/documentation checklist items remain unchecked. | Provide the actual issue number in 'Fixes #', explain the rationale for each change (why surface errors, why unbounded channels), and update checklist items or explain why they're incomplete. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 62.50% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'fix: Surface ingestion errors' directly relates to the main objective of replacing expect/unwrap calls with proper error handling in DiskWriter and MemWriter to surface errors instead of panicking. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


Contributor

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
src/query/mod.rs (1)

325-349: ⚖️ Poor tradeoff

Unbounded channel can grow without limit under slow consumers.

Switching from a bounded channel to unbounded_channel removes backpressure. If the downstream consumer (e.g., HTTP response writer) stalls, batches will accumulate in memory until the DataFusion memory pool limit is hit. The existing memory pool provides some protection, but this could lead to OOM in edge cases where the pool check doesn't account for channel-buffered batches.

Consider whether the original bounded channel caused specific deadlock issues, or if a larger but still bounded buffer (e.g., channel(1024)) would provide sufficient throughput while maintaining backpressure.
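
The backpressure difference can be seen in a small standard-library sketch. It is only an analogy: `std::sync::mpsc::channel` and `sync_channel` stand in for tokio's `unbounded_channel` and bounded `channel`, and the function names are hypothetical. Each function sends items while no consumer is draining and reports how many the channel accepted.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Unbounded: every send succeeds, so queued items grow with `n`
/// while the consumer is stalled.
fn fill_unbounded(n: usize) -> usize {
    // `_rx` keeps the receiver alive without ever reading from it.
    let (tx, _rx) = channel::<usize>();
    let mut accepted = 0;
    for i in 0..n {
        if tx.send(i).is_ok() {
            accepted += 1;
        }
    }
    accepted
}

/// Bounded: once `capacity` items are queued, the channel reports Full.
/// A blocking (or awaited) send would pause the producer at this point.
fn fill_bounded(n: usize, capacity: usize) -> usize {
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a stalled consumer, the unbounded variant accepts all 10,000 items, while the bounded variant with capacity 1024 stops at 1024. That cap is the memory bound the review comment asks about.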

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/query/mod.rs` around lines 325 - 349, The unbounded mpsc channel (tx/rx
with UnboundedReceiverStream) can let batches accumulate and OOM; replace it
with a bounded tokio::sync::mpsc::channel with a configured capacity (e.g.,
1024) to restore backpressure, change the spawned tasks to use the bounded
Sender clone and await tx.send(batch).await (or handle TrySendError
appropriately) and replace UnboundedReceiverStream::new(rx) with the appropriate
bounded receiver wrapper (e.g., ReceiverStream::new(rx) /
tokio_stream::wrappers::ReceiverStream) so sends await when the consumer is
slow; keep using PartitionedMetricMonitor::new(...) and drop(tx) after spawning
to close the channel when all producers finish.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: e917816f-8ecc-4a9e-9ae8-6f3bec081236

📥 Commits

Reviewing files that changed from the base of the PR and between 138fea3 and 2e182cd.

📒 Files selected for processing (5)
  • src/cli.rs
  • src/parseable/staging/writer.rs
  • src/parseable/streams.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs

@parmesant parmesant force-pushed the surface-ingestion-error branch from 2e182cd to 535bf40 Compare May 26, 2026 16:48
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/parseable/streams.rs`:
- Around line 534-536: The recordbatches_cloned method currently calls
self.writer.lock().unwrap() which will panic on a poisoned lock; change it to
handle PoisonError like the push method does by using lock().map_err(|e|
StagingError::from_lock_poison(e)) (or the same conversion used in push) and
propagate a StagingError instead of unwrapping, then call
mem.recordbatch_cloned(schema) on the guarded value; reference:
recordbatches_cloned, push, writer.lock().unwrap(), mem.recordbatch_cloned and
StagingError to ensure consistent lock-error handling.
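
A minimal sketch of the suggested lock handling follows, assuming a hypothetical `StagingError::PoisonedLock` variant and a toy `Stream` that guards a `Vec<u32>` instead of the real writer. The actual conversion used by `push` in the PR may differ. The `poison` helper exists only to simulate a panic while the lock is held.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical error type; the variant name is an assumption.
#[derive(Debug, PartialEq)]
enum StagingError {
    PoisonedLock(String),
}

/// Toy stream guarding its staged data with a mutex.
struct Stream {
    writer: Mutex<Vec<u32>>,
}

impl Stream {
    /// Maps a poisoned lock into an error instead of calling unwrap().
    fn recordbatches_cloned(&self) -> Result<Vec<u32>, StagingError> {
        let guard = self
            .writer
            .lock()
            .map_err(|e| StagingError::PoisonedLock(e.to_string()))?;
        Ok(guard.clone())
    }
}

/// Poisons the mutex by panicking in another thread while holding the guard.
fn poison(stream: &Arc<Stream>) {
    let s = Arc::clone(stream);
    let _ = thread::spawn(move || {
        let _guard = s.writer.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
}
```

After `poison` runs, `recordbatches_cloned` returns `Err(StagingError::PoisonedLock(..))` rather than panicking in the caller.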

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 92f7934e-4704-49d7-b206-d93549f5a259

📥 Commits

Reviewing files that changed from the base of the PR and between 2e182cd and 535bf40.

📒 Files selected for processing (5)
  • src/cli.rs
  • src/parseable/staging/writer.rs
  • src/parseable/streams.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs

Comment thread src/parseable/streams.rs Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
src/handlers/airplane.rs (1)

236-239: ⚡ Quick win

Log errors from staging clear instead of silently ignoring them.

The clear() method now returns Result<(), StagingError>, but this code ignores failures via let _ = .... If clearing staging fails (e.g., due to lock poisoning), temporary events remain in memory without any visibility, potentially causing memory growth or stale data to affect subsequent queries.

📝 Proposed fix to log clear errors
     if event.is_some() {
         // Clear staging of stream once airplane has taxied
-        let _ = PARSEABLE.get_or_create_stream(&stream_name, &None).clear();
+        if let Err(e) = PARSEABLE.get_or_create_stream(&stream_name, &None).clear() {
+            error!("Failed to clear staging for stream {}: {}", stream_name, e);
+        }
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/handlers/airplane.rs` around lines 236 - 239, The code currently ignores
errors from PARSEABLE.get_or_create_stream(&stream_name, &None).clear() when
event.is_some(), which can hide failures; change this to handle the Result by
matching or using .map_err/.unwrap_or_else to log any StagingError via the
existing logger (or create one) instead of discarding it—locate the clear() call
inside the conditional near event.is_some() and replace the let _ = ... with
code that captures the Result and calls something like logger.error or
tracing::error with context including stream_name and the error so clear
failures are visible.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f808ab35-78e6-4d8b-9365-e6dfc270eac7

📥 Commits

Reviewing files that changed from the base of the PR and between 535bf40 and 30f3583.

📒 Files selected for processing (2)
  • src/handlers/airplane.rs
  • src/parseable/streams.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes May 26, 2026
@nitisht nitisht merged commit a3949b0 into parseablehq:main May 27, 2026
12 checks passed

3 participants