Skip to content

Commit

Permalink
add kubernetes 429 error requeue with jitter (#696)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhudson committed Apr 4, 2024
1 parent 49d0a33 commit 6802e9e
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 4 deletions.
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion tembo-operator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.43.2"
version = "0.44.0"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
Expand Down Expand Up @@ -61,6 +61,7 @@ hyper = "0.14.27"
rand = "0.8.5"
tower-test = "0.4.0"
futures-util = "0.3"
regex = "1"

[dependencies.kube]
features = ["runtime", "client", "derive", "ws"]
Expand Down
161 changes: 159 additions & 2 deletions tembo-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,30 @@ async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
.map_err(|e| Error::FinalizerError(Box::new(e)))
}

fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
pub(crate) fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
warn!("reconcile failed: {:?}", error);
ctx.metrics.reconcile_failure(&cdb, error);
Action::requeue(Duration::from_secs(5 * 60))

// Check for 429 error code from Kubernetes API
match error {
Error::KubeError(kube_error) => match kube_error {
kube::Error::Api(api_error) if api_error.code == 429 => {
// Error is a 429 (too many requests), calculate backoff and jitter
let backoff: u64 = 60;
let max_jitter: u64 = 120;
let jitter: u64 = rand::thread_rng().gen_range(0..=max_jitter);
let backoff_with_jitter = Duration::from_secs(backoff + jitter);
// Log the 429 error and the calculated backoff time
warn!(
"Received HTTP 429 Too Many Requests. Requeuing after {} seconds.",
backoff_with_jitter.as_secs()
);
Action::requeue(backoff_with_jitter)
}
_ => Action::requeue(Duration::from_secs(5 * 60)),
},
_ => Action::requeue(Duration::from_secs(5 * 60)),
}
}

// create_volume_snapshot_patch creates a patch for the CoreDB spec to enable or disable volumesnapshots
Expand Down Expand Up @@ -1192,4 +1212,141 @@ mod test {
));
assert!(is_volume_snapshot_update_needed(None, true));
}

// Test the error_policy function, we need to mock the ctx and cdb to mimic a 429 error code
use crate::{error_policy, Error};
use futures::pin_mut;
use http::{Request, Response, StatusCode};
use hyper::Body;
use k8s_openapi::api::core::v1::Pod;
use kube::{api::Api, Client};
use serde_json::json;
use tower_test::mock;

#[tokio::test]
async fn test_error_policy_429() {
// setup a test CoreDB object
let coredb = CoreDB::test();

// mock the Kubernetes client and setup Context
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let client = Client::new(mock_service, "default".to_string());
let ctx = Arc::new(Context {
client: client.clone(),
metrics: Default::default(),
diagnostics: Default::default(),
});

// setup the mock response 429 too many requests
let spawned = tokio::spawn(async move {
pin_mut!(handle);
if let Some((_request, send)) = handle.next_request().await {
// We don't check the specifics of the request here, focusing on the response
send.send_response(
Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.body(Body::from(
json!({
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": "Too Many Requests",
"reason": "TooManyRequests",
"code": 429
})
.to_string(),
))
.unwrap(),
);
}
});

// Setup call to kubernetes api Pod
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
let err = pod_api.get("test-pod").await.err().unwrap();

// Convert the KubeError into your custom error type as it would in your controller logic
let custom_error = Error::from(err);

// Now we simulate calling the error_policy function with this error
let action = error_policy(Arc::new(coredb), &custom_error, ctx);
let action_str = format!("{:?}", action);

println!("Action: {:?}", action);

// Use regular expressions to extract the duration from the action string
let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
if let Some(captures) = re.captures(&action_str) {
let duration_secs = captures[1].parse::<u64>().unwrap();
assert!((60..=180).contains(&duration_secs));
} else {
panic!("Unexpected action format: {}", action_str);
}

spawned.await.unwrap();
}

#[tokio::test]
async fn test_error_policy_non_429() {
// setup a test CoreDB object
let coredb = CoreDB::test();

// mock the Kubernetes client and setup Context
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let client = Client::new(mock_service, "default".to_string());
let ctx = Arc::new(Context {
client: client.clone(),
metrics: Default::default(),
diagnostics: Default::default(),
});

// setup the mock response 404 Not Found
let spawned = tokio::spawn(async move {
pin_mut!(handle);
if let Some((_request, send)) = handle.next_request().await {
send.send_response(
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(
json!({
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": "Not Found",
"reason": "NotFound",
"code": 404
})
.to_string(),
))
.unwrap(),
);
}
});

// Setup call to kubernetes api Pod
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
let err = pod_api.get("test-pod").await.err().unwrap();

// Convert the KubeError into your custom error type as it would in your controller logic
let custom_error = Error::from(err);

// Now we simulate calling the error_policy function with this error
let action = error_policy(Arc::new(coredb), &custom_error, ctx);
let action_str = format!("{:?}", action);

println!("Action: {:?}", action);

// Assert that the action is a requeue with a duration of 5 minutes (300 seconds)
let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
if let Some(captures) = re.captures(&action_str) {
let duration_secs = captures[1].parse::<u64>().unwrap();
assert_eq!(duration_secs, 300);
} else {
panic!("Unexpected action format: {}", action_str);
}

spawned.await.unwrap();
}
}

0 comments on commit 6802e9e

Please sign in to comment.