From 091815bb058e5190b07a3cb4571019d9e5162330 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 12 Feb 2016 17:15:52 +0100 Subject: [PATCH] Introduce IOLimiter and remove background page cache flusher The IOLimiter allows the CheckPointer to flush at a rate that is more sympathetic to other urgent tasks that need IO, such as appending to the transaction log during commit. The IOLimiter has, however, not yet been integrated with the CheckPointer. Removing the background flushing in the page cache also allows a change to the SequenceLock, to allow reads and writes to be concurrent with flushing. This has also not been implemented. --- .../org/neo4j/io/pagecache/IOLimiter.java | 81 +++++++++ .../org/neo4j/io/pagecache/PageCache.java | 7 + .../org/neo4j/io/pagecache/PagedFile.java | 11 +- .../io/pagecache/impl/muninn/FlushTask.java | 38 ---- .../impl/muninn/MuninnPageCache.java | 163 ++---------------- .../impl/muninn/MuninnPagedFile.java | 31 +++- .../pagecache/AdversarialPageCache.java | 8 + .../pagecache/AdversarialPagedFile.java | 8 + .../org/neo4j/io/pagecache/PageCacheTest.java | 140 ++++++++------- .../storecopy/ExternallyManagedPageCache.java | 7 + 10 files changed, 235 insertions(+), 259 deletions(-) create mode 100644 community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java delete mode 100644 community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/FlushTask.java diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java b/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java new file mode 100644 index 0000000000000..f32b5c59812b7 --- /dev/null +++ b/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. 
+ * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.io.pagecache; + +import java.io.Flushable; +import java.io.IOException; + +/** + * IOLimiter instances can be passed to the {@link PageCache#flushAndForce(IOLimiter)} and + * {@link PagedFile#flushAndForce(IOLimiter)} methods, which will invoke the + * {@link #maybeLimitIO(long, int, Flushable)} method on regular intervals. + *

+ * This allows the limiter to measure the rate of IO, and inject sleeps, pauses or flushes into the process. + * The flushes are in this case referring to the underlying hardware. + *


 + * Normally, flushing a channel will just copy the dirty buffers into the OS page cache, but flushing is in this case + * implying that the OS pages are cleared as well. In other words, the IOLimiter can make sure that the operating + * system does not pile up too much IO work in its page cache, by flushing those caches as well on regular intervals. + *


 + * The {@link #maybeLimitIO(long, int, Flushable)} method takes and returns a stamp long value. This allows the + * implementations to become stateless with respect to the individual flush, yet still keep context from one call to + * the next. + */ +public interface IOLimiter +{ + /** + * The value of the initial stamp; that is, what should be passed as the {@code previousStamp} to + * {@link #maybeLimitIO(long, int, Flushable)} on the first call in a flush. + */ + long INITIAL_STAMP = 0; + + /** + * Invoked at regular intervals during flushing of the {@link PageCache} or {@link PagedFile}s. + * + * For the first call in a flush, the {@code previousStamp} should have the {@link #INITIAL_STAMP} value. + * The return value of this method should then be used as the stamp of the next call. This allows implementations + * to be stateless, yet still keep some context around about a given flush, provided they can encode it as a + * {@code long}. + * + * The meaning of this long value is totally opaque to the caller, and can be anything the IOLimiter + * implementation desires. + * + * The implementation is allowed to force changes to the storage device using the given {@link Flushable}, or + * to perform {@link Thread#sleep(long) sleeps}, as it desires. It is not allowed to throw + * {@link InterruptedException}, however. Those should be dealt with by catching them and re-interrupting the + * current thread, or by wrapping them in {@link IOException}s. + * @param previousStamp The stamp from the previous call to this method, or {@link #INITIAL_STAMP} if this is the + * first call to this method for the given flush. + * @param recentlyCompletedIOs The number of IOs completed since the last call to this method. + * @param flushable A {@link Flushable} instance that can flush any relevant dirty system buffers, to help smooth + * out the IO load on the storage device. + * @return A new stamp to pass into the next call to this method. 
+ */ + long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable ) throws IOException; + + /** + * An IOPSLimiter implementation that does not restrict the rate of IO. Use this implementation if you want the + * flush to go as fast as possible. + */ + static IOLimiter unlimited() + { + return ( previousStamp, recentlyCompletedIOs, flushable ) -> 0; + } +} diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java b/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java index 0725436be95b2..2d4c4013022d0 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java @@ -64,6 +64,13 @@ public interface PageCache extends AutoCloseable /** Flush all dirty pages */ void flushAndForce() throws IOException; + /** + * Flush all dirty pages, but limit the rate of IO as advised by the given IOPSLimiter. + * @param limiter The {@link IOLimiter} that determines if pauses or sleeps should be injected into the flushing + * process to keep the IO rate down. + */ + void flushAndForce( IOLimiter limiter ) throws IOException; + /** Flush all dirty pages and close the page cache. */ void close() throws IOException; diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java b/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java index 22a2441cfaef3..e31ac9484d6b8 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java @@ -127,12 +127,17 @@ public interface PagedFile extends AutoCloseable /** * Flush all dirty pages into the file channel, and force the file channel to disk. - *

- * Note: Flushing has to take locks on pages, so you cannot call flush - * while you have pages pinned. */ void flushAndForce() throws IOException; + /** + * Flush all dirty pages into the file channel, and force the file channel to disk, but limit the rate of IO as + * advised by the given IOPSLimiter. + * @param limiter The {@link IOLimiter} that determines if pauses or sleeps should be injected into the flushing + * process to keep the IO rate down. + */ + void flushAndForce( IOLimiter limiter ) throws IOException; + /** * Get the file-page-id of the last page in the file. *

diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/FlushTask.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/FlushTask.java deleted file mode 100644 index faf408be87177..0000000000000 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/FlushTask.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.io.pagecache.impl.muninn; - -/** - * This runnable continuously flushes dirty pages in the background, such that future calls to - * {@link MuninnPageCache#flushAndForce()} have less work to do. 
- */ -final class FlushTask extends BackgroundTask -{ - public FlushTask( MuninnPageCache pageCache ) - { - super( pageCache ); - } - - @Override - protected void run( MuninnPageCache pageCache ) - { - pageCache.continuouslyFlushPages(); - } -} diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java index a90d550e766cf..7abfd11796a8f 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; +import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCacheOpenOptions; import org.neo4j.io.pagecache.PageSwapperFactory; @@ -45,9 +46,7 @@ import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil; import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag; -import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getDouble; import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getInteger; -import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getLong; /** * The Muninn {@link org.neo4j.io.pagecache.PageCache page cache} implementation. @@ -114,29 +113,6 @@ public class MuninnPageCache implements PageCache private static final int cooperativeEvictionLiveLockThreshold = getInteger( MuninnPageCache.class, "cooperativeEvictionLiveLockThreshold", 100 ); - private static final boolean backgroundFlushingEnabled = flag( - MuninnPageCache.class, "backgroundFlushingEnabled", true ); - - // The background flush task will only spend a certain amount of time doing IO, to avoid saturating the IO - // subsystem during times when there is more important work to be done. 
It will do this by measuring how much - // time it spends on each flush, and then accumulate a sleep debt. Once the sleep debt grows beyond this - // threshold, the flush task will pay it back. - private static final long backgroundFlushSleepDebtThreshold = getLong( - MuninnPageCache.class, "backgroundFlushSleepDebtThreshold", 10 ); - - // This ratio determines how the background flush task will spend its time. Specifically, it is a ratio of how - // much of its time will be spent doing IO. For instance, setting the ratio to 0.3 will make the flusher task - // spend 30% of its time doing IO, and 70% of its time sleeping. - private static final double backgroundFlushIoRatio = getDouble( - MuninnPageCache.class, "backgroundFlushIoRatio", 0.1 ); - - private static final long backgroundFlushBusyBreak = getLong( - MuninnPageCache.class, "backgroundFlushBusyBreak", 100 ); - private static final long backgroundFlushMediumBreak = getLong( - MuninnPageCache.class, "backgroundFlushMediumBreak", 200 ); - private static final long backgroundFlushLongBreak = getLong( - MuninnPageCache.class, "backgroundFlushLongBreak", 1000 ); - // This is a pre-allocated constant, so we can throw it without allocating any objects: @SuppressWarnings( "ThrowableInstanceNeverThrown" ) private static final IOException oomException = new IOException( @@ -414,10 +390,6 @@ private void ensureThreadsInitialised() throws IOException try { backgroundThreadExecutor.execute( new EvictionTask( this ) ); - if ( backgroundFlushingEnabled ) - { - backgroundThreadExecutor.execute( new FlushTask( this ) ); // TODO disable background flushing for good - } } catch ( Exception e ) { @@ -501,14 +473,24 @@ public void setPrintExceptionsOnClose( boolean enabled ) } @Override - public synchronized void flushAndForce() throws IOException + public void flushAndForce() throws IOException + { + flushAndForce( IOLimiter.unlimited() ); + } + + @Override + public synchronized void flushAndForce( IOLimiter limiter ) throws 
IOException { + if ( limiter == null ) + { + throw new IllegalArgumentException( "IOPSLimiter cannot be null" ); + } assertNotClosed(); - flushAllPages(); + flushAllPages( limiter ); clearEvictorException(); } - private void flushAllPages() throws IOException + private void flushAllPages( IOLimiter limiter ) throws IOException { try ( MajorFlushEvent cacheFlush = tracer.beginCacheFlush() ) { @@ -516,7 +498,7 @@ private void flushAllPages() throws IOException FileMapping fileMapping = mappedFiles; while ( fileMapping != null ) { - fileMapping.pagedFile.flushAndForceInternal( flushOpportunity, false ); + fileMapping.pagedFile.flushAndForceInternal( flushOpportunity, false, limiter ); fileMapping = fileMapping.next; } syncDevice(); @@ -952,121 +934,6 @@ private void checkBackgroundFlushPause() } } - /** - * Scan through all the pages, flushing the dirty ones. Aim to only spend at most 50% of its time doing IO, in an - * effort to avoid saturating the IO subsystem or steal precious IO resources from more important work. - */ - void continuouslyFlushPages() - { - Thread thread = Thread.currentThread(); - flushThread = thread; - - while ( !thread.isInterrupted() ) - { - long iterationSleepMillis = flushAtIORatio( backgroundFlushIoRatio ); - if ( iterationSleepMillis > 0 ) - { - LockSupport.parkNanos( this, TimeUnit.MILLISECONDS.toNanos( iterationSleepMillis ) ); - sleepDebtNanos = 0; - } - } - } - - private long flushAtIORatio( double ratio ) - { - Thread thread = Thread.currentThread(); - long sleepPaymentThreshold = TimeUnit.MILLISECONDS.toNanos( backgroundFlushSleepDebtThreshold ); - boolean seenDirtyPages = false; - boolean flushedPages = false; - double sleepFactor = (1 - ratio) / ratio; - - try ( MajorFlushEvent event = tracer.beginCacheFlush() ) - { - for ( MuninnPage page : pages ) - { - if ( page == null || thread.isInterrupted() ) - { - // Null pages means the page cache has been closed. 
- thread.interrupt(); - return 0; - } - - // The rate is the percentage of time that we want to spend doing IO. If the rate is 0.3, then we - // want to spend 30% of our time doing IO. We would then spend the other 70% of the time just - // sleeping. This means that for every IO we do, we measure how long it takes. We can then compute - // the amount of time we need to sleep. Basically, if we spend 30 microseconds doing IO, then we need - // to sleep for 70 microseconds, with the 0.3 ratio. To get the sleep time, we can divide the IO time - // T by the ratio R, and then multiply the result by 1 - R. This is equivalent to (T/R) - T = S. - // Then, because we don't want to sleep too frequently in too small intervals, we sum up our S's and - // only sleep when we have collected a sleep debt of at least 10 milliseconds. - // IO is not the only point of contention, however. Doing a flush also means that we have to take a - // pessimistic read-lock on the page, and if we do this on a page that is very popular for writing, - // then it can noticeably impact the performance of the database. Therefore, we check the dirtiness of - // a given page under and *optimistic* read lock, and we also decrement the usage counter to avoid - // aggressively flushing very popular pages. We need to carefully balance this, though, since we are - // at risk of the mutator threads performing so many writes that we can't decrement the usage - // counters fast enough to reach zero. - - // Skip the page if it is already write locked, or not dirty, or too popular. - boolean thisPageIsDirty; - if ( !(thisPageIsDirty = page.isDirty()) || !page.decrementUsage() ) - { - seenDirtyPages |= thisPageIsDirty; - continue; // Continue looping to the next page. - } - - if ( page.tryFreezeLock() ) - { - try - { - // Double-check that the page is still dirty. We could be racing with other flushing threads. - if ( !page.isDirty() ) - { - continue; // Continue looping to the next page. 
- } - - long startNanos = System.nanoTime(); - page.flush( event.flushEventOpportunity() ); - long elapsedNanos = System.nanoTime() - startNanos; - - sleepDebtNanos += elapsedNanos * sleepFactor; - flushedPages = true; - } - catch ( Throwable ignore ) - { - // The MuninnPage.flush method will keep the page dirty if flushing fails, and the eviction - // thread will eventually report the problem if its serious. Ergo, we can just ignore any and - // all exceptions, and move on to the next page. If we end up not getting anything done this - // iteration of flushAtIORatio, then that's fine too. - } - finally - { - page.unlockFreeze(); - } - } - - // Check if we've collected enough sleep debt, and if so, pay it back. - if ( sleepDebtNanos > sleepPaymentThreshold ) - { - LockSupport.parkNanos( sleepDebtNanos ); - sleepDebtNanos = 0; - } - - // Check if we've been asked to pause, because another thread wants to focus on flushing. - checkBackgroundFlushPause(); - } - } - - // We return an amount of time, in milliseconds, that we want to wait before we do the next iteration. If we - // have seen no dirty pages, then we can take a long break because the database is presumably not very busy - // with writing. If we have seen dirty pages and flushed some, then we can take a medium break since we've - // made some progress but we also need to keep up. If we have seen dirty pages and flushed none of them, then - // we shouldn't take any break, since we are falling behind the mutator threads. - return seenDirtyPages? - flushedPages? 
backgroundFlushMediumBreak : backgroundFlushBusyBreak - : backgroundFlushLongBreak; - } - @Override public String toString() { diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java index 82e45a97ca9cb..436512b29f1d0 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java @@ -20,8 +20,10 @@ package org.neo4j.io.pagecache.impl.muninn; import java.io.File; +import java.io.Flushable; import java.io.IOException; +import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PageEvictionCallback; import org.neo4j.io.pagecache.PageSwapper; @@ -194,27 +196,40 @@ void closeSwapper() throws IOException @Override public void flushAndForce() throws IOException { + flushAndForce( IOLimiter.unlimited() ); + } + + @Override + public void flushAndForce( IOLimiter limiter ) throws IOException + { + if ( limiter == null ) + { + throw new IllegalArgumentException( "IOPSLimiter cannot be null" ); + } try ( MajorFlushEvent flushEvent = tracer.beginFileFlush( swapper ) ) { - flushAndForceInternal( flushEvent.flushEventOpportunity(), false ); + flushAndForceInternal( flushEvent.flushEventOpportunity(), false, limiter ); syncDevice(); } } - public void flushAndForceForClose() throws IOException + void flushAndForceForClose() throws IOException { try ( MajorFlushEvent flushEvent = tracer.beginFileFlush( swapper ) ) { - flushAndForceInternal( flushEvent.flushEventOpportunity(), true ); + flushAndForceInternal( flushEvent.flushEventOpportunity(), true, IOLimiter.unlimited() ); syncDevice(); } } - void flushAndForceInternal( FlushEventOpportunity flushOpportunity, boolean forClosing ) throws IOException + void flushAndForceInternal( FlushEventOpportunity flushOpportunity, boolean forClosing, 
IOLimiter limiter ) + throws IOException { pageCache.pauseBackgroundFlushTask(); + Flushable flushable = swapper::force; MuninnPage[] pages = new MuninnPage[translationTableChunkSize]; long filePageId = -1; // Start at -1 because we increment at the *start* of the chunk-loop iteration. + long limiterStamp = IOLimiter.INITIAL_STAMP; try { for ( Object[] chunk : translationTable ) @@ -262,12 +277,15 @@ else if ( forClosing ) } if ( pagesGrabbed > 0 ) { - pagesGrabbed = vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing ); + vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing ); + limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, flushable ); + pagesGrabbed = 0; } } if ( pagesGrabbed > 0 ) { vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing ); + limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, flushable ); } } @@ -279,7 +297,7 @@ else if ( forClosing ) } } - private int vectoredFlush( + private void vectoredFlush( MuninnPage[] pages, int pagesGrabbed, FlushEventOpportunity flushOpportunity, boolean forClosing ) throws IOException { @@ -307,7 +325,6 @@ private int vectoredFlush( flush.done(); // There are now 0 'grabbed' pages - return 0; } catch ( IOException ioe ) { diff --git a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java index d956791039684..103fe66708e0f 100644 --- a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java +++ b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java @@ -29,6 +29,7 @@ import java.util.Objects; import org.neo4j.adversaries.Adversary; +import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PagedFile; @@ -73,6 +74,13 @@ public void flushAndForce() throws IOException delegate.flushAndForce(); } + @Override + public void 
flushAndForce( IOLimiter limiter ) throws IOException + { + adversary.injectFailure( FileNotFoundException.class, IOException.class, SecurityException.class ); + delegate.flushAndForce( limiter ); + } + @Override public void close() throws IOException { diff --git a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java index 0e0d0f0be70bb..b5e7b035dac34 100644 --- a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java +++ b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java @@ -24,6 +24,7 @@ import java.util.Objects; import org.neo4j.adversaries.Adversary; +import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PagedFile; @@ -71,6 +72,13 @@ public void flushAndForce() throws IOException delegate.flushAndForce(); } + @Override + public void flushAndForce( IOLimiter limiter ) throws IOException + { + adversary.injectFailure( FileNotFoundException.class, IOException.class, SecurityException.class ); + delegate.flushAndForce( limiter ); + } + @Override public long getLastPageId() throws IOException { diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java index 1eb95fae5a964..1d6a9ad61a4c9 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java @@ -205,6 +205,80 @@ public void writesFlushedFromPageFileMustBeExternallyObservable() throws IOExcep pagedFile.close(); } + @Test( expected = IllegalArgumentException.class ) + public void pageCacheFlushAndForceMustThrowOnNullIOPSLimiter() throws Exception + { + PageCache cache = getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL ); + cache.flushAndForce( null ); + } + + @Test( 
expected = IllegalArgumentException.class ) + public void pagedFileFlushAndForceMustThrowOnNullIOPSLimiter() throws Exception + { + PageCache cache = getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL ); + try ( PagedFile pf = cache.map( file( "a" ), filePageSize ) ) + { + pf.flushAndForce( null ); + } + } + + @Test + public void pageCacheFlushAndForceMustQueryTheGivenIOPSLimiter() throws Exception + { + int pagesToDirty = 10_000; + PageCache cache = getPageCache( fs, nextPowerOf2( 2 * pagesToDirty ), pageCachePageSize, PageCacheTracer.NULL ); + PagedFile pfA = cache.map( existingFile( "a" ), filePageSize ); + PagedFile pfB = cache.map( existingFile( "b" ), filePageSize ); + + dirtyManyPages( pfA, pagesToDirty ); + dirtyManyPages( pfB, pagesToDirty ); + + AtomicInteger callbackCounter = new AtomicInteger(); + AtomicInteger ioCounter = new AtomicInteger(); + cache.flushAndForce( (previousStamp, recentlyCompletedIOs, swapper) -> { + ioCounter.addAndGet( recentlyCompletedIOs ); + return callbackCounter.getAndIncrement(); + }); + pfA.close(); + pfB.close(); + + assertThat( callbackCounter.get(), greaterThan( 0 ) ); + assertThat( ioCounter.get(), greaterThanOrEqualTo( pagesToDirty * 2 - 30 ) ); // -30 because of the eviction thread + } + + @Test + public void pagedFileFlushAndForceMustQueryTheGivenIOPSLimiter() throws Exception + { + int pagesToDirty = 10_000; + PageCache cache = getPageCache( fs, nextPowerOf2( pagesToDirty ), pageCachePageSize, PageCacheTracer.NULL ); + PagedFile pf = cache.map( file( "a" ), filePageSize ); + + // Dirty a bunch of data + dirtyManyPages( pf, pagesToDirty ); + + AtomicInteger callbackCounter = new AtomicInteger(); + AtomicInteger ioCounter = new AtomicInteger(); + pf.flushAndForce( (previousStamp, recentlyCompletedIOs, swapper) -> { + ioCounter.addAndGet( recentlyCompletedIOs ); + return callbackCounter.getAndIncrement(); + }); + pf.close(); + + assertThat( callbackCounter.get(), greaterThan( 0 ) ); + assertThat( 
ioCounter.get(), greaterThanOrEqualTo( pagesToDirty - 30 ) ); // -30 because of the eviction thread + } + + private void dirtyManyPages( PagedFile pf, int pagesToDirty ) throws IOException + { + try ( PageCursor cursor = pf.io( 0, PF_SHARED_WRITE_LOCK ) ) + { + for ( int i = 0; i < pagesToDirty; i++ ) + { + assertTrue( cursor.next() ); + } + } + } + @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) public void repeatablyWritesFlushedFromPageFileMustBeExternallyObservable() throws IOException { @@ -1609,10 +1683,7 @@ public void mustNotLiveLockIfWeRunOutOfEvictablePages() throws Exception } finally { - for ( PageCursor cursor : cursors ) - { - cursor.close(); - } + cursors.forEach( PageCursor::close ); } } } @@ -2005,13 +2076,7 @@ public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swappe } } - try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) - { - for ( int i = 0; i < pinsForWrite; i++ ) - { - assertTrue( cursor.next() ); - } - } + dirtyManyPages( pagedFile, pinsForWrite ); } assertThat( "wrong read pin count", readCount.get(), is( pinsForRead ) ); @@ -2133,13 +2198,7 @@ public void lastPageIdMustIncreaseWhenScanningPastEndWithWriteLock() PagedFile pagedFile = pageCache.map( file( "a" ), filePageSize ); assertThat( pagedFile.getLastPageId(), is( 9L ) ); - try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) - { - for ( int i = 0; i < 15; i++ ) - { - assertTrue( cursor.next() ); - } - } + dirtyManyPages( pagedFile, 15 ); try { assertThat( pagedFile.getLastPageId(), is( 14L ) ); @@ -3245,51 +3304,6 @@ public void mustEvictPagesFromUnmappedFiles() throws Exception } } - @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) - public void mustFlushDirtyPagesInTheBackground() throws Exception - { - final CountDownLatch swapOutLatch = new CountDownLatch( 1 ); - PageSwapperFactory swapperFactory = new SingleFilePageSwapperFactory() - { - @Override - public PageSwapper createPageSwapper( - File file, int filePageSize, 
PageEvictionCallback onEviction, boolean createIfNotExist ) throws IOException - { - PageSwapper delegate = super.createPageSwapper( file, filePageSize, onEviction, createIfNotExist ); - return new DelegatingPageSwapper( delegate ) - { - @Override - public long write( long filePageId, Page page ) throws IOException - { - try - { - return super.write( filePageId, page ); - } - finally - { - swapOutLatch.countDown(); - } - } - }; - } - }; - swapperFactory.setFileSystemAbstraction( fs ); - - try ( PageCache pageCache = createPageCache( - swapperFactory, maxPages, pageCachePageSize, PageCacheTracer.NULL ); - PagedFile pagedFile = pageCache.map( file( "a" ), filePageSize ) ) - { - try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) - { - assertTrue( cursor.next() ); - writeRecords( cursor ); - } - - swapOutLatch.await(); - verifyRecordsInFile( file( "a" ), recordsPerFilePage ); - } - } - @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) public void mustReadZerosFromBeyondEndOfFile() throws Exception { diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java index 3fd1295dd3423..5097f46819507 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java @@ -27,6 +27,7 @@ import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.factory.GraphDatabaseFactory; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PagedFile; import org.neo4j.kernel.impl.enterprise.EnterpriseFacadeFactory; @@ -70,6 +71,12 @@ public void flushAndForce() throws IOException delegate.flushAndForce(); } + @Override + public void flushAndForce( IOLimiter limiter ) throws IOException + { + delegate.flushAndForce( limiter 
); + } + @Override public int pageSize() {