Skip to content

Commit

Permalink
make it possible to retry any response (#3389)
Browse files Browse the repository at this point in the history
## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here -->
awsdocs/aws-doc-sdk-examples#6021

## Description
<!--- Describe your changes in detail -->
This change makes it possible to retry requests that were successfully
deserialized into an output.

## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->
I wrote a test

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
Velfi committed Apr 2, 2024
1 parent 09ba40e commit f19a9da
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 53 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ references = ["smithy-rs#3493"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "Velfi"

[[smithy-rs]]
message = "All requests are now retryable, even if they are deserialized successfully. Previously, this was not allowed."
references = ["smithy-rs#3389"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
authors = ["Velfi"]

[[aws-sdk-rust]]
message = "All requests are now retryable, even if they are deserialized successfully. Previously, this was not allowed."
references = ["smithy-rs#3389"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "Velfi"

[[smithy-rs]]
message = "Fix bug in Hyper 1.0 support where https URLs returned an error"
references = ["smithy-rs#3539"]
Expand Down
9 changes: 9 additions & 0 deletions rust-runtime/aws-smithy-runtime-api/src/client/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ impl ShouldAttempt {
_ => panic!("Expected this to be the `YesAfterDelay` variant but it was the `{self:?}` variant instead"),
}
}

/// If this isn't a `No` variant, panic.
pub fn expect_no(self) {
if ShouldAttempt::No == self {
return;
}

panic!("Expected this to be the `No` variant but it was the `{self:?}` variant instead");
}
}

impl_shared_conversions!(convert SharedRetryStrategy from RetryStrategy using SharedRetryStrategy::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ impl RetryAction {
pub fn client_error() -> Self {
Self::retryable_error(ErrorKind::ClientError)
}

/// Check if a retry is indicated.
pub fn should_retry(&self) -> bool {
match self {
Self::NoActionIndicated | Self::RetryForbidden => false,
Self::RetryIndicated(_) => true,
}
}
}

/// The reason for a retry.
Expand Down
210 changes: 157 additions & 53 deletions rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
* SPDX-License-Identifier: Apache-2.0
*/

use std::sync::Mutex;
use std::time::{Duration, SystemTime};

use tokio::sync::OwnedSemaphorePermit;
use tracing::debug;

use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};

use crate::client::retries::classifiers::run_classifiers_on_ctx;
use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
use crate::client::retries::strategy::standard::ReleaseResult::{
Expand All @@ -11,17 +25,6 @@ use crate::client::retries::strategy::standard::ReleaseResult::{
use crate::client::retries::token_bucket::TokenBucket;
use crate::client::retries::{ClientRateLimiterPartition, RetryPartition};
use crate::static_partition_map::StaticPartitionMap;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use tokio::sync::OwnedSemaphorePermit;
use tracing::debug;

static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
StaticPartitionMap::new();
Expand Down Expand Up @@ -56,7 +59,7 @@ impl StandardRetryStrategy {
fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
let mut old_retry_permit = self.retry_permit.lock().unwrap();
if let Some(p) = old_retry_permit.replace(new_retry_permit) {
// Whenever we set a new retry permit and it replaces the old one, we need to "forget"
// Whenever we set a new retry permit, and it replaces the old one, we need to "forget"
// the old permit, removing it from the bucket forever.
p.forget()
}
Expand Down Expand Up @@ -141,7 +144,7 @@ impl StandardRetryStrategy {
// Get the backoff time multiplier in seconds (with fractional seconds)
retry_cfg.initial_backoff().as_secs_f64(),
// `self.local.attempts` tracks number of requests made including the initial request
// The initial attempt shouldn't count towards backoff calculations so we subtract it
// The initial attempt shouldn't count towards backoff calculations, so we subtract it
request_attempts - 1,
);
Ok(Duration::from_secs_f64(backoff).min(retry_cfg.max_backoff()))
Expand Down Expand Up @@ -194,27 +197,6 @@ impl RetryStrategy for StandardRetryStrategy {
cfg: &ConfigBag,
) -> Result<ShouldAttempt, BoxError> {
let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");
// Look a the result. If it's OK then we're done; No retry required. Otherwise, we need to inspect it
let output_or_error = ctx.output_or_error().expect(
"This must never be called without reaching the point where the result exists.",
);
let token_bucket = cfg.load::<TokenBucket>();
if output_or_error.is_ok() {
debug!("request succeeded, no retry necessary");
if let Some(tb) = token_bucket {
// If this retry strategy is holding any permits, release them back to the bucket.
if let NoPermitWasReleased = self.release_retry_permit() {
// In the event that there was no retry permit to release, we generate new
// permits from nothing. We do this to make up for permits we had to "forget".
// Otherwise, repeated retries would empty the bucket and nothing could fill it
// back up again.
tb.regenerate_a_token();
}
}
update_rate_limiter_if_exists(runtime_components, cfg, false);

return Ok(ShouldAttempt::No);
}

// Check if we're out of attempts
let request_attempts = cfg
Expand All @@ -236,19 +218,40 @@ impl RetryStrategy for StandardRetryStrategy {
let retry_classifiers = runtime_components.retry_classifiers();
let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);

// Calculate the appropriate backoff time.
let backoff =
match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
if classifier_result.should_retry() {
// Calculate the appropriate backoff time.
let backoff = match self.calculate_backoff(
runtime_components,
cfg,
retry_cfg,
&classifier_result,
) {
Ok(value) => value,
// In some cases, backoff calculation will decide that we shouldn't retry at all.
Err(value) => return Ok(value),
};
debug!(
"attempt #{request_attempts} failed with {:?}; retrying after {:?}",
classifier_result, backoff,
);
debug!(
"attempt #{request_attempts} failed with {:?}; retrying after {:?}",
classifier_result, backoff,
);

Ok(ShouldAttempt::YesAfterDelay(backoff))
} else {
debug!("attempt #{request_attempts} succeeded, no retry necessary");
if let Some(tb) = cfg.load::<TokenBucket>() {
// If this retry strategy is holding any permits, release them back to the bucket.
if let NoPermitWasReleased = self.release_retry_permit() {
// In the event that there was no retry permit to release, we generate new
// permits from nothing. We do this to make up for permits we had to "forget".
// Otherwise, repeated retries would empty the bucket and nothing could fill it
// back up again.
tb.regenerate_a_token();
}
}
update_rate_limiter_if_exists(runtime_components, cfg, false);

Ok(ShouldAttempt::YesAfterDelay(backoff))
Ok(ShouldAttempt::No)
}
}
}

Expand Down Expand Up @@ -305,34 +308,43 @@ fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {

#[cfg(test)]
mod tests {
use super::*;
use std::fmt;
use std::sync::Mutex;
use std::time::Duration;

use aws_smithy_runtime_api::client::interceptors::context::{
Input, InterceptorContext, Output,
};
use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
use aws_smithy_runtime_api::client::retries::classifiers::{
ClassifyRetry, RetryAction, SharedRetryClassifier,
};
use aws_smithy_runtime_api::client::retries::{AlwaysRetry, RetryStrategy};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
use aws_smithy_types::config_bag::Layer;
use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind};
use std::fmt;
use std::sync::Mutex;
use std::time::Duration;
use aws_smithy_runtime_api::client::retries::{
AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
};
use aws_smithy_runtime_api::client::runtime_components::{
RuntimeComponents, RuntimeComponentsBuilder,
};
use aws_smithy_types::config_bag::{ConfigBag, Layer};
use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind, RetryConfig};

use super::{calculate_exponential_backoff, StandardRetryStrategy};
#[cfg(feature = "test-util")]
use crate::client::retries::token_bucket::TokenBucket;
use aws_smithy_runtime_api::client::interceptors::context::{Input, Output};
use crate::client::retries::TokenBucket;

#[test]
fn no_retry_necessary_for_ok_result() {
let cfg = ConfigBag::of_layers(vec![{
let mut layer = Layer::new("test");
layer.store_put(RetryConfig::standard());
layer.store_put(RequestAttempts::new(1));
layer
}]);
let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
let mut ctx = InterceptorContext::new(Input::doesnt_matter());
let strategy = StandardRetryStrategy::default();
ctx.set_output_or_error(Ok(Output::doesnt_matter()));

let actual = strategy
.should_attempt_retry(&ctx, &rc, &cfg)
.expect("method is infallible for this use");
Expand Down Expand Up @@ -441,7 +453,7 @@ mod tests {
#[cfg(feature = "test-util")]
impl PresetReasonRetryClassifier {
fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
// We'll pop the retry_reasons in reverse order so we reverse the list to fix that.
// We'll pop the retry_reasons in reverse order, so we reverse the list to fix that.
retry_reasons.reverse();
Self {
retry_actions: Mutex::new(retry_reasons),
Expand Down Expand Up @@ -557,6 +569,98 @@ mod tests {
assert_eq!(token_bucket.available_permits(), 490);
}

#[cfg(feature = "test-util")]
#[test]
fn successful_request_and_deser_should_be_retryable() {
#[derive(Clone, Copy, Debug)]
enum LongRunningOperationStatus {
Running,
Complete,
}

#[derive(Debug)]
struct LongRunningOperationOutput {
status: Option<LongRunningOperationStatus>,
}

impl LongRunningOperationOutput {
fn status(&self) -> Option<LongRunningOperationStatus> {
self.status
}
}

struct WaiterRetryClassifier {}

impl WaiterRetryClassifier {
fn new() -> Self {
WaiterRetryClassifier {}
}
}

impl fmt::Debug for WaiterRetryClassifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WaiterRetryClassifier")
}
}
impl ClassifyRetry for WaiterRetryClassifier {
fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
let status: Option<LongRunningOperationStatus> =
ctx.output_or_error().and_then(|res| {
res.ok().and_then(|output| {
output
.downcast_ref::<LongRunningOperationOutput>()
.and_then(|output| output.status())
})
});

if let Some(LongRunningOperationStatus::Running) = status {
return RetryAction::server_error();
};

RetryAction::NoActionIndicated
}

fn name(&self) -> &'static str {
"waiter retry classifier"
}
}

let retry_config = RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(5);

let rc = RuntimeComponentsBuilder::for_tests()
.with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new()))
.build()
.unwrap();
let mut layer = Layer::new("test");
layer.store_put(retry_config);
let mut cfg = ConfigBag::of_layers(vec![layer]);
let mut ctx = InterceptorContext::new(Input::doesnt_matter());
let strategy = StandardRetryStrategy::new();

ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
status: Some(LongRunningOperationStatus::Running),
})));

cfg.interceptor_state().store_put(TokenBucket::new(5));
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

cfg.interceptor_state().store_put(RequestAttempts::new(1));
let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
let dur = should_retry.expect_delay();
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(token_bucket.available_permits(), 0);

ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
status: Some(LongRunningOperationStatus::Complete),
})));
cfg.interceptor_state().store_put(RequestAttempts::new(2));
let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
should_retry.expect_no();
assert_eq!(token_bucket.available_permits(), 5);
}

#[cfg(feature = "test-util")]
#[test]
fn no_quota() {
Expand Down

0 comments on commit f19a9da

Please sign in to comment.