Skip to content

Commit

Permalink
Addressed code review comments for #6334
Browse files Browse the repository at this point in the history
  • Loading branch information
lutovich committed Feb 16, 2016
1 parent 78e5d21 commit a8977bd
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 60 deletions.
Expand Up @@ -144,29 +144,19 @@ public static <TYPE> void await( Supplier<TYPE> supplier, Predicate<TYPE> predic
public static void await( Supplier<Boolean> condition, long timeout, TimeUnit unit )
throws TimeoutException, InterruptedException
{
long checkIntervalMillis = Math.max( unit.toMillis( timeout ) / 100, 10 );
long timeoutMillis = unit.toMillis( timeout );
await( condition, timeoutMillis, checkIntervalMillis, TimeUnit.MILLISECONDS );
}

public static void await( Supplier<Boolean> condition, long timeout, long checkInterval, TimeUnit unit )
throws TimeoutException, InterruptedException
{
long sleep = unit.toMillis( checkInterval );
long deadline = System.currentTimeMillis() + unit.toMillis( timeout );
do
{
if ( condition.get() )
{
return;
}
Thread.sleep( sleep );
Thread.sleep( 20 );
}
while ( System.currentTimeMillis() < deadline );
throw new TimeoutException( "Waited for " + timeout + " " + unit + ", but " + condition + " was not accepted." );
}


public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) throws InterruptedException
{
long sleep = unit.toMillis( checkInterval );
Expand Down
Expand Up @@ -108,6 +108,11 @@ void add( List<NodePropertyUpdate> updates )
*/
void markAsFailed( String failure ) throws IOException;

/**
* Add the given {@link NodePropertyUpdate update} to the sampler for this index.
*
* @param update update to include in sample
*/
void includeSample( NodePropertyUpdate update );

long sampleResult( DoubleLong.Out result );
Expand Down
Expand Up @@ -67,15 +67,12 @@
public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
{
public static final String QUEUE_THRESHOLD_NAME = "queue_threshold";
public static final String TASK_QUEUE_SIZE_NAME = "task_queue_size";
public static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes";
public static final String BATCH_SIZE_NAME = "batch_size";
static final String TASK_QUEUE_SIZE_NAME = "task_queue_size";
static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes";
static final String BATCH_SIZE_NAME = "batch_size";

private static final String EOL = System.lineSeparator();
private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread";
private static final String LUCENE_MERGE_THREAD_NAME_PREFIX = "Lucene Merge Thread";

private static final int TASKS_COMPLETION_CHECK_INTERVAL_MS = 100;

private final int QUEUE_THRESHOLD = FeatureToggles.getInteger( getClass(), QUEUE_THRESHOLD_NAME, 20_000 );
private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger( getClass(), TASK_QUEUE_SIZE_NAME, 100_000 );
Expand Down Expand Up @@ -168,10 +165,7 @@ private void awaitCompletion()
"flush tasks to complete." + EOL + this );

Supplier<Boolean> allSubmittedTasksCompleted = () -> activeTasks.get() == 0;
long awaitTimeoutMillis = TimeUnit.MINUTES.toMillis( AWAIT_TIMEOUT_MINUTES );

Predicates.await( allSubmittedTasksCompleted, awaitTimeoutMillis, TASKS_COMPLETION_CHECK_INTERVAL_MS,
TimeUnit.MILLISECONDS );
Predicates.await( allSubmittedTasksCompleted, AWAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES );
}
catch ( TimeoutException e )
{
Expand Down Expand Up @@ -207,8 +201,8 @@ private void flushIfNeeded( IndexDescriptor descriptor, List<NodePropertyUpdate>
{
if ( batch.size() >= BATCH_SIZE )
{
flush( descriptor, batch );
batchedUpdates.remove( descriptor );
flush( descriptor, batch );
}
}

Expand All @@ -223,11 +217,11 @@ private void flushAll()
Map.Entry<IndexDescriptor,List<NodePropertyUpdate>> entry = entries.next();
IndexDescriptor indexDescriptor = entry.getKey();
List<NodePropertyUpdate> updates = entry.getValue();
entries.remove();
if ( updates != null && !updates.isEmpty() )
{
flush( indexDescriptor, updates );
}
entries.remove();
}
}

Expand Down Expand Up @@ -295,7 +289,7 @@ private void shutdownExecutor( boolean now )
private void handleTimeout()
{
throw new IllegalStateException( "Index population tasks were not able to complete in " +
AWAIT_TIMEOUT_MINUTES + " minutes." + EOL + this + EOL + findStackTraces() );
AWAIT_TIMEOUT_MINUTES + " minutes." + EOL + this + EOL + allStackTraces() );
}

private void handleInterrupt()
Expand All @@ -320,29 +314,20 @@ private ExecutorService createThreadPool()
}

/**
* Finds all threads and corresponding stack traces relevant for this populator which can potentially cause the
* {@link ExecutorService executor} to not terminate in {@link #AWAIT_TIMEOUT_MINUTES} minutes. Such threads are
* ones used by executor itself and Lucene Merge threads which can cause stalls of writers.
* Finds all threads and corresponding stack traces which can potentially cause the
* {@link ExecutorService executor} to not terminate in {@link #AWAIT_TIMEOUT_MINUTES} minutes.
*
* @return thread dump as string for all relevant threads.
* @return thread dump as string.
*/
private static String findStackTraces()
private static String allStackTraces()
{
return Thread.getAllStackTraces()
.entrySet()
.stream()
.filter( entry -> isFlushOrLuceneMergeThread( entry.getKey() ) )
.map( entry -> Exceptions.stringify( entry.getKey(), entry.getValue() ) )
.collect( joining() );
}

private static boolean isFlushOrLuceneMergeThread( Thread thread )
{
String name = thread.getName().toLowerCase().trim();
return name.startsWith( FLUSH_THREAD_NAME_PREFIX.toLowerCase() ) ||
name.startsWith( LUCENE_MERGE_THREAD_NAME_PREFIX.toLowerCase() );
}

/**
* An {@link IndexPopulation} that does not insert updates one by one into the index but instead adds them to the
* map containing batches of updates for each index.
Expand Down
Expand Up @@ -23,6 +23,12 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;

/**
* Factory that is able to create either {@link MultipleIndexPopulator} or {@link BatchingMultipleIndexPopulator}
* depending on the given config.
*
* @see GraphDatabaseSettings#multi_threaded_schema_index_population_enabled
*/
public abstract class MultiPopulatorFactory
{
private MultiPopulatorFactory()
Expand Down
Expand Up @@ -24,10 +24,10 @@
import org.junit.runners.model.Statement;

import java.util.Random;

import org.neo4j.test.Randoms.Configuration;

import static java.lang.System.currentTimeMillis;

import static org.neo4j.helpers.Exceptions.withMessage;

/**
Expand Down Expand Up @@ -105,6 +105,11 @@ public int nextInt( int n )
return random.nextInt( n );
}

public int nextInt( int origin, int bound )
{
return random.nextInt( (bound - origin) + 1 ) + origin;
}

public double nextGaussian()
{
return random.nextGaussian();
Expand Down
Expand Up @@ -34,8 +34,8 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.neo4j.function.Factory;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.ArrayUtil;
import org.neo4j.helpers.Exceptions;
Expand All @@ -60,14 +60,14 @@ public abstract class AbstractLuceneIndex implements Closeable
protected final ReentrantLock partitionsLock = new ReentrantLock();

protected final PartitionedIndexStorage indexStorage;
private final Supplier<IndexWriterConfig> writerConfigSupplier;
private final Factory<IndexWriterConfig> writerConfigFactory;
private List<IndexPartition> partitions = new CopyOnWriteArrayList<>();
private volatile boolean open;

public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, Supplier<IndexWriterConfig> writerConfigSupplier )
public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, Factory<IndexWriterConfig> writerConfigFactory )
{
this.indexStorage = indexStorage;
this.writerConfigSupplier = writerConfigSupplier;
this.writerConfigFactory = writerConfigFactory;
}

/**
Expand Down Expand Up @@ -98,7 +98,7 @@ public void open() throws IOException
for ( Map.Entry<File,Directory> indexDirectory : indexDirectories.entrySet() )
{
partitions.add( new IndexPartition( indexDirectory.getKey(), indexDirectory.getValue(),
writerConfigSupplier.get() ) );
writerConfigFactory.newInstance() ) );
}
open = true;
}
Expand Down Expand Up @@ -369,7 +369,8 @@ public IndexPartition addNewPartition() throws IOException
{
File partitionFolder = createNewPartitionFolder();
Directory directory = indexStorage.openDirectory( partitionFolder );
IndexPartition indexPartition = new IndexPartition( partitionFolder, directory, writerConfigSupplier.get() );
IndexPartition indexPartition = new IndexPartition( partitionFolder, directory,
writerConfigFactory.newInstance() );
partitions.add( indexPartition );
return indexPartition;
}
Expand Down
Expand Up @@ -28,8 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.neo4j.function.Factory;
import org.neo4j.helpers.TaskCoordinator;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.impl.index.AbstractLuceneIndex;
Expand Down Expand Up @@ -65,9 +65,9 @@ public class LuceneSchemaIndex extends AbstractLuceneIndex
private final TaskCoordinator taskCoordinator = new TaskCoordinator( 10, TimeUnit.MILLISECONDS );

public LuceneSchemaIndex( PartitionedIndexStorage indexStorage, IndexConfiguration config,
IndexSamplingConfig samplingConfig, Supplier<IndexWriterConfig> writerConfigSupplier )
IndexSamplingConfig samplingConfig, Factory<IndexWriterConfig> writerConfigFactory )
{
super( indexStorage, writerConfigSupplier );
super( indexStorage, writerConfigFactory );
this.config = config;
this.samplingConfig = samplingConfig;
}
Expand Down
Expand Up @@ -23,8 +23,8 @@
import org.apache.lucene.index.IndexWriterConfig;

import java.util.Map;
import java.util.function.Supplier;

import org.neo4j.function.Factory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.api.impl.index.IndexWriterConfigs;
import org.neo4j.kernel.api.impl.index.builder.AbstractLuceneIndexBuilder;
Expand All @@ -46,7 +46,7 @@ public class LuceneSchemaIndexBuilder extends AbstractLuceneIndexBuilder<LuceneS
{
private IndexSamplingConfig samplingConfig = new IndexSamplingConfig( new Config() );
private IndexConfiguration indexConfig = IndexConfiguration.NON_UNIQUE;
private Supplier<IndexWriterConfig> writerConfigSupplier = IndexWriterConfigs::standard;
private Factory<IndexWriterConfig> writerConfigFactory = IndexWriterConfigs::standard;

private LuceneSchemaIndexBuilder()
{
Expand Down Expand Up @@ -101,14 +101,14 @@ public LuceneSchemaIndexBuilder withIndexConfig( IndexConfiguration indexConfig
}

/**
* Specify {@link Supplier} of lucene {@link IndexWriterConfig} to create {@link IndexWriter}s.
* Specify {@link Factory} of lucene {@link IndexWriterConfig} to create {@link IndexWriter}s.
*
* @param writerConfigSupplier the supplier of writer configs
* @param writerConfigFactory the supplier of writer configs
* @return index builder
*/
public LuceneSchemaIndexBuilder withWriterConfig( Supplier<IndexWriterConfig> writerConfigSupplier )
public LuceneSchemaIndexBuilder withWriterConfig( Factory<IndexWriterConfig> writerConfigFactory )
{
this.writerConfigSupplier = writerConfigSupplier;
this.writerConfigFactory = writerConfigFactory;
return this;
}

Expand All @@ -130,7 +130,7 @@ public LuceneSchemaIndexBuilder uniqueIndex()
*/
public LuceneSchemaIndex build()
{
return new LuceneSchemaIndex( storageBuilder.build(), indexConfig, samplingConfig, writerConfigSupplier );
return new LuceneSchemaIndex( storageBuilder.build(), indexConfig, samplingConfig, writerConfigFactory );
}

}
Expand Up @@ -59,6 +59,8 @@ public void drop() throws IOException
@Override
public void add( List<NodePropertyUpdate> updates ) throws IndexEntryConflictException, IOException
{
// Lucene documents stored in a ThreadLocal and reused so we can't create an eager collection of documents here
// That is why we create a lazy Iterator and then Iterable
Iterator<Document> documents = updates.stream()
.map( LuceneIndexPopulator::updateAsDocument )
.iterator();
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

import java.io.File;
import java.io.IOException;
Expand All @@ -29,7 +30,6 @@
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.consistency.ConsistencyCheckService;
Expand Down Expand Up @@ -91,12 +91,13 @@ public class MultipleIndexPopulationStressIT
{
private static final String[] TOKENS = new String[] {"One", "Two", "Three", "Four"};

@Rule
public final RandomRule random = new RandomRule();
@Rule
public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() );
@Rule
public final CleanupRule cleanup = new CleanupRule();

@Rule
public final RuleChain ruleChain = RuleChain.outerRule( random ).around( directory ).around( cleanup );

private final FileSystemAbstraction fs = new DefaultFileSystemAbstraction();

@Test
Expand All @@ -108,8 +109,7 @@ public void shouldPopulateMultipleIndexPopulatorsUnderStressSingleThreaded() thr
@Test
public void shouldPopulateMultipleIndexPopulatorsUnderStressMultiThreaded() throws Exception
{
int concurrentUpdatesQueueFlushThreshold = ThreadLocalRandom.current().nextInt( 100, 5000 );
System.out.println( "Concurrent Updates Queue Threshold: " + concurrentUpdatesQueueFlushThreshold );
int concurrentUpdatesQueueFlushThreshold = random.nextInt( 100, 5000 );
FeatureToggles.set( BatchingMultipleIndexPopulator.class, BatchingMultipleIndexPopulator.QUEUE_THRESHOLD_NAME,
concurrentUpdatesQueueFlushThreshold );
try
Expand Down

0 comments on commit a8977bd

Please sign in to comment.