diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapper.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapper.java index 1e3d9dc30d703..894b319980713 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapper.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapper.java @@ -55,6 +55,8 @@ */ public class SingleFilePageSwapper implements PageSwapper { + private static final int MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS = 42; + private static int defaultChannelStripePower() { int vcores = Runtime.getRuntime().availableProcessors(); @@ -284,6 +286,11 @@ private void clear( long bufferAddress, int bufferSize ) @Override public long read( long filePageId, long bufferAddress, int bufferSize ) throws IOException + { + return readAndRetryIfInterrupted( filePageId, bufferAddress, bufferSize, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS ); + } + + private long readAndRetryIfInterrupted( long filePageId, long bufferAddress, int bufferSize, int attemptsLeft ) throws IOException { long fileOffset = pageIdToPosition( filePageId ); try @@ -299,13 +306,15 @@ public long read( long filePageId, long bufferAddress, int bufferSize ) throws I } catch ( ClosedChannelException e ) { - // AsynchronousCloseException is a subclass of - // ClosedChannelException, and ClosedByInterruptException is in - // turn a subclass of AsynchronousCloseException. tryReopen( filePageId, e ); + + if ( attemptsLeft < 1 ) + { + throw new IOException( "IO failed due to interruption", e ); + } + boolean interrupted = Thread.interrupted(); - // Recurse because this is hopefully a very rare occurrence. - long bytesRead = read( filePageId, bufferAddress, bufferSize ); + long bytesRead = readAndRetryIfInterrupted( filePageId, bufferAddress, bufferSize, attemptsLeft - 1 ); if ( interrupted ) { Thread.currentThread().interrupt(); @@ -343,8 +352,7 @@ private long readPositionedVectoredToFileChannel( long fileOffset = pageIdToPosition( startFilePageId ); FileChannel channel = unwrappedChannel( startFilePageId ); ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length ); - long bytesRead = lockPositionReadVector( - startFilePageId, channel, fileOffset, srcs ); + long bytesRead = lockPositionReadVectorAndRetryIfInterrupted( startFilePageId, channel, fileOffset, srcs, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS ); if ( bytesRead == -1 ) { for ( long address : bufferAddresses ) @@ -373,8 +381,8 @@ else if ( bytesRead < ((long) filePageSize) * length ) return bytesRead; } - private long lockPositionReadVector( - long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs ) throws IOException + private long lockPositionReadVectorAndRetryIfInterrupted( long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs, int attemptsLeft ) + throws IOException { try { @@ -394,14 +402,16 @@ private long lockPositionReadVector( } catch ( ClosedChannelException e ) { - // AsynchronousCloseException is a subclass of - // ClosedChannelException, and ClosedByInterruptException is in - // turn a subclass of AsynchronousCloseException. tryReopen( filePageId, e ); + + if ( attemptsLeft < 1 ) + { + throw new IOException( "IO failed due to interruption", e ); + } + boolean interrupted = Thread.interrupted(); - // Recurse because this is hopefully a very rare occurrence. channel = unwrappedChannel( filePageId ); - long bytesWritten = lockPositionReadVector( filePageId, channel, fileOffset, srcs ); + long bytesWritten = lockPositionReadVectorAndRetryIfInterrupted( filePageId, channel, fileOffset, srcs, attemptsLeft - 1 ); if ( interrupted ) { Thread.currentThread().interrupt(); @@ -424,6 +434,11 @@ private int readPositionedVectoredFallback( @Override public long write( long filePageId, long bufferAddress ) throws IOException + { + return writeAndRetryIfInterrupted( filePageId, bufferAddress, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS ); + } + + private long writeAndRetryIfInterrupted( long filePageId, long bufferAddress, int attemptsLeft ) throws IOException { long fileOffset = pageIdToPosition( filePageId ); increaseFileSizeTo( fileOffset + filePageSize ); @@ -434,13 +449,15 @@ public long write( long filePageId, long bufferAddress ) throws IOException } catch ( ClosedChannelException e ) { - // AsynchronousCloseException is a subclass of - // ClosedChannelException, and ClosedByInterruptException is in - // turn a subclass of AsynchronousCloseException. tryReopen( filePageId, e ); + + if ( attemptsLeft < 1 ) + { + throw new IOException( "IO failed due to interruption", e ); + } + boolean interrupted = Thread.interrupted(); - // Recurse because this is hopefully a very rare occurrence. - long bytesWritten = write( filePageId, bufferAddress ); + long bytesWritten = writeAndRetryIfInterrupted( filePageId, bufferAddress, attemptsLeft - 1 ); if ( interrupted ) { Thread.currentThread().interrupt(); @@ -478,7 +495,7 @@ private long writePositionedVectoredToFileChannel( increaseFileSizeTo( fileOffset + (((long) filePageSize) * length) ); FileChannel channel = unwrappedChannel( startFilePageId ); ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length ); - return lockPositionWriteVector( startFilePageId, channel, fileOffset, srcs ); + return lockPositionWriteVectorAndRetryIfInterrupted( startFilePageId, channel, fileOffset, srcs, MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS ); } private ByteBuffer[] convertToByteBuffers( long[] bufferAddresses, int arrayOffset, int length ) throws Exception @@ -498,8 +515,8 @@ private FileChannel unwrappedChannel( long startFilePageId ) return StoreFileChannelUnwrapper.unwrap( storeChannel ); } - private long lockPositionWriteVector( - long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs ) throws IOException + private long lockPositionWriteVectorAndRetryIfInterrupted( long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs, int attemptsLeft ) + throws IOException { try { @@ -518,14 +535,16 @@ private long lockPositionWriteVector( } catch ( ClosedChannelException e ) { - // AsynchronousCloseException is a subclass of - // ClosedChannelException, and ClosedByInterruptException is in - // turn a subclass of AsynchronousCloseException. tryReopen( filePageId, e ); + + if ( attemptsLeft < 1 ) + { + throw new IOException( "IO failed due to interruption", e ); + } + boolean interrupted = Thread.interrupted(); - // Recurse because this is hopefully a very rare occurrence. channel = unwrappedChannel( filePageId ); - long bytesWritten = lockPositionWriteVector( filePageId, channel, fileOffset, srcs ); + long bytesWritten = lockPositionWriteVectorAndRetryIfInterrupted( filePageId, channel, fileOffset, srcs, attemptsLeft - 1 ); if ( interrupted ) { Thread.currentThread().interrupt(); @@ -706,6 +725,11 @@ public synchronized void closeAndDelete() throws IOException @Override public void force() throws IOException + { + forceAndRetryIfInterrupted( MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS ); + } + + private void forceAndRetryIfInterrupted( int attemptsLeft ) throws IOException { try { @@ -713,13 +737,15 @@ public void force() throws IOException } catch ( ClosedChannelException e ) { - // AsynchronousCloseException is a subclass of - // ClosedChannelException, and ClosedByInterruptException is in - // turn a subclass of AsynchronousCloseException. tryReopen( tokenFilePageId, e ); + + if ( attemptsLeft < 1 ) + { + throw new IOException( "IO failed due to interruption", e ); + } + boolean interrupted = Thread.interrupted(); - // Recurse because this is hopefully a very rare occurrence. - force(); + forceAndRetryIfInterrupted( attemptsLeft - 1 ); if ( interrupted ) { Thread.currentThread().interrupt(); @@ -742,6 +768,11 @@ public long getLastPageId() @Override public void truncate() throws IOException + { + truncateAndRetryIfInterrupted( MAX_INTERRUPTED_CHANNEL_REOPEN_ATTEMPTS ); + } + + private void truncateAndRetryIfInterrupted( int attemptsLeft ) throws IOException { setCurrentFileSize( 0 ); try @@ -750,13 +781,15 @@ public void truncate() throws IOException } catch ( ClosedChannelException e ) { - // AsynchronousCloseException is a subclass of - // ClosedChannelException, and ClosedByInterruptException is in - // turn a subclass of AsynchronousCloseException. tryReopen( tokenFilePageId, e ); + + if ( attemptsLeft < 1 ) + { + throw new IOException( "IO failed due to interruption", e ); + } + boolean interrupted = Thread.interrupted(); - // Recurse because this is hopefully a very rare occurrence. - truncate(); + truncateAndRetryIfInterrupted( attemptsLeft - 1 ); if ( interrupted ) { Thread.currentThread().interrupt();