From d663e8bd9620024ed6fa381960769254b70d6de6 Mon Sep 17 00:00:00 2001 From: Anu Sudarsan Date: Tue, 7 May 2024 16:19:45 -0400 Subject: [PATCH] Add configurable retry policy for S3 client --- .../filesystem/s3/S3FileSystemConfig.java | 43 +++++++++++++++++++ .../filesystem/s3/S3FileSystemFactory.java | 6 +++ .../filesystem/s3/TestS3FileSystemConfig.java | 8 ++++ 3 files changed, 57 insertions(+) diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemConfig.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemConfig.java index b725007ef3c0c..1298d5e0584ff 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemConfig.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemConfig.java @@ -60,6 +60,22 @@ public static ObjectCannedACL getCannedAcl(S3FileSystemConfig.ObjectCannedAcl ca } } + public enum RetryMode + { + STANDARD, + LEGACY, + ADAPTIVE; + + public static software.amazon.awssdk.core.retry.RetryMode getRetryMode(RetryMode retryMode) + { + return switch (retryMode) { + case STANDARD -> software.amazon.awssdk.core.retry.RetryMode.STANDARD; + case LEGACY -> software.amazon.awssdk.core.retry.RetryMode.LEGACY; + case ADAPTIVE -> software.amazon.awssdk.core.retry.RetryMode.ADAPTIVE; + }; + } + } + private String awsAccessKey; private String awsSecretKey; private String endpoint; @@ -83,6 +99,8 @@ public static ObjectCannedACL getCannedAcl(S3FileSystemConfig.ObjectCannedAcl ca private HostAndPort httpProxy; private boolean httpProxySecure; private ObjectCannedAcl objectCannedAcl = ObjectCannedAcl.NONE; + private RetryMode retryMode = RetryMode.LEGACY; + private int maxErrorRetries = 10; public String getAwsAccessKey() { @@ -224,6 +242,31 @@ public S3FileSystemConfig setCannedAcl(ObjectCannedAcl objectCannedAcl) return this; } + public RetryMode getRetryMode() + { + return retryMode; + } + + @Config("s3.retry-mode") + public S3FileSystemConfig setRetryMode(RetryMode retryMode) + { + this.retryMode = retryMode; + return this; + } + + @Min(1) // minimum set to 1 as the SDK validates this has to be > 0 + public int getMaxErrorRetries() + { + return maxErrorRetries; + } + + @Config("s3.max-error-retries") + public S3FileSystemConfig setMaxErrorRetries(int maxErrorRetries) + { + this.maxErrorRetries = maxErrorRetries; + return this; + } + @NotNull public S3SseType getSseType() { diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java index 25fc94e266a0e..31d3bd1f28cad 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java @@ -25,6 +25,7 @@ import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.regions.Region; @@ -37,6 +38,7 @@ import java.net.URI; import java.util.Optional; +import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryMode; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY; @@ -53,11 +55,15 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi { S3ClientBuilder s3 = S3Client.builder(); + RetryPolicy retryPolicy = RetryPolicy.builder(getRetryMode(config.getRetryMode())) + .numRetries(config.getMaxErrorRetries()) + .build(); s3.overrideConfiguration(ClientOverrideConfiguration.builder() .addExecutionInterceptor(AwsSdkTelemetry.builder(openTelemetry) .setCaptureExperimentalSpanAttributes(true) .setRecordIndividualHttpError(true) .build().newExecutionInterceptor()) + .retryPolicy(retryPolicy) .build()); Optional staticCredentialsProvider = getStaticCredentialsProvider(config); diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemConfig.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemConfig.java index c620308ea65f2..74b52d20f3984 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemConfig.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemConfig.java @@ -27,6 +27,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.LEGACY; +import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.STANDARD; import static java.util.concurrent.TimeUnit.MINUTES; public class TestS3FileSystemConfig @@ -47,6 +49,8 @@ public void testDefaults() .setStsRegion(null) .setCannedAcl(ObjectCannedAcl.NONE) .setSseType(S3SseType.NONE) + .setRetryMode(LEGACY) + .setMaxErrorRetries(10) .setSseKmsKeyId(null) .setStreamingPartSize(DataSize.of(16, MEGABYTE)) .setRequesterPays(false) @@ -75,6 +79,8 @@ public void testExplicitPropertyMappings() .put("s3.sts.endpoint", "sts.example.com") .put("s3.sts.region", "us-west-2") .put("s3.canned-acl", "BUCKET_OWNER_FULL_CONTROL") + .put("s3.retry-mode", "STANDARD") + .put("s3.max-error-retries", "12") .put("s3.sse.type", "KMS") .put("s3.sse.kms-key-id", "mykey") .put("s3.streaming.part-size", "42MB") @@ -102,6 +108,8 @@ public void testExplicitPropertyMappings() .setStsRegion("us-west-2") .setCannedAcl(ObjectCannedAcl.BUCKET_OWNER_FULL_CONTROL) .setStreamingPartSize(DataSize.of(42, MEGABYTE)) + .setRetryMode(STANDARD) + .setMaxErrorRetries(12) .setSseType(S3SseType.KMS) .setSseKmsKeyId("mykey") .setRequesterPays(true)