Skip to content

Commit

Permalink
BlockBasedIndexPopulator#drop free memory and improve test coverage
Browse files Browse the repository at this point in the history
BlockBasedIndexPopulator
- Drop also close bufferFactory to free allocated memory.
- Drop and Close steps are run through Runnables.runAll, making sure
  that we run all steps even if some fail.
- Extract method mergeScanUpdates() for readability.
- Constructor takes MemoryAllocationTracker to make testing possible.

BlockBasedIndexPopulatorTest
- Close populator in finally.
- Separate initiate populator and add data.
- Add tests for memory allocation.

GenericBlockBasedPopulatorTest (new)
- Make sure external updates are visible to reader, both added before
  and after scanCompleted.
  • Loading branch information
burqen committed Mar 7, 2019
1 parent a0de978 commit 1809421
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 93 deletions.
Expand Up @@ -52,6 +52,7 @@
import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache; import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache;
import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettingsWriter; import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettingsWriter;
import org.neo4j.memory.LocalMemoryTracker; import org.neo4j.memory.LocalMemoryTracker;
import org.neo4j.memory.MemoryAllocationTracker;
import org.neo4j.storageengine.api.schema.PopulationProgress; import org.neo4j.storageengine.api.schema.PopulationProgress;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.util.FeatureToggles; import org.neo4j.util.FeatureToggles;
Expand All @@ -62,6 +63,7 @@
import static org.neo4j.kernel.impl.index.schema.BlockStorage.Monitor.NO_MONITOR; import static org.neo4j.kernel.impl.index.schema.BlockStorage.Monitor.NO_MONITOR;
import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate; import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate;
import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex; import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex;
import static org.neo4j.util.concurrent.Runnables.runAll;


public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> extends NativeIndexPopulator<KEY,VALUE> public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> extends NativeIndexPopulator<KEY,VALUE>
{ {
Expand Down Expand Up @@ -95,7 +97,7 @@ public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,V
// written to in a synchronized method when creating new thread-local instances, read from when population completes // written to in a synchronized method when creating new thread-local instances, read from when population completes
private final List<ThreadLocalBlockStorage> allScanUpdates = new CopyOnWriteArrayList<>(); private final List<ThreadLocalBlockStorage> allScanUpdates = new CopyOnWriteArrayList<>();
private final ThreadLocal<ThreadLocalBlockStorage> scanUpdates; private final ThreadLocal<ThreadLocalBlockStorage> scanUpdates;
private final ByteBufferFactory bufferFactory = new UnsafeDirectByteBufferFactory( new LocalMemoryTracker() /*plug in actual tracker when available*/ ); private final ByteBufferFactory bufferFactory;
private IndexUpdateStorage<KEY,VALUE> externalUpdates; private IndexUpdateStorage<KEY,VALUE> externalUpdates;
// written in a synchronized method when creating new thread-local instances, read when processing external updates // written in a synchronized method when creating new thread-local instances, read when processing external updates
private volatile boolean scanCompleted; private volatile boolean scanCompleted;
Expand All @@ -112,13 +114,13 @@ public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,V
IndexDirectoryStructure directoryStructure, IndexDropAction dropAction, boolean archiveFailedIndex ) IndexDirectoryStructure directoryStructure, IndexDropAction dropAction, boolean archiveFailedIndex )
{ {
this( pageCache, fs, file, layout, monitor, descriptor, spatialSettings, directoryStructure, dropAction, archiveFailedIndex, parseBlockSize(), this( pageCache, fs, file, layout, monitor, descriptor, spatialSettings, directoryStructure, dropAction, archiveFailedIndex, parseBlockSize(),
MERGE_FACTOR, NO_MONITOR ); MERGE_FACTOR, NO_MONITOR, new LocalMemoryTracker() /*plug in actual tracker when available*/ );
} }


BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout<KEY,VALUE> layout, IndexProvider.Monitor monitor, BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout<KEY,VALUE> layout, IndexProvider.Monitor monitor,
StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings,
IndexDirectoryStructure directoryStructure, IndexDropAction dropAction, boolean archiveFailedIndex, IndexDirectoryStructure directoryStructure, IndexDropAction dropAction, boolean archiveFailedIndex,
int blockSize, int mergeFactor, BlockStorage.Monitor blockStorageMonitor ) int blockSize, int mergeFactor, BlockStorage.Monitor blockStorageMonitor, MemoryAllocationTracker memoryTracker )
{ {
super( pageCache, fs, file, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) ); super( pageCache, fs, file, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) );
this.directoryStructure = directoryStructure; this.directoryStructure = directoryStructure;
Expand All @@ -128,6 +130,7 @@ public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,V
this.mergeFactor = mergeFactor; this.mergeFactor = mergeFactor;
this.blockStorageMonitor = blockStorageMonitor; this.blockStorageMonitor = blockStorageMonitor;
this.scanUpdates = ThreadLocal.withInitial( this::newThreadLocalBlockStorage ); this.scanUpdates = ThreadLocal.withInitial( this::newThreadLocalBlockStorage );
this.bufferFactory = new UnsafeDirectByteBufferFactory( memoryTracker );
} }


private synchronized ThreadLocalBlockStorage newThreadLocalBlockStorage() private synchronized ThreadLocalBlockStorage newThreadLocalBlockStorage()
Expand Down Expand Up @@ -237,28 +240,7 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict
phaseTracker.enterPhase( PhaseTracker.Phase.MERGE ); phaseTracker.enterPhase( PhaseTracker.Phase.MERGE );
if ( !allScanUpdates.isEmpty() ) if ( !allScanUpdates.isEmpty() )
{ {
ExecutorService executorService = Executors.newFixedThreadPool( allScanUpdates.size() ); mergeScanUpdates();
List<Future<?>> mergeFutures = new ArrayList<>();
for ( ThreadLocalBlockStorage part : allScanUpdates )
{
BlockStorage<KEY,VALUE> scanUpdates = part.blockStorage;
mergeFutures.add( executorService.submit( () ->
{
scanUpdates.doneAdding();
scanUpdates.merge( mergeFactor, cancellation );
return null;
} ) );
}
executorService.shutdown();
while ( !executorService.awaitTermination( 1, TimeUnit.SECONDS ) )
{
// just wait longer
}
// Let potential exceptions in the merge threads have a chance to propagate
for ( Future<?> mergeFuture : mergeFutures )
{
mergeFuture.get();
}
} }


externalUpdates.doneAdding(); externalUpdates.doneAdding();
Expand Down Expand Up @@ -297,6 +279,32 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict
} }
} }


private void mergeScanUpdates() throws InterruptedException, ExecutionException
{
ExecutorService executorService = Executors.newFixedThreadPool( allScanUpdates.size() );
List<Future<?>> mergeFutures = new ArrayList<>();
for ( ThreadLocalBlockStorage part : allScanUpdates )
{
BlockStorage<KEY,VALUE> scanUpdates = part.blockStorage;
mergeFutures.add( executorService.submit( () ->
{
scanUpdates.doneAdding();
scanUpdates.merge( mergeFactor, cancellation );
return null;
} ) );
}
executorService.shutdown();
while ( !executorService.awaitTermination( 1, TimeUnit.SECONDS ) )
{
// just wait longer
}
// Let potential exceptions in the merge threads have a chance to propagate
for ( Future<?> mergeFuture : mergeFutures )
{
mergeFuture.get();
}
}

/** /**
* We will loop over all external updates once to add them to the tree. This is done without checking any uniqueness. * We will loop over all external updates once to add them to the tree. This is done without checking any uniqueness.
* If index is a uniqueness index we will then loop over external updates again and for each ADD or CHANGED update * If index is a uniqueness index we will then loop over external updates again and for each ADD or CHANGED update
Expand Down Expand Up @@ -473,32 +481,22 @@ private void assertOpen()
@Override @Override
public synchronized void drop() public synchronized void drop()
{ {
try runAll( "Failed while trying to drop index",
{ this::closeBlockStorage /* Close internal resources */,
// Close internal resources bufferFactory::close /* Free all allocated byte buffers */,
closeBlockStorage(); super::drop /* Super drop will close inherited resources */,
} () -> dropAction.drop( descriptor.getId(), archiveFailedIndex ) /* Cleanup files */
finally );
{
// Super drop will close inherited resources
super.drop();
// Cleanup files
dropAction.drop( descriptor.getId(), archiveFailedIndex );
}
} }


@Override @Override
public synchronized void close( boolean populationCompletedSuccessfully ) public synchronized void close( boolean populationCompletedSuccessfully )
{ {
try runAll( "Failed while trying to close index",
{ this::closeBlockStorage /* Close internal resources */,
closeBlockStorage(); bufferFactory::close /* Free all allocated byte buffers */,
} () -> super.close( populationCompletedSuccessfully ) /* Super close will close inherited resources */
finally );
{
bufferFactory.close();
super.close( populationCompletedSuccessfully );
}
} }


// Always called from synchronized method // Always called from synchronized method
Expand Down

0 comments on commit 1809421

Please sign in to comment.