Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make it possible to retry any response #3389

Merged
merged 5 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rust-runtime/aws-smithy-runtime-api/src/client/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ impl ShouldAttempt {
_ => panic!("Expected this to be the `YesAfterDelay` variant but it was the `{self:?}` variant instead"),
}
}

/// If this isn't a `No` variant, panic.
pub fn expect_no(self) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method used outside tests?

Copy link
Contributor Author

@Velfi Velfi Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It's defined in an `impl` block with `#[cfg(feature = "test-util")]`. Of course, that's just beyond the fold, so it's not clear that that's the case. Thanks GitHub.

if ShouldAttempt::No == self {
return;
}

panic!("Expected this to be the `No` variant but it was the `{self:?}` variant instead");
}
}

impl_shared_conversions!(convert SharedRetryStrategy from RetryStrategy using SharedRetryStrategy::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ impl RetryAction {
pub fn client_error() -> Self {
Self::retryable_error(ErrorKind::ClientError)
}

/// Report whether this action asks for the request to be retried.
///
/// Only the `RetryIndicated` variant yields `true`; `NoActionIndicated` and
/// `RetryForbidden` both yield `false`.
pub fn should_retry(&self) -> bool {
    matches!(self, Self::RetryIndicated(_))
}
}

/// The reason for a retry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
* SPDX-License-Identifier: Apache-2.0
*/

use std::sync::Mutex;
use std::time::{Duration, SystemTime};

use tokio::sync::OwnedSemaphorePermit;
use tracing::debug;

use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};

use crate::client::retries::classifiers::run_classifiers_on_ctx;
use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
use crate::client::retries::strategy::standard::ReleaseResult::{
Expand All @@ -11,17 +25,6 @@ use crate::client::retries::strategy::standard::ReleaseResult::{
use crate::client::retries::token_bucket::TokenBucket;
use crate::client::retries::{ClientRateLimiterPartition, RetryPartition};
use crate::static_partition_map::StaticPartitionMap;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use tokio::sync::OwnedSemaphorePermit;
use tracing::debug;

static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
StaticPartitionMap::new();
Expand Down Expand Up @@ -56,7 +59,7 @@ impl StandardRetryStrategy {
fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
let mut old_retry_permit = self.retry_permit.lock().unwrap();
if let Some(p) = old_retry_permit.replace(new_retry_permit) {
// Whenever we set a new retry permit and it replaces the old one, we need to "forget"
// Whenever we set a new retry permit, and it replaces the old one, we need to "forget"
// the old permit, removing it from the bucket forever.
p.forget()
}
Expand Down Expand Up @@ -141,7 +144,7 @@ impl StandardRetryStrategy {
// Get the backoff time multiplier in seconds (with fractional seconds)
retry_cfg.initial_backoff().as_secs_f64(),
// `self.local.attempts` tracks number of requests made including the initial request
// The initial attempt shouldn't count towards backoff calculations so we subtract it
// The initial attempt shouldn't count towards backoff calculations, so we subtract it
request_attempts - 1,
);
Ok(Duration::from_secs_f64(backoff).min(retry_cfg.max_backoff()))
Expand Down Expand Up @@ -194,27 +197,6 @@ impl RetryStrategy for StandardRetryStrategy {
cfg: &ConfigBag,
) -> Result<ShouldAttempt, BoxError> {
let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");
// Look at the result. If it's OK then we're done; no retry required. Otherwise, we need to inspect it.
let output_or_error = ctx.output_or_error().expect(
"This must never be called without reaching the point where the result exists.",
);
let token_bucket = cfg.load::<TokenBucket>();
if output_or_error.is_ok() {
debug!("request succeeded, no retry necessary");
if let Some(tb) = token_bucket {
// If this retry strategy is holding any permits, release them back to the bucket.
if let NoPermitWasReleased = self.release_retry_permit() {
// In the event that there was no retry permit to release, we generate new
// permits from nothing. We do this to make up for permits we had to "forget".
// Otherwise, repeated retries would empty the bucket and nothing could fill it
// back up again.
tb.regenerate_a_token();
}
}
update_rate_limiter_if_exists(runtime_components, cfg, false);

return Ok(ShouldAttempt::No);
}

// Check if we're out of attempts
let request_attempts = cfg
Expand All @@ -236,19 +218,40 @@ impl RetryStrategy for StandardRetryStrategy {
let retry_classifiers = runtime_components.retry_classifiers();
let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);

// Calculate the appropriate backoff time.
let backoff =
match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
if classifier_result.should_retry() {
// Calculate the appropriate backoff time.
let backoff = match self.calculate_backoff(
runtime_components,
cfg,
retry_cfg,
&classifier_result,
) {
Ok(value) => value,
// In some cases, backoff calculation will decide that we shouldn't retry at all.
Err(value) => return Ok(value),
};
debug!(
"attempt #{request_attempts} failed with {:?}; retrying after {:?}",
classifier_result, backoff,
);
debug!(
"attempt #{request_attempts} failed with {:?}; retrying after {:?}",
classifier_result, backoff,
);

Ok(ShouldAttempt::YesAfterDelay(backoff))
} else {
debug!("attempt #{request_attempts} succeeded, no retry necessary");
if let Some(tb) = cfg.load::<TokenBucket>() {
// If this retry strategy is holding any permits, release them back to the bucket.
if let NoPermitWasReleased = self.release_retry_permit() {
// In the event that there was no retry permit to release, we generate new
// permits from nothing. We do this to make up for permits we had to "forget".
// Otherwise, repeated retries would empty the bucket and nothing could fill it
// back up again.
tb.regenerate_a_token();
}
}
update_rate_limiter_if_exists(runtime_components, cfg, false);

Ok(ShouldAttempt::YesAfterDelay(backoff))
Ok(ShouldAttempt::No)
}
}
}

Expand Down Expand Up @@ -305,34 +308,43 @@ fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {

#[cfg(test)]
mod tests {
use super::*;
use std::fmt;
use std::sync::Mutex;
use std::time::Duration;

use aws_smithy_runtime_api::client::interceptors::context::{
Input, InterceptorContext, Output,
};
use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
use aws_smithy_runtime_api::client::retries::classifiers::{
ClassifyRetry, RetryAction, SharedRetryClassifier,
};
use aws_smithy_runtime_api::client::retries::{AlwaysRetry, RetryStrategy};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
use aws_smithy_types::config_bag::Layer;
use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind};
use std::fmt;
use std::sync::Mutex;
use std::time::Duration;
use aws_smithy_runtime_api::client::retries::{
AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
};
use aws_smithy_runtime_api::client::runtime_components::{
RuntimeComponents, RuntimeComponentsBuilder,
};
use aws_smithy_types::config_bag::{ConfigBag, Layer};
use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind, RetryConfig};

use super::{calculate_exponential_backoff, StandardRetryStrategy};
#[cfg(feature = "test-util")]
use crate::client::retries::token_bucket::TokenBucket;
use aws_smithy_runtime_api::client::interceptors::context::{Input, Output};
use crate::client::retries::TokenBucket;

#[test]
fn no_retry_necessary_for_ok_result() {
let cfg = ConfigBag::of_layers(vec![{
let mut layer = Layer::new("test");
layer.store_put(RetryConfig::standard());
layer.store_put(RequestAttempts::new(1));
layer
}]);
let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
let mut ctx = InterceptorContext::new(Input::doesnt_matter());
let strategy = StandardRetryStrategy::default();
ctx.set_output_or_error(Ok(Output::doesnt_matter()));

let actual = strategy
.should_attempt_retry(&ctx, &rc, &cfg)
.expect("method is infallible for this use");
Expand Down Expand Up @@ -441,7 +453,7 @@ mod tests {
#[cfg(feature = "test-util")]
impl PresetReasonRetryClassifier {
fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
// We'll pop the retry_reasons in reverse order so we reverse the list to fix that.
// We'll pop the retry_reasons in reverse order, so we reverse the list to fix that.
retry_reasons.reverse();
Self {
retry_actions: Mutex::new(retry_reasons),
Expand Down Expand Up @@ -557,6 +569,98 @@ mod tests {
assert_eq!(token_bucket.available_permits(), 490);
}

// Verifies that a response which was received and deserialized successfully (an `Ok`
// output) can still be retried when a retry classifier asks for it, and that no retry
// is attempted once the classifier stops asking. Only compiled with the `test-util`
// feature enabled.
#[cfg(feature = "test-util")]
#[test]
fn successful_request_and_deser_should_be_retryable() {
// Status reported by the fake long-running operation used in this test.
#[derive(Clone, Copy, Debug)]
enum LongRunningOperationStatus {
Running,
Complete,
}

// Stand-in operation output; it is type-erased into an `Output` further down.
#[derive(Debug)]
struct LongRunningOperationOutput {
status: Option<LongRunningOperationStatus>,
}

impl LongRunningOperationOutput {
// Returns a copy of the stored status (the status type is `Copy`).
fn status(&self) -> Option<LongRunningOperationStatus> {
self.status
}
}

// Classifier that requests a retry while the operation still reports `Running`.
struct WaiterRetryClassifier {}

impl WaiterRetryClassifier {
fn new() -> Self {
WaiterRetryClassifier {}
}
}

impl fmt::Debug for WaiterRetryClassifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WaiterRetryClassifier")
}
}
impl ClassifyRetry for WaiterRetryClassifier {
fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
// Dig the status out of a successful output. A missing result, an error result,
// or an output of a different concrete type all leave `status` as `None`.
let status: Option<LongRunningOperationStatus> =
ctx.output_or_error().and_then(|res| {
res.ok().and_then(|output| {
output
.downcast_ref::<LongRunningOperationOutput>()
.and_then(|output| output.status())
})
});

// Still running: report it as a retryable server error.
if let Some(LongRunningOperationStatus::Running) = status {
return RetryAction::server_error();
};

// Anything else (including `Complete`): this classifier has no opinion.
RetryAction::NoActionIndicated
}

fn name(&self) -> &'static str {
"waiter retry classifier"
}
}

// Standard retry config with the static exponential base enabled and at most 5 attempts.
// NOTE(review): the static base is presumably what makes the exact 1s delay asserted
// below deterministic - confirm against the backoff calculation.
let retry_config = RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(5);

// Runtime components whose only explicitly registered classifier is the waiter one.
let rc = RuntimeComponentsBuilder::for_tests()
.with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new()))
.build()
.unwrap();
let mut layer = Layer::new("test");
layer.store_put(retry_config);
let mut cfg = ConfigBag::of_layers(vec![layer]);
let mut ctx = InterceptorContext::new(Input::doesnt_matter());
let strategy = StandardRetryStrategy::new();

// First response: a successful output whose status is still `Running`.
ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
status: Some(LongRunningOperationStatus::Running),
})));

// Store a token bucket created with `TokenBucket::new(5)` and keep a clone for the
// permit assertions.
// NOTE(review): the clone presumably shares state with the stored bucket, since the
// assertions below observe the strategy's effect through it - confirm.
cfg.interceptor_state().store_put(TokenBucket::new(5));
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

// Attempt 1: the strategy must ask for a retry after a 1 second delay, and the
// bucket must be left with no available permits.
cfg.interceptor_state().store_put(RequestAttempts::new(1));
let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
let dur = should_retry.expect_delay();
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(token_bucket.available_permits(), 0);

// Second response: the operation now reports `Complete`, so the classifier returns
// `NoActionIndicated`.
ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
status: Some(LongRunningOperationStatus::Complete),
})));
// Attempt 2: no retry is expected, and the bucket must be back at 5 available permits.
cfg.interceptor_state().store_put(RequestAttempts::new(2));
let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
should_retry.expect_no();
assert_eq!(token_bucket.available_permits(), 5);
}

#[cfg(feature = "test-util")]
#[test]
fn no_quota() {
Expand Down
Loading