Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ parquet = "54.0.0"
# Web server and HTTP-related
actix-cors = "0.7.0"
actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
tikv-jemalloc-ctl = "0.6.0"
tikv-jemallocator = "0.6.0"
tikv-jemalloc-sys = "0.6.1"
actix-web-httpauth = "0.8"
actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
Expand Down
3 changes: 3 additions & 0 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ impl ParseableServer for IngestServer {
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

// Initialize memory release scheduler
crate::memory::init_memory_release_scheduler()?;

tokio::spawn(airplane::server());

// Ingestors shouldn't have to deal with OpenId auth flow
Expand Down
3 changes: 3 additions & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ impl ParseableServer for QueryServer {
analytics::init_analytics_scheduler()?;
}

// Initialize memory release scheduler
crate::memory::init_memory_release_scheduler()?;

Comment on lines +132 to +134
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make memory scheduler init idempotent and tie to shutdown.

Calling init_memory_release_scheduler multiple times will spawn duplicate loops. Also, the spawned task runs forever and ignores server shutdown. Treat failures as non-fatal to avoid blocking startup.

Suggested fixes:

  • Guard initialization with a OnceCell/Once to ensure single spawn.
  • Accept a cancellation token or return a guard JoinHandle to stop on shutdown.
  • Log-and-continue on failure (mirror init_cluster_metrics_schedular behavior).

Example (in src/memory.rs) to make it idempotent and cancellable:

+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+use once_cell::sync::OnceCell;
+
+static MEMORY_SCHED_INIT: OnceCell<()> = OnceCell::new();
+
-pub fn init_memory_release_scheduler() -> anyhow::Result<()> {
+pub fn init_memory_release_scheduler(cancel: CancellationToken) -> anyhow::Result<Option<JoinHandle<()>>> {
-    info!("Setting up scheduler for memory release");
+    if MEMORY_SCHED_INIT.set(()).is_err() {
+        info!("Memory release scheduler already initialized");
+        return Ok(None);
+    }
+    info!("Setting up scheduler for memory release");
@@
-    tokio::spawn(async move {
-        loop {
-            scheduler.run_pending().await;
-            tokio::time::sleep(Duration::from_secs(60)).await; // Check every minute
-        }
-    });
-
-    Ok(())
+    let handle = tokio::spawn(async move {
+        loop {
+            tokio::select! {
+                _ = cancel.cancelled() => break,
+                _ = tokio::time::sleep(Duration::from_secs(60)) => {
+                    scheduler.run_pending().await;
+                }
+            }
+        }
+    });
+    Ok(Some(handle))
}

And here (call site) pass a cancellation token and don’t fail startup if the scheduler fails:

-// Initialize memory release scheduler
-crate::memory::init_memory_release_scheduler()?;
+// Initialize memory release scheduler (non-fatal)
+let cancel_mem = tokio_util::sync::CancellationToken::new();
+if let Err(e) = crate::memory::init_memory_release_scheduler(cancel_mem.clone()) {
+    tracing::warn!("memory scheduler init failed: {e}");
+}
+// On shutdown: cancel_mem.cancel();

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/handlers/http/modal/query_server.rs around lines 132 to 134,
init_memory_release_scheduler() is called directly which can spawn duplicate
background loops on repeated calls, never observes server shutdown, and
currently propagates failures; change the call to use the memory scheduler's
idempotent init (guarded by Once/OnceCell) and pass a cancellation token or keep
the returned guard/JoinHandle so the scheduler can be cancelled on shutdown, and
treat init failures as non-fatal by logging the error and continuing (mirror
init_cluster_metrics_scheduler behavior) instead of returning Err to block
startup.

if init_cluster_metrics_schedular().is_ok() {
info!("Cluster metrics scheduler started successfully");
}
Expand Down
3 changes: 3 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ impl ParseableServer for Server {
analytics::init_analytics_scheduler()?;
}

// Initialize memory release scheduler
crate::memory::init_memory_release_scheduler()?;

tokio::spawn(handlers::livetail::server());
tokio::spawn(handlers::airplane::server());

Expand Down
40 changes: 29 additions & 11 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use futures::stream::once;
use futures::{Stream, StreamExt, future};
use futures_util::Future;
use http::StatusCode;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
Expand Down Expand Up @@ -241,9 +240,15 @@ async fn handle_non_streaming_query(
with_fields: query_request.fields,
}
.to_json()?;
Ok(HttpResponse::Ok()

let http_response = HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
.json(response);

// // Force memory release after HTTP response is fully created
// force_memory_release();

Ok(http_response)
}

/// Handles streaming queries, returning results as newline-delimited JSON (NDJSON).
Expand Down Expand Up @@ -324,18 +329,24 @@ fn create_batch_processor(
) -> impl FnMut(Result<RecordBatch, QueryError>) -> Result<Bytes, actix_web::Error> {
move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
// Create response and immediately process to reduce memory retention
let query_response = QueryResponse {
records: vec![batch],
fields: Vec::new(),
fill_null: send_null,
with_fields: false,
}
.to_json()
.map_err(|e| {
};

let response = query_response.to_json().map_err(|e| {
error!("Failed to parse record batch into JSON: {}", e);
actix_web::error::ErrorInternalServerError(e)
})?;
Ok(Bytes::from(format!("{response}\n")))

// Convert to bytes and explicitly drop the response object
let bytes_result = Bytes::from(format!("{response}\n"));
drop(response); // Explicit cleanup

Ok(bytes_result)
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
}
Expand Down Expand Up @@ -380,12 +391,19 @@ pub async fn get_counts(
let (records, _) = get_records_and_fields(&query_request, &creds).await?;

if let Some(records) = records {
let json_records = record_batches_to_json(&records)?;
let records = json_records.into_iter().map(Value::Object).collect_vec();
// Use optimized JSON conversion with explicit memory management
let json_records = {
let converted = record_batches_to_json(&records)?;
drop(records); // Explicitly drop the original records early
converted
};

let processed_records: Vec<Value> =
json_records.into_iter().map(Value::Object).collect();

let res = json!({
"fields": vec!["start_time", "endTime", "count"],
"records": records,
"records": processed_records,
});

return Ok(web::Json(res));
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod event;
pub mod handlers;
pub mod hottier;
mod livetail;
pub mod memory;
mod metadata;
pub mod metastore;
pub mod metrics;
Expand Down
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Registry, fmt};

// Use jemalloc as the global allocator
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
init_logger();
Expand Down
75 changes: 75 additions & 0 deletions src/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::ffi::CString;
use std::time::Duration;

use clokwerk::AsyncScheduler;
use tracing::{info, warn};

/// Force memory release using jemalloc
pub fn force_memory_release() {
// Advance epoch to refresh statistics and trigger potential cleanup
if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
warn!("Failed to advance jemalloc epoch: {:?}", e);
}

// Purge each initialized arena
if let Ok(n) = tikv_jemalloc_ctl::arenas::narenas::read() {
for i in 0..n {
if let Ok(name) = CString::new(format!("arena.{i}.purge")) {
unsafe {
let ret = tikv_jemalloc_sys::mallctl(
name.as_ptr(),
std::ptr::null_mut(),
std::ptr::null_mut(),
std::ptr::null_mut(),
0,
);
if ret != 0 {
warn!("Arena purge failed for index {i} with code: {ret}");
}
}
}
}
} else {
warn!("Failed to read jemalloc arenas.narenas");
}
}

/// Initialize memory management scheduler
pub fn init_memory_release_scheduler() -> anyhow::Result<()> {
info!("Setting up scheduler for memory release");

let mut scheduler = AsyncScheduler::new();
scheduler
.every(clokwerk::Interval::Hours(1))
.run(move || async {
info!("Running scheduled memory release");
force_memory_release();
});

tokio::spawn(async move {
loop {
scheduler.run_pending().await;
tokio::time::sleep(Duration::from_secs(60)).await; // Check every minute
}
});

Ok(())
}
5 changes: 1 addition & 4 deletions src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore {
/// Delete an overview
async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> {
let path = RelativePathBuf::from_iter([stream, "overview"]);
Ok(self
.storage
.delete_object(&path)
.await?)
Ok(self.storage.delete_object(&path).await?)
}

/// This function fetches all the keystones from the underlying object store
Expand Down
29 changes: 19 additions & 10 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{Value, json};
use tracing::info;

Expand All @@ -32,26 +31,36 @@ pub struct QueryResponse {
impl QueryResponse {
pub fn to_json(&self) -> Result<Value, QueryError> {
info!("{}", "Returning query results");
let mut json_records = record_batches_to_json(&self.records)?;

if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
if !map.contains_key(field) {
map.insert(field.clone(), Value::Null);
// Process in batches to avoid massive allocations
const BATCH_SIZE: usize = 100; // Process 100 record batches at a time
let mut all_values = Vec::new();

for chunk in self.records.chunks(BATCH_SIZE) {
let mut json_records = record_batches_to_json(chunk)?;

if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
if !map.contains_key(field) {
map.insert(field.clone(), Value::Null);
}
}
}
}

// Convert this batch to values and add to collection
let batch_values: Vec<Value> = json_records.into_iter().map(Value::Object).collect();
all_values.extend(batch_values);
}
let values = json_records.into_iter().map(Value::Object).collect_vec();

let response = if self.with_fields {
json!({
"fields": self.fields,
"records": values,
"records": all_values,
})
} else {
Value::Array(values)
Value::Array(all_values)
};

Ok(response)
Expand Down
14 changes: 11 additions & 3 deletions src/utils/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ use crate::event::DEFAULT_TIMESTAMP_KEY;
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
let buf = vec![];
// Early return for empty records to avoid unnecessary allocations
if records.is_empty() {
return Ok(Vec::new());
}

let buf = Vec::with_capacity(records.len() * 1024); // Pre-allocate with reasonable capacity
let mut writer = arrow_json::ArrayWriter::new(buf);
for record in records {
writer.write(record)?;
Expand All @@ -57,8 +62,11 @@ pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String,

let buf = writer.into_inner();

let json_rows: Vec<Map<String, Value>> =
serde_json::from_reader(buf.as_slice()).unwrap_or_default();
// Use a cursor to avoid extra allocations during parsing
let json_rows: Vec<Map<String, Value>> = {
let cursor = std::io::Cursor::new(buf);
serde_json::from_reader(cursor)?
};

Ok(json_rows)
}
Expand Down
Loading