Skip to content

Commit

Permalink
Do flush and force in parallel if configuration allows.
Browse files Browse the repository at this point in the history
Added way to determine if IOLimiter is currently limiting IO or not.
  • Loading branch information
klaren committed Jun 20, 2018
1 parent 38aa5f9 commit 58b39d6
Show file tree
Hide file tree
Showing 45 changed files with 339 additions and 110 deletions.
Expand Up @@ -575,7 +575,7 @@ public void shouldReportNodesThatAreNotIndexed() throws Exception
}
}
}
accessor.force( IOLimiter.unlimited() );
accessor.force( IOLimiter.UNLIMITED );
accessor.close();
}

Expand All @@ -602,7 +602,7 @@ public void shouldReportNodesWithDuplicatePropertyValueInUniqueIndex() throws Ex
IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE );
updater.process( IndexEntryUpdate.add( 42, indexRule.schema(), values( indexRule ) ) );
updater.close();
accessor.force( IOLimiter.unlimited() );
accessor.force( IOLimiter.UNLIMITED );
accessor.close();
}

Expand Down
Expand Up @@ -248,6 +248,6 @@ private void rotateLogAndCheckPoint() throws IOException

private void flushAll()
{
db.getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.unlimited() );
db.getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.UNLIMITED );
}
}
Expand Up @@ -225,7 +225,7 @@ public void failForceIndexesWhenOneOfTheIndexesIsBroken() throws Exception

expectedException.expect( UnderlyingStorageException.class );
expectedException.expectMessage( "Unable to force" );
indexingService.forceAll( IOLimiter.unlimited() );
indexingService.forceAll( IOLimiter.UNLIMITED );
}

private void waitIndexOnline( IndexProxy indexProxy ) throws InterruptedException
Expand Down
Expand Up @@ -82,7 +82,7 @@ public void scanStoreRecreateCorruptedIndexOnStartup() throws Throwable
createTestNode();
long[] labels = readNodesForLabel( labelScanStore );
assertEquals( "Label scan store see 1 label for node", 1, labels.length );
labelScanStore.force( IOLimiter.unlimited() );
labelScanStore.force( IOLimiter.UNLIMITED );
labelScanStore.shutdown();
workCollector.shutdown();

Expand Down
Expand Up @@ -518,7 +518,7 @@ private <RECORD extends AbstractBaseRecord> void assertSameStoreContents( Record

private void flush( GraphDatabaseService db )
{
((GraphDatabaseAPI)db).getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.unlimited() );
((GraphDatabaseAPI)db).getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.UNLIMITED );
}

private void checkPoint( GraphDatabaseService db ) throws IOException
Expand Down
Expand Up @@ -610,7 +610,7 @@ public void testSetLatestConstraintTx()
assertEquals( 10L, metaDataStore.getLatestConstraintIntroducingTx() );

// when
neoStores.flush( IOLimiter.unlimited() );
neoStores.flush( IOLimiter.UNLIMITED );
neoStores.close();
neoStores = sf.openAllNeoStores();

Expand Down
Expand Up @@ -215,7 +215,7 @@ private void initializeNativeLabelScanStoreWithContent( File dir ) throws IOExce
{
labelScanWriter.write( NodeLabelUpdate.labelChanges( 1, new long[0], new long[]{1} ) );
}
nativeLabelScanStore.force( IOLimiter.unlimited() );
nativeLabelScanStore.force( IOLimiter.UNLIMITED );
}
}

Expand Down
Expand Up @@ -261,7 +261,7 @@ public enum FlushStrategy
@Override
void flush( GraphDatabaseAPI db )
{
IOLimiter limiter = IOLimiter.unlimited();
IOLimiter limiter = IOLimiter.UNLIMITED;
db.getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( limiter );
}
},
Expand Down
Expand Up @@ -488,7 +488,7 @@ private void initializeAfterCreation( Consumer<PageCursor> headerWriter ) throws
changesSinceLastCheckpoint = true;

// Checkpoint to make the created root node stable. Forcing tree state also piggy-backs on this.
checkpoint( IOLimiter.unlimited(), headerWriter );
checkpoint( IOLimiter.UNLIMITED, headerWriter );
clean = true;
}

Expand Down
Expand Up @@ -264,7 +264,7 @@ private void createAndZipTree( File storeFile ) throws IOException
put( writer, key );
}
}
tree.checkpoint( IOLimiter.unlimited() );
tree.checkpoint( IOLimiter.UNLIMITED );
}
ZipUtils.zip( fsRule.get(), storeFile, directory.file( zipName ) );
}
Expand Down
Expand Up @@ -584,7 +584,7 @@ private Runnable checkpointThread( AtomicBoolean endSignal, AtomicReference<Thro
{
try
{
index.checkpoint( IOLimiter.unlimited() );
index.checkpoint( IOLimiter.UNLIMITED );
// Sleep a little in between checkpoints
MILLISECONDS.sleep( 20L );
}
Expand Down
Expand Up @@ -140,7 +140,7 @@ public void shouldStayCorrectAfterRandomModifications() throws Exception
}
}

index.checkpoint( IOLimiter.unlimited() );
index.checkpoint( IOLimiter.UNLIMITED );
randomlyModifyIndex( index, data, random.random(), (double) round / totalNumberOfRounds );
}

Expand Down
Expand Up @@ -47,7 +47,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.rules.RuleChain.outerRule;
import static org.neo4j.index.internal.gbptree.ThrowingRunnable.throwing;
import static org.neo4j.io.pagecache.IOLimiter.unlimited;
import static org.neo4j.io.pagecache.IOLimiter.UNLIMITED;
import static org.neo4j.test.rule.PageCacheRule.config;

public abstract class GBPTreeRecoveryITBase<KEY,VALUE>
Expand Down Expand Up @@ -585,7 +585,7 @@ class CheckpointAction extends Action
@Override
public void execute( GBPTree<KEY,VALUE> index ) throws IOException
{
index.checkpoint( unlimited() );
index.checkpoint( UNLIMITED );
}

@Override
Expand Down
Expand Up @@ -83,7 +83,7 @@
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_READER;
import static org.neo4j.index.internal.gbptree.SimpleLongLayout.longLayout;
import static org.neo4j.index.internal.gbptree.ThrowingRunnable.throwing;
import static org.neo4j.io.pagecache.IOLimiter.unlimited;
import static org.neo4j.io.pagecache.IOLimiter.UNLIMITED;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK;
import static org.neo4j.test.rule.PageCacheRule.config;

Expand Down Expand Up @@ -321,7 +321,7 @@ public void shouldRemapFileIfMappedWithPageSizeLargerThanCreationSize() throws E
writer.put( key, value );
}
}
index.checkpoint( unlimited() );
index.checkpoint( UNLIMITED );
}

// THEN
Expand Down Expand Up @@ -483,7 +483,7 @@ public void shouldPutHeaderDataInCheckPoint() throws Exception
BiConsumer<GBPTree<MutableLong,MutableLong>,byte[]> beforeClose = ( index, expected ) ->
{
ThrowingRunnable throwingRunnable = () ->
index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) );
index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) );
ThrowingRunnable.throwing( throwingRunnable ).run();
};
verifyHeaderDataAfterClose( beforeClose );
Expand All @@ -496,12 +496,12 @@ public void shouldCarryOverHeaderDataInCheckPoint() throws Exception
{
ThrowingRunnable throwingRunnable = () ->
{
index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) );
index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) );
insert( index, 0, 1 );

// WHEN
// Should carry over header data
index.checkpoint( unlimited() );
index.checkpoint( UNLIMITED );
};
ThrowingRunnable.throwing( throwingRunnable ).run();
};
Expand All @@ -515,7 +515,7 @@ public void shouldCarryOverHeaderDataOnDirtyClose() throws Exception
{
ThrowingRunnable throwingRunnable = () ->
{
index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) );
index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) );
insert( index, 0, 1 );

// No checkpoint
Expand All @@ -532,9 +532,9 @@ public void shouldReplaceHeaderDataInNextCheckPoint() throws Exception
{
ThrowingRunnable throwingRunnable = () ->
{
index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) );
index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) );
ThreadLocalRandom.current().nextBytes( expected );
index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) );
index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) );
};
ThrowingRunnable.throwing( throwingRunnable ).run();
};
Expand Down Expand Up @@ -613,7 +613,7 @@ public void writeHeaderInDirtyTreeMustNotDeadlock() throws Exception
Consumer<PageCursor> headerWriter = pc -> pc.putBytes( "failed".getBytes() );
try ( GBPTree<MutableLong,MutableLong> index = index( pageCache ).with( RecoveryCleanupWorkCollector.IGNORE ).build() )
{
index.checkpoint( IOLimiter.unlimited(), headerWriter );
index.checkpoint( UNLIMITED, headerWriter );
}

verifyHeader( pageCache, "failed".getBytes() );
Expand Down Expand Up @@ -811,7 +811,7 @@ public void checkPointShouldLockOutWriter() throws Exception

// WHEN
monitor.enabled = true;
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) );
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) );
monitor.barrier.awaitUninterruptibly();
// now we're in the smack middle of a checkpoint
Future<?> writerClose = executor.submit( throwing( () -> index.writer().close() ) );
Expand Down Expand Up @@ -842,7 +842,7 @@ public void checkPointShouldWaitForWriter() throws Exception
}
} ) );
barrier.awaitUninterruptibly();
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) );
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) );
shouldWait( checkpoint );

// THEN
Expand Down Expand Up @@ -963,7 +963,7 @@ public void correctlyShutdownIndexIsClean() throws IOException
{
writer.put( new MutableLong( 1L ), new MutableLong( 2L ) );
}
index.checkpoint( IOLimiter.unlimited() );
index.checkpoint( UNLIMITED );
}
try ( GBPTree<MutableLong,MutableLong> index = index().build() )
{
Expand All @@ -987,7 +987,7 @@ public void cleanJobShouldLockOutCheckpoint() throws Exception
index.writer().close();

// THEN
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) );
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) );
shouldWait( checkpoint );

monitor.barrier.release();
Expand All @@ -1011,7 +1011,7 @@ public void cleanJobShouldLockOutCheckpointOnNoUpdate() throws Exception
monitor.barrier.awaitUninterruptibly();

// THEN
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) );
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) );
shouldWait( checkpoint );

monitor.barrier.release();
Expand Down Expand Up @@ -1213,7 +1213,7 @@ public void cleanupFinished( long numberOfPagesVisited, long numberOfCleanedCras
Future<?> cleanup = executor.submit( throwing( collector::start ) );
shouldWait( cleanup );

Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) );
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) );
shouldWait( checkpoint );

cleanupMonitor.barrier.release();
Expand Down Expand Up @@ -1262,7 +1262,7 @@ public void shouldNotCheckpointOnClose() throws Exception
{
writer.put( new MutableLong( 0 ), new MutableLong( 1 ) );
}
index.checkpoint( unlimited() );
index.checkpoint( UNLIMITED );
assertEquals( 1, checkpointCounter.count() );
}

Expand All @@ -1280,7 +1280,7 @@ public void shouldCheckpointEvenIfNoChanges() throws Exception
try ( GBPTree<MutableLong,MutableLong> index = index().with( checkpointCounter ).build() )
{
checkpointCounter.reset();
index.checkpoint( unlimited() );
index.checkpoint( UNLIMITED );

// THEN
assertEquals( 1, checkpointCounter.count() );
Expand Down Expand Up @@ -1322,7 +1322,7 @@ public void mustSeeUpdatesThatWasCheckpointed() throws Exception
insert( index, key, value );

// WHEN
index.checkpoint( unlimited() );
index.checkpoint( UNLIMITED );
}

// THEN
Expand Down Expand Up @@ -1408,7 +1408,7 @@ public void indexMustBeCleanWhenClosedAfterCheckpoint() throws Exception
{
insert( index, 0, 1 );

index.checkpoint( unlimited() );
index.checkpoint( UNLIMITED );
}

// WHEN
Expand Down Expand Up @@ -1501,7 +1501,7 @@ public void cleanCrashPointersMustNotTriggerOnCleanStart() throws Exception
{
insert( index, 0, 1 );

index.checkpoint( IOLimiter.unlimited() );
index.checkpoint( UNLIMITED );
}

// WHEN
Expand Down
2 changes: 1 addition & 1 deletion community/io/src/main/java/org/neo4j/io/IOUtils.java
Expand Up @@ -42,7 +42,7 @@ private IOUtils()
*/
public static <T extends AutoCloseable> void closeAll( Collection<T> closeables ) throws IOException
{
closeAll( closeables.toArray( new AutoCloseable[closeables.size()] ) );
closeAll( closeables.toArray( new AutoCloseable[0] ) );
}

/**
Expand Down
23 changes: 21 additions & 2 deletions community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java
Expand Up @@ -107,8 +107,27 @@ default void enableLimit()
* An IOPSLimiter implementation that does not restrict the rate of IO. Use this implementation if you want the
* flush to go as fast as possible.
*/
static IOLimiter unlimited()
IOLimiter UNLIMITED = new IOLimiter()
{
return ( previousStamp, recentlyCompletedIOs, flushable ) -> previousStamp;
@Override
public long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable )
{
return previousStamp;
}

@Override
public boolean isLimited()
{
return false;
}
};


/**
* @return {@code true} if IO is currently limited
*/
default boolean isLimited()
{
return true;
}
}
Expand Up @@ -20,21 +20,23 @@
package org.neo4j.io.pagecache.impl.muninn;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* An executor for the background threads for the page caches.
*
* <p>
* This is similar to an unbounded cached thread pool, except it uses daemon threads.
*
* <p>
* There are only one of these (it's a singleton) to facilitate reusing the threads of closed page caches.
* This is useful for making tests run faster.
*/
final class BackgroundThreadExecutor implements Executor
{
static final BackgroundThreadExecutor INSTANCE = new BackgroundThreadExecutor();

private final Executor executor;
private final ExecutorService executor;

private BackgroundThreadExecutor()
{
Expand All @@ -47,4 +49,8 @@ public void execute( Runnable command )
executor.execute( command );
}

public Future<?> submit( Runnable command )
{
return executor.submit( command );
}
}

0 comments on commit 58b39d6

Please sign in to comment.