Skip to content

Commit

Permalink
Introduce IOLimiter and remove background page cache flusher
Browse files Browse the repository at this point in the history
The IOLimiter allows the CheckPointer to flush at a rate that is more sympathetic to other urgent tasks that need IO, such as appending to the transaction log during commit.
The IOLimiter has, however, not yet been integrated with the CheckPointer.
Removing the background flushing in the page cache also allows a change to the SequenceLock, to allow reads and writes to be concurrent with flushing.
This has also not been implemented.
  • Loading branch information
chrisvest committed Mar 4, 2016
1 parent 282374e commit 091815b
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 259 deletions.
81 changes: 81 additions & 0 deletions community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.pagecache;

import java.io.Flushable;
import java.io.IOException;

/**
* IOLimiter instances can be passed to the {@link PageCache#flushAndForce(IOLimiter)} and
* {@link PagedFile#flushAndForce(IOLimiter)} methods, which will invoke the
* {@link #maybeLimitIO(long, int, Flushable)} method on regular intervals.
* <p>
* This allows the limiter to measure the rate of IO, and inject sleeps, pauses or flushes into the process.
* The flushes are in this case referring to the underlying hardware.
* <p>
* Normally, flushing a channel will just copy the dirty buffers into the OS page cache, but flushing is in this case
* implying that the OS pages are cleared as well. In other words, the IOPSLimiter can make sure that the operating
* system does not pile up too much IO work in its page cache, by flushing those caches as well on regular intervals.
* <p>
* The {@link #maybeLimitIO(long, int, Flushable)} method takes and returns a stamp long value. This allows the
* implementations to become stateless with respect to the individual flush, yet still keep context from one call to
* the next.
*/
public interface IOLimiter
{
/**
* The value of the initial stamp; that is, what should be passed as the {@code previousStamp} to
* {@link #maybeLimitIO(long, int, Flushable)} on the first call in a flush.
*/
long INITIAL_STAMP = 0;

/**
* Invoked at regular intervals during flushing of the {@link PageCache} or {@link PagedFile}s.
*
* For the first call in a flush, the {@code previousStamp} should have the {@link #INITIAL_STAMP} value.
* The return value of this method should then be used as the stamp of the next call. This allows implementations
* to be stateless, yet still keep some context around about a given flush, provided they can encode it as a
* {@code long}.
*
* The meaning of this long value is totally opaque to the caller, and can be anything the IOPSLimiter
* implementation desires.
*
* The implementation is allowed to force changes to the storage device using the given {@link Flushable}, or
* to perform {@link Thread#sleep(long) sleeps}, as it desires. It is not allowed to throw
* {@link InterruptedException}, however. Those should be dealt with by catching them and re-interrupting the
* current thread, or by wrapping them in {@link IOException}s.
* @param previousStamp The stamp from the previous call to this method, or {@link #INITIAL_STAMP} if this is the
* first call to this method for the given flush.
* @param recentlyCompletedIOs The number of IOs completed since the last call to this method.
* @param flushable A {@link Flushable} instance that can flush any relevant dirty system buffers, to help smooth
* out the IO load on the storage device.
* @return A new stamp to pass into the next call to this method.
*/
long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable ) throws IOException;

/**
* An IOPSLimiter implementation that does not restrict the rate of IO. Use this implementation if you want the
* flush to go as fast as possible.
*/
static IOLimiter unlimited()
{
return ( previousStamp, recentlyCompletedIOs, flushable ) -> 0;
}
}
Expand Up @@ -64,6 +64,13 @@ public interface PageCache extends AutoCloseable
/** Flush all dirty pages */
void flushAndForce() throws IOException;

/**
* Flush all dirty pages, but limit the rate of IO as advised by the given IOPSLimiter.
* @param limiter The {@link IOLimiter} that determines if pauses or sleeps should be injected into the flushing
* process to keep the IO rate down.
*/
void flushAndForce( IOLimiter limiter ) throws IOException;

/** Flush all dirty pages and close the page cache. */
void close() throws IOException;

Expand Down
11 changes: 8 additions & 3 deletions community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java
Expand Up @@ -127,12 +127,17 @@ public interface PagedFile extends AutoCloseable

/**
* Flush all dirty pages into the file channel, and force the file channel to disk.
* <p>
* Note: Flushing has to take locks on pages, so you cannot call flush
* while you have pages pinned.
*/
void flushAndForce() throws IOException;

/**
* Flush all dirty pages into the file channel, and force the file channel to disk, but limit the rate of IO as
* advised by the given IOPSLimiter.
* @param limiter The {@link IOLimiter} that determines if pauses or sleeps should be injected into the flushing
* process to keep the IO rate down.
*/
void flushAndForce( IOLimiter limiter ) throws IOException;

/**
* Get the file-page-id of the last page in the file.
* <p>
Expand Down

This file was deleted.

Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCacheOpenOptions;
import org.neo4j.io.pagecache.PageSwapperFactory;
Expand All @@ -45,9 +46,7 @@
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag;
import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getDouble;
import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getInteger;
import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.getLong;

/**
* The Muninn {@link org.neo4j.io.pagecache.PageCache page cache} implementation.
Expand Down Expand Up @@ -114,29 +113,6 @@ public class MuninnPageCache implements PageCache
private static final int cooperativeEvictionLiveLockThreshold = getInteger(
MuninnPageCache.class, "cooperativeEvictionLiveLockThreshold", 100 );

private static final boolean backgroundFlushingEnabled = flag(
MuninnPageCache.class, "backgroundFlushingEnabled", true );

// The background flush task will only spend a certain amount of time doing IO, to avoid saturating the IO
// subsystem during times when there is more important work to be done. It will do this by measuring how much
// time it spends on each flush, and then accumulate a sleep debt. Once the sleep debt grows beyond this
// threshold, the flush task will pay it back.
private static final long backgroundFlushSleepDebtThreshold = getLong(
MuninnPageCache.class, "backgroundFlushSleepDebtThreshold", 10 );

// This ratio determines how the background flush task will spend its time. Specifically, it is a ratio of how
// much of its time will be spent doing IO. For instance, setting the ratio to 0.3 will make the flusher task
// spend 30% of its time doing IO, and 70% of its time sleeping.
private static final double backgroundFlushIoRatio = getDouble(
MuninnPageCache.class, "backgroundFlushIoRatio", 0.1 );

private static final long backgroundFlushBusyBreak = getLong(
MuninnPageCache.class, "backgroundFlushBusyBreak", 100 );
private static final long backgroundFlushMediumBreak = getLong(
MuninnPageCache.class, "backgroundFlushMediumBreak", 200 );
private static final long backgroundFlushLongBreak = getLong(
MuninnPageCache.class, "backgroundFlushLongBreak", 1000 );

// This is a pre-allocated constant, so we can throw it without allocating any objects:
@SuppressWarnings( "ThrowableInstanceNeverThrown" )
private static final IOException oomException = new IOException(
Expand Down Expand Up @@ -414,10 +390,6 @@ private void ensureThreadsInitialised() throws IOException
try
{
backgroundThreadExecutor.execute( new EvictionTask( this ) );
if ( backgroundFlushingEnabled )
{
backgroundThreadExecutor.execute( new FlushTask( this ) ); // TODO disable background flushing for good
}
}
catch ( Exception e )
{
Expand Down Expand Up @@ -501,22 +473,32 @@ public void setPrintExceptionsOnClose( boolean enabled )
}

@Override
public synchronized void flushAndForce() throws IOException
public void flushAndForce() throws IOException
{
flushAndForce( IOLimiter.unlimited() );
}

@Override
public synchronized void flushAndForce( IOLimiter limiter ) throws IOException
{
if ( limiter == null )
{
throw new IllegalArgumentException( "IOPSLimiter cannot be null" );
}
assertNotClosed();
flushAllPages();
flushAllPages( limiter );
clearEvictorException();
}

private void flushAllPages() throws IOException
private void flushAllPages( IOLimiter limiter ) throws IOException
{
try ( MajorFlushEvent cacheFlush = tracer.beginCacheFlush() )
{
FlushEventOpportunity flushOpportunity = cacheFlush.flushEventOpportunity();
FileMapping fileMapping = mappedFiles;
while ( fileMapping != null )
{
fileMapping.pagedFile.flushAndForceInternal( flushOpportunity, false );
fileMapping.pagedFile.flushAndForceInternal( flushOpportunity, false, limiter );
fileMapping = fileMapping.next;
}
syncDevice();
Expand Down Expand Up @@ -952,121 +934,6 @@ private void checkBackgroundFlushPause()
}
}

/**
* Scan through all the pages, flushing the dirty ones. Aim to only spend at most 50% of its time doing IO, in an
* effort to avoid saturating the IO subsystem or steal precious IO resources from more important work.
*/
void continuouslyFlushPages()
{
Thread thread = Thread.currentThread();
flushThread = thread;

while ( !thread.isInterrupted() )
{
long iterationSleepMillis = flushAtIORatio( backgroundFlushIoRatio );
if ( iterationSleepMillis > 0 )
{
LockSupport.parkNanos( this, TimeUnit.MILLISECONDS.toNanos( iterationSleepMillis ) );
sleepDebtNanos = 0;
}
}
}

private long flushAtIORatio( double ratio )
{
Thread thread = Thread.currentThread();
long sleepPaymentThreshold = TimeUnit.MILLISECONDS.toNanos( backgroundFlushSleepDebtThreshold );
boolean seenDirtyPages = false;
boolean flushedPages = false;
double sleepFactor = (1 - ratio) / ratio;

try ( MajorFlushEvent event = tracer.beginCacheFlush() )
{
for ( MuninnPage page : pages )
{
if ( page == null || thread.isInterrupted() )
{
// Null pages means the page cache has been closed.
thread.interrupt();
return 0;
}

// The rate is the percentage of time that we want to spend doing IO. If the rate is 0.3, then we
// want to spend 30% of our time doing IO. We would then spend the other 70% of the time just
// sleeping. This means that for every IO we do, we measure how long it takes. We can then compute
// the amount of time we need to sleep. Basically, if we spend 30 microseconds doing IO, then we need
// to sleep for 70 microseconds, with the 0.3 ratio. To get the sleep time, we can divide the IO time
// T by the ratio R, and then multiply the result by 1 - R. This is equivalent to (T/R) - T = S.
// Then, because we don't want to sleep too frequently in too small intervals, we sum up our S's and
// only sleep when we have collected a sleep debt of at least 10 milliseconds.
// IO is not the only point of contention, however. Doing a flush also means that we have to take a
// pessimistic read-lock on the page, and if we do this on a page that is very popular for writing,
// then it can noticeably impact the performance of the database. Therefore, we check the dirtiness of
// a given page under and *optimistic* read lock, and we also decrement the usage counter to avoid
// aggressively flushing very popular pages. We need to carefully balance this, though, since we are
// at risk of the mutator threads performing so many writes that we can't decrement the usage
// counters fast enough to reach zero.

// Skip the page if it is already write locked, or not dirty, or too popular.
boolean thisPageIsDirty;
if ( !(thisPageIsDirty = page.isDirty()) || !page.decrementUsage() )
{
seenDirtyPages |= thisPageIsDirty;
continue; // Continue looping to the next page.
}

if ( page.tryFreezeLock() )
{
try
{
// Double-check that the page is still dirty. We could be racing with other flushing threads.
if ( !page.isDirty() )
{
continue; // Continue looping to the next page.
}

long startNanos = System.nanoTime();
page.flush( event.flushEventOpportunity() );
long elapsedNanos = System.nanoTime() - startNanos;

sleepDebtNanos += elapsedNanos * sleepFactor;
flushedPages = true;
}
catch ( Throwable ignore )
{
// The MuninnPage.flush method will keep the page dirty if flushing fails, and the eviction
// thread will eventually report the problem if its serious. Ergo, we can just ignore any and
// all exceptions, and move on to the next page. If we end up not getting anything done this
// iteration of flushAtIORatio, then that's fine too.
}
finally
{
page.unlockFreeze();
}
}

// Check if we've collected enough sleep debt, and if so, pay it back.
if ( sleepDebtNanos > sleepPaymentThreshold )
{
LockSupport.parkNanos( sleepDebtNanos );
sleepDebtNanos = 0;
}

// Check if we've been asked to pause, because another thread wants to focus on flushing.
checkBackgroundFlushPause();
}
}

// We return an amount of time, in milliseconds, that we want to wait before we do the next iteration. If we
// have seen no dirty pages, then we can take a long break because the database is presumably not very busy
// with writing. If we have seen dirty pages and flushed some, then we can take a medium break since we've
// made some progress but we also need to keep up. If we have seen dirty pages and flushed none of them, then
// we shouldn't take any break, since we are falling behind the mutator threads.
return seenDirtyPages?
flushedPages? backgroundFlushMediumBreak : backgroundFlushBusyBreak
: backgroundFlushLongBreak;
}

@Override
public String toString()
{
Expand Down

0 comments on commit 091815b

Please sign in to comment.