Skip to content

Conversation

@ankitsheoran1
Copy link

@ankitsheoran1 ankitsheoran1 commented Nov 23, 2025

Fixes #452

Description

Blocking I/O moved to the tokio spawn_blocking so it would not block the worker thread and move away the mem write from write path

To improve performance of ingest flow

May be other possible solutions can be various we can discuss like may be introduce a different lock for memtable and disk so we would have minimum contention, or introduce a channel so rather than writting directly to disk first push to channel and consumer would consume and write it to disk.
But here in this change we just move both write to memtable and disk to spawn_blocking so worker thread would not be blocked, and take memtable write from core ingest path.
Note - for now not changed anything in memtable concats, need explore that more

Major change is in src/parseable/streams.rs all other changes are just to support it, now disk write and memtable write are moved to tokio::spawn_blocking and we are waiting for write on memtable anymore.


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • Refactor

    • Event processing and stream push flows converted to async so processing yields correctly; disk persistence is awaited for durability while in-memory updates run in background tasks.
    • Internal call sites updated to await new async flows.
  • Tests

    • I/O and push-related tests updated to match async behavior.
  • Chores

    • Public method signatures changed to async—callers may need to update call sites to await these methods.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 23, 2025

Walkthrough

Converts several synchronous processing and push paths to async: Event::process/process_unchecked become async; Stream::push becomes async with an Arc<Self> receiver; multiple call sites updated to .await?; disk writes moved to blocking tasks and memtable pushes run asynchronously.

Changes

Cohort / File(s) Summary
Event processing methods
src/event/mod.rs
pub fn process(self)pub async fn process(self) and pub fn process_unchecked(&self)pub async fn process_unchecked(&self); call sites updated to await processing and push behavior preserved.
Stream storage layer
src/parseable/streams.rs
pub fn push(...)pub async fn push(self: &Arc<Self>, ...); disk writes dispatched to blocking thread pool and awaited; memtable pushes run fire-and-forget on blocking pool; tests adjusted to async runtime.
HTTP handlers / ingest utils
src/handlers/http/ingest.rs, src/handlers/http/modal/utils/ingest_utils.rs
Internal calls updated from .process()? / .process_unchecked()? to .process().await? / .process_unchecked().await?.
Connectors & storage call sites
src/connectors/kafka/processor.rs, src/storage/field_stats.rs
Replaced synchronous ...process()? chains with ...process().await?, adding awaits into async control flow.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Client
    participant Handler
    participant Event
    participant Stream
    participant BlockPool as BlockingPool
    participant Disk

    Client->>Handler: ingest request
    Handler->>Event: Event::process().await?
    Event->>Stream: Stream::push(Arc<Self>, ...).await
    Stream->>BlockPool: spawn_blocking -> disk write (await)
    BlockPool->>Disk: write data
    Disk-->>BlockPool: write result
    BlockPool-->>Stream: disk write complete
    Stream->>BlockPool: spawn_blocking -> memtable push (fire-and-forget)
    Note right of BlockPool: memtable push continues in background (no await)
    Stream-->>Event: push/durable result
    Event-->>Handler: processing complete
    Handler-->>Client: response
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Pay extra attention to:
    • src/parseable/streams.rs: receiver changed to Arc<Self>, spawn_blocking usage, DiskWriter lifecycle, lock poisoning handling, and error propagation.
    • src/event/mod.rs and all call sites: API signature changes to async—ensure no remaining sync callers and that ordering/durability expectations are preserved.
    • Tests updated for async runtime: verify determinism and proper awaiting of background tasks.

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • de-sh
  • nikhilsinhaparseable
  • parmesant

Poem

🐇 I hopped through bytes with eager paws,
Awaited writes and mended flaws.
Disk hummed low in blocking light,
Memtable danced off into night,
A rabbit cheers the async cause!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'optimize performance of ingest path' is concise and directly aligns with the main objective of moving blocking I/O to tokio::spawn_blocking to improve ingest flow performance.
Description check ✅ Passed The PR description identifies the core changes (issue #452, spawn_blocking usage) and acknowledges the ingest path is the focus. However, the checklist items for testing, comments, and documentation remain unchecked.
Linked Issues check ✅ Passed The PR addresses issue #452 requirements: disk.push is now async, memtable.push is deferred via spawn_blocking, and the ingest path no longer waits for memtable writes. Memtable concat behavior remains unchanged as noted.
Out of Scope Changes check ✅ Passed All changes are scoped to supporting the spawn_blocking refactoring: Stream::push conversion to async, updates to call sites in processors and handlers, and test updates. No unrelated modifications detected.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link
Contributor

github-actions bot commented Nov 23, 2025

CLA Assistant Lite bot All contributors have signed the CLA ✍️ ✅

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/parseable/streams.rs (1)

1378-1400: Nested Tokio runtime in write_log will cause panic in async tests

The issue is confirmed. write_log (line 1378) creates a new tokio::runtime::Runtime and calls block_on, which works in the two #[test] tests (lines 1432, 1481) but will panic in the #[tokio::test] async test miss_current_arrow_file_when_converting_to_parquet (lines 1532, 1536) because that test already runs inside a Tokio runtime.

The recommended fix—making write_log async and only creating a runtime in sync tests—is the correct pattern and should be applied:

  • Rename write_log to write_log_async and make it async fn, removing the internal Runtime::new() and block_on wrapper
  • In the two #[test] functions, wrap the loop in let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(...)
  • In #[tokio::test] miss_current_arrow_file_when_converting_to_parquet, call the async helper directly with .await
src/event/mod.rs (1)

59-101: Inline comment at line 76 contradicts actual Stream::push durability behavior; Event::process lacks doc comment explaining guarantees to callers

The review comment's concern is confirmed as a critical issue. The code has a direct contradiction:

  • Line 76 comment (src/event/mod.rs): "memtable push is awaited, disk write is fire-and-forget"
  • Stream::push doc comment (src/parseable/streams.rs, lines 137-141): "Disk write is awaited to ensure durability, while memtable push is fire-and-forget"

These are opposite statements. The Stream::push documentation appears authoritative (it explains the design rationale: "Disk write is awaited to ensure durability"; "If memtable push fails, data is still on disk (safe)").

The impact is significant: The Kafka processor treats Event::process().await? as the success boundary for committing offsets (src/connectors/kafka/processor.rs). An incorrect inline comment combined with missing doc comments on Event::process means callers cannot confidently determine what durability is actually guaranteed.

Fix required:

  1. Correct the inline comment at line 76 to match Stream::push's documented behavior
  2. Add a doc comment to Event::process explaining: when it returns Ok(()), data is guaranteed on disk but may not yet be in memtable (fire-and-forget)
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

137-245: Async Stream::push design (disk awaited, memtable fire‑and‑forget) looks sound, with minor caveats

The restructuring of push to:

  • offload disk I/O to spawn_blocking and await completion, and
  • offload memtable concatenation to a fire‑and‑forget spawn_blocking task

is consistent with the PR goal: durability still gates success (disk write must finish), while CPU‑bound memtable ops no longer block the async ingest path. Locking via writer.lock() inside each blocking closure matches existing patterns (e.g., flush) and avoids data races on Writer.

Two minor points to keep in mind:

  1. Mode/stream‑type interaction
    The condition if options_mode != Mode::Query || stream_type == StreamType::Internal causes user‑defined streams in Mode::Query to skip disk writes entirely but still perform memtable pushes. If ingestion for user streams is truly unreachable in Mode::Query (as other code suggests), this is fine but slightly surprising; otherwise, it would mean non‑durable writes in that mode. Worth a quick sanity check.

  2. Per‑event spawn_blocking overhead
    Under very high ingest rates, per‑event spawn_blocking for disk and memtable can add overhead and contention on the blocking pool. If this becomes a hotspot, you might consider batching pushes or sharing a per‑stream worker for disk writes in a follow‑up refactor.

Functionally, though, the new push implementation is coherent and preserves the intended durability semantics.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7adda55 and 5d28bc6.

📒 Files selected for processing (6)
  • src/connectors/kafka/processor.rs (1 hunks)
  • src/event/mod.rs (3 hunks)
  • src/handlers/http/ingest.rs (2 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (1 hunks)
  • src/parseable/streams.rs (2 hunks)
  • src/storage/field_stats.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-08-18T17:59:31.642Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/modal/utils/ingest_utils.rs:149-156
Timestamp: 2025-08-18T17:59:31.642Z
Learning: The time_partition parameter in push_logs() function in src/handlers/http/modal/utils/ingest_utils.rs is determined by the caller (flatten_and_push_logs). OSS callers pass None, enterprise callers pass the appropriate value (None or Some<>), and OTEL callers always pass None. The push_logs() function should not add additional time partition logic since it's already handled at the caller level.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/handlers/http/ingest.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/parseable/streams.rs
  • src/event/mod.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/storage/field_stats.rs
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.

Applied to files:

  • src/event/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/event/mod.rs
🧬 Code graph analysis (2)
src/parseable/streams.rs (1)
src/parseable/staging/writer.rs (3)
  • push (138-148)
  • push (182-206)
  • try_new (58-73)
src/event/mod.rs (1)
src/connectors/kafka/processor.rs (1)
  • process (102-110)
🔇 Additional comments (6)
src/handlers/http/modal/utils/ingest_utils.rs (1)

158-174: Async process().await? wiring in push_logs looks correct

Awaiting Event::process() here aligns with the new async event API and preserves error propagation, without altering time-partition behavior (still caller‑controlled as intended).

src/storage/field_stats.rs (1)

145-167: Awaiting async process() for field‑stats events is consistent and safe

The switch to .process().await? correctly matches the async Event API and ensures stats events are durably processed before calculate_field_stats returns, while still omitting time partitions as expected for stats.

src/handlers/http/ingest.rs (2)

141-155: Internal ingest now correctly awaits async process()

Updating to .process().await? here matches the async Event API and maintains existing error‑handling behavior for internal streams (still fails fast on processing errors).


404-422: push_logs_unchecked correctly awaits async process_unchecked()

Awaiting process_unchecked().await? keeps the unchecked path behavior (no schema validation, but still durable writes) while aligning with the async Event API; callers still receive the original Event after processing.

src/connectors/kafka/processor.rs (1)

100-110: Kafka processor correctly awaits async event processing before commit

Chaining self.build_event_from_chunk(&records).await?.process().await?; ensures the batch is fully processed (including async push) before the surrounding code reaches offset commit, preserving at‑least‑once semantics.

src/event/mod.rs (1)

103-116: Async conversion of process_unchecked is consistent with process

Making process_unchecked async and awaiting Stream::push mirrors the main process path and keeps error propagation via EventError intact. Given this function intentionally skips schema/metrics work, the behavior change is minimal and localized to the new async push, which looks appropriate.

coderabbitai[bot]
coderabbitai bot previously approved these changes Nov 23, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

150-166: Reduce RecordBatch cloning overhead.

record_clone is created at line 151 and then cloned again at line 165 (record_for_disk). RecordBatch uses Arc internally, so cloning is cheap, but this pattern creates unnecessary intermediate variables.

Apply this diff to simplify:

-        let record_clone = record.clone();
         let schema_key_clone = schema_key.to_string();
         let options_mode = self.options.mode;
         
         // Defer disk write to blocking thread pool and await it
         // This ensures disk write succeeds before we return, maintaining durability
         // If disk write fails, request fails - data is not persisted, so we shouldn't return success
         if options_mode != Mode::Query || stream_type == StreamType::Internal {
             let filename =
                 self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
             
             let stream_for_disk = Arc::clone(self);
-            let filename_clone = filename.clone();
-            let filename_for_error = filename.clone(); // Clone for error message
-            let record_for_disk = record_clone.clone();
+            let filename_for_error = filename.clone();
+            let record_for_disk = record.clone();
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5d28bc6 and d0cd1ca.

📒 Files selected for processing (1)
  • src/parseable/streams.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-08-21T11:47:01.279Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.279Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.

Applied to files:

  • src/parseable/streams.rs
🧬 Code graph analysis (1)
src/parseable/streams.rs (2)
src/parseable/staging/writer.rs (3)
  • push (138-148)
  • push (182-206)
  • try_new (58-73)
src/utils/time.rs (2)
  • granularity_range (272-287)
  • new (59-61)
🔇 Additional comments (2)
src/parseable/streams.rs (2)

138-149: Well-documented async conversion with clear durability vs performance tradeoff.

The approach is sound: await disk writes for durability, fire-and-forget memtable updates for performance. The &Arc<Self> receiver enables safe cloning for spawned tasks.


1392-1430: Test helpers correctly adapted for async push.

The sync write_log properly creates a runtime to call async push, while write_log_async directly awaits. Creating a runtime per call is acceptable overhead for test code.

Comment on lines +171 to +179
let mut guard = match stream_for_disk.writer.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!(
"Writer lock poisoned while writing to disk for stream {}",
stream_for_disk.stream_name
);
poisoned.into_inner()
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Lock poisoning should fail the operation, not continue with potentially corrupt state.

Using into_inner() after a poisoned lock allows the operation to proceed with potentially corrupt state. For data durability guarantees, this should fail fast.

Apply this diff to fail on poisoned locks:

             tokio::task::spawn_blocking(move || {
-                let mut guard = match stream_for_disk.writer.lock() {
-                    Ok(guard) => guard,
-                    Err(poisoned) => {
-                        error!(
-                            "Writer lock poisoned while writing to disk for stream {}",
-                            stream_for_disk.stream_name
-                        );
-                        poisoned.into_inner()
-                    }
-                };
+                let mut guard = stream_for_disk.writer.lock()
+                    .map_err(|e| StagingError::ObjectStorage(std::io::Error::new(
+                        std::io::ErrorKind::Other,
+                        format!("Writer lock poisoned for stream {}: {}", stream_for_disk.stream_name, e)
+                    )))?;

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/parseable/streams.rs around lines 171-180, the match on
stream_for_disk.writer.lock() currently calls poisoned.into_inner(), which
continues with potentially corrupted state; instead, do not recover the poisoned
lock — log the poisoning and fail the operation by converting the PoisonError
into an error return (or propagate it) so the caller sees failure (e.g., map the
PoisonError to an appropriate Err value and return early or use the ? operator);
if the enclosing function does not return a Result, change its signature to
return Result and propagate the error (or explicitly panic) so the code does not
continue on a poisoned lock.

Comment on lines 218 to 242
// Defer memtable push to blocking thread pool - fire-and-forget for performance
// Memtable is for query performance optimization, not durability
// If memtable push fails, data is still safely on disk and memtable can be rebuilt
{
let stream_for_memtable = Arc::clone(self);
let schema_key_for_memtable = schema_key_clone;
let record_for_memtable = record.clone();

// Spawn without awaiting - fire and forget for performance
// The concat operation at 16384 events is CPU-bound but won't block the request path
tokio::task::spawn_blocking(move || {
let mut guard = match stream_for_memtable.writer.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!(
"Writer lock poisoned while pushing to memtable for stream {}",
stream_for_memtable.stream_name
);
poisoned.into_inner()
}
};
// Push to memtable - concat happens here at 16384 events
guard.mem.push(&schema_key_for_memtable, &record_for_memtable);
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Fire-and-forget memtable push has no backpressure mechanism.

Spawning tasks without awaiting them can lead to unbounded task queuing during high ingestion load. While the design intentionally sacrifices memtable guarantees for performance, consider monitoring task queue depth or implementing basic backpressure.

Generate a script to check if there's any monitoring or limits on spawned tasks:


🏁 Script executed:

#!/bin/bash
# Search for task queue monitoring or spawn_blocking limits
rg -nC3 'spawn_blocking|JoinSet|task.*queue|semaphore' --type rust -g '!target/**'

Length of output: 12121


Implement backpressure on memtable fire-and-forget tasks to prevent unbounded queuing.

The memtable push at line 228 spawns tasks without await and without any concurrency limit. While other parts of the codebase demonstrate backpressure patterns (e.g., object storage uploads use Semaphore::new(100) at src/storage/object_storage.rs:901, cluster liveness checks use Semaphore::new(10)), the memtable implementation lacks this protection. During sustained high ingestion load, this risks accumulating unbounded spawned tasks and resource exhaustion.

Consider adding a semaphore-based limit or integrating with the JoinSet pattern already used elsewhere in the codebase.

🤖 Prompt for AI Agents
In src/parseable/streams.rs around lines 218 to 242, the memtable
fire-and-forget spawn_blocking has no concurrency limit and can lead to
unbounded task queuing; add a semaphore-based backpressure: introduce an
Arc<Semaphore> (either as a static/shared instance or a field on the stream) and
acquire an owned permit before spawning the blocking task, move the permit into
the spawned closure so it is held for the duration of the mem.push call and
dropped/released when the closure completes, handle acquisition failures
gracefully (e.g., log and skip or await briefly with timeout), and ensure the
spawn_blocking closure still handles the poisoned writer lock as before; this
bounds concurrent memtable push tasks and prevents resource exhaustion.

Comment on lines +229 to +233
let mut guard = match stream_for_memtable.writer.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!(
"Writer lock poisoned while pushing to memtable for stream {}",
stream_for_memtable.stream_name
);
poisoned.into_inner()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Same lock poisoning issue: should fail instead of continuing.

Identical to the disk write path, using into_inner() after lock poisoning may propagate corrupt state. While memtable is fire-and-forget, it should still fail cleanly.

Apply similar fix as suggested for disk write path - return early or log and skip the operation:

             tokio::task::spawn_blocking(move || {
-                let mut guard = match stream_for_memtable.writer.lock() {
-                    Ok(guard) => guard,
-                    Err(poisoned) => {
-                        error!(
-                            "Writer lock poisoned while pushing to memtable for stream {}",
-                            stream_for_memtable.stream_name
-                        );
-                        poisoned.into_inner()
-                    }
-                };
+                let mut guard = match stream_for_memtable.writer.lock() {
+                    Ok(guard) => guard,
+                    Err(e) => {
+                        error!(
+                            "Writer lock poisoned while pushing to memtable for stream {}: {}. Skipping memtable update.",
+                            stream_for_memtable.stream_name, e
+                        );
+                        return;
+                    }
+                };
🤖 Prompt for AI Agents
In src/parseable/streams.rs around lines 229 to 237, the code currently calls
poisoned.into_inner() on a poisoned writer lock for the memtable which can
propagate corrupted state; instead, treat a poisoned lock as an operation
failure and return early (or propagate an error) rather than continuing. Replace
the Err(poisoned) branch so it logs the poisoning with stream context and then
returns an Err (or a suitable early return/skip path consistent with the
surrounding function's error handling) instead of calling into_inner(); ensure
the function's signature and callers handle the propagated error or skip
accordingly.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/parseable/streams.rs (2)

150-150: Redundant clone: record_clone is immediately cloned again.

Line 150 clones record into record_clone, but line 164 immediately clones record_clone again. This intermediate clone serves no purpose.

Apply this diff to eliminate the redundant clone:

-        let record_clone = record.clone();
         let schema_key_clone = schema_key.to_string();
         let options_mode = self.options.mode;
 
@@ -161,7 +160,7 @@
             let stream_for_disk = Arc::clone(self);
             let filename_clone = filename.clone();
             let filename_for_error = filename.clone();
-            let record_for_disk = record_clone.clone();
+            let record_for_disk = record.clone();

169-201: Add explicit return type to spawn_blocking closure for clarity.

The closure's return type is inferred as Result<(), StagingError>, but adding an explicit annotation improves readability and makes error propagation paths clearer.

Consider adding a return type annotation:

-            tokio::task::spawn_blocking(move || {
+            tokio::task::spawn_blocking(move || -> Result<(), StagingError> {
                 let mut guard = match stream_for_disk.writer.lock() {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d0cd1ca and 790e521.

📒 Files selected for processing (1)
  • src/parseable/streams.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-04-07T13:23:10.092Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1288
File: src/handlers/http/modal/mod.rs:279-301
Timestamp: 2025-04-07T13:23:10.092Z
Learning: For critical operations like writing metadata to disk in NodeMetadata::put_on_disk(), it's preferred to let exceptions propagate (using expect/unwrap) rather than trying to recover with fallback mechanisms, as the failure indicates a fundamental system issue that needs immediate attention.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-02-12T17:39:43.392Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/modal/mod.rs:218-221
Timestamp: 2025-02-12T17:39:43.392Z
Learning: In the Parseable codebase, staging directory access failures should trigger a panic rather than being handled as recoverable errors, as it indicates a critical system issue.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-08-21T11:47:01.279Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.279Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.

Applied to files:

  • src/parseable/streams.rs
🧬 Code graph analysis (1)
src/parseable/streams.rs (1)
src/parseable/staging/writer.rs (3)
  • push (138-148)
  • push (182-206)
  • try_new (58-73)
🔇 Additional comments (2)
src/parseable/streams.rs (2)

138-149: LGTM: Clean async conversion with clear documentation.

The signature change to async fn push(self: &Arc<Self>, ...) is correct for enabling spawn_blocking with Arc-cloned references, and the comments clearly document the durability guarantees and fire-and-forget semantics.


1388-1426: LGTM: Test utilities correctly adapted for async push.

Both write_log (creating a runtime for sync tests) and write_log_async (direct await for async tests) are correctly implemented for their respective contexts. The sync version properly blocks on the async push, while the async version leverages the existing Tokio runtime.

@ankitsheoran1 ankitsheoran1 force-pushed the fix/improve-performance-of-ingest-flow-with-spawn_blocking branch from c1bde0b to 313a5ef Compare November 23, 2025 12:22
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

1388-1399: Consider sharing a single runtime across test calls for efficiency.

The write_log helper creates a new Tokio runtime on every invocation (line 1390), which adds overhead during test execution. While this works correctly, consider using a shared runtime or lazy_static for better test performance.

Example refactor:

use std::sync::LazyLock;

static TEST_RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Runtime::new().unwrap()
});

fn write_log(staging: &StreamRef, schema: &Schema, mins: i64) {
    let time: NaiveDateTime = Utc::now()
        .checked_sub_signed(TimeDelta::minutes(mins))
        .unwrap()
        .naive_utc();
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])),
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .unwrap();
    
    TEST_RUNTIME.block_on(staging.push(
        "abc",
        &batch,
        time,
        &HashMap::new(),
        StreamType::UserDefined,
    )).unwrap();
    staging.flush(true);
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 790e521 and 313a5ef.

📒 Files selected for processing (6)
  • src/connectors/kafka/processor.rs (1 hunks)
  • src/event/mod.rs (3 hunks)
  • src/handlers/http/ingest.rs (2 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (1 hunks)
  • src/parseable/streams.rs (3 hunks)
  • src/storage/field_stats.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/ingest.rs
  • src/event/mod.rs
  • src/storage/field_stats.rs
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-04-07T13:23:10.092Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1288
File: src/handlers/http/modal/mod.rs:279-301
Timestamp: 2025-04-07T13:23:10.092Z
Learning: For critical operations like writing metadata to disk in NodeMetadata::put_on_disk(), it's preferred to let exceptions propagate (using expect/unwrap) rather than trying to recover with fallback mechanisms, as the failure indicates a fundamental system issue that needs immediate attention.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-02-12T17:39:43.392Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/modal/mod.rs:218-221
Timestamp: 2025-02-12T17:39:43.392Z
Learning: In the Parseable codebase, staging directory access failures should trigger a panic rather than being handled as recoverable errors, as it indicates a critical system issue.

Applied to files:

  • src/parseable/streams.rs
📚 Learning: 2025-08-21T11:47:01.279Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.279Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.

Applied to files:

  • src/parseable/streams.rs
🧬 Code graph analysis (1)
src/parseable/streams.rs (2)
src/parseable/staging/writer.rs (2)
  • push (138-148)
  • push (182-206)
src/utils/time.rs (2)
  • granularity_range (272-287)
  • new (59-61)
🔇 Additional comments (1)
src/parseable/streams.rs (1)

138-149: Excellent async conversion with clear durability semantics.

The method signature change to async fn push(self: &Arc<Self>, ...) is well-designed. The documentation clearly articulates the durability model: disk writes are awaited (ensuring persistence before returning success), while memtable pushes are fire-and-forget (acceptable since data is already on disk).

@ankitsheoran1
Copy link
Author

I have read the CLA Document and I hereby sign the CLA

nitisht added a commit to parseablehq/.github that referenced this pull request Nov 23, 2025
@nitisht
Copy link
Member

nitisht commented Nov 25, 2025

@ankitsheoran1 have you run any sort of tests to verify if the ingestion is faster with this change?

@ankitsheoran1
Copy link
Author

@ankitsheoran1 have you run any sort of tests to verify if the ingestion is faster with this change?

I tried to insert 1M records with each of batch size 1000 with each record size around 600 bytes on my local machine(storage also as local) with specs (M4 24GB 14 Core) and without changes for multiple iteration I see I able to achieve 85k rps, and with this changes I able to touch around 86k - 86.5k rps, not much improvement .
Obviously this is not a good way to analyze performance.

Few more findings -
I added multiple metrics and find out that that major bottleneck is JSON event creation(for one batch it takes around 11 ms and around 9 ms is for this process only) I am working on it, Also found few more optimisation we can do in ingest flow

Do we have some optimization testing flow ?

@nitisht
Copy link
Member

nitisht commented Nov 28, 2025

@ankitsheoran1 have you run any sort of tests to verify if the ingestion is faster with this change?

I tried to insert 1M records with each of batch size 1000 with each record size around 600 bytes on my local machine(storage also as local) with specs (M4 24GB 14 Core) and without changes for multiple iteration I see I able to achieve 85k rps, and with this changes I able to touch around 86k - 86.5k rps, not much improvement . Obviously this is not a good way to analyze performance.

Few more findings - I added multiple metrics and find out that that major bottleneck is JSON event creation(for one batch it takes around 11 ms and around 9 ms is for this process only) I am working on it, Also found few more optimisation we can do in ingest flow

Do we have some optimization testing flow ?

Thanks for the updates. Happy to chat more on slack if you can join https://logg.ing/community. Right now there is no automated ingestion flow, but we test on real world data ingestion scenarios every few weeks on our internal infrastructure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Staging data ingestion and query optimizations

2 participants