Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,22 @@ rand = { version = "0.9.2", default-features = false }
reqwest = { version = "0.12.22", default-features = false }
ring = { version = "0.17.14", default-features = false }
rustls = { version = "0.23.31", default-features = false }
rustls-pemfile = { version = "2.2.0", default-features = false }
rustls-pki-types = { version = "1.12.0", default-features = false }
secrecy = { version = "0.10.3", default-features = false }
sentry = { version = "0.42.0" }
serde = { version = "1.0.219", default-features = false }
serde_json = { version = "1.0.141", default-features = false }
serde_yaml = { version = "0.9.34", default-features = false }
sqlx = { version = "0.8.6", default-features = false }
tempfile = { version = "3.23.0", default-features = false }
thiserror = "2.0.12"
tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = ["stats"] }
tikv-jemallocator = { version = "0.6.1", default-features = false, features = ["background_threads_runtime_support", "unprefixed_malloc_on_supported_platforms"] }
thiserror = "2.0.12"
tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = [
"stats",
] }
tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
"background_threads_runtime_support",
"unprefixed_malloc_on_supported_platforms",
] }
tokio = { version = "1.47.0", default-features = false }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
tokio-rustls = { version = "0.26.2", default-features = false }
Expand Down
38 changes: 19 additions & 19 deletions etl-api/src/k8s/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,26 +213,26 @@ impl HttpK8sClient {
}

// Waiting state, we want to distinguish normal waiting reasons from abnormal ones.
if let Some(waiting) = &state.waiting {
if let Some(reason) = &waiting.reason {
match reason.as_str() {
// Crash/restart errors
"CrashLoopBackOff" => return true,

// Image-related errors (6 predefined in kubelet)
"ImagePullBackOff"
| "ErrImagePull"
| "ErrImageNeverPull"
| "InvalidImageName"
| "ImageInspectError"
| "RegistryUnavailable" => return true,

// Container creation errors
"CreateContainerConfigError" | "CreateContainerError" | "RunContainerError" => {
return true;
}
_ => {}
if let Some(waiting) = &state.waiting
&& let Some(reason) = &waiting.reason
{
match reason.as_str() {
// Crash/restart errors
"CrashLoopBackOff" => return true,

// Image-related errors (6 predefined in kubelet)
"ImagePullBackOff"
| "ErrImagePull"
| "ErrImageNeverPull"
| "InvalidImageName"
| "ImageInspectError"
| "RegistryUnavailable" => return true,

// Container creation errors
"CreateContainerConfigError" | "CreateContainerError" | "RunContainerError" => {
return true;
}
_ => {}
}
}

Expand Down
2 changes: 1 addition & 1 deletion etl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pin-project-lite = { workspace = true }
postgres-replication = { workspace = true }
ring = { workspace = true, default-features = false }
rustls = { workspace = true, features = ["aws-lc-rs", "logging"] }
rustls-pemfile = { workspace = true, features = ["std"] }
rustls-pki-types = { workspace = true, features = ["std"] }
serde_json = { workspace = true, features = ["std"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
Expand Down
15 changes: 15 additions & 0 deletions etl/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,21 @@ impl From<rustls::Error> for EtlError {
}
}

/// Converts [`rustls_pki_types::pem::Error`] to [`EtlError`] with [`ErrorKind::EncryptionError`].
impl From<rustls_pki_types::pem::Error> for EtlError {
#[track_caller]
fn from(err: rustls_pki_types::pem::Error) -> EtlError {
let detail = err.to_string();
let source = Arc::new(err);
EtlError::from_components(
ErrorKind::EncryptionError,
Cow::Borrowed("Failed to parse PEM certificate"),
Some(Cow::Owned(detail)),
Some(source),
)
}
}

/// Converts [`uuid::Error`] to [`EtlError`] with [`ErrorKind::InvalidData`].
impl From<uuid::Error> for EtlError {
#[track_caller]
Expand Down
12 changes: 7 additions & 5 deletions etl/src/replication/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use etl_postgres::{below_version, requires_version};
use pg_escape::{quote_identifier, quote_literal};
use postgres_replication::LogicalReplicationStream;
use rustls::ClientConfig;
use rustls_pki_types::CertificateDer;
use rustls_pki_types::pem::PemObject;
use std::collections::HashMap;
use std::fmt;
use std::io::BufReader;
use std::num::NonZeroI32;
use std::sync::Arc;

Expand Down Expand Up @@ -223,10 +224,11 @@ impl PgReplicationClient {

let mut root_store = rustls::RootCertStore::empty();
if pg_connection_config.tls.enabled {
let mut root_certs_reader =
BufReader::new(pg_connection_config.tls.trusted_root_certs.as_bytes());
for cert in rustls_pemfile::certs(&mut root_certs_reader) {
let cert = cert?;
let certs = CertificateDer::pem_slice_iter(
pg_connection_config.tls.trusted_root_certs.as_bytes(),
)
.collect::<Result<Vec<_>, _>>()?;
for cert in certs {
root_store.add(cert)?;
}
};
Expand Down