diff --git a/tembo-operator/Cargo.lock b/tembo-operator/Cargo.lock
index 607bc5076..7adf181e9 100644
--- a/tembo-operator/Cargo.lock
+++ b/tembo-operator/Cargo.lock
@@ -494,7 +494,7 @@ dependencies = [
 
 [[package]]
 name = "controller"
-version = "0.43.2"
+version = "0.44.0"
 dependencies = [
  "actix-web",
  "anyhow",
diff --git a/tembo-operator/Cargo.toml b/tembo-operator/Cargo.toml
index e635ca8b2..7d5ba4722 100644
--- a/tembo-operator/Cargo.toml
+++ b/tembo-operator/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "controller"
 description = "Tembo Operator for Postgres"
-version = "0.43.2"
+version = "0.44.0"
 edition = "2021"
 default-run = "controller"
 license = "Apache-2.0"
@@ -61,6 +61,7 @@ hyper = "0.14.27"
 rand = "0.8.5"
 tower-test = "0.4.0"
 futures-util = "0.3"
+regex = "1"
 
 [dependencies.kube]
 features = ["runtime", "client", "derive", "ws"]
diff --git a/tembo-operator/src/controller.rs b/tembo-operator/src/controller.rs
index bd070e782..6ee7e5ebe 100644
--- a/tembo-operator/src/controller.rs
+++ b/tembo-operator/src/controller.rs
@@ -110,10 +110,31 @@ async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
         .map_err(|e| Error::FinalizerError(Box::new(e)))
 }
 
-fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
+/// Decides how long to wait before retrying a failed reconcile.
+///
+/// The error is logged and passed to `ctx.metrics.reconcile_failure`. A Kubernetes API error
+/// with code 429 (Too Many Requests) is requeued after 60 seconds plus 0 to 120 seconds of
+/// random jitter; every other error is requeued after 5 minutes.
+pub(crate) fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
     warn!("reconcile failed: {:?}", error);
     ctx.metrics.reconcile_failure(&cdb, error);
-    Action::requeue(Duration::from_secs(5 * 60))
+
+    match error {
+        // The API server answered 429: requeue sooner than the default, with random jitter
+        // added on top of the base backoff so the retry delay varies between 60 and 180 seconds.
+        Error::KubeError(kube::Error::Api(api_error)) if api_error.code == 429 => {
+            let backoff: u64 = 60;
+            let max_jitter: u64 = 120;
+            let jitter: u64 = rand::thread_rng().gen_range(0..=max_jitter);
+            let backoff_with_jitter = Duration::from_secs(backoff + jitter);
+            warn!(
+                "Received HTTP 429 Too Many Requests. Requeuing after {} seconds.",
+                backoff_with_jitter.as_secs()
+            );
+            Action::requeue(backoff_with_jitter)
+        }
+        _ => Action::requeue(Duration::from_secs(5 * 60)),
+    }
 }
 
 // create_volume_snapshot_patch creates a patch for the CoreDB spec to enable or disable volumesnapshots
@@ -1192,4 +1213,84 @@ mod test {
         ));
         assert!(is_volume_snapshot_update_needed(None, true));
     }
+
+    // Tests for error_policy: a mocked Kubernetes API answers a Pod GET with a failure status,
+    // and the resulting error is fed through error_policy.
+    use crate::{error_policy, Error};
+    use futures::pin_mut;
+    use http::{Request, Response, StatusCode};
+    use hyper::Body;
+    use k8s_openapi::api::core::v1::Pod;
+    use kube::{api::Api, Client};
+    use serde_json::json;
+    use tower_test::mock;
+
+    // Drives error_policy with the error produced when the mocked API server answers a Pod GET
+    // with `status`, and returns the requeue delay of the resulting action in seconds.
+    async fn requeue_secs_for(status: StatusCode, reason: &str, message: &str) -> u64 {
+        // mock the Kubernetes client and set up the Context
+        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
+        let ctx = Arc::new(Context {
+            client: Client::new(mock_service, "default".to_string()),
+            metrics: Default::default(),
+            diagnostics: Default::default(),
+        });
+
+        // Kubernetes `Status` body whose `code` mirrors the HTTP status of the response
+        let body = json!({
+            "kind": "Status",
+            "apiVersion": "v1",
+            "metadata": {},
+            "status": "Failure",
+            "message": message,
+            "reason": reason,
+            "code": status.as_u16()
+        })
+        .to_string();
+
+        // answer the first request with the failure; the request itself is not inspected
+        let spawned = tokio::spawn(async move {
+            pin_mut!(handle);
+            if let Some((_request, send)) = handle.next_request().await {
+                send.send_response(
+                    Response::builder()
+                        .status(status)
+                        .body(Body::from(body))
+                        .unwrap(),
+                );
+            }
+        });
+
+        // issue a Pod GET through the mocked client; the mock answers it with the failure
+        let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
+        let err = pod_api.get("test-pod").await.expect_err("mocked API call must fail");
+
+        // convert the kube error into the controller's Error, as the controller logic would
+        let custom_error = Error::from(err);
+        let action = error_policy(Arc::new(CoreDB::test()), &custom_error, ctx);
+        spawned.await.unwrap();
+
+        // the delay is read back from the Debug output of the action
+        let action_str = format!("{:?}", action);
+        let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
+        match re.captures(&action_str) {
+            Some(captures) => captures[1].parse::<u64>().unwrap(),
+            None => panic!("Unexpected action format: {}", action_str),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_error_policy_429() {
+        let status = StatusCode::TOO_MANY_REQUESTS;
+        let secs = requeue_secs_for(status, "TooManyRequests", "Too Many Requests").await;
+        // 60 seconds of base backoff plus 0..=120 seconds of jitter
+        assert!((60..=180).contains(&secs));
+    }
+
+    #[tokio::test]
+    async fn test_error_policy_non_429() {
+        let secs = requeue_secs_for(StatusCode::NOT_FOUND, "NotFound", "Not Found").await;
+        // anything other than a 429 gets the default requeue of 5 minutes (300 seconds)
+        assert_eq!(secs, 300);
+    }
 }