From a0eed7fee8c1b7a219195cf18f67d037aa857b00 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Sat, 6 Dec 2025 10:29:50 +0530 Subject: [PATCH 1/2] fix: remove rustls-pemfile in favour of rustls-pki-types See this security advisory: https://rustsec.org/advisories/RUSTSEC-2025-0134 --- Cargo.toml | 13 +++++++++---- etl/Cargo.toml | 2 +- etl/src/error.rs | 15 +++++++++++++++ etl/src/replication/client.rs | 12 +++++++----- 4 files changed, 32 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7eac0514..8336895ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ rand = { version = "0.9.2", default-features = false } reqwest = { version = "0.12.22", default-features = false } ring = { version = "0.17.14", default-features = false } rustls = { version = "0.23.31", default-features = false } -rustls-pemfile = { version = "2.2.0", default-features = false } +rustls-pki-types = { version = "1.12.0", default-features = false } secrecy = { version = "0.10.3", default-features = false } sentry = { version = "0.42.0" } serde = { version = "1.0.219", default-features = false } @@ -71,9 +71,14 @@ serde_json = { version = "1.0.141", default-features = false } serde_yaml = { version = "0.9.34", default-features = false } sqlx = { version = "0.8.6", default-features = false } tempfile = { version = "3.23.0", default-features = false } -thiserror = "2.0.12" -tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = ["stats"] } -tikv-jemallocator = { version = "0.6.1", default-features = false, features = ["background_threads_runtime_support", "unprefixed_malloc_on_supported_platforms"] } +thiserror = "2.0.12" +tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = [ + "stats", +] } +tikv-jemallocator = { version = "0.6.1", default-features = false, features = [ + "background_threads_runtime_support", + "unprefixed_malloc_on_supported_platforms", +] } tokio = { version = "1.47.0", default-features = false } tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" } tokio-rustls = { version = "0.26.2", default-features = false } diff --git a/etl/Cargo.toml b/etl/Cargo.toml index ad0203254..77460e265 100644 --- a/etl/Cargo.toml +++ b/etl/Cargo.toml @@ -28,7 +28,7 @@ pin-project-lite = { workspace = true } postgres-replication = { workspace = true } ring = { workspace = true, default-features = false } rustls = { workspace = true, features = ["aws-lc-rs", "logging"] } -rustls-pemfile = { workspace = true, features = ["std"] } +rustls-pki-types = { workspace = true, features = ["std"] } serde_json = { workspace = true, features = ["std"] } sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/etl/src/error.rs b/etl/src/error.rs index b8385b2a0..858c25980 100644 --- a/etl/src/error.rs +++ b/etl/src/error.rs @@ -913,6 +913,21 @@ impl From for EtlError { } } +/// Converts [`rustls_pki_types::pem::Error`] to [`EtlError`] with [`ErrorKind::EncryptionError`]. +impl From for EtlError { + #[track_caller] + fn from(err: rustls_pki_types::pem::Error) -> EtlError { + let detail = err.to_string(); + let source = Arc::new(err); + EtlError::from_components( + ErrorKind::EncryptionError, + Cow::Borrowed("Failed to parse PEM certificate"), + Some(Cow::Owned(detail)), + Some(source), + ) + } +} + /// Converts [`uuid::Error`] to [`EtlError`] with [`ErrorKind::InvalidData`]. impl From for EtlError { #[track_caller] diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index b291f6bbb..b0277c053 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -10,9 +10,10 @@ use etl_postgres::{below_version, requires_version}; use pg_escape::{quote_identifier, quote_literal}; use postgres_replication::LogicalReplicationStream; use rustls::ClientConfig; +use rustls_pki_types::CertificateDer; +use rustls_pki_types::pem::PemObject; use std::collections::HashMap; use std::fmt; -use std::io::BufReader; use std::num::NonZeroI32; use std::sync::Arc; @@ -223,10 +224,11 @@ impl PgReplicationClient { let mut root_store = rustls::RootCertStore::empty(); if pg_connection_config.tls.enabled { - let mut root_certs_reader = - BufReader::new(pg_connection_config.tls.trusted_root_certs.as_bytes()); - for cert in rustls_pemfile::certs(&mut root_certs_reader) { - let cert = cert?; + let certs = CertificateDer::pem_slice_iter( + pg_connection_config.tls.trusted_root_certs.as_bytes(), + ) + .collect::, _>>()?; + for cert in certs { root_store.add(cert)?; } }; From 94a6a0f9e73dda8878b3d6d794340d28b530d998 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Sat, 6 Dec 2025 10:31:24 +0530 Subject: [PATCH 2/2] fix clippy warning --- etl-api/src/k8s/http.rs | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/etl-api/src/k8s/http.rs b/etl-api/src/k8s/http.rs index f45265bd4..6a672a303 100644 --- a/etl-api/src/k8s/http.rs +++ b/etl-api/src/k8s/http.rs @@ -213,26 +213,26 @@ impl HttpK8sClient { } // Waiting state, we want to distinguish normal waiting reasons from abnormal ones. - if let Some(waiting) = &state.waiting { - if let Some(reason) = &waiting.reason { - match reason.as_str() { - // Crash/restart errors - "CrashLoopBackOff" => return true, - - // Image-related errors (6 predefined in kubelet) - "ImagePullBackOff" - | "ErrImagePull" - | "ErrImageNeverPull" - | "InvalidImageName" - | "ImageInspectError" - | "RegistryUnavailable" => return true, - - // Container creation errors - "CreateContainerConfigError" | "CreateContainerError" | "RunContainerError" => { - return true; - } - _ => {} + if let Some(waiting) = &state.waiting + && let Some(reason) = &waiting.reason + { + match reason.as_str() { + // Crash/restart errors + "CrashLoopBackOff" => return true, + + // Image-related errors (6 predefined in kubelet) + "ImagePullBackOff" + | "ErrImagePull" + | "ErrImageNeverPull" + | "InvalidImageName" + | "ImageInspectError" + | "RegistryUnavailable" => return true, + + // Container creation errors + "CreateContainerConfigError" | "CreateContainerError" | "RunContainerError" => { + return true; } + _ => {} } }