diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/api/index/GenericIndexProviderCompatibilitySuiteTest.java b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/api/index/GenericIndexProviderCompatibilitySuiteTest.java index 0bca82bca9964..e08a16662abb0 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/api/index/GenericIndexProviderCompatibilitySuiteTest.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/api/index/GenericIndexProviderCompatibilitySuiteTest.java @@ -26,9 +26,9 @@ import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.OperationalMode; +import org.neo4j.kernel.impl.index.schema.ConsistencyCheckableIndexPopulator; import org.neo4j.kernel.impl.index.schema.GenericNativeIndexProviderFactory; import org.neo4j.kernel.impl.index.schema.NativeIndexAccessor; -import org.neo4j.kernel.impl.index.schema.NativeIndexPopulator; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.SchemaIndex.NATIVE_BTREE10; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.default_schema_provider; @@ -73,6 +73,6 @@ public void consistencyCheck( IndexAccessor accessor ) @Override public void consistencyCheck( IndexPopulator populator ) { - ((NativeIndexPopulator) populator).consistencyCheck(); + ((ConsistencyCheckableIndexPopulator) populator).consistencyCheck(); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/CollectingIndexUpdater.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/CollectingIndexUpdater.java new file mode 100644 index 0000000000000..132aff70e39d3 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/CollectingIndexUpdater.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import java.util.ArrayList; +import java.util.Collection; + +import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; +import org.neo4j.kernel.api.index.IndexEntryUpdate; +import org.neo4j.kernel.api.index.IndexUpdater; + +public abstract class CollectingIndexUpdater,VALUE extends NativeIndexValue> implements IndexUpdater +{ + private boolean closed; + private final Collection> updates = new ArrayList<>(); + + @Override + public void process( IndexEntryUpdate update ) + { + assertOpen(); + updates.add( update ); + } + + @Override + public void close() throws IndexEntryConflictException + { + assertOpen(); + apply( updates ); + closed = true; + } + + protected abstract void apply( Collection> updates ) throws IndexEntryConflictException; + + private void assertOpen() + { + if ( closed ) + { + throw new IllegalStateException( "Updater has been closed" ); + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ConsistencyCheckableIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ConsistencyCheckableIndexPopulator.java new file mode 100644 index 0000000000000..079adbbaf0b04 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ConsistencyCheckableIndexPopulator.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +public interface ConsistencyCheckableIndexPopulator +{ + void consistencyCheck(); +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexPopulator.java index c32b183b75f09..ba4d0269c1f05 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexPopulator.java @@ -30,7 +30,6 @@ import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache; import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettingsWriter; -import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex; @@ -41,16 +40,18 @@ class GenericNativeIndexPopulator extends NativeIndexPopulator layout, IndexProvider.Monitor monitor, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings, - IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex ) + IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex, boolean temporary ) { super( pageCache, fs, storeFile, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) ); this.spatialSettings = spatialSettings; this.directoryStructure = directoryStructure; this.configuration = configuration; this.archiveFailedIndex = archiveFailedIndex; + this.temporary = temporary; } @Override @@ -60,7 +61,10 @@ public void create() { // Archive and delete the index, if it exists. The reason why this isn't done in the generic implementation is that for all other cases a // native index populator lives under a fusion umbrella and the archive function sits on the top-level fusion folder, not every single sub-folder. - deleteIndex( fileSystem, directoryStructure, descriptor.getId(), archiveFailedIndex ); + if ( !temporary ) + { + deleteIndex( fileSystem, directoryStructure, descriptor.getId(), archiveFailedIndex ); + } // Now move on to do the actual creation. super.create(); @@ -72,7 +76,7 @@ public void create() } @Override - IndexReader newReader() + NativeIndexReader newReader() { return new GenericNativeIndexReader( tree, layout, descriptor, spatialSettings, configuration ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java index a8dd355c32e60..9269dc7756486 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java @@ -120,8 +120,8 @@ public class GenericNativeIndexProvider extends NativeIndexProvider( storeFile, layout, file -> + new GenericNativeIndexPopulator( pageCache, fs, file, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(), + directoryStructure(), configuration, archiveFailedIndex, !file.equals( storeFile ) ) ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java index 5b23dbad21d03..cd7ec6219b88f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulator.java @@ -26,9 +26,7 @@ import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; import java.util.Collection; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.neo4j.index.internal.gbptree.GBPTree; @@ -45,11 +43,8 @@ import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.NodePropertyAccessor; import org.neo4j.kernel.impl.api.index.sampling.UniqueIndexSampler; -import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; -import org.neo4j.util.concurrent.Work; -import org.neo4j.util.concurrent.WorkSync; import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER; import static org.neo4j.storageengine.api.schema.IndexDescriptor.Type.GENERAL; @@ -62,7 +57,7 @@ * @param type of {@link NativeIndexValue}. */ public abstract class NativeIndexPopulator, VALUE extends NativeIndexValue> - extends NativeIndex implements IndexPopulator + extends NativeIndex implements IndexPopulator, ConsistencyCheckableIndexPopulator { public static final byte BYTE_FAILED = 0; static final byte BYTE_ONLINE = 1; @@ -73,8 +68,8 @@ public abstract class NativeIndexPopulator, VALU private final UniqueIndexSampler uniqueSampler; private final Consumer additionalHeaderWriter; - private WorkSync,IndexUpdateWork> additionsWorkSync; - private WorkSync,IndexUpdateWork> updatesWorkSync; + private ConflictDetectingValueMerger mainConflictDetector; + private ConflictDetectingValueMerger updatesConflictDetector; private byte[] failureBytes; private boolean dropped; @@ -121,12 +116,10 @@ protected synchronized void create( Consumer headerWriter ) // true: tree uniqueness is (value,entityId) // false: tree uniqueness is (value) <-- i.e. more strict - boolean compareIds = descriptor.type() == GENERAL; - additionsWorkSync = new WorkSync<>( new IndexUpdateApply<>( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( compareIds ) ) ); - + mainConflictDetector = new ConflictDetectingValueMerger<>( descriptor.type() == GENERAL ); // for updates we have to have uniqueness on (value,entityId) to allow for intermediary violating updates. // there are added conflict checks after updates have been applied. - updatesWorkSync = new WorkSync<>( new IndexUpdateApply<>( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( true ) ) ); + updatesConflictDetector = new ConflictDetectingValueMerger<>( true ); } @Override @@ -147,7 +140,7 @@ public synchronized void drop() @Override public void add( Collection> updates ) throws IndexEntryConflictException { - applyWithWorkSync( additionsWorkSync, updates ); + processUpdates( updates, mainConflictDetector ); } @Override @@ -159,31 +152,12 @@ public void verifyDeferredConstraints( NodePropertyAccessor nodePropertyAccessor @Override public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor ) { - IndexUpdater updater = new IndexUpdater() + IndexUpdater updater = new CollectingIndexUpdater() { - private boolean closed; - private final Collection> updates = new ArrayList<>(); - - @Override - public void process( IndexEntryUpdate update ) - { - assertOpen(); - updates.add( update ); - } - @Override - public void close() throws IndexEntryConflictException + protected void apply( Collection> updates ) throws IndexEntryConflictException { - applyWithWorkSync( updatesWorkSync, updates ); - closed = true; - } - - private void assertOpen() - { - if ( closed ) - { - throw new IllegalStateException( "Updater has been closed" ); - } + processUpdates( updates, updatesConflictDetector ); } }; @@ -196,7 +170,7 @@ private void assertOpen() return updater; } - abstract IndexReader newReader(); + abstract NativeIndexReader newReader(); @Override public synchronized void close( boolean populationCompletedSuccessfully ) @@ -227,28 +201,6 @@ public synchronized void close( boolean populationCompletedSuccessfully ) } } - private void applyWithWorkSync( WorkSync,IndexUpdateWork> workSync, - Collection> updates ) throws IndexEntryConflictException - { - try - { - workSync.apply( new IndexUpdateWork<>( updates ) ); - } - catch ( ExecutionException e ) - { - Throwable cause = e.getCause(); - if ( cause instanceof IOException ) - { - throw new UncheckedIOException( (IOException) cause ); - } - if ( cause instanceof IndexEntryConflictException ) - { - throw (IndexEntryConflictException) cause; - } - throw new RuntimeException( cause ); - } - } - private void assertNotDropped() { if ( dropped ) @@ -301,55 +253,19 @@ void markTreeAsOnline() tree.checkpoint( IOLimiter.UNLIMITED, new NativeIndexHeaderWriter( BYTE_ONLINE, additionalHeaderWriter ) ); } - static class IndexUpdateApply, VALUE extends NativeIndexValue> + private void processUpdates( Iterable> indexEntryUpdates, ConflictDetectingValueMerger conflictDetector ) + throws IndexEntryConflictException { - private final GBPTree tree; - private final KEY treeKey; - private final VALUE treeValue; - private final ConflictDetectingValueMerger conflictDetectingValueMerger; - - IndexUpdateApply( GBPTree tree, KEY treeKey, VALUE treeValue, ConflictDetectingValueMerger conflictDetectingValueMerger ) + try ( Writer writer = tree.writer() ) { - this.tree = tree; - this.treeKey = treeKey; - this.treeValue = treeValue; - this.conflictDetectingValueMerger = conflictDetectingValueMerger; - } - - void process( Iterable> indexEntryUpdates ) throws Exception - { - try ( Writer writer = tree.writer() ) + for ( IndexEntryUpdate indexEntryUpdate : indexEntryUpdates ) { - for ( IndexEntryUpdate indexEntryUpdate : indexEntryUpdates ) - { - NativeIndexUpdater.processUpdate( treeKey, treeValue, indexEntryUpdate, writer, conflictDetectingValueMerger ); - } + NativeIndexUpdater.processUpdate( treeKey, treeValue, indexEntryUpdate, writer, conflictDetector ); } } - } - - static class IndexUpdateWork, VALUE extends NativeIndexValue> - implements Work,IndexUpdateWork> - { - private final Collection> updates; - - IndexUpdateWork( Collection> updates ) - { - this.updates = updates; - } - - @Override - public IndexUpdateWork combine( IndexUpdateWork work ) - { - ArrayList> combined = new ArrayList<>( updates ); - combined.addAll( work.updates ); - return new IndexUpdateWork<>( combined ); - } - - @Override - public void apply( IndexUpdateApply indexUpdateApply ) throws Exception + catch ( IOException e ) { - indexUpdateApply.process( updates ); + throw new UncheckedIOException( e ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NumberIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NumberIndexPopulator.java index 96a97ea160aa9..e811c00ae6433 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NumberIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/NumberIndexPopulator.java @@ -24,7 +24,6 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.api.index.IndexProvider; -import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER; @@ -38,7 +37,7 @@ class NumberIndexPopulator extends NativeIndexPopulator newReader() { return new NumberIndexReader<>( tree, layout, descriptor ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ParallelNativeIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ParallelNativeIndexPopulator.java new file mode 100644 index 0000000000000..69a4314267ac9 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ParallelNativeIndexPopulator.java @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; + +import org.neo4j.cursor.RawCursor; +import org.neo4j.index.internal.gbptree.Hit; +import org.neo4j.index.internal.gbptree.Writer; +import org.neo4j.internal.kernel.api.IndexOrder; +import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; +import org.neo4j.kernel.api.index.IndexEntryUpdate; +import org.neo4j.kernel.api.index.IndexPopulator; +import org.neo4j.kernel.api.index.IndexUpdater; +import org.neo4j.kernel.api.index.NodePropertyAccessor; +import org.neo4j.storageengine.api.schema.IndexSample; + +/** + * Takes a somewhat high-level approach to parallelizing index population. It could be done lower level and more efficiently, + * but this may be good enough. Basically since multiple threads comes in and call add w/ batches of updates, each thread + * builds its own tree and in the end a single thread merges all trees, which will be reasonably fast since everything is sorted, + * into one complete tree. + * + * @param keys in the tree backing this index + * @param values the tree backing this index + */ +class ParallelNativeIndexPopulator,VALUE extends NativeIndexValue> implements IndexPopulator, ConsistencyCheckableIndexPopulator +{ + private final IndexLayout layout; + private final ThreadLocal threadLocalPopulators; + // Just a complete list of all part populators, because we can't ask ThreadLocal to provide this to us. + private final List partPopulators = new CopyOnWriteArrayList<>(); + private NativeIndexPopulator completePopulator; + private String failure; + private volatile NodePropertyAccessor propertyAccessor; + // There are various access points considered to be the "first" after population is completed, + // be it verifyDeferredConstraints, sampleResult or some other call. Regardless all of those methods + // have to be able to merge the parts into the real index. This is what this flag is all about. + private volatile boolean merged; + // First thing in close(boolean) call is to set this flag with the sole purpose of preventing new parts + // from being created beyond that point. + private volatile boolean closed; + + ParallelNativeIndexPopulator( File baseIndexFile, IndexLayout layout, Function> populatorSupplier ) + { + this.layout = layout; + this.threadLocalPopulators = new ThreadLocal() + { + @Override + protected synchronized ThreadLocalPopulator initialValue() + { + if ( closed ) + { + throw new IllegalStateException( "Already closed" ); + } + + File file = new File( baseIndexFile + "-part-" + partPopulators.size() ); + NativeIndexPopulator populator = populatorSupplier.apply( file ); + ThreadLocalPopulator tlPopulator = new ThreadLocalPopulator( populator ); + partPopulators.add( tlPopulator ); + populator.create(); + return tlPopulator; + } + }; + this.completePopulator = populatorSupplier.apply( baseIndexFile ); + } + + @Override + public void create() + { + // Do the "create" logic that the populator normally does, which may include archiving of the existing failed index etc. + completePopulator.create(); + } + + @Override + public void drop() + { + partPopulators.forEach( p -> p.populator.drop() ); + completePopulator.drop(); + } + + @Override + public void add( Collection> scanBatch ) throws IndexEntryConflictException + { + ThreadLocalPopulator tlPopulator = threadLocalPopulators.get(); + + // First check if there are external updates to apply + tlPopulator.applyQueuedUpdates(); + + // Then apply the updates from the scan + tlPopulator.populator.add( scanBatch ); + } + + @Override + public void verifyDeferredConstraints( NodePropertyAccessor nodePropertyAccessor ) + { + ensureMerged(); + partPopulators.forEach( p -> p.populator.verifyDeferredConstraints( nodePropertyAccessor ) ); + } + + @Override + public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor ) + { + // Don't have an explicit updatesPopulator, instead record these updates and then each populator will have to apply next time they notice. + propertyAccessor = accessor; + return new CollectingIndexUpdater() + { + @Override + protected void apply( Collection> updates ) + { + // Ensure there's at least one part populator active. This is for a case where an index population is started + // and the only data coming in is from the populating updater. + if ( partPopulators.isEmpty() ) + { + threadLocalPopulators.get(); + } + partPopulators.forEach( p -> p.updates.add( updates ) ); + } + }; + } + + @Override + public void close( boolean populationCompletedSuccessfully ) + { + closed = true; + try + { + if ( populationCompletedSuccessfully ) + { + // We're already merged at this point, so it's only to close the complete tree + ensureMerged(); + completePopulator.close( true ); + } + else + { + // We shouldn't be merged at this point, so clean up all the things + if ( failure != null ) + { + // failure can be null e.g. when dropping an index while it's populating, in that case it's just a close(false) call. + completePopulator.markAsFailed( failure ); + } + completePopulator.close( false ); + } + } + finally + { + partPopulators.forEach( p -> p.populator.drop() ); + } + } + + private void mergeParts() throws IOException, IndexEntryConflictException + { + KEY low = layout.newKey(); + low.initialize( Long.MIN_VALUE ); + low.initValuesAsLowest(); + KEY high = layout.newKey(); + high.initialize( Long.MAX_VALUE ); + high.initValuesAsHighest(); + KEY end = layout.newKey(); + NativeIndexReader[] partReaders = new NativeIndexReader[partPopulators.size()]; + RawCursor,IOException>[] partCursors = new RawCursor[partPopulators.size()]; + Object[] partHeads = new Object[partPopulators.size()]; + int ended = 0; + for ( int i = 0; i < partPopulators.size(); i++ ) + { + ThreadLocalPopulator tlPopulator = partPopulators.get( i ); + // Apply pending updates in this populator thread + tlPopulator.applyQueuedUpdates(); + NativeIndexReader reader = tlPopulator.populator.newReader(); + partReaders[i] = reader; + partCursors[i] = reader.makeIndexSeeker( low, high, IndexOrder.ASCENDING ); + } + + try ( Writer writer = completePopulator.tree.writer() ) + { + // An idea how to parallelize the below loop: + // - Have one thread running ahead, making comparisons and leaving a trail of candidateIndexes behind it. + // - The thread doing the merge gets batches of candidate indexes and picks and writes w/o comparing + + // As long there's stuff left to merge + while ( ended < partCursors.length ) + { + // Pick lowest among all candidates + KEY lowestCandidate = null; + int lowestCandidateIndex = -1; + for ( int i = 0; i < partCursors.length; i++ ) + { + KEY candidate = (KEY) partHeads[i]; + if ( candidate == end ) + { + continue; + } + + if ( candidate == null ) + { + if ( partCursors[i].next() ) + { + partHeads[i] = candidate = partCursors[i].get().key(); + } + else + { + partHeads[i] = end; + ended++; + } + } + if ( candidate != null ) + { + if ( lowestCandidate == null || layout.compare( candidate, lowestCandidate ) < 0 ) + { + lowestCandidate = candidate; + lowestCandidateIndex = i; + } + } + } + + if ( lowestCandidate != null ) + { + // Oh, we have something to insert + writer.put( lowestCandidate, partCursors[lowestCandidateIndex].get().value() ); + partHeads[lowestCandidateIndex] = null; + } + } + } + finally + { + for ( NativeIndexReader partReader : partReaders ) + { + partReader.close(); + } + } + } + + @Override + public void markAsFailed( String failure ) + { + this.failure = failure; + partPopulators.forEach( p -> p.populator.markAsFailed( failure ) ); + } + + @Override + public void includeSample( IndexEntryUpdate update ) + { + } + + @Override + public IndexSample sampleResult() + { + ensureMerged(); + return completePopulator.sampleResult(); + } + + @Override + public void consistencyCheck() + { + ensureMerged(); + completePopulator.consistencyCheck(); + } + + private void ensureMerged() + { + if ( !merged ) + { + merged = true; + try + { + mergeParts(); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + catch ( IndexEntryConflictException e ) + { + throw new IllegalStateException( e ); + } + } + } + + /** + * A thread-local NativeIndexPopulator with a queue of batched external updates. We keep these per thread because we + * don't want the index populator main thread to apply updates to all the parts. + */ + private class ThreadLocalPopulator + { + private final NativeIndexPopulator populator; + // Main populator thread adds and the thread owning this thread-local populator polls + private final Queue>> updates = new ConcurrentLinkedDeque<>(); + + ThreadLocalPopulator( NativeIndexPopulator populator ) + { + this.populator = populator; + } + + void applyQueuedUpdates() throws IndexEntryConflictException + { + if ( !updates.isEmpty() ) + { + try ( IndexUpdater updater = populator.newPopulatingUpdater( propertyAccessor ) ) + { + Collection> batch; + while ( (batch = updates.poll()) != null ) + { + for ( IndexEntryUpdate update : batch ) + { + updater.process( update ); + } + } + } + } + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java index a754e0b4007d3..595c56a2ec408 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java @@ -36,7 +36,6 @@ import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.NodePropertyAccessor; import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettings; -import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.values.storable.CoordinateReferenceSystem; @@ -155,7 +154,7 @@ static class PartPopulator extends NativeIndexPopulator newReader() { return new SpatialIndexPartReader<>( tree, layout, descriptor, configuration ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/StringIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/StringIndexPopulator.java index 4ef0f4c42f3ba..c109d11f6759c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/StringIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/StringIndexPopulator.java @@ -24,7 +24,6 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.api.index.IndexProvider; -import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER; @@ -38,7 +37,7 @@ public class StringIndexPopulator extends NativeIndexPopulator newReader() { return new StringIndexReader( tree, layout, descriptor ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java index 462f7b79184e0..33dd87ccb31e7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java @@ -34,7 +34,6 @@ import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.NodePropertyAccessor; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; -import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.values.storable.Value; @@ -146,7 +145,7 @@ static class PartPopulator> extends N } @Override - IndexReader newReader() + NativeIndexReader newReader() { return new TemporalIndexPartReader<>( tree, layout, descriptor ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulatorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulatorTest.java index 446a562ca0932..55e47ecd99ee1 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulatorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/NativeIndexPopulatorTest.java @@ -122,7 +122,7 @@ private static PopulatorFactory genericPopulatorFac { return ( pageCache, fs, storeFile, layout, monitor, descriptor ) -> new GenericNativeIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, spaceFillingCurveSettings, - SimpleIndexDirectoryStructures.onIndexFile( storeFile ), configuration, false ); + SimpleIndexDirectoryStructures.onIndexFile( storeFile ), configuration, false, false ); } @FunctionalInterface