From d3b2c69e49666e1f32fd3807369923af5c2823b6 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Tue, 30 May 2023 15:43:01 +0300 Subject: [PATCH] feat(retry-policy): add request retry policy Add `RequestRetryPolicy` for delay computation between failed requests retry. fix(reqwest-response): fix missing headers Fix missing `headers` in `RequestResponse` struct when used with `reqwest` crate. --- src/core/mod.rs | 4 + src/core/retry_policy.rs | 291 +++++++++++++++++++++++++++++++++++++++ src/transport/reqwest.rs | 24 +++- 3 files changed, 315 insertions(+), 4 deletions(-) create mode 100644 src/core/retry_policy.rs diff --git a/src/core/mod.rs b/src/core/mod.rs index abc15f07..1b2271c0 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -37,6 +37,10 @@ pub mod transport_response; #[doc(inline)] pub use serialize::Serialize; +#[doc(inline)] +pub use retry_policy::RequestRetryPolicy; +pub mod retry_policy; + #[cfg(any(feature = "publish", feature = "access"))] pub mod headers; diff --git a/src/core/retry_policy.rs b/src/core/retry_policy.rs new file mode 100644 index 00000000..6360120e --- /dev/null +++ b/src/core/retry_policy.rs @@ -0,0 +1,291 @@ +//! # Request retry policy +//! +//! This module contains the [`RequestRetryPolicy`] struct. +//! It is used to calculate delays between failed requests to the [`PubNub API`] +//! for next retry attempt. +//! It is intended to be used by the [`pubnub`] crate. +//! +//! [`PubNub API`]: https://www.pubnub.com/docs +//! [`pubnub`]: ../index.html +//! +use crate::core::TransportResponse; + +/// Request retry policy. +/// +/// +pub enum RequestRetryPolicy { + /// Requests shouldn't be tried again. + None, + + /// Retry the request after the same amount of time. + Linear { + /// The delay between failed retry attempts. + delay: u32, + + /// Number of times a request can be retried. + max_retry: u32, + }, + + /// Retry the request using exponential amount of time. + Exponential { + /// Minimum delay between failed retry attempts. + min_delay: u32, + + /// Maximum delay between failed retry attempts. + max_delay: u32, + + /// Number of times a request can be retried. + max_retry: u32, + }, +} + +impl RequestRetryPolicy { + #[allow(dead_code)] + pub(crate) fn retry_delay(&self, attempt: &u32, response: &TransportResponse) -> Option { + match response.status { + // Respect service requested delay. + 429 => (!matches!(self, Self::None)) + .then(|| response.headers.get("retry-after")) + .flatten() + .and_then(|value| value.parse::().ok()), + 500..=599 => match self { + RequestRetryPolicy::None => None, + RequestRetryPolicy::Linear { delay, max_retry } => { + (*attempt).le(max_retry).then_some(*delay) + } + RequestRetryPolicy::Exponential { + min_delay, + max_delay, + max_retry, + } => (*attempt) + .le(max_retry) + .then_some((*min_delay).pow(*attempt).min(*max_delay)), + }, + _ => None, + } + } +} + +impl Default for RequestRetryPolicy { + fn default() -> Self { + Self::Exponential { + min_delay: 2, + max_delay: 300, + max_retry: 2, + } + } +} + +#[cfg(test)] +mod should { + use super::*; + use crate::lib::collections::HashMap; + + fn client_error_response() -> TransportResponse { + TransportResponse { + status: 400, + ..Default::default() + } + } + + fn too_many_requests_error_response() -> TransportResponse { + TransportResponse { + status: 429, + headers: HashMap::from([("retry-after".into(), "150".into())]), + ..Default::default() + } + } + + fn server_error_response() -> TransportResponse { + TransportResponse { + status: 500, + ..Default::default() + } + } + + #[test] + fn create_exponential_by_default() { + let policy: RequestRetryPolicy = Default::default(); + assert!(matches!(policy, RequestRetryPolicy::Exponential { .. })); + } + + mod none_policy { + use super::*; + + #[test] + fn return_none_delay_for_client_error_response() { + assert_eq!( + RequestRetryPolicy::None.retry_delay(&1, &client_error_response()), + None + ); + } + + #[test] + fn return_none_delay_for_server_error_response() { + assert_eq!( + RequestRetryPolicy::None.retry_delay(&1, &server_error_response()), + None + ); + } + + #[test] + fn return_none_delay_for_too_many_requests_error_response() { + assert_eq!( + RequestRetryPolicy::None.retry_delay(&1, &too_many_requests_error_response()), + None + ); + } + } + + mod linear_policy { + use super::*; + + #[test] + fn return_none_delay_for_client_error_response() { + let policy = RequestRetryPolicy::Linear { + delay: 10, + max_retry: 5, + }; + + assert_eq!(policy.retry_delay(&1, &client_error_response()), None); + } + + #[test] + fn return_same_delay_for_server_error_response() { + let expected_delay = 10; + let policy = RequestRetryPolicy::Linear { + delay: expected_delay, + max_retry: 5, + }; + + assert_eq!( + policy.retry_delay(&1, &server_error_response()), + Some(expected_delay) + ); + + assert_eq!( + policy.retry_delay(&2, &server_error_response()), + Some(expected_delay) + ); + } + + #[test] + fn return_none_delay_when_reach_max_retry_for_server_error_response() { + let expected_delay = 10; + let policy = RequestRetryPolicy::Linear { + delay: expected_delay, + max_retry: 2, + }; + + assert_eq!( + policy.retry_delay(&2, &server_error_response()), + Some(expected_delay) + ); + + assert_eq!(policy.retry_delay(&3, &server_error_response()), None); + } + + #[test] + fn return_service_delay_for_too_many_requests_error_response() { + let policy = RequestRetryPolicy::Linear { + delay: 10, + max_retry: 2, + }; + + // 150 is from 'server_error_response' `Retry-After` header. + assert_eq!( + policy.retry_delay(&2, &too_many_requests_error_response()), + Some(150) + ); + } + } + + mod exponential_policy { + use super::*; + + #[test] + fn return_none_delay_for_client_error_response() { + let expected_delay = 8; + let policy = RequestRetryPolicy::Exponential { + min_delay: expected_delay, + max_delay: 100, + max_retry: 2, + }; + + assert_eq!(policy.retry_delay(&1, &client_error_response()), None); + } + + #[test] + fn return_exponential_delay_for_server_error_response() { + let expected_delay = 8; + let policy = RequestRetryPolicy::Exponential { + min_delay: expected_delay, + max_delay: 100, + max_retry: 2, + }; + + assert_eq!( + policy.retry_delay(&1, &server_error_response()), + Some(expected_delay) + ); + + assert_eq!( + policy.retry_delay(&2, &server_error_response()), + Some(expected_delay.pow(2)) + ); + } + + #[test] + fn return_none_delay_when_reach_max_retry_for_server_error_response() { + let expected_delay = 8; + let policy = RequestRetryPolicy::Exponential { + min_delay: expected_delay, + max_delay: 100, + max_retry: 2, + }; + + assert_eq!( + policy.retry_delay(&2, &server_error_response()), + Some(expected_delay.pow(2)) + ); + + assert_eq!(policy.retry_delay(&3, &server_error_response()), None); + } + + #[test] + fn return_max_delay_when_reach_max_value_for_server_error_response() { + let expected_delay = 8; + let max_delay = 50; + let policy = RequestRetryPolicy::Exponential { + min_delay: expected_delay, + max_delay, + max_retry: 5, + }; + + assert_eq!( + policy.retry_delay(&1, &server_error_response()), + Some(expected_delay) + ); + + assert_eq!( + policy.retry_delay(&2, &server_error_response()), + Some(max_delay) + ); + } + + #[test] + fn return_service_delay_for_too_many_requests_error_response() { + let policy = RequestRetryPolicy::Exponential { + min_delay: 10, + max_delay: 100, + max_retry: 2, + }; + + // 150 is from 'server_error_response' `Retry-After` header. + assert_eq!( + policy.retry_delay(&2, &too_many_requests_error_response()), + Some(150) + ); + } + } +} diff --git a/src/transport/reqwest.rs b/src/transport/reqwest.rs index cdbd70e2..d92c5d10 100644 --- a/src/transport/reqwest.rs +++ b/src/transport/reqwest.rs @@ -89,6 +89,7 @@ impl Transport for TransportReqwest { details: e.to_string(), })?; + let headers = result.headers().clone(); let status = result.status(); result .bytes() @@ -96,7 +97,7 @@ impl Transport for TransportReqwest { .map_err(|e| PubNubError::Transport { details: e.to_string(), }) - .and_then(|bytes| create_result(status, bytes)) + .and_then(|bytes| create_result(status, bytes, &headers)) } } @@ -204,11 +205,25 @@ fn prepare_url(hostname: &str, path: &str, query_params: &HashMap Result { +fn create_result( + status: StatusCode, + body: Bytes, + headers: &HeaderMap, +) -> Result { + let headers: HashMap = + headers + .iter() + .fold(HashMap::new(), |mut acc, (name, value)| { + if let Ok(value) = value.to_str() { + acc.insert(name.to_string(), value.to_string()); + } + acc + }); + Ok(TransportResponse { status: status.as_u16(), body: (!body.is_empty()).then(|| body.to_vec()), - ..Default::default() + headers, }) } @@ -324,13 +339,14 @@ pub mod blocking { details: e.to_string(), })?; + let headers = result.headers().clone(); let status = result.status(); result .bytes() .map_err(|e| PubNubError::Transport { details: e.to_string(), }) - .and_then(|bytes| create_result(status, bytes)) + .and_then(|bytes| create_result(status, bytes, &headers)) } }