Skip to content

Commit

Permalink
Limit number of retries when file channels closed by interruption
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Koval committed Apr 4, 2018
1 parent bb01881 commit a5c052d
Showing 1 changed file with 70 additions and 37 deletions.
Expand Up @@ -55,6 +55,8 @@
*/ */
public class SingleFilePageSwapper implements PageSwapper public class SingleFilePageSwapper implements PageSwapper
{ {
private static final int MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS = 42;

private static int defaultChannelStripePower() private static int defaultChannelStripePower()
{ {
int vcores = Runtime.getRuntime().availableProcessors(); int vcores = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -284,6 +286,11 @@ private void clear( long bufferAddress, int bufferSize )


@Override @Override
public long read( long filePageId, long bufferAddress, int bufferSize ) throws IOException public long read( long filePageId, long bufferAddress, int bufferSize ) throws IOException
{
return readAndRetryIfInterrupted( filePageId, bufferAddress, bufferSize, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS );
}

private long readAndRetryIfInterrupted( long filePageId, long bufferAddress, int bufferSize, int attemptsLeft ) throws IOException
{ {
long fileOffset = pageIdToPosition( filePageId ); long fileOffset = pageIdToPosition( filePageId );
try try
Expand All @@ -299,13 +306,15 @@ public long read( long filePageId, long bufferAddress, int bufferSize ) throws I
} }
catch ( ClosedChannelException e ) catch ( ClosedChannelException e )
{ {
// AsynchronousCloseException is a subclass of
// ClosedChannelException, and ClosedByInterruptException is in
// turn a subclass of AsynchronousCloseException.
tryReopen( filePageId, e ); tryReopen( filePageId, e );

if ( attemptsLeft < 1 )
{
throw new IOException( "IO failed due to interruption", e );
}

boolean interrupted = Thread.interrupted(); boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence. long bytesRead = readAndRetryIfInterrupted( filePageId, bufferAddress, bufferSize, attemptsLeft - 1 );
long bytesRead = read( filePageId, bufferAddress, bufferSize );
if ( interrupted ) if ( interrupted )
{ {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
Expand Down Expand Up @@ -343,8 +352,7 @@ private long readPositionedVectoredToFileChannel(
long fileOffset = pageIdToPosition( startFilePageId ); long fileOffset = pageIdToPosition( startFilePageId );
FileChannel channel = unwrappedChannel( startFilePageId ); FileChannel channel = unwrappedChannel( startFilePageId );
ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length ); ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length );
long bytesRead = lockPositionReadVector( long bytesRead = lockPositionReadVectorAndRetryIfInterrupted( startFilePageId, channel, fileOffset, srcs, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS );
startFilePageId, channel, fileOffset, srcs );
if ( bytesRead == -1 ) if ( bytesRead == -1 )
{ {
for ( long address : bufferAddresses ) for ( long address : bufferAddresses )
Expand Down Expand Up @@ -373,8 +381,8 @@ else if ( bytesRead < ((long) filePageSize) * length )
return bytesRead; return bytesRead;
} }


private long lockPositionReadVector( private long lockPositionReadVectorAndRetryIfInterrupted( long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs, int attemptsLeft )
long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs ) throws IOException throws IOException
{ {
try try
{ {
Expand All @@ -394,14 +402,16 @@ private long lockPositionReadVector(
} }
catch ( ClosedChannelException e ) catch ( ClosedChannelException e )
{ {
// AsynchronousCloseException is a subclass of
// ClosedChannelException, and ClosedByInterruptException is in
// turn a subclass of AsynchronousCloseException.
tryReopen( filePageId, e ); tryReopen( filePageId, e );

if ( attemptsLeft < 1 )
{
throw new IOException( "IO failed due to interruption", e );
}

boolean interrupted = Thread.interrupted(); boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
channel = unwrappedChannel( filePageId ); channel = unwrappedChannel( filePageId );
long bytesWritten = lockPositionReadVector( filePageId, channel, fileOffset, srcs ); long bytesWritten = lockPositionReadVectorAndRetryIfInterrupted( filePageId, channel, fileOffset, srcs, attemptsLeft - 1 );
if ( interrupted ) if ( interrupted )
{ {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
Expand All @@ -424,6 +434,11 @@ private int readPositionedVectoredFallback(


@Override @Override
public long write( long filePageId, long bufferAddress ) throws IOException public long write( long filePageId, long bufferAddress ) throws IOException
{
return writeAndRetryIfInterrupted( filePageId, bufferAddress, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS );
}

private long writeAndRetryIfInterrupted( long filePageId, long bufferAddress, int attemptsLeft ) throws IOException
{ {
long fileOffset = pageIdToPosition( filePageId ); long fileOffset = pageIdToPosition( filePageId );
increaseFileSizeTo( fileOffset + filePageSize ); increaseFileSizeTo( fileOffset + filePageSize );
Expand All @@ -434,13 +449,15 @@ public long write( long filePageId, long bufferAddress ) throws IOException
} }
catch ( ClosedChannelException e ) catch ( ClosedChannelException e )
{ {
// AsynchronousCloseException is a subclass of
// ClosedChannelException, and ClosedByInterruptException is in
// turn a subclass of AsynchronousCloseException.
tryReopen( filePageId, e ); tryReopen( filePageId, e );

if ( attemptsLeft < 1 )
{
throw new IOException( "IO failed due to interruption", e );
}

boolean interrupted = Thread.interrupted(); boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence. long bytesWritten = writeAndRetryIfInterrupted( filePageId, bufferAddress, attemptsLeft - 1 );
long bytesWritten = write( filePageId, bufferAddress );
if ( interrupted ) if ( interrupted )
{ {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
Expand Down Expand Up @@ -478,7 +495,7 @@ private long writePositionedVectoredToFileChannel(
increaseFileSizeTo( fileOffset + (((long) filePageSize) * length) ); increaseFileSizeTo( fileOffset + (((long) filePageSize) * length) );
FileChannel channel = unwrappedChannel( startFilePageId ); FileChannel channel = unwrappedChannel( startFilePageId );
ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length ); ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length );
return lockPositionWriteVector( startFilePageId, channel, fileOffset, srcs ); return lockPositionWriteVectorAndRetryIfInterrupted( startFilePageId, channel, fileOffset, srcs, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS );
} }


private ByteBuffer[] convertToByteBuffers( long[] bufferAddresses, int arrayOffset, int length ) throws Exception private ByteBuffer[] convertToByteBuffers( long[] bufferAddresses, int arrayOffset, int length ) throws Exception
Expand All @@ -498,8 +515,8 @@ private FileChannel unwrappedChannel( long startFilePageId )
return StoreFileChannelUnwrapper.unwrap( storeChannel ); return StoreFileChannelUnwrapper.unwrap( storeChannel );
} }


private long lockPositionWriteVector( private long lockPositionWriteVectorAndRetryIfInterrupted( long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs, int attemptsLeft )
long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs ) throws IOException throws IOException
{ {
try try
{ {
Expand All @@ -518,14 +535,16 @@ private long lockPositionWriteVector(
} }
catch ( ClosedChannelException e ) catch ( ClosedChannelException e )
{ {
// AsynchronousCloseException is a subclass of
// ClosedChannelException, and ClosedByInterruptException is in
// turn a subclass of AsynchronousCloseException.
tryReopen( filePageId, e ); tryReopen( filePageId, e );

if ( attemptsLeft < 1 )
{
throw new IOException( "IO failed due to interruption", e );
}

boolean interrupted = Thread.interrupted(); boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
channel = unwrappedChannel( filePageId ); channel = unwrappedChannel( filePageId );
long bytesWritten = lockPositionWriteVector( filePageId, channel, fileOffset, srcs ); long bytesWritten = lockPositionWriteVectorAndRetryIfInterrupted( filePageId, channel, fileOffset, srcs, attemptsLeft - 1 );
if ( interrupted ) if ( interrupted )
{ {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
Expand Down Expand Up @@ -706,20 +725,27 @@ public synchronized void closeAndDelete() throws IOException


@Override @Override
public void force() throws IOException public void force() throws IOException
{
forceAndRetryIfInterrupted( MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS );
}

private void forceAndRetryIfInterrupted( int attemptsLeft ) throws IOException
{ {
try try
{ {
channel( tokenFilePageId ).force( false ); channel( tokenFilePageId ).force( false );
} }
catch ( ClosedChannelException e ) catch ( ClosedChannelException e )
{ {
// AsynchronousCloseException is a subclass of
// ClosedChannelException, and ClosedByInterruptException is in
// turn a subclass of AsynchronousCloseException.
tryReopen( tokenFilePageId, e ); tryReopen( tokenFilePageId, e );

if ( attemptsLeft < 1 )
{
throw new IOException( "IO failed due to interruption", e );
}

boolean interrupted = Thread.interrupted(); boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence. forceAndRetryIfInterrupted( attemptsLeft - 1 );
force();
if ( interrupted ) if ( interrupted )
{ {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
Expand All @@ -742,6 +768,11 @@ public long getLastPageId()


@Override @Override
public void truncate() throws IOException public void truncate() throws IOException
{
truncateAndRetryIfInterrupted( MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS );
}

private void truncateAndRetryIfInterrupted( int attemptsLeft ) throws IOException
{ {
setCurrentFileSize( 0 ); setCurrentFileSize( 0 );
try try
Expand All @@ -750,13 +781,15 @@ public void truncate() throws IOException
} }
catch ( ClosedChannelException e ) catch ( ClosedChannelException e )
{ {
// AsynchronousCloseException is a subclass of
// ClosedChannelException, and ClosedByInterruptException is in
// turn a subclass of AsynchronousCloseException.
tryReopen( tokenFilePageId, e ); tryReopen( tokenFilePageId, e );

if ( attemptsLeft < 1 )
{
throw new IOException( "IO failed due to interruption", e );
}

boolean interrupted = Thread.interrupted(); boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence. truncateAndRetryIfInterrupted( attemptsLeft - 1 );
truncate();
if ( interrupted ) if ( interrupted )
{ {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
Expand Down

0 comments on commit a5c052d

Please sign in to comment.