diff --git a/Cargo.toml b/Cargo.toml index 775f17177..9e9882761 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,9 @@ secrecy = { version = "0.10.3", default-features = false } sentry = { version = "0.42.0" } serde = { version = "1.0.219", default-features = false } serde_json = { version = "1.0.141", default-features = false } +serde_yaml = { version = "0.9.34", default-features = false } sqlx = { version = "0.8.6", default-features = false } +tempfile = { version = "3.23.0", default-features = false } thiserror = "2.0.12" tokio = { version = "1.47.0", default-features = false } tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" } diff --git a/etl-api/Dockerfile b/etl-api/Dockerfile index 02e92a6f1..cf4788b18 100644 --- a/etl-api/Dockerfile +++ b/etl-api/Dockerfile @@ -43,9 +43,8 @@ WORKDIR /app # Create non-root user (distroless already has nonroot user) USER nonroot:nonroot -# Copy binary and configuration +# Copy binary; configuration must be provided at runtime via mounted volume or environment overrides COPY --from=builder /app/target/release/etl-api ./etl-api -COPY --chown=nonroot:nonroot etl-api/configuration ./configuration # Use exec form for proper signal handling ENTRYPOINT ["./etl-api"] diff --git a/etl-api/README.md b/etl-api/README.md index ae93248e2..5530b15d4 100644 --- a/etl-api/README.md +++ b/etl-api/README.md @@ -46,6 +46,34 @@ export DATABASE_URL=postgres://USER:PASSWORD@HOST:PORT/DB sqlx migrate run --source etl-api/migrations ``` +## Configuration + +### Configuration Directory + +The configuration directory is determined by: +- **`APP_CONFIG_DIR`** environment variable: If set, use this absolute path as the configuration directory +- **Fallback**: `configuration/` directory relative to the current working directory of the process + +Configuration files are loaded in this order: +1. `base.(yaml|yml|json)` - Base configuration for all environments +2. 
`{environment}.(yaml|yml|json)` - Environment-specific overrides (environment defaults to `prod` unless `APP_ENVIRONMENT` is set to `dev`, `staging`, or `prod`) +3. `APP_`-prefixed environment variables - Runtime overrides (nested keys use `__`, lists split on `,`) + +### Examples + +Using default configuration directory: +```bash +# Looks for configuration files in ./configuration/ +./etl-api +``` + +Using custom configuration directory: +```bash +# Looks for configuration files in /etc/etl-api/config/ +export APP_CONFIG_DIR=/etc/etl-api/config +./etl-api +``` + ## Development ### Database Migrations diff --git a/etl-api/configuration/staging.yaml b/etl-api/configuration/staging.yaml new file mode 100644 index 000000000..50159d43a --- /dev/null +++ b/etl-api/configuration/staging.yaml @@ -0,0 +1,19 @@ +database: + host: "localhost" + port: 5430 + name: "postgres" + username: "postgres" + password: "postgres" + tls: + enabled: false + trusted_root_certs: "" + require_ssl: false +application: + host: "127.0.0.1" + port: 8000 +encryption_key: + id: 0 + key: BlK9AlrzqRnCZy53j42uE1p2qGBiF7HYZjZYFaZObqg= +api_keys: + - XOUbHmWbt9h7nWl15wWwyWQnctmFGNjpawMc3lT5CFs= + - jD3rU2aYq7nVp1mWb4sTxQ9PZkHcGdR8eM5oLfNhJ0s= diff --git a/etl-api/src/k8s/base.rs b/etl-api/src/k8s/base.rs index 8e93ef125..0449a4d42 100644 --- a/etl-api/src/k8s/base.rs +++ b/etl-api/src/k8s/base.rs @@ -5,29 +5,47 @@ use thiserror::Error; use crate::configs::{destination::StoredDestinationConfig, log::LogLevel}; -/// Errors emitted by the Kubernetes integration. +/// Errors from Kubernetes operations. /// -/// Variants wrap lower-level libraries where appropriate to preserve context. +/// Wraps underlying library errors to preserve context and provide a unified error type +/// for all Kubernetes interactions. #[derive(Debug, Error)] pub enum K8sError { - /// A serialization or deserialization error while building or parsing - /// Kubernetes resources. 
+ /// Serialization or deserialization failed when building or parsing Kubernetes resources. #[error("An error occurred in serde when dealing with K8s: {0}")] Serde(#[from] serde_json::error::Error), - /// An error returned by the [`kube`] client when talking to the API - /// server. + /// The [`kube`] client returned an error when communicating with the API server. #[error("An error occurred with kube when dealing with K8s: {0}")] Kube(#[from] kube::Error), } +/// A file to be stored in a [`ConfigMap`] that is used to configure a replicator. +/// +/// Each file becomes a key-value pair in the config map's data section. +#[derive(Debug, Clone)] +pub struct ReplicatorConfigMapFile { + /// The filename to use as the key in the config map. + pub filename: String, + /// The file content to store. + pub content: String, +} + +/// The type of destination storage system for replication. +/// +/// Determines which destination-specific resources and configurations are created +/// when deploying a replicator. #[derive(Debug, Clone, Copy)] pub enum DestinationType { + /// In-memory storage destination. Memory, + /// Google BigQuery destination. BigQuery, + /// Apache Iceberg destination. Iceberg, } impl From<&StoredDestinationConfig> for DestinationType { + /// Extracts the destination type from a stored configuration. fn from(value: &StoredDestinationConfig) -> DestinationType { match value { StoredDestinationConfig::Memory => DestinationType::Memory, @@ -37,23 +55,28 @@ impl From<&StoredDestinationConfig> for DestinationType { } } -/// A simplified view of a pod phase. +/// A subset of Kubernetes pod phases relevant to the API. /// -/// This mirrors the string phases reported by Kubernetes but only tracks the -/// states needed by the API. Unknown values map to [`PodPhase::Unknown`]. +/// Maps the standard Kubernetes pod phase strings to a simplified enum. +/// Unrecognized phases are represented as [`PodPhase::Unknown`]. 
#[derive(Debug)] pub enum PodPhase { + /// Pod is waiting to be scheduled or for containers to start. Pending, + /// Pod is bound to a node and at least one container is running. Running, + /// All containers in the pod have terminated successfully. Succeeded, + /// All containers have terminated and at least one failed. Failed, + /// The pod phase could not be determined or is not recognized. Unknown, } impl From<&str> for PodPhase { - /// Converts a Kubernetes pod phase string into a [`PodPhase`]. + /// Parses a Kubernetes pod phase string into a [`PodPhase`]. /// - /// Unrecognized values result in [`PodPhase::Unknown`]. + /// Returns [`PodPhase::Unknown`] for unrecognized values. fn from(value: &str) -> Self { match value { "Pending" => PodPhase::Pending, @@ -65,50 +88,53 @@ impl From<&str> for PodPhase { } } -/// A pod's status which takes into account whether a deletion has been -/// requested, the pod's actual status and if the pod exited with an error -/// to determine the its current state +/// The derived status of a replicator pod. +/// +/// Combines the pod's phase, deletion timestamp, and exit status to determine +/// the operational state from the API's perspective. pub enum PodStatus { + /// Pod has successfully stopped and no longer exists. Stopped, + /// Pod is pending or initializing. Starting, + /// Pod is running and ready. Started, + /// Pod is terminating after a deletion request. Stopping, + /// Pod failed to start or exited with an error. Failed, + /// Pod status could not be determined. Unknown, } -/// Client interface describing the Kubernetes operations used by the API. +/// Operations for managing Kubernetes resources required by replicators. /// -/// Implementations are expected to be idempotent where possible by issuing -/// server-side apply patches for create-or-update behaviors. +/// Methods use server-side apply patches to provide idempotent create-or-update semantics +/// where possible. 
All operations target the data-plane namespace unless otherwise specified. #[async_trait] pub trait K8sClient: Send + Sync { /// Creates or updates the Postgres password secret for a replicator. /// - /// The secret name is derived from `prefix` and is stored in the - /// data-plane namespace. + /// The secret name is derived from `prefix` and stored in the data-plane namespace. async fn create_or_update_postgres_secret( &self, prefix: &str, postgres_password: &str, ) -> Result<(), K8sError>; - /// Creates or updates the BigQuery service account secret for a - /// replicator. + /// Creates or updates the BigQuery service account secret for a replicator. /// - /// The secret name is derived from `prefix` and is stored in the - /// data-plane namespace. - async fn create_or_update_bq_secret( + /// The secret name is derived from `prefix` and stored in the data-plane namespace. + async fn create_or_update_bigquery_secret( &self, prefix: &str, bq_service_account_key: &str, ) -> Result<(), K8sError>; - /// Creates or updates the Iceberg's secret for a replicator. - /// The secret includes catalog toke, s3 access key id and secret. + /// Creates or updates the Iceberg credentials secret for a replicator. /// - /// The secret name is derived from `prefix` and is stored in the - /// data-plane namespace. + /// The secret contains the catalog token, S3 access key ID, and S3 secret access key. + /// The secret name is derived from `prefix` and stored in the data-plane namespace. async fn create_or_update_iceberg_secret( &self, prefix: &str, @@ -117,39 +143,45 @@ pub trait K8sClient: Send + Sync { s3_secret_access_key: &str, ) -> Result<(), K8sError>; - /// Deletes the Postgres password secret for a replicator if it exists. + /// Deletes the Postgres password secret for a replicator. + /// + /// Does nothing if the secret does not exist. 
async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError>; - /// Deletes the BigQuery service account secret for a replicator if it - /// exists. - async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError>; + /// Deletes the BigQuery service account secret for a replicator. + /// + /// Does nothing if the secret does not exist. + async fn delete_bigquery_secret(&self, prefix: &str) -> Result<(), K8sError>; - /// Deletes the iceberg secret for a replicator if it exists. + /// Deletes the Iceberg credentials secret for a replicator. + /// + /// Does nothing if the secret does not exist. async fn delete_iceberg_secret(&self, prefix: &str) -> Result<(), K8sError>; - /// Retrieves a named [`ConfigMap`]. + /// Retrieves a [`ConfigMap`] by name from the data-plane namespace. async fn get_config_map(&self, config_map_name: &str) -> Result; /// Creates or updates the replicator configuration [`ConfigMap`]. /// - /// The config map stores two YAML documents: a base and a environment specific - /// override. - async fn create_or_update_config_map( + /// Accepts a list of files to store in the config map. Each file's filename + /// becomes a key in the config map's data section with the content as its value. + /// The config map name is derived from `prefix`. + async fn create_or_update_replicator_config_map( &self, prefix: &str, - base_config: &str, - env_config: &str, - environment: Environment, + files: Vec, ) -> Result<(), K8sError>; - /// Deletes the replicator configuration [`ConfigMap`] if it exists. - async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError>; + /// Deletes the replicator configuration [`ConfigMap`]. + /// + /// Does nothing if the config map does not exist. + async fn delete_replicator_config_map(&self, prefix: &str) -> Result<(), K8sError>; /// Creates or updates the replicator [`StatefulSet`]. /// - /// The set references previously created secrets and config maps. 
Optional - /// `template_annotations` may be used to trigger a rolling restart. - async fn create_or_update_stateful_set( + /// The stateful set references secrets and config maps created by other methods. + /// Changing the configuration may trigger a rolling restart of the pods. + async fn create_or_update_replicator_stateful_set( &self, prefix: &str, replicator_image: &str, @@ -158,9 +190,14 @@ pub trait K8sClient: Send + Sync { log_level: LogLevel, ) -> Result<(), K8sError>; - /// Deletes the replicator [`StatefulSet`] if it exists. - async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>; + /// Deletes the replicator [`StatefulSet`]. + /// + /// Does nothing if the stateful set does not exist. + async fn delete_replicator_stateful_set(&self, prefix: &str) -> Result<(), K8sError>; - /// Returns the status of the replicator pod. - async fn get_pod_status(&self, prefix: &str) -> Result; + /// Retrieves the current status of a replicator pod. + /// + /// Returns a [`PodStatus`] derived from the pod's phase, deletion timestamp, + /// and exit status. + async fn get_replicator_pod_status(&self, prefix: &str) -> Result; } diff --git a/etl-api/src/k8s/core.rs b/etl-api/src/k8s/core.rs index 6de70f0d0..600df867c 100644 --- a/etl-api/src/k8s/core.rs +++ b/etl-api/src/k8s/core.rs @@ -12,7 +12,7 @@ use crate::db::pipelines::Pipeline; use crate::db::replicators::Replicator; use crate::db::sources::Source; use crate::k8s::http::{TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, TRUSTED_ROOT_CERT_KEY_NAME}; -use crate::k8s::{DestinationType, K8sClient}; +use crate::k8s::{DestinationType, K8sClient, ReplicatorConfigMapFile}; use crate::routes::pipelines::PipelineError; /// Secret types required by different destination configurations. 
@@ -233,7 +233,7 @@ async fn create_or_update_dynamic_replicator_secrets( .create_or_update_postgres_secret(prefix, &postgres_password) .await?; k8s_client - .create_or_update_bq_secret(prefix, &big_query_service_account_key) + .create_or_update_bigquery_secret(prefix, &big_query_service_account_key) .await?; } Secrets::Iceberg { @@ -270,11 +270,23 @@ async fn create_or_update_replicator_config( config: ReplicatorConfigWithoutSecrets, environment: Environment, ) -> Result<(), PipelineError> { - // For now the base config is empty. - let base_config = ""; let env_config = serde_json::to_string(&config)?; + + let files = vec![ + ReplicatorConfigMapFile { + filename: "base.json".to_string(), + // For our setup, we don't need to add config params to the base config file; everything + // is added directly in the environment-specific config file. + content: "{}".to_owned(), + }, + ReplicatorConfigMapFile { + filename: format!("{environment}.json"), + content: env_config, + }, + ]; + k8s_client - .create_or_update_config_map(prefix, base_config, &env_config, environment) + .create_or_update_replicator_config_map(prefix, files) .await?; Ok(()) @@ -294,7 +306,7 @@ async fn create_or_update_replicator_stateful_set( log_level: LogLevel, ) -> Result<(), PipelineError> { k8s_client - .create_or_update_stateful_set( + .create_or_update_replicator_stateful_set( prefix, &replicator_image, environment, @@ -324,7 +336,7 @@ async fn delete_dynamic_replicator_secrets( // then there's a risk of wrong secret type being attempted for deletion which might leave // the actual secret behind. So for simplicty we just delete both kinds of secrets. The // one which doesn't exist will be safely ignored. 
- k8s_client.delete_bq_secret(prefix).await?; + k8s_client.delete_bigquery_secret(prefix).await?; k8s_client.delete_iceberg_secret(prefix).await?; Ok(()) @@ -335,7 +347,7 @@ async fn delete_replicator_config( k8s_client: &dyn K8sClient, prefix: &str, ) -> Result<(), PipelineError> { - k8s_client.delete_config_map(prefix).await?; + k8s_client.delete_replicator_config_map(prefix).await?; Ok(()) } @@ -345,7 +357,7 @@ async fn delete_replicator_stateful_set( k8s_client: &dyn K8sClient, prefix: &str, ) -> Result<(), PipelineError> { - k8s_client.delete_stateful_set(prefix).await?; + k8s_client.delete_replicator_stateful_set(prefix).await?; Ok(()) } diff --git a/etl-api/src/k8s/http.rs b/etl-api/src/k8s/http.rs index 8b1762b94..d737135a7 100644 --- a/etl-api/src/k8s/http.rs +++ b/etl-api/src/k8s/http.rs @@ -1,5 +1,5 @@ use crate::configs::log::LogLevel; -use crate::k8s::{DestinationType, PodStatus}; +use crate::k8s::{DestinationType, PodStatus, ReplicatorConfigMapFile}; use crate::k8s::{K8sClient, K8sError, PodPhase}; use async_trait::async_trait; use base64::{Engine, prelude::BASE64_STANDARD}; @@ -244,7 +244,7 @@ impl K8sClient for HttpK8sClient { Ok(()) } - async fn create_or_update_bq_secret( + async fn create_or_update_bigquery_secret( &self, prefix: &str, bq_service_account_key: &str, @@ -319,7 +319,7 @@ impl K8sClient for HttpK8sClient { Ok(()) } - async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_bigquery_secret(&self, prefix: &str) -> Result<(), K8sError> { debug!("deleting bq secret"); let bq_secret_name = create_bq_secret_name(prefix); @@ -354,24 +354,20 @@ impl K8sClient for HttpK8sClient { Ok(config_map) } - async fn create_or_update_config_map( + async fn create_or_update_replicator_config_map( &self, prefix: &str, - base_config: &str, - env_config: &str, - environment: Environment, + files: Vec, ) -> Result<(), K8sError> { debug!("patching config map"); - let env_config_file = format!("{environment}.yaml"); let 
replicator_config_map_name = create_replicator_config_map_name(prefix); let replicator_app_name = create_replicator_app_name(prefix); + let config_map_json = create_replicator_config_map_json( &replicator_config_map_name, &replicator_app_name, - base_config, - &env_config_file, - env_config, + files, ); let config_map: ConfigMap = serde_json::from_value(config_map_json)?; @@ -386,7 +382,7 @@ impl K8sClient for HttpK8sClient { Ok(()) } - async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_replicator_config_map(&self, prefix: &str) -> Result<(), K8sError> { debug!("deleting config map"); let replicator_config_map_name = create_replicator_config_map_name(prefix); @@ -400,7 +396,7 @@ impl K8sClient for HttpK8sClient { Ok(()) } - async fn create_or_update_stateful_set( + async fn create_or_update_replicator_stateful_set( &self, prefix: &str, replicator_image: &str, @@ -452,7 +448,7 @@ impl K8sClient for HttpK8sClient { Ok(()) } - async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_replicator_stateful_set(&self, prefix: &str) -> Result<(), K8sError> { debug!("deleting stateful set"); let stateful_set_name = create_stateful_set_name(prefix); @@ -464,7 +460,7 @@ impl K8sClient for HttpK8sClient { Ok(()) } - async fn get_pod_status(&self, prefix: &str) -> Result { + async fn get_replicator_pod_status(&self, prefix: &str) -> Result { debug!("getting pod status"); let pod_name = create_pod_name(prefix); @@ -620,10 +616,13 @@ fn create_iceberg_secret_json( fn create_replicator_config_map_json( config_map_name: &str, replicator_app_name: &str, - base_config: &str, - env_config_file: &str, - env_config: &str, + files: Vec, ) -> serde_json::Value { + let mut data = serde_json::Map::new(); + for file in files { + data.insert(file.filename, serde_json::Value::String(file.content)); + } + json!({ "kind": "ConfigMap", "apiVersion": "v1", @@ -635,10 +634,7 @@ fn create_replicator_config_map_json( 
"etl.supabase.com/app-type": REPLICATOR_APP_LABEL, } }, - "data": { - "base.yaml": base_config, - env_config_file: env_config, - } + "data": data }) } @@ -1098,7 +1094,6 @@ mod tests { let replicator_config_map_name = create_replicator_config_map_name(&prefix); let replicator_app_name = create_replicator_app_name(&prefix); let environment = Environment::Prod; - let env_config_file = format!("{environment}.yaml"); let base_config = ""; let replicator_config = ReplicatorConfig { destination: DestinationConfig::BigQuery { @@ -1137,12 +1132,21 @@ mod tests { replicator_config.into(); let env_config = serde_json::to_string(&replicator_config_without_secrets).unwrap(); + let files = vec![ + ReplicatorConfigMapFile { + filename: "base.json".to_string(), + content: base_config.to_string(), + }, + ReplicatorConfigMapFile { + filename: format!("{environment}.json"), + content: env_config, + }, + ]; + let config_map_json = create_replicator_config_map_json( &replicator_config_map_name, &replicator_app_name, - base_config, - &env_config_file, - &env_config, + files, ); assert_json_snapshot!(config_map_json); diff --git a/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap b/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap index 5509bbd70..10a8d4417 100644 --- a/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap +++ b/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap @@ -5,8 +5,8 @@ expression: config_map_json { "apiVersion": "v1", "data": { - "base.yaml": "", - "prod.yaml": 
"{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}" + "base.json": "", + "prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}" }, "kind": "ConfigMap", "metadata": { diff --git a/etl-api/src/main.rs b/etl-api/src/main.rs index d115364ff..44d33c06f 100644 --- a/etl-api/src/main.rs +++ b/etl-api/src/main.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::{Context, anyhow}; use etl_api::{config::ApiConfig, startup::Application}; use etl_config::{Environment, load_config, shared::PgConnectionConfig}; use etl_telemetry::tracing::init_tracing; @@ -32,7 +32,8 @@ async fn async_main() -> anyhow::Result<()> { match args.len() { // Run the application server 1 => { - let config = load_config::()?; + let config = load_config::() + .context("loading API configuration for server startup")?; log_pg_connection_config(&config.database); let application = Application::build(config.clone()).await?; application.run_until_stopped().await?; @@ -42,7 +43,8 @@ async fn async_main() -> anyhow::Result<()> { let command = args.nth(1).unwrap(); match command.as_str() { "migrate" => { - let config = 
load_config::()?; + let config = load_config::() + .context("loading database configuration for migrations")?; log_pg_connection_config(&config); Application::migrate_database(config).await?; info!("database migrated successfully"); diff --git a/etl-api/src/routes/pipelines.rs b/etl-api/src/routes/pipelines.rs index 4e856090e..f864da1d4 100644 --- a/etl-api/src/routes/pipelines.rs +++ b/etl-api/src/routes/pipelines.rs @@ -885,7 +885,7 @@ pub async fn get_pipeline_status( let prefix = create_k8s_object_prefix(tenant_id, replicator.id); - let pod_status = k8s_client.get_pod_status(&prefix).await?; + let pod_status = k8s_client.get_replicator_pod_status(&prefix).await?; let status = pod_status.into(); let response = GetPipelineStatusResponse { diff --git a/etl-api/tests/support/k8s_client.rs b/etl-api/tests/support/k8s_client.rs index b6722676a..6ccac9b79 100644 --- a/etl-api/tests/support/k8s_client.rs +++ b/etl-api/tests/support/k8s_client.rs @@ -5,7 +5,7 @@ use std::collections::BTreeMap; use async_trait::async_trait; use etl_api::configs::log::LogLevel; use etl_api::k8s::http::{TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, TRUSTED_ROOT_CERT_KEY_NAME}; -use etl_api::k8s::{DestinationType, K8sClient, K8sError, PodStatus}; +use etl_api::k8s::{DestinationType, K8sClient, K8sError, PodStatus, ReplicatorConfigMapFile}; use etl_config::Environment; use k8s_openapi::api::core::v1::ConfigMap; @@ -21,7 +21,7 @@ impl K8sClient for MockK8sClient { Ok(()) } - async fn create_or_update_bq_secret( + async fn create_or_update_bigquery_secret( &self, _prefix: &str, _bq_service_account_key: &str, @@ -43,7 +43,7 @@ impl K8sClient for MockK8sClient { Ok(()) } - async fn delete_bq_secret(&self, _prefix: &str) -> Result<(), K8sError> { + async fn delete_bigquery_secret(&self, _prefix: &str) -> Result<(), K8sError> { Ok(()) } @@ -67,21 +67,19 @@ impl K8sClient for MockK8sClient { } } - async fn create_or_update_config_map( + async fn create_or_update_replicator_config_map( &self, _prefix: &str, - 
_base_config: &str, - _prod_config: &str, - _environment: Environment, + _files: Vec, ) -> Result<(), K8sError> { Ok(()) } - async fn delete_config_map(&self, _prefix: &str) -> Result<(), K8sError> { + async fn delete_replicator_config_map(&self, _prefix: &str) -> Result<(), K8sError> { Ok(()) } - async fn create_or_update_stateful_set( + async fn create_or_update_replicator_stateful_set( &self, _prefix: &str, _replicator_image: &str, @@ -92,11 +90,11 @@ impl K8sClient for MockK8sClient { Ok(()) } - async fn delete_stateful_set(&self, _prefix: &str) -> Result<(), K8sError> { + async fn delete_replicator_stateful_set(&self, _prefix: &str) -> Result<(), K8sError> { Ok(()) } - async fn get_pod_status(&self, _prefix: &str) -> Result { + async fn get_replicator_pod_status(&self, _prefix: &str) -> Result { Ok(PodStatus::Started) } } diff --git a/etl-config/Cargo.toml b/etl-config/Cargo.toml index 452eda48b..6224737b9 100644 --- a/etl-config/Cargo.toml +++ b/etl-config/Cargo.toml @@ -11,10 +11,15 @@ homepage.workspace = true utoipa = ["dep:utoipa"] [dependencies] -config = { workspace = true, features = ["yaml"] } +config = { workspace = true, features = ["yaml", "json"] } secrecy = { workspace = true, features = ["serde"] } serde = { workspace = true, features = ["derive"] } sqlx = { workspace = true, features = ["postgres"] } thiserror = { workspace = true } tokio-postgres = { workspace = true } utoipa = { workspace = true, optional = true } + +[dev-dependencies] +serde_json = { workspace = true } +serde_yaml = { workspace = true } +tempfile = { workspace = true } diff --git a/etl-config/src/load.rs b/etl-config/src/load.rs index bf358a2c0..cdcffd622 100644 --- a/etl-config/src/load.rs +++ b/etl-config/src/load.rs @@ -1,12 +1,22 @@ +use std::{ + borrow::Cow, + io, + path::{Path, PathBuf}, +}; + use serde::de::DeserializeOwned; +use thiserror::Error; use crate::environment::Environment; -/// Directory containing configuration files relative to application root. 
+/// Directory containing configuration files relative to the application root. const CONFIGURATION_DIR: &str = "configuration"; -/// Base configuration file loaded for all environments. -const BASE_CONFIG_FILE: &str = "base.yaml"; +/// Environment variable for specifying an absolute path to the configuration directory. +const CONFIG_DIR_ENV_VAR: &str = "APP_CONFIG_DIR"; + +/// Supported extensions for base and environment configuration files. +const CONFIG_FILE_EXTENSIONS: &[&str] = &["yaml", "yml", "json"]; /// Prefix for environment variable configuration overrides. const ENV_PREFIX: &str = "APP"; @@ -15,55 +25,122 @@ const ENV_PREFIX: &str = "APP"; const ENV_PREFIX_SEPARATOR: &str = "_"; /// Separator for nested configuration keys in environment variables. -/// -/// Example: `APP_DATABASE__URL` sets the `database.url` field. const ENV_SEPARATOR: &str = "__"; /// Separator for list elements in environment variables. -/// -/// Example: `APP_API_KEYS=abc,def` sets the `api_keys` array field. const LIST_SEPARATOR: &str = ","; -/// Trait defining the list of keys that should be parsed as lists in a given [`Config`] -/// implementation. +/// Trait implemented by configuration structures that require list parsing help. pub trait Config { - /// Slice containing all the keys that should be parsed as lists when loading the configuration. + /// Keys whose values should be parsed as lists when loading the configuration. const LIST_PARSE_KEYS: &'static [&'static str]; } -/// Loads hierarchical configuration from YAML files and environment variables. +/// Identifies which configuration file is currently being loaded. +#[derive(Debug, Clone, Copy)] +enum ConfigFileKind { + /// Always-present base configuration that every service loads. + Base, + /// Environment-specific overrides (dev/staging/prod). 
+ Environment(Environment), +} + +impl ConfigFileKind { + fn stem(&self) -> Cow<'static, str> { + match self { + ConfigFileKind::Base => Cow::Borrowed("base"), + ConfigFileKind::Environment(env) => Cow::Owned(env.to_string()), + } + } + + /// Returns a static string describing this configuration file kind for error messages. + fn as_str(&self) -> &'static str { + match self { + ConfigFileKind::Base => "base", + ConfigFileKind::Environment(Environment::Dev) => "dev", + ConfigFileKind::Environment(Environment::Staging) => "staging", + ConfigFileKind::Environment(Environment::Prod) => "prod", + } + } +} + +/// Errors that can occur while loading configuration files and overrides. +#[derive(Debug, Error)] +pub enum LoadConfigError { + /// Failed to determine the current working directory. + #[error("failed to determine the current directory")] + CurrentDir(#[source] io::Error), + + /// The configured `configuration` directory does not exist. + #[error("configuration directory `{0}` does not exist")] + MissingConfigurationDirectory(PathBuf), + + /// Could not locate one of the required configuration files. + #[error("could not locate {kind} configuration in `{directory}`; attempted: {attempted}")] + ConfigurationFileMissing { + kind: &'static str, + directory: PathBuf, + attempted: String, + }, + + /// Environment variable overrides failed to merge into the configuration. + #[error("failed to load configuration from environment variables")] + EnvironmentVariables(#[source] config::ConfigError), + + /// The configuration files were parsed but deserialization failed. + #[error("failed to deserialize configuration")] + Deserialization(#[source] config::ConfigError), + + /// Failed to determine the runtime environment (`APP_ENVIRONMENT`). + #[error("failed to determine runtime environment")] + Environment(#[source] io::Error), + + /// Failed to initialize the configuration builder. 
+ #[error("failed to initialize configuration builder")] + Builder(#[source] config::ConfigError), +} + +/// Loads hierarchical configuration from base, environment, and environment-variable sources. /// -/// Loads configuration in this order: -/// 1. Base configuration from `configuration/base.yaml` -/// 2. Environment-specific file from `configuration/{environment}.yaml` -/// 3. Environment variable overrides prefixed with `APP` +/// The configuration directory is determined by: +/// - First checking the `APP_CONFIG_DIR` environment variable for an absolute path +/// - If not set, using the `configuration/` directory under the current working directory /// -/// Nested keys use double underscores: `APP_DATABASE__URL` → `database.url` and lists are separated -/// by `,`. +/// Loads files from `base.(yaml|yml|json)` and `{environment}.(yaml|yml|json)` +/// before applying overrides from `APP_`-prefixed environment variables. /// -/// # Panics -/// Panics if current directory cannot be determined or if `APP_ENVIRONMENT` -/// cannot be parsed. -pub fn load_config() -> Result +/// Nested keys use double underscores (`APP_SERVICE__HOST`), and list values are comma-separated. +pub fn load_config() -> Result where T: Config + DeserializeOwned, { - let base_path = std::env::current_dir().expect("Failed to determine the current directory"); - let configuration_directory = base_path.join(CONFIGURATION_DIR); + let configuration_directory = if let Ok(config_dir) = std::env::var(CONFIG_DIR_ENV_VAR) { + // Use the absolute path provided by APP_CONFIG_DIR + PathBuf::from(config_dir) + } else { + // Fall back to the `configuration/` directory under the current working directory + let base_path = std::env::current_dir().map_err(LoadConfigError::CurrentDir)?; + base_path.join(CONFIGURATION_DIR) + }; - // Detect the running environment. - // Default to `dev` if unspecified. 
- let environment = Environment::load().expect("Failed to parse APP_ENVIRONMENT."); + if !configuration_directory.is_dir() { + return Err(LoadConfigError::MissingConfigurationDirectory( + configuration_directory, + )); + } - let environment_filename = format!("{environment}.yaml"); + let environment = Environment::load().map_err(LoadConfigError::Environment)?; + + let base_file = find_configuration_file(&configuration_directory, ConfigFileKind::Base)?; + let environment_file = find_configuration_file( + &configuration_directory, + ConfigFileKind::Environment(environment), + )?; - // We build the environment configuration source. let mut environment_source = config::Environment::with_prefix(ENV_PREFIX) .prefix_separator(ENV_PREFIX_SEPARATOR) .separator(ENV_SEPARATOR); - // If there is a list of keys to parse, we add them to the source and enable parsing with the - // separator. if !T::LIST_PARSE_KEYS.is_empty() { environment_source = environment_source .try_parsing(true) @@ -74,20 +151,237 @@ where } } - let settings = config::Config::builder() - // Add in settings from the base configuration file. - .add_source(config::File::from( - configuration_directory.join(BASE_CONFIG_FILE), - )) - // Add in settings from the environment-specific file. - .add_source(config::File::from( - configuration_directory.join(environment_filename), - )) - // Add in settings from environment variables (with a prefix of APP and '__' as separator) - // E.g. `APP_DESTINATION__BIG_QUERY__PROJECT_ID=my-project-id` sets - // `Settings { destination: BigQuery { project_id } }` to `my-project-id`. 
- .add_source(environment_source) - .build()?; - - settings.try_deserialize::() + let base_file_source = config::File::from(base_file.clone()); + let environment_file_source = config::File::from(environment_file.clone()); + + let builder = config::Config::builder() + .add_source(base_file_source) + .add_source(environment_file_source) + .add_source(environment_source); + + let settings = builder.build().map_err(LoadConfigError::Builder)?; + + settings + .try_deserialize::() + .map_err(LoadConfigError::Deserialization) +} + +/// Finds the configuration file that matches the requested kind and supported extensions. +fn find_configuration_file( + directory: &Path, + kind: ConfigFileKind, +) -> Result { + let stem = kind.stem(); + let mut attempted_paths = Vec::with_capacity(CONFIG_FILE_EXTENSIONS.len()); + + for extension in CONFIG_FILE_EXTENSIONS { + let path = directory.join(format!("{stem}.{extension}")); + attempted_paths.push(path.clone()); + + if path.is_file() { + return Ok(path); + } + } + + let attempted = attempted_paths + .iter() + .map(|path| format!("`{}`", path.display())) + .collect::>() + .join(", "); + + Err(LoadConfigError::ConfigurationFileMissing { + kind: kind.as_str(), + directory: directory.to_path_buf(), + attempted, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::{Deserialize, Serialize}; + use std::fs; + use tempfile::TempDir; + + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + struct ApplicationConfig { + /// Application name. + name: String, + /// Storage mode configuration. 
+ mode: StorageMode, + } + + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + #[serde(rename_all = "snake_case")] + enum StorageMode { + Memory, + Disk { path: String, max_size: u64 }, + } + + impl Config for ApplicationConfig { + const LIST_PARSE_KEYS: &'static [&'static str] = &[]; + } + + fn create_mock_config() -> ApplicationConfig { + ApplicationConfig { + name: "test-app".to_string(), + mode: StorageMode::Disk { + path: "/tmp/data".to_string(), + max_size: 1024, + }, + } + } + + fn test_roundtrip_with_extension(extension: &str) { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + let original_config = create_mock_config(); + + // Write base file (empty) + let base_file = config_dir.join(format!("base.{extension}")); + let base_content = match extension { + "json" => "{}", + "yaml" | "yml" => "", + _ => panic!("Unsupported extension: {extension}"), + }; + fs::write(&base_file, base_content).unwrap(); + + // Write environment file with actual config + let env_file = config_dir.join(format!("prod.{extension}")); + let env_content = match extension { + "json" => serde_json::to_string_pretty(&original_config).unwrap(), + "yaml" | "yml" => { + // YAML serialization for externally tagged enums doesn't match what config crate expects + // For now, manually construct YAML that config crate can deserialize + format!( + "name: \"{}\"\nmode:\n disk:\n path: \"{}\"\n max_size: {}\n", + original_config.name, + match &original_config.mode { + StorageMode::Disk { path, .. } => path.as_str(), + _ => panic!("Expected Disk variant"), + }, + match &original_config.mode { + StorageMode::Disk { max_size, .. 
} => max_size, + _ => panic!("Expected Disk variant"), + } + ) + } + _ => panic!("Unsupported extension: {extension}"), + }; + fs::write(&env_file, env_content).unwrap(); + + // Set environment and working directory + unsafe { + std::env::set_var("APP_ENVIRONMENT", "prod"); + } + std::env::set_current_dir(temp_dir.path()).unwrap(); + + // Load config + let loaded_config: ApplicationConfig = load_config().unwrap(); + + // Verify the loaded config matches the original exactly + assert_eq!(loaded_config, original_config); + } + + #[test] + fn test_roundtrip_json() { + test_roundtrip_with_extension("json"); + } + + #[test] + fn test_roundtrip_yaml() { + test_roundtrip_with_extension("yaml"); + } + + #[test] + fn test_roundtrip_yml() { + test_roundtrip_with_extension("yml"); + } + + #[test] + fn test_all_supported_extensions_detected() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + // Test each supported extension is correctly detected + for extension in CONFIG_FILE_EXTENSIONS { + let test_file = config_dir.join(format!("base.{extension}")); + fs::write(&test_file, "{}").unwrap(); + + let result = find_configuration_file(&config_dir, ConfigFileKind::Base); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), test_file); + + // Clean up for next iteration + fs::remove_file(&test_file).unwrap(); + } + } + + #[test] + fn test_app_config_dir_env_var() { + let temp_dir = TempDir::new().unwrap(); + // Note: NOT using "configuration" subdirectory, using a custom path + let custom_config_dir = temp_dir.path().join("my-custom-config"); + fs::create_dir(&custom_config_dir).unwrap(); + + let original_config = create_mock_config(); + + // Write base and environment files + let base_file = custom_config_dir.join("base.json"); + fs::write(&base_file, "{}").unwrap(); + + let env_file = custom_config_dir.join("prod.json"); + let env_content = 
serde_json::to_string_pretty(&original_config).unwrap(); + fs::write(&env_file, env_content).unwrap(); + + // Set APP_CONFIG_DIR to the custom path (absolute path) + unsafe { + std::env::set_var("APP_CONFIG_DIR", custom_config_dir.to_str().unwrap()); + std::env::set_var("APP_ENVIRONMENT", "prod"); + } + + // Load config - should use APP_CONFIG_DIR, not current_dir/configuration + let loaded_config: ApplicationConfig = load_config().unwrap(); + + // Verify the loaded config matches the original + assert_eq!(loaded_config, original_config); + + // Clean up + unsafe { + std::env::remove_var("APP_CONFIG_DIR"); + } + } + + #[test] + fn test_fallback_to_current_dir_when_app_config_dir_not_set() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + let original_config = create_mock_config(); + + // Write base and environment files + let base_file = config_dir.join("base.json"); + fs::write(&base_file, "{}").unwrap(); + + let env_file = config_dir.join("prod.json"); + let env_content = serde_json::to_string_pretty(&original_config).unwrap(); + fs::write(&env_file, env_content).unwrap(); + + // Ensure APP_CONFIG_DIR is not set + unsafe { + std::env::remove_var("APP_CONFIG_DIR"); + std::env::set_var("APP_ENVIRONMENT", "prod"); + } + std::env::set_current_dir(temp_dir.path()).unwrap(); + + // Load config - should fallback to current_dir/configuration + let loaded_config: ApplicationConfig = load_config().unwrap(); + + // Verify the loaded config matches the original + assert_eq!(loaded_config, original_config); + } } diff --git a/etl-replicator/Dockerfile b/etl-replicator/Dockerfile index 56e8134e5..0ec4c8f3b 100644 --- a/etl-replicator/Dockerfile +++ b/etl-replicator/Dockerfile @@ -44,9 +44,8 @@ WORKDIR /app # Create non-root user (distroless already has nonroot user) USER nonroot:nonroot -# Copy binary and configuration +# Copy binary; configuration must be provided at runtime via 
mounted volume or environment overrides COPY --from=builder /app/target/release/etl-replicator ./etl-replicator -COPY --chown=nonroot:nonroot etl-replicator/configuration ./configuration # Use exec form for proper signal handling -ENTRYPOINT ["./etl-replicator"] \ No newline at end of file +ENTRYPOINT ["./etl-replicator"] diff --git a/etl-replicator/README.md b/etl-replicator/README.md index f6f76db71..60314d3ab 100644 --- a/etl-replicator/README.md +++ b/etl-replicator/README.md @@ -1,3 +1,31 @@ # `etl` - Replicator Long-lived process that performs Postgres logical replication using the `etl` crate. + +## Configuration + +### Configuration Directory + +The configuration directory is determined by: +- **`APP_CONFIG_DIR`** environment variable: If set, use this absolute path as the configuration directory +- **Fallback**: `configuration/` directory relative to the binary location + +Configuration files are loaded in this order: +1. `base.(yaml|yml|json)` - Base configuration for all environments +2. `{environment}.(yaml|yml|json)` - Environment-specific overrides (environment defaults to `prod` unless `APP_ENVIRONMENT` is set to `dev`, `staging`, or `prod`) +3. 
`APP_`-prefixed environment variables - Runtime overrides (nested keys use `__`, lists are comma-separated) + +### Examples + +Using default configuration directory: +```bash +# Looks for configuration files in ./configuration/ +./etl-replicator +``` + +Using custom configuration directory: +```bash +# Looks for configuration files in /etc/etl/replicator-config/ +export APP_CONFIG_DIR=/etc/etl/replicator-config +./etl-replicator +``` diff --git a/etl-replicator/configuration/staging.yaml b/etl-replicator/configuration/staging.yaml new file mode 100644 index 000000000..3ce27d846 --- /dev/null +++ b/etl-replicator/configuration/staging.yaml @@ -0,0 +1,12 @@ +destination: + memory: +pipeline: + pg_connection: + host: "localhost" + port: 5430 + name: "postgres" + username: "postgres" + password: "postgres" + tls: + trusted_root_certs: "" + enabled: false \ No newline at end of file diff --git a/etl-replicator/src/config.rs b/etl-replicator/src/config.rs index 584129820..b95beb61c 100644 --- a/etl-replicator/src/config.rs +++ b/etl-replicator/src/config.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use etl_config::load_config; use etl_config::shared::ReplicatorConfig; @@ -6,7 +7,7 @@ use etl_config::shared::ReplicatorConfig; /// Uses the standard configuration loading mechanism from [`etl_config`] and /// validates the resulting [`ReplicatorConfig`] before returning it. pub fn load_replicator_config() -> anyhow::Result { - let config = load_config::()?; + let config = load_config::().context("loading replicator configuration")?; config.validate()?; Ok(config)