Skip to content

Commit

Permalink
BlockBasedIndexPopulator supports parallel threads adding scan batches
Browse files Browse the repository at this point in the history
Merging is done by multiple threads from an ExecutorService. Ideally, merging
jobs would be passed to JobScheduler, but for now that is not available to us.

BlockBasedIndexPopulator becomes abstract and is implemented by
GenericBlockBasedIndexPopulator, because we need to construct a specific
index reader to be used by DeferredConflictCheckingIndexUpdater.

Stop hiding BlockBasedIndexPopulator behind WorkSyncedNativeIndexPopulator,
as we can now handle concurrent adds.

Merge factor is controlled by FeatureToggle.
  • Loading branch information
tinwelint authored and burqen committed Feb 18, 2019
1 parent 8723304 commit 6d305b0
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 43 deletions.
Expand Up @@ -578,7 +578,11 @@ void flip( boolean verifyBeforeFlipping ) throws FlipFailedKernelException
{ {
if ( populationOngoing ) if ( populationOngoing )
{ {
populator.add( takeCurrentBatch() ); Collection<IndexEntryUpdate<?>> finalBatch = takeCurrentBatch();
if ( !finalBatch.isEmpty() )
{
populator.add( finalBatch );
}
populateFromQueue( 0, Long.MAX_VALUE ); populateFromQueue( 0, Long.MAX_VALUE );
if ( populations.contains( IndexPopulation.this ) ) if ( populations.contains( IndexPopulation.this ) )
{ {
Expand Down
Expand Up @@ -19,13 +19,20 @@
*/ */
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration;
import org.neo4j.index.internal.gbptree.Writer; import org.neo4j.index.internal.gbptree.Writer;
import org.neo4j.io.ByteUnit; import org.neo4j.io.ByteUnit;
import org.neo4j.io.IOUtils; import org.neo4j.io.IOUtils;
Expand All @@ -44,35 +51,53 @@
import org.neo4j.util.Preconditions; import org.neo4j.util.Preconditions;
import org.neo4j.values.storable.Value; import org.neo4j.values.storable.Value;


import static org.neo4j.helpers.collection.Iterables.asList;
import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate; import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate;
import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex; import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex;


public class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> extends NativeIndexPopulator<KEY,VALUE> public abstract class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> extends NativeIndexPopulator<KEY,VALUE>
{ {
private static final String BLOCK_SIZE = FeatureToggles.getString( BlockBasedIndexPopulator.class, "blockSize", "1M" ); private static final String BLOCK_SIZE = FeatureToggles.getString( BlockBasedIndexPopulator.class, "blockSize", "1M" );
private static final int MERGE_FACTOR = FeatureToggles.getInteger( BlockBasedIndexPopulator.class, "mergeFactor", 8 );


// TODO some better ByteBuffers, right? // TODO some better ByteBuffers, right?
private static final ByteBufferFactory BYTE_BUFFER_FACTORY = ByteBuffer::allocate; private static final ByteBufferFactory BYTE_BUFFER_FACTORY = ByteBuffer::allocate;


private final IndexSpecificSpaceFillingCurveSettingsCache spatialSettings;
private final IndexDirectoryStructure directoryStructure; private final IndexDirectoryStructure directoryStructure;
private final SpaceFillingCurveConfiguration configuration;
private final boolean archiveFailedIndex; private final boolean archiveFailedIndex;
private final int blockSize; private final int blockSize;
private BlockStorage<KEY,VALUE> scanUpdates; private ThreadLocal<BlockStorage<KEY,VALUE>> scanUpdates;
private List<BlockStorage<KEY,VALUE>> allScanUpdates = new ArrayList<>();
private IndexUpdateStorage<KEY,VALUE> externalUpdates; private IndexUpdateStorage<KEY,VALUE> externalUpdates;
private boolean merged; private boolean merged;


BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout<KEY,VALUE> layout, IndexProvider.Monitor monitor, BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout<KEY,VALUE> layout, IndexProvider.Monitor monitor,
StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings,
IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex ) IndexDirectoryStructure directoryStructure, boolean archiveFailedIndex )
{ {
super( pageCache, fs, file, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) ); super( pageCache, fs, file, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) );
this.spatialSettings = spatialSettings;
this.directoryStructure = directoryStructure; this.directoryStructure = directoryStructure;
this.configuration = configuration;
this.archiveFailedIndex = archiveFailedIndex; this.archiveFailedIndex = archiveFailedIndex;
this.blockSize = parseBlockSize(); this.blockSize = parseBlockSize();
this.scanUpdates = ThreadLocal.withInitial( this::newThreadLocalBlockStorage );
}

private synchronized BlockStorage<KEY,VALUE> newThreadLocalBlockStorage()
{
Preconditions.checkState( !merged, "Already merged" );
try
{
int id = allScanUpdates.size();
BlockStorage<KEY,VALUE> blockStorage =
new BlockStorage<>( layout, BYTE_BUFFER_FACTORY, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".scan-" + id ),
BlockStorage.Monitor.NO_MONITOR, blockSize );
allScanUpdates.add( blockStorage );
return blockStorage;
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} }


private static int parseBlockSize() private static int parseBlockSize()
Expand All @@ -96,8 +121,6 @@ public void create()
super.create(); super.create();
try try
{ {
scanUpdates = new BlockStorage<>( layout, BYTE_BUFFER_FACTORY, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".temp" ),
BlockStorage.Monitor.NO_MONITOR, blockSize );
externalUpdates = new IndexUpdateStorage<>( layout, fileSystem, new File( storeFile.getParent(), storeFile.getName() + ".ext" ), externalUpdates = new IndexUpdateStorage<>( layout, fileSystem, new File( storeFile.getParent(), storeFile.getName() + ".ext" ),
BYTE_BUFFER_FACTORY.newBuffer( blockSize ) ); BYTE_BUFFER_FACTORY.newBuffer( blockSize ) );
} }
Expand All @@ -110,9 +133,10 @@ public void create()
@Override @Override
public void add( Collection<? extends IndexEntryUpdate<?>> updates ) public void add( Collection<? extends IndexEntryUpdate<?>> updates )
{ {
BlockStorage<KEY,VALUE> blockStorage = scanUpdates.get();
for ( IndexEntryUpdate<?> update : updates ) for ( IndexEntryUpdate<?> update : updates )
{ {
storeUpdate( update, scanUpdates ); storeUpdate( update, blockStorage );
} }
} }


Expand Down Expand Up @@ -142,8 +166,29 @@ public void scanCompleted() throws IndexEntryConflictException
{ {
try try
{ {
scanUpdates.doneAdding(); ExecutorService executorService = Executors.newFixedThreadPool( allScanUpdates.size() );
scanUpdates.merge(); List<Future<?>> mergeFutures = new ArrayList<>();
for ( BlockStorage<KEY,VALUE> scanUpdates : allScanUpdates )
{
mergeFutures.add( executorService.submit( () ->
{
scanUpdates.doneAdding();
scanUpdates.merge( MERGE_FACTOR );
return null;
} ) );
}
executorService.shutdown();
while ( !executorService.awaitTermination( 1, TimeUnit.SECONDS ) )
{
// just wait longer
// TODO check drop/close
}
// Let potential exceptions in the merge threads have a chance to propagate
for ( Future<?> mergeFuture : mergeFutures )
{
mergeFuture.get();
}

externalUpdates.doneAdding(); externalUpdates.doneAdding();
// don't merge and sort the external updates // don't merge and sort the external updates


Expand All @@ -158,6 +203,21 @@ public void scanCompleted() throws IndexEntryConflictException
{ {
throw new UncheckedIOException( e ); throw new UncheckedIOException( e );
} }
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
throw new RuntimeException( "Got interrupted, so merge not completed", e );
}
catch ( ExecutionException e )
{
// Propagating merge exception from other thread
Throwable executionException = e.getCause();
if ( executionException instanceof RuntimeException )
{
throw (RuntimeException) executionException;
}
throw new RuntimeException( executionException );
}
} }


private void writeExternalUpdatesToTree() throws IOException private void writeExternalUpdatesToTree() throws IOException
Expand Down Expand Up @@ -189,33 +249,44 @@ private void writeExternalUpdatesToTree() throws IOException
private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictException private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictException
{ {
ConflictDetectingValueMerger<KEY,VALUE> conflictDetector = getMainConflictDetector(); ConflictDetectingValueMerger<KEY,VALUE> conflictDetector = getMainConflictDetector();
try ( Writer<KEY,VALUE> writer = tree.writer(); MergingBlockEntryReader<KEY,VALUE> allEntries = new MergingBlockEntryReader<>( layout );
BlockReader<KEY,VALUE> reader = scanUpdates.reader() ) for ( BlockStorage<KEY,VALUE> scanUpdates : allScanUpdates )
{ {
BlockEntryReader<KEY,VALUE> maybeBlock = reader.nextBlock(); try ( BlockReader<KEY,VALUE> reader = scanUpdates.reader() )
if ( maybeBlock != null )
{ {
try ( BlockEntryReader<KEY,VALUE> block = maybeBlock ) BlockEntryReader<KEY,VALUE> singleMergedBlock = reader.nextBlock();
if ( singleMergedBlock != null )
{ {
while ( block.next() ) allEntries.addSource( singleMergedBlock );
if ( reader.nextBlock() != null )
{ {
conflictDetector.controlConflictDetection( block.key() ); throw new IllegalStateException( "Final BlockStorage had multiple blocks" );
writer.merge( block.key(), block.value(), conflictDetector );
if ( conflictDetector.wasConflicting() )
{
conflictDetector.reportConflict( block.key().asValues() );
}
} }
} }
} }
} }

try ( Writer<KEY,VALUE> writer = tree.writer();
BlockEntryCursor<KEY,VALUE> reader = allEntries )
{
while ( reader.next() )
{
conflictDetector.controlConflictDetection( reader.key() );
writer.merge( reader.key(), reader.value(), conflictDetector );
if ( conflictDetector.wasConflicting() )
{
conflictDetector.reportConflict( reader.key().asValues() );
}
}
}
} }


@Override @Override
public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor ) public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor )
{ {
if ( merged ) if ( merged )
{ {
// Will need the reader from newReader, which a sub-class of this class implements
return super.newPopulatingUpdater( accessor ); return super.newPopulatingUpdater( accessor );
} }


Expand All @@ -241,17 +312,13 @@ public void close()
}; };
} }


@Override
NativeIndexReader<KEY,VALUE> newReader()
{
throw new UnsupportedOperationException( "Should not be needed because we're overriding the populating updater anyway" );
}

@Override @Override
public void close( boolean populationCompletedSuccessfully ) public void close( boolean populationCompletedSuccessfully )
{ {
// TODO Make responsive // TODO Make responsive
IOUtils.closeAllSilently( externalUpdates, scanUpdates ); List<Closeable> toClose = new ArrayList<>( asList( allScanUpdates ) );
toClose.add( externalUpdates );
IOUtils.closeAllUnchecked( toClose );
super.close( populationCompletedSuccessfully ); super.close( populationCompletedSuccessfully );
} }
} }
Expand Up @@ -119,19 +119,19 @@ private void flushAndResetBuffer() throws IOException
resetBufferedEntries(); resetBufferedEntries();
} }


public void merge() throws IOException public void merge( int mergeFactor ) throws IOException
{ {
File sourceFile = blockFile; File sourceFile = blockFile;
File tempFile = new File( blockFile.getParent(), blockFile.getName() + ".b" ); File tempFile = new File( blockFile.getParent(), blockFile.getName() + ".b" );
try try
{ {
int mergeFactor = 16;
File targetFile = tempFile; File targetFile = tempFile;
while ( numberOfBlocksInCurrentFile > 1 ) while ( numberOfBlocksInCurrentFile > 1 )
{ {
// Perform one complete merge iteration, merging all blocks from source into target. // Perform one complete merge iteration, merging all blocks from source into target.
// After this step, target will contain fewer blocks than source, but may need another merge iteration. // After this step, target will contain fewer blocks than source, but may need another merge iteration.
try ( BlockReader<KEY,VALUE> reader = reader( sourceFile ); StoreChannel targetChannel = fs.open( targetFile, OpenMode.READ_WRITE ) ) try ( BlockReader<KEY,VALUE> reader = reader( sourceFile );
StoreChannel targetChannel = fs.open( targetFile, OpenMode.READ_WRITE ) )
{ {
int blocksMergedSoFar = 0; int blocksMergedSoFar = 0;
int blocksInMergedFile = 0; int blocksInMergedFile = 0;
Expand Down
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.File;

import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexDirectoryStructure;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;

/**
 * {@link BlockBasedIndexPopulator} specialised for {@link GenericKey} keys and {@link NativeIndexValue} values.
 * <p>
 * The only behaviour added on top of the base class is {@link #newReader()}, which builds a
 * {@link GenericNativeIndexReader} from the populator's tree, layout and descriptor together with the
 * spatial settings and curve configuration handed to the constructor.
 */
public class GenericBlockBasedIndexPopulator extends BlockBasedIndexPopulator<GenericKey,NativeIndexValue>
{
    // Kept only so that newReader() can pass them on to the reader it creates.
    private final SpaceFillingCurveConfiguration configuration;
    private final IndexSpecificSpaceFillingCurveSettingsCache spatialSettings;

    /**
     * Forwards everything except {@code configuration} to the base populator and remembers
     * {@code spatialSettings} and {@code configuration} for later reader construction.
     */
    GenericBlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout<GenericKey,NativeIndexValue> layout,
            IndexProvider.Monitor monitor, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings,
            IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex )
    {
        super( pageCache, fs, file, layout, monitor, descriptor, spatialSettings, directoryStructure, archiveFailedIndex );
        this.configuration = configuration;
        this.spatialSettings = spatialSettings;
    }

    /**
     * @return a new {@link GenericNativeIndexReader} over this populator's tree.
     */
    @Override
    NativeIndexReader<GenericKey,NativeIndexValue> newReader()
    {
        GenericNativeIndexReader genericReader = new GenericNativeIndexReader( tree, layout, descriptor, spatialSettings, configuration );
        return genericReader;
    }
}
Expand Up @@ -163,10 +163,8 @@ protected IndexPopulator newIndexPopulator( File storeFile, GenericLayout layout
} }
else else
{ {
NativeIndexPopulator actualPopulator = return new GenericBlockBasedIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(),
new BlockBasedIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(), directoryStructure(), configuration, archiveFailedIndex );
directoryStructure(), configuration, archiveFailedIndex );
return new WorkSyncedNativeIndexPopulator<>( actualPopulator );
} }
} }


Expand Down
Expand Up @@ -165,7 +165,7 @@ void shouldMergeWhenEmpty() throws IOException
try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, blockSize ) ) try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, blockSize ) )
{ {
// when // when
storage.merge(); storage.merge( randomMergeFactor() );


// then // then
assertEquals( 0, monitor.mergeIterationCallCount ); assertEquals( 0, monitor.mergeIterationCallCount );
Expand All @@ -185,7 +185,7 @@ void shouldMergeSingleBlock() throws IOException
storage.doneAdding(); storage.doneAdding();


// when // when
storage.merge(); storage.merge( randomMergeFactor() );


// then // then
assertEquals( 0, monitor.mergeIterationCallCount ); assertEquals( 0, monitor.mergeIterationCallCount );
Expand All @@ -206,7 +206,7 @@ void shouldMergeMultipleBlocks() throws IOException
storage.doneAdding(); storage.doneAdding();


// when // when
storage.merge(); storage.merge( randomMergeFactor() );


// then // then
assertContents( layout, storage, asOneBigBlock( expectedBlocks ) ); assertContents( layout, storage, asOneBigBlock( expectedBlocks ) );
Expand Down Expand Up @@ -274,6 +274,11 @@ private Iterable<List<BlockEntry<MutableLong,MutableLong>>> asOneBigBlock( List<
return singletonList( all ); return singletonList( all );
} }


/**
 * Picks the merge factor to use for a single merge call in a test, drawn from the test's
 * {@code random} source with bounds 2 and 8.
 * NOTE(review): whether 8 itself can be returned depends on how {@code random.nextInt(int, int)}
 * treats its upper bound — confirm against its implementation.
 */
private int randomMergeFactor()
{
    int mergeFactor = random.nextInt( 2, 8 );
    return mergeFactor;
}

private void print( List<List<BlockEntry<MutableLong,MutableLong>>> expectedBlocks ) private void print( List<List<BlockEntry<MutableLong,MutableLong>>> expectedBlocks )
{ {
for ( List<BlockEntry<MutableLong,MutableLong>> expectedBlock : expectedBlocks ) for ( List<BlockEntry<MutableLong,MutableLong>> expectedBlock : expectedBlocks )
Expand Down

0 comments on commit 6d305b0

Please sign in to comment.