Skip to content

Commit

Permalink
Parallel native index population
Browse files Browse the repository at this point in the history
Each thread adding batches of data from the index populator will add
into its own thread-local index part and in the end all those parts
will be merged into one complete index. The final merging is much
faster than the part-building due to all sources being sorted.
Due to the single-threaded merging in the end performance of this
parallel population will hit a plateau at some core count,
but scales reasonably well with current mainstream hardware.
  • Loading branch information
tinwelint committed Oct 10, 2018
1 parent 810f721 commit 022dbbf
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 120 deletions.
Expand Up @@ -26,9 +26,9 @@
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.OperationalMode; import org.neo4j.kernel.impl.factory.OperationalMode;
import org.neo4j.kernel.impl.index.schema.ConsistencyCheckableIndexPopulator;
import org.neo4j.kernel.impl.index.schema.GenericNativeIndexProviderFactory; import org.neo4j.kernel.impl.index.schema.GenericNativeIndexProviderFactory;
import org.neo4j.kernel.impl.index.schema.NativeIndexAccessor; import org.neo4j.kernel.impl.index.schema.NativeIndexAccessor;
import org.neo4j.kernel.impl.index.schema.NativeIndexPopulator;


import static org.neo4j.graphdb.factory.GraphDatabaseSettings.SchemaIndex.NATIVE_BTREE10; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.SchemaIndex.NATIVE_BTREE10;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.default_schema_provider; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.default_schema_provider;
Expand Down Expand Up @@ -73,6 +73,6 @@ public void consistencyCheck( IndexAccessor accessor )
@Override @Override
public void consistencyCheck( IndexPopulator populator ) public void consistencyCheck( IndexPopulator populator )
{ {
((NativeIndexPopulator) populator).consistencyCheck(); ((ConsistencyCheckableIndexPopulator) populator).consistencyCheck();
} }
} }
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.util.ArrayList;
import java.util.Collection;

import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater;

public abstract class CollectingIndexUpdater<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> implements IndexUpdater
{
private boolean closed;
private final Collection<IndexEntryUpdate<?>> updates = new ArrayList<>();

@Override
public void process( IndexEntryUpdate<?> update )
{
assertOpen();
updates.add( update );
}

@Override
public void close() throws IndexEntryConflictException
{
assertOpen();
apply( updates );
closed = true;
}

protected abstract void apply( Collection<IndexEntryUpdate<?>> updates ) throws IndexEntryConflictException;

private void assertOpen()
{
if ( closed )
{
throw new IllegalStateException( "Updater has been closed" );
}
}
}
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

public interface ConsistencyCheckableIndexPopulator
{
void consistencyCheck();
}
Expand Up @@ -30,7 +30,6 @@
import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache; import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache;
import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettingsWriter; import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettingsWriter;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;


import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex; import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex;
Expand All @@ -41,16 +40,18 @@ class GenericNativeIndexPopulator extends NativeIndexPopulator<GenericKey,Native
private final IndexDirectoryStructure directoryStructure; private final IndexDirectoryStructure directoryStructure;
private final SpaceFillingCurveConfiguration configuration; private final SpaceFillingCurveConfiguration configuration;
private final boolean archiveFailedIndex; private final boolean archiveFailedIndex;
private final boolean temporary;


GenericNativeIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, IndexLayout<GenericKey,NativeIndexValue> layout, GenericNativeIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, IndexLayout<GenericKey,NativeIndexValue> layout,
IndexProvider.Monitor monitor, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings, IndexProvider.Monitor monitor, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings,
IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex ) IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex, boolean temporary )
{ {
super( pageCache, fs, storeFile, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) ); super( pageCache, fs, storeFile, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) );
this.spatialSettings = spatialSettings; this.spatialSettings = spatialSettings;
this.directoryStructure = directoryStructure; this.directoryStructure = directoryStructure;
this.configuration = configuration; this.configuration = configuration;
this.archiveFailedIndex = archiveFailedIndex; this.archiveFailedIndex = archiveFailedIndex;
this.temporary = temporary;
} }


@Override @Override
Expand All @@ -60,7 +61,10 @@ public void create()
{ {
// Archive and delete the index, if it exists. The reason why this isn't done in the generic implementation is that for all other cases a // Archive and delete the index, if it exists. The reason why this isn't done in the generic implementation is that for all other cases a
// native index populator lives under a fusion umbrella and the archive function sits on the top-level fusion folder, not every single sub-folder. // native index populator lives under a fusion umbrella and the archive function sits on the top-level fusion folder, not every single sub-folder.
deleteIndex( fileSystem, directoryStructure, descriptor.getId(), archiveFailedIndex ); if ( !temporary )
{
deleteIndex( fileSystem, directoryStructure, descriptor.getId(), archiveFailedIndex );
}


// Now move on to do the actual creation. // Now move on to do the actual creation.
super.create(); super.create();
Expand All @@ -72,7 +76,7 @@ public void create()
} }


@Override @Override
IndexReader newReader() NativeIndexReader<GenericKey,NativeIndexValue> newReader()
{ {
return new GenericNativeIndexReader( tree, layout, descriptor, spatialSettings, configuration ); return new GenericNativeIndexReader( tree, layout, descriptor, spatialSettings, configuration );
} }
Expand Down
Expand Up @@ -120,8 +120,8 @@ public class GenericNativeIndexProvider extends NativeIndexProvider<GenericKey,N
private final SpaceFillingCurveConfiguration configuration; private final SpaceFillingCurveConfiguration configuration;
private final boolean archiveFailedIndex; private final boolean archiveFailedIndex;


public GenericNativeIndexProvider( IndexDirectoryStructure.Factory directoryStructureFactory, PageCache pageCache, GenericNativeIndexProvider( IndexDirectoryStructure.Factory directoryStructureFactory, PageCache pageCache, FileSystemAbstraction fs, Monitor monitor,
FileSystemAbstraction fs, Monitor monitor, RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, boolean readOnly, Config config ) RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, boolean readOnly, Config config )
{ {
super( DESCRIPTOR, directoryStructureFactory, pageCache, fs, monitor, recoveryCleanupWorkCollector, readOnly ); super( DESCRIPTOR, directoryStructureFactory, pageCache, fs, monitor, recoveryCleanupWorkCollector, readOnly );


Expand Down Expand Up @@ -153,8 +153,9 @@ GenericLayout layout( StoreIndexDescriptor descriptor, File storeFile )
@Override @Override
protected IndexPopulator newIndexPopulator( File storeFile, GenericLayout layout, StoreIndexDescriptor descriptor ) protected IndexPopulator newIndexPopulator( File storeFile, GenericLayout layout, StoreIndexDescriptor descriptor )
{ {
return new GenericNativeIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, return new ParallelNativeIndexPopulator<>( storeFile, layout, file ->
layout.getSpaceFillingCurveSettings(), directoryStructure(), configuration, archiveFailedIndex ); new GenericNativeIndexPopulator( pageCache, fs, file, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(),
directoryStructure(), configuration, archiveFailedIndex, !file.equals( storeFile ) ) );
} }


@Override @Override
Expand Down
Expand Up @@ -26,9 +26,7 @@
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer; import java.util.function.Consumer;


import org.neo4j.index.internal.gbptree.GBPTree; import org.neo4j.index.internal.gbptree.GBPTree;
Expand All @@ -45,11 +43,8 @@
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.NodePropertyAccessor; import org.neo4j.kernel.api.index.NodePropertyAccessor;
import org.neo4j.kernel.impl.api.index.sampling.UniqueIndexSampler; import org.neo4j.kernel.impl.api.index.sampling.UniqueIndexSampler;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.util.concurrent.Work;
import org.neo4j.util.concurrent.WorkSync;


import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER; import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.storageengine.api.schema.IndexDescriptor.Type.GENERAL; import static org.neo4j.storageengine.api.schema.IndexDescriptor.Type.GENERAL;
Expand All @@ -62,7 +57,7 @@
* @param <VALUE> type of {@link NativeIndexValue}. * @param <VALUE> type of {@link NativeIndexValue}.
*/ */
public abstract class NativeIndexPopulator<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> public abstract class NativeIndexPopulator<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>
extends NativeIndex<KEY,VALUE> implements IndexPopulator extends NativeIndex<KEY,VALUE> implements IndexPopulator, ConsistencyCheckableIndexPopulator
{ {
public static final byte BYTE_FAILED = 0; public static final byte BYTE_FAILED = 0;
static final byte BYTE_ONLINE = 1; static final byte BYTE_ONLINE = 1;
Expand All @@ -73,8 +68,8 @@ public abstract class NativeIndexPopulator<KEY extends NativeIndexKey<KEY>, VALU
private final UniqueIndexSampler uniqueSampler; private final UniqueIndexSampler uniqueSampler;
private final Consumer<PageCursor> additionalHeaderWriter; private final Consumer<PageCursor> additionalHeaderWriter;


private WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>> additionsWorkSync; private ConflictDetectingValueMerger<KEY,VALUE> mainConflictDetector;
private WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>> updatesWorkSync; private ConflictDetectingValueMerger<KEY,VALUE> updatesConflictDetector;


private byte[] failureBytes; private byte[] failureBytes;
private boolean dropped; private boolean dropped;
Expand Down Expand Up @@ -121,12 +116,10 @@ protected synchronized void create( Consumer<PageCursor> headerWriter )


// true: tree uniqueness is (value,entityId) // true: tree uniqueness is (value,entityId)
// false: tree uniqueness is (value) <-- i.e. more strict // false: tree uniqueness is (value) <-- i.e. more strict
boolean compareIds = descriptor.type() == GENERAL; mainConflictDetector = new ConflictDetectingValueMerger<>( descriptor.type() == GENERAL );
additionsWorkSync = new WorkSync<>( new IndexUpdateApply<>( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( compareIds ) ) );

// for updates we have to have uniqueness on (value,entityId) to allow for intermediary violating updates. // for updates we have to have uniqueness on (value,entityId) to allow for intermediary violating updates.
// there are added conflict checks after updates have been applied. // there are added conflict checks after updates have been applied.
updatesWorkSync = new WorkSync<>( new IndexUpdateApply<>( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( true ) ) ); updatesConflictDetector = new ConflictDetectingValueMerger<>( true );
} }


@Override @Override
Expand All @@ -147,7 +140,7 @@ public synchronized void drop()
@Override @Override
public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws IndexEntryConflictException public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws IndexEntryConflictException
{ {
applyWithWorkSync( additionsWorkSync, updates ); processUpdates( updates, mainConflictDetector );
} }


@Override @Override
Expand All @@ -159,31 +152,12 @@ public void verifyDeferredConstraints( NodePropertyAccessor nodePropertyAccessor
@Override @Override
public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor ) public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor )
{ {
IndexUpdater updater = new IndexUpdater() IndexUpdater updater = new CollectingIndexUpdater<KEY,VALUE>()
{ {
private boolean closed;
private final Collection<IndexEntryUpdate<?>> updates = new ArrayList<>();

@Override
public void process( IndexEntryUpdate<?> update )
{
assertOpen();
updates.add( update );
}

@Override @Override
public void close() throws IndexEntryConflictException protected void apply( Collection<IndexEntryUpdate<?>> updates ) throws IndexEntryConflictException
{ {
applyWithWorkSync( updatesWorkSync, updates ); processUpdates( updates, updatesConflictDetector );
closed = true;
}

private void assertOpen()
{
if ( closed )
{
throw new IllegalStateException( "Updater has been closed" );
}
} }
}; };


Expand All @@ -196,7 +170,7 @@ private void assertOpen()
return updater; return updater;
} }


abstract IndexReader newReader(); abstract NativeIndexReader<KEY,VALUE> newReader();


@Override @Override
public synchronized void close( boolean populationCompletedSuccessfully ) public synchronized void close( boolean populationCompletedSuccessfully )
Expand Down Expand Up @@ -227,28 +201,6 @@ public synchronized void close( boolean populationCompletedSuccessfully )
} }
} }


private void applyWithWorkSync( WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>> workSync,
Collection<? extends IndexEntryUpdate<?>> updates ) throws IndexEntryConflictException
{
try
{
workSync.apply( new IndexUpdateWork<>( updates ) );
}
catch ( ExecutionException e )
{
Throwable cause = e.getCause();
if ( cause instanceof IOException )
{
throw new UncheckedIOException( (IOException) cause );
}
if ( cause instanceof IndexEntryConflictException )
{
throw (IndexEntryConflictException) cause;
}
throw new RuntimeException( cause );
}
}

private void assertNotDropped() private void assertNotDropped()
{ {
if ( dropped ) if ( dropped )
Expand Down Expand Up @@ -301,55 +253,19 @@ void markTreeAsOnline()
tree.checkpoint( IOLimiter.UNLIMITED, new NativeIndexHeaderWriter( BYTE_ONLINE, additionalHeaderWriter ) ); tree.checkpoint( IOLimiter.UNLIMITED, new NativeIndexHeaderWriter( BYTE_ONLINE, additionalHeaderWriter ) );
} }


static class IndexUpdateApply<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> private void processUpdates( Iterable<? extends IndexEntryUpdate<?>> indexEntryUpdates, ConflictDetectingValueMerger<KEY,VALUE> conflictDetector )
throws IndexEntryConflictException
{ {
private final GBPTree<KEY,VALUE> tree; try ( Writer<KEY,VALUE> writer = tree.writer() )
private final KEY treeKey;
private final VALUE treeValue;
private final ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger;

IndexUpdateApply( GBPTree<KEY,VALUE> tree, KEY treeKey, VALUE treeValue, ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger )
{ {
this.tree = tree; for ( IndexEntryUpdate<?> indexEntryUpdate : indexEntryUpdates )
this.treeKey = treeKey;
this.treeValue = treeValue;
this.conflictDetectingValueMerger = conflictDetectingValueMerger;
}

void process( Iterable<? extends IndexEntryUpdate<?>> indexEntryUpdates ) throws Exception
{
try ( Writer<KEY,VALUE> writer = tree.writer() )
{ {
for ( IndexEntryUpdate<?> indexEntryUpdate : indexEntryUpdates ) NativeIndexUpdater.processUpdate( treeKey, treeValue, indexEntryUpdate, writer, conflictDetector );
{
NativeIndexUpdater.processUpdate( treeKey, treeValue, indexEntryUpdate, writer, conflictDetectingValueMerger );
}
} }
} }
} catch ( IOException e )

static class IndexUpdateWork<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>
implements Work<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>>
{
private final Collection<? extends IndexEntryUpdate<?>> updates;

IndexUpdateWork( Collection<? extends IndexEntryUpdate<?>> updates )
{
this.updates = updates;
}

@Override
public IndexUpdateWork<KEY,VALUE> combine( IndexUpdateWork<KEY,VALUE> work )
{
ArrayList<IndexEntryUpdate<?>> combined = new ArrayList<>( updates );
combined.addAll( work.updates );
return new IndexUpdateWork<>( combined );
}

@Override
public void apply( IndexUpdateApply<KEY,VALUE> indexUpdateApply ) throws Exception
{ {
indexUpdateApply.process( updates ); throw new UncheckedIOException( e );
} }
} }


Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;


import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER; import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
Expand All @@ -38,7 +37,7 @@ class NumberIndexPopulator extends NativeIndexPopulator<NumberIndexKey,NativeInd
} }


@Override @Override
IndexReader newReader() NativeIndexReader<NumberIndexKey, NativeIndexValue> newReader()
{ {
return new NumberIndexReader<>( tree, layout, descriptor ); return new NumberIndexReader<>( tree, layout, descriptor );
} }
Expand Down

0 comments on commit 022dbbf

Please sign in to comment.