Fix a data race bug in CC StreamToDisk.
The lifecycle of the StreamToDisk and the resources it opens, such as
PagedFiles and PagedWritableByteChannels, are managed separately from
the threads that are writing file contents through it.

Previously, the StreamToDisk implementation was oblivious to this
concurrency, and would freely allow the write threads to race with the
closing of the StreamToDisk instance.

This could ultimately lead to write page cursors leaking, that is, not
being closed when the StreamToDisk instance was closed.

When a write page cursor was leaked, it would leave a page in the page
cache (usually the last page of a file) permanently write locked, and
this in turn would cause all future read page cursors accessing that
page to get stuck in infinite shouldRetry loops.
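
For context, a read against the page cache typically follows the optimistic
shouldRetry pattern sketched below (the ReadRetryExample class, its readLong
method and its parameters are illustrative, not part of this change); while a
leaked write cursor holds the page, shouldRetry() keeps returning true and the
loop never exits:

    import java.io.IOException;
    import org.neo4j.io.pagecache.PageCursor;
    import org.neo4j.io.pagecache.PagedFile;

    class ReadRetryExample
    {
        // Read one long from the given page, retrying while a concurrent writer
        // holds the page. A leaked write cursor keeps this loop spinning forever.
        static long readLong( PagedFile pagedFile, long pageId, int offset ) throws IOException
        {
            try ( PageCursor cursor = pagedFile.io( pageId, PagedFile.PF_SHARED_READ_LOCK ) )
            {
                long value = 0;
                if ( cursor.next() )
                {
                    do
                    {
                        value = cursor.getLong( offset );
                    }
                    while ( cursor.shouldRetry() );
                }
                return value;
            }
        }
    }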

Since this would always happen to the last page in a file, the database
would get stuck in these infinite loops on startup, as part of the
highId scanning or the GBPTree crash recovery scanning.

This bug is fixed by making the opening and closing of files in
StreamToDisk atomic and mutually exclusive. Furthermore, writing to a
paged file and closing that paged file are made mutually exclusive.
Each of these pairs of operations is protected by its own per-file
monitor lock, so the IO parallelism is largely preserved.
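
In outline, the scheme resembles the following hypothetical sketch (class and
method names are made up for illustration): one monitor on the stream makes
destination lookup and close mutually exclusive, and one monitor per
destination makes its write and close mutually exclusive, so writes to
different files can still run in parallel:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class PerFileLockingSketch implements Closeable
    {
        private final Map<String,Destination> destinations = new HashMap<>();
        private boolean closed;

        // Lookup/creation is mutually exclusive with close(), so a destination
        // can never be handed out after the stream has been closed.
        synchronized Destination destination( String name ) throws IOException
        {
            if ( closed )
            {
                throw new IOException( "Destination has been closed: " + name );
            }
            return destinations.computeIfAbsent( name, n -> new Destination() );
        }

        @Override
        public synchronized void close()
        {
            closed = true;
            destinations.values().forEach( Destination::close );
        }

        static class Destination
        {
            // write() and close() share this destination's monitor, so a write
            // can never race with the release of the underlying file resources.
            synchronized void write( byte[] data )
            {
                // write data to the underlying file
            }

            synchronized void close()
            {
                // release the per-file resources
            }
        }
    }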

Additionally, the fields in the TrackingResponseHandler that are updated
when a store copy is initiated have been made volatile, to ensure that
their updates are visible to the IO threads that will be doing the
writing. This ensures that the network IO ends up with the correct
handlers, and thus the current StreamToDisk instance, in case multiple
instances end up being juggled at runtime.
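
As a rough illustration of the visibility concern (the HandlerSketch class and
its members are invented for the example), a volatile field guarantees that the
value published by the thread initiating the store copy is the one the IO
thread subsequently reads:

    class HandlerSketch
    {
        // Without volatile, the IO thread could keep using a delegate from a
        // previous store copy even after a new one has been swapped in.
        private volatile Runnable delegate = () -> {};

        void startStoreCopy( Runnable newDelegate )
        {
            delegate = newDelegate; // written by the thread initiating the store copy
        }

        void onNetworkResponse()
        {
            delegate.run(); // the IO thread observes the most recently published delegate
        }
    }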
chrisvest committed Jan 22, 2018
1 parent b4ed4ca commit 18257d8
Showing 2 changed files with 82 additions and 26 deletions.
@@ -35,10 +35,10 @@
 @SuppressWarnings("unchecked")
 class TrackingResponseHandler implements CatchUpResponseHandler
 {
-    private CatchUpResponseCallback delegate;
-    private CompletableFuture<?> requestOutcomeSignal = new CompletableFuture<>();
     private final Clock clock;
-    private Long lastResponseTime;
+    private volatile CatchUpResponseCallback delegate;
+    private volatile CompletableFuture<?> requestOutcomeSignal = new CompletableFuture<>();
+    private volatile Long lastResponseTime;
 
     TrackingResponseHandler( CatchUpResponseCallback delegate, Clock clock )
     {
@@ -44,9 +44,9 @@ class StreamToDisk implements StoreFileStreams
     private final FileSystemAbstraction fs;
     private final PageCache pageCache;
     private final FileCopyMonitor fileCopyMonitor;
-    private final Map<String,WritableByteChannel> channels;
-    private final Map<String,PagedFile> pagedFiles;
     private final PageCacheFlusher flusherThread;
+    private final Map<String,PageCacheDestination> channels;
+    private boolean closed;
 
     StreamToDisk( File storeDir, FileSystemAbstraction fs, PageCache pageCache, Monitors monitors ) throws IOException
     {
@@ -58,7 +58,6 @@ class StreamToDisk implements StoreFileStreams
         fs.mkdirs( storeDir );
         this.fileCopyMonitor = monitors.newMonitor( FileCopyMonitor.class );
         channels = new HashMap<>();
-        pagedFiles = new HashMap<>();
     }
 
     @Override
@@ -70,41 +69,98 @@ public void write( String destination, int requiredAlignment, byte[] data ) thro
         fileCopyMonitor.copyFile( fileName );
         if ( StoreType.shouldBeManagedByPageCache( destination ) )
         {
-            WritableByteChannel channel = channels.get( destination );
-            if ( channel == null )
-            {
-                int filePageSize = pageCache.pageSize() - pageCache.pageSize() % requiredAlignment;
-                PagedFile pagedFile = pageCache.map( fileName, filePageSize, StandardOpenOption.CREATE );
-                channel = pagedFile.openWritableByteChannel();
-                pagedFiles.put( destination, pagedFile );
-                channels.put( destination, channel );
-            }
-
-            ByteBuffer buffer = ByteBuffer.wrap( data );
-            while ( buffer.hasRemaining() )
-            {
-                channel.write( buffer );
-            }
+            PageCacheDestination dest = getPageCacheDestination( destination, requiredAlignment, fileName );
+            dest.write( data );
         }
         else
         {
             try ( OutputStream outputStream = fs.openAsOutputStream( fileName, true ) )
             {
                 outputStream.write( data );
             }
         }
     }
 
+    private synchronized PageCacheDestination getPageCacheDestination(
+            String destination, int requiredAlignment, File fileName ) throws IOException
+    {
+        if ( closed )
+        {
+            throw new IOException( "Destination has been closed: " + fileName );
+        }
+        PageCacheDestination dest = channels.get( destination );
+        if ( dest == null )
+        {
+            dest = new PageCacheDestination( pageCache, fileName, requiredAlignment );
+            channels.put( destination, dest );
+        }
+        return dest;
+    }
+
+    private static final class PageCacheDestination implements Closeable
+    {
+        private final PagedFile pagedFile;
+        private final WritableByteChannel channel;
+
+        PageCacheDestination( PageCache pageCache, File fileName, int requiredAlignment ) throws IOException
+        {
+            int filePageSize = pageCache.pageSize() - pageCache.pageSize() % requiredAlignment;
+            pagedFile = pageCache.map( fileName, filePageSize, StandardOpenOption.CREATE );
+            try
+            {
+                channel = pagedFile.openWritableByteChannel();
+            }
+            catch ( IOException channelException )
+            {
+                try
+                {
+                    pagedFile.close();
+                }
+                catch ( IOException pagedFileException )
+                {
+                    channelException.addSuppressed( pagedFileException );
+                }
+                channelException.printStackTrace();
+                throw channelException;
+            }
+        }
+
+        public synchronized void write( byte[] data ) throws IOException
+        {
+            ByteBuffer buf = ByteBuffer.wrap( data );
+            while ( buf.hasRemaining() )
+            {
+                channel.write( buf );
+            }
+        }
+
+        @Override
+        public synchronized void close() throws IOException
+        {
+            IOUtils.closeAll( channel, pagedFile );
+        }
+    }
+
     @Override
-    public void close() throws IOException
+    public synchronized void close() throws IOException
     {
-        //noinspection EmptyTryBlock,unused
-        try ( Closeable chans = () -> IOUtils.closeAll( channels.values() );
-              Closeable pfs = () -> IOUtils.closeAll( pagedFiles.values() );
-              Closeable flusher = flusherThread::halt )
+        closed = true;
+        try
         {
-            // The combination of try-with-resources and closeAll will ensure that all resources are closed,
-            // regardless of any exceptions thrown, and that any exceptions will be properly combined.
+            flusherThread.halt();
         }
+        catch ( Exception haltException )
+        {
+            try
+            {
+                IOUtils.closeAll( channels.values() );
+            }
+            catch ( IOException channelsException )
+            {
+                haltException.addSuppressed( channelsException );
+            }
+            throw haltException;
+        }
+        IOUtils.closeAll( channels.values() );
     }
 }
