34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -28,6 +28,9 @@ parquet = "54.0.0"
# Web server and HTTP-related
actix-cors = "0.7.0"
actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
tikv-jemalloc-ctl = "0.6.0"
tikv-jemallocator = "0.6.0"
tikv-jemalloc-sys = "0.6.1"
Comment on lines +31 to +33

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that jemalloc dependency versions are current and secure.

The tikv-jemalloc dependencies have been added with specific versions. Please confirm these are the latest stable versions and check for any known security vulnerabilities.

Run the following script to check for the latest versions and security advisories:


🏁 Script executed:

#!/bin/bash
# Check latest versions on crates.io
echo "=== Checking latest versions ==="
for crate in tikv-jemalloc-ctl tikv-jemallocator tikv-jemalloc-sys; do
  echo "Checking $crate..."
  curl -s "https://crates.io/api/v1/crates/$crate" | jq -r ".crate.max_stable_version // .crate.max_version"
done

echo ""
echo "=== Checking for security advisories ==="
for crate in tikv-jemalloc-ctl tikv-jemallocator tikv-jemalloc-sys; do
  gh api graphql -f query="
  {
    securityVulnerabilities(first: 5, ecosystem: RUST, package: \"$crate\") {
      nodes {
        advisory {
          summary
          severity
          publishedAt
        }
        vulnerableVersionRange
        firstPatchedVersion {
          identifier
        }
      }
    }
  }" 2>/dev/null || echo "No advisories found for $crate"
done

Length of output: 380


Update tikv-jemalloc-ctl and tikv-jemallocator to latest stable versions.

tikv-jemalloc-ctl and tikv-jemallocator are pinned to 0.6.0, but version 0.6.1 is available on crates.io. Update both to 0.6.1 for consistency with tikv-jemalloc-sys and to use the latest stable patch. No security vulnerabilities were found for any of these crates.

🤖 Prompt for AI Agents
In Cargo.toml around lines 31 to 33, tikv-jemalloc-ctl and tikv-jemallocator are
pinned at 0.6.0 while tikv-jemalloc-sys is 0.6.1; update both tikv-jemalloc-ctl
and tikv-jemallocator to version "0.6.1" to keep versions consistent, then run
cargo update -p tikv-jemalloc-ctl -p tikv-jemallocator (or regenerate
Cargo.lock) to ensure the lockfile reflects the new patch versions.

actix-web-httpauth = "0.8"
actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
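These three crates are normally paired with a global-allocator declaration in the binary crate. That wiring is not visible in this diff, so the following is only a minimal sketch of what it typically looks like; its placement in main.rs is an assumption.

// Hypothetical wiring, not part of this diff: switch the whole process to
// jemalloc so that tikv-jemalloc-ctl / tikv-jemalloc-sys can later inspect
// and purge the allocator's arenas.
use tikv_jemallocator::Jemalloc;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    // Application startup proceeds as usual, now on top of jemalloc.
}

With jemalloc installed as the global allocator, the -ctl and -sys crates can read allocator statistics and ask it to return freed pages to the operating system at runtime.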
3 changes: 3 additions & 0 deletions src/handlers/http/modal/ingest_server.rs
@@ -129,6 +129,9 @@ impl ParseableServer for IngestServer {
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

// Initialize memory release scheduler
crate::memory::init_memory_release_scheduler()?;

tokio::spawn(airplane::server());

// Ingestors shouldn't have to deal with OpenId auth flow
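The new crate::memory module itself is not part of this diff, so the body of init_memory_release_scheduler is unknown. The sketch below is one plausible reading based on the jemalloc crates added above; the ten-minute interval, the anyhow error type, the log messages, and the force_memory_release helper are all assumptions, with the helper presumably matching the commented-out force_memory_release() call in the query.rs changes further down. The same initializer is also called from the query and standalone servers below, so one sketch covers all three call sites.

// Hypothetical sketch of crate::memory; the real src/memory.rs is not in this diff.
use std::time::Duration;

use tikv_jemalloc_ctl::{epoch, stats};
use tracing::{info, warn};

/// Periodically refresh jemalloc's cached statistics and ask the allocator
/// to hand dirty pages back to the operating system.
pub fn init_memory_release_scheduler() -> anyhow::Result<()> {
    tokio::spawn(async {
        let mut ticker = tokio::time::interval(Duration::from_secs(600));
        loop {
            ticker.tick().await;
            // jemalloc caches its statistics; advancing the epoch refreshes them.
            if let Err(e) = epoch::advance() {
                warn!("failed to advance jemalloc epoch: {e}");
                continue;
            }
            let allocated = stats::allocated::read().unwrap_or(0);
            let resident = stats::resident::read().unwrap_or(0);
            info!("jemalloc: {allocated} bytes allocated, {resident} bytes resident");
            force_memory_release();
        }
    });
    Ok(())
}

/// Purge dirty pages from every arena; 4096 is jemalloc's MALLCTL_ARENAS_ALL sentinel.
pub fn force_memory_release() {
    let cmd = std::ffi::CString::new("arena.4096.purge").expect("valid mallctl name");
    // SAFETY: a NUL-terminated name with no in/out arguments is the documented way
    // to issue a void mallctl command such as arena.<i>.purge.
    let ret = unsafe {
        tikv_jemalloc_sys::mallctl(
            cmd.as_ptr(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        )
    };
    if ret != 0 {
        warn!("jemalloc arena purge failed with errno {ret}");
    }
}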
3 changes: 3 additions & 0 deletions src/handlers/http/modal/query_server.rs
@@ -129,6 +129,9 @@ impl ParseableServer for QueryServer {
analytics::init_analytics_scheduler()?;
}

// Initialize memory release scheduler
crate::memory::init_memory_release_scheduler()?;

if init_cluster_metrics_schedular().is_ok() {
info!("Cluster metrics scheduler started successfully");
}
3 changes: 3 additions & 0 deletions src/handlers/http/modal/server.rs
@@ -154,6 +154,9 @@ impl ParseableServer for Server {
analytics::init_analytics_scheduler()?;
}

// Initialize memory release scheduler
crate::memory::init_memory_release_scheduler()?;

tokio::spawn(handlers::livetail::server());
tokio::spawn(handlers::airplane::server());

40 changes: 29 additions & 11 deletions src/handlers/http/query.rs
@@ -35,7 +35,6 @@ use futures::stream::once;
use futures::{Stream, StreamExt, future};
use futures_util::Future;
use http::StatusCode;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
@@ -241,9 +240,15 @@ async fn handle_non_streaming_query(
with_fields: query_request.fields,
}
.to_json()?;
Ok(HttpResponse::Ok()

let http_response = HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
.json(response);

// // Force memory release after HTTP response is fully created
// force_memory_release();

Ok(http_response)
}

/// Handles streaming queries, returning results as newline-delimited JSON (NDJSON).
@@ -324,18 +329,24 @@ fn create_batch_processor(
) -> impl FnMut(Result<RecordBatch, QueryError>) -> Result<Bytes, actix_web::Error> {
move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
// Create response and immediately process to reduce memory retention
let query_response = QueryResponse {
records: vec![batch],
fields: Vec::new(),
fill_null: send_null,
with_fields: false,
}
.to_json()
.map_err(|e| {
};

let response = query_response.to_json().map_err(|e| {
error!("Failed to parse record batch into JSON: {}", e);
actix_web::error::ErrorInternalServerError(e)
})?;
Ok(Bytes::from(format!("{response}\n")))

// Convert to bytes and explicitly drop the response object
let bytes_result = Bytes::from(format!("{response}\n"));
drop(response); // Explicit cleanup

Ok(bytes_result)
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
}
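For context, the closure returned by create_batch_processor is the kind of per-item serializer that typically feeds an actix-web streaming body, which is what lets each record batch leave the process as soon as it is serialized. The surrounding streaming setup is outside this hunk, so the sketch below uses simplified stand-in types (serde_json::Value in place of RecordBatch, std::io::Error in place of QueryError) rather than the crate's real plumbing.

use actix_web::{HttpResponse, web::Bytes};
use futures::StreamExt;
use serde_json::json;

fn ndjson_response() -> HttpResponse {
    // Stand-in for the record-batch stream produced by the query executor.
    let batches = futures::stream::iter(vec![
        Ok::<_, std::io::Error>(json!({"level": "info", "count": 1})),
        Ok(json!({"level": "error", "count": 2})),
    ]);

    // Serialize each item to one JSON line as it arrives; nothing buffers the
    // full result set in memory.
    let body = batches.map(|item| item.map(|value| Bytes::from(format!("{value}\n"))));

    HttpResponse::Ok()
        .content_type("application/x-ndjson")
        .streaming(body)
}

Dropping the intermediate JSON value right after it is rendered into Bytes, as the diff above does, keeps the per-batch footprint bounded even for large result sets.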
@@ -380,12 +391,19 @@ pub async fn get_counts(
let (records, _) = get_records_and_fields(&query_request, &creds).await?;

if let Some(records) = records {
let json_records = record_batches_to_json(&records)?;
let records = json_records.into_iter().map(Value::Object).collect_vec();
// Use optimized JSON conversion with explicit memory management
let json_records = {
let converted = record_batches_to_json(&records)?;
drop(records); // Explicitly drop the original records early
converted
};

let processed_records: Vec<Value> =
json_records.into_iter().map(Value::Object).collect();

let res = json!({
"fields": vec!["start_time", "endTime", "count"],
"records": records,
"records": processed_records,
});

return Ok(web::Json(res));