18 changes: 18 additions & 0 deletions server/src/option.rs
@@ -187,6 +187,9 @@ pub struct Server {
/// Rows in Parquet Rowgroup
pub row_group_size: usize,

/// Query memory limit in bytes
pub query_memory_pool_size: Option<usize>,

/// Parquet compression algorithm
pub parquet_compression: Compression,
}
@@ -229,6 +232,11 @@ impl FromArgMatches for Server {
.get_one::<bool>(Self::SEND_ANALYTICS)
.cloned()
.expect("default for send analytics");
        // convert GiB to bytes before assigning
self.query_memory_pool_size = m
.get_one::<u8>(Self::QUERY_MEM_POOL_SIZE)
.cloned()
.map(|gib| gib as usize * 1024usize.pow(3));
self.row_group_size = m
.get_one::<usize>(Self::ROW_GROUP_SIZE)
.cloned()
@@ -263,6 +271,7 @@ impl Server {
pub const PASSWORD: &str = "password";
pub const CHECK_UPDATE: &str = "check-update";
pub const SEND_ANALYTICS: &str = "send-analytics";
pub const QUERY_MEM_POOL_SIZE: &str = "query-mempool-size";
pub const ROW_GROUP_SIZE: &str = "row-group-size";
pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo";
pub const DEFAULT_USERNAME: &str = "admin";
@@ -352,6 +361,15 @@ impl Server {
.value_parser(value_parser!(bool))
.help("Disable/Enable checking for updates"),
)
.arg(
Arg::new(Self::QUERY_MEM_POOL_SIZE)
.long(Self::QUERY_MEM_POOL_SIZE)
.env("P_QUERY_MEMORY_LIMIT")
.value_name("Gib")
.required(false)
.value_parser(value_parser!(u8))
.help("Set a fixed memory limit for query"),
)
.arg(
Arg::new(Self::ROW_GROUP_SIZE)
.long(Self::ROW_GROUP_SIZE)
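The new flag takes the limit in GiB as a u8 and converts it to bytes before storing it on Server. A minimal standalone sketch of that conversion follows; the helper name gib_to_bytes and the main function are illustrative, not part of the PR:

/// Convert a limit given in GiB on the command line into bytes,
/// mirroring the `map` in the `FromArgMatches` impl above.
fn gib_to_bytes(gib: u8) -> usize {
    gib as usize * 1024usize.pow(3)
}

fn main() {
    // e.g. `--query-mempool-size 4` or `P_QUERY_MEMORY_LIMIT=4`
    assert_eq!(gib_to_bytes(4), 4 * 1024 * 1024 * 1024);
}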
21 changes: 20 additions & 1 deletion server/src/query.rs
@@ -23,11 +23,14 @@ use chrono::{DateTime, Utc};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::*;
use itertools::Itertools;
use serde_json::Value;
use std::path::Path;
use std::sync::Arc;
use sysinfo::{System, SystemExt};

use crate::option::CONFIG;
use crate::storage::ObjectStorageError;
@@ -81,7 +84,23 @@ impl Query {
// create session context for this query
fn create_session_context(&self) -> SessionContext {
let config = SessionConfig::default();
let runtime = CONFIG.storage().get_datafusion_runtime();
let runtime_config = CONFIG
.storage()
.get_datafusion_runtime()
.with_disk_manager(DiskManagerConfig::NewOs);

let (pool_size, fraction) = match CONFIG.parseable.query_memory_pool_size {
Some(size) => (size, 1.),
None => {
let mut system = System::new();
system.refresh_memory();
let available_mem = system.available_memory();
(available_mem as usize, 0.85)
}
};

let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
let mut state = SessionState::with_config_rt(config, runtime);

if let Some(tag) = &self.filter_tag {
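The sizing logic in create_session_context uses the configured pool size at full fraction when one is set, and otherwise falls back to the host's currently available memory (as reported by sysinfo) with an 85% usable fraction. A small extraction of that decision, with a hypothetical helper name, shown here only for reference:

use sysinfo::{System, SystemExt};

/// Pool size in bytes and the fraction of it DataFusion may use.
/// With an explicit limit the whole pool is usable (fraction 1.0);
/// otherwise the pool is the currently available memory, 85% usable.
fn memory_pool_settings(configured_limit: Option<usize>) -> (usize, f64) {
    match configured_limit {
        Some(size) => (size, 1.0),
        None => {
            let mut system = System::new();
            system.refresh_memory();
            (system.available_memory() as usize, 0.85)
        }
    }
}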
8 changes: 3 additions & 5 deletions server/src/storage/localfs.rs
@@ -31,7 +31,7 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::DataFusionError,
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
execution::runtime_env::RuntimeConfig,
};
use fs_extra::file::{move_file, CopyOptions};
use futures::{stream::FuturesUnordered, TryStreamExt};
@@ -64,10 +64,8 @@ pub struct FSConfig {
}

impl ObjectStorageProvider for FSConfig {
fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv> {
let config = RuntimeConfig::new();
let runtime = RuntimeEnv::new(config).unwrap();
Arc::new(runtime)
fn get_datafusion_runtime(&self) -> RuntimeConfig {
RuntimeConfig::new()
}

fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
5 changes: 3 additions & 2 deletions server/src/storage/object_storage.rs
@@ -33,7 +33,8 @@ use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::{
datasource::listing::ListingTable, error::DataFusionError, execution::runtime_env::RuntimeEnv,
datasource::listing::ListingTable, error::DataFusionError,
execution::runtime_env::RuntimeConfig,
};
use relative_path::RelativePath;
use relative_path::RelativePathBuf;
@@ -48,7 +49,7 @@ const SCHEMA_FILE_NAME: &str = ".schema";
const ALERT_FILE_NAME: &str = ".alert.json";

pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv>;
fn get_datafusion_runtime(&self) -> RuntimeConfig;
fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send>;
fn get_endpoint(&self) -> String;
fn register_store_metrics(&self, handler: &PrometheusMetrics);
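With the trait now returning a RuntimeConfig instead of a ready-made Arc<RuntimeEnv>, the caller finalises the runtime itself, which is what allows create_session_context to layer the disk manager and memory limit on top of whatever base config the storage provider supplies. A hedged sketch of that call pattern; the build_runtime function is illustrative and assumes the ObjectStorageProvider trait from the hunk above is in scope:

use std::sync::Arc;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;

/// Build the DataFusion runtime from whichever provider is configured,
/// adding spill-to-disk support and the memory limit on top of the
/// provider's base `RuntimeConfig`.
fn build_runtime<P: ObjectStorageProvider>(
    provider: &P,
    pool_size: usize,
    fraction: f64,
) -> Arc<RuntimeEnv> {
    let config = provider
        .get_datafusion_runtime()
        .with_disk_manager(DiskManagerConfig::NewOs)
        .with_memory_limit(pool_size, fraction);
    Arc::new(RuntimeEnv::new(config).expect("valid runtime configuration"))
}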
11 changes: 3 additions & 8 deletions server/src/storage/s3.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
};
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeConfig;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use object_store::aws::{AmazonS3, AmazonS3Builder, Checksum};
@@ -184,7 +184,7 @@ impl S3Config {
}

impl ObjectStorageProvider for S3Config {
fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv> {
fn get_datafusion_runtime(&self) -> RuntimeConfig {
let s3 = self.get_default_builder().build().unwrap();

// limit objectstore to a concurrent request limit
@@ -194,12 +194,7 @@ impl ObjectStorageProvider for S3Config {
let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
object_store_registry.register_store(url.as_ref(), Arc::new(s3));

let config =
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));

let runtime = RuntimeEnv::new(config).unwrap();

Arc::new(runtime)
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
}

fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {