diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java b/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java
new file mode 100644
index 0000000000000..f32b5c59812b7
--- /dev/null
+++ b/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.io.pagecache;
+
+import java.io.Flushable;
+import java.io.IOException;
+
+/**
+ * IOLimiter instances can be passed to the {@link PageCache#flushAndForce(IOLimiter)} and
+ * {@link PagedFile#flushAndForce(IOLimiter)} methods, which will invoke the
+ * {@link #maybeLimitIO(long, int, Flushable)} method on regular intervals.
+ *
+ * This allows the limiter to measure the rate of IO, and inject sleeps, pauses or flushes into the process.
+ * The flushes are in this case referring to the underlying hardware.
+ *
+ * Normally, flushing a channel will just copy the dirty buffers into the OS page cache, but flushing is in this case
+ * implying that the OS pages are cleared as well. In other words, the IOLimiter can make sure
+ * system does not pile up too much IO work in its page cache, by flushing those caches as well on regular intervals.
+ *
+ * The {@link #maybeLimitIO(long, int, Flushable)} method takes and returns a stamp long value. This allows the
+ * implementations to become stateless with respect to the individual flush, yet still keep context from one call to
+ * the next.
+ */
+public interface IOLimiter
+{
+ /**
+ * The value of the initial stamp; that is, what should be passed as the {@code previousStamp} to
+ * {@link #maybeLimitIO(long, int, Flushable)} on the first call in a flush.
+ */
+ long INITIAL_STAMP = 0;
+
+ /**
+ * Invoked at regular intervals during flushing of the {@link PageCache} or {@link PagedFile}s.
+ *
+ * For the first call in a flush, the {@code previousStamp} should have the {@link #INITIAL_STAMP} value.
+ * The return value of this method should then be used as the stamp of the next call. This allows implementations
+ * to be stateless, yet still keep some context around about a given flush, provided they can encode it as a
+ * {@code long}.
+ *
+ * The meaning of this long value is totally opaque to the caller, and can be anything the IOLimiter
+ * implementation desires.
+ *
+ * The implementation is allowed to force changes to the storage device using the given {@link Flushable}, or
+ * to perform {@link Thread#sleep(long) sleeps}, as it desires. It is not allowed to throw
+ * {@link InterruptedException}, however. Those should be dealt with by catching them and re-interrupting the
+ * current thread, or by wrapping them in {@link IOException}s.
+ * @param previousStamp The stamp from the previous call to this method, or {@link #INITIAL_STAMP} if this is the
+ * first call to this method for the given flush.
+ * @param recentlyCompletedIOs The number of IOs completed since the last call to this method.
+ * @param flushable A {@link Flushable} instance that can flush any relevant dirty system buffers, to help smooth
+ * out the IO load on the storage device.
+ * @return A new stamp to pass into the next call to this method.
+ */
+ long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable ) throws IOException;
+
+ /**
+ * An IOLimiter implementation that does not restrict the rate of IO. Use this implementation if you want the
+ * flush to go as fast as possible.
+ */
+ static IOLimiter unlimited()
+ {
+ return ( previousStamp, recentlyCompletedIOs, flushable ) -> 0;
+ }
+}
diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java b/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java
index 0725436be95b2..2d4c4013022d0 100644
--- a/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java
+++ b/community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java
@@ -64,6 +64,13 @@ public interface PageCache extends AutoCloseable
/** Flush all dirty pages */
void flushAndForce() throws IOException;
+ /**
+ * Flush all dirty pages, but limit the rate of IO as advised by the given IOLimiter.
+ * @param limiter The {@link IOLimiter} that determines if pauses or sleeps should be injected into the flushing
+ * process to keep the IO rate down.
+ */
+ void flushAndForce( IOLimiter limiter ) throws IOException;
+
/** Flush all dirty pages and close the page cache. */
void close() throws IOException;
diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java b/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java
index 22a2441cfaef3..e31ac9484d6b8 100644
--- a/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java
+++ b/community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java
@@ -127,12 +127,17 @@ public interface PagedFile extends AutoCloseable
/**
* Flush all dirty pages into the file channel, and force the file channel to disk.
- *
- * Note: Flushing has to take locks on pages, so you cannot call flush
- * while you have pages pinned.
*/
void flushAndForce() throws IOException;
+ /**
+ * Flush all dirty pages into the file channel, and force the file channel to disk, but limit the rate of IO as
+ * advised by the given IOLimiter.
+ * @param limiter The {@link IOLimiter} that determines if pauses or sleeps should be injected into the flushing
+ * process to keep the IO rate down.
+ */
+ void flushAndForce( IOLimiter limiter ) throws IOException;
+
/**
* Get the file-page-id of the last page in the file.
*
diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/FlushTask.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/FlushTask.java
deleted file mode 100644
index faf408be87177..0000000000000
--- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/FlushTask.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.io.pagecache.impl.muninn;
-
-/**
- * This runnable continuously flushes dirty pages in the background, such that future calls to
- * {@link MuninnPageCache#flushAndForce()} have less work to do.
- */
-final class FlushTask extends BackgroundTask
-{
- public FlushTask( MuninnPageCache pageCache )
- {
- super( pageCache );
- }
-
- @Override
- protected void run( MuninnPageCache pageCache )
- {
- pageCache.continuouslyFlushPages();
- }
-}
diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java
index a90d550e766cf..7abfd11796a8f 100644
--- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java
+++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java
@@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
+import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCacheOpenOptions;
import org.neo4j.io.pagecache.PageSwapperFactory;
@@ -45,9 +46,7 @@
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;
import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag;
-import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getDouble;
import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getInteger;
-import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getLong;
/**
* The Muninn {@link org.neo4j.io.pagecache.PageCache page cache} implementation.
@@ -114,29 +113,6 @@ public class MuninnPageCache implements PageCache
private static final int cooperativeEvictionLiveLockThreshold = getInteger(
MuninnPageCache.class, "cooperativeEvictionLiveLockThreshold", 100 );
- private static final boolean backgroundFlushingEnabled = flag(
- MuninnPageCache.class, "backgroundFlushingEnabled", true );
-
- // The background flush task will only spend a certain amount of time doing IO, to avoid saturating the IO
- // subsystem during times when there is more important work to be done. It will do this by measuring how much
- // time it spends on each flush, and then accumulate a sleep debt. Once the sleep debt grows beyond this
- // threshold, the flush task will pay it back.
- private static final long backgroundFlushSleepDebtThreshold = getLong(
- MuninnPageCache.class, "backgroundFlushSleepDebtThreshold", 10 );
-
- // This ratio determines how the background flush task will spend its time. Specifically, it is a ratio of how
- // much of its time will be spent doing IO. For instance, setting the ratio to 0.3 will make the flusher task
- // spend 30% of its time doing IO, and 70% of its time sleeping.
- private static final double backgroundFlushIoRatio = getDouble(
- MuninnPageCache.class, "backgroundFlushIoRatio", 0.1 );
-
- private static final long backgroundFlushBusyBreak = getLong(
- MuninnPageCache.class, "backgroundFlushBusyBreak", 100 );
- private static final long backgroundFlushMediumBreak = getLong(
- MuninnPageCache.class, "backgroundFlushMediumBreak", 200 );
- private static final long backgroundFlushLongBreak = getLong(
- MuninnPageCache.class, "backgroundFlushLongBreak", 1000 );
-
// This is a pre-allocated constant, so we can throw it without allocating any objects:
@SuppressWarnings( "ThrowableInstanceNeverThrown" )
private static final IOException oomException = new IOException(
@@ -414,10 +390,6 @@ private void ensureThreadsInitialised() throws IOException
try
{
backgroundThreadExecutor.execute( new EvictionTask( this ) );
- if ( backgroundFlushingEnabled )
- {
- backgroundThreadExecutor.execute( new FlushTask( this ) ); // TODO disable background flushing for good
- }
}
catch ( Exception e )
{
@@ -501,14 +473,24 @@ public void setPrintExceptionsOnClose( boolean enabled )
}
@Override
- public synchronized void flushAndForce() throws IOException
+ public void flushAndForce() throws IOException
+ {
+ flushAndForce( IOLimiter.unlimited() );
+ }
+
+ @Override
+ public synchronized void flushAndForce( IOLimiter limiter ) throws IOException
{
+ if ( limiter == null )
+ {
+ throw new IllegalArgumentException( "IOLimiter cannot be null" );
+ }
assertNotClosed();
- flushAllPages();
+ flushAllPages( limiter );
clearEvictorException();
}
- private void flushAllPages() throws IOException
+ private void flushAllPages( IOLimiter limiter ) throws IOException
{
try ( MajorFlushEvent cacheFlush = tracer.beginCacheFlush() )
{
@@ -516,7 +498,7 @@ private void flushAllPages() throws IOException
FileMapping fileMapping = mappedFiles;
while ( fileMapping != null )
{
- fileMapping.pagedFile.flushAndForceInternal( flushOpportunity, false );
+ fileMapping.pagedFile.flushAndForceInternal( flushOpportunity, false, limiter );
fileMapping = fileMapping.next;
}
syncDevice();
@@ -952,121 +934,6 @@ private void checkBackgroundFlushPause()
}
}
- /**
- * Scan through all the pages, flushing the dirty ones. Aim to only spend at most 50% of its time doing IO, in an
- * effort to avoid saturating the IO subsystem or steal precious IO resources from more important work.
- */
- void continuouslyFlushPages()
- {
- Thread thread = Thread.currentThread();
- flushThread = thread;
-
- while ( !thread.isInterrupted() )
- {
- long iterationSleepMillis = flushAtIORatio( backgroundFlushIoRatio );
- if ( iterationSleepMillis > 0 )
- {
- LockSupport.parkNanos( this, TimeUnit.MILLISECONDS.toNanos( iterationSleepMillis ) );
- sleepDebtNanos = 0;
- }
- }
- }
-
- private long flushAtIORatio( double ratio )
- {
- Thread thread = Thread.currentThread();
- long sleepPaymentThreshold = TimeUnit.MILLISECONDS.toNanos( backgroundFlushSleepDebtThreshold );
- boolean seenDirtyPages = false;
- boolean flushedPages = false;
- double sleepFactor = (1 - ratio) / ratio;
-
- try ( MajorFlushEvent event = tracer.beginCacheFlush() )
- {
- for ( MuninnPage page : pages )
- {
- if ( page == null || thread.isInterrupted() )
- {
- // Null pages means the page cache has been closed.
- thread.interrupt();
- return 0;
- }
-
- // The rate is the percentage of time that we want to spend doing IO. If the rate is 0.3, then we
- // want to spend 30% of our time doing IO. We would then spend the other 70% of the time just
- // sleeping. This means that for every IO we do, we measure how long it takes. We can then compute
- // the amount of time we need to sleep. Basically, if we spend 30 microseconds doing IO, then we need
- // to sleep for 70 microseconds, with the 0.3 ratio. To get the sleep time, we can divide the IO time
- // T by the ratio R, and then multiply the result by 1 - R. This is equivalent to (T/R) - T = S.
- // Then, because we don't want to sleep too frequently in too small intervals, we sum up our S's and
- // only sleep when we have collected a sleep debt of at least 10 milliseconds.
- // IO is not the only point of contention, however. Doing a flush also means that we have to take a
- // pessimistic read-lock on the page, and if we do this on a page that is very popular for writing,
- // then it can noticeably impact the performance of the database. Therefore, we check the dirtiness of
- // a given page under and *optimistic* read lock, and we also decrement the usage counter to avoid
- // aggressively flushing very popular pages. We need to carefully balance this, though, since we are
- // at risk of the mutator threads performing so many writes that we can't decrement the usage
- // counters fast enough to reach zero.
-
- // Skip the page if it is already write locked, or not dirty, or too popular.
- boolean thisPageIsDirty;
- if ( !(thisPageIsDirty = page.isDirty()) || !page.decrementUsage() )
- {
- seenDirtyPages |= thisPageIsDirty;
- continue; // Continue looping to the next page.
- }
-
- if ( page.tryFreezeLock() )
- {
- try
- {
- // Double-check that the page is still dirty. We could be racing with other flushing threads.
- if ( !page.isDirty() )
- {
- continue; // Continue looping to the next page.
- }
-
- long startNanos = System.nanoTime();
- page.flush( event.flushEventOpportunity() );
- long elapsedNanos = System.nanoTime() - startNanos;
-
- sleepDebtNanos += elapsedNanos * sleepFactor;
- flushedPages = true;
- }
- catch ( Throwable ignore )
- {
- // The MuninnPage.flush method will keep the page dirty if flushing fails, and the eviction
- // thread will eventually report the problem if its serious. Ergo, we can just ignore any and
- // all exceptions, and move on to the next page. If we end up not getting anything done this
- // iteration of flushAtIORatio, then that's fine too.
- }
- finally
- {
- page.unlockFreeze();
- }
- }
-
- // Check if we've collected enough sleep debt, and if so, pay it back.
- if ( sleepDebtNanos > sleepPaymentThreshold )
- {
- LockSupport.parkNanos( sleepDebtNanos );
- sleepDebtNanos = 0;
- }
-
- // Check if we've been asked to pause, because another thread wants to focus on flushing.
- checkBackgroundFlushPause();
- }
- }
-
- // We return an amount of time, in milliseconds, that we want to wait before we do the next iteration. If we
- // have seen no dirty pages, then we can take a long break because the database is presumably not very busy
- // with writing. If we have seen dirty pages and flushed some, then we can take a medium break since we've
- // made some progress but we also need to keep up. If we have seen dirty pages and flushed none of them, then
- // we shouldn't take any break, since we are falling behind the mutator threads.
- return seenDirtyPages?
- flushedPages? backgroundFlushMediumBreak : backgroundFlushBusyBreak
- : backgroundFlushLongBreak;
- }
-
@Override
public String toString()
{
diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java
index 82e45a97ca9cb..436512b29f1d0 100644
--- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java
+++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java
@@ -20,8 +20,10 @@
package org.neo4j.io.pagecache.impl.muninn;
import java.io.File;
+import java.io.Flushable;
import java.io.IOException;
+import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PageEvictionCallback;
import org.neo4j.io.pagecache.PageSwapper;
@@ -194,27 +196,40 @@ void closeSwapper() throws IOException
@Override
public void flushAndForce() throws IOException
{
+ flushAndForce( IOLimiter.unlimited() );
+ }
+
+ @Override
+ public void flushAndForce( IOLimiter limiter ) throws IOException
+ {
+ if ( limiter == null )
+ {
+ throw new IllegalArgumentException( "IOLimiter cannot be null" );
+ }
try ( MajorFlushEvent flushEvent = tracer.beginFileFlush( swapper ) )
{
- flushAndForceInternal( flushEvent.flushEventOpportunity(), false );
+ flushAndForceInternal( flushEvent.flushEventOpportunity(), false, limiter );
syncDevice();
}
}
- public void flushAndForceForClose() throws IOException
+ void flushAndForceForClose() throws IOException
{
try ( MajorFlushEvent flushEvent = tracer.beginFileFlush( swapper ) )
{
- flushAndForceInternal( flushEvent.flushEventOpportunity(), true );
+ flushAndForceInternal( flushEvent.flushEventOpportunity(), true, IOLimiter.unlimited() );
syncDevice();
}
}
- void flushAndForceInternal( FlushEventOpportunity flushOpportunity, boolean forClosing ) throws IOException
+ void flushAndForceInternal( FlushEventOpportunity flushOpportunity, boolean forClosing, IOLimiter limiter )
+ throws IOException
{
pageCache.pauseBackgroundFlushTask();
+ Flushable flushable = swapper::force;
MuninnPage[] pages = new MuninnPage[translationTableChunkSize];
long filePageId = -1; // Start at -1 because we increment at the *start* of the chunk-loop iteration.
+ long limiterStamp = IOLimiter.INITIAL_STAMP;
try
{
for ( Object[] chunk : translationTable )
@@ -262,12 +277,15 @@ else if ( forClosing )
}
if ( pagesGrabbed > 0 )
{
- pagesGrabbed = vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing );
+ vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing );
+ limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, flushable );
+ pagesGrabbed = 0;
}
}
if ( pagesGrabbed > 0 )
{
vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing );
+ limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, flushable );
}
}
@@ -279,7 +297,7 @@ else if ( forClosing )
}
}
- private int vectoredFlush(
+ private void vectoredFlush(
MuninnPage[] pages, int pagesGrabbed, FlushEventOpportunity flushOpportunity, boolean forClosing )
throws IOException
{
@@ -307,7 +325,6 @@ private int vectoredFlush(
flush.done();
// There are now 0 'grabbed' pages
- return 0;
}
catch ( IOException ioe )
{
diff --git a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java
index d956791039684..103fe66708e0f 100644
--- a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java
+++ b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPageCache.java
@@ -29,6 +29,7 @@
import java.util.Objects;
import org.neo4j.adversaries.Adversary;
+import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
@@ -73,6 +74,13 @@ public void flushAndForce() throws IOException
delegate.flushAndForce();
}
+ @Override
+ public void flushAndForce( IOLimiter limiter ) throws IOException
+ {
+ adversary.injectFailure( FileNotFoundException.class, IOException.class, SecurityException.class );
+ delegate.flushAndForce( limiter );
+ }
+
@Override
public void close() throws IOException
{
diff --git a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java
index 0e0d0f0be70bb..b5e7b035dac34 100644
--- a/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java
+++ b/community/io/src/test/java/org/neo4j/adversaries/pagecache/AdversarialPagedFile.java
@@ -24,6 +24,7 @@
import java.util.Objects;
import org.neo4j.adversaries.Adversary;
+import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
@@ -71,6 +72,13 @@ public void flushAndForce() throws IOException
delegate.flushAndForce();
}
+ @Override
+ public void flushAndForce( IOLimiter limiter ) throws IOException
+ {
+ adversary.injectFailure( FileNotFoundException.class, IOException.class, SecurityException.class );
+ delegate.flushAndForce( limiter );
+ }
+
@Override
public long getLastPageId() throws IOException
{
diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java
index 1eb95fae5a964..1d6a9ad61a4c9 100644
--- a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java
+++ b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java
@@ -205,6 +205,80 @@ public void writesFlushedFromPageFileMustBeExternallyObservable() throws IOExcep
pagedFile.close();
}
+ @Test( expected = IllegalArgumentException.class )
+ public void pageCacheFlushAndForceMustThrowOnNullIOPSLimiter() throws Exception
+ {
+ PageCache cache = getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL );
+ cache.flushAndForce( null );
+ }
+
+ @Test( expected = IllegalArgumentException.class )
+ public void pagedFileFlushAndForceMustThrowOnNullIOPSLimiter() throws Exception
+ {
+ PageCache cache = getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL );
+ try ( PagedFile pf = cache.map( file( "a" ), filePageSize ) )
+ {
+ pf.flushAndForce( null );
+ }
+ }
+
+ @Test
+ public void pageCacheFlushAndForceMustQueryTheGivenIOPSLimiter() throws Exception
+ {
+ int pagesToDirty = 10_000;
+ PageCache cache = getPageCache( fs, nextPowerOf2( 2 * pagesToDirty ), pageCachePageSize, PageCacheTracer.NULL );
+ PagedFile pfA = cache.map( existingFile( "a" ), filePageSize );
+ PagedFile pfB = cache.map( existingFile( "b" ), filePageSize );
+
+ dirtyManyPages( pfA, pagesToDirty );
+ dirtyManyPages( pfB, pagesToDirty );
+
+ AtomicInteger callbackCounter = new AtomicInteger();
+ AtomicInteger ioCounter = new AtomicInteger();
+ cache.flushAndForce( (previousStamp, recentlyCompletedIOs, swapper) -> {
+ ioCounter.addAndGet( recentlyCompletedIOs );
+ return callbackCounter.getAndIncrement();
+ });
+ pfA.close();
+ pfB.close();
+
+ assertThat( callbackCounter.get(), greaterThan( 0 ) );
+ assertThat( ioCounter.get(), greaterThanOrEqualTo( pagesToDirty * 2 - 30 ) ); // -30 because of the eviction thread
+ }
+
+ @Test
+ public void pagedFileFlushAndForceMustQueryTheGivenIOPSLimiter() throws Exception
+ {
+ int pagesToDirty = 10_000;
+ PageCache cache = getPageCache( fs, nextPowerOf2( pagesToDirty ), pageCachePageSize, PageCacheTracer.NULL );
+ PagedFile pf = cache.map( file( "a" ), filePageSize );
+
+ // Dirty a bunch of data
+ dirtyManyPages( pf, pagesToDirty );
+
+ AtomicInteger callbackCounter = new AtomicInteger();
+ AtomicInteger ioCounter = new AtomicInteger();
+ pf.flushAndForce( (previousStamp, recentlyCompletedIOs, swapper) -> {
+ ioCounter.addAndGet( recentlyCompletedIOs );
+ return callbackCounter.getAndIncrement();
+ });
+ pf.close();
+
+ assertThat( callbackCounter.get(), greaterThan( 0 ) );
+ assertThat( ioCounter.get(), greaterThanOrEqualTo( pagesToDirty - 30 ) ); // -30 because of the eviction thread
+ }
+
+ private void dirtyManyPages( PagedFile pf, int pagesToDirty ) throws IOException
+ {
+ try ( PageCursor cursor = pf.io( 0, PF_SHARED_WRITE_LOCK ) )
+ {
+ for ( int i = 0; i < pagesToDirty; i++ )
+ {
+ assertTrue( cursor.next() );
+ }
+ }
+ }
+
@Test( timeout = SEMI_LONG_TIMEOUT_MILLIS )
public void repeatablyWritesFlushedFromPageFileMustBeExternallyObservable() throws IOException
{
@@ -1609,10 +1683,7 @@ public void mustNotLiveLockIfWeRunOutOfEvictablePages() throws Exception
}
finally
{
- for ( PageCursor cursor : cursors )
- {
- cursor.close();
- }
+ cursors.forEach( PageCursor::close );
}
}
}
@@ -2005,13 +2076,7 @@ public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swappe
}
}
- try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
- {
- for ( int i = 0; i < pinsForWrite; i++ )
- {
- assertTrue( cursor.next() );
- }
- }
+ dirtyManyPages( pagedFile, pinsForWrite );
}
assertThat( "wrong read pin count", readCount.get(), is( pinsForRead ) );
@@ -2133,13 +2198,7 @@ public void lastPageIdMustIncreaseWhenScanningPastEndWithWriteLock()
PagedFile pagedFile = pageCache.map( file( "a" ), filePageSize );
assertThat( pagedFile.getLastPageId(), is( 9L ) );
- try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
- {
- for ( int i = 0; i < 15; i++ )
- {
- assertTrue( cursor.next() );
- }
- }
+ dirtyManyPages( pagedFile, 15 );
try
{
assertThat( pagedFile.getLastPageId(), is( 14L ) );
@@ -3245,51 +3304,6 @@ public void mustEvictPagesFromUnmappedFiles() throws Exception
}
}
- @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS )
- public void mustFlushDirtyPagesInTheBackground() throws Exception
- {
- final CountDownLatch swapOutLatch = new CountDownLatch( 1 );
- PageSwapperFactory swapperFactory = new SingleFilePageSwapperFactory()
- {
- @Override
- public PageSwapper createPageSwapper(
- File file, int filePageSize, PageEvictionCallback onEviction, boolean createIfNotExist ) throws IOException
- {
- PageSwapper delegate = super.createPageSwapper( file, filePageSize, onEviction, createIfNotExist );
- return new DelegatingPageSwapper( delegate )
- {
- @Override
- public long write( long filePageId, Page page ) throws IOException
- {
- try
- {
- return super.write( filePageId, page );
- }
- finally
- {
- swapOutLatch.countDown();
- }
- }
- };
- }
- };
- swapperFactory.setFileSystemAbstraction( fs );
-
- try ( PageCache pageCache = createPageCache(
- swapperFactory, maxPages, pageCachePageSize, PageCacheTracer.NULL );
- PagedFile pagedFile = pageCache.map( file( "a" ), filePageSize ) )
- {
- try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
- {
- assertTrue( cursor.next() );
- writeRecords( cursor );
- }
-
- swapOutLatch.await();
- verifyRecordsInFile( file( "a" ), recordsPerFilePage );
- }
- }
-
@Test( timeout = SEMI_LONG_TIMEOUT_MILLIS )
public void mustReadZerosFromBeyondEndOfFile() throws Exception
{
diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java
index 3fd1295dd3423..5097f46819507 100644
--- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java
+++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ExternallyManagedPageCache.java
@@ -27,6 +27,7 @@
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.io.fs.FileSystemAbstraction;
+import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.enterprise.EnterpriseFacadeFactory;
@@ -70,6 +71,12 @@ public void flushAndForce() throws IOException
delegate.flushAndForce();
}
+ @Override
+ public void flushAndForce( IOLimiter limiter ) throws IOException
+ {
+ delegate.flushAndForce( limiter );
+ }
+
@Override
public int pageSize()
{