From c31727175238c63f2b63867571e5f5193ef8c27e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cniladrix719=E2=80=9D?= Date: Sun, 23 Nov 2025 20:11:11 +0530 Subject: [PATCH 1/2] feat: Support configuring Parseable via TOML config file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “niladrix719” --- Cargo.lock | 121 ++++++++++++++----- Cargo.toml | 1 + README.md | 20 ++++ src/cli.rs | 10 ++ src/config_loader.rs | 277 +++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/parseable/mod.rs | 139 +++++++++++----------- 7 files changed, 474 insertions(+), 95 deletions(-) create mode 100644 src/config_loader.rs diff --git a/Cargo.lock b/Cargo.lock index 209507dcc..09752efe1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -588,7 +588,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.7.1", + "indexmap 2.12.1", "lexical-core", "num", "serde", @@ -1014,7 +1014,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fbd1fe9db3ebf71b89060adaf7b0504c2d6a425cf061313099547e382c2e472" dependencies = [ "serde", - "toml", + "toml 0.8.19", ] [[package]] @@ -1504,7 +1504,7 @@ dependencies = [ "base64 0.22.1", "half", "hashbrown 0.14.5", - "indexmap 2.7.1", + "indexmap 2.12.1", "libc", "log", "object_store", @@ -1565,7 +1565,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr-common", - "indexmap 2.7.1", + "indexmap 2.12.1", "paste", "recursive", "serde_json", @@ -1739,7 +1739,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "indexmap 2.7.1", + "indexmap 2.12.1", "itertools 0.14.0", "log", "recursive", @@ -1765,7 +1765,7 @@ dependencies = [ "datafusion-physical-expr-common", "half", "hashbrown 0.14.5", - "indexmap 2.7.1", + "indexmap 2.12.1", "itertools 0.14.0", "log", "paste", @@ -1833,7 +1833,7 @@ dependencies = [ "futures", "half", "hashbrown 0.14.5", - 
"indexmap 2.7.1", + "indexmap 2.12.1", "itertools 0.14.0", "log", "parking_lot", @@ -1853,7 +1853,7 @@ dependencies = [ "bigdecimal", "datafusion-common", "datafusion-expr", - "indexmap 2.7.1", + "indexmap 2.12.1", "log", "recursive", "regex", @@ -2251,7 +2251,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.7.1", + "indexmap 2.12.1", "slab", "tokio", "tokio-util", @@ -2270,7 +2270,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.2.0", - "indexmap 2.7.1", + "indexmap 2.12.1", "slab", "tokio", "tokio-util", @@ -2310,6 +2310,12 @@ version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + [[package]] name = "heck" version = "0.5.0" @@ -2744,12 +2750,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.7.1" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" +checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", - "hashbrown 0.15.2", + "hashbrown 0.16.1", ] [[package]] @@ -3536,6 +3542,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "toml 0.9.8", "tonic 0.12.3", "tonic-web", "tower-http 0.6.2", @@ -3601,7 +3608,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ "fixedbitset", - "indexmap 2.7.1", + "indexmap 2.12.1", ] [[package]] @@ -4498,18 +4505,28 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.217" +version = "1.0.228" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -4522,7 +4539,7 @@ version = "1.0.138" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" dependencies = [ - "indexmap 2.7.1", + "indexmap 2.12.1", "itoa", "memchr", "ryu", @@ -4549,6 +4566,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -5122,11 +5148,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" dependencies = [ "serde", - "serde_spanned", - "toml_datetime", + "serde_spanned 0.6.8", + "toml_datetime 0.6.8", "toml_edit", ] +[[package]] +name = "toml" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8" +dependencies = [ + "indexmap 2.12.1", + 
"serde_core", + "serde_spanned 1.0.3", + "toml_datetime 0.7.3", + "toml_parser", + "toml_writer", + "winnow", +] + [[package]] name = "toml_datetime" version = "0.6.8" @@ -5136,19 +5177,43 @@ dependencies = [ "serde", ] +[[package]] +name = "toml_datetime" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +dependencies = [ + "serde_core", +] + [[package]] name = "toml_edit" version = "0.22.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02a8b472d1a3d7c18e2d61a489aee3453fd9031c33e4f55bd533f4a7adca1bee" dependencies = [ - "indexmap 2.7.1", + "indexmap 2.12.1", "serde", - "serde_spanned", - "toml_datetime", + "serde_spanned 0.6.8", + "toml_datetime 0.6.8", "winnow", ] +[[package]] +name = "toml_parser" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +dependencies = [ + "winnow", +] + +[[package]] +name = "toml_writer" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" + [[package]] name = "tonic" version = "0.12.3" @@ -5257,7 +5322,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.7.1", + "indexmap 2.12.1", "pin-project-lite", "slab", "sync_wrapper 1.0.2", @@ -6061,9 +6126,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.7.0" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e49d2d35d3fad69b39b94139037ecfb4f359f08958b9c11e7315ce770462419" +checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" dependencies = [ "memchr", ] @@ -6219,7 +6284,7 @@ dependencies = [ 
"crossbeam-utils", "displaydoc", "flate2", - "indexmap 2.7.1", + "indexmap 2.12.1", "memchr", "thiserror 2.0.11", "zopfli", diff --git a/Cargo.toml b/Cargo.toml index c1bfd5d86..382011516 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ erased-serde = "=0.3.16" serde = { version = "1.0", features = ["rc", "derive"] } serde_json = "1.0" serde_repr = "0.1.17" +toml = "0.9.8" # Async and Runtime async-trait = "0.1" diff --git a/README.md b/README.md index 499cdcc88..72c8fdbcc 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,26 @@ curl --location --request POST 'http://localhost:8000/api/v1/ingest' \ Access the UI at [http://localhost:8000 ↗︎](http://localhost:8000). You can login to the dashboard default credentials `admin`, `admin`. +### Configure with TOML + +We can keep Parseable configuration in a TOML file instead of managing dozens of environment variables. When a file named `parseable.toml` is present in the working directory, Parseable automatically loads it; you can also point to a different file via `parseable --config-file /path/to/config.toml` (or by setting `P_CONFIG_FILE`). + +Example: + +```toml +# parseable.toml +storage = "s3-store" + +[env] +P_S3_URL = "https://s3.amazonaws.com" +P_S3_REGION = "us-east-1" +P_S3_BUCKET = "my-observability-bucket" +P_USERNAME = "admin" +P_PASSWORD = "super-secret" +``` + +Values from the TOML file are written to the corresponding `P_*` environment variables unless they are already set in the shell. Command-line flags still take precedence over everything else, so you can override individual fields without editing the file. + ## Getting started :bulb: For quickstart, refer the [quickstart section ↗︎](#quickstart-zap). diff --git a/src/cli.rs b/src/cli.rs index 4f4d4d219..e41bdf4eb 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -66,6 +66,16 @@ Join the community at https://logg.ing/community. 
subcommand_required = true, )] pub struct Cli { + #[arg( + long = "config-file", + short = 'c', + env = "P_CONFIG_FILE", + value_name = "path", + global = true, + value_parser = validation::file_path, + help = "Path to a TOML file containing Parseable environment defaults" + )] + pub config_file: Option<PathBuf>, #[command(subcommand)] pub storage: StorageOptions, } diff --git a/src/config_loader.rs b/src/config_loader.rs new file mode 100644 index 000000000..a5a6ed681 --- /dev/null +++ b/src/config_loader.rs @@ -0,0 +1,277 @@ +use std::{ + collections::BTreeMap, + env as std_env, + ffi::{OsStr, OsString}, + fs, + path::{Path, PathBuf}, +}; + +use anyhow::{Context, Result, anyhow}; +use clap::{Error, Parser, error::ErrorKind}; +#[cfg(test)] +use once_cell::sync::Lazy; +use serde::Deserialize; +use toml::Value; + +use crate::cli::Cli; + +/// Name of the default config file that will be auto-discovered in the +/// working directory when present. +const DEFAULT_CONFIG_FILENAME: &str = "parseable.toml"; + +/// Known storage subcommands supported by Parseable. +const STORAGE_SUBCOMMANDS: &[&str] = &["local-store", "s3-store", "blob-store", "gcs-store"]; + +/// Public entry point that loads the CLI after applying config-driven +/// environment overrides. +pub fn parse_cli_with_config() -> Cli { + let raw_args: Vec<OsString> = std_env::args_os().collect(); + let adjusted_args = apply_config(raw_args) + .unwrap_or_else(|err| Error::raw(ErrorKind::Io, err.to_string()).exit()); + + Cli::parse_from(adjusted_args) +} + +#[derive(Debug, Deserialize, Default)] +struct FileConfig { + storage: Option<String>, + #[serde(default)] + env: BTreeMap<String, Value>, + #[serde(flatten)] + #[serde(default)] + inline_env: BTreeMap<String, Value>, +} + +fn set_env_var<K: AsRef<OsStr>, V: AsRef<OsStr>>(key: K, value: V) { + // SAFETY: std::env marks mutations as unsafe because concurrent writes can + // lead to data races. We invoke these helpers before worker threads start + // (or under a test mutex), matching std's documented safety guarantee. 
+ unsafe { std_env::set_var(key, value) } +} + +#[cfg(test)] +fn remove_env_var<K: AsRef<OsStr>>(key: K) { + unsafe { std_env::remove_var(key) } +} + +/// Ensures our tests do not fight over global environment state. +#[cfg(test)] +static TEST_ENV_GUARD: Lazy<std::sync::Mutex<()>> = Lazy::new(|| std::sync::Mutex::new(())); + +fn apply_config(mut args: Vec<OsString>) -> Result<Vec<OsString>> { + let Some((config_path, source)) = locate_config_file(&args)? else { + return Ok(args); + }; + + let contents = fs::read_to_string(&config_path).with_context(|| { + format!( + "Failed to read config file `{}` (source: {:?})", + config_path.display(), + source, + ) + })?; + + let FileConfig { + storage, + mut env, + inline_env, + } = toml::from_str::<FileConfig>(&contents).with_context(|| { + format!( + "Failed to parse config file `{}` (source: {:?})", + config_path.display(), + source, + ) + })?; + + env.extend(inline_env); + apply_env_overrides(&config_path, env); + + // Record the resolved config path so it shows up in CLI help / telemetry. + if std_env::var_os("P_CONFIG_FILE").is_none() { + set_env_var("P_CONFIG_FILE", &config_path); + } + + if should_inject_storage(&args) { + if let Some(storage_name) = storage { + args.push(OsString::from(storage_name)); + } else { + return Err(anyhow!( + "No storage backend provided via CLI arguments or `storage` key in `{}`", + config_path.display() + )); + } + } + + Ok(args) +} + +#[derive(Debug, Clone, Copy)] +enum ConfigSource { + Cli, + Env, + Default, +} +fn locate_config_file(args: &[OsString]) -> Result<Option<(PathBuf, ConfigSource)>> { + if let Some(path) = config_from_args(args)? 
{ + return Ok(Some((path, ConfigSource::Cli))); + } + + if let Some(env_path) = std_env::var_os("P_CONFIG_FILE") { + let path = PathBuf::from(env_path); + return Ok(Some((path, ConfigSource::Env))); + } + + let default_path = PathBuf::from(DEFAULT_CONFIG_FILENAME); + if default_path.is_file() { + return Ok(Some((default_path, ConfigSource::Default))); + } + + Ok(None) +} + +fn config_from_args(args: &[OsString]) -> Result<Option<PathBuf>> { + let mut iter = args.iter(); + // skip binary name + iter.next(); + + while let Some(raw_arg) = iter.next() { + let arg = raw_arg.to_string_lossy(); + if arg == "--config-file" || arg == "-c" { + let value = iter + .next() + .ok_or_else(|| anyhow!("`{arg}` expects a file path to follow"))?; + return Ok(Some(PathBuf::from(value))); + } + + if let Some(path) = arg.strip_prefix("--config-file=") { + return Ok(Some(PathBuf::from(path))); + } + + if let Some(path) = arg.strip_prefix("-c=") { + return Ok(Some(PathBuf::from(path))); + } + } + + Ok(None) +} + +fn apply_env_overrides(config_path: &Path, env_map: BTreeMap<String, Value>) { + for (key, value) in env_map { + if !looks_like_env_var(&key) { + eprintln!( + "Warning: Ignoring key `{}` in `{}` because it does not look like an env variable", + key, + config_path.display() + ); + continue; + } + if std_env::var_os(&key).is_some() { + continue; + } + match value_to_env_string(value) { + Some(serialized) => set_env_var(&key, serialized), + None => eprintln!( + "Warning: Ignoring key `{}` in `{}` because nested tables are not supported", + key, + config_path.display() + ), + } + } +} + +fn value_to_env_string(value: Value) -> Option<String> { + match value { + Value::String(s) => Some(s), + Value::Integer(i) => Some(i.to_string()), + Value::Float(f) => Some(f.to_string()), + Value::Boolean(b) => Some(b.to_string()), + Value::Datetime(dt) => Some(dt.to_string()), + Value::Array(values) => Some(Value::Array(values).to_string()), + Value::Table(_) => None, + } +} + +fn should_inject_storage(args: &[OsString]) -> bool { + 
!has_storage_subcommand(args) && !is_help_or_version_call(args) +} + +fn has_storage_subcommand(args: &[OsString]) -> bool { + args.iter().skip(1).any(|arg| { + let value = arg.to_string_lossy(); + STORAGE_SUBCOMMANDS.contains(&value.as_ref()) + }) +} + +fn is_help_or_version_call(args: &[OsString]) -> bool { + args.iter().skip(1).any(|arg| { + matches!( + arg.to_string_lossy().as_ref(), + "-h" | "--help" | "help" | "-V" | "--version" | "version" + ) + }) +} + +fn looks_like_env_var(key: &str) -> bool { + !key.is_empty() + && key + .chars() + .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_') +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{ffi::OsString, fs::File, io::Write}; + + fn build_config(contents: &str) -> tempfile::TempDir { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("config.toml"); + let mut file = File::create(&path).unwrap(); + file.write_all(contents.as_bytes()).unwrap(); + set_env_var("P_CONFIG_FILE", &path); + dir + } + + #[test] + fn config_sets_env_and_storage() { + let _guard = TEST_ENV_GUARD.lock().unwrap(); + remove_env_var("P_USERNAME"); + + let dir = build_config( + r#" + storage = "local-store" + P_USERNAME = "alice" + "#, + ); + + let args = vec![OsString::from("parseable")]; + let updated = apply_config(args).unwrap(); + assert_eq!(updated.len(), 2); + assert_eq!(updated[1], OsString::from("local-store")); + assert_eq!(std_env::var("P_USERNAME").unwrap(), "alice"); + + drop(dir); + remove_env_var("P_CONFIG_FILE"); + remove_env_var("P_USERNAME"); + } + + #[test] + fn cli_subcommand_not_overridden() { + let _guard = TEST_ENV_GUARD.lock().unwrap(); + remove_env_var("P_CONFIG_FILE"); + let dir = build_config( + r#" + storage = "local-store" + P_USERNAME = "bob" + "#, + ); + + let args = vec![OsString::from("parseable"), OsString::from("s3-store")]; + let updated = apply_config(args).unwrap(); + assert_eq!(updated[1], OsString::from("s3-store")); + 
assert!(std_env::var("P_USERNAME").is_ok()); + + drop(dir); + remove_env_var("P_CONFIG_FILE"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 9493937cf..f4219d6bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ pub mod analytics; pub mod banner; pub mod catalog; mod cli; +mod config_loader; #[cfg(feature = "kafka")] pub mod connectors; pub mod correlation; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index d8cba4aea..337a0bdeb 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -29,7 +29,7 @@ use actix_web::http::header::HeaderMap; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; -use clap::{Parser, error::ErrorKind}; +use clap::error::ErrorKind; use http::{HeaderName, HeaderValue, StatusCode, header::CONTENT_TYPE}; use once_cell::sync::Lazy; pub use staging::StagingError; @@ -41,7 +41,8 @@ use tracing::error; #[cfg(feature = "kafka")] use crate::connectors::kafka::config::KafkaConfig; use crate::{ - cli::{Cli, Options, StorageOptions}, + cli::{Options, StorageOptions}, + config_loader, event::{ commit_schema, format::{LogSource, LogSourceEntry}, @@ -88,75 +89,79 @@ pub const JOIN_COMMUNITY: &str = pub const STREAM_EXISTS: &str = "Stream exists"; /// Shared state of the Parseable server. 
-pub static PARSEABLE: Lazy<Parseable> = Lazy::new(|| match Cli::parse().storage { - StorageOptions::Local(args) => { - if args.options.staging_dir() == &args.storage.root { - clap::Error::raw( - ErrorKind::ValueValidation, - "Cannot use same path for storage and staging", +pub static PARSEABLE: Lazy<Parseable> = Lazy::new(|| { + let cli = config_loader::parse_cli_with_config(); + + match cli.storage { + StorageOptions::Local(args) => { + if args.options.staging_dir() == &args.storage.root { + clap::Error::raw( + ErrorKind::ValueValidation, + "Cannot use same path for storage and staging", + ) + .exit(); + } + + if args.options.hot_tier_storage_path.is_some() { + clap::Error::raw( + ErrorKind::ValueValidation, + "Cannot use hot tier with local-store subcommand.", + ) + .exit(); + } + + // for now create a metastore without using a CLI arg + let metastore = ObjectStoreMetastore { + storage: args.storage.construct_client(), + }; + + Parseable::new( + args.options, + #[cfg(feature = "kafka")] + args.kafka, + Arc::new(args.storage), + Arc::new(metastore), ) - .exit(); } - - if args.options.hot_tier_storage_path.is_some() { - clap::Error::raw( - ErrorKind::ValueValidation, - "Cannot use hot tier with local-store subcommand.", + StorageOptions::S3(args) => { + // for now create a metastore without using a CLI arg + let metastore = ObjectStoreMetastore { + storage: args.storage.construct_client(), + }; + Parseable::new( + args.options, + #[cfg(feature = "kafka")] + args.kafka, + Arc::new(args.storage), + Arc::new(metastore), + ) + } + StorageOptions::Blob(args) => { + // for now create a metastore without using a CLI arg + let metastore = ObjectStoreMetastore { + storage: args.storage.construct_client(), + }; + Parseable::new( + args.options, + #[cfg(feature = "kafka")] + args.kafka, + Arc::new(args.storage), + Arc::new(metastore), + ) + } + StorageOptions::Gcs(args) => { + // for now create a metastore without using a CLI arg + let metastore = ObjectStoreMetastore { + storage: 
args.storage.construct_client(), + }; + Parseable::new( + args.options, + #[cfg(feature = "kafka")] + args.kafka, + Arc::new(args.storage), + Arc::new(metastore), ) - .exit(); } - - // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; - - Parseable::new( - args.options, - #[cfg(feature = "kafka")] - args.kafka, - Arc::new(args.storage), - Arc::new(metastore), - ) - } - StorageOptions::S3(args) => { - // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; - Parseable::new( - args.options, - #[cfg(feature = "kafka")] - args.kafka, - Arc::new(args.storage), - Arc::new(metastore), - ) - } - StorageOptions::Blob(args) => { - // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; - Parseable::new( - args.options, - #[cfg(feature = "kafka")] - args.kafka, - Arc::new(args.storage), - Arc::new(metastore), - ) - } - StorageOptions::Gcs(args) => { - // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; - Parseable::new( - args.options, - #[cfg(feature = "kafka")] - args.kafka, - Arc::new(args.storage), - Arc::new(metastore), - ) } }); From b09645f55364bb2b7416d782c06f793727a6b823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cniladrix719=E2=80=9D?= Date: Sun, 23 Nov 2025 20:27:39 +0530 Subject: [PATCH 2/2] fix: Ensure thread-safe environment variable mutations with mutex lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “niladrix719” --- src/config_loader.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/config_loader.rs b/src/config_loader.rs index a5a6ed681..d70823a55 100644 --- a/src/config_loader.rs +++ 
b/src/config_loader.rs @@ -4,11 +4,11 @@ use std::{ ffi::{OsStr, OsString}, fs, path::{Path, PathBuf}, + sync::Mutex, }; use anyhow::{Context, Result, anyhow}; use clap::{Error, Parser, error::ErrorKind}; -#[cfg(test)] use once_cell::sync::Lazy; use serde::Deserialize; use toml::Value; @@ -32,6 +32,8 @@ pub fn parse_cli_with_config() -> Cli { Cli::parse_from(adjusted_args) } +static ENV_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(())); + #[derive(Debug, Deserialize, Default)] struct FileConfig { storage: Option<String>, @@ -43,14 +45,16 @@ struct FileConfig { } fn set_env_var<K: AsRef<OsStr>, V: AsRef<OsStr>>(key: K, value: V) { + let _guard = ENV_LOCK.lock().unwrap(); // SAFETY: std::env marks mutations as unsafe because concurrent writes can - // lead to data races. We invoke these helpers before worker threads start - // (or under a test mutex), matching std's documented safety guarantee. + // lead to data races. ENV_LOCK only serializes writers that go through + // these helpers; no other thread may read or write the environment meanwhile. unsafe { std_env::set_var(key, value) } } #[cfg(test)] fn remove_env_var<K: AsRef<OsStr>>(key: K) { + let _guard = ENV_LOCK.lock().unwrap(); unsafe { std_env::remove_var(key) } }