Skip to content

Commit

Permalink
Retry on interrupted S3 connections
Browse files Browse the repository at this point in the history
Read the response inside a transformer lambda and throw a retryable
exception on IO errors to use the already configured SDK retry
mechanism.
  • Loading branch information
nineinchnick committed Aug 1, 2024
1 parent 547326c commit a54960e
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 23 deletions.
13 changes: 13 additions & 0 deletions lib/trino-filesystem-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>eu.rekawek.toxiproxy</groupId>
<artifactId>toxiproxy-java</artifactId>
<version>2.1.7</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>junit-extensions</artifactId>
Expand Down Expand Up @@ -236,6 +243,12 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoInput;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.core.exception.RetryableException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -24,7 +25,6 @@
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

import static java.util.Objects.checkFromIndexSize;
Expand Down Expand Up @@ -61,11 +61,9 @@ public void readFully(long position, byte[] buffer, int offset, int length)
String range = "bytes=%s-%s".formatted(position, (position + length) - 1);
GetObjectRequest rangeRequest = request.toBuilder().range(range).build();

try (InputStream in = getObject(rangeRequest)) {
int n = readNBytes(in, buffer, offset, length);
if (n < length) {
throw new EOFException("Read %s of %s requested bytes: %s".formatted(n, length, location));
}
int n = read(buffer, offset, length, rangeRequest);
if (n < length) {
throw new EOFException("Read %s of %s requested bytes: %s".formatted(n, length, location));
}
}

Expand All @@ -82,9 +80,7 @@ public int readTail(byte[] buffer, int offset, int length)
String range = "bytes=-%s".formatted(length);
GetObjectRequest rangeRequest = request.toBuilder().range(range).build();

try (InputStream in = getObject(rangeRequest)) {
return readNBytes(in, buffer, offset, length);
}
return read(buffer, offset, length, rangeRequest);
}

@Override
Expand All @@ -101,28 +97,27 @@ private void ensureOpen()
}
}

private InputStream getObject(GetObjectRequest request)
private int read(byte[] buffer, int offset, int length, GetObjectRequest rangeRequest)
throws IOException
{
try {
return client.getObject(request);
return client.getObject(rangeRequest, (_, inputStream) -> {
try {
return inputStream.readNBytes(buffer, offset, length);
}
catch (AbortedException _) {
throw new InterruptedIOException();
}
catch (IOException e) {
throw RetryableException.create("Error reading getObject response", e);
}
});
}
catch (NoSuchKeyException e) {
catch (NoSuchKeyException _) {
throw new FileNotFoundException(location.toString());
}
catch (SdkException e) {
throw new IOException("Failed to open S3 file: " + location, e);
}
}

private static int readNBytes(InputStream in, byte[] buffer, int offset, int length)
throws IOException
{
try {
return in.readNBytes(buffer, offset, length);
}
catch (AbortedException e) {
throw new InterruptedIOException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.s3;

import com.google.common.io.Closer;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import io.trino.filesystem.Location;
import io.trino.testing.containers.Minio;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;

import static io.trino.testing.containers.Minio.MINIO_API_PORT;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
 * Exercises {@link S3Input} against a Minio instance that is reached through a Toxiproxy
 * proxy configured to limit the amount of data sent downstream, using an S3 client that is
 * explicitly configured with a maximum of 3 attempts.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestS3Retries
{
    private static final int TOXIPROXY_CONTROL_PORT = 8474;
    private static final int MINIO_PROXY_PORT = 1234;
    private static final int TEST_DATA_SIZE = 1024;

    // Collects everything started in init() so that cleanup() can release it in one call
    private final Closer closer = Closer.create();

    private S3Client s3client;

    /**
     * Starts the containers on a shared network, installs the data-limiting proxy
     * and builds the S3 client that talks to Minio through that proxy.
     */
    @BeforeAll
    final void init()
            throws IOException
    {
        Network sharedNetwork = Network.newNetwork();
        closer.register(sharedNetwork::close);

        startMinio(sharedNetwork);
        ToxiproxyContainer proxyContainer = startToxiproxy(sharedNetwork);
        installLimitedProxy(proxyContainer);

        s3client = createClient(proxyContainer);
        closer.register(s3client::close);
    }

    /**
     * Closes every resource registered during {@link #init()}.
     */
    @AfterAll
    final void cleanup()
            throws IOException
    {
        closer.close();
    }

    /**
     * Expects both {@code readFully} and {@code readTail} to fail, with the cause of the
     * thrown exception carrying a suppressed {@link SdkClientException} for attempt 2.
     */
    @Test
    public void testRetries()
    {
        S3Location s3Location = new S3Location(Location.of("s3://bucket/object"));
        GetObjectRequest getRequest = GetObjectRequest.builder()
                .bucket(s3Location.bucket())
                .key(s3Location.key())
                .build();
        S3Input input = new S3Input(s3Location.location(), s3client, getRequest);

        String expectedMessage = "Request attempt 2 failure: Error reading getObject response";
        byte[] target = new byte[TEST_DATA_SIZE];

        assertThatThrownBy(() -> input.readFully(0, target, 0, TEST_DATA_SIZE))
                .cause()
                .hasSuppressedException(SdkClientException.create(expectedMessage));
        assertThatThrownBy(() -> input.readTail(target, 0, TEST_DATA_SIZE))
                .cause()
                .hasSuppressedException(SdkClientException.create(expectedMessage));
    }

    /**
     * Starts Minio on the given network and uploads the test object into a fresh bucket.
     */
    private void startMinio(Network network)
            throws IOException
    {
        Minio storage = Minio.builder()
                .withNetwork(network)
                .build();
        storage.start();
        storage.createBucket("bucket");
        storage.writeFile(getTestData(), "bucket", "object");
        closer.register(storage::close);
    }

    /**
     * Starts the Toxiproxy container on the given network, exposing its control port
     * and the port on which the Minio proxy listens.
     */
    private ToxiproxyContainer startToxiproxy(Network network)
            throws IOException
    {
        ToxiproxyContainer container = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")
                .withExposedPorts(TOXIPROXY_CONTROL_PORT, MINIO_PROXY_PORT)
                .withNetwork(network)
                .withNetworkAliases("minio");
        container.start();
        closer.register(container::close);
        return container;
    }

    /**
     * Creates the proxy in front of the Minio API port and limits the data it sends downstream.
     */
    private void installLimitedProxy(ToxiproxyContainer container)
            throws IOException
    {
        ToxiproxyClient control = new ToxiproxyClient(container.getHost(), container.getControlPort());
        Proxy minioProxy = control.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT);
        // the number of transferred bytes includes both the response headers (around 570 bytes) and body
        minioProxy.toxics()
                .limitData("broken connection", ToxicDirection.DOWNSTREAM, 700);
    }

    /**
     * Builds an S3 client whose endpoint is the mapped proxy port of the Toxiproxy container.
     */
    private S3Client createClient(ToxiproxyContainer container)
            throws IOException
    {
        URI endpoint = URI.create("http://" + container.getHost() + ":" + container.getMappedPort(MINIO_PROXY_PORT));
        AwsBasicCredentials credentials = AwsBasicCredentials.create(Minio.MINIO_ACCESS_KEY, Minio.MINIO_SECRET_KEY);
        return S3Client.builder()
                .endpointOverride(endpoint)
                .region(Region.of(Minio.MINIO_REGION))
                .forcePathStyle(true)
                .credentialsProvider(StaticCredentialsProvider.create(credentials))
                // explicitly configure the number of retries
                .overrideConfiguration(config -> config.retryStrategy(retry -> retry.maxAttempts(3)))
                .build();
    }

    /**
     * Returns {@code TEST_DATA_SIZE} bytes, each set to 1.
     */
    private static byte[] getTestData()
    {
        byte[] payload = new byte[TEST_DATA_SIZE];
        Arrays.fill(payload, (byte) 1);
        return payload;
    }
}

0 comments on commit a54960e

Please sign in to comment.