From 44414170a38a58079d0b6ea0094e844a884efc43 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 16 May 2018 17:32:15 +0200 Subject: [PATCH] Backport or implement page cache features that are needed for experimental parallel flushing, and a rudimentary page cache warmer backport. --- .../org/neo4j/io/pagecache/PageCache.java | 14 +++- .../org/neo4j/io/pagecache/PagedFile.java | 5 ++ .../impl/muninn/BackgroundThreadExecutor.java | 9 ++- .../io/pagecache/impl/muninn/CursorPool.java | 2 +- .../impl/muninn/MuninnPageCache.java | 64 +++++++++++++++++-- .../impl/muninn/MuninnPagedFile.java | 3 +- .../pagecache/AdversarialPageCache.java | 6 ++ .../pagecache/AdversarialPagedFile.java | 7 ++ .../io/pagecache/DelegatingPageCache.java | 6 ++ .../io/pagecache/DelegatingPagedFile.java | 7 ++ .../org/neo4j/io/pagecache/PageCacheTest.java | 4 -- .../org/neo4j/io/pagecache/StubPagedFile.java | 7 ++ 12 files changed, 120 insertions(+), 14 deletions(-) diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java b/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java index faa88f46488fe..47fea5b8f6a5f 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java @@ -82,10 +82,12 @@ public interface PageCache extends AutoCloseable /** * List a snapshot of the current file mappings. *

- * The mappings can change as soon as this method returns. However, the returned {@link PagedFile}s will remain - * valid even if they are closed elsewhere. + * The mappings can change as soon as this method returns. *

- * NOTE: The calling code is responsible for closing all the returned paged files. + * NOTE: The calling code should not close the returned paged files, unless it does so + * in collaboration with the code that originally mapped the file. Any reference count in the mapping will + * not be incremented by this method, so calling code must be prepared for that the returned + * {@link PagedFile}s can be asynchronously closed elsewhere. * * @throws IOException if page cache has been closed or page eviction problems occur. */ @@ -126,6 +128,12 @@ public interface PageCache extends AutoCloseable */ int maxCachedPages(); + /** + * Report any thread-local events to the global page cache tracer, as if acquiring a thread-specific page cursor + * tracer, and reporting the events collected within it. + */ + void reportEvents(); + /** * Return a stream of {@link FileHandle file handles} for every file in the given directory, and its * sub-directories. diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java b/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java index 4c61030ab2b21..c5f1a79d9ae6b 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java @@ -140,6 +140,11 @@ public interface PagedFile extends AutoCloseable */ long fileSize() throws IOException; + /** + * Get the filename that is mapped by this {@code PagedFile}. + */ + File file(); + /** * Flush all dirty pages into the file channel, and force the file channel to disk. */ diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java index 66d3b7dddfb91..c13a81d87c5e2 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java @@ -19,8 +19,11 @@ */ package org.neo4j.io.pagecache.impl.muninn; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * An executor for the background threads for the page caches. @@ -34,7 +37,7 @@ final class BackgroundThreadExecutor implements Executor { static final BackgroundThreadExecutor INSTANCE = new BackgroundThreadExecutor(); - private final Executor executor; + private final ExecutorService executor; private BackgroundThreadExecutor() { @@ -47,4 +50,8 @@ public void execute( Runnable command ) executor.execute( command ); } + public Future submit( Callable callable ) + { + return executor.submit( callable ); + } } diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/CursorPool.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/CursorPool.java index 5dc6c53df7db9..7b4f82484d5db 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/CursorPool.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/CursorPool.java @@ -29,7 +29,7 @@ final class CursorPool extends ThreadLocal private final MuninnPagedFile pagedFile; private final long victimPage; private final PageCursorTracerSupplier pageCursorTracerSupplier; - private PageCacheTracer pageCacheTracer; + private final PageCacheTracer pageCacheTracer; private final VersionContextSupplier versionContextSupplier; /** diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java index 77961c9128b36..27df3afdd75ea 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.nio.file.CopyOption; import java.nio.file.OpenOption; import java.nio.file.StandardOpenOption; @@ -28,7 +29,8 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -145,7 +147,7 @@ public class MuninnPageCache implements PageCache // the constructor of the PageCache, because the Executors have too many configuration options, many of which are // highly troublesome for our use case; caller-runs, bounded submission queues, bounded thread count, non-daemon // thread factories, etc. - private static final Executor backgroundThreadExecutor = BackgroundThreadExecutor.INSTANCE; + private static final BackgroundThreadExecutor backgroundThreadExecutor = BackgroundThreadExecutor.INSTANCE; private static final List ignoredOpenOptions = Arrays.asList( (OpenOption) StandardOpenOption.APPEND, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SPARSE ); @@ -480,7 +482,7 @@ private void assertNotMapped( File file, FileIsMappedException.Operation operati @Override public synchronized List listExistingMappings() throws IOException { - assertHealthy(); + assertNotClosed(); ensureThreadsInitialised(); List list = new ArrayList<>(); @@ -488,8 +490,9 @@ public synchronized List listExistingMappings() throws IOException while ( current != null ) { + // Note that we are NOT incrementing the reference count here. + // Calling code is expected to be able to deal with asynchronously closed PagedFiles. MuninnPagedFile pagedFile = current.pagedFile; - pagedFile.incrementRefCount(); list.add( pagedFile ); current = current.next; } @@ -610,6 +613,53 @@ public synchronized void flushAndForce( IOLimiter limiter ) throws IOException clearEvictorException(); } + public void experimentalFlushAndForceParallelUnsynchronised() throws IOException + { + assertNotClosed(); + List> flushes = new ArrayList<>(); + List files = listExistingMappings(); + for ( PagedFile file : files ) + { + MuninnPagedFile pagedFile = (MuninnPagedFile) file; + flushes.add( backgroundThreadExecutor.submit( () -> + { + try ( MajorFlushEvent fileFlush = pageCacheTracer.beginFileFlush( pagedFile.swapper ) ) + { + FlushEventOpportunity flushOpportunity = fileFlush.flushEventOpportunity(); + pagedFile.flushAndForceInternal( flushOpportunity, false, IOLimiter.unlimited() ); + } + catch ( ClosedChannelException e ) + { + if ( pagedFile.getRefCount() > 0 ) + { + // The file is not supposed to be closed, since we have a positive ref-count, yet we got a + // ClosedChannelException anyway? It's an odd situation, so let's tell the outside world about + // this failure. + throw e; + } + // Otherwise: The file was closed while we were trying to flush it. Since unmapping implies a flush + // anyway, we can safely assume that this is not a problem. The file was flushed, and it doesn't + // really matter how that happened. We'll ignore this exception. + } + return null; + } ) ); + } + + for ( Future flush : flushes ) + { + try + { + flush.get(); + } + catch ( InterruptedException | ExecutionException e ) + { + throw new IOException( e ); + } + } + syncDevice(); + clearEvictorException(); + } + private void flushAllPages( IOLimiter limiter ) throws IOException { try ( MajorFlushEvent cacheFlush = pageCacheTracer.beginCacheFlush() ) @@ -717,6 +767,12 @@ public int maxCachedPages() return pages.length; } + @Override + public void reportEvents() + { + pageCursorTracerSupplier.get().reportEvents(); + } + int getPageCacheId() { return pageCacheId; diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java index 411838ccb8761..21fb5cfd7ba76 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java @@ -203,7 +203,8 @@ public long fileSize() return (lastPageId + 1) * pageSize(); } - File file() + @Override + public File file() { return swapper.file(); } diff --git a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java index fb4c13c8c8f7e..273beada31e3d 100644 --- a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java +++ b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java @@ -129,6 +129,12 @@ public int maxCachedPages() return delegate.maxCachedPages(); } + @Override + public void reportEvents() + { + delegate.reportEvents(); + } + @Override public Stream streamFilesRecursive( File directory ) throws IOException { diff --git a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java index 300a5954e9c1d..397859f1a98db 100644 --- a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java +++ b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java @@ -19,6 +19,7 @@ */ package org.neo4j.adversaries.pagecache; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.ReadableByteChannel; @@ -76,6 +77,12 @@ public long fileSize() throws IOException return delegate.fileSize(); } + @Override + public File file() + { + return delegate.file(); + } + @Override public void flushAndForce() throws IOException { diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPageCache.java b/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPageCache.java index d9f0c99c15371..5dd9fce9cd1ab 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPageCache.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPageCache.java @@ -68,6 +68,12 @@ public int maxCachedPages() return delegate.maxCachedPages(); } + @Override + public void reportEvents() + { + delegate.reportEvents(); + } + @Override public Stream streamFilesRecursive( File directory ) throws IOException { diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPagedFile.java b/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPagedFile.java index 6217c16f03360..1d3e0505cad0e 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPagedFile.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/DelegatingPagedFile.java @@ -19,6 +19,7 @@ */ package org.neo4j.io.pagecache; +import java.io.File; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -58,6 +59,12 @@ public long fileSize() throws IOException return delegate.fileSize(); } + @Override + public File file() + { + return delegate.file(); + } + public void close() throws IOException { delegate.close(); diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java index 44633c20466bf..e97a3d1a163fc 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java @@ -1325,10 +1325,6 @@ public void mustListExistingMappings() throws Exception List existingMappings = pageCache.listExistingMappings(); assertThat( existingMappings.size(), is( 2 ) ); assertThat( existingMappings, containsInAnyOrder( pf1, pf2 ) ); - for ( PagedFile existingMapping : existingMappings ) - { - existingMapping.close(); - } } } diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/StubPagedFile.java b/community/io/src/test/java/org/neo4j/io/pagecache/StubPagedFile.java index e1d0148bcff0f..a058a14ae311c 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/StubPagedFile.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/StubPagedFile.java @@ -19,6 +19,7 @@ */ package org.neo4j.io.pagecache; +import java.io.File; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -64,6 +65,12 @@ public long fileSize() throws IOException return (lastPageId + 1) * pageSize(); } + @Override + public File file() + { + return null; + } + @Override public void flushAndForce() throws IOException {