Simplify the IOLimiter infrastructure
chrisvest committed Mar 8, 2016
1 parent e6c4750 commit b3bb64c
Showing 18 changed files with 248 additions and 311 deletions.
49 changes: 43 additions & 6 deletions community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java
@@ -26,14 +26,14 @@
* IOLimiter instances can be passed to the {@link PageCache#flushAndForce(IOLimiter)} and
* {@link PagedFile#flushAndForce(IOLimiter)} methods, which will invoke the
* {@link #maybeLimitIO(long, int, Flushable)} method on regular intervals.
* <p>
* <p/>
* This allows the limiter to measure the rate of IO, and inject sleeps, pauses or flushes into the process.
* The flushes are in this case referring to the underlying hardware.
* <p>
* <p/>
* Normally, flushing a channel will just copy the dirty buffers into the OS page cache, but flushing is in this case
* implying that the OS pages are cleared as well. In other words, the IOPSLimiter can make sure that the operating
* system does not pile up too much IO work in its page cache, by flushing those caches as well on regular intervals.
* <p>
* <p/>
* The {@link #maybeLimitIO(long, int, Flushable)} method takes and returns a stamp long value. This allows the
* implementations to become stateless with respect to the individual flush, yet still keep context from one call to
* the next.
@@ -48,19 +48,20 @@ public interface IOLimiter

/**
* Invoked at regular intervals during flushing of the {@link PageCache} or {@link PagedFile}s.
*
* <p/>
* For the first call in a flush, the {@code previousStamp} should have the {@link #INITIAL_STAMP} value.
* The return value of this method should then be used as the stamp of the next call. This allows implementations
* to be stateless, yet still keep some context around about a given flush, provided they can encode it as a
* {@code long}.
*
* <p/>
* The meaning of this long value is totally opaque to the caller, and can be anything the IOPSLimiter
* implementation desires.
*
* <p/>
* The implementation is allowed to force changes to the storage device using the given {@link Flushable}, or
* to perform {@link Thread#sleep(long) sleeps}, as it desires. It is not allowed to throw
* {@link InterruptedException}, however. Those should be dealt with by catching them and re-interrupting the
* current thread, or by wrapping them in {@link IOException}s.
*
* @param previousStamp The stamp from the previous call to this method, or {@link #INITIAL_STAMP} if this is the
* first call to this method for the given flush.
* @param recentlyCompletedIOs The number of IOs completed since the last call to this method.
@@ -70,6 +71,42 @@
*/
long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable ) throws IOException;

/**
* Temporarily disable the IOLimiter, to allow IO to proceed at full speed.
* This call <strong>MUST</strong> be paired with a subsequent {@link #enableLimit()} call.
* This method is thread-safe and reentrant. Multiple concurrent calls will "stack", and IO limitations will be
* enabled again once the last overlapping limit-disabling period ends with the "last" call to
* {@link #enableLimit()}. This is conceptually similar to how a reentrant read-lock works.
*
* Thus, the typical usage pattern is with a {@code try-finally} clause, like this:
*
* <pre><code>
* limiter.disableLimit();
* try
* {
* // ... do work that needs maximum IO performance ...
* }
* finally
* {
* limiter.enableLimit();
* }
* </code></pre>
*/
default void disableLimit()
{
// By default this method does nothing, assuming the implementation always has no or fixed limits.
}

/**
* Re-enable the IOLimiter, after having disabled it with {@link #disableLimit()}.
*
* @see #disableLimit() for how to use this method.
*/
default void enableLimit()
{
// Same as for disableLimit().
}

/**
* An IOPSLimiter implementation that does not restrict the rate of IO. Use this implementation if you want the
* flush to go as fast as possible.
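
As a point of reference for the interface above, here is a minimal, hypothetical implementation sketch: it paces IO toward a fixed IOPS budget by storing the time of the previous call in the stamp, and it honours the stackable disableLimit()/enableLimit() contract with a simple counter. The class name and the MAX_IOPS figure are invented for illustration; this is not code from the commit.

    import java.io.Flushable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.neo4j.io.pagecache.IOLimiter;

    class PacingIOLimiter implements IOLimiter
    {
        private static final int MAX_IOPS = 10_000; // assumed budget, purely for illustration

        // Counts overlapping disableLimit() calls, so that paired calls "stack".
        private final AtomicInteger disabledCount = new AtomicInteger();

        @Override
        public long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable ) throws IOException
        {
            long now = System.currentTimeMillis();
            if ( disabledCount.get() > 0 || previousStamp == INITIAL_STAMP )
            {
                return now; // no pacing while disabled, or on the first call of a flush
            }
            long elapsedMillis = now - previousStamp;
            long expectedMillis = ( 1000L * recentlyCompletedIOs ) / MAX_IOPS;
            if ( expectedMillis > elapsedMillis )
            {
                flushable.flush(); // push dirty OS pages out before pausing
                try
                {
                    Thread.sleep( expectedMillis - elapsedMillis );
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt(); // must not propagate, per the javadoc
                }
            }
            return System.currentTimeMillis(); // the stamp is simply the time of this call
        }

        @Override
        public void disableLimit()
        {
            disabledCount.incrementAndGet();
        }

        @Override
        public void enableLimit()
        {
            disabledCount.decrementAndGet();
        }
    }

The stamp could just as well encode an accumulated IO count or a token-bucket fill level; as the javadoc says, its meaning is opaque to the caller and entirely up to the implementation.
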
@@ -38,6 +38,7 @@
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.TokenNameLookup;
@@ -117,7 +118,6 @@
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointerImpl;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CountCommittedTransactionThreshold;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointFlushControl;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
import org.neo4j.kernel.impl.transaction.log.checkpoint.TimeCheckPointThreshold;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
@@ -286,7 +286,7 @@ boolean applicable( DiagnosticsPhase phase )
private final LegacyIndexProviderLookup legacyIndexProviderLookup;
private final ConstraintSemantics constraintSemantics;
private final Procedures procedures;
private final CheckPointFlushControl checkPointFlushControl;
private final IOLimiter ioLimiter;

private Dependencies dependencies;
private LifeSupport life;
@@ -333,7 +333,7 @@ public NeoStoreDataSource(
Monitors monitors,
Tracers tracers,
Procedures procedures,
CheckPointFlushControl checkPointFlushControl )
IOLimiter ioLimiter )
{
this.storeDir = storeDir;
this.config = config;
@@ -361,7 +361,7 @@ public NeoStoreDataSource(
this.monitors = monitors;
this.tracers = tracers;
this.procedures = procedures;
this.checkPointFlushControl = checkPointFlushControl;
this.ioLimiter = ioLimiter;

readOnly = config.get( Configuration.read_only );
msgLog = logProvider.getLog( getClass() );
@@ -646,7 +646,7 @@ public long getTimestampForVersion( long version ) throws IOException

final CheckPointerImpl checkPointer = new CheckPointerImpl(
transactionIdStore, threshold, storageEngine, logPruning, appender, databaseHealth, logProvider,
tracers.checkPointTracer, checkPointFlushControl );
tracers.checkPointTracer, ioLimiter );

long recurringPeriod = Math.min( timeMillisThreshold, TimeUnit.SECONDS.toMillis( 10 ) );
CheckPointScheduler checkPointScheduler = new CheckPointScheduler( checkPointer, scheduler, recurringPeriod );
@@ -25,12 +25,10 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Service;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.DatabaseAvailability;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.internal.KernelData;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.internal.Version;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
import org.neo4j.kernel.impl.api.index.RemoveOrphanConstraintIndexesOnStartup;
@@ -52,8 +50,10 @@
import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.log.checkpoint.FullSpeedCheckPointFlushControl;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.internal.KernelData;
import org.neo4j.kernel.internal.Version;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleStatus;
@@ -107,7 +107,7 @@ public CommunityEditionModule( PlatformModule platformModule )

coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout );

checkPointFlushControl = new FullSpeedCheckPointFlushControl();
ioLimiter = IOLimiter.unlimited();

registerRecovery( platformModule.databaseInfo, life, dependencies );

@@ -198,7 +198,7 @@ public DataSourceModule( final GraphDatabaseFacadeFactory.Dependencies dependenc
platformModule.monitors,
platformModule.tracers,
procedures,
editionModule.checkPointFlushControl ) );
editionModule.ioLimiter ) );
dataSourceManager.register( neoStoreDataSource );

life.add( new MonitorGc( config, logging.getInternalLog( MonitorGc.class ) ) );
@@ -20,8 +20,8 @@
package org.neo4j.kernel.impl.factory;

import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.internal.KernelDiagnostics;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CommitProcessFactory;
@@ -35,8 +35,8 @@
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointFlushControl;
import org.neo4j.kernel.info.DiagnosticsManager;
import org.neo4j.kernel.internal.KernelDiagnostics;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;
import org.neo4j.server.security.auth.AuthManager;
@@ -76,7 +76,7 @@ public abstract class EditionModule

public CoreAPIAvailabilityGuard coreAPIAvailabilityGuard;

public CheckPointFlushControl checkPointFlushControl;
public IOLimiter ioLimiter;

protected void doAfterRecoveryAndStartup( DatabaseInfo databaseInfo, DependencyResolver dependencyResolver)
{

This file was deleted.

@@ -23,6 +23,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
@@ -43,7 +44,7 @@ public class CheckPointerImpl extends LifecycleAdapter implements CheckPointer
private final StorageEngine storageEngine;
private final LogPruning logPruning;
private final DatabaseHealth databaseHealth;
private final CheckPointFlushControl flushControl;
private final IOLimiter ioLimiter;
private final Log msgLog;
private final CheckPointTracer tracer;
private final Lock lock;
@@ -59,7 +60,7 @@ public CheckPointerImpl(
DatabaseHealth databaseHealth,
LogProvider logProvider,
CheckPointTracer tracer,
CheckPointFlushControl checkPointFlushControl )
IOLimiter ioLimiter )
{
this( transactionIdStore,
threshold,
@@ -69,7 +70,7 @@ public CheckPointerImpl(
databaseHealth,
logProvider,
tracer,
checkPointFlushControl,
ioLimiter,
new ReentrantLock() );
}

@@ -82,7 +83,7 @@ public CheckPointerImpl(
DatabaseHealth databaseHealth,
LogProvider logProvider,
CheckPointTracer tracer,
CheckPointFlushControl flushControl,
IOLimiter ioLimiter,
Lock lock )
{
this.appender = appender;
@@ -91,7 +92,7 @@ public CheckPointerImpl(
this.storageEngine = storageEngine;
this.logPruning = logPruning;
this.databaseHealth = databaseHealth;
this.flushControl = flushControl;
this.ioLimiter = ioLimiter;
this.msgLog = logProvider.getLog( CheckPointerImpl.class );
this.tracer = tracer;
this.lock = lock;
@@ -106,24 +107,24 @@ public void start() throws Throwable
@Override
public long forceCheckPoint( TriggerInfo info ) throws IOException
{
try ( Rush ignore = flushControl.beginTemporaryRush() )
ioLimiter.disableLimit();
lock.lock();
try
{
lock.lock();
try
{
return doCheckPoint( info, LogCheckPointEvent.NULL );
}
finally
{
lock.unlock();
}
return doCheckPoint( info, LogCheckPointEvent.NULL );
}
finally
{
lock.unlock();
ioLimiter.enableLimit();
}
}

@Override
public long tryCheckPoint( TriggerInfo info ) throws IOException
{
try ( Rush ignore = flushControl.beginTemporaryRush() )
ioLimiter.disableLimit();
try
{
if ( lock.tryLock() )
{
Expand Down Expand Up @@ -151,6 +152,10 @@ public long tryCheckPoint( TriggerInfo info ) throws IOException
}
}
}
finally
{
ioLimiter.enableLimit();
}
}

@Override
@@ -195,7 +200,7 @@ private long doCheckPoint( TriggerInfo triggerInfo, LogCheckPointEvent logCheckP
* earlier check point and replay from there all the log entries. Everything will be ok.
*/
msgLog.info( prefix + " Starting store flush..." );
storageEngine.flushAndForce( flushControl.getIOLimiter() );
storageEngine.flushAndForce( ioLimiter );
msgLog.info( prefix + " Store flush completed" );

/*
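
For the other half of the contract, the class javadoc above states that the flushAndForce methods invoke maybeLimitIO at regular intervals and feed the returned stamp into the next call. The following is a hypothetical flush loop, sketched only to illustrate that calling pattern; the class, method, and batching threshold are invented and this is not the actual page cache code.

    import java.io.Flushable;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    import org.neo4j.io.pagecache.IOLimiter;

    class FlushLoopSketch
    {
        private static final int IOS_PER_LIMITER_CALL = 64; // assumed batching threshold

        // Write the dirty buffers through the channel, consulting the limiter after
        // every batch and threading the returned stamp into the next call.
        static void flushWithLimiter( FileChannel channel, ByteBuffer[] dirtyBuffers, IOLimiter limiter ) throws IOException
        {
            Flushable flushable = () -> channel.force( false ); // lets the limiter force the device if it decides to
            long stamp = IOLimiter.INITIAL_STAMP;
            int recentlyCompletedIOs = 0;
            for ( ByteBuffer buffer : dirtyBuffers )
            {
                channel.write( buffer );
                recentlyCompletedIOs++;
                if ( recentlyCompletedIOs >= IOS_PER_LIMITER_CALL )
                {
                    stamp = limiter.maybeLimitIO( stamp, recentlyCompletedIOs, flushable );
                    recentlyCompletedIOs = 0;
                }
            }
            limiter.maybeLimitIO( stamp, recentlyCompletedIOs, flushable ); // account for the tail of the loop
            channel.force( false ); // make everything durable at the end
        }
    }

With CheckPointFlushControl gone, the CheckPointerImpl changes above wrap exactly this kind of flush in a disableLimit()/enableLimit() pair, so that explicitly triggered checkpoints run at full speed.
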
