From d489de367a191ced2306ee413f8409f7cb275aa8 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 6 Aug 2025 08:30:26 +0200 Subject: [PATCH 1/2] ref(api): Do not return detailed pod error stats --- etl-api/src/k8s_client.rs | 68 +++++------------------------- etl-api/src/routes/pipelines.rs | 32 ++++---------- etl-api/tests/common/k8s_client.rs | 16 ++----- 3 files changed, 24 insertions(+), 92 deletions(-) diff --git a/etl-api/src/k8s_client.rs b/etl-api/src/k8s_client.rs index e49f84e1d..50229ed92 100644 --- a/etl-api/src/k8s_client.rs +++ b/etl-api/src/k8s_client.rs @@ -10,7 +10,7 @@ use tracing::*; use kube::{ Client, - api::{Api, DeleteParams, LogParams, Patch, PatchParams}, + api::{Api, DeleteParams, Patch, PatchParams}, }; #[derive(Debug, Error)] @@ -30,13 +30,6 @@ pub enum PodPhase { Unknown, } -#[derive(Debug, Clone)] -pub struct ContainerError { - pub exit_code: Option, - pub message: Option, - pub reason: Option, -} - impl From<&str> for PodPhase { fn from(value: &str) -> Self { match value { @@ -88,17 +81,11 @@ pub trait K8sClient: Send + Sync { async fn get_pod_phase(&self, prefix: &str) -> Result; - async fn get_replicator_container_error( + async fn has_replicator_container_error( &self, prefix: &str, - ) -> Result, K8sError>; + ) -> Result; - async fn get_container_logs( - &self, - pod_name: &str, - container_name: &str, - previous: bool, - ) -> Result; async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError>; } @@ -534,11 +521,11 @@ impl K8sClient for HttpK8sClient { Ok(phase) } - async fn get_replicator_container_error( + async fn has_replicator_container_error( &self, prefix: &str, - ) -> Result, K8sError> { - info!("getting replicator error information"); + ) -> Result { + info!("checking for replicator container error"); let pod_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}-0"); let pod = match self.pods_api.get(&pod_name).await { @@ -547,7 +534,7 @@ impl K8sClient for HttpK8sClient { return match e { kube::Error::Api(ref er) => { if er.code == 404 { - return Ok(None); + return Ok(false); } Err(e.into()) } @@ -569,32 +556,17 @@ impl K8sClient for HttpK8sClient { }); let Some(container_status) = container_status else { - return Ok(None); + return Ok(false); }; - // Check last terminated state. - // - // `last_state` is only set when there’s a previous termination, and remains empty if the - // container has never failed, so this is what we want, having access to the previous failure + // Check last terminated state for non-zero exit code if let Some(last_state) = &container_status.last_state { if let Some(terminated) = &last_state.terminated { - if terminated.exit_code != 0 { - // Fetch logs from the previous container run - let log_message = self - .get_container_logs(&pod_name, &replicator_container_name, true) - .await - .ok(); - - return Ok(Some(ContainerError { - exit_code: Some(terminated.exit_code), - message: log_message.or_else(|| terminated.message.clone()), - reason: terminated.reason.clone(), - })); - } + return Ok(terminated.exit_code != 0); } } - Ok(None) + Ok(false) } async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError> { @@ -618,22 +590,4 @@ impl K8sClient for HttpK8sClient { Ok(()) } - async fn get_container_logs( - &self, - pod_name: &str, - container_name: &str, - previous: bool, - ) -> Result { - let log_params = LogParams { - container: Some(container_name.to_string()), - tail_lines: Some(50), // Get last 50 lines - timestamps: false, - previous, // Get logs from previous container instance or not - ..Default::default() - }; - - let logs = self.pods_api.logs(pod_name, &log_params).await?; - - Ok(logs) - } } diff --git a/etl-api/src/routes/pipelines.rs b/etl-api/src/routes/pipelines.rs index 31dcca75b..ef5573611 100644 --- a/etl-api/src/routes/pipelines.rs +++ b/etl-api/src/routes/pipelines.rs @@ -359,11 +359,7 @@ pub enum PipelineStatus { Started, Stopping, Unknown, - Failed { - exit_code: Option, - message: Option, - reason: Option, - }, + Failed, } #[utoipa::path( @@ -710,30 +706,20 @@ pub async fn get_pipeline_status( // We load the pod phase. let pod_phase = k8s_client.get_pod_phase(&prefix).await?; - // We check the status of the replicator container. + // Check if there's a container error. // - // Note that this is dumping all the logs within the container, meaning that anything on stdout - // and stderr will be returned. In case we want to be more careful with what we return we can - // embed within the logs some special characters that will be used to determine which error message - // to return to the user. - let replicator_error = k8s_client.get_replicator_container_error(&prefix).await?; - - let status = if let Some(replicator_error) = replicator_error { - PipelineStatus::Failed { - exit_code: replicator_error.exit_code, - message: replicator_error.message, - reason: replicator_error.reason, - } + // We are not exposing the error to not leak any internal information of the pod and because we + // will expose it through logs. + let has_container_error = k8s_client.has_replicator_container_error(&prefix).await?; + + let status = if has_container_error { + PipelineStatus::Failed } else { match pod_phase { PodPhase::Pending => PipelineStatus::Starting, PodPhase::Running => PipelineStatus::Started, PodPhase::Succeeded => PipelineStatus::Stopped, - PodPhase::Failed => PipelineStatus::Failed { - exit_code: None, - message: None, - reason: None, - }, + PodPhase::Failed => PipelineStatus::Failed, PodPhase::Unknown => PipelineStatus::Unknown, } }; diff --git a/etl-api/tests/common/k8s_client.rs b/etl-api/tests/common/k8s_client.rs index d3a4e9382..658c68c67 100644 --- a/etl-api/tests/common/k8s_client.rs +++ b/etl-api/tests/common/k8s_client.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use async_trait::async_trait; use etl_api::k8s_client::{ - ContainerError, K8sClient, K8sError, PodPhase, TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, + K8sClient, K8sError, PodPhase, TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, TRUSTED_ROOT_CERT_KEY_NAME, }; use k8s_openapi::api::core::v1::ConfigMap; @@ -80,21 +80,13 @@ impl K8sClient for MockK8sClient { Ok(PodPhase::Running) } - async fn get_replicator_container_error( + async fn has_replicator_container_error( &self, _prefix: &str, - ) -> Result, K8sError> { - Ok(None) + ) -> Result { + Ok(false) } - async fn get_container_logs( - &self, - _pod_name: &str, - _container_name: &str, - _previous: bool, - ) -> Result { - Ok(String::new()) - } async fn delete_pod(&self, _prefix: &str) -> Result<(), K8sError> { Ok(()) From 03f9c21574da411f44a9b23fe81ff63b7e0e908e Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 6 Aug 2025 08:31:31 +0200 Subject: [PATCH 2/2] Update --- etl-api/src/k8s_client.rs | 12 ++---------- etl-api/tests/common/k8s_client.rs | 9 ++------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/etl-api/src/k8s_client.rs b/etl-api/src/k8s_client.rs index 50229ed92..34eaf526c 100644 --- a/etl-api/src/k8s_client.rs +++ b/etl-api/src/k8s_client.rs @@ -81,11 +81,7 @@ pub trait K8sClient: Send + Sync { async fn get_pod_phase(&self, prefix: &str) -> Result; - async fn has_replicator_container_error( - &self, - prefix: &str, - ) -> Result; - + async fn has_replicator_container_error(&self, prefix: &str) -> Result; async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError>; } @@ -521,10 +517,7 @@ impl K8sClient for HttpK8sClient { Ok(phase) } - async fn has_replicator_container_error( - &self, - prefix: &str, - ) -> Result { + async fn has_replicator_container_error(&self, prefix: &str) -> Result { info!("checking for replicator container error"); let pod_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}-0"); @@ -589,5 +582,4 @@ impl K8sClient for HttpK8sClient { Ok(()) } - } diff --git a/etl-api/tests/common/k8s_client.rs b/etl-api/tests/common/k8s_client.rs index 658c68c67..e086990aa 100644 --- a/etl-api/tests/common/k8s_client.rs +++ b/etl-api/tests/common/k8s_client.rs @@ -2,8 +2,7 @@ use std::collections::BTreeMap; use async_trait::async_trait; use etl_api::k8s_client::{ - K8sClient, K8sError, PodPhase, TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, - TRUSTED_ROOT_CERT_KEY_NAME, + K8sClient, K8sError, PodPhase, TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, TRUSTED_ROOT_CERT_KEY_NAME, }; use k8s_openapi::api::core::v1::ConfigMap; @@ -80,14 +79,10 @@ impl K8sClient for MockK8sClient { Ok(PodPhase::Running) } - async fn has_replicator_container_error( - &self, - _prefix: &str, - ) -> Result { + async fn has_replicator_container_error(&self, _prefix: &str) -> Result { Ok(false) } - async fn delete_pod(&self, _prefix: &str) -> Result<(), K8sError> { Ok(()) }