diff --git a/etl-api/src/k8s/base.rs b/etl-api/src/k8s/base.rs
index a58e10ca..8e93ef12 100644
--- a/etl-api/src/k8s/base.rs
+++ b/etl-api/src/k8s/base.rs
@@ -65,6 +65,18 @@ impl From<&str> for PodPhase {
     }
 }
 
+/// A pod's status which takes into account whether a deletion has been
+/// requested, the pod's actual status and if the pod exited with an error
+/// to determine its current state.
+pub enum PodStatus {
+    Stopped,
+    Starting,
+    Started,
+    Stopping,
+    Failed,
+    Unknown,
+}
+
 /// Client interface describing the Kubernetes operations used by the API.
 ///
 /// Implementations are expected to be idempotent where possible by issuing
@@ -149,10 +161,6 @@ pub trait K8sClient: Send + Sync {
     /// Deletes the replicator [`StatefulSet`] if it exists.
     async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;
 
-    /// Returns the phase of the replicator pod.
-    async fn get_pod_phase(&self, prefix: &str) -> Result<PodPhase, K8sError>;
-
-    /// Reports whether the replicator container terminated with a non-zero exit
-    /// code.
-    async fn has_replicator_container_error(&self, prefix: &str) -> Result<bool, K8sError>;
+    /// Returns the status of the replicator pod.
+    async fn get_pod_status(&self, prefix: &str) -> Result<PodStatus, K8sError>;
 }
diff --git a/etl-api/src/k8s/http.rs b/etl-api/src/k8s/http.rs
index 693f521b..ee6e4b11 100644
--- a/etl-api/src/k8s/http.rs
+++ b/etl-api/src/k8s/http.rs
@@ -1,5 +1,5 @@
 use crate::configs::log::LogLevel;
-use crate::k8s::DestinationType;
+use crate::k8s::{DestinationType, PodStatus};
 use crate::k8s::{K8sClient, K8sError, PodPhase};
 use async_trait::async_trait;
 use base64::{Engine, prelude::BASE64_STANDARD};
@@ -144,6 +144,35 @@ impl HttpK8sClient {
             Err(e) => Err(e.into()),
         }
     }
+
+    /// Returns true if the replicator container in the pod has terminated with a non-zero exit code.
+    fn has_replicator_container_error(pod: &Pod, replicator_container_name: &str) -> bool {
+        // Find the replicator container status
+        let container_status = pod.status.as_ref().and_then(|status| {
+            status
+                .container_statuses
+                .as_ref()
+                .and_then(|container_statuses| {
+                    container_statuses
+                        .iter()
+                        .find(|cs| cs.name == replicator_container_name)
+                        .cloned()
+                })
+        });
+
+        let Some(container_status) = container_status else {
+            return false;
+        };
+
+        // Check last terminated state for non-zero exit code
+        if let Some(last_state) = &container_status.last_state
+            && let Some(terminated) = &last_state.terminated
+        {
+            return terminated.exit_code != 0;
+        }
+
+        false
+    }
 }
 
 #[async_trait]
@@ -385,16 +414,26 @@ impl K8sClient for HttpK8sClient {
         Ok(())
     }
 
-    async fn get_pod_phase(&self, prefix: &str) -> Result<PodPhase, K8sError> {
+    async fn get_pod_status(&self, prefix: &str) -> Result<PodStatus, K8sError> {
         debug!("getting pod status");
 
         let pod_name = create_pod_name(prefix);
         let pod = match self.pods_api.get(&pod_name).await {
             Ok(pod) => pod,
-            Err(kube::Error::Api(er)) if er.code == 404 => return Ok(PodPhase::Succeeded),
+            Err(kube::Error::Api(er)) if er.code == 404 => return Ok(PodStatus::Stopped),
             Err(e) => return Err(e.into()),
         };
 
+        let replicator_container_name = create_replicator_container_name(prefix);
+
+        if Self::has_replicator_container_error(&pod, &replicator_container_name) {
+            return Ok(PodStatus::Failed);
+        }
+
+        if pod.metadata.deletion_timestamp.is_some() {
+            return Ok(PodStatus::Stopping);
+        }
+
         let phase = pod
             .status
             .map(|status| {
@@ -409,43 +448,13 @@ impl K8sClient for HttpK8sClient {
             })
             .unwrap_or(PodPhase::Unknown);
 
-        Ok(phase)
-    }
-
-    async fn has_replicator_container_error(&self, prefix: &str) -> Result<bool, K8sError> {
-        debug!("checking for replicator container error");
-
-        let pod_name = create_pod_name(prefix);
-        let pod = match self.pods_api.get(&pod_name).await {
-            Ok(pod) => pod,
-            Err(kube::Error::Api(er)) if er.code == 404 => return Ok(false),
-            Err(e) => return Err(e.into()),
-        };
-
-        let replicator_container_name = create_replicator_container_name(prefix);
-
-        // Find the replicator container status
-        let container_status = pod.status.and_then(|status| {
-            status.container_statuses.and_then(|container_statuses| {
-                container_statuses
-                    .iter()
-                    .find(|cs| cs.name == replicator_container_name)
-                    .cloned()
-            })
-        });
-
-        let Some(container_status) = container_status else {
-            return Ok(false);
-        };
-
-        // Check last terminated state for non-zero exit code
-        if let Some(last_state) = &container_status.last_state
-            && let Some(terminated) = &last_state.terminated
-        {
-            return Ok(terminated.exit_code != 0);
-        }
-
-        Ok(false)
+        Ok(match phase {
+            PodPhase::Pending => PodStatus::Starting,
+            PodPhase::Running => PodStatus::Started,
+            PodPhase::Succeeded => PodStatus::Stopped,
+            PodPhase::Failed => PodStatus::Failed,
+            PodPhase::Unknown => PodStatus::Unknown,
+        })
     }
 }
 
diff --git a/etl-api/src/routes/pipelines.rs b/etl-api/src/routes/pipelines.rs
index 84d45a03..4e856090 100644
--- a/etl-api/src/routes/pipelines.rs
+++ b/etl-api/src/routes/pipelines.rs
@@ -12,7 +12,6 @@ use std::ops::DerefMut;
 use thiserror::Error;
 use utoipa::ToSchema;
 
-use crate::config::ApiConfig;
 use crate::configs::encryption::EncryptionKey;
 use crate::configs::pipeline::{FullApiPipelineConfig, PartialApiPipelineConfig};
 use crate::db;
@@ -25,11 +24,12 @@ use crate::k8s::core::{
     create_k8s_object_prefix, create_or_update_pipeline_resources_in_k8s,
     delete_pipeline_resources_in_k8s,
 };
-use crate::k8s::{K8sClient, K8sError, PodPhase};
+use crate::k8s::{K8sClient, K8sError};
 use crate::routes::{
     ErrorMessage, TenantIdError, connect_to_source_database_with_defaults, extract_tenant_id,
 };
 use crate::utils::parse_docker_image_tag;
+use crate::{config::ApiConfig, k8s::PodStatus};
 
 #[derive(Debug, Error)]
 pub enum PipelineError {
@@ -407,8 +407,22 @@ pub enum PipelineStatus {
     Stopped,
     Starting,
     Started,
-    Unknown,
+    Stopping,
     Failed,
+    Unknown,
+}
+
+impl From<PodStatus> for PipelineStatus {
+    fn from(value: PodStatus) -> Self {
+        match value {
+            PodStatus::Stopped => PipelineStatus::Stopped,
+            PodStatus::Starting => PipelineStatus::Starting,
+            PodStatus::Started => PipelineStatus::Started,
+            PodStatus::Stopping => PipelineStatus::Stopping,
+            PodStatus::Failed => PipelineStatus::Failed,
+            PodStatus::Unknown => PipelineStatus::Unknown,
+        }
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize, ToSchema)]
@@ -871,26 +885,8 @@ pub async fn get_pipeline_status(
 
     let prefix = create_k8s_object_prefix(tenant_id, replicator.id);
 
-    // We load the pod phase.
-    let pod_phase = k8s_client.get_pod_phase(&prefix).await?;
-
-    // Check if there's a container error.
-    //
-    // We are not exposing the error to not leak any internal information of the pod and because we
-    // will expose it through logs.
-    let has_container_error = k8s_client.has_replicator_container_error(&prefix).await?;
-
-    let status = if has_container_error {
-        PipelineStatus::Failed
-    } else {
-        match pod_phase {
-            PodPhase::Pending => PipelineStatus::Starting,
-            PodPhase::Running => PipelineStatus::Started,
-            PodPhase::Succeeded => PipelineStatus::Stopped,
-            PodPhase::Failed => PipelineStatus::Failed,
-            PodPhase::Unknown => PipelineStatus::Unknown,
-        }
-    };
+    let pod_status = k8s_client.get_pod_status(&prefix).await?;
+    let status = pod_status.into();
 
     let response = GetPipelineStatusResponse {
         pipeline_id,
diff --git a/etl-api/tests/support/k8s_client.rs b/etl-api/tests/support/k8s_client.rs
index d917fac2..b6722676 100644
--- a/etl-api/tests/support/k8s_client.rs
+++ b/etl-api/tests/support/k8s_client.rs
@@ -5,7 +5,7 @@ use std::collections::BTreeMap;
 
 use async_trait::async_trait;
 use etl_api::configs::log::LogLevel;
 use etl_api::k8s::http::{TRUSTED_ROOT_CERT_CONFIG_MAP_NAME, TRUSTED_ROOT_CERT_KEY_NAME};
-use etl_api::k8s::{DestinationType, K8sClient, K8sError, PodPhase};
+use etl_api::k8s::{DestinationType, K8sClient, K8sError, PodStatus};
 use etl_config::Environment;
 use k8s_openapi::api::core::v1::ConfigMap;
@@ -96,11 +96,7 @@ impl K8sClient for MockK8sClient {
         Ok(())
     }
 
-    async fn get_pod_phase(&self, _prefix: &str) -> Result<PodPhase, K8sError> {
-        Ok(PodPhase::Running)
-    }
-
-    async fn has_replicator_container_error(&self, _prefix: &str) -> Result<bool, K8sError> {
-        Ok(false)
+    async fn get_pod_status(&self, _prefix: &str) -> Result<PodStatus, K8sError> {
+        Ok(PodStatus::Started)
     }
 }