From 6d305b0e62a2a95b30ce6b0906d31f437a35d89c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Thu, 7 Feb 2019 11:52:03 +0100 Subject: [PATCH] BlockBasedIndexPopulator supports parallel threads adding scan batches Merging is done by multiple threads from an ExecutorService, ideally merging jobs would be passed to JobScheduler but for now that is not available to us. BlockBasedIndexPopulator becomes abstract and implemented by GenericBlockBasedIndexPopulator, because we need to construct specific index reader to be used by DeferredConflictCheckingIndexUpdater. Stop hiding BlockBasedIndexPopulator behind WorkSyncedNativeIndexPopulator, as we can now handle concurrent adds. Merge factor is controlled by FeatureToggle. --- .../api/index/MultipleIndexPopulator.java | 6 +- .../schema/BlockBasedIndexPopulator.java | 131 +++++++++++++----- .../impl/index/schema/BlockStorage.java | 6 +- .../GenericBlockBasedIndexPopulator.java | 51 +++++++ .../schema/GenericNativeIndexProvider.java | 6 +- .../impl/index/schema/BlockStorageTest.java | 11 +- 6 files changed, 168 insertions(+), 43 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericBlockBasedIndexPopulator.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java index 43eb51d2e992a..f400d3a1446cc 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java @@ -578,7 +578,11 @@ void flip( boolean verifyBeforeFlipping ) throws FlipFailedKernelException { if ( populationOngoing ) { - populator.add( takeCurrentBatch() ); + Collection> finalBatch = takeCurrentBatch(); + if ( !finalBatch.isEmpty() ) + { + populator.add( finalBatch ); + } populateFromQueue( 0, Long.MAX_VALUE ); if ( populations.contains( IndexPopulation.this ) ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java index d15d12b9934b9..5fa1945c54757 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java @@ -19,13 +19,20 @@ */ package org.neo4j.kernel.impl.index.schema; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; -import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration; import org.neo4j.index.internal.gbptree.Writer; import org.neo4j.io.ByteUnit; import org.neo4j.io.IOUtils; @@ -44,35 +51,53 @@ import org.neo4j.util.Preconditions; import org.neo4j.values.storable.Value; +import static org.neo4j.helpers.collection.Iterables.asList; import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate; import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex; -public class BlockBasedIndexPopulator,VALUE extends NativeIndexValue> extends NativeIndexPopulator +public abstract class BlockBasedIndexPopulator,VALUE extends NativeIndexValue> extends NativeIndexPopulator { private static final String BLOCK_SIZE = FeatureToggles.getString( BlockBasedIndexPopulator.class, "blockSize", "1M" ); + private static final int MERGE_FACTOR = FeatureToggles.getInteger( BlockBasedIndexPopulator.class, "mergeFactor", 8 ); // TODO some better ByteBuffers, right? private static final ByteBufferFactory BYTE_BUFFER_FACTORY = ByteBuffer::allocate; - private final IndexSpecificSpaceFillingCurveSettingsCache spatialSettings; private final IndexDirectoryStructure directoryStructure; - private final SpaceFillingCurveConfiguration configuration; private final boolean archiveFailedIndex; private final int blockSize; - private BlockStorage scanUpdates; + private ThreadLocal> scanUpdates; + private List> allScanUpdates = new ArrayList<>(); private IndexUpdateStorage externalUpdates; private boolean merged; BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout layout, IndexProvider.Monitor monitor, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings, - IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex ) + IndexDirectoryStructure directoryStructure, boolean archiveFailedIndex ) { super( pageCache, fs, file, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) ); - this.spatialSettings = spatialSettings; this.directoryStructure = directoryStructure; - this.configuration = configuration; this.archiveFailedIndex = archiveFailedIndex; this.blockSize = parseBlockSize(); + this.scanUpdates = ThreadLocal.withInitial( this::newThreadLocalBlockStorage ); + } + + private synchronized BlockStorage newThreadLocalBlockStorage() + { + Preconditions.checkState( !merged, "Already merged" ); + try + { + int id = allScanUpdates.size(); + BlockStorage blockStorage = + new BlockStorage<>( layout, BYTE_BUFFER_FACTORY, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".scan-" + id ), + BlockStorage.Monitor.NO_MONITOR, blockSize ); + allScanUpdates.add( blockStorage ); + return blockStorage; + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } } private static int parseBlockSize() @@ -96,8 +121,6 @@ public void create() super.create(); try { - scanUpdates = new BlockStorage<>( layout, BYTE_BUFFER_FACTORY, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".temp" ), - BlockStorage.Monitor.NO_MONITOR, blockSize ); externalUpdates = new IndexUpdateStorage<>( layout, fileSystem, new File( storeFile.getParent(), storeFile.getName() + ".ext" ), BYTE_BUFFER_FACTORY.newBuffer( blockSize ) ); } @@ -110,9 +133,10 @@ public void create() @Override public void add( Collection> updates ) { + BlockStorage blockStorage = scanUpdates.get(); for ( IndexEntryUpdate update : updates ) { - storeUpdate( update, scanUpdates ); + storeUpdate( update, blockStorage ); } } @@ -142,8 +166,29 @@ public void scanCompleted() throws IndexEntryConflictException { try { - scanUpdates.doneAdding(); - scanUpdates.merge(); + ExecutorService executorService = Executors.newFixedThreadPool( allScanUpdates.size() ); + List> mergeFutures = new ArrayList<>(); + for ( BlockStorage scanUpdates : allScanUpdates ) + { + mergeFutures.add( executorService.submit( () -> + { + scanUpdates.doneAdding(); + scanUpdates.merge( MERGE_FACTOR ); + return null; + } ) ); + } + executorService.shutdown(); + while ( !executorService.awaitTermination( 1, TimeUnit.SECONDS ) ) + { + // just wait longer + // TODO check drop/close + } + // Let potential exceptions in the merge threads have a chance to propagate + for ( Future mergeFuture : mergeFutures ) + { + mergeFuture.get(); + } + externalUpdates.doneAdding(); // don't merge and sort the external updates @@ -158,6 +203,21 @@ public void scanCompleted() throws IndexEntryConflictException { throw new UncheckedIOException( e ); } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new RuntimeException( "Got interrupted, so merge not completed", e ); + } + catch ( ExecutionException e ) + { + // Propagating merge exception from other thread + Throwable executionException = e.getCause(); + if ( executionException instanceof RuntimeException ) + { + throw (RuntimeException) executionException; + } + throw new RuntimeException( executionException ); + } } private void writeExternalUpdatesToTree() throws IOException @@ -189,26 +249,36 @@ private void writeExternalUpdatesToTree() throws IOException private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictException { ConflictDetectingValueMerger conflictDetector = getMainConflictDetector(); - try ( Writer writer = tree.writer(); - BlockReader reader = scanUpdates.reader() ) + MergingBlockEntryReader allEntries = new MergingBlockEntryReader<>( layout ); + for ( BlockStorage scanUpdates : allScanUpdates ) { - BlockEntryReader maybeBlock = reader.nextBlock(); - if ( maybeBlock != null ) + try ( BlockReader reader = scanUpdates.reader() ) { - try ( BlockEntryReader block = maybeBlock ) + BlockEntryReader singleMergedBlock = reader.nextBlock(); + if ( singleMergedBlock != null ) { - while ( block.next() ) + allEntries.addSource( singleMergedBlock ); + if ( reader.nextBlock() != null ) { - conflictDetector.controlConflictDetection( block.key() ); - writer.merge( block.key(), block.value(), conflictDetector ); - if ( conflictDetector.wasConflicting() ) - { - conflictDetector.reportConflict( block.key().asValues() ); - } + throw new IllegalStateException( "Final BlockStorage had multiple blocks" ); } } } } + + try ( Writer writer = tree.writer(); + BlockEntryCursor reader = allEntries ) + { + while ( reader.next() ) + { + conflictDetector.controlConflictDetection( reader.key() ); + writer.merge( reader.key(), reader.value(), conflictDetector ); + if ( conflictDetector.wasConflicting() ) + { + conflictDetector.reportConflict( reader.key().asValues() ); + } + } + } } @Override @@ -216,6 +286,7 @@ public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor ) { if ( merged ) { + // Will need the reader from newReader, which a sub-class of this class implements return super.newPopulatingUpdater( accessor ); } @@ -241,17 +312,13 @@ public void close() }; } - @Override - NativeIndexReader newReader() - { - throw new UnsupportedOperationException( "Should not be needed because we're overriding the populating updater anyway" ); - } - @Override public void close( boolean populationCompletedSuccessfully ) { // TODO Make responsive - IOUtils.closeAllSilently( externalUpdates, scanUpdates ); + List toClose = new ArrayList<>( asList( allScanUpdates ) ); + toClose.add( externalUpdates ); + IOUtils.closeAllUnchecked( toClose ); super.close( populationCompletedSuccessfully ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java index 78e41e4360801..beee3bc05a541 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorage.java @@ -119,19 +119,19 @@ private void flushAndResetBuffer() throws IOException resetBufferedEntries(); } - public void merge() throws IOException + public void merge( int mergeFactor ) throws IOException { File sourceFile = blockFile; File tempFile = new File( blockFile.getParent(), blockFile.getName() + ".b" ); try { - int mergeFactor = 16; File targetFile = tempFile; while ( numberOfBlocksInCurrentFile > 1 ) { // Perform one complete merge iteration, merging all blocks from source into target. // After this step, target will contain fewer blocks than source, but may need another merge iteration. - try ( BlockReader reader = reader( sourceFile ); StoreChannel targetChannel = fs.open( targetFile, OpenMode.READ_WRITE ) ) + try ( BlockReader reader = reader( sourceFile ); + StoreChannel targetChannel = fs.open( targetFile, OpenMode.READ_WRITE ) ) { int blocksMergedSoFar = 0; int blocksInMergedFile = 0; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericBlockBasedIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericBlockBasedIndexPopulator.java new file mode 100644 index 0000000000000..3ecac1a4b0b98 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericBlockBasedIndexPopulator.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import java.io.File; + +import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.api.index.IndexDirectoryStructure; +import org.neo4j.kernel.api.index.IndexProvider; +import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache; +import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; + +public class GenericBlockBasedIndexPopulator extends BlockBasedIndexPopulator +{ + private final IndexSpecificSpaceFillingCurveSettingsCache spatialSettings; + private final SpaceFillingCurveConfiguration configuration; + + GenericBlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout layout, + IndexProvider.Monitor monitor, StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings, + IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex ) + { + super( pageCache, fs, file, layout, monitor, descriptor, spatialSettings, directoryStructure, archiveFailedIndex ); + this.spatialSettings = spatialSettings; + this.configuration = configuration; + } + + @Override + NativeIndexReader newReader() + { + return new GenericNativeIndexReader( tree, layout, descriptor, spatialSettings, configuration ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java index f63f2c64a831b..d0c08bda187f0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/GenericNativeIndexProvider.java @@ -163,10 +163,8 @@ protected IndexPopulator newIndexPopulator( File storeFile, GenericLayout layout } else { - NativeIndexPopulator actualPopulator = - new BlockBasedIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(), - directoryStructure(), configuration, archiveFailedIndex ); - return new WorkSyncedNativeIndexPopulator<>( actualPopulator ); + return new GenericBlockBasedIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(), + directoryStructure(), configuration, archiveFailedIndex ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java index 56f92abb4b8d7..f96ab9cbda340 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java @@ -165,7 +165,7 @@ void shouldMergeWhenEmpty() throws IOException try ( BlockStorage storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, blockSize ) ) { // when - storage.merge(); + storage.merge( randomMergeFactor() ); // then assertEquals( 0, monitor.mergeIterationCallCount ); @@ -185,7 +185,7 @@ void shouldMergeSingleBlock() throws IOException storage.doneAdding(); // when - storage.merge(); + storage.merge( randomMergeFactor() ); // then assertEquals( 0, monitor.mergeIterationCallCount ); @@ -206,7 +206,7 @@ void shouldMergeMultipleBlocks() throws IOException storage.doneAdding(); // when - storage.merge(); + storage.merge( randomMergeFactor() ); // then assertContents( layout, storage, asOneBigBlock( expectedBlocks ) ); @@ -274,6 +274,11 @@ private Iterable>> asOneBigBlock( List< return singletonList( all ); } + private int randomMergeFactor() + { + return random.nextInt( 2, 8 ); + } + private void print( List>> expectedBlocks ) { for ( List> expectedBlock : expectedBlocks )