From 31e8a3f102688dad88d99cd86bfb047b8f31fcd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Tue, 26 Feb 2019 20:29:58 +0100 Subject: [PATCH] BlockBasedIndexPopulator reports more accurate progress Taking into account time spent in merging and building the tree. --- .../schema/BlockBasedIndexPopulator.java | 139 ++++++++++++++++-- .../impl/index/schema/BlockStorage.java | 125 ++++++++++++++-- .../impl/index/schema/IndexUpdateStorage.java | 8 + .../schema/BlockBasedIndexPopulatorTest.java | 48 +++++- .../impl/index/schema/BlockStorageTest.java | 32 +++- 5 files changed, 314 insertions(+), 38 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java index c3c0df83cc350..b927a0d093e00 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java @@ -33,6 +33,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.neo4j.cursor.RawCursor; import org.neo4j.index.internal.gbptree.Hit; @@ -57,6 +58,8 @@ import org.neo4j.util.Preconditions; import org.neo4j.values.storable.Value; +import static org.neo4j.helpers.collection.Iterables.first; +import static org.neo4j.kernel.impl.index.schema.BlockStorage.calculateNumberOfEntriesWrittenDuringMerges; import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate; import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex; @@ -89,16 +92,20 @@ public abstract class BlockBasedIndexPopulator,V private final int mergeFactor; private final BlockStorage.Monitor blockStorageMonitor; // written to in a synchronized method when creating new thread-local instances, read from when population completes - private final List> allScanUpdates = new CopyOnWriteArrayList<>(); - private final ThreadLocal> scanUpdates; + private final List allScanUpdates = new CopyOnWriteArrayList<>(); + private final ThreadLocal scanUpdates; private final ByteBufferFactory bufferFactory = new UnsafeDirectByteBufferFactory( new LocalMemoryTracker() /*plug in actual tracker when available*/ ); private IndexUpdateStorage externalUpdates; // written in a synchronized method when creating new thread-local instances, read when processing external updates - private volatile boolean merged; + private volatile boolean scanCompleted; private final CloseCancellation cancellation = new CloseCancellation(); // Will be instantiated right before merging and can be used to neatly await merge to complete private volatile CountDownLatch mergeOngoingLatch; + // progress state + private volatile long numberOfAppliedScanUpdates; + private volatile long numberOfAppliedExternalUpdates; + BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout layout, IndexProvider.Monitor monitor, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings, IndexDirectoryStructure directoryStructure, boolean archiveFailedIndex ) @@ -120,16 +127,14 @@ public abstract class BlockBasedIndexPopulator,V this.scanUpdates = ThreadLocal.withInitial( this::newThreadLocalBlockStorage ); } - private synchronized BlockStorage newThreadLocalBlockStorage() + private synchronized ThreadLocalBlockStorage newThreadLocalBlockStorage() { - Preconditions.checkState( !merged, "Already merged" ); Preconditions.checkState( !cancellation.cancelled(), "Already closed" ); + Preconditions.checkState( !scanCompleted, "Scan has already been completed" ); try { int id = allScanUpdates.size(); - BlockStorage blockStorage = - new BlockStorage<>( layout, bufferFactory, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".scan-" + id ), - blockStorageMonitor, blockSize ); + ThreadLocalBlockStorage blockStorage = new ThreadLocalBlockStorage( id ); allScanUpdates.add( blockStorage ); return blockStorage; } @@ -174,7 +179,7 @@ public void add( Collection> updates ) { if ( !updates.isEmpty() ) { - BlockStorage blockStorage = scanUpdates.get(); + BlockStorage blockStorage = scanUpdates.get().blockStorage; for ( IndexEntryUpdate update : updates ) { storeUpdate( update, blockStorage ); @@ -205,6 +210,7 @@ private void storeUpdate( IndexEntryUpdate update, BlockStorage bl private synchronized boolean markMergeStarted() { + scanCompleted = true; if ( cancellation.cancelled() ) { return false; @@ -230,8 +236,9 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict { ExecutorService executorService = Executors.newFixedThreadPool( allScanUpdates.size() ); List> mergeFutures = new ArrayList<>(); - for ( BlockStorage scanUpdates : allScanUpdates ) + for ( ThreadLocalBlockStorage part : allScanUpdates ) { + BlockStorage scanUpdates = part.blockStorage; mergeFutures.add( executorService.submit( () -> { scanUpdates.doneAdding(); @@ -261,7 +268,6 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict // Apply the external updates phaseTracker.enterPhase( PhaseTracker.Phase.APPLY_EXTERNAL ); writeExternalUpdatesToTree(); - merged = true; } catch ( IOException e ) { @@ -317,6 +323,7 @@ private void writeExternalUpdatesToTree() throws IOException, IndexEntryConflict default: throw new IllegalArgumentException( "Unknown update mode " + updates.updateMode() ); } + numberOfAppliedExternalUpdates++; } } @@ -384,9 +391,9 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictExce ConflictDetectingValueMerger conflictDetector = getMainConflictDetector(); try ( MergingBlockEntryReader allEntries = new MergingBlockEntryReader<>( layout ) ) { - for ( BlockStorage scanUpdates : allScanUpdates ) + for ( ThreadLocalBlockStorage part : allScanUpdates ) { - try ( BlockReader reader = scanUpdates.reader() ) + try ( BlockReader reader = part.blockStorage.reader() ) { BlockEntryReader singleMergedBlock = reader.nextBlock(); if ( singleMergedBlock != null ) @@ -411,6 +418,7 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictExce { conflictDetector.reportConflict( allEntries.key().asValues() ); } + numberOfAppliedScanUpdates++; } } } @@ -419,7 +427,7 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictExce @Override public IndexUpdater newPopulatingUpdater() { - if ( merged ) + if ( scanCompleted ) { // Will need the reader from newReader, which a sub-class of this class implements return super.newPopulatingUpdater(); @@ -510,7 +518,7 @@ private void closeBlockStorage() } } - List toClose = new ArrayList<>( allScanUpdates ); + List toClose = allScanUpdates.stream().map( local -> local.blockStorage ).collect( Collectors.toCollection( ArrayList::new ) ); toClose.add( externalUpdates ); IOUtils.closeAllUnchecked( toClose ); } @@ -518,7 +526,106 @@ private void closeBlockStorage() @Override public PopulationProgress progress( PopulationProgress scanProgress ) { - return scanProgress; + // A general note on scanProgress.getTotal(). Before the scan is completed most progress parts will base their estimates on that value. + // It is known that it may be slightly higher than the actual value. This is fine, but it creates this small "jump" in the progress + // in the middle somewhere when it switches from scan to merge. This also exists in the most basic population progress reports, but + // there it will be less visible since it will jump from some close-to-100 percentage to 100% and ONLINE. + + // This progress report will consist of a couple of smaller parts, weighted differently based on empirically collected values. + // The weights will not be absolutely correct in all environments, but they don't have to be either, it will just result in some + // slices of the percentage progression range progressing at slightly different paces. However, progression of progress reporting + // naturally fluctuates anyway due to data set and I/O etc. so this is not an actual problem. + PopulationProgress.MultiBuilder builder = PopulationProgress.multiple(); + + // Add scan progress (this one weights a bit heavier than the others) + builder.add( scanCompleted ? PopulationProgress.DONE : scanProgress, 4 ); + + // Add merge progress + if ( !allScanUpdates.isEmpty() ) + { + // The parts are merged in parallel so just take the first one and it will represent the whole merge progress. + // It will be fairly accurate, but slightly off sometimes if other threads gets scheduling problems, i.e. if this part + // finish far apart from others. + ThreadLocalBlockStorage part = first( allScanUpdates ); + long completed = 0; + long total; + if ( scanCompleted || part.blocksFlushed == 0 ) + { + // We know the actual entry count to write during merge since we have been monitoring those values + completed = part.entriesMerged; + total = part.totalEntriesToMerge; + } + else + { + // Before scan completed the total will have to be scanProgress.getTotal(). Use the same logic that BlockStorage uses + // internally to calculate the total number of entries written during the merge. The estimate gets more and more accurate + // the longer the scan progresses. + long averageKeysPerBlock = part.keysFlushed / part.blocksFlushed; + long entryCount = scanProgress.getTotal(); + total = calculateNumberOfEntriesWrittenDuringMerges( entryCount, entryCount / averageKeysPerBlock, MERGE_FACTOR ); + } + builder.add( PopulationProgress.single( completed, total ), 1 ); + } + + // Add tree building (before scan completed it's based on scanProgress.getTotal() and after scan completed it's based on actual entry count) + long entryCount = scanCompleted ? allScanUpdates.stream().mapToLong( part -> part.count ).sum() : scanProgress.getCompleted(); + builder.add( PopulationProgress.single( numberOfAppliedScanUpdates, entryCount ), 2 ); + + // Add external updates + builder.add( PopulationProgress.single( numberOfAppliedExternalUpdates, externalUpdates.count() ), 0.5f ); + + return builder.build(); + } + + /** + * Keeps track of a {@link BlockStorage} instance as well as monitoring some aspects of it to be able to provide a fairly accurate + * progress report from {@link BlockBasedIndexPopulator#progress(PopulationProgress)}. + */ + private class ThreadLocalBlockStorage extends BlockStorage.Monitor.Delegate + { + private final BlockStorage blockStorage; + private volatile long count; + private volatile long totalEntriesToMerge; + private volatile long entriesMerged; + private volatile long keysFlushed; + private volatile long blocksFlushed; + + ThreadLocalBlockStorage( int id ) throws IOException + { + super( blockStorageMonitor ); + this.blockStorage = + new BlockStorage<>( layout, bufferFactory, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".scan-" + id ), + this, blockSize ); + } + + @Override + public void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFlush ) + { + super.blockFlushed( keyCount, numberOfBytes, positionAfterFlush ); + keysFlushed += keyCount; + blocksFlushed++; + } + + @Override + public void entryAdded( int entrySize ) + { + super.entryAdded( entrySize ); + count++; + } + + @Override + public void mergeStarted( long totalEntriesToMerge ) + { + super.mergeStarted( totalEntriesToMerge ); + this.totalEntriesToMerge = totalEntriesToMerge; + } + + @Override + public void entriesMerged( int entries ) + { + super.entriesMerged( entries ); + entriesMerged += entries; + } } private static class CloseCancellation implements BlockStorage.Cancellation diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java index 3425351cefca4..be4ea31a1f666 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; +import java.util.function.IntConsumer; import org.neo4j.index.internal.gbptree.Layout; import org.neo4j.io.IOUtils; @@ -60,9 +61,10 @@ class BlockStorage implements Closeable private final int blockSize; private final ByteBufferFactory bufferFactory; private final File blockFile; - private int numberOfBlocksInCurrentFile; + private long numberOfBlocksInCurrentFile; private int currentBufferSize; private boolean doneAdding; + private long entryCount; BlockStorage( Layout layout, ByteBufferFactory bufferFactory, FileSystemAbstraction fs, File blockFile, Monitor monitor, int blockSize ) throws IOException @@ -120,7 +122,7 @@ private void flushAndResetBuffer() throws IOException bufferedEntries.sortThis( comparator ); ListBasedBlockEntryCursor entries = new ListBasedBlockEntryCursor<>( bufferedEntries ); - writeBlock( storeChannel, entries, blockSize, bufferedEntries.size(), NOT_CANCELLABLE ); + writeBlock( storeChannel, entries, blockSize, bufferedEntries.size(), NOT_CANCELLABLE, count -> entryCount += count ); // Append to file monitor.blockFlushed( bufferedEntries.size(), currentBufferSize, storeChannel.position() ); @@ -144,6 +146,7 @@ private void flushAndResetBuffer() throws IOException */ public void merge( int mergeFactor, Cancellation cancellation ) throws IOException { + monitor.mergeStarted( calculateNumberOfEntriesWrittenDuringMerges( entryCount, numberOfBlocksInCurrentFile, mergeFactor ) ); File sourceFile = blockFile; File tempFile = new File( blockFile.getParent(), blockFile.getName() + ".b" ); try @@ -156,8 +159,8 @@ public void merge( int mergeFactor, Cancellation cancellation ) throws IOExcepti try ( BlockReader reader = reader( sourceFile ); StoreChannel targetChannel = fs.open( targetFile, OpenMode.READ_WRITE ) ) { - int blocksMergedSoFar = 0; - int blocksInMergedFile = 0; + long blocksMergedSoFar = 0; + long blocksInMergedFile = 0; while ( !cancellation.cancelled() && blocksMergedSoFar < numberOfBlocksInCurrentFile ) { blocksMergedSoFar += performSingleMerge( mergeFactor, reader, targetChannel, cancellation ); @@ -187,6 +190,26 @@ public void merge( int mergeFactor, Cancellation cancellation ) throws IOExcepti } } + /** + * Calculates number of entries that will be written, given an entry count, number of blocks and a merge factor. + * During merge entries are merged and written, potentially multiple times depending on number of blocks and merge factor. + * + * @param entryCount number of entries to merge. + * @param numberOfBlocks number of blocks that these entries exist in. + * @param mergeFactor the merge factor to use when merging. + * @return number of entries written in total when merging these entries, which exists in the given number of blocks, + * merged with the given merge factor. + */ + static long calculateNumberOfEntriesWrittenDuringMerges( long entryCount, long numberOfBlocks, int mergeFactor ) + { + int singleMerges = 1; + for ( long blocks = numberOfBlocks; blocks > mergeFactor; blocks /= mergeFactor ) + { + singleMerges++; + } + return singleMerges * entryCount; + } + /** * Merge some number of blocks, how many is decided by mergeFactor, into a single sorted block. This is done by opening {@link BlockEntryReader} on each * block that we want to merge and give them to a {@link MergingBlockEntryReader}. The {@link BlockEntryReader}s are pulled from a {@link BlockReader} @@ -196,7 +219,7 @@ public void merge( int mergeFactor, Cancellation cancellation ) throws IOExcepti * like a single large and sorted entry reader. * * The large block resulting from the merge is written down to targetChannel by calling - * {@link #writeBlock(StoreChannel, BlockEntryCursor, long, long, Cancellation)}. + * {@link #writeBlock(StoreChannel, BlockEntryCursor, long, long, Cancellation, IntConsumer)}. * * @param mergeFactor How many blocks to merge at the same time. Influence how much memory will be used because each merge block will have it's own * {@link ByteBuffer} that they read from. @@ -230,17 +253,17 @@ private int performSingleMerge( int mergeFactor, BlockReader reader, } } - writeBlock( targetChannel, merger, blockSize, entryCount, cancellation ); + writeBlock( targetChannel, merger, blockSize, entryCount, cancellation, monitor::entriesMerged ); monitor.mergedBlocks( blockSize, entryCount, blocksMerged ); return blocksMerged; } } private void writeBlock( StoreChannel targetChannel, BlockEntryCursor blockEntryCursor, long blockSize, long entryCount, - Cancellation cancellation ) throws IOException + Cancellation cancellation, IntConsumer entryCountReporter ) throws IOException { writeHeader( byteBuffer, blockSize, entryCount ); - long actualDataSize = writeEntries( targetChannel, byteBuffer, layout, blockEntryCursor, cancellation ); + long actualDataSize = writeEntries( targetChannel, byteBuffer, layout, blockEntryCursor, cancellation, entryCountReporter ); writeLastEntriesWithPadding( targetChannel, byteBuffer, blockSize - actualDataSize ); } @@ -251,17 +274,19 @@ private static void writeHeader( ByteBuffer byteBuffer, long blockSize, long ent } private static long writeEntries( StoreChannel targetChannel, ByteBuffer byteBuffer, Layout layout, - BlockEntryCursor blockEntryCursor, Cancellation cancellation ) throws IOException + BlockEntryCursor blockEntryCursor, Cancellation cancellation, IntConsumer entryCountReporter ) throws IOException { // Loop over block entries long actualDataSize = BLOCK_HEADER_SIZE; ByteArrayPageCursor pageCursor = new ByteArrayPageCursor( byteBuffer ); + int unreportedEntrySize = 0; while ( blockEntryCursor.next() ) { KEY key = blockEntryCursor.key(); VALUE value = blockEntryCursor.value(); int entrySize = BlockEntry.entrySize( layout, key, value ); actualDataSize += entrySize; + unreportedEntrySize++; if ( byteBuffer.remaining() < entrySize ) { @@ -275,10 +300,16 @@ private static long writeEntries( StoreChannel targetChannel, ByteB byteBuffer.flip(); targetChannel.writeAll( byteBuffer ); byteBuffer.clear(); + entryCountReporter.accept( unreportedEntrySize ); + unreportedEntrySize = 0; } BlockEntry.write( pageCursor, layout, key, value ); } + if ( unreportedEntrySize > 0 ) + { + entryCountReporter.accept( unreportedEntrySize ); + } return actualDataSize; } @@ -325,9 +356,21 @@ public interface Monitor void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFlush ); - void mergeIterationFinished( int numberOfBlocksBefore, int numberOfBlocksAfter ); + /** + * @param totalEntriesToMerge total entries that will be written, even accounting for that entries may need to be + * written multiple times back and forth. + */ + void mergeStarted( long totalEntriesToMerge ); - void mergedBlocks( long resultingBlockSize, long resultingEntryCount, int numberOfBlocks ); + /** + * @param entries number of entries merged since last call. The sum of this value from all calls to this method + * will in the end match the value provided in {@link #mergeStarted(long)}. + */ + void entriesMerged( int entries ); + + void mergeIterationFinished( long numberOfBlocksBefore, long numberOfBlocksAfter ); + + void mergedBlocks( long resultingBlockSize, long resultingEntryCount, long numberOfBlocks ); class Adapter implements Monitor { @@ -342,16 +385,72 @@ public void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFl } @Override - public void mergeIterationFinished( int numberOfBlocksBefore, int numberOfBlocksAfter ) + public void mergeStarted( long totalEntriesToMerge ) + { // no-op + } + + @Override + public void entriesMerged( int entries ) + { // no-op + } + + @Override + public void mergeIterationFinished( long numberOfBlocksBefore, long numberOfBlocksAfter ) { // no-op } @Override - public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, int numberOfBlocks ) + public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, long numberOfBlocks ) { // no-op } } + class Delegate implements Monitor + { + private final Monitor actual; + + @Override + public void entryAdded( int entrySize ) + { + actual.entryAdded( entrySize ); + } + + @Override + public void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFlush ) + { + actual.blockFlushed( keyCount, numberOfBytes, positionAfterFlush ); + } + + @Override + public void mergeStarted( long totalEntriesToMerge ) + { + actual.mergeStarted( totalEntriesToMerge ); + } + + @Override + public void entriesMerged( int entries ) + { + actual.entriesMerged( entries ); + } + + @Override + public void mergeIterationFinished( long numberOfBlocksBefore, long numberOfBlocksAfter ) + { + actual.mergeIterationFinished( numberOfBlocksBefore, numberOfBlocksAfter ); + } + + @Override + public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, long numberOfBlocks ) + { + actual.mergedBlocks( resultingBlockSize, resultingEntryCount, numberOfBlocks ); + } + + public Delegate( Monitor actual ) + { + this.actual = actual; + } + } + Monitor NO_MONITOR = new Adapter(); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/IndexUpdateStorage.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/IndexUpdateStorage.java index ebb4a8aa33b42..4d7dc560c7a50 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/IndexUpdateStorage.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/IndexUpdateStorage.java @@ -56,6 +56,7 @@ public class IndexUpdateStorage,VALUE extends Na private final KEY key1; private final KEY key2; private final VALUE value; + private volatile long count; IndexUpdateStorage( Layout layout, FileSystemAbstraction fs, File file, ByteBufferFactory byteBufferFactory, int blockSize ) throws IOException { @@ -102,6 +103,8 @@ public void add( IndexEntryUpdate update ) throws IOException pageCursor.putByte( (byte) updateMode.ordinal() ); IndexUpdateEntry.write( pageCursor, layout, updateMode, key1, key2, value ); + // a single thread, and the same thread every time, increments this count + count++; } void doneAdding() throws IOException @@ -128,6 +131,11 @@ private void flush() throws IOException buffer.clear(); } + long count() + { + return count; + } + @Override public void close() throws IOException { diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java index eb1b364a5775a..23e2becdc62d2 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java @@ -39,12 +39,14 @@ import org.neo4j.kernel.impl.index.schema.config.ConfiguredSpaceFillingCurveSettingsCache; import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache; import org.neo4j.storageengine.api.schema.IndexDescriptorFactory; +import org.neo4j.storageengine.api.schema.PopulationProgress; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.test.Barrier; import org.neo4j.test.Race; import org.neo4j.test.rule.PageCacheAndDependenciesRule; import org.neo4j.test.rule.concurrent.OtherThreadRule; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.neo4j.kernel.api.index.IndexDirectoryStructure.directoriesByProvider; import static org.neo4j.test.OtherThreadExecutor.command; @@ -68,7 +70,7 @@ public class BlockBasedIndexPopulatorTest public void shouldAwaitMergeToBeFullyAbortedBeforeLeavingCloseMethod() throws Exception { // given - TrappingMonitor monitor = new TrappingMonitor(); + TrappingMonitor monitor = new TrappingMonitor( false ); BlockBasedIndexPopulator populator = instantiatePopulatorWithSomeData( monitor ); // when starting to merge (in a separate thread) @@ -85,6 +87,32 @@ public void shouldAwaitMergeToBeFullyAbortedBeforeLeavingCloseMethod() throws Ex assertTrue( mergeFuture.isDone() ); } + @Test + public void shouldReportAccurateProgressThroughoutThePhases() throws Exception + { + // given + TrappingMonitor monitor = new TrappingMonitor( true ); + BlockBasedIndexPopulator populator = instantiatePopulatorWithSomeData( monitor ); + try + { + // when starting to merge (in a separate thread) + Future mergeFuture = t2.execute( command( () -> populator.scanCompleted( PhaseTracker.nullInstance ) ) ); + // and waiting for merge to get going + monitor.barrier.awaitUninterruptibly(); + // this is a bit fuzzy, but what we want is to assert that the scan doesn't represent 100% of the work + assertEquals( 0.5f, populator.progress( PopulationProgress.DONE ).getProgress(), 0.1f ); + monitor.barrier.release(); + monitor.mergeFinishedBarrier.awaitUninterruptibly(); + assertEquals( 0.7f, populator.progress( PopulationProgress.DONE ).getProgress(), 0.1f ); + monitor.mergeFinishedBarrier.release(); + mergeFuture.get(); + } + finally + { + populator.close( true ); + } + } + @Test public void shouldCorrectlyDecideToAwaitMergeDependingOnProgress() throws Throwable { @@ -138,11 +166,27 @@ private static Collection> batchOfUpdates() private static class TrappingMonitor extends BlockStorage.Monitor.Adapter { private final Barrier.Control barrier = new Barrier.Control(); + private final Barrier.Control mergeFinishedBarrier = new Barrier.Control(); + private final boolean alsoTrapAfterMergeCompleted; + + TrappingMonitor( boolean alsoTrapAfterMergeCompleted ) + { + this.alsoTrapAfterMergeCompleted = alsoTrapAfterMergeCompleted; + } @Override - public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, int numberOfBlocks ) + public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, long numberOfBlocks ) { barrier.reached(); } + + @Override + public void mergeIterationFinished( long numberOfBlocksBefore, long numberOfBlocksAfter ) + { + if ( numberOfBlocksAfter == 1 && alsoTrapAfterMergeCompleted ) + { + mergeFinishedBarrier.reached(); + } + } } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java index 13805c414fdf0..dd9d46738352f 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java @@ -49,6 +49,7 @@ import static java.util.Collections.singletonList; import static java.util.Comparator.comparingLong; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThan; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -215,6 +216,8 @@ void shouldMergeMultipleBlocks() throws IOException // then assertContents( layout, storage, asOneBigBlock( expectedBlocks ) ); + assertThat( monitor.totalEntriesToMerge, greaterThanOrEqualTo( monitor.entryAddedCallCount ) ); + assertEquals( monitor.totalEntriesToMerge, monitor.entriesMerged ); } } @@ -275,7 +278,7 @@ void shouldNoticeCancelRequest() throws IOException, ExecutionException, Interru TrackingMonitor monitor = new TrackingMonitor() { @Override - public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, int numberOfBlocks ) + public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, long numberOfBlocks ) { super.mergedBlocks( resultingBlockSize, resultingEntryCount, numberOfBlocks ); barrier.reached(); @@ -412,7 +415,7 @@ private void assertContents( SimpleLongLayout layout, BlockStorage