From 001c47844812fb25f992555f05b8f2108d0404db Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sat, 25 Oct 2025 04:18:29 +0100
Subject: [PATCH 1/2] chore: release memory to the OS every hour

---
 Cargo.lock                                    | 34 ++++++++
 Cargo.toml                                    |  3 +
 src/handlers/http/query.rs                    | 44 +++++++---
 src/main.rs                                   |  4 +
 .../metastores/object_store_metastore.rs      |  5 +-
 src/response.rs                               | 81 ++++++++++++++++---
 src/utils/arrow/mod.rs                        | 14 +++-
 7 files changed, 155 insertions(+), 30 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 81afe87bf..19b268877 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3533,6 +3533,9 @@ dependencies = [
  "temp-dir",
  "tempfile",
  "thiserror 2.0.11",
+ "tikv-jemalloc-ctl",
+ "tikv-jemalloc-sys",
+ "tikv-jemallocator",
  "tokio",
  "tokio-stream",
  "tokio-util",
@@ -4964,6 +4967,37 @@ dependencies = [
  "ordered-float",
 ]
 
+[[package]]
+name = "tikv-jemalloc-ctl"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c"
+dependencies = [
+ "libc",
+ "paste",
+ "tikv-jemalloc-sys",
+]
+
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230696e02839f328f7"
+dependencies = [
+ "cc",
+ "libc",
+]
+
+[[package]]
+name = "tikv-jemallocator"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a"
+dependencies = [
+ "libc",
+ "tikv-jemalloc-sys",
+]
+
 [[package]]
 name = "time"
 version = "0.3.37"
diff --git a/Cargo.toml b/Cargo.toml
index 81561712c..147295e6d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,9 @@ parquet = "54.0.0"
 # Web server and HTTP-related
 actix-cors = "0.7.0"
 actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
+tikv-jemalloc-ctl = "0.6.0"
+tikv-jemallocator = "0.6.0"
+tikv-jemalloc-sys = "0.6.1"
 actix-web-httpauth = "0.8"
 actix-web-prometheus = { version = "0.1" }
 actix-web-static-files = "4.0"
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index a9dd88b43..d536b2ac7 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -35,7 +35,6 @@ use futures::stream::once;
 use futures::{Stream, StreamExt, future};
 use futures_util::Future;
 use http::StatusCode;
-use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use serde_json::{Value, json};
 use std::collections::HashMap;
@@ -52,7 +51,7 @@ use crate::query::error::ExecuteError;
 use crate::query::{CountsRequest, Query as LogicalQuery, execute};
 use crate::query::{QUERY_SESSION, resolve_stream_names};
 use crate::rbac::Users;
-use crate::response::QueryResponse;
+use crate::response::{QueryResponse, force_memory_release};
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
@@ -241,9 +240,15 @@ async fn handle_non_streaming_query(
         with_fields: query_request.fields,
     }
     .to_json()?;
-    Ok(HttpResponse::Ok()
+
+    let http_response = HttpResponse::Ok()
         .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
-        .json(response))
+        .json(response);
+
+    // Force memory release after HTTP response is fully created
+    force_memory_release();
+
+    Ok(http_response)
 }
 
 /// Handles streaming queries, returning results as newline-delimited JSON (NDJSON).
@@ -324,18 +329,26 @@ fn create_batch_processor(
 ) -> impl FnMut(Result<RecordBatch, QueryError>) -> Result<Bytes, actix_web::Error> {
     move |batch_result| match batch_result {
         Ok(batch) => {
-            let response = QueryResponse {
+            // Create response and immediately process to reduce memory retention
+            let query_response = QueryResponse {
                 records: vec![batch],
                 fields: Vec::new(),
                 fill_null: send_null,
                 with_fields: false,
-            }
-            .to_json()
-            .map_err(|e| {
+            };
+
+            let response = query_response.to_json().map_err(|e| {
                 error!("Failed to parse record batch into JSON: {}", e);
                 actix_web::error::ErrorInternalServerError(e)
             })?;
-            Ok(Bytes::from(format!("{response}\n")))
+
+            // Convert to bytes and explicitly drop the response object
+            let bytes_result = Bytes::from(format!("{response}\n"));
+            drop(response); // Explicit cleanup
+
+            force_memory_release();
+
+            Ok(bytes_result)
         }
         Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
     }
@@ -380,12 +393,19 @@ pub async fn get_counts(
     let (records, _) = get_records_and_fields(&query_request, &creds).await?;
 
     if let Some(records) = records {
-        let json_records = record_batches_to_json(&records)?;
-        let records = json_records.into_iter().map(Value::Object).collect_vec();
+        // Use optimized JSON conversion with explicit memory management
+        let json_records = {
+            let converted = record_batches_to_json(&records)?;
+            drop(records); // Explicitly drop the original records early
+            converted
+        };
+
+        let processed_records: Vec<Value> =
+            json_records.into_iter().map(Value::Object).collect();
 
         let res = json!({
             "fields": vec!["start_time", "endTime", "count"],
-            "records": records,
+            "records": processed_records,
         });
 
         return Ok(web::Json(res));
diff --git a/src/main.rs b/src/main.rs
index 569022c42..2d74b9fd0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -31,6 +31,10 @@ use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::{EnvFilter, Registry, fmt};
 
+// Use jemalloc as the global allocator
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
 #[actix_web::main]
 async fn main() -> anyhow::Result<()> {
     init_logger();
diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs
index b93984f05..10edf9438 100644
--- a/src/metastore/metastores/object_store_metastore.rs
+++ b/src/metastore/metastores/object_store_metastore.rs
@@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore {
     /// Delete an overview
     async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> {
         let path = RelativePathBuf::from_iter([stream, "overview"]);
-        Ok(self
-            .storage
-            .delete_object(&path)
-            .await?)
+        Ok(self.storage.delete_object(&path).await?)
     }
 
     /// This function fetches all the keystones from the underlying object store
diff --git a/src/response.rs b/src/response.rs
index 8823b032a..f14ba6e29 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -16,11 +16,14 @@
  *
  */
 
+use std::ffi::CString;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
 use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
 use datafusion::arrow::record_batch::RecordBatch;
-use itertools::Itertools;
 use serde_json::{Value, json};
-use tracing::info;
+use tracing::{debug, info, warn};
 
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
@@ -32,28 +35,84 @@ pub struct QueryResponse {
 impl QueryResponse {
     pub fn to_json(&self) -> Result<Value, QueryError> {
         info!("{}", "Returning query results");
 
-        let mut json_records = record_batches_to_json(&self.records)?;
-        if self.fill_null {
-            for map in &mut json_records {
-                for field in &self.fields {
-                    if !map.contains_key(field) {
-                        map.insert(field.clone(), Value::Null);
+        // Process in batches to avoid massive allocations
+        const BATCH_SIZE: usize = 100; // Process 100 record batches at a time
+        let mut all_values = Vec::new();
+
+        for chunk in self.records.chunks(BATCH_SIZE) {
+            let mut json_records = record_batches_to_json(chunk)?;
+
+            if self.fill_null {
+                for map in &mut json_records {
+                    for field in &self.fields {
+                        if !map.contains_key(field) {
+                            map.insert(field.clone(), Value::Null);
+                        }
                     }
                 }
             }
+
+            // Convert this batch to values and add to collection
+            let batch_values: Vec<Value> = json_records.into_iter().map(Value::Object).collect();
+            all_values.extend(batch_values);
         }
-        let values = json_records.into_iter().map(Value::Object).collect_vec();
 
         let response = if self.with_fields {
             json!({
                 "fields": self.fields,
-                "records": values,
+                "records": all_values,
             })
         } else {
-            Value::Array(values)
+            Value::Array(all_values)
         };
 
         Ok(response)
     }
 }
+
+impl Drop for QueryResponse {
+    fn drop(&mut self) {
+        force_memory_release();
+    }
+}
+
+// Rate-limited memory release with proper error handling
+static LAST_PURGE: Mutex<Option<Instant>> = Mutex::new(None);
+const PURGE_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour
+pub fn force_memory_release() {
+    {
+        let mut last_purge = LAST_PURGE.lock().unwrap();
+        if let Some(last) = *last_purge {
+            if last.elapsed() < PURGE_INTERVAL {
+                return;
+            }
+        }
+        *last_purge = Some(Instant::now());
+    }
+
+    // Advance epoch to refresh statistics and trigger potential cleanup
+    if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
+        warn!("Failed to advance jemalloc epoch: {:?}", e);
+    }
+
+    // Purge all arenas using MALLCTL_ARENAS_ALL
+    if let Ok(arena_purge) = CString::new("arena.4096.purge") {
+        unsafe {
+            let ret = tikv_jemalloc_sys::mallctl(
+                arena_purge.as_ptr(),
+                std::ptr::null_mut(), // oldp (not reading)
+                std::ptr::null_mut(), // oldlenp (not reading)
+                std::ptr::null_mut(), // newp (void operation)
+                0, // newlen (void operation)
+            );
+            if ret != 0 {
+                warn!("Arena purge failed with code: {}", ret);
+            } else {
+                debug!("Successfully purged all jemalloc arenas");
+            }
+        }
+    } else {
+        warn!("Failed to create CString for arena purge");
+    }
+}
diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs
index be372b59f..d60c8f007 100644
--- a/src/utils/arrow/mod.rs
+++ b/src/utils/arrow/mod.rs
@@ -48,7 +52,12 @@ use crate::event::DEFAULT_TIMESTAMP_KEY;
 ///
 /// A vector of JSON objects representing the record batches.
 pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
-    let buf = vec![];
+    // Early return for empty records to avoid unnecessary allocations
+    if records.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let buf = Vec::with_capacity(records.len() * 1024); // Pre-allocate with reasonable capacity
     let mut writer = arrow_json::ArrayWriter::new(buf);
     for record in records {
         writer.write(record)?;
@@ -57,8 +62,11 @@ pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
 
     let buf = writer.into_inner();
 
-    let json_rows: Vec<Map<String, Value>> =
-        serde_json::from_reader(buf.as_slice()).unwrap_or_default();
+    // Use a cursor to avoid extra allocations during parsing
+    let json_rows: Vec<Map<String, Value>> = {
+        let cursor = std::io::Cursor::new(buf);
+        serde_json::from_reader(cursor).unwrap_or_else(|_| Vec::with_capacity(0))
+    };
 
     Ok(json_rows)
 }

From db6ba4b115ce27d7db25e6944f5ac350bf39ebfb Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sat, 25 Oct 2025 06:34:14 +0100
Subject: [PATCH 2/2] moved from query handler to scheduled job

---
 src/handlers/http/modal/ingest_server.rs |  3 +
 src/handlers/http/modal/query_server.rs  |  3 +
 src/handlers/http/modal/server.rs        |  3 +
 src/handlers/http/query.rs               |  8 +--
 src/lib.rs                               |  1 +
 src/main.rs                              |  1 +
 src/memory.rs                            | 75 ++++++++++++++++++++++++
 src/response.rs                          | 52 +---------------
 src/utils/arrow/mod.rs                   |  2 +-
 9 files changed, 91 insertions(+), 57 deletions(-)
 create mode 100644 src/memory.rs

diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs
index 628bd9f0f..9345350f1 100644
--- a/src/handlers/http/modal/ingest_server.rs
+++ b/src/handlers/http/modal/ingest_server.rs
@@ -129,6 +129,9 @@ impl ParseableServer for IngestServer {
         let (cancel_tx, cancel_rx) = oneshot::channel();
         thread::spawn(|| sync::handler(cancel_rx));
 
+        // Initialize memory release scheduler
+        crate::memory::init_memory_release_scheduler()?;
+
         tokio::spawn(airplane::server());
 
         // Ingestors shouldn't have to deal with OpenId auth flow
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index c551884d4..6049cb338 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -129,6 +129,9 @@ impl ParseableServer for QueryServer {
             analytics::init_analytics_scheduler()?;
         }
 
+        // Initialize memory release scheduler
+        crate::memory::init_memory_release_scheduler()?;
+
         if init_cluster_metrics_schedular().is_ok() {
             info!("Cluster metrics scheduler started successfully");
         }
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index a522697aa..fc51939b1 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -154,6 +154,9 @@ impl ParseableServer for Server {
             analytics::init_analytics_scheduler()?;
         }
 
+        // Initialize memory release scheduler
+        crate::memory::init_memory_release_scheduler()?;
+
         tokio::spawn(handlers::livetail::server());
         tokio::spawn(handlers::airplane::server());
 
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index d536b2ac7..c62270c88 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -51,7 +51,7 @@ use crate::query::error::ExecuteError;
 use crate::query::{CountsRequest, Query as LogicalQuery, execute};
 use crate::query::{QUERY_SESSION, resolve_stream_names};
 use crate::rbac::Users;
-use crate::response::{QueryResponse, force_memory_release};
+use crate::response::QueryResponse;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
@@ -245,8 +245,8 @@ async fn handle_non_streaming_query(
         .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
         .json(response);
 
-    // Force memory release after HTTP response is fully created
-    force_memory_release();
+    // // Force memory release after HTTP response is fully created
+    // force_memory_release();
 
     Ok(http_response)
 }
@@ -346,8 +346,6 @@ fn create_batch_processor(
             let bytes_result = Bytes::from(format!("{response}\n"));
             drop(response); // Explicit cleanup
 
-            force_memory_release();
-
             Ok(bytes_result)
         }
         Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
diff --git a/src/lib.rs b/src/lib.rs
index 9493937cf..46cc4c4d3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -30,6 +30,7 @@ pub mod event;
 pub mod handlers;
 pub mod hottier;
 mod livetail;
+pub mod memory;
 mod metadata;
 pub mod metastore;
 pub mod metrics;
diff --git a/src/main.rs b/src/main.rs
index 2d74b9fd0..c8cce217e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -32,6 +32,7 @@ use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::{EnvFilter, Registry, fmt};
 
 // Use jemalloc as the global allocator
+#[cfg(not(target_env = "msvc"))]
 #[global_allocator]
 static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
diff --git a/src/memory.rs b/src/memory.rs
new file mode 100644
index 000000000..8e553e6b8
--- /dev/null
+++ b/src/memory.rs
@@ -0,0 +1,75 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::ffi::CString;
+use std::time::Duration;
+
+use clokwerk::AsyncScheduler;
+use tracing::{info, warn};
+
+/// Force memory release using jemalloc
+pub fn force_memory_release() {
+    // Advance epoch to refresh statistics and trigger potential cleanup
+    if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
+        warn!("Failed to advance jemalloc epoch: {:?}", e);
+    }
+
+    // Purge each initialized arena
+    if let Ok(n) = tikv_jemalloc_ctl::arenas::narenas::read() {
+        for i in 0..n {
+            if let Ok(name) = CString::new(format!("arena.{i}.purge")) {
+                unsafe {
+                    let ret = tikv_jemalloc_sys::mallctl(
+                        name.as_ptr(),
+                        std::ptr::null_mut(),
+                        std::ptr::null_mut(),
+                        std::ptr::null_mut(),
+                        0,
+                    );
+                    if ret != 0 {
+                        warn!("Arena purge failed for index {i} with code: {ret}");
+                    }
+                }
+            }
+        }
+    } else {
+        warn!("Failed to read jemalloc arenas.narenas");
+    }
+}
+
+/// Initialize memory management scheduler
+pub fn init_memory_release_scheduler() -> anyhow::Result<()> {
+    info!("Setting up scheduler for memory release");
+
+    let mut scheduler = AsyncScheduler::new();
+    scheduler
+        .every(clokwerk::Interval::Hours(1))
+        .run(move || async {
+            info!("Running scheduled memory release");
+            force_memory_release();
+        });
+
+    tokio::spawn(async move {
+        loop {
+            scheduler.run_pending().await;
+            tokio::time::sleep(Duration::from_secs(60)).await; // Check every minute
+        }
+    });
+
+    Ok(())
+}
diff --git a/src/response.rs b/src/response.rs
index f14ba6e29..f6eb3cccd 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -16,14 +16,10 @@
  *
  */
 
-use std::ffi::CString;
-use std::sync::Mutex;
-use std::time::{Duration, Instant};
-
 use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
 use datafusion::arrow::record_batch::RecordBatch;
 use serde_json::{Value, json};
-use tracing::{debug, info, warn};
+use tracing::info;
 
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
@@ -70,49 +66,3 @@ impl QueryResponse {
         Ok(response)
     }
 }
-
-impl Drop for QueryResponse {
-    fn drop(&mut self) {
-        force_memory_release();
-    }
-}
-
-// Rate-limited memory release with proper error handling
-static LAST_PURGE: Mutex<Option<Instant>> = Mutex::new(None);
-const PURGE_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour
-pub fn force_memory_release() {
-    {
-        let mut last_purge = LAST_PURGE.lock().unwrap();
-        if let Some(last) = *last_purge {
-            if last.elapsed() < PURGE_INTERVAL {
-                return;
-            }
-        }
-        *last_purge = Some(Instant::now());
-    }
-
-    // Advance epoch to refresh statistics and trigger potential cleanup
-    if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
-        warn!("Failed to advance jemalloc epoch: {:?}", e);
-    }
-
-    // Purge all arenas using MALLCTL_ARENAS_ALL
-    if let Ok(arena_purge) = CString::new("arena.4096.purge") {
-        unsafe {
-            let ret = tikv_jemalloc_sys::mallctl(
-                arena_purge.as_ptr(),
-                std::ptr::null_mut(), // oldp (not reading)
-                std::ptr::null_mut(), // oldlenp (not reading)
-                std::ptr::null_mut(), // newp (void operation)
-                0, // newlen (void operation)
-            );
-            if ret != 0 {
-                warn!("Arena purge failed with code: {}", ret);
-            } else {
-                debug!("Successfully purged all jemalloc arenas");
-            }
-        }
-    } else {
-        warn!("Failed to create CString for arena purge");
-    }
-}
diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs
index d60c8f007..dea33d26f 100644
--- a/src/utils/arrow/mod.rs
+++ b/src/utils/arrow/mod.rs
@@ -65,7 +65,7 @@ pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
     // Use a cursor to avoid extra allocations during parsing
     let json_rows: Vec<Map<String, Value>> = {
         let cursor = std::io::Cursor::new(buf);
-        serde_json::from_reader(cursor).unwrap_or_else(|_| Vec::with_capacity(0))
+        serde_json::from_reader(cursor)?
 
     Ok(json_rows)
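
Note (illustrative only, not part of the patches above): one way to sanity-check that the hourly arena purge actually returns memory to the OS is to compare jemalloc's allocated and resident counters before and after a purge via tikv-jemalloc-ctl. The sketch below assumes the crate::memory::force_memory_release function introduced in PATCH 2/2; the log_jemalloc_stats and observe_purge helper names and their placement are hypothetical.

    use tikv_jemalloc_ctl::{epoch, stats};
    use tracing::info;

    /// Hypothetical helper: log jemalloc's allocated vs. resident bytes.
    fn log_jemalloc_stats(label: &str) {
        // jemalloc statistics are cached; advancing the epoch refreshes them.
        if epoch::advance().is_ok() {
            let allocated = stats::allocated::read().unwrap_or(0); // bytes the application currently has allocated
            let resident = stats::resident::read().unwrap_or(0); // bytes of pages physically resident in RAM
            info!("jemalloc [{label}]: allocated={allocated} resident={resident}");
        }
    }

    /// Hypothetical smoke test: resident should drop (or at least not grow) after the
    /// purge, while allocated is unaffected, since only dirty, unused pages are returned.
    fn observe_purge() {
        log_jemalloc_stats("before purge");
        // The scheduled job added in PATCH 2/2 calls this hourly; calling it
        // directly here is only for observation.
        crate::memory::force_memory_release();
        log_jemalloc_stats("after purge");
    }

Comparing stats::resident rather than raw process RSS keeps the check focused on pages that jemalloc itself is able to give back.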