diff --git a/Cargo.lock b/Cargo.lock index 81afe87bf..19b268877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3533,6 +3533,9 @@ dependencies = [ "temp-dir", "tempfile", "thiserror 2.0.11", + "tikv-jemalloc-ctl", + "tikv-jemalloc-sys", + "tikv-jemallocator", "tokio", "tokio-stream", "tokio-util", @@ -4964,6 +4967,37 @@ dependencies = [ "ordered-float", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.37" diff --git a/Cargo.toml b/Cargo.toml index 81561712c..147295e6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,9 @@ parquet = "54.0.0" # Web server and HTTP-related actix-cors = "0.7.0" actix-web = { version = "4.9.0", features = ["rustls-0_22"] } +tikv-jemalloc-ctl = "0.6.0" +tikv-jemallocator = "0.6.0" +tikv-jemalloc-sys = "0.6.1" actix-web-httpauth = "0.8" actix-web-prometheus = { version = "0.1" } actix-web-static-files = "4.0" diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 628bd9f0f..9345350f1 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -129,6 +129,9 @@ impl ParseableServer for IngestServer { let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); + // Initialize memory release scheduler + crate::memory::init_memory_release_scheduler()?; + tokio::spawn(airplane::server()); // Ingestors shouldn't have to deal with OpenId auth flow diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c551884d4..6049cb338 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -129,6 +129,9 @@ impl ParseableServer for QueryServer { analytics::init_analytics_scheduler()?; } + // Initialize memory release scheduler + crate::memory::init_memory_release_scheduler()?; + if init_cluster_metrics_schedular().is_ok() { info!("Cluster metrics scheduler started successfully"); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index a522697aa..fc51939b1 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -154,6 +154,9 @@ impl ParseableServer for Server { analytics::init_analytics_scheduler()?; } + // Initialize memory release scheduler + crate::memory::init_memory_release_scheduler()?; + tokio::spawn(handlers::livetail::server()); tokio::spawn(handlers::airplane::server()); diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index a9dd88b43..c62270c88 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -35,7 +35,6 @@ use futures::stream::once; use futures::{Stream, StreamExt, future}; use futures_util::Future; use http::StatusCode; -use itertools::Itertools; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::collections::HashMap; @@ -241,9 +240,15 @@ async fn handle_non_streaming_query( with_fields: query_request.fields, } .to_json()?; - Ok(HttpResponse::Ok() + + let http_response = HttpResponse::Ok() .insert_header((TIME_ELAPSED_HEADER, total_time.as_str())) - .json(response)) + .json(response); + + // // Force memory release after HTTP response is fully created + // force_memory_release(); + + Ok(http_response) } /// Handles streaming queries, returning results as newline-delimited JSON (NDJSON). @@ -324,18 +329,24 @@ fn create_batch_processor( ) -> impl FnMut(Result) -> Result { move |batch_result| match batch_result { Ok(batch) => { - let response = QueryResponse { + // Create response and immediately process to reduce memory retention + let query_response = QueryResponse { records: vec![batch], fields: Vec::new(), fill_null: send_null, with_fields: false, - } - .to_json() - .map_err(|e| { + }; + + let response = query_response.to_json().map_err(|e| { error!("Failed to parse record batch into JSON: {}", e); actix_web::error::ErrorInternalServerError(e) })?; - Ok(Bytes::from(format!("{response}\n"))) + + // Convert to bytes and explicitly drop the response object + let bytes_result = Bytes::from(format!("{response}\n")); + drop(response); // Explicit cleanup + + Ok(bytes_result) } Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), } @@ -380,12 +391,19 @@ pub async fn get_counts( let (records, _) = get_records_and_fields(&query_request, &creds).await?; if let Some(records) = records { - let json_records = record_batches_to_json(&records)?; - let records = json_records.into_iter().map(Value::Object).collect_vec(); + // Use optimized JSON conversion with explicit memory management + let json_records = { + let converted = record_batches_to_json(&records)?; + drop(records); // Explicitly drop the original records early + converted + }; + + let processed_records: Vec = + json_records.into_iter().map(Value::Object).collect(); let res = json!({ "fields": vec!["start_time", "endTime", "count"], - "records": records, + "records": processed_records, }); return Ok(web::Json(res)); diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index 41e2a21e3..5e38f17d2 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -16,7 +16,8 @@ * */ -use std::sync::{Arc, LazyLock, atomic::AtomicBool}; +use std::collections::VecDeque; +use std::sync::{Arc, LazyLock, Mutex, atomic::AtomicBool}; use actix_web::{ body::MessageBody, @@ -34,8 +35,75 @@ use tracing::{info, trace, warn}; use crate::analytics::{SYS_INFO, refresh_sys_info}; use crate::parseable::PARSEABLE; +#[derive(Debug, Clone)] +struct ResourceSample { + cpu_usage: f32, + memory_usage: f32, + timestamp: std::time::Instant, +} + +/// Structure to maintain rolling average of resource utilization +struct ResourceHistory { + samples: VecDeque, + window_duration: Duration, +} + +impl ResourceHistory { + fn new(window_duration: Duration) -> Self { + Self { + samples: VecDeque::new(), + window_duration, + } + } + + fn add_sample(&mut self, cpu_usage: f32, memory_usage: f32) { + let now = std::time::Instant::now(); + let sample = ResourceSample { + cpu_usage, + memory_usage, + timestamp: now, + }; + + // Add new sample + self.samples.push_back(sample); + + // Remove old samples outside the window + let cutoff_time = now - self.window_duration; + while let Some(front) = self.samples.front() { + if front.timestamp < cutoff_time { + self.samples.pop_front(); + } else { + break; + } + } + } + + fn get_average(&self) -> Option<(f32, f32)> { + if self.samples.is_empty() { + return None; + } + + let count = self.samples.len() as f32; + let (total_cpu, total_memory) = + self.samples + .iter() + .fold((0.0, 0.0), |(cpu_acc, mem_acc), sample| { + (cpu_acc + sample.cpu_usage, mem_acc + sample.memory_usage) + }); + + Some((total_cpu / count, total_memory / count)) + } + + fn sample_count(&self) -> usize { + self.samples.len() + } +} + static RESOURCE_CHECK_ENABLED: LazyLock> = - LazyLock::new(|| Arc::new(AtomicBool::new(false))); + LazyLock::new(|| Arc::new(AtomicBool::new(true))); + +static RESOURCE_HISTORY: LazyLock>> = + LazyLock::new(|| Arc::new(Mutex::new(ResourceHistory::new(Duration::from_secs(120))))); /// Spawn a background task to monitor system resources pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { @@ -48,9 +116,13 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { let memory_threshold = PARSEABLE.options.memory_utilization_threshold; info!( - "Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%", + "Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}% (2-minute rolling average)", cpu_threshold, memory_threshold ); + + // Calculate minimum samples needed for a reliable 2-minute average + let min_samples_for_decision = std::cmp::max(1, 120 / resource_check_interval as usize); + loop { select! { _ = check_interval.tick() => { @@ -65,8 +137,6 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { (used_memory, total_memory, cpu_usage) }).await.unwrap(); - let mut resource_ok = true; - // Calculate memory usage percentage let memory_usage = if total_memory > 0.0 { (used_memory / total_memory) * 100.0 @@ -74,23 +144,54 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { 0.0 }; - // Log current resource usage every few checks for debugging - info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)", - cpu_usage, memory_usage, - used_memory / 1024.0 / 1024.0 / 1024.0, - total_memory / 1024.0 / 1024.0 / 1024.0); + // Add current sample to history + { + let mut history = RESOURCE_HISTORY.lock().unwrap(); + history.add_sample(cpu_usage, memory_usage); + } - // Check memory utilization - if memory_usage > memory_threshold { - warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)", - memory_usage, memory_threshold); + // Get rolling averages + let (avg_cpu, avg_memory, sample_count) = { + let history = RESOURCE_HISTORY.lock().unwrap(); + if let Some((cpu_avg, mem_avg)) = history.get_average() { + (cpu_avg, mem_avg, history.sample_count()) + } else { + (cpu_usage, memory_usage, 1) // Fallback to current values if no history + } + }; + + // Log current and average resource usage + info!( + "Resource usage - Current: CPU {:.1}%, Memory {:.1}% | 2-min avg: CPU {:.1}%, Memory {:.1}% (samples: {})", + cpu_usage, memory_usage, avg_cpu, avg_memory, sample_count + ); + + // Only make decisions based on rolling average if we have enough samples + let (decision_cpu, decision_memory) = if sample_count >= min_samples_for_decision { + (avg_cpu, avg_memory) + } else { + // For the first few minutes, use current values but be more conservative + info!("Still warming up resource history (need {} samples, have {})", min_samples_for_decision, sample_count); + (cpu_usage, memory_usage) + }; + + let mut resource_ok = true; + + // Check memory utilization against rolling average + if decision_memory > memory_threshold { + warn!( + "High memory usage detected: 2-min avg {:.1}% (threshold: {:.1}%, current: {:.1}%)", + decision_memory, memory_threshold, memory_usage + ); resource_ok = false; } - // Check CPU utilization - if cpu_usage > cpu_threshold { - warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)", - cpu_usage, cpu_threshold); + // Check CPU utilization against rolling average + if decision_cpu > cpu_threshold { + warn!( + "High CPU usage detected: 2-min avg {:.1}% (threshold: {:.1}%, current: {:.1}%)", + decision_cpu, cpu_threshold, cpu_usage + ); resource_ok = false; } @@ -100,9 +201,9 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { // Log state changes if previous_state != resource_ok { if resource_ok { - info!("Resource utilization back to normal - requests will be accepted"); + info!("Resource utilization back to normal (2-min avg: CPU {:.1}%, Memory {:.1}%) - requests will be accepted", avg_cpu, avg_memory); } else { - warn!("Resource utilization too high - requests will be rejected"); + warn!("Resource utilization too high (2-min avg: CPU {:.1}%, Memory {:.1}%) - requests will be rejected", avg_cpu, avg_memory); } } }, @@ -116,7 +217,7 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { } /// Middleware to check system resource utilization before processing requests -/// Returns 503 Service Unavailable if resources are over-utilized +/// Returns 503 Service Unavailable if resources are over-utilized (based on 2-minute rolling average) pub async fn check_resource_utilization_middleware( req: ServiceRequest, next: Next, @@ -124,9 +225,9 @@ pub async fn check_resource_utilization_middleware( let resource_ok = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst); if !resource_ok { - let error_msg = "Server resources over-utilized"; + let error_msg = "Server resources over-utilized (based on 2-minute rolling average)"; warn!( - "Rejecting request to {} due to resource constraints", + "Rejecting request to {} due to resource constraints (2-minute average above threshold)", req.path() ); return Err(ErrorServiceUnavailable(error_msg)); @@ -135,3 +236,66 @@ pub async fn check_resource_utilization_middleware( // Continue processing the request if resource utilization is within limits next.call(req).await } + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_resource_history_basic() { + let mut history = ResourceHistory::new(Duration::from_secs(60)); + + // Add some samples + history.add_sample(50.0, 60.0); + history.add_sample(70.0, 80.0); + + let (avg_cpu, avg_memory) = history.get_average().unwrap(); + assert_eq!(avg_cpu, 60.0); // (50 + 70) / 2 + assert_eq!(avg_memory, 70.0); // (60 + 80) / 2 + assert_eq!(history.sample_count(), 2); + } + + #[test] + fn test_resource_history_window_cleanup() { + let mut history = ResourceHistory::new(Duration::from_millis(100)); + + // Add samples + history.add_sample(50.0, 60.0); + std::thread::sleep(Duration::from_millis(50)); + history.add_sample(70.0, 80.0); + + // Both samples should be present + assert_eq!(history.sample_count(), 2); + + // Wait for first sample to expire + std::thread::sleep(Duration::from_millis(100)); + history.add_sample(90.0, 100.0); + + // Old samples should be cleaned up, only recent samples remain + assert!(history.sample_count() <= 2); + + let (avg_cpu, avg_memory) = history.get_average().unwrap(); + // Should be average of recent samples only + assert!(avg_cpu >= 70.0); + assert!(avg_memory >= 80.0); + } + + #[test] + fn test_resource_history_empty() { + let history = ResourceHistory::new(Duration::from_secs(60)); + assert!(history.get_average().is_none()); + assert_eq!(history.sample_count(), 0); + } + + #[test] + fn test_resource_history_single_sample() { + let mut history = ResourceHistory::new(Duration::from_secs(60)); + history.add_sample(75.5, 85.3); + + let (avg_cpu, avg_memory) = history.get_average().unwrap(); + assert_eq!(avg_cpu, 75.5); + assert_eq!(avg_memory, 85.3); + assert_eq!(history.sample_count(), 1); + } +} diff --git a/src/lib.rs b/src/lib.rs index 9493937cf..46cc4c4d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ pub mod event; pub mod handlers; pub mod hottier; mod livetail; +pub mod memory; mod metadata; pub mod metastore; pub mod metrics; diff --git a/src/main.rs b/src/main.rs index 569022c42..c8cce217e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,6 +31,11 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry, fmt}; +// Use jemalloc as the global allocator +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + #[actix_web::main] async fn main() -> anyhow::Result<()> { init_logger(); diff --git a/src/memory.rs b/src/memory.rs new file mode 100644 index 000000000..8e553e6b8 --- /dev/null +++ b/src/memory.rs @@ -0,0 +1,75 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::ffi::CString; +use std::time::Duration; + +use clokwerk::AsyncScheduler; +use tracing::{info, warn}; + +/// Force memory release using jemalloc +pub fn force_memory_release() { + // Advance epoch to refresh statistics and trigger potential cleanup + if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) { + warn!("Failed to advance jemalloc epoch: {:?}", e); + } + + // Purge each initialized arena + if let Ok(n) = tikv_jemalloc_ctl::arenas::narenas::read() { + for i in 0..n { + if let Ok(name) = CString::new(format!("arena.{i}.purge")) { + unsafe { + let ret = tikv_jemalloc_sys::mallctl( + name.as_ptr(), + std::ptr::null_mut(), + std::ptr::null_mut(), + std::ptr::null_mut(), + 0, + ); + if ret != 0 { + warn!("Arena purge failed for index {i} with code: {ret}"); + } + } + } + } + } else { + warn!("Failed to read jemalloc arenas.narenas"); + } +} + +/// Initialize memory management scheduler +pub fn init_memory_release_scheduler() -> anyhow::Result<()> { + info!("Setting up scheduler for memory release"); + + let mut scheduler = AsyncScheduler::new(); + scheduler + .every(clokwerk::Interval::Hours(1)) + .run(move || async { + info!("Running scheduled memory release"); + force_memory_release(); + }); + + tokio::spawn(async move { + loop { + scheduler.run_pending().await; + tokio::time::sleep(Duration::from_secs(60)).await; // Check every minute + } + }); + + Ok(()) +} diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index b93984f05..10edf9438 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore { /// Delete an overview async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> { let path = RelativePathBuf::from_iter([stream, "overview"]); - Ok(self - .storage - .delete_object(&path) - .await?) + Ok(self.storage.delete_object(&path).await?) } /// This function fetches all the keystones from the underlying object store diff --git a/src/response.rs b/src/response.rs index 8823b032a..f6eb3cccd 100644 --- a/src/response.rs +++ b/src/response.rs @@ -18,7 +18,6 @@ use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json}; use datafusion::arrow::record_batch::RecordBatch; -use itertools::Itertools; use serde_json::{Value, json}; use tracing::info; @@ -32,26 +31,36 @@ pub struct QueryResponse { impl QueryResponse { pub fn to_json(&self) -> Result { info!("{}", "Returning query results"); - let mut json_records = record_batches_to_json(&self.records)?; - if self.fill_null { - for map in &mut json_records { - for field in &self.fields { - if !map.contains_key(field) { - map.insert(field.clone(), Value::Null); + // Process in batches to avoid massive allocations + const BATCH_SIZE: usize = 100; // Process 100 record batches at a time + let mut all_values = Vec::new(); + + for chunk in self.records.chunks(BATCH_SIZE) { + let mut json_records = record_batches_to_json(chunk)?; + + if self.fill_null { + for map in &mut json_records { + for field in &self.fields { + if !map.contains_key(field) { + map.insert(field.clone(), Value::Null); + } } } } + + // Convert this batch to values and add to collection + let batch_values: Vec = json_records.into_iter().map(Value::Object).collect(); + all_values.extend(batch_values); } - let values = json_records.into_iter().map(Value::Object).collect_vec(); let response = if self.with_fields { json!({ "fields": self.fields, - "records": values, + "records": all_values, }) } else { - Value::Array(values) + Value::Array(all_values) }; Ok(response) diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index be372b59f..dea33d26f 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -48,7 +48,12 @@ use crate::event::DEFAULT_TIMESTAMP_KEY; /// /// A vector of JSON objects representing the record batches. pub fn record_batches_to_json(records: &[RecordBatch]) -> Result>> { - let buf = vec![]; + // Early return for empty records to avoid unnecessary allocations + if records.is_empty() { + return Ok(Vec::new()); + } + + let buf = Vec::with_capacity(records.len() * 1024); // Pre-allocate with reasonable capacity let mut writer = arrow_json::ArrayWriter::new(buf); for record in records { writer.write(record)?; @@ -57,8 +62,11 @@ pub fn record_batches_to_json(records: &[RecordBatch]) -> Result> = - serde_json::from_reader(buf.as_slice()).unwrap_or_default(); + // Use a cursor to avoid extra allocations during parsing + let json_rows: Vec> = { + let cursor = std::io::Cursor::new(buf); + serde_json::from_reader(cursor)? + }; Ok(json_rows) }