BlockBasedIndexPopulator reports more accurate progress
Taking into account time spent in merging and building the tree.
tinwelint committed Mar 6, 2019
1 parent 7fa3140 commit 31e8a3f
Showing 5 changed files with 314 additions and 38 deletions.
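In essence, the commit replaces the plain "return scanProgress" in progress(...) with a composite report: the node-store scan, the block merge, the tree build and the external updates each contribute a weighted slice of the overall percentage (weights 4, 1, 2 and 0.5 in the diff below). As a rough illustration of that weighting scheme, here is a minimal, self-contained sketch; the WeightedProgressSketch class, its Part helper and overallPercentage() are hypothetical and are not part of Neo4j's PopulationProgress API, which instead goes through PopulationProgress.multiple() / builder.add( progress, weight ) / builder.build().

import java.util.Arrays;
import java.util.List;

// Minimal sketch of weighted multi-part progress, mirroring the idea in this commit.
public class WeightedProgressSketch
{
    static class Part
    {
        final long completed;
        final long total;
        final float weight;

        Part( long completed, long total, float weight )
        {
            this.completed = completed;
            this.total = total;
            this.weight = weight;
        }

        // An empty part (total == 0) counts as complete so it does not stall the bar.
        float fraction()
        {
            return total == 0 ? 1f : (float) completed / total;
        }
    }

    static float overallPercentage( List<Part> parts )
    {
        float weightSum = 0;
        float weighted = 0;
        for ( Part part : parts )
        {
            weightSum += part.weight;
            weighted += part.fraction() * part.weight;
        }
        return weightSum == 0 ? 0 : 100f * weighted / weightSum;
    }

    public static void main( String[] args )
    {
        // Scan done, merge half-way, tree build not started, no external updates seen yet.
        // The weights 4 : 1 : 2 : 0.5 are the ones used in the diff below.
        List<Part> parts = Arrays.asList(
                new Part( 1_000_000, 1_000_000, 4f ),  // scan
                new Part( 1_500_000, 3_000_000, 1f ),  // merge
                new Part( 0, 1_000_000, 2f ),          // tree build
                new Part( 0, 0, 0.5f ) );              // external updates
        System.out.printf( "%.1f%%%n", overallPercentage( parts ) ); // ~66.7%
    }
}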
@@ -33,6 +33,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;

import org.neo4j.cursor.RawCursor;
import org.neo4j.index.internal.gbptree.Hit;
@@ -57,6 +58,8 @@
import org.neo4j.util.Preconditions;
import org.neo4j.values.storable.Value;

+import static org.neo4j.helpers.collection.Iterables.first;
+import static org.neo4j.kernel.impl.index.schema.BlockStorage.calculateNumberOfEntriesWrittenDuringMerges;
import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate;
import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex;

@@ -89,16 +92,20 @@ public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,V
private final int mergeFactor;
private final BlockStorage.Monitor blockStorageMonitor;
// written to in a synchronized method when creating new thread-local instances, read from when population completes
-private final List<BlockStorage<KEY,VALUE>> allScanUpdates = new CopyOnWriteArrayList<>();
+private final List<ThreadLocalBlockStorage> allScanUpdates = new CopyOnWriteArrayList<>();
-private final ThreadLocal<BlockStorage<KEY,VALUE>> scanUpdates;
+private final ThreadLocal<ThreadLocalBlockStorage> scanUpdates;
private final ByteBufferFactory bufferFactory = new UnsafeDirectByteBufferFactory( new LocalMemoryTracker() /*plug in actual tracker when available*/ );
private IndexUpdateStorage<KEY,VALUE> externalUpdates;
// written in a synchronized method when creating new thread-local instances, read when processing external updates
-private volatile boolean merged;
+private volatile boolean scanCompleted;
private final CloseCancellation cancellation = new CloseCancellation();
// Will be instantiated right before merging and can be used to neatly await merge to complete
private volatile CountDownLatch mergeOngoingLatch;

+// progress state
+private volatile long numberOfAppliedScanUpdates;
+private volatile long numberOfAppliedExternalUpdates;
+
BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout<KEY,VALUE> layout, IndexProvider.Monitor monitor,
StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings,
IndexDirectoryStructure directoryStructure, boolean archiveFailedIndex )
@@ -120,16 +127,14 @@ public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,V
this.scanUpdates = ThreadLocal.withInitial( this::newThreadLocalBlockStorage );
}

-private synchronized BlockStorage<KEY,VALUE> newThreadLocalBlockStorage()
+private synchronized ThreadLocalBlockStorage newThreadLocalBlockStorage()
{
-Preconditions.checkState( !merged, "Already merged" );
Preconditions.checkState( !cancellation.cancelled(), "Already closed" );
+Preconditions.checkState( !scanCompleted, "Scan has already been completed" );
try
{
int id = allScanUpdates.size();
-BlockStorage<KEY,VALUE> blockStorage =
-new BlockStorage<>( layout, bufferFactory, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".scan-" + id ),
-blockStorageMonitor, blockSize );
+ThreadLocalBlockStorage blockStorage = new ThreadLocalBlockStorage( id );
allScanUpdates.add( blockStorage );
return blockStorage;
}
@@ -174,7 +179,7 @@ public void add( Collection<? extends IndexEntryUpdate<?>> updates )
{
if ( !updates.isEmpty() )
{
-BlockStorage<KEY,VALUE> blockStorage = scanUpdates.get();
+BlockStorage<KEY,VALUE> blockStorage = scanUpdates.get().blockStorage;
for ( IndexEntryUpdate<?> update : updates )
{
storeUpdate( update, blockStorage );
@@ -205,6 +210,7 @@ private void storeUpdate( IndexEntryUpdate<?> update, BlockStorage<KEY,VALUE> bl

private synchronized boolean markMergeStarted()
{
+scanCompleted = true;
if ( cancellation.cancelled() )
{
return false;
@@ -230,8 +236,9 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict
{
ExecutorService executorService = Executors.newFixedThreadPool( allScanUpdates.size() );
List<Future<?>> mergeFutures = new ArrayList<>();
-for ( BlockStorage<KEY,VALUE> scanUpdates : allScanUpdates )
+for ( ThreadLocalBlockStorage part : allScanUpdates )
{
+BlockStorage<KEY,VALUE> scanUpdates = part.blockStorage;
mergeFutures.add( executorService.submit( () ->
{
scanUpdates.doneAdding();
@@ -261,7 +268,6 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict
// Apply the external updates
phaseTracker.enterPhase( PhaseTracker.Phase.APPLY_EXTERNAL );
writeExternalUpdatesToTree();
-merged = true;
}
catch ( IOException e )
{
@@ -317,6 +323,7 @@ private void writeExternalUpdatesToTree() throws IOException, IndexEntryConflict
default:
throw new IllegalArgumentException( "Unknown update mode " + updates.updateMode() );
}
+numberOfAppliedExternalUpdates++;
}
}

@@ -384,9 +391,9 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictExce
ConflictDetectingValueMerger<KEY,VALUE> conflictDetector = getMainConflictDetector();
try ( MergingBlockEntryReader<KEY,VALUE> allEntries = new MergingBlockEntryReader<>( layout ) )
{
-for ( BlockStorage<KEY,VALUE> scanUpdates : allScanUpdates )
+for ( ThreadLocalBlockStorage part : allScanUpdates )
{
-try ( BlockReader<KEY,VALUE> reader = scanUpdates.reader() )
+try ( BlockReader<KEY,VALUE> reader = part.blockStorage.reader() )
{
BlockEntryReader<KEY,VALUE> singleMergedBlock = reader.nextBlock();
if ( singleMergedBlock != null )
@@ -411,6 +418,7 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictExce
{
conflictDetector.reportConflict( allEntries.key().asValues() );
}
+numberOfAppliedScanUpdates++;
}
}
}
@@ -419,7 +427,7 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictExce
@Override
public IndexUpdater newPopulatingUpdater()
{
-if ( merged )
+if ( scanCompleted )
{
// Will need the reader from newReader, which a sub-class of this class implements
return super.newPopulatingUpdater();
@@ -510,15 +518,114 @@ private void closeBlockStorage()
}
}

-List<Closeable> toClose = new ArrayList<>( allScanUpdates );
+List<Closeable> toClose = allScanUpdates.stream().map( local -> local.blockStorage ).collect( Collectors.toCollection( ArrayList::new ) );
toClose.add( externalUpdates );
IOUtils.closeAllUnchecked( toClose );
}

@Override
public PopulationProgress progress( PopulationProgress scanProgress )
{
-return scanProgress;
+// A general note on scanProgress.getTotal(). Before the scan is completed most progress parts will base their estimates on that value.
+// It is known that it may be slightly higher than the actual value. This is fine, but it creates this small "jump" in the progress
+// in the middle somewhere when it switches from scan to merge. This also exists in the most basic population progress reports, but
+// there it will be less visible since it will jump from some close-to-100 percentage to 100% and ONLINE.
+
+// This progress report will consist of a couple of smaller parts, weighted differently based on empirically collected values.
+// The weights will not be absolutely correct in all environments, but they don't have to be either, it will just result in some
+// slices of the percentage progression range progressing at slightly different paces. However, progression of progress reporting
+// naturally fluctuates anyway due to data set and I/O etc. so this is not an actual problem.
+PopulationProgress.MultiBuilder builder = PopulationProgress.multiple();
+
+// Add scan progress (this one weights a bit heavier than the others)
+builder.add( scanCompleted ? PopulationProgress.DONE : scanProgress, 4 );
+
+// Add merge progress
+if ( !allScanUpdates.isEmpty() )
+{
+// The parts are merged in parallel so just take the first one and it will represent the whole merge progress.
+// It will be fairly accurate, but slightly off sometimes if other threads gets scheduling problems, i.e. if this part
+// finish far apart from others.
+ThreadLocalBlockStorage part = first( allScanUpdates );
+long completed = 0;
+long total;
+if ( scanCompleted || part.blocksFlushed == 0 )
+{
+// We know the actual entry count to write during merge since we have been monitoring those values
+completed = part.entriesMerged;
+total = part.totalEntriesToMerge;
+}
+else
+{
+// Before scan completed the total will have to be scanProgress.getTotal(). Use the same logic that BlockStorage uses
+// internally to calculate the total number of entries written during the merge. The estimate gets more and more accurate
+// the longer the scan progresses.
+long averageKeysPerBlock = part.keysFlushed / part.blocksFlushed;
+long entryCount = scanProgress.getTotal();
+total = calculateNumberOfEntriesWrittenDuringMerges( entryCount, entryCount / averageKeysPerBlock, MERGE_FACTOR );
+}
+builder.add( PopulationProgress.single( completed, total ), 1 );
+}
+
+// Add tree building (before scan completed it's based on scanProgress.getTotal() and after scan completed it's based on actual entry count)
+long entryCount = scanCompleted ? allScanUpdates.stream().mapToLong( part -> part.count ).sum() : scanProgress.getCompleted();
+builder.add( PopulationProgress.single( numberOfAppliedScanUpdates, entryCount ), 2 );
+
+// Add external updates
+builder.add( PopulationProgress.single( numberOfAppliedExternalUpdates, externalUpdates.count() ), 0.5f );
+
+return builder.build();
+}
+
+/**
+* Keeps track of a {@link BlockStorage} instance as well as monitoring some aspects of it to be able to provide a fairly accurate
+* progress report from {@link BlockBasedIndexPopulator#progress(PopulationProgress)}.
+*/
+private class ThreadLocalBlockStorage extends BlockStorage.Monitor.Delegate
+{
+private final BlockStorage<KEY,VALUE> blockStorage;
+private volatile long count;
+private volatile long totalEntriesToMerge;
+private volatile long entriesMerged;
+private volatile long keysFlushed;
+private volatile long blocksFlushed;
+
+ThreadLocalBlockStorage( int id ) throws IOException
+{
+super( blockStorageMonitor );
+this.blockStorage =
+new BlockStorage<>( layout, bufferFactory, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".scan-" + id ),
+this, blockSize );
+}
+
+@Override
+public void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFlush )
+{
+super.blockFlushed( keyCount, numberOfBytes, positionAfterFlush );
+keysFlushed += keyCount;
+blocksFlushed++;
+}
+
+@Override
+public void entryAdded( int entrySize )
+{
+super.entryAdded( entrySize );
+count++;
+}
+
+@Override
+public void mergeStarted( long totalEntriesToMerge )
+{
+super.mergeStarted( totalEntriesToMerge );
+this.totalEntriesToMerge = totalEntriesToMerge;
+}
+
+@Override
+public void entriesMerged( int entries )
+{
+super.entriesMerged( entries );
+entriesMerged += entries;
+}
+}
}

private static class CloseCancellation implements BlockStorage.Cancellation
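A note on the merge-progress estimate in progress(...) above: before the scan has completed, the merge part's total is derived from what the first part has flushed so far — averageKeysPerBlock = keysFlushed / blocksFlushed, an estimated block count of entryCount / averageKeysPerBlock, and calculateNumberOfEntriesWrittenDuringMerges(...) to turn that into the number of entry writes the merge will perform. The sketch below illustrates that arithmetic under an assumption the diff does not confirm: that the helper behaves like entryCount times the number of merge rounds, ceil( log_mergeFactor( blocks ) ), i.e. every entry is rewritten once per round. The MergeEstimateSketch class and estimatedMergeWrites() are hypothetical names, not Neo4j code.

// Hedged sketch of the merge-work estimate used by progress() before the scan completes.
public class MergeEstimateSketch
{
    static long estimatedMergeWrites( long entryCount, long keysFlushed, long blocksFlushed, int mergeFactor )
    {
        if ( blocksFlushed == 0 )
        {
            return 0; // progress() falls back to the monitored totals in this case
        }
        long averageKeysPerBlock = keysFlushed / blocksFlushed;
        long estimatedBlocks = entryCount / averageKeysPerBlock;
        // Count merge rounds: each round reduces the number of blocks by the merge factor.
        long rounds = 0;
        for ( long remaining = estimatedBlocks; remaining > 1; remaining = ( remaining + mergeFactor - 1 ) / mergeFactor )
        {
            rounds++;
        }
        // ASSUMPTION: every entry is written once per merge round.
        return entryCount * rounds;
    }

    public static void main( String[] args )
    {
        // 10M entries scanned so far, ~100k keys per flushed block -> ~100 blocks;
        // with a merge factor of 8 that is 3 merge rounds, so ~30M entry writes in total.
        System.out.println( estimatedMergeWrites( 10_000_000L, 5_000_000L, 50, 8 ) );
    }
}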
