Skip to content

Commit

Permalink
Add configurable retry policy for S3 client
Browse files Browse the repository at this point in the history
  • Loading branch information
anusudarsan committed May 13, 2024
1 parent 8e5ee08 commit d663e8b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ public static ObjectCannedACL getCannedAcl(S3FileSystemConfig.ObjectCannedAcl ca
}
}

public enum RetryMode
{
STANDARD,
LEGACY,
ADAPTIVE;

public static software.amazon.awssdk.core.retry.RetryMode getRetryMode(RetryMode retryMode)
{
return switch (retryMode) {
case STANDARD -> software.amazon.awssdk.core.retry.RetryMode.STANDARD;
case LEGACY -> software.amazon.awssdk.core.retry.RetryMode.LEGACY;
case ADAPTIVE -> software.amazon.awssdk.core.retry.RetryMode.ADAPTIVE;
};
}
}

private String awsAccessKey;
private String awsSecretKey;
private String endpoint;
Expand All @@ -83,6 +99,8 @@ public static ObjectCannedACL getCannedAcl(S3FileSystemConfig.ObjectCannedAcl ca
private HostAndPort httpProxy;
private boolean httpProxySecure;
private ObjectCannedAcl objectCannedAcl = ObjectCannedAcl.NONE;
private RetryMode retryMode = RetryMode.LEGACY;
private int maxErrorRetries = 10;

public String getAwsAccessKey()
{
Expand Down Expand Up @@ -224,6 +242,31 @@ public S3FileSystemConfig setCannedAcl(ObjectCannedAcl objectCannedAcl)
return this;
}

public RetryMode getRetryMode()
{
return retryMode;
}

@Config("s3.retry-mode")
public S3FileSystemConfig setRetryMode(RetryMode retryMode)
{
this.retryMode = retryMode;
return this;
}

@Min(1) // minimum set to 1 as the SDK validates this has to be > 0
public int getMaxErrorRetries()
{
return maxErrorRetries;
}

@Config("s3.max-error-retries")
public S3FileSystemConfig setMaxErrorRetries(int maxErrorRetries)
{
this.maxErrorRetries = maxErrorRetries;
return this;
}

@NotNull
public S3SseType getSseType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.regions.Region;
Expand All @@ -37,6 +38,7 @@
import java.net.URI;
import java.util.Optional;

import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryMode;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
Expand All @@ -53,11 +55,15 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi
{
S3ClientBuilder s3 = S3Client.builder();

RetryPolicy retryPolicy = RetryPolicy.builder(getRetryMode(config.getRetryMode()))
.numRetries(config.getMaxErrorRetries())
.build();
s3.overrideConfiguration(ClientOverrideConfiguration.builder()
.addExecutionInterceptor(AwsSdkTelemetry.builder(openTelemetry)
.setCaptureExperimentalSpanAttributes(true)
.setRecordIndividualHttpError(true)
.build().newExecutionInterceptor())
.retryPolicy(retryPolicy)
.build());

Optional<StaticCredentialsProvider> staticCredentialsProvider = getStaticCredentialsProvider(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.LEGACY;
import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.STANDARD;
import static java.util.concurrent.TimeUnit.MINUTES;

public class TestS3FileSystemConfig
Expand All @@ -47,6 +49,8 @@ public void testDefaults()
.setStsRegion(null)
.setCannedAcl(ObjectCannedAcl.NONE)
.setSseType(S3SseType.NONE)
.setRetryMode(LEGACY)
.setMaxErrorRetries(10)
.setSseKmsKeyId(null)
.setStreamingPartSize(DataSize.of(16, MEGABYTE))
.setRequesterPays(false)
Expand Down Expand Up @@ -75,6 +79,8 @@ public void testExplicitPropertyMappings()
.put("s3.sts.endpoint", "sts.example.com")
.put("s3.sts.region", "us-west-2")
.put("s3.canned-acl", "BUCKET_OWNER_FULL_CONTROL")
.put("s3.retry-mode", "STANDARD")
.put("s3.max-error-retries", "12")
.put("s3.sse.type", "KMS")
.put("s3.sse.kms-key-id", "mykey")
.put("s3.streaming.part-size", "42MB")
Expand Down Expand Up @@ -102,6 +108,8 @@ public void testExplicitPropertyMappings()
.setStsRegion("us-west-2")
.setCannedAcl(ObjectCannedAcl.BUCKET_OWNER_FULL_CONTROL)
.setStreamingPartSize(DataSize.of(42, MEGABYTE))
.setRetryMode(STANDARD)
.setMaxErrorRetries(12)
.setSseType(S3SseType.KMS)
.setSseKmsKeyId("mykey")
.setRequesterPays(true)
Expand Down

0 comments on commit d663e8b

Please sign in to comment.