From a8977bd31e9e111afe845643339347781ca5d2e3 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 16 Feb 2016 15:24:31 +0100 Subject: [PATCH] Addressed code review comments for #6334 --- .../java/org/neo4j/function/Predicates.java | 12 +----- .../kernel/api/index/IndexPopulator.java | 5 +++ .../index/BatchingMultipleIndexPopulator.java | 37 ++++++------------- .../impl/api/index/MultiPopulatorFactory.java | 6 +++ .../test/java/org/neo4j/test/RandomRule.java | 7 +++- .../api/impl/index/AbstractLuceneIndex.java | 13 ++++--- .../api/impl/schema/LuceneSchemaIndex.java | 6 +-- .../impl/schema/LuceneSchemaIndexBuilder.java | 14 +++---- .../populator/LuceneIndexPopulator.java | 2 + .../MultipleIndexPopulationStressIT.java | 12 +++--- 10 files changed, 54 insertions(+), 60 deletions(-) diff --git a/community/common/src/main/java/org/neo4j/function/Predicates.java b/community/common/src/main/java/org/neo4j/function/Predicates.java index bad1b811d9436..4c3d30dda67fa 100644 --- a/community/common/src/main/java/org/neo4j/function/Predicates.java +++ b/community/common/src/main/java/org/neo4j/function/Predicates.java @@ -144,15 +144,6 @@ public static void await( Supplier supplier, Predicate predic public static void await( Supplier condition, long timeout, TimeUnit unit ) throws TimeoutException, InterruptedException { - long checkIntervalMillis = Math.max( unit.toMillis( timeout ) / 100, 10 ); - long timeoutMillis = unit.toMillis( timeout ); - await( condition, timeoutMillis, checkIntervalMillis, TimeUnit.MILLISECONDS ); - } - - public static void await( Supplier condition, long timeout, long checkInterval, TimeUnit unit ) - throws TimeoutException, InterruptedException - { - long sleep = unit.toMillis( checkInterval ); long deadline = System.currentTimeMillis() + unit.toMillis( timeout ); do { @@ -160,13 +151,12 @@ public static void await( Supplier condition, long timeout, long checkI { return; } - Thread.sleep( sleep ); + Thread.sleep( 20 ); } while ( 
System.currentTimeMillis() < deadline ); throw new TimeoutException( "Waited for " + timeout + " " + unit + ", but " + condition + " was not accepted." ); } - public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) throws InterruptedException { long sleep = unit.toMillis( checkInterval ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/index/IndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/api/index/IndexPopulator.java index 9e0b179e8e6b2..570111fd73940 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/index/IndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/index/IndexPopulator.java @@ -108,6 +108,11 @@ void add( List updates ) */ void markAsFailed( String failure ) throws IOException; + /** + * Add the given {@link NodePropertyUpdate update} to the sampler for this index. + * + * @param update update to include in sample + */ void includeSample( NodePropertyUpdate update ); long sampleResult( DoubleLong.Out result ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java index 21d5b67a6180d..053c93083b0f8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java @@ -67,15 +67,12 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator { public static final String QUEUE_THRESHOLD_NAME = "queue_threshold"; - public static final String TASK_QUEUE_SIZE_NAME = "task_queue_size"; - public static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes"; - public static final String BATCH_SIZE_NAME = "batch_size"; + static final String TASK_QUEUE_SIZE_NAME = "task_queue_size"; + static final String 
AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes"; + static final String BATCH_SIZE_NAME = "batch_size"; private static final String EOL = System.lineSeparator(); private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread"; - private static final String LUCENE_MERGE_THREAD_NAME_PREFIX = "Lucene Merge Thread"; - - private static final int TASKS_COMPLETION_CHECK_INTERVAL_MS = 100; private final int QUEUE_THRESHOLD = FeatureToggles.getInteger( getClass(), QUEUE_THRESHOLD_NAME, 20_000 ); private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger( getClass(), TASK_QUEUE_SIZE_NAME, 100_000 ); @@ -168,10 +165,7 @@ private void awaitCompletion() "flush tasks to complete." + EOL + this ); Supplier allSubmittedTasksCompleted = () -> activeTasks.get() == 0; - long awaitTimeoutMillis = TimeUnit.MINUTES.toMillis( AWAIT_TIMEOUT_MINUTES ); - - Predicates.await( allSubmittedTasksCompleted, awaitTimeoutMillis, TASKS_COMPLETION_CHECK_INTERVAL_MS, - TimeUnit.MILLISECONDS ); + Predicates.await( allSubmittedTasksCompleted, AWAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES ); } catch ( TimeoutException e ) { @@ -207,8 +201,8 @@ private void flushIfNeeded( IndexDescriptor descriptor, List { if ( batch.size() >= BATCH_SIZE ) { - flush( descriptor, batch ); batchedUpdates.remove( descriptor ); + flush( descriptor, batch ); } } @@ -223,11 +217,11 @@ private void flushAll() Map.Entry> entry = entries.next(); IndexDescriptor indexDescriptor = entry.getKey(); List updates = entry.getValue(); + entries.remove(); if ( updates != null && !updates.isEmpty() ) { flush( indexDescriptor, updates ); } - entries.remove(); } } @@ -295,7 +289,7 @@ private void shutdownExecutor( boolean now ) private void handleTimeout() { throw new IllegalStateException( "Index population tasks were not able to complete in " + - AWAIT_TIMEOUT_MINUTES + " minutes." + EOL + this + EOL + findStackTraces() ); + AWAIT_TIMEOUT_MINUTES + " minutes." 
+ EOL + this + EOL + allStackTraces() ); } private void handleInterrupt() @@ -320,29 +314,20 @@ private ExecutorService createThreadPool() } /** - * Finds all threads and corresponding stack traces relevant for this populator which can potentially cause the - * {@link ExecutorService executor} to not terminate in {@link #AWAIT_TIMEOUT_MINUTES} minutes. Such threads are - * ones used by executor itself and Lucene Merge threads which can cause stalls of writers. + * Finds all threads and corresponding stack traces which can potentially cause the + * {@link ExecutorService executor} to not terminate in {@link #AWAIT_TIMEOUT_MINUTES} minutes. * - * @return thread dump as string for all relevant threads. + * @return thread dump as string. */ - private static String findStackTraces() + private static String allStackTraces() { return Thread.getAllStackTraces() .entrySet() .stream() - .filter( entry -> isFlushOrLuceneMergeThread( entry.getKey() ) ) .map( entry -> Exceptions.stringify( entry.getKey(), entry.getValue() ) ) .collect( joining() ); } - private static boolean isFlushOrLuceneMergeThread( Thread thread ) - { - String name = thread.getName().toLowerCase().trim(); - return name.startsWith( FLUSH_THREAD_NAME_PREFIX.toLowerCase() ) || - name.startsWith( LUCENE_MERGE_THREAD_NAME_PREFIX.toLowerCase() ); - } - /** * An {@link IndexPopulation} that does not insert updates one by one into the index but instead adds them to the * map containing batches of updates for each index. 
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultiPopulatorFactory.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultiPopulatorFactory.java index 6a89485d7e9bc..3e16b0ad8ad9f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultiPopulatorFactory.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultiPopulatorFactory.java @@ -23,6 +23,12 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.LogProvider; +/** + * Factory that is able to create either {@link MultipleIndexPopulator} or {@link BatchingMultipleIndexPopulator} + * depending on the given config. + * + * @see GraphDatabaseSettings#multi_threaded_schema_index_population_enabled + */ public abstract class MultiPopulatorFactory { private MultiPopulatorFactory() diff --git a/community/kernel/src/test/java/org/neo4j/test/RandomRule.java b/community/kernel/src/test/java/org/neo4j/test/RandomRule.java index fa8a3c7e5f600..6d9007046ec0d 100644 --- a/community/kernel/src/test/java/org/neo4j/test/RandomRule.java +++ b/community/kernel/src/test/java/org/neo4j/test/RandomRule.java @@ -24,10 +24,10 @@ import org.junit.runners.model.Statement; import java.util.Random; + import org.neo4j.test.Randoms.Configuration; import static java.lang.System.currentTimeMillis; - import static org.neo4j.helpers.Exceptions.withMessage; /** @@ -105,6 +105,11 @@ public int nextInt( int n ) return random.nextInt( n ); } + public int nextInt( int origin, int bound ) + { + return random.nextInt( (bound - origin) + 1 ) + origin; + } + public double nextGaussian() { return random.nextGaussian(); diff --git a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/AbstractLuceneIndex.java b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/AbstractLuceneIndex.java index 0f67506a6e78f..85534fcac6e27 100644 --- 
a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/AbstractLuceneIndex.java +++ b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/AbstractLuceneIndex.java @@ -34,8 +34,8 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; +import org.neo4j.function.Factory; import org.neo4j.graphdb.ResourceIterator; import org.neo4j.helpers.ArrayUtil; import org.neo4j.helpers.Exceptions; @@ -60,14 +60,14 @@ public abstract class AbstractLuceneIndex implements Closeable protected final ReentrantLock partitionsLock = new ReentrantLock(); protected final PartitionedIndexStorage indexStorage; - private final Supplier writerConfigSupplier; + private final Factory writerConfigFactory; private List partitions = new CopyOnWriteArrayList<>(); private volatile boolean open; - public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, Supplier writerConfigSupplier ) + public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, Factory writerConfigFactory ) { this.indexStorage = indexStorage; - this.writerConfigSupplier = writerConfigSupplier; + this.writerConfigFactory = writerConfigFactory; } /** @@ -98,7 +98,7 @@ public void open() throws IOException for ( Map.Entry indexDirectory : indexDirectories.entrySet() ) { partitions.add( new IndexPartition( indexDirectory.getKey(), indexDirectory.getValue(), - writerConfigSupplier.get() ) ); + writerConfigFactory.newInstance() ) ); } open = true; } @@ -369,7 +369,8 @@ public IndexPartition addNewPartition() throws IOException { File partitionFolder = createNewPartitionFolder(); Directory directory = indexStorage.openDirectory( partitionFolder ); - IndexPartition indexPartition = new IndexPartition( partitionFolder, directory, writerConfigSupplier.get() ); + IndexPartition indexPartition = new IndexPartition( partitionFolder, directory, + writerConfigFactory.newInstance() ); 
partitions.add( indexPartition ); return indexPartition; } diff --git a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndex.java b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndex.java index 43e88b208f537..53f92366247d9 100644 --- a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndex.java +++ b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndex.java @@ -28,8 +28,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import org.neo4j.function.Factory; import org.neo4j.helpers.TaskCoordinator; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.impl.index.AbstractLuceneIndex; @@ -65,9 +65,9 @@ public class LuceneSchemaIndex extends AbstractLuceneIndex private final TaskCoordinator taskCoordinator = new TaskCoordinator( 10, TimeUnit.MILLISECONDS ); public LuceneSchemaIndex( PartitionedIndexStorage indexStorage, IndexConfiguration config, - IndexSamplingConfig samplingConfig, Supplier writerConfigSupplier ) + IndexSamplingConfig samplingConfig, Factory writerConfigFactory ) { - super( indexStorage, writerConfigSupplier ); + super( indexStorage, writerConfigFactory ); this.config = config; this.samplingConfig = samplingConfig; } diff --git a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexBuilder.java b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexBuilder.java index 1ba9c0b69ebf6..068808399a2a0 100644 --- a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexBuilder.java +++ b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/LuceneSchemaIndexBuilder.java @@ -23,8 +23,8 @@ import org.apache.lucene.index.IndexWriterConfig; import java.util.Map; -import 
java.util.function.Supplier; +import org.neo4j.function.Factory; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.kernel.api.impl.index.IndexWriterConfigs; import org.neo4j.kernel.api.impl.index.builder.AbstractLuceneIndexBuilder; @@ -46,7 +46,7 @@ public class LuceneSchemaIndexBuilder extends AbstractLuceneIndexBuilder writerConfigSupplier = IndexWriterConfigs::standard; + private Factory writerConfigFactory = IndexWriterConfigs::standard; private LuceneSchemaIndexBuilder() { @@ -101,14 +101,14 @@ public LuceneSchemaIndexBuilder withIndexConfig( IndexConfiguration indexConfig } /** - * Specify {@link Supplier} of lucene {@link IndexWriterConfig} to create {@link IndexWriter}s. + * Specify {@link Factory} of lucene {@link IndexWriterConfig} to create {@link IndexWriter}s. * - * @param writerConfigSupplier the supplier of writer configs + * @param writerConfigFactory the factory of writer configs * @return index builder */ - public LuceneSchemaIndexBuilder withWriterConfig( Supplier writerConfigSupplier ) + public LuceneSchemaIndexBuilder withWriterConfig( Factory writerConfigFactory ) { - this.writerConfigSupplier = writerConfigSupplier; + this.writerConfigFactory = writerConfigFactory; return this; } @@ -130,7 +130,7 @@ public LuceneSchemaIndexBuilder uniqueIndex() */ public LuceneSchemaIndex build() { - return new LuceneSchemaIndex( storageBuilder.build(), indexConfig, samplingConfig, writerConfigSupplier ); + return new LuceneSchemaIndex( storageBuilder.build(), indexConfig, samplingConfig, writerConfigFactory ); } } diff --git a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/populator/LuceneIndexPopulator.java b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/populator/LuceneIndexPopulator.java index b6f523002758c..6754c32ea21ab 100644 --- a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/populator/LuceneIndexPopulator.java +++ 
b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/schema/populator/LuceneIndexPopulator.java @@ -59,6 +59,8 @@ public void drop() throws IOException @Override public void add( List updates ) throws IndexEntryConflictException, IOException { + // Lucene documents are stored in a ThreadLocal and reused, so we can't create an eager collection of documents here. + // That is why we create a lazy Iterator and then an Iterable. Iterator documents = updates.stream() .map( LuceneIndexPopulator::updateAsDocument ) .iterator(); diff --git a/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java b/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java index 28df66666dd88..c928611caef9c 100644 --- a/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java +++ b/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java @@ -21,6 +21,7 @@ import org.junit.Rule; import org.junit.Test; +import org.junit.rules.RuleChain; import java.io.File; import java.io.IOException; @@ -29,7 +30,6 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.consistency.ConsistencyCheckService; @@ -91,12 +91,13 @@ public class MultipleIndexPopulationStressIT { private static final String[] TOKENS = new String[] {"One", "Two", "Three", "Four"}; - @Rule public final RandomRule random = new RandomRule(); - @Rule public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() ); - @Rule public final CleanupRule cleanup = new CleanupRule(); + + @Rule + public final RuleChain ruleChain = RuleChain.outerRule( random ).around( directory ).around( cleanup ); + private final FileSystemAbstraction fs = new DefaultFileSystemAbstraction(); @Test @@ -108,8 +109,7 @@ public void 
shouldPopulateMultipleIndexPopulatorsUnderStressSingleThreaded() thr @Test public void shouldPopulateMultipleIndexPopulatorsUnderStressMultiThreaded() throws Exception { - int concurrentUpdatesQueueFlushThreshold = ThreadLocalRandom.current().nextInt( 100, 5000 ); - System.out.println( "Concurrent Updates Queue Threshold: " + concurrentUpdatesQueueFlushThreshold ); + int concurrentUpdatesQueueFlushThreshold = random.nextInt( 100, 5000 ); FeatureToggles.set( BatchingMultipleIndexPopulator.class, BatchingMultipleIndexPopulator.QUEUE_THRESHOLD_NAME, concurrentUpdatesQueueFlushThreshold ); try