Skip to content

Commit

Permalink
Improve performance for positioned reads on S3
Browse files Browse the repository at this point in the history
This eliminates reading extra data from S3 by always requesting the
exact range needed (rather than performing streaming reads). Also,
because all data is always read from connections, this allows
connection reuse via the HTTP client connection pool.
  • Loading branch information
electrum committed Feb 6, 2019
1 parent 5c57d92 commit 30f7812
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 34 deletions.
Expand Up @@ -73,6 +73,7 @@
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
Expand All @@ -88,12 +89,13 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.amazonaws.regions.Regions.US_EAST_1;
import static com.amazonaws.services.s3.Headers.SERVER_SIDE_ENCRYPTION;
import static com.amazonaws.services.s3.Headers.UNENCRYPTED_CONTENT_LENGTH;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.throwIfInstanceOf;
Expand Down Expand Up @@ -139,6 +141,9 @@
import static java.nio.file.Files.createTempFile;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.fs.FSExceptionMessages.CANNOT_SEEK_PAST_EOF;
import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_SEEK;
import static org.apache.hadoop.fs.FSExceptionMessages.STREAM_IS_CLOSED;

public class PrestoS3FileSystem
extends FileSystem
Expand Down Expand Up @@ -789,7 +794,8 @@ private static class PrestoS3InputStream
private final Duration maxBackoffTime;
private final Duration maxRetryTime;

private boolean closed;
private final AtomicBoolean closed = new AtomicBoolean();

private InputStream in;
private long streamPosition;
private long nextReadPosition;
Expand All @@ -809,15 +815,87 @@ public PrestoS3InputStream(AmazonS3 s3, String host, Path path, int maxAttempts,
@Override
public void close()
{
closed = true;
closed.set(true);
closeStream();
}

@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException
{
checkClosed();
if (position < 0) {
throw new EOFException(NEGATIVE_SEEK);
}
checkPositionIndexes(offset, offset + length, buffer.length);
if (length == 0) {
return 0;
}

try {
return retry()
.maxAttempts(maxAttempts)
.exponentialBackoff(BACKOFF_MIN_SLEEP, maxBackoffTime, maxRetryTime, 2.0)
.stopOn(InterruptedException.class, UnrecoverableS3OperationException.class, EOFException.class)
.onRetry(STATS::newGetObjectRetry)
.run("getS3Object", () -> {
InputStream stream;
try {
GetObjectRequest request = new GetObjectRequest(host, keyFromPath(path))
.withRange(position, (position + length) - 1);
stream = s3.getObject(request).getObjectContent();
}
catch (RuntimeException e) {
STATS.newGetObjectError();
if (e instanceof AmazonS3Exception) {
switch (((AmazonS3Exception) e).getStatusCode()) {
case HTTP_RANGE_NOT_SATISFIABLE:
throw new EOFException(CANNOT_SEEK_PAST_EOF);
case HTTP_FORBIDDEN:
case HTTP_NOT_FOUND:
case HTTP_BAD_REQUEST:
throw new UnrecoverableS3OperationException(path, e);
}
}
throw e;
}

STATS.connectionOpened();
try {
int read = 0;
while (read < length) {
int n = stream.read(buffer, offset + read, length - read);
if (n <= 0) {
break;
}
read += n;
}
return read;
}
catch (Throwable t) {
STATS.newReadError(t);
abortStream(stream);
throw t;
}
finally {
STATS.connectionReleased();
stream.close();
}
});
}
catch (Exception e) {
throw propagate(e);
}
}

@Override
public void seek(long pos)
throws IOException
{
checkState(!closed, "already closed");
checkArgument(pos >= 0, "position is negative: %s", pos);
checkClosed();
if (pos < 0) {
throw new EOFException(NEGATIVE_SEEK);
}

// this allows a seek beyond the end of the stream but the next read will fail
nextReadPosition = pos;
Expand All @@ -840,6 +918,7 @@ public int read()
public int read(byte[] buffer, int offset, int length)
throws IOException
{
checkClosed();
try {
int bytesRead = retry()
.maxAttempts(maxAttempts)
Expand All @@ -864,14 +943,8 @@ public int read(byte[] buffer, int offset, int length)
}
return bytesRead;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (Exception e) {
throwIfInstanceOf(e, IOException.class);
throwIfUnchecked(e);
throw new RuntimeException(e);
throw propagate(e);
}
}

Expand Down Expand Up @@ -953,35 +1026,54 @@ private InputStream openStream(Path path, long start)
}
});
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (Exception e) {
throwIfInstanceOf(e, IOException.class);
throwIfUnchecked(e);
throw new RuntimeException(e);
throw propagate(e);
}
}

private void closeStream()
{
if (in != null) {
try {
if (in instanceof S3ObjectInputStream) {
((S3ObjectInputStream) in).abort();
}
else {
in.close();
}
}
catch (IOException | AbortedException ignored) {
// thrown if the current thread is in the interrupted state
}
abortStream(in);
in = null;
STATS.connectionReleased();
}
}

private void checkClosed()
throws IOException
{
if (closed.get()) {
throw new IOException(STREAM_IS_CLOSED);
}
}

private static void abortStream(InputStream in)
{
try {
if (in instanceof S3ObjectInputStream) {
((S3ObjectInputStream) in).abort();
}
else {
in.close();
}
}
catch (IOException | AbortedException ignored) {
// thrown if the current thread is in the interrupted state
}
}

private static RuntimeException propagate(Exception e)
throws IOException
{
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
throwIfInstanceOf(e, IOException.class);
throwIfUnchecked(e);
throw new IOException(e);
}
}

private static class PrestoS3OutputStream
Expand Down
Expand Up @@ -251,15 +251,15 @@ public void newListObjectsCall()
listObjectsCalls.update(1);
}

public void newReadError(Exception e)
public void newReadError(Throwable t)
{
if (e instanceof SocketException) {
if (t instanceof SocketException) {
socketExceptions.update(1);
}
else if (e instanceof SocketTimeoutException) {
else if (t instanceof SocketTimeoutException) {
socketTimeoutExceptions.update(1);
}
else if (e instanceof AbortedException) {
else if (t instanceof AbortedException) {
awsAbortedExceptions.update(1);
}
else {
Expand Down

0 comments on commit 30f7812

Please sign in to comment.