diff --git a/community/community-it/consistency-it/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java b/community/community-it/consistency-it/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java index c07124d5bbde3..dcf95e547b0be 100644 --- a/community/community-it/consistency-it/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java +++ b/community/community-it/consistency-it/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java @@ -575,7 +575,7 @@ public void shouldReportNodesThatAreNotIndexed() throws Exception } } } - accessor.force( IOLimiter.unlimited() ); + accessor.force( IOLimiter.UNLIMITED ); accessor.close(); } @@ -602,7 +602,7 @@ public void shouldReportNodesWithDuplicatePropertyValueInUniqueIndex() throws Ex IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE ); updater.process( IndexEntryUpdate.add( 42, indexRule.schema(), values( indexRule ) ) ); updater.close(); - accessor.force( IOLimiter.unlimited() ); + accessor.force( IOLimiter.UNLIMITED ); accessor.close(); } diff --git a/community/community-it/index-it/src/test/java/org/neo4j/index/recovery/UniqueIndexRecoveryTest.java b/community/community-it/index-it/src/test/java/org/neo4j/index/recovery/UniqueIndexRecoveryTest.java index 35ba21417fc33..76f93c683a503 100644 --- a/community/community-it/index-it/src/test/java/org/neo4j/index/recovery/UniqueIndexRecoveryTest.java +++ b/community/community-it/index-it/src/test/java/org/neo4j/index/recovery/UniqueIndexRecoveryTest.java @@ -248,6 +248,6 @@ private void rotateLogAndCheckPoint() throws IOException private void flushAll() { - db.getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.unlimited() ); + db.getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.UNLIMITED ); } } diff --git a/community/community-it/index-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceIntegrationTest.java b/community/community-it/index-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceIntegrationTest.java index 510bb22559568..fc4b1785cf252 100644 --- a/community/community-it/index-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceIntegrationTest.java +++ b/community/community-it/index-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceIntegrationTest.java @@ -225,7 +225,7 @@ public void failForceIndexesWhenOneOfTheIndexesIsBroken() throws Exception expectedException.expect( UnderlyingStorageException.class ); expectedException.expectMessage( "Unable to force" ); - indexingService.forceAll( IOLimiter.unlimited() ); + indexingService.forceAll( IOLimiter.UNLIMITED ); } private void waitIndexOnline( IndexProxy indexProxy ) throws InterruptedException diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/graphdb/NativeLabelScanStoreStartupIT.java b/community/community-it/kernel-it/src/test/java/org/neo4j/graphdb/NativeLabelScanStoreStartupIT.java index 3a6cca2f56838..bf8fe8fae0f3b 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/graphdb/NativeLabelScanStoreStartupIT.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/graphdb/NativeLabelScanStoreStartupIT.java @@ -82,7 +82,7 @@ public void scanStoreRecreateCorruptedIndexOnStartup() throws Throwable createTestNode(); long[] labels = readNodesForLabel( labelScanStore ); assertEquals( "Label scan store see 1 label for node", 1, labels.length ); - labelScanStore.force( IOLimiter.unlimited() ); + labelScanStore.force( IOLimiter.UNLIMITED ); labelScanStore.shutdown(); workCollector.shutdown(); diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/RecoveryIT.java b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/RecoveryIT.java index c4fb91c5cca76..df06e48aa90fb 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/RecoveryIT.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/RecoveryIT.java @@ -518,7 +518,7 @@ private void assertSameStoreContents( Record private void flush( GraphDatabaseService db ) { - ((GraphDatabaseAPI)db).getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.unlimited() ); + ((GraphDatabaseAPI)db).getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( IOLimiter.UNLIMITED ); } private void checkPoint( GraphDatabaseService db ) throws IOException diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java index b310e97212f19..e0aab227d24d0 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java @@ -610,7 +610,7 @@ public void testSetLatestConstraintTx() assertEquals( 10L, metaDataStore.getLatestConstraintIntroducingTx() ); // when - neoStores.flush( IOLimiter.unlimited() ); + neoStores.flush( IOLimiter.UNLIMITED ); neoStores.close(); neoStores = sf.openAllNeoStores(); diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/storemigration/participant/NativeLabelScanStoreMigratorTest.java b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/storemigration/participant/NativeLabelScanStoreMigratorTest.java index 2a178a85b1c93..20873b384bf73 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/storemigration/participant/NativeLabelScanStoreMigratorTest.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/storemigration/participant/NativeLabelScanStoreMigratorTest.java @@ -215,7 +215,7 @@ private void initializeNativeLabelScanStoreWithContent( File dir ) throws IOExce { labelScanWriter.write( NodeLabelUpdate.labelChanges( 1, new long[0], new long[]{1} ) ); } - nativeLabelScanStore.force( IOLimiter.unlimited() ); + nativeLabelScanStore.force( IOLimiter.UNLIMITED ); } } diff --git a/community/community-it/kernel-it/src/test/java/recovery/TestRecoveryScenarios.java b/community/community-it/kernel-it/src/test/java/recovery/TestRecoveryScenarios.java index ae6e451ce0923..8967101f67b27 100644 --- a/community/community-it/kernel-it/src/test/java/recovery/TestRecoveryScenarios.java +++ b/community/community-it/kernel-it/src/test/java/recovery/TestRecoveryScenarios.java @@ -261,7 +261,7 @@ public enum FlushStrategy @Override void flush( GraphDatabaseAPI db ) { - IOLimiter limiter = IOLimiter.unlimited(); + IOLimiter limiter = IOLimiter.UNLIMITED; db.getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( limiter ); } }, diff --git a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java index 79ace8eb6c408..df1994288fe37 100644 --- a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java +++ b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java @@ -488,7 +488,7 @@ private void initializeAfterCreation( Consumer headerWriter ) throws changesSinceLastCheckpoint = true; // Checkpoint to make the created root node stable. Forcing tree state also piggy-backs on this. - checkpoint( IOLimiter.unlimited(), headerWriter ); + checkpoint( IOLimiter.UNLIMITED, headerWriter ); clean = true; } diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/FormatCompatibilityTest.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/FormatCompatibilityTest.java index b9ff2212f1aee..348e726a510cd 100644 --- a/community/index/src/test/java/org/neo4j/index/internal/gbptree/FormatCompatibilityTest.java +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/FormatCompatibilityTest.java @@ -264,7 +264,7 @@ private void createAndZipTree( File storeFile ) throws IOException put( writer, key ); } } - tree.checkpoint( IOLimiter.unlimited() ); + tree.checkpoint( IOLimiter.UNLIMITED ); } ZipUtils.zip( fsRule.get(), storeFile, directory.file( zipName ) ); } diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeConcurrencyITBase.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeConcurrencyITBase.java index 906f067b180a8..61fe330dd8f63 100644 --- a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeConcurrencyITBase.java +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeConcurrencyITBase.java @@ -584,7 +584,7 @@ private Runnable checkpointThread( AtomicBoolean endSignal, AtomicReference @@ -585,7 +585,7 @@ class CheckpointAction extends Action @Override public void execute( GBPTree index ) throws IOException { - index.checkpoint( unlimited() ); + index.checkpoint( UNLIMITED ); } @Override diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java index 459dc98d89d58..8996e8f9c4c09 100644 --- a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java @@ -83,7 +83,7 @@ import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_READER; import static org.neo4j.index.internal.gbptree.SimpleLongLayout.longLayout; import static org.neo4j.index.internal.gbptree.ThrowingRunnable.throwing; -import static org.neo4j.io.pagecache.IOLimiter.unlimited; +import static org.neo4j.io.pagecache.IOLimiter.UNLIMITED; import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK; import static org.neo4j.test.rule.PageCacheRule.config; @@ -321,7 +321,7 @@ public void shouldRemapFileIfMappedWithPageSizeLargerThanCreationSize() throws E writer.put( key, value ); } } - index.checkpoint( unlimited() ); + index.checkpoint( UNLIMITED ); } // THEN @@ -483,7 +483,7 @@ public void shouldPutHeaderDataInCheckPoint() throws Exception BiConsumer,byte[]> beforeClose = ( index, expected ) -> { ThrowingRunnable throwingRunnable = () -> - index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) ); + index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) ); ThrowingRunnable.throwing( throwingRunnable ).run(); }; verifyHeaderDataAfterClose( beforeClose ); @@ -496,12 +496,12 @@ public void shouldCarryOverHeaderDataInCheckPoint() throws Exception { ThrowingRunnable throwingRunnable = () -> { - index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) ); + index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) ); insert( index, 0, 1 ); // WHEN // Should carry over header data - index.checkpoint( unlimited() ); + index.checkpoint( UNLIMITED ); }; ThrowingRunnable.throwing( throwingRunnable ).run(); }; @@ -515,7 +515,7 @@ public void shouldCarryOverHeaderDataOnDirtyClose() throws Exception { ThrowingRunnable throwingRunnable = () -> { - index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) ); + index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) ); insert( index, 0, 1 ); // No checkpoint @@ -532,9 +532,9 @@ public void shouldReplaceHeaderDataInNextCheckPoint() throws Exception { ThrowingRunnable throwingRunnable = () -> { - index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) ); + index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) ); ThreadLocalRandom.current().nextBytes( expected ); - index.checkpoint( unlimited(), cursor -> cursor.putBytes( expected ) ); + index.checkpoint( UNLIMITED, cursor -> cursor.putBytes( expected ) ); }; ThrowingRunnable.throwing( throwingRunnable ).run(); }; @@ -613,7 +613,7 @@ public void writeHeaderInDirtyTreeMustNotDeadlock() throws Exception Consumer headerWriter = pc -> pc.putBytes( "failed".getBytes() ); try ( GBPTree index = index( pageCache ).with( RecoveryCleanupWorkCollector.IGNORE ).build() ) { - index.checkpoint( IOLimiter.unlimited(), headerWriter ); + index.checkpoint( UNLIMITED, headerWriter ); } verifyHeader( pageCache, "failed".getBytes() ); @@ -811,7 +811,7 @@ public void checkPointShouldLockOutWriter() throws Exception // WHEN monitor.enabled = true; - Future checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) ); + Future checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) ); monitor.barrier.awaitUninterruptibly(); // now we're in the smack middle of a checkpoint Future writerClose = executor.submit( throwing( () -> index.writer().close() ) ); @@ -842,7 +842,7 @@ public void checkPointShouldWaitForWriter() throws Exception } } ) ); barrier.awaitUninterruptibly(); - Future checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) ); + Future checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) ); shouldWait( checkpoint ); // THEN @@ -963,7 +963,7 @@ public void correctlyShutdownIndexIsClean() throws IOException { writer.put( new MutableLong( 1L ), new MutableLong( 2L ) ); } - index.checkpoint( IOLimiter.unlimited() ); + index.checkpoint( UNLIMITED ); } try ( GBPTree index = index().build() ) { @@ -987,7 +987,7 @@ public void cleanJobShouldLockOutCheckpoint() throws Exception index.writer().close(); // THEN - Future checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) ); + Future checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) ); shouldWait( checkpoint ); monitor.barrier.release(); @@ -1011,7 +1011,7 @@ public void cleanJobShouldLockOutCheckpointOnNoUpdate() throws Exception monitor.barrier.awaitUninterruptibly(); // THEN - Future checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) ); + Future checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) ); shouldWait( checkpoint ); monitor.barrier.release(); @@ -1213,7 +1213,7 @@ public void cleanupFinished( long numberOfPagesVisited, long numberOfCleanedCras Future cleanup = executor.submit( throwing( collector::start ) ); shouldWait( cleanup ); - Future checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) ); + Future checkpoint = executor.submit( throwing( () -> index.checkpoint( UNLIMITED ) ) ); shouldWait( checkpoint ); cleanupMonitor.barrier.release(); @@ -1262,7 +1262,7 @@ public void shouldNotCheckpointOnClose() throws Exception { writer.put( new MutableLong( 0 ), new MutableLong( 1 ) ); } - index.checkpoint( unlimited() ); + index.checkpoint( UNLIMITED ); assertEquals( 1, checkpointCounter.count() ); } @@ -1280,7 +1280,7 @@ public void shouldCheckpointEvenIfNoChanges() throws Exception try ( GBPTree index = index().with( checkpointCounter ).build() ) { checkpointCounter.reset(); - index.checkpoint( unlimited() ); + index.checkpoint( UNLIMITED ); // THEN assertEquals( 1, checkpointCounter.count() ); @@ -1322,7 +1322,7 @@ public void mustSeeUpdatesThatWasCheckpointed() throws Exception insert( index, key, value ); // WHEN - index.checkpoint( unlimited() ); + index.checkpoint( UNLIMITED ); } // THEN @@ -1408,7 +1408,7 @@ public void indexMustBeCleanWhenClosedAfterCheckpoint() throws Exception { insert( index, 0, 1 ); - index.checkpoint( unlimited() ); + index.checkpoint( UNLIMITED ); } // WHEN @@ -1501,7 +1501,7 @@ public void cleanCrashPointersMustNotTriggerOnCleanStart() throws Exception { insert( index, 0, 1 ); - index.checkpoint( IOLimiter.unlimited() ); + index.checkpoint( UNLIMITED ); } // WHEN diff --git a/community/io/src/main/java/org/neo4j/io/IOUtils.java b/community/io/src/main/java/org/neo4j/io/IOUtils.java index 5804130b8f0e6..07b9bc9091540 100644 --- a/community/io/src/main/java/org/neo4j/io/IOUtils.java +++ b/community/io/src/main/java/org/neo4j/io/IOUtils.java @@ -42,7 +42,7 @@ private IOUtils() */ public static void closeAll( Collection closeables ) throws IOException { - closeAll( closeables.toArray( new AutoCloseable[closeables.size()] ) ); + closeAll( closeables.toArray( new AutoCloseable[0] ) ); } /** diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java b/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java index 45cd5d300e302..d976d9ccbd461 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/IOLimiter.java @@ -107,8 +107,27 @@ default void enableLimit() * An IOPSLimiter implementation that does not restrict the rate of IO. Use this implementation if you want the * flush to go as fast as possible. */ - static IOLimiter unlimited() + IOLimiter UNLIMITED = new IOLimiter() { - return ( previousStamp, recentlyCompletedIOs, flushable ) -> previousStamp; + @Override + public long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable ) + { + return previousStamp; + } + + @Override + public boolean isLimited() + { + return false; + } + }; + + + /** + * @return {@code true} if IO is currently limited + */ + default boolean isLimited() + { + return true; } } diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java index fe25e69124281..02dcc229f6b31 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/BackgroundThreadExecutor.java @@ -20,13 +20,15 @@ package org.neo4j.io.pagecache.impl.muninn; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * An executor for the background threads for the page caches. - * + *

* This is similar to an unbounded cached thread pool, except it uses daemon threads. - * + *

* There are only one of these (it's a singleton) to facilitate reusing the threads of closed page caches. * This is useful for making tests run faster. */ @@ -34,7 +36,7 @@ final class BackgroundThreadExecutor implements Executor { static final BackgroundThreadExecutor INSTANCE = new BackgroundThreadExecutor(); - private final Executor executor; + private final ExecutorService executor; private BackgroundThreadExecutor() { @@ -47,4 +49,8 @@ public void execute( Runnable command ) executor.execute( command ); } + public Future submit( Runnable command ) + { + return executor.submit( command ); + } } diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java index a91529f9edd33..334819a053c47 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCache.java @@ -29,7 +29,8 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -147,7 +148,7 @@ public class MuninnPageCache implements PageCache // the constructor of the PageCache, because the Executors have too many configuration options, many of which are // highly troublesome for our use case; caller-runs, bounded submission queues, bounded thread count, non-daemon // thread factories, etc. - private static final Executor backgroundThreadExecutor = BackgroundThreadExecutor.INSTANCE; + private static final BackgroundThreadExecutor backgroundThreadExecutor = BackgroundThreadExecutor.INSTANCE; private static final List ignoredOpenOptions = Arrays.asList( (OpenOption) StandardOpenOption.APPEND, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SPARSE ); @@ -568,7 +569,7 @@ public void setPrintExceptionsOnClose( boolean enabled ) @Override public void flushAndForce() throws IOException { - flushAndForce( IOLimiter.unlimited() ); + flushAndForce( IOLimiter.UNLIMITED ); } @Override @@ -578,39 +579,84 @@ public void flushAndForce( IOLimiter limiter ) throws IOException { throw new IllegalArgumentException( "IOLimiter cannot be null" ); } - assertNotClosed(); List files = listExistingMappings(); - flushAllPages( files, limiter ); + + try ( MajorFlushEvent ignored = pageCacheTracer.beginCacheFlush() ) + { + if ( limiter.isLimited() ) + { + flushAllPages( files, limiter ); + } + else + { + flushAllPagesParallel( files, limiter ); + } + syncDevice(); + } clearEvictorException(); } private void flushAllPages( List files, IOLimiter limiter ) throws IOException { - try ( MajorFlushEvent cacheFlush = pageCacheTracer.beginCacheFlush() ) + for ( PagedFile file : files ) { - for ( PagedFile file : files ) + flushFile( (MuninnPagedFile) file, limiter ); + } + } + + private void flushAllPagesParallel( List files, IOLimiter limiter ) throws IOException + { + List> flushes = new ArrayList<>( files.size() ); + + // Submit all flushes to the background thread + for ( PagedFile file : files ) + { + flushes.add( backgroundThreadExecutor.submit( () -> { - MuninnPagedFile muninnPagedFile = (MuninnPagedFile) file; - try ( MajorFlushEvent fileFlush = pageCacheTracer.beginFileFlush( muninnPagedFile.swapper ) ) + try { - FlushEventOpportunity flushOpportunity = fileFlush.flushEventOpportunity(); - muninnPagedFile.flushAndForceInternal( flushOpportunity, false, limiter ); + flushFile( (MuninnPagedFile) file, limiter ); } - catch ( ClosedChannelException e ) + catch ( IOException e ) { - if ( muninnPagedFile.getRefCount() > 0 ) - { - // The file is not supposed to be closed, since we have a positive ref-count, yet we got a - // ClosedChannelException anyway? It's an odd situation, so let's tell the outside world about - // this failure. - throw e; - } - // Otherwise: The file was closed while we were trying to flush it. Since unmapping implies a flush - // anyway, we can safely assume that this is not a problem. The file was flushed, and it doesn't - // really matter how that happened. We'll ignore this exception. + throw new UncheckedIOException( e ); } + } ) ); + } + + // Wait for all to complete + for ( Future flush : flushes ) + { + try + { + flush.get(); } - syncDevice(); + catch ( InterruptedException | ExecutionException e ) + { + throw new IOException( e ); + } + } + } + + private void flushFile( MuninnPagedFile muninnPagedFile, IOLimiter limiter ) throws IOException + { + try ( MajorFlushEvent fileFlush = pageCacheTracer.beginFileFlush( muninnPagedFile.swapper ) ) + { + FlushEventOpportunity flushOpportunity = fileFlush.flushEventOpportunity(); + muninnPagedFile.flushAndForceInternal( flushOpportunity, false, limiter ); + } + catch ( ClosedChannelException e ) + { + if ( muninnPagedFile.getRefCount() > 0 ) + { + // The file is not supposed to be closed, since we have a positive ref-count, yet we got a + // ClosedChannelException anyway? It's an odd situation, so let's tell the outside world about + // this failure. + throw e; + } + // Otherwise: The file was closed while we were trying to flush it. Since unmapping implies a flush + // anyway, we can safely assume that this is not a problem. The file was flushed, and it doesn't + // really matter how that happened. We'll ignore this exception. } } diff --git a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java index 3b7a504e856d5..b85e5366f9c58 100644 --- a/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java +++ b/community/io/src/main/java/org/neo4j/io/pagecache/impl/muninn/MuninnPagedFile.java @@ -258,7 +258,7 @@ void closeSwapper() throws IOException @Override public void flushAndForce() throws IOException { - flushAndForce( IOLimiter.unlimited() ); + flushAndForce( IOLimiter.UNLIMITED ); } @Override @@ -288,7 +288,7 @@ void flushAndForceForClose() throws IOException } try ( MajorFlushEvent flushEvent = pageCacheTracer.beginFileFlush( swapper ) ) { - flushAndForceInternal( flushEvent.flushEventOpportunity(), true, IOLimiter.unlimited() ); + flushAndForceInternal( flushEvent.flushEventOpportunity(), true, IOLimiter.UNLIMITED ); syncDevice(); } pageCache.clearEvictorException(); diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java index 5b7f80071586a..fcf8752f5573a 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java @@ -25,6 +25,8 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.function.IntSupplier; @@ -32,13 +34,17 @@ import org.neo4j.graphdb.mockfs.DelegatingFileSystemAbstraction; import org.neo4j.graphdb.mockfs.DelegatingStoreChannel; +import org.neo4j.io.IOUtils; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.OpenMode; import org.neo4j.io.fs.StoreChannel; +import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCacheTest; import org.neo4j.io.pagecache.PageCursor; +import org.neo4j.io.pagecache.PageSwapper; import org.neo4j.io.pagecache.PagedFile; import org.neo4j.io.pagecache.tracing.ConfigurablePageCursorTracerSupplier; +import org.neo4j.io.pagecache.tracing.DefaultPageCacheTracer; import org.neo4j.io.pagecache.tracing.DelegatingPageCacheTracer; import org.neo4j.io.pagecache.tracing.EvictionRunEvent; import org.neo4j.io.pagecache.tracing.MajorFlushEvent; @@ -677,6 +683,59 @@ void mustThrowIfMappingFileWouldOverflowReferenceCount() } ); } + @Test + void unlimitedShouldFlushInParallel() + { + assertTimeout( ofMillis( SEMI_LONG_TIMEOUT_MILLIS ), () -> + { + List mappedFiles = new ArrayList<>(); + mappedFiles.add( existingFile( "a" ) ); + mappedFiles.add( existingFile( "b" ) ); + getPageCache( fs, maxPages, new FlushRendezvousTracer( mappedFiles.size() ), PageCursorTracerSupplier.NULL ); + + List mappedPagedFiles = new ArrayList<>(); + for ( File mappedFile : mappedFiles ) + { + PagedFile pagedFile = pageCache.map( mappedFile, filePageSize ); + mappedPagedFiles.add( pagedFile ); + try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) + { + assertTrue( cursor.next() ); + cursor.putInt( 1 ); + } + } + + pageCache.flushAndForce( IOLimiter.UNLIMITED ); + + IOUtils.closeAll( mappedPagedFiles ); + } ); + } + + private static class FlushRendezvousTracer extends DefaultPageCacheTracer + { + private final CountDownLatch latch; + + FlushRendezvousTracer( int fileCountToWaitFor ) + { + latch = new CountDownLatch( fileCountToWaitFor ); + } + + @Override + public MajorFlushEvent beginFileFlush( PageSwapper swapper ) + { + latch.countDown(); + try + { + latch.await(); + } + catch ( InterruptedException e ) + { + e.printStackTrace(); + } + return MajorFlushEvent.NULL; + } + } + private void evictAllPages( MuninnPageCache pageCache ) throws IOException { PageList pages = pageCache.pages; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/labelscan/NativeLabelScanStore.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/labelscan/NativeLabelScanStore.java index d86851575c388..79d9013770658 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/labelscan/NativeLabelScanStore.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/labelscan/NativeLabelScanStore.java @@ -433,7 +433,7 @@ public void start() throws IOException numberOfNodes = fullStoreChangeStream.applyTo( writer ); } - index.checkpoint( IOLimiter.unlimited(), writeClean ); + index.checkpoint( IOLimiter.UNLIMITED, writeClean ); monitor.rebuilt( numberOfNodes ); needsRebuild = false; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java index 7978e993d1fc9..73e2dfa49999e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java @@ -304,12 +304,12 @@ private void markTreeAsFailed() throws IOException { failureBytes = ArrayUtils.EMPTY_BYTE_ARRAY; } - tree.checkpoint( IOLimiter.unlimited(), new FailureHeaderWriter( failureBytes ) ); + tree.checkpoint( IOLimiter.UNLIMITED, new FailureHeaderWriter( failureBytes ) ); } void markTreeAsOnline() throws IOException { - tree.checkpoint( IOLimiter.unlimited(), pc -> pc.putByte( BYTE_ONLINE ) ); + tree.checkpoint( IOLimiter.UNLIMITED, pc -> pc.putByte( BYTE_ONLINE ) ); } static class IndexUpdateApply, VALUE extends NativeIndexValue> diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java index 28905db20e22a..e11b52d5105eb 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java @@ -171,7 +171,7 @@ public synchronized void create() throws IOException @Override void markTreeAsOnline() throws IOException { - tree.checkpoint( IOLimiter.unlimited(), settings.headerWriter( BYTE_ONLINE ) ); + tree.checkpoint( IOLimiter.UNLIMITED, settings.headerWriter( BYTE_ONLINE ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java index 55c494c78ab3d..4a482bb3a7405 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java @@ -972,7 +972,7 @@ public void shutdown() try { repopulateAllIndexes(); - labelScanStore.force( IOLimiter.unlimited() ); + labelScanStore.force( IOLimiter.UNLIMITED ); } catch ( IOException | IndexEntryConflictException e ) { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java index 924bf4e8dec9e..f37dbc7bf945d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java @@ -74,7 +74,7 @@ import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory; import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.io.IOUtils.closeAll; -import static org.neo4j.io.pagecache.IOLimiter.unlimited; +import static org.neo4j.io.pagecache.IOLimiter.UNLIMITED; import static org.neo4j.kernel.impl.index.labelscan.NativeLabelScanStore.getLabelScanStoreFile; import static org.neo4j.kernel.impl.store.MetaDataStore.DEFAULT_NAME; import static org.neo4j.kernel.impl.store.StoreType.PROPERTY; @@ -456,17 +456,17 @@ public void flushAndForce() } if ( neoStores != null ) { - neoStores.flush( unlimited() ); + neoStores.flush( UNLIMITED ); flushIdFiles( neoStores, StoreType.values() ); } if ( temporaryNeoStores != null ) { - temporaryNeoStores.flush( unlimited() ); + temporaryNeoStores.flush( UNLIMITED ); flushIdFiles( temporaryNeoStores, TEMP_STORE_TYPES ); } if ( labelScanStore != null ) { - labelScanStore.force( unlimited() ); + labelScanStore.force( UNLIMITED ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java b/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java index b45bd131b65b3..bf3d3235bed2e 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java @@ -118,7 +118,7 @@ public void flushOfThePageCacheHappensOnlyOnceDuringShutdown() throws IOExceptio ds.stop(); ds.shutdown(); - verify( pageCache ).flushAndForce( IOLimiter.unlimited() ); + verify( pageCache ).flushAndForce( IOLimiter.UNLIMITED ); } @Test @@ -134,7 +134,7 @@ public void flushOfThePageCacheOnShutdownHappensIfTheDbIsHealthy() throws IOExce ds.stop(); ds.shutdown(); - verify( pageCache ).flushAndForce( IOLimiter.unlimited() ); + verify( pageCache ).flushAndForce( IOLimiter.UNLIMITED ); } @Test @@ -154,7 +154,7 @@ public void flushOfThePageCacheOnShutdownDoesNotHappenIfTheDbIsUnhealthy() throw ds.stop(); ds.shutdown(); - verify( pageCache, never() ).flushAndForce( IOLimiter.unlimited() ); + verify( pageCache, never() ).flushAndForce( IOLimiter.UNLIMITED ); } @Test diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/impl/labelscan/LabelScanStoreTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/impl/labelscan/LabelScanStoreTest.java index 6824f6e4d6138..f8121f8ac4dbb 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/impl/labelscan/LabelScanStoreTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/impl/labelscan/LabelScanStoreTest.java @@ -125,7 +125,7 @@ public void failToRetrieveWriterOnReadOnlyScanStore() public void forceShouldNotForceWriterOnReadOnlyScanStore() { createAndStartReadOnly(); - store.force( IOLimiter.unlimited() ); + store.force( IOLimiter.UNLIMITED ); } @Test diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java index 6f8a51abe001d..a09c05b8a627d 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java @@ -151,7 +151,7 @@ public void shouldNotForceBeforeCreate() throws IOException IndexProxy outer = newContractCheckingIndexProxy( inner ); // WHEN - outer.force( IOLimiter.unlimited() ); + outer.force( IOLimiter.UNLIMITED ); } @Test( expected = IllegalStateException.class ) @@ -164,7 +164,7 @@ public void shouldNotForceAfterClose() throws IOException // WHEN outer.start(); outer.close(); - outer.force( IOLimiter.unlimited() ); + outer.force( IOLimiter.UNLIMITED ); } @Test( expected = /* THEN */ IllegalStateException.class ) @@ -297,7 +297,7 @@ public void force( IOLimiter ioLimiter ) actionThreadReference.set( actionThread ); outer.start(); - Thread thread = runInSeparateThread( () -> outer.force( IOLimiter.unlimited() ) ); + Thread thread = runInSeparateThread( () -> outer.force( IOLimiter.UNLIMITED ) ); ThreadTestUtils.awaitThreadState( actionThread, TEST_TIMEOUT, Thread.State.TIMED_WAITING ); latch.countDown(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java index d309bf1e18d23..3333ca9923dc4 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java @@ -1052,11 +1052,11 @@ public void flushAllIndexesWhileSomeOfThemDropped() throws IOException IndexingService indexingService = createIndexServiceWithCustomIndexMap( indexMapReference ); - indexingService.forceAll( IOLimiter.unlimited() ); - verify( validIndex1, times( 1 ) ).force( IOLimiter.unlimited() ); - verify( validIndex2, times( 1 ) ).force( IOLimiter.unlimited() ); - verify( validIndex3, times( 1 ) ).force( IOLimiter.unlimited() ); - verify( validIndex4, times( 1 ) ).force( IOLimiter.unlimited() ); + indexingService.forceAll( IOLimiter.UNLIMITED ); + verify( validIndex1, times( 1 ) ).force( IOLimiter.UNLIMITED ); + verify( validIndex2, times( 1 ) ).force( IOLimiter.UNLIMITED ); + verify( validIndex3, times( 1 ) ).force( IOLimiter.UNLIMITED ); + verify( validIndex4, times( 1 ) ).force( IOLimiter.UNLIMITED ); } @Test @@ -1080,7 +1080,7 @@ public void failForceAllWhenOneOfTheIndexesFailToForce() throws IOException expectedException.expectMessage( "Unable to force" ); expectedException.expect( UnderlyingStorageException.class ); - indexingService.forceAll( IOLimiter.unlimited() ); + indexingService.forceAll( IOLimiter.UNLIMITED ); } @Test diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexAccessorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexAccessorTest.java index 99aaa5e653ffa..c2a2fe49ca0e5 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexAccessorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexAccessorTest.java @@ -214,7 +214,7 @@ public void shouldHandleRandomUpdates() throws Exception applyUpdatesToExpectedData( expectedData, batch ); // verifyUpdates forceAndCloseAccessor(); - verifyUpdates( expectedData.toArray( new IndexEntryUpdate[expectedData.size()] ) ); + verifyUpdates( expectedData.toArray( new IndexEntryUpdate[0] ) ); setupAccessor(); } } @@ -551,7 +551,7 @@ public void forceShouldCheckpointTree() throws Exception processAll( data ); // when - accessor.force( IOLimiter.unlimited() ); + accessor.force( IOLimiter.UNLIMITED ); accessor.close(); // then @@ -878,7 +878,7 @@ else if ( !expectedData.isEmpty() && factor < (1 - removeFactor) * addChangeRati @SuppressWarnings( "unchecked" ) private IndexEntryUpdate selectRandomItem( Set> expectedData ) { - return expectedData.toArray( new IndexEntryUpdate[expectedData.size()] )[random.nextInt( expectedData.size() )]; + return expectedData.toArray( new IndexEntryUpdate[0] )[random.nextInt( expectedData.size() )]; } @SafeVarargs @@ -896,7 +896,7 @@ final void processAll( IndexEntryUpdate... updates ) private void forceAndCloseAccessor() throws IOException { - accessor.force( IOLimiter.unlimited() ); + accessor.force( IOLimiter.UNLIMITED ); closeAccessor(); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NumberFullScanNonUniqueIndexSamplerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NumberFullScanNonUniqueIndexSamplerTest.java index 031e18bb71bfb..1e5e7bc525d47 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NumberFullScanNonUniqueIndexSamplerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NumberFullScanNonUniqueIndexSamplerTest.java @@ -33,7 +33,6 @@ import org.neo4j.storageengine.api.schema.IndexSample; import static org.junit.Assert.assertEquals; - import static org.neo4j.kernel.impl.index.schema.LayoutTestUtil.countUniqueValues; import static org.neo4j.values.storable.Values.values; @@ -90,7 +89,7 @@ private void buildTree( Number[] values ) throws IOException nodeId++; } } - gbpTree.checkpoint( IOLimiter.unlimited() ); + gbpTree.checkpoint( IOLimiter.UNLIMITED ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java index c7a9be692c71f..d59c866bc7e6f 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java @@ -164,7 +164,7 @@ public void obtainCountsStoreResetterAfterFailedTransaction() throws Throwable @Test public void mustFlushStoresWithGivenIOLimiter() { - IOLimiter limiter = ( stamp, completedIOs, swapper ) -> 0; + IOLimiter limiter = IOLimiter.UNLIMITED; FileSystemAbstraction fs = fsRule.get(); AtomicReference observedLimiter = new AtomicReference<>(); PageCache pageCache = new DelegatingPageCache( pageCacheRule.getPageCache( fs ) ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointSchedulerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointSchedulerTest.java index 3ec396838e082..26a1a25ff8ec3 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointSchedulerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointSchedulerTest.java @@ -276,7 +276,7 @@ public void checkpointOnStopShouldFlushAsFastAsPossible() throws Throwable checkpointerStarter.get(); assertTrue( "Checkpointer should be created.", checkPointer.isCheckpointCreated() ); - assertTrue( "Limiter should be enabled in the end.", ioLimiter.isLimitEnabled() ); + assertTrue( "Limiter should be enabled in the end.", ioLimiter.isLimited() ); } @Test @@ -366,7 +366,8 @@ public void enableLimit() limitEnabled = true; } - boolean isLimitEnabled() + @Override + public boolean isLimited() { return limitEnabled; } @@ -389,7 +390,7 @@ private static class WaitUnlimitedCheckPointer implements CheckPointer public long checkPointIfNeeded( TriggerInfo triggerInfo ) { latch.countDown(); - while ( ioLimiter.isLimitEnabled() ) + while ( ioLimiter.isLimited() ) { //spin while limiter enabled } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointerImplTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointerImplTest.java index 01afa6539edcc..c24c1e5a7b242 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointerImplTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/checkpoint/CheckPointerImplTest.java @@ -269,7 +269,20 @@ public void tryCheckPointShouldWaitTheCurrentCheckPointingToCompleteNoRunCheckPo @Test public void mustUseIoLimiterFromFlushing() throws Throwable { - limiter = ( stamp, ios, flushable ) -> 42; + limiter = new IOLimiter() + { + @Override + public long maybeLimitIO( long previousStamp, int recentlyCompletedIOs, Flushable flushable ) + { + return 42; + } + + @Override + public boolean isLimited() + { + return true; + } + }; when( threshold.isCheckPointingNeeded( anyLong(), eq( INFO ) ) ).thenReturn( true, false ); mockTxIdStore(); CheckPointerImpl checkPointing = checkPointer(); @@ -297,6 +310,12 @@ public void enableLimit() { doneDisablingLimits.set( true ); } + + @Override + public boolean isLimited() + { + return doneDisablingLimits.get(); + } }; mockTxIdStore(); CheckPointerImpl checkPointer = checkPointer(); @@ -322,6 +341,12 @@ public void enableLimit() { doneDisablingLimits.set( true ); } + + @Override + public boolean isLimited() + { + return doneDisablingLimits.get(); + } }; mockTxIdStore(); CheckPointerImpl checkPointer = checkPointer(); @@ -357,6 +382,12 @@ public void enableLimit() { limitDisableCounter.getAndDecrement(); } + + @Override + public boolean isLimited() + { + return limitDisableCounter.get() != 0; + } }; mockTxIdStore(); diff --git a/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java b/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java index 1fb7600d94c38..3a107a62aca11 100644 --- a/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java +++ b/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java @@ -130,7 +130,7 @@ logService, mock( JobScheduler.class, RETURNS_MOCKS ), mock( TokenNameLookup.cla new StandardConstraintSemantics(), monitors, new Tracers( "null", NullLog.getInstance(), monitors, jobScheduler, clock ), mock( Procedures.class ), - IOLimiter.unlimited(), + IOLimiter.UNLIMITED, availabilityGuard, clock, new CanWrite(), new StoreCopyCheckPointMutex(), RecoveryCleanupWorkCollector.IMMEDIATE, diff --git a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/LuceneSchemaIndexPopulationIT.java b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/LuceneSchemaIndexPopulationIT.java index cb000c068639a..1766eaf1a1245 100644 --- a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/LuceneSchemaIndexPopulationIT.java +++ b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/LuceneSchemaIndexPopulationIT.java @@ -106,7 +106,7 @@ public void partitionedIndexPopulation() throws Exception try ( LuceneIndexAccessor indexAccessor = new LuceneIndexAccessor( uniqueIndex, descriptor ) ) { generateUpdates( indexAccessor, affectedNodes ); - indexAccessor.force( IOLimiter.unlimited() ); + indexAccessor.force( IOLimiter.UNLIMITED ); // now index is online and should contain updates data assertTrue( uniqueIndex.isOnline() ); diff --git a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneIndexProviderTest.java b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneIndexProviderTest.java index bb271527f0809..c39b360c05899 100644 --- a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneIndexProviderTest.java +++ b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneIndexProviderTest.java @@ -117,7 +117,7 @@ public void indexForceMustBeAllowedInReadOnlyMode() throws Exception new DirectoryFactory.InMemoryDirectoryFactory(), fs, graphDbDir ); // We assert that 'force' does not throw an exception - getIndexAccessor( readOnlyConfig, readOnlyIndexProvider ).force( IOLimiter.unlimited() ); + getIndexAccessor( readOnlyConfig, readOnlyIndexProvider ).force( IOLimiter.UNLIMITED ); } private void createEmptySchemaIndex( DirectoryFactory directoryFactory ) throws IOException diff --git a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexIT.java b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexIT.java index 190e7fe654887..c800396f5272e 100644 --- a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexIT.java +++ b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexIT.java @@ -88,7 +88,7 @@ public void snapshotForPartitionedIndex() throws Exception try ( LuceneIndexAccessor indexAccessor = createDefaultIndexAccessor() ) { generateUpdates( indexAccessor, 32 ); - indexAccessor.force( IOLimiter.unlimited() ); + indexAccessor.force( IOLimiter.UNLIMITED ); // When & Then List singlePartitionFileTemplates = Arrays.asList( ".cfe", ".cfs", ".si", "segments_1" ); diff --git a/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/CommunityEditionModule.java b/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/CommunityEditionModule.java index 33f06b642746e..9ace857cf821f 100644 --- a/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/CommunityEditionModule.java +++ b/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/CommunityEditionModule.java @@ -141,7 +141,7 @@ public CommunityEditionModule( PlatformModule platformModule ) coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout ); - ioLimiter = IOLimiter.unlimited(); + ioLimiter = IOLimiter.UNLIMITED; registerRecovery( platformModule.databaseInfo, life, dependencies ); diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupProtocolServiceIT.java b/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupProtocolServiceIT.java index 87f1729e5f739..4c34e87cc8a91 100644 --- a/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupProtocolServiceIT.java +++ b/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupProtocolServiceIT.java @@ -144,7 +144,7 @@ public void write( int b ) private static final Label LABEL = Label.label( "LABEL" ); private final Monitors monitors = new Monitors(); - private final IOLimiter limiter = IOLimiter.unlimited(); + private final IOLimiter limiter = IOLimiter.UNLIMITED; private FileSystemAbstraction fileSystem; private Path storeDir; private Path backupDir; diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java b/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java index 2042249c7e0ad..6bde14b13b7fc 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java @@ -1354,7 +1354,7 @@ public void force( HighlyAvailableGraphDatabase... except ) { if ( !exceptSet.contains( db ) ) { - IOLimiter limiter = IOLimiter.unlimited(); + IOLimiter limiter = IOLimiter.UNLIMITED; db.getDependencyResolver().resolveDependency( StorageEngine.class ).flushAndForce( limiter ); } } diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiter.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiter.java index 003e1dea38636..d293984dea4d6 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiter.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiter.java @@ -188,6 +188,12 @@ public void enableLimit() while ( !stateUpdater.compareAndSet( this, currentState, newState ) ); } + @Override + public boolean isLimited() + { + return getDisabledCounter( state ) > 0; + } + private long currentTimeMillis() { return System.currentTimeMillis(); diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiterTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiterTest.java index 8d71309d9c1b3..70a8aa52b81e0 100644 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiterTest.java +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/transaction/log/checkpoint/ConfigurableIOLimiterTest.java @@ -57,8 +57,7 @@ private void createIOLimiter( Config config ) private void createIOLimiter( int limit ) { - Map settings = stringMap( - GraphDatabaseSettings.check_point_iops_limit.name(), "" + limit ); + Map settings = stringMap( GraphDatabaseSettings.check_point_iops_limit.name(), "" + limit ); createIOLimiter( Config.defaults( settings ) ); } @@ -230,4 +229,67 @@ public void dynamicConfigurationUpdateDisablingLimiterMustNotDisableLimiter() repeatedlyCallMaybeLimitIO( limiter, stamp, 10 ); assertThat( pauseNanosCounter.get(), greaterThan( TimeUnit.SECONDS.toNanos( 9 ) ) ); } + + @Test + public void configuredLimitMustReflectCurrentState() + { + createIOLimiter( 100 ); + + assertThat( limiter.isLimited(), is( true ) ); + multipleDisableShouldReportUnlimited( limiter ); + assertThat( limiter.isLimited(), is( true ) ); + } + + @Test + public void configuredDisabledLimitShouldBeUnlimited() + { + createIOLimiter( -1 ); + + assertThat( limiter.isLimited(), is( false ) ); + multipleDisableShouldReportUnlimited( limiter ); + assertThat( limiter.isLimited(), is( false ) ); + } + + @Test + public void unlimitedShouldAlwaysBeUnlimited() + { + IOLimiter limiter = IOLimiter.UNLIMITED; + + assertThat( limiter.isLimited(), is( false ) ); + multipleDisableShouldReportUnlimited( limiter ); + assertThat( limiter.isLimited(), is( false ) ); + + limiter.enableLimit(); + try + { + assertThat( limiter.isLimited(), is( false ) ); + } + finally + { + limiter.disableLimit(); + } + } + + private static void multipleDisableShouldReportUnlimited( IOLimiter limiter ) + { + limiter.disableLimit(); + try + { + assertThat( limiter.isLimited(), is( false ) ); + limiter.disableLimit(); + try + { + assertThat( limiter.isLimited(), is( false ) ); + } + finally + { + limiter.enableLimit(); + } + assertThat( limiter.isLimited(), is( false ) ); + } + finally + { + limiter.enableLimit(); + } + } } diff --git a/enterprise/neo4j-enterprise/src/test/java/org/neo4j/HalfAppliedConstraintRecoveryIT.java b/enterprise/neo4j-enterprise/src/test/java/org/neo4j/HalfAppliedConstraintRecoveryIT.java index 38cd0a66e7d9e..270d94275dd74 100644 --- a/enterprise/neo4j-enterprise/src/test/java/org/neo4j/HalfAppliedConstraintRecoveryIT.java +++ b/enterprise/neo4j-enterprise/src/test/java/org/neo4j/HalfAppliedConstraintRecoveryIT.java @@ -293,7 +293,7 @@ private GraphDatabaseAPI newDb() private static void flushStores( GraphDatabaseAPI db ) { db.getDependencyResolver().resolveDependency( RecordStorageEngine.class ) - .testAccessNeoStores().flush( IOLimiter.unlimited() ); + .testAccessNeoStores().flush( IOLimiter.UNLIMITED ); } private static void apply( GraphDatabaseAPI db, List transactions ) diff --git a/tools/src/test/java/org/neo4j/tools/org/neo4j/index/GBPTreePlayground.java b/tools/src/test/java/org/neo4j/tools/org/neo4j/index/GBPTreePlayground.java index 008c89d54814c..d105957328446 100644 --- a/tools/src/test/java/org/neo4j/tools/org/neo4j/index/GBPTreePlayground.java +++ b/tools/src/test/java/org/neo4j/tools/org/neo4j/index/GBPTreePlayground.java @@ -132,7 +132,7 @@ private class Checkpoint implements Command @Override public void run( String[] args, PrintStream out ) throws Exception { - tree.checkpoint( IOLimiter.unlimited() ); + tree.checkpoint( IOLimiter.UNLIMITED ); } @Override public String toString()