diff --git a/server/src/option.rs b/server/src/option.rs
index 585e29f4d..14e389e3a 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -187,6 +187,9 @@ pub struct Server {
     /// Rows in Parquet Rowgroup
     pub row_group_size: usize,
 
+    /// Query memory limit in bytes
+    pub query_memory_pool_size: Option<usize>,
+
     /// Parquet compression algorithm
     pub parquet_compression: Compression,
 }
@@ -229,6 +232,11 @@ impl FromArgMatches for Server {
             .get_one::<bool>(Self::SEND_ANALYTICS)
             .cloned()
             .expect("default for send analytics");
+        // converts GiB to bytes before assigning
+        self.query_memory_pool_size = m
+            .get_one::<u8>(Self::QUERY_MEM_POOL_SIZE)
+            .cloned()
+            .map(|gib| gib as usize * 1024usize.pow(3));
         self.row_group_size = m
             .get_one::<usize>(Self::ROW_GROUP_SIZE)
             .cloned()
@@ -263,6 +271,7 @@ impl Server {
     pub const PASSWORD: &str = "password";
     pub const CHECK_UPDATE: &str = "check-update";
     pub const SEND_ANALYTICS: &str = "send-analytics";
+    pub const QUERY_MEM_POOL_SIZE: &str = "query-mempool-size";
     pub const ROW_GROUP_SIZE: &str = "row-group-size";
     pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo";
     pub const DEFAULT_USERNAME: &str = "admin";
@@ -352,6 +361,15 @@ impl Server {
                     .value_parser(value_parser!(bool))
                     .help("Disable/Enable checking for updates"),
             )
+            .arg(
+                Arg::new(Self::QUERY_MEM_POOL_SIZE)
+                    .long(Self::QUERY_MEM_POOL_SIZE)
+                    .env("P_QUERY_MEMORY_LIMIT")
+                    .value_name("Gib")
+                    .required(false)
+                    .value_parser(value_parser!(u8))
+                    .help("Set a fixed memory limit for query"),
+            )
             .arg(
                 Arg::new(Self::ROW_GROUP_SIZE)
                     .long(Self::ROW_GROUP_SIZE)
diff --git a/server/src/query.rs b/server/src/query.rs
index 7943643ee..b62b7ddf6 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -23,11 +23,14 @@ use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::execution::context::SessionState;
+use datafusion::execution::disk_manager::DiskManagerConfig;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::prelude::*;
 use itertools::Itertools;
 use serde_json::Value;
 use std::path::Path;
 use std::sync::Arc;
+use sysinfo::{System, SystemExt};
 
 use crate::option::CONFIG;
 use crate::storage::ObjectStorageError;
@@ -81,7 +84,23 @@ impl Query {
     // create session context for this query
     fn create_session_context(&self) -> SessionContext {
         let config = SessionConfig::default();
-        let runtime = CONFIG.storage().get_datafusion_runtime();
+        let runtime_config = CONFIG
+            .storage()
+            .get_datafusion_runtime()
+            .with_disk_manager(DiskManagerConfig::NewOs);
+
+        let (pool_size, fraction) = match CONFIG.parseable.query_memory_pool_size {
+            Some(size) => (size, 1.),
+            None => {
+                let mut system = System::new();
+                system.refresh_memory();
+                let available_mem = system.available_memory();
+                (available_mem as usize, 0.85)
+            }
+        };
+
+        let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
+        let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
         let mut state = SessionState::with_config_rt(config, runtime);
 
         if let Some(tag) = &self.filter_tag {
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index ce1407e5b..ce037a32c 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -31,7 +31,7 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     error::DataFusionError,
-    execution::runtime_env::{RuntimeConfig, RuntimeEnv},
+    execution::runtime_env::RuntimeConfig,
 };
 use fs_extra::file::{move_file, CopyOptions};
 use futures::{stream::FuturesUnordered, TryStreamExt};
@@ -64,10 +64,8 @@ pub struct FSConfig {
 }
 
 impl ObjectStorageProvider for FSConfig {
-    fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv> {
-        let config = RuntimeConfig::new();
-        let runtime = RuntimeEnv::new(config).unwrap();
-        Arc::new(runtime)
+    fn get_datafusion_runtime(&self) -> RuntimeConfig {
+        RuntimeConfig::new()
     }
 
     fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 39e2b6877..1bb95e521 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -33,7 +33,8 @@ use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{
-    datasource::listing::ListingTable, error::DataFusionError, execution::runtime_env::RuntimeEnv,
+    datasource::listing::ListingTable, error::DataFusionError,
+    execution::runtime_env::RuntimeConfig,
 };
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
@@ -48,7 +49,7 @@ const SCHEMA_FILE_NAME: &str = ".schema";
 const ALERT_FILE_NAME: &str = ".alert.json";
 
 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
-    fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv>;
+    fn get_datafusion_runtime(&self) -> RuntimeConfig;
     fn get_object_store(&self) -> Arc<dyn ObjectStorage>;
     fn get_endpoint(&self) -> String;
     fn register_store_metrics(&self, handler: &PrometheusMetrics);
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index a0ba1b744..c6356b32b 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::object_store::{
     DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
 };
 use datafusion::error::DataFusionError;
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeConfig;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
 use object_store::aws::{AmazonS3, AmazonS3Builder, Checksum};
@@ -184,7 +184,7 @@ impl S3Config {
 }
 
 impl ObjectStorageProvider for S3Config {
-    fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv> {
+    fn get_datafusion_runtime(&self) -> RuntimeConfig {
         let s3 = self.get_default_builder().build().unwrap();
 
         // limit objectstore to a concurrent request limit
@@ -194,12 +194,7 @@ impl ObjectStorageProvider for S3Config {
         let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
         object_store_registry.register_store(url.as_ref(), Arc::new(s3));
 
-        let config =
-            RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
-
-        let runtime = RuntimeEnv::new(config).unwrap();
-
-        Arc::new(runtime)
+        RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
     }
 
     fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
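
Note (not part of the patch): the sketch below is a minimal, self-contained illustration of how the new runtime wiring fits together, assuming the DataFusion RuntimeConfig/RuntimeEnv and sysinfo APIs used above; the helper name build_runtime and its parameter are illustrative only. With this change a user can cap query memory with `--query-mempool-size 4` or `P_QUERY_MEMORY_LIMIT=4` (interpreted as GiB and converted to bytes); when unset, the pool falls back to 85% of the memory currently available on the host.

use std::sync::Arc;

use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use sysinfo::{System, SystemExt};

// Illustrative helper mirroring create_session_context above: build a
// DataFusion runtime with an optional fixed memory pool.
fn build_runtime(query_memory_pool_size: Option<usize>) -> Arc<RuntimeEnv> {
    // Spill to OS-managed temporary files when the memory pool is exhausted.
    let runtime_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);

    // Use the configured limit as-is, or 85% of currently available memory.
    let (pool_size, fraction) = match query_memory_pool_size {
        Some(size) => (size, 1.0),
        None => {
            let mut system = System::new();
            system.refresh_memory();
            (system.available_memory() as usize, 0.85)
        }
    };

    let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
    Arc::new(RuntimeEnv::new(runtime_config).expect("valid runtime configuration"))
}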