Skip to content

Commit

Permalink
Backport or implement page cache features that are needed for experimental parallel flushing, and a rudimentary page cache warmer backport.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed May 16, 2018
1 parent 8f3e535 commit 4441417
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 14 deletions.
14 changes: 11 additions & 3 deletions community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java
Expand Up @@ -82,10 +82,12 @@ public interface PageCache extends AutoCloseable
/**
* List a snapshot of the current file mappings.
* <p>
* The mappings can change as soon as this method returns. However, the returned {@link PagedFile}s will remain
* valid even if they are closed elsewhere.
* The mappings can change as soon as this method returns.
* <p>
* <strong>NOTE:</strong> The calling code is responsible for closing <em>all</em> the returned paged files.
* <strong>NOTE:</strong> The calling code should <em>not</em> close the returned paged files, unless it does so
* in collaboration with the code that originally mapped the file. Any reference count in the mapping will
<em>not</em> be incremented by this method, so calling code must be prepared for the possibility that the returned
* {@link PagedFile}s can be asynchronously closed elsewhere.
*
* @throws IOException if page cache has been closed or page eviction problems occur.
*/
Expand Down Expand Up @@ -126,6 +128,12 @@ public interface PageCache extends AutoCloseable
*/
int maxCachedPages();

/**
* Report any thread-local events to the global page cache tracer, as if acquiring a thread-specific page cursor
* tracer, and reporting the events collected within it.
*/
void reportEvents();

/**
* Return a stream of {@link FileHandle file handles} for every file in the given directory, and its
* sub-directories.
Expand Down
Expand Up @@ -140,6 +140,11 @@ public interface PagedFile extends AutoCloseable
*/
long fileSize() throws IOException;

/**
 * Get the file that is mapped by this {@code PagedFile}.
 *
 * @return the {@link File} whose contents this {@code PagedFile} represents the mapping of.
 */
File file();

/**
* Flush all dirty pages into the file channel, and force the file channel to disk.
*/
Expand Down
Expand Up @@ -19,8 +19,11 @@
*/
package org.neo4j.io.pagecache.impl.muninn;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* An executor for the background threads for the page caches.
Expand All @@ -34,7 +37,7 @@ final class BackgroundThreadExecutor implements Executor
{
static final BackgroundThreadExecutor INSTANCE = new BackgroundThreadExecutor();

private final Executor executor;
private final ExecutorService executor;

private BackgroundThreadExecutor()
{
Expand All @@ -47,4 +50,8 @@ public void execute( Runnable command )
executor.execute( command );
}

/**
 * Submit the given task for asynchronous execution on the shared background executor, as if by
 * {@link ExecutorService#submit(Callable)}.
 *
 * @param callable the task to run on a background thread.
 * @param <T> the type of the result computed by the task.
 * @return a {@link Future} representing the pending result of the task.
 */
public <T> Future<T> submit( Callable<T> callable )
{
return executor.submit( callable );
}
}
Expand Up @@ -29,7 +29,7 @@ final class CursorPool extends ThreadLocal<CursorPool.CursorSets>
private final MuninnPagedFile pagedFile;
private final long victimPage;
private final PageCursorTracerSupplier pageCursorTracerSupplier;
private PageCacheTracer pageCacheTracer;
private final PageCacheTracer pageCacheTracer;
private final VersionContextSupplier versionContextSupplier;

/**
Expand Down
Expand Up @@ -21,14 +21,16 @@

import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.CopyOption;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -145,7 +147,7 @@ public class MuninnPageCache implements PageCache
// the constructor of the PageCache, because the Executors have too many configuration options, many of which are
// highly troublesome for our use case; caller-runs, bounded submission queues, bounded thread count, non-daemon
// thread factories, etc.
private static final Executor backgroundThreadExecutor = BackgroundThreadExecutor.INSTANCE;
private static final BackgroundThreadExecutor backgroundThreadExecutor = BackgroundThreadExecutor.INSTANCE;

private static final List<OpenOption> ignoredOpenOptions = Arrays.asList( (OpenOption) StandardOpenOption.APPEND,
StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SPARSE );
Expand Down Expand Up @@ -480,16 +482,17 @@ private void assertNotMapped( File file, FileIsMappedException.Operation operati
@Override
public synchronized List<PagedFile> listExistingMappings() throws IOException
{
assertHealthy();
assertNotClosed();
ensureThreadsInitialised();

List<PagedFile> list = new ArrayList<>();
FileMapping current = mappedFiles;

while ( current != null )
{
// Note that we are NOT incrementing the reference count here.
// Calling code is expected to be able to deal with asynchronously closed PagedFiles.
MuninnPagedFile pagedFile = current.pagedFile;
pagedFile.incrementRefCount();
list.add( pagedFile );
current = current.next;
}
Expand Down Expand Up @@ -610,6 +613,53 @@ public synchronized void flushAndForce( IOLimiter limiter ) throws IOException
clearEvictorException();
}

/**
 * Flush all dirty pages of every currently mapped file, fanning the per-file flushes out to the
 * shared background executor and waiting for all of them to complete, then sync the storage device
 * and clear any recorded eviction exception.
 * <p>
 * Experimental: unlike the regular flush-and-force, this method is deliberately <em>not</em>
 * {@code synchronized}, so files may be mapped and unmapped concurrently with the flush. The list
 * of mappings is obtained without incrementing reference counts, so a file may be closed while its
 * flush task is running; a {@link ClosedChannelException} from a file whose ref-count has dropped
 * to zero is ignored, because unmapping implies a flush of its own. Flushing runs with
 * {@link IOLimiter#unlimited()}, i.e. without IO throttling.
 *
 * @throws IOException if the page cache is closed, if any per-file flush fails, or if this thread
 * is interrupted while waiting for the background flushes (the interrupt status is restored).
 */
public void experimentalFlushAndForceParallelUnsynchronised() throws IOException
{
    assertNotClosed();
    List<Future<?>> flushes = new ArrayList<>();
    List<PagedFile> files = listExistingMappings();
    for ( PagedFile file : files )
    {
        MuninnPagedFile pagedFile = (MuninnPagedFile) file;
        flushes.add( backgroundThreadExecutor.submit( () ->
        {
            try ( MajorFlushEvent fileFlush = pageCacheTracer.beginFileFlush( pagedFile.swapper ) )
            {
                FlushEventOpportunity flushOpportunity = fileFlush.flushEventOpportunity();
                // NOTE(review): the 'false' flag presumably means "not flushing for close" — confirm
                // against flushAndForceInternal.
                pagedFile.flushAndForceInternal( flushOpportunity, false, IOLimiter.unlimited() );
            }
            catch ( ClosedChannelException e )
            {
                if ( pagedFile.getRefCount() > 0 )
                {
                    // The file is not supposed to be closed, since we have a positive ref-count, yet we got a
                    // ClosedChannelException anyway? It's an odd situation, so let's tell the outside world about
                    // this failure.
                    throw e;
                }
                // Otherwise: The file was closed while we were trying to flush it. Since unmapping implies a flush
                // anyway, we can safely assume that this is not a problem. The file was flushed, and it doesn't
                // really matter how that happened. We'll ignore this exception.
            }
            return null;
        } ) );
    }

    // Wait for every flush task, translating failures into IOException for our callers.
    for ( Future<?> flush : flushes )
    {
        try
        {
            flush.get();
        }
        catch ( InterruptedException e )
        {
            // Restore the interrupt status so code further up the stack can observe the interruption;
            // wrapping the exception alone would silently swallow it.
            Thread.currentThread().interrupt();
            throw new IOException( e );
        }
        catch ( ExecutionException e )
        {
            throw new IOException( e );
        }
    }
    syncDevice();
    clearEvictorException();
}

private void flushAllPages( IOLimiter limiter ) throws IOException
{
try ( MajorFlushEvent cacheFlush = pageCacheTracer.beginCacheFlush() )
Expand Down Expand Up @@ -717,6 +767,12 @@ public int maxCachedPages()
return pages.length;
}

/**
 * Report the events collected in this thread's page cursor tracer to the global page cache tracer,
 * by acquiring the thread-specific tracer from the supplier and flushing it.
 */
@Override
public void reportEvents()
{
pageCursorTracerSupplier.get().reportEvents();
}

int getPageCacheId()
{
return pageCacheId;
Expand Down
Expand Up @@ -203,7 +203,8 @@ public long fileSize()
return (lastPageId + 1) * pageSize();
}

File file()
// Returns the mapped file, as known by the swapper that backs this paged file.
@Override
public File file()
{
return swapper.file();
}
Expand Down
Expand Up @@ -129,6 +129,12 @@ public int maxCachedPages()
return delegate.maxCachedPages();
}

// Pure delegation: forward event reporting to the wrapped page cache.
@Override
public void reportEvents()
{
delegate.reportEvents();
}

@Override
public Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException
{
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.adversaries.pagecache;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
Expand Down Expand Up @@ -76,6 +77,12 @@ public long fileSize() throws IOException
return delegate.fileSize();
}

// Pure delegation: expose the file mapped by the wrapped paged file.
@Override
public File file()
{
return delegate.file();
}

@Override
public void flushAndForce() throws IOException
{
Expand Down
Expand Up @@ -68,6 +68,12 @@ public int maxCachedPages()
return delegate.maxCachedPages();
}

// Pure delegation: forward event reporting to the wrapped page cache.
@Override
public void reportEvents()
{
delegate.reportEvents();
}

@Override
public Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException
{
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.io.pagecache;

import java.io.File;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
Expand Down Expand Up @@ -58,6 +59,12 @@ public long fileSize() throws IOException
return delegate.fileSize();
}

// Pure delegation: expose the file mapped by the wrapped paged file.
@Override
public File file()
{
return delegate.file();
}

public void close() throws IOException
{
delegate.close();
Expand Down
Expand Up @@ -1325,10 +1325,6 @@ public void mustListExistingMappings() throws Exception
List<PagedFile> existingMappings = pageCache.listExistingMappings();
assertThat( existingMappings.size(), is( 2 ) );
assertThat( existingMappings, containsInAnyOrder( pf1, pf2 ) );
for ( PagedFile existingMapping : existingMappings )
{
existingMapping.close();
}
}
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.io.pagecache;

import java.io.File;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
Expand Down Expand Up @@ -64,6 +65,12 @@ public long fileSize() throws IOException
return (lastPageId + 1) * pageSize();
}

// Test stub: there is no backing file, so this intentionally returns null.
// NOTE(review): callers of this stub must tolerate a null file — confirm against the tests using it.
@Override
public File file()
{
return null;
}

@Override
public void flushAndForce() throws IOException
{
Expand Down

0 comments on commit 4441417

Please sign in to comment.