chore: release memory to the OS every hour #1450
Conversation
Walkthrough
Adds jemalloc as the global allocator, introduces a memory release module and periodic scheduler, and applies memory-conscious refactors across HTTP query response generation, Arrow utilities, and startup/init flows to reduce retained memory during processing and streaming.
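For context, wiring jemalloc in as the process-wide allocator with tikv-jemallocator usually takes only a few lines; the sketch below is illustrative (the static's name is not taken from the PR):

use tikv_jemallocator::Jemalloc;

// Route every heap allocation in this binary through jemalloc so that
// mallctl-based tooling (epoch advance, arena purge) can act on it.
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    // Any allocation made from here on is served by jemalloc.
    let buffer: Vec<u8> = Vec::with_capacity(1024);
    drop(buffer);
}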
Sequence Diagram(s)
sequenceDiagram
participant Client as HTTP Client
participant Handler as Query Handler
participant Response as QueryResponse
participant Memory as Memory Module
participant Jemalloc as Jemalloc
Client->>Handler: HTTP query request
Handler->>Handler: Execute query, build QueryResponse
Handler->>Response: to_json() (batched, 100-record chunks)
loop per batch
Response->>Response: Convert batch -> JSON values
Response->>Response: Apply fill_null
Response->>Response: Drop intermediate JSON
end
Response-->>Handler: Return JSON/stream frames
Handler->>Memory: optional force_memory_release() (rate-limited)
Memory->>Jemalloc: advance epoch + purge arenas
Jemalloc-->>Memory: freed unused memory
Handler->>Client: send HTTP response
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🧹 Nitpick comments (4)
Cargo.toml (1)
31-33: Jemalloc deps look good; consider optional background threads feature.
If you want allocator-driven purging, enabling jemalloc background threads can help; leave off if you prefer manual purges only. (docs.rs)
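As a rough illustration only (not code from this PR), one way to turn background purging on at runtime is to write the background_thread mallctl; this assumes the linked jemalloc was built with background-thread support, and enabling the relevant crate feature at build time is an alternative:

use std::ffi::CString;
use std::os::raw::c_void;

/// Best-effort attempt to enable jemalloc's background purging threads.
/// Returns the raw mallctl status code (0 means success).
fn enable_background_threads() -> i32 {
    let name = CString::new("background_thread").expect("no interior NUL");
    let mut enable = true;
    unsafe {
        tikv_jemalloc_sys::mallctl(
            name.as_ptr(),
            std::ptr::null_mut(),                    // oldp: not reading
            std::ptr::null_mut(),                    // oldlenp: not reading
            &mut enable as *mut bool as *mut c_void, // newp: write `true`
            std::mem::size_of::<bool>(),             // newlen
        )
    }
}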
src/handlers/http/query.rs (2)
333-352: Avoid duplicate purge triggers; Drop already calls it.
QueryResponse::drop() already calls force_memory_release(); this explicit call makes it redundant (still cheap due to rate limit, but adds locking per batch). Consider removing to reduce contention on the global mutex.
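For reference, a minimal sketch of the rate-limiting gate this comment alludes to might look like the following; the one-minute interval, the statics, and the epoch-only body are assumptions for illustration, with the actual arena purge elided (see the arena-iteration approach later in this thread):

use std::sync::Mutex;
use std::time::{Duration, Instant};

use once_cell::sync::Lazy;

// Shared gate: at most one release attempt per interval across all callers.
static LAST_RELEASE: Lazy<Mutex<Option<Instant>>> = Lazy::new(|| Mutex::new(None));
const MIN_INTERVAL: Duration = Duration::from_secs(60);

pub fn force_memory_release() {
    let mut last = LAST_RELEASE.lock().expect("lock poisoned");
    if let Some(prev) = *last {
        if prev.elapsed() < MIN_INTERVAL {
            return; // purged recently; skip to avoid contention
        }
    }
    *last = Some(Instant::now());

    // Refresh jemalloc's cached statistics; a real implementation would
    // follow this with a per-arena purge as shown elsewhere in this review.
    if let Err(e) = tikv_jemalloc_ctl::epoch::advance() {
        tracing::warn!("failed to advance jemalloc epoch: {e}");
    }
}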
195-198: Optional: parity in count path.
If you want consistent behavior, call force_memory_release() after building the count response too.
src/response.rs (1)
36-43: Lower log level for hot path.
info!("Returning query results")is noisy per request; preferdebug!to keep prod logs clean.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
- Cargo.toml (1 hunks)
- src/handlers/http/query.rs (4 hunks)
- src/main.rs (1 hunks)
- src/metastore/metastores/object_store_metastore.rs (1 hunks)
- src/response.rs (2 hunks)
- src/utils/arrow/mod.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-24T11:54:20.259Z
Learnt from: parmesant
PR: parseablehq/parseable#1449
File: src/metastore/metastores/object_store_metastore.rs:83-98
Timestamp: 2025-10-24T11:54:20.259Z
Learning: In the `get_overviews` method in `src/metastore/metastores/object_store_metastore.rs`, using `.ok()` to convert all storage errors to `None` when fetching overview objects is the intended behavior. This intentionally treats missing files and other errors (network, permissions, etc.) the same way.
Applied to files:
src/metastore/metastores/object_store_metastore.rs
🧬 Code graph analysis (3)
src/metastore/metastores/object_store_metastore.rs (1)
src/correlation.rs (1)
path(227-234)
src/handlers/http/query.rs (2)
src/response.rs (2)
force_memory_release(83-118)
drop(75-77)
src/utils/arrow/mod.rs (1)
record_batches_to_json(50-72)
src/response.rs (1)
src/utils/arrow/mod.rs (1)
record_batches_to_json(50-72)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
🔇 Additional comments (2)
src/metastore/metastores/object_store_metastore.rs (1)
113-114: LGTM — stylistic cleanup only.
Single-line await + propagate is fine; behavior unchanged.
src/utils/arrow/mod.rs (1)
51-57: Early-return optimization is good.
Short-circuiting empty input avoids work and allocations.
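For context, a self-contained sketch of such a converter with the early return could look like the following; this is not the project's exact src/utils/arrow/mod.rs code, and the arrow-json ArrayWriter route is an assumption:

use arrow_array::RecordBatch;
use arrow_json::ArrayWriter;
use serde_json::{Map, Value};

pub fn record_batches_to_json(
    batches: &[RecordBatch],
) -> anyhow::Result<Vec<Map<String, Value>>> {
    // Early return: no batches means no rows, no writer, no allocations.
    if batches.is_empty() {
        return Ok(Vec::new());
    }

    // Serialize every batch into one JSON array buffer...
    let mut writer = ArrayWriter::new(Vec::new());
    writer.write_batches(&batches.iter().collect::<Vec<_>>())?;
    writer.finish()?;
    let buf = writer.into_inner();

    // ...then parse it back into per-row maps.
    Ok(serde_json::from_slice(&buf)?)
}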
src/response.rs
Outdated
// Purge all arenas using MALLCTL_ARENAS_ALL
if let Ok(arena_purge) = CString::new("arena.4096.purge") {
    unsafe {
        let ret = tikv_jemalloc_sys::mallctl(
            arena_purge.as_ptr(),
            std::ptr::null_mut(), // oldp (not reading)
            std::ptr::null_mut(), // oldlenp (not reading)
            std::ptr::null_mut(), // newp (void operation)
            0,                    // newlen (void operation)
        );
        if ret != 0 {
            warn!("Arena purge failed with code: {}", ret);
        } else {
            debug!("Successfully purged all jemalloc arenas");
        }
    }
} else {
    warn!("Failed to create CString for arena purge");
}
Bug: purging via "arena.4096.purge" likely does nothing. Use MALLCTL_ARENAS_ALL or iterate narenas.
jemalloc expects arena.<i>.purge, where <i> can be the special constant MALLCTL_ARENAS_ALL to target all arenas. Hardcoding 4096 is not portable and typically returns ENOENT, so the purge won’t run. Either stringify the constant or iterate all arenas and purge each one.
Authoritative docs: use arena.<i>.purge with <i>=MALLCTL_ARENAS_ALL, or resolve a MIB and call mallctlbymib. (jemalloc.net)
Apply this safer approach (iterate arenas and flush current thread’s tcache first):
- // Purge all arenas using MALLCTL_ARENAS_ALL
- if let Ok(arena_purge) = CString::new("arena.4096.purge") {
- unsafe {
- let ret = tikv_jemalloc_sys::mallctl(
- arena_purge.as_ptr(),
- std::ptr::null_mut(), // oldp (not reading)
- std::ptr::null_mut(), // oldlenp (not reading)
- std::ptr::null_mut(), // newp (void operation)
- 0, // newlen (void operation)
- );
- if ret != 0 {
- warn!("Arena purge failed with code: {}", ret);
- } else {
- debug!("Successfully purged all jemalloc arenas");
- }
- }
- } else {
- warn!("Failed to create CString for arena purge");
- }
+ // Flush this thread's tcache to return cached objects to arenas
+ if let Ok(tcache_flush) = CString::new("thread.tcache.flush") {
+ unsafe {
+ let _ = tikv_jemalloc_sys::mallctl(
+ tcache_flush.as_ptr(),
+ std::ptr::null_mut(),
+ std::ptr::null_mut(),
+ std::ptr::null_mut(),
+ 0,
+ );
+ }
+ }
+
+ // Purge each initialized arena
+ if let Ok(n) = tikv_jemalloc_ctl::arenas::narenas::read() {
+ for i in 0..n {
+ if let Ok(name) = CString::new(format!("arena.{i}.purge")) {
+ unsafe {
+ let ret = tikv_jemalloc_sys::mallctl(
+ name.as_ptr(),
+ std::ptr::null_mut(),
+ std::ptr::null_mut(),
+ std::ptr::null_mut(),
+ 0,
+ );
+ if ret != 0 {
+ warn!("Arena purge failed for index {i} with code: {ret}");
+ }
+ }
+ }
+ }
+ debug!("Requested purge for {n} arenas");
+ } else {
+ warn!("Failed to read jemalloc arenas.narenas");
+ }

Notes:
- thread.tcache.flush helps ensure cached objects are returned to arenas before purging. (jemalloc.net)
- Iterating arenas::narenas is portable across jemalloc builds. (docs.rs)
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 1
🧹 Nitpick comments (7)
src/response.rs (2)
35-37: Good: Batched processing reduces peak memory allocations.
The batched approach (processing 100 record batches at a time) is a sensible improvement for managing memory during JSON conversion.
Consider making BATCH_SIZE configurable if you find that different workloads benefit from different batch sizes, though 100 is a reasonable default.
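As a hedged illustration of the chunked loop (the BATCH_SIZE constant and the converter parameter are assumptions, not the PR's exact code):

use arrow_array::RecordBatch;
use serde_json::{Map, Value};

const BATCH_SIZE: usize = 100; // assumed default, per the review above

fn collect_rows_in_chunks(
    record_batches: &[RecordBatch],
    to_rows: impl Fn(&[RecordBatch]) -> anyhow::Result<Vec<Map<String, Value>>>,
) -> anyhow::Result<Vec<Map<String, Value>>> {
    let mut all_rows = Vec::new();
    // Convert a bounded window of batches at a time so each chunk's
    // intermediate JSON can be dropped before the next chunk is built.
    for chunk in record_batches.chunks(BATCH_SIZE) {
        let rows = to_rows(chunk)?;
        all_rows.extend(rows);
    }
    Ok(all_rows)
}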
39-55: Consider streaming for very large result sets.
While batching reduces peak allocations during conversion, all_values still accumulates the entire result set in memory. For queries returning thousands of record batches (tens of thousands of rows), memory pressure may remain high. If you observe memory issues with very large queries, consider implementing a streaming response mechanism (e.g., chunked transfer encoding) that serializes and sends each batch immediately rather than accumulating.
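A rough sketch of that alternative, assuming actix-web and NDJSON-style chunks (the function name and content type are illustrative, not from the PR):

use std::io;

use actix_web::HttpResponse;
use bytes::Bytes;
use futures::stream;
use serde_json::{Map, Value};

// Each element is one batch's rows, already converted to JSON maps.
fn stream_rows(batches: Vec<Vec<Map<String, Value>>>) -> HttpResponse {
    let body = stream::iter(batches.into_iter().map(|rows| {
        // Serialize one batch at a time so only a single chunk is buffered
        // before it is handed to the transport.
        serde_json::to_vec(&rows)
            .map(|mut buf| {
                buf.push(b'\n');
                Bytes::from(buf)
            })
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }));
    HttpResponse::Ok()
        .content_type("application/x-ndjson")
        .streaming(body)
}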
src/memory.rs (2)
26-53: Good: Arena iteration implementation is correct.
The implementation properly iterates all initialized arenas (via arenas::narenas) and purges each one, which addresses the portable purging requirement mentioned in past reviews.
The past review suggested flushing thread-local caches before purging arenas using thread.tcache.flush. While not strictly necessary, adding a tcache flush before the arena loop can help ensure cached objects are returned to arenas before purging, potentially improving memory release effectiveness:

// Flush this thread's tcache to return cached objects to arenas
if let Ok(tcache_flush) = CString::new("thread.tcache.flush") {
    unsafe {
        let _ = tikv_jemalloc_sys::mallctl(
            tcache_flush.as_ptr(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}

Based on learnings (past review comment).
56-75: Scheduler logic is sound.
The hourly schedule with minute-resolution checks is appropriate, and the spawned task will run for the application lifetime as intended.
The spawned task handle is not captured, so there's no explicit shutdown mechanism. For production resilience, consider capturing the JoinHandle and exposing a graceful shutdown function if you need to cleanly terminate the scheduler (e.g., during testing or controlled shutdowns). For a daemon-like background task, the current approach is acceptable.
src/handlers/http/query.rs (3)
244-252: Remove no-op temp and commented code; optionally gate explicit release behind cfg.
The http_response variable adds no memory win; the commented-out force_memory_release is noise. Keep code lean; if you want an explicit purge, guard it with a feature flag.
Minimal cleanup:

- let http_response = HttpResponse::Ok()
-     .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
-     .json(response);
-
- // // Force memory release after HTTP response is fully created
- // force_memory_release();
-
- Ok(http_response)
+ Ok(HttpResponse::Ok()
+     .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
+     .json(response))

Optional (behind feature):

#[cfg(feature = "jemalloc_mem_release")]
crate::memory::force_memory_release();
332-350: Stream path: avoid format!("{response}") and extra JSON Value; write bytes directly.
Formatting a serde_json::Value via format! allocates an intermediate String and relies on Display. Serialize straight to Vec, append newline, convert to Bytes.
Apply:

-     let response = query_response.to_json().map_err(|e| {
+     let response = query_response.to_json().map_err(|e| {
          error!("Failed to parse record batch into JSON: {}", e);
          actix_web::error::ErrorInternalServerError(e)
      })?;
-
-     // Convert to bytes and explicitly drop the response object
-     let bytes_result = Bytes::from(format!("{response}\n"));
-     drop(response); // Explicit cleanup
-
-     Ok(bytes_result)
+     // Serialize directly to bytes (+ '\n') to reduce copies
+     let mut buf = serde_json::to_vec(&response).map_err(|e| {
+         error!("Failed to serialize JSON: {}", e);
+         actix_web::error::ErrorInternalServerError(e)
+     })?;
+     drop(response); // release JSON Value ASAP
+     buf.push(b'\n');
+     Ok(Bytes::from(buf))

If desired, capture a reusable String/Vec in the FnMut environment to amortize allocations.
394-407: Counts JSON: avoid extra map-to-Value copy; explicit drop unnecessary.
record_batches_to_json already returns a Vec<Map<String, Value>> which is Serialize. You can embed it directly; the extra into_iter().map(Value::Object) allocates again. The explicit drop(records) is fine but not required.
Use:

-     let json_records = {
-         let converted = record_batches_to_json(&records)?;
-         drop(records); // Explicitly drop the original records early
-         converted
-     };
-
-     let processed_records: Vec<Value> =
-         json_records.into_iter().map(Value::Object).collect();
-
-     let res = json!({
-         "fields": vec!["start_time", "endTime", "count"],
-         "records": processed_records,
-     });
+     let json_records = record_batches_to_json(&records)?;
+     // records will drop here naturally
+     let res = json!({
+         "fields": vec!["start_time", "endTime", "count"],
+         "records": json_records,
+     });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- src/handlers/http/modal/ingest_server.rs (1 hunks)
- src/handlers/http/modal/query_server.rs (1 hunks)
- src/handlers/http/modal/server.rs (1 hunks)
- src/handlers/http/query.rs (3 hunks)
- src/lib.rs (1 hunks)
- src/main.rs (1 hunks)
- src/memory.rs (1 hunks)
- src/response.rs (1 hunks)
- src/utils/arrow/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/utils/arrow/mod.rs
- src/main.rs
🧰 Additional context used
🧬 Code graph analysis (5)
src/response.rs (1)
src/utils/arrow/mod.rs (1)
record_batches_to_json(50-72)
src/handlers/http/modal/query_server.rs (1)
src/memory.rs (1)
init_memory_release_scheduler(56-75)
src/handlers/http/modal/server.rs (1)
src/memory.rs (1)
init_memory_release_scheduler(56-75)
src/handlers/http/query.rs (1)
src/utils/arrow/mod.rs (1)
record_batches_to_json(50-72)
src/handlers/http/modal/ingest_server.rs (1)
src/memory.rs (1)
init_memory_release_scheduler(56-75)
🔇 Additional comments (3)
src/lib.rs (1)
33-33: LGTM: Module exposure is correct.
The public memory module declaration properly exposes the new memory management subsystem.
src/handlers/http/modal/ingest_server.rs (1)
132-133: LGTM: Memory scheduler initialization is correctly placed.
The memory release scheduler is initialized after sync setup and before background servers start, with appropriate error propagation.
src/handlers/http/modal/server.rs (1)
157-158: LGTM: Memory scheduler initialization is correctly placed.
The memory release scheduler is initialized after analytics setup and before background servers start, with appropriate error propagation. This follows the same pattern as ingest_server.rs.
// Initialize memory release scheduler
crate::memory::init_memory_release_scheduler()?;
Make memory scheduler init idempotent and tie to shutdown.
Calling init_memory_release_scheduler multiple times will spawn duplicate loops. Also, the spawned task runs forever and ignores server shutdown. Treat failures as non-fatal to avoid blocking startup.
Suggested fixes:
- Guard initialization with a OnceCell/Once to ensure single spawn.
- Accept a cancellation token or return a guard JoinHandle to stop on shutdown.
- Log-and-continue on failure (mirror init_cluster_metrics_schedular behavior).
Example (in src/memory.rs) to make it idempotent and cancellable:
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+use once_cell::sync::OnceCell;
+
+static MEMORY_SCHED_INIT: OnceCell<()> = OnceCell::new();
+
-pub fn init_memory_release_scheduler() -> anyhow::Result<()> {
+pub fn init_memory_release_scheduler(cancel: CancellationToken) -> anyhow::Result<Option<JoinHandle<()>>> {
- info!("Setting up scheduler for memory release");
+ if MEMORY_SCHED_INIT.set(()).is_err() {
+ info!("Memory release scheduler already initialized");
+ return Ok(None);
+ }
+ info!("Setting up scheduler for memory release");
@@
- tokio::spawn(async move {
- loop {
- scheduler.run_pending().await;
- tokio::time::sleep(Duration::from_secs(60)).await; // Check every minute
- }
- });
-
- Ok(())
+ let handle = tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = cancel.cancelled() => break,
+ _ = tokio::time::sleep(Duration::from_secs(60)) => {
+ scheduler.run_pending().await;
+ }
+ }
+ }
+ });
+ Ok(Some(handle))
}

And here (call site) pass a cancellation token and don't fail startup if the scheduler fails:
-// Initialize memory release scheduler
-crate::memory::init_memory_release_scheduler()?;
+// Initialize memory release scheduler (non-fatal)
+let cancel_mem = tokio_util::sync::CancellationToken::new();
+if let Err(e) = crate::memory::init_memory_release_scheduler(cancel_mem.clone()) {
+ tracing::warn!("memory scheduler init failed: {e}");
+}
+// On shutdown: cancel_mem.cancel();

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/handlers/http/modal/query_server.rs around lines 132 to 134,
init_memory_release_scheduler() is called directly which can spawn duplicate
background loops on repeated calls, never observes server shutdown, and
currently propagates failures; change the call to use the memory scheduler's
idempotent init (guarded by Once/OnceCell) and pass a cancellation token or keep
the returned guard/JoinHandle so the scheduler can be cancelled on shutdown, and
treat init failures as non-fatal by logging the error and continuing (mirror
init_cluster_metrics_scheduler behavior) instead of returning Err to block
startup.
Summary by CodeRabbit
Performance & Optimization
New Features
Chores