Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ secrecy = { version = "0.10.3", default-features = false }
sentry = { version = "0.42.0" }
serde = { version = "1.0.219", default-features = false }
serde_json = { version = "1.0.141", default-features = false }
serde_yaml = { version = "0.9.34", default-features = false }
sqlx = { version = "0.8.6", default-features = false }
tempfile = { version = "3.23.0", default-features = false }
thiserror = "2.0.12"
tokio = { version = "1.47.0", default-features = false }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
Expand Down
3 changes: 1 addition & 2 deletions etl-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ WORKDIR /app
# Create non-root user (distroless already has nonroot user)
USER nonroot:nonroot

# Copy binary and configuration
# Copy binary; configuration must be provided at runtime via mounted volume or environment overrides
COPY --from=builder /app/target/release/etl-api ./etl-api
COPY --chown=nonroot:nonroot etl-api/configuration ./configuration

# Use exec form for proper signal handling
ENTRYPOINT ["./etl-api"]
28 changes: 28 additions & 0 deletions etl-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,34 @@ export DATABASE_URL=postgres://USER:PASSWORD@HOST:PORT/DB
sqlx migrate run --source etl-api/migrations
```

## Configuration

### Configuration Directory

The configuration directory is determined by:
- **`APP_CONFIG_DIR`** environment variable: If set, use this absolute path as the configuration directory
- **Fallback**: `configuration/` directory relative to the binary location

Configuration files are loaded in this order:
1. `base.(yaml|yml|json)` - Base configuration for all environments
2. `{environment}.(yaml|yml|json)` - Environment-specific overrides (environment defaults to `prod` unless `APP_ENVIRONMENT` is set to `dev`, `staging`, or `prod`)
3. `APP_`-prefixed environment variables - Runtime overrides (nested keys use `__`, lists split on `,`)

### Examples

Using default configuration directory:
```bash
# Looks for configuration files in ./configuration/
./etl-api
```

Using custom configuration directory:
```bash
# Looks for configuration files in /etc/etl-api/config/
export APP_CONFIG_DIR=/etc/etl-api/config
./etl-api
```

## Development

### Database Migrations
Expand Down
19 changes: 19 additions & 0 deletions etl-api/configuration/staging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
database:
host: "localhost"
port: 5430
name: "postgres"
username: "postgres"
password: "postgres"
tls:
enabled: false
trusted_root_certs: ""
require_ssl: false
application:
host: "127.0.0.1"
port: 8000
encryption_key:
id: 0
key: BlK9AlrzqRnCZy53j42uE1p2qGBiF7HYZjZYFaZObqg=
api_keys:
- XOUbHmWbt9h7nWl15wWwyWQnctmFGNjpawMc3lT5CFs=
- jD3rU2aYq7nVp1mWb4sTxQ9PZkHcGdR8eM5oLfNhJ0s=
135 changes: 86 additions & 49 deletions etl-api/src/k8s/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,47 @@ use thiserror::Error;

use crate::configs::{destination::StoredDestinationConfig, log::LogLevel};

/// Errors emitted by the Kubernetes integration.
/// Errors from Kubernetes operations.
///
/// Variants wrap lower-level libraries where appropriate to preserve context.
/// Wraps underlying library errors to preserve context and provide a unified error type
/// for all Kubernetes interactions.
#[derive(Debug, Error)]
pub enum K8sError {
/// A serialization or deserialization error while building or parsing
/// Kubernetes resources.
/// Serialization or deserialization failed when building or parsing Kubernetes resources.
#[error("An error occurred in serde when dealing with K8s: {0}")]
Serde(#[from] serde_json::error::Error),
/// An error returned by the [`kube`] client when talking to the API
/// server.
/// The [`kube`] client returned an error when communicating with the API server.
#[error("An error occurred with kube when dealing with K8s: {0}")]
Kube(#[from] kube::Error),
}

/// A file to be stored in a [`ConfigMap`] that is used to configure a replicator.
///
/// Each file becomes a key-value pair in the config map's data section.
#[derive(Debug, Clone)]
pub struct ReplicatorConfigMapFile {
/// The filename to use as the key in the config map.
pub filename: String,
/// The file content to store.
pub content: String,
}

/// The type of destination storage system for replication.
///
/// Determines which destination-specific resources and configurations are created
/// when deploying a replicator.
#[derive(Debug, Clone, Copy)]
pub enum DestinationType {
/// In-memory storage destination.
Memory,
/// Google BigQuery destination.
BigQuery,
/// Apache Iceberg destination.
Iceberg,
}

impl From<&StoredDestinationConfig> for DestinationType {
/// Extracts the destination type from a stored configuration.
fn from(value: &StoredDestinationConfig) -> DestinationType {
match value {
StoredDestinationConfig::Memory => DestinationType::Memory,
Expand All @@ -37,23 +55,28 @@ impl From<&StoredDestinationConfig> for DestinationType {
}
}

/// A simplified view of a pod phase.
/// A subset of Kubernetes pod phases relevant to the API.
///
/// This mirrors the string phases reported by Kubernetes but only tracks the
/// states needed by the API. Unknown values map to [`PodPhase::Unknown`].
/// Maps the standard Kubernetes pod phase strings to a simplified enum.
/// Unrecognized phases are represented as [`PodPhase::Unknown`].
#[derive(Debug)]
pub enum PodPhase {
/// Pod is waiting to be scheduled or for containers to start.
Pending,
/// Pod is bound to a node and at least one container is running.
Running,
/// All containers in the pod have terminated successfully.
Succeeded,
/// All containers have terminated and at least one failed.
Failed,
/// The pod phase could not be determined or is not recognized.
Unknown,
}

impl From<&str> for PodPhase {
/// Converts a Kubernetes pod phase string into a [`PodPhase`].
/// Parses a Kubernetes pod phase string into a [`PodPhase`].
///
/// Unrecognized values result in [`PodPhase::Unknown`].
/// Returns [`PodPhase::Unknown`] for unrecognized values.
fn from(value: &str) -> Self {
match value {
"Pending" => PodPhase::Pending,
Expand All @@ -65,50 +88,53 @@ impl From<&str> for PodPhase {
}
}

/// A pod's status which takes into account whether a deletion has been
/// requested, the pod's actual status and if the pod exited with an error
/// to determine the its current state
/// The derived status of a replicator pod.
///
/// Combines the pod's phase, deletion timestamp, and exit status to determine
/// the operational state from the API's perspective.
pub enum PodStatus {
/// Pod has successfully stopped and no longer exists.
Stopped,
/// Pod is pending or initializing.
Starting,
/// Pod is running and ready.
Started,
/// Pod is terminating after a deletion request.
Stopping,
/// Pod failed to start or exited with an error.
Failed,
/// Pod status could not be determined.
Unknown,
}

/// Client interface describing the Kubernetes operations used by the API.
/// Operations for managing Kubernetes resources required by replicators.
///
/// Implementations are expected to be idempotent where possible by issuing
/// server-side apply patches for create-or-update behaviors.
/// Methods use server-side apply patches to provide idempotent create-or-update semantics
/// where possible. All operations target the data-plane namespace unless otherwise specified.
#[async_trait]
pub trait K8sClient: Send + Sync {
/// Creates or updates the Postgres password secret for a replicator.
///
/// The secret name is derived from `prefix` and is stored in the
/// data-plane namespace.
/// The secret name is derived from `prefix` and stored in the data-plane namespace.
async fn create_or_update_postgres_secret(
&self,
prefix: &str,
postgres_password: &str,
) -> Result<(), K8sError>;

/// Creates or updates the BigQuery service account secret for a
/// replicator.
/// Creates or updates the BigQuery service account secret for a replicator.
///
/// The secret name is derived from `prefix` and is stored in the
/// data-plane namespace.
async fn create_or_update_bq_secret(
/// The secret name is derived from `prefix` and stored in the data-plane namespace.
async fn create_or_update_bigquery_secret(
&self,
prefix: &str,
bq_service_account_key: &str,
) -> Result<(), K8sError>;

/// Creates or updates the Iceberg's secret for a replicator.
/// The secret includes catalog toke, s3 access key id and secret.
/// Creates or updates the Iceberg credentials secret for a replicator.
///
/// The secret name is derived from `prefix` and is stored in the
/// data-plane namespace.
/// The secret contains the catalog token, S3 access key ID, and S3 secret access key.
/// The secret name is derived from `prefix` and stored in the data-plane namespace.
async fn create_or_update_iceberg_secret(
&self,
prefix: &str,
Expand All @@ -117,39 +143,45 @@ pub trait K8sClient: Send + Sync {
s3_secret_access_key: &str,
) -> Result<(), K8sError>;

/// Deletes the Postgres password secret for a replicator if it exists.
/// Deletes the Postgres password secret for a replicator.
///
/// Does nothing if the secret does not exist.
async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError>;

/// Deletes the BigQuery service account secret for a replicator if it
/// exists.
async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError>;
/// Deletes the BigQuery service account secret for a replicator.
///
/// Does nothing if the secret does not exist.
async fn delete_bigquery_secret(&self, prefix: &str) -> Result<(), K8sError>;

/// Deletes the iceberg secret for a replicator if it exists.
/// Deletes the Iceberg credentials secret for a replicator.
///
/// Does nothing if the secret does not exist.
async fn delete_iceberg_secret(&self, prefix: &str) -> Result<(), K8sError>;

/// Retrieves a named [`ConfigMap`].
/// Retrieves a [`ConfigMap`] by name from the data-plane namespace.
async fn get_config_map(&self, config_map_name: &str) -> Result<ConfigMap, K8sError>;

/// Creates or updates the replicator configuration [`ConfigMap`].
///
/// The config map stores two YAML documents: a base and a environment specific
/// override.
async fn create_or_update_config_map(
/// Accepts a list of files to store in the config map. Each file's filename
/// becomes a key in the config map's data section with the content as its value.
/// The config map name is derived from `prefix`.
async fn create_or_update_replicator_config_map(
&self,
prefix: &str,
base_config: &str,
env_config: &str,
environment: Environment,
files: Vec<ReplicatorConfigMapFile>,
) -> Result<(), K8sError>;

/// Deletes the replicator configuration [`ConfigMap`] if it exists.
async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError>;
/// Deletes the replicator configuration [`ConfigMap`].
///
/// Does nothing if the config map does not exist.
async fn delete_replicator_config_map(&self, prefix: &str) -> Result<(), K8sError>;

/// Creates or updates the replicator [`StatefulSet`].
///
/// The set references previously created secrets and config maps. Optional
/// `template_annotations` may be used to trigger a rolling restart.
async fn create_or_update_stateful_set(
/// The stateful set references secrets and config maps created by other methods.
/// Changing the configuration may trigger a rolling restart of the pods.
async fn create_or_update_replicator_stateful_set(
&self,
prefix: &str,
replicator_image: &str,
Expand All @@ -158,9 +190,14 @@ pub trait K8sClient: Send + Sync {
log_level: LogLevel,
) -> Result<(), K8sError>;

/// Deletes the replicator [`StatefulSet`] if it exists.
async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;
/// Deletes the replicator [`StatefulSet`].
///
/// Does nothing if the stateful set does not exist.
async fn delete_replicator_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;

/// Returns the status of the replicator pod.
async fn get_pod_status(&self, prefix: &str) -> Result<PodStatus, K8sError>;
/// Retrieves the current status of a replicator pod.
///
/// Returns a [`PodStatus`] derived from the pod's phase, deletion timestamp,
/// and exit status.
async fn get_replicator_pod_status(&self, prefix: &str) -> Result<PodStatus, K8sError>;
}
30 changes: 21 additions & 9 deletions etl-api/src/k8s/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::db::pipelines::Pipeline;
use crate::db::replicators::Replicator;
use crate::db::sources::Source;
use crate::k8s::http::{TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, TRUSTED_ROOT_CERT_KEY_NAME};
use crate::k8s::{DestinationType, K8sClient};
use crate::k8s::{DestinationType, K8sClient, ReplicatorConfigMapFile};
use crate::routes::pipelines::PipelineError;

/// Secret types required by different destination configurations.
Expand Down Expand Up @@ -233,7 +233,7 @@ async fn create_or_update_dynamic_replicator_secrets(
.create_or_update_postgres_secret(prefix, &postgres_password)
.await?;
k8s_client
.create_or_update_bq_secret(prefix, &big_query_service_account_key)
.create_or_update_bigquery_secret(prefix, &big_query_service_account_key)
.await?;
}
Secrets::Iceberg {
Expand Down Expand Up @@ -270,11 +270,23 @@ async fn create_or_update_replicator_config(
config: ReplicatorConfigWithoutSecrets,
environment: Environment,
) -> Result<(), PipelineError> {
// For now the base config is empty.
let base_config = "";
let env_config = serde_json::to_string(&config)?;

let files = vec![
ReplicatorConfigMapFile {
filename: "base.json".to_string(),
// For our setup, we don't need to add config params to the base config file; everything
// is added directly in the environment-specific config file.
content: "{}".to_owned(),
},
ReplicatorConfigMapFile {
filename: format!("{environment}.json"),
content: env_config,
},
];

k8s_client
.create_or_update_config_map(prefix, base_config, &env_config, environment)
.create_or_update_replicator_config_map(prefix, files)
.await?;

Ok(())
Expand All @@ -294,7 +306,7 @@ async fn create_or_update_replicator_stateful_set(
log_level: LogLevel,
) -> Result<(), PipelineError> {
k8s_client
.create_or_update_stateful_set(
.create_or_update_replicator_stateful_set(
prefix,
&replicator_image,
environment,
Expand Down Expand Up @@ -324,7 +336,7 @@ async fn delete_dynamic_replicator_secrets(
// then there's a risk of wrong secret type being attempted for deletion which might leave
// the actual secret behind. So for simplicty we just delete both kinds of secrets. The
// one which doesn't exist will be safely ignored.
k8s_client.delete_bq_secret(prefix).await?;
k8s_client.delete_bigquery_secret(prefix).await?;
k8s_client.delete_iceberg_secret(prefix).await?;

Ok(())
Expand All @@ -335,7 +347,7 @@ async fn delete_replicator_config(
k8s_client: &dyn K8sClient,
prefix: &str,
) -> Result<(), PipelineError> {
k8s_client.delete_config_map(prefix).await?;
k8s_client.delete_replicator_config_map(prefix).await?;

Ok(())
}
Expand All @@ -345,7 +357,7 @@ async fn delete_replicator_stateful_set(
k8s_client: &dyn K8sClient,
prefix: &str,
) -> Result<(), PipelineError> {
k8s_client.delete_stateful_set(prefix).await?;
k8s_client.delete_replicator_stateful_set(prefix).await?;

Ok(())
}
Loading
Loading