Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kubernetes 429 error requeue with jitter #696

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion tembo-operator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.43.2"
version = "0.44.0"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
Expand Down Expand Up @@ -61,6 +61,7 @@ hyper = "0.14.27"
rand = "0.8.5"
tower-test = "0.4.0"
futures-util = "0.3"
regex = "1"

[dependencies.kube]
features = ["runtime", "client", "derive", "ws"]
Expand Down
161 changes: 159 additions & 2 deletions tembo-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,30 @@ async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
.map_err(|e| Error::FinalizerError(Box::new(e)))
}

fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
pub(crate) fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
warn!("reconcile failed: {:?}", error);
ctx.metrics.reconcile_failure(&cdb, error);
Action::requeue(Duration::from_secs(5 * 60))

// Check for 429 error code from Kubernetes API
match error {
Error::KubeError(kube_error) => match kube_error {
kube::Error::Api(api_error) if api_error.code == 429 => {
// Error is a 429 (too many requests), calculate backoff and jitter
let backoff: u64 = 60;
let max_jitter: u64 = 120;
let jitter: u64 = rand::thread_rng().gen_range(0..=max_jitter);
let backoff_with_jitter = Duration::from_secs(backoff + jitter);
// Log the 429 error and the calculated backoff time
warn!(
"Received HTTP 429 Too Many Requests. Requeuing after {} seconds.",
backoff_with_jitter.as_secs()
);
Action::requeue(backoff_with_jitter)
}
_ => Action::requeue(Duration::from_secs(5 * 60)),
},
_ => Action::requeue(Duration::from_secs(5 * 60)),
}
}

// create_volume_snapshot_patch creates a patch for the CoreDB spec to enable or disable volumesnapshots
Expand Down Expand Up @@ -1192,4 +1212,141 @@ mod test {
));
assert!(is_volume_snapshot_update_needed(None, true));
}

// Tests for the error_policy function. The Kubernetes client is mocked so that an API call fails with a chosen status code (429 or 404).
use crate::{error_policy, Error};
use futures::pin_mut;
use http::{Request, Response, StatusCode};
use hyper::Body;
use k8s_openapi::api::core::v1::Pod;
use kube::{api::Api, Client};
use serde_json::json;
use tower_test::mock;

#[tokio::test]
async fn test_error_policy_429() {
// setup a test CoreDB object
let coredb = CoreDB::test();

// mock the Kubernetes client and setup Context
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let client = Client::new(mock_service, "default".to_string());
let ctx = Arc::new(Context {
client: client.clone(),
metrics: Default::default(),
diagnostics: Default::default(),
});

// setup the mock response 429 too many requests
let spawned = tokio::spawn(async move {
pin_mut!(handle);
if let Some((_request, send)) = handle.next_request().await {
// We don't check the specifics of the request here, focusing on the response
send.send_response(
Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.body(Body::from(
json!({
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": "Too Many Requests",
"reason": "TooManyRequests",
"code": 429
})
.to_string(),
))
.unwrap(),
);
}
});

// Setup call to kubernetes api Pod
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
let err = pod_api.get("test-pod").await.err().unwrap();

// Convert the KubeError into your custom error type as it would in your controller logic
let custom_error = Error::from(err);

// Now we simulate calling the error_policy function with this error
let action = error_policy(Arc::new(coredb), &custom_error, ctx);
let action_str = format!("{:?}", action);

println!("Action: {:?}", action);

// Use regular expressions to extract the duration from the action string
let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
if let Some(captures) = re.captures(&action_str) {
let duration_secs = captures[1].parse::<u64>().unwrap();
assert!((60..=180).contains(&duration_secs));
} else {
panic!("Unexpected action format: {}", action_str);
}

spawned.await.unwrap();
}

#[tokio::test]
async fn test_error_policy_non_429() {
// setup a test CoreDB object
let coredb = CoreDB::test();

// mock the Kubernetes client and setup Context
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let client = Client::new(mock_service, "default".to_string());
let ctx = Arc::new(Context {
client: client.clone(),
metrics: Default::default(),
diagnostics: Default::default(),
});

// setup the mock response 404 Not Found
let spawned = tokio::spawn(async move {
pin_mut!(handle);
if let Some((_request, send)) = handle.next_request().await {
send.send_response(
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(
json!({
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": "Not Found",
"reason": "NotFound",
"code": 404
})
.to_string(),
))
.unwrap(),
);
}
});

// Setup call to kubernetes api Pod
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
let err = pod_api.get("test-pod").await.err().unwrap();

// Convert the KubeError into your custom error type as it would in your controller logic
let custom_error = Error::from(err);

// Now we simulate calling the error_policy function with this error
let action = error_policy(Arc::new(coredb), &custom_error, ctx);
let action_str = format!("{:?}", action);

println!("Action: {:?}", action);

// Assert that the action is a requeue with a duration of 5 minutes (300 seconds)
let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
if let Some(captures) = re.captures(&action_str) {
let duration_secs = captures[1].parse::<u64>().unwrap();
assert_eq!(duration_secs, 300);
} else {
panic!("Unexpected action format: {}", action_str);
}

spawned.await.unwrap();
}
}
Loading