Make PageCache.flushAndForce cope with concurrent file unmappings.
This is important because `flushAndForce` is called during steady state as part of checkpointing.
Meanwhile, files are also being mapped and unmapped during steady state, when indexes are created or dropped.
And now that `flushAndForce` no longer holds the `PageCache` monitor lock for its entire duration, those two operations can happen concurrently.
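
To illustrate the race this commit tolerates, here is a minimal, self-contained sketch. It is not the Neo4j code: the class name and the use of raw `FileChannel`s are stand-ins for the page cache's mapped files. One thread flushes and forces a snapshot of open channels while another thread concurrently closes ("unmaps") one of them; the flusher treats the resulting `ClosedChannelException` as benign, mirroring the reasoning in the catch block of the first changed file below.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConcurrentFlushSketch
{
    public static void main( String[] args ) throws Exception
    {
        // Stand-in for the set of currently mapped files; iteration sees a snapshot.
        List<FileChannel> mapped = new CopyOnWriteArrayList<>();
        for ( int i = 0; i < 3; i++ )
        {
            Path file = Files.createTempFile( "sketch", ".dat" );
            mapped.add( FileChannel.open( file, StandardOpenOption.WRITE ) );
        }

        // "Unmapper" thread: closes one file while the flush below is in flight.
        Thread unmapper = new Thread( () ->
        {
            try
            {
                mapped.get( 1 ).close();
            }
            catch ( IOException ignored )
            {
            }
        } );
        unmapper.start();

        // "Checkpoint" thread: flush and force every file in the snapshot.
        for ( FileChannel channel : mapped )
        {
            try
            {
                channel.write( ByteBuffer.wrap( new byte[]{1} ) );
                channel.force( false );
            }
            catch ( ClosedChannelException e )
            {
                // The channel was closed concurrently by the "unmapper". In the real
                // page cache, unmapping implies a flush, so this case is benign.
            }
        }
        unmapper.join();
        for ( FileChannel channel : mapped )
        {
            channel.close(); // Closing an already-closed channel is a no-op.
        }
    }
}
```

In the actual change, the benign case is additionally guarded by the file's reference count: a `ClosedChannelException` on a file that still has a positive ref-count is rethrown as a genuine error.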
chrisvest committed Apr 27, 2018
1 parent c5fc7bc commit 076736f
Showing 4 changed files with 82 additions and 4 deletions.
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
@@ -597,6 +598,19 @@ private void flushAllPages( List<PagedFile> files, IOLimiter limiter ) throws IOException
FlushEventOpportunity flushOpportunity = fileFlush.flushEventOpportunity();
muninnPagedFile.flushAndForceInternal( flushOpportunity, false, limiter );
}
catch ( ClosedChannelException e )
{
if ( muninnPagedFile.getRefCount() > 0 )
{
// The file is not supposed to be closed, since we have a positive ref-count, yet we got a
// ClosedChannelException anyway? It's an odd situation, so let's tell the outside world about
// this failure.
throw e;
}
// Otherwise: The file was closed while we were trying to flush it. Since unmapping implies a flush
// anyway, we can safely assume that this is not a problem. The file was flushed, and it doesn't
// really matter how that happened. We'll ignore this exception.
}
}
syncDevice();
}
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
@@ -347,7 +348,21 @@ private void markAllDirtyPagesAsClean()
}
}

void flushAndForceInternal( FlushEventOpportunity flushOpportunity, boolean forClosing, IOLimiter limiter )
void flushAndForceInternal( FlushEventOpportunity flushes, boolean forClosing, IOLimiter limiter )
throws IOException
{
try
{
doFlushAndForceInternal( flushes, forClosing, limiter );
}
catch ( ClosedChannelException e )
{
e.addSuppressed( closeStackTrace );
throw e;
}
}

private void doFlushAndForceInternal( FlushEventOpportunity flushes, boolean forClosing, IOLimiter limiter )
throws IOException
{
// TODO it'd be awesome if, on Linux, we'd call sync_file_range(2) instead of fsync
@@ -415,14 +430,14 @@ else if ( forClosing )
}
if ( pagesGrabbed > 0 )
{
vectoredFlush( pages, bufferAddresses, flushStamps, pagesGrabbed, flushOpportunity, forClosing );
vectoredFlush( pages, bufferAddresses, flushStamps, pagesGrabbed, flushes, forClosing );
limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, this );
pagesGrabbed = 0;
}
}
if ( pagesGrabbed > 0 )
{
vectoredFlush( pages, bufferAddresses, flushStamps, pagesGrabbed, flushOpportunity, forClosing );
vectoredFlush( pages, bufferAddresses, flushStamps, pagesGrabbed, flushes, forClosing );
limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, this );
}
}
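
The second changed file also starts attaching `closeStackTrace` to any `ClosedChannelException` before rethrowing it. As a hedged illustration of that diagnostic pattern (the class and field names below are hypothetical, not the actual paged-file internals): record an exception at close time, and if a later flush fails because the channel is closed, attach the recorded close site via `Throwable.addSuppressed`, so the surfaced stack trace shows both the failing flush and the code path that closed the file.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;

// Hypothetical resource, sketching the closeStackTrace + addSuppressed pattern.
class TracedResource
{
    private volatile Exception closeStackTrace;

    void close()
    {
        // Remember where the resource was closed, for later diagnostics.
        closeStackTrace = new Exception( "tracing point of close" );
    }

    void flush() throws IOException
    {
        try
        {
            doFlush();
        }
        catch ( ClosedChannelException e )
        {
            if ( closeStackTrace != null )
            {
                // Point the reader at whoever closed the resource.
                e.addSuppressed( closeStackTrace );
            }
            throw e;
        }
    }

    private void doFlush() throws IOException
    {
        if ( closeStackTrace != null )
        {
            throw new ClosedChannelException();
        }
        // ... write out dirty pages and force the channel ...
    }
}
```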
@@ -474,6 +474,56 @@ public void flushAndForceMustNotLockPageCacheForWholeDuration() throws Exception
}
}

@Test
public void flushAndForceMustTolerateAsynchronousFileUnmapping() throws Exception
{
configureStandardPageCache();
PageCache pageCache = this.pageCache;
this.pageCache = null; // `null` out to prevent `tearDown` from getting stuck if test fails.
File a = existingFile( "a" );
File b = existingFile( "b" );
File c = existingFile( "c" );

BinaryLatch limiterStartLatch = new BinaryLatch();
BinaryLatch limiterBlockLatch = new BinaryLatch();
Future<?> flusher;

try ( PagedFile pfA = pageCache.map( a, filePageSize );
PagedFile pfB = pageCache.map( b, filePageSize );
PagedFile pfC = pageCache.map( c, filePageSize ) )
{
// Dirty a bunch of pages.
try ( PageCursor cursor = pfA.io( 0, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.next() );
}
try ( PageCursor cursor = pfB.io( 0, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.next() );
}
try ( PageCursor cursor = pfC.io( 0, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.next() );
}
flusher = executor.submit( () ->
{
pageCache.flushAndForce( ( stamp, ios, flushable ) ->
{
limiterStartLatch.release();
limiterBlockLatch.await();
return 0;
} );
return null;
} );

limiterStartLatch.await(); // Flusher is now stuck inside flushAndForce.
} // We should be able to unmap all the files.
// And then when the flusher resumes again, it should not throw any exceptions from the asynchronously
// closed files.
limiterBlockLatch.release();
flusher.get(); // This must not throw.
}

@Test( timeout = SHORT_TIMEOUT_MILLIS )
public void writesFlushedFromPageCacheMustBeExternallyObservable() throws IOException
{
@@ -29,5 +29,4 @@ protected String getRecordFormatName()
{
return HighLimitWithSmallRecords.NAME;
}

}
