Update index writer delete flush settings
Add settings for max buffered delete terms and max buffered docs;
introduce a couple of additional feature toggles for already defined configuration parameters.
MishaDemianenko committed Oct 17, 2016
1 parent 6cdf8a1 commit 26b5b0f
Showing 2 changed files with 19 additions and 8 deletions.
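
All of the settings touched by this commit are read through FeatureToggles.getInteger / getDouble / flag with a default value. As a rough illustration only (not the verified FeatureToggles API), a minimal reader of this kind could be backed by JVM system properties keyed by the owning class and the setting name; the helper class name and the property naming scheme below are assumptions.

// Minimal sketch of a FeatureToggles-style reader, assuming each toggle resolves to a
// JVM system property named "<owning class>.<setting name>". This is an illustration
// of the mechanism, not Neo4j's actual FeatureToggles implementation.
public final class FeatureTogglesSketch
{
    private FeatureTogglesSketch()
    {
    }

    public static int getInteger( Class<?> owner, String name, int defaultValue )
    {
        // Integer.getInteger reads the system property and falls back to the default.
        return Integer.getInteger( owner.getName() + "." + name, defaultValue );
    }

    public static double getDouble( Class<?> owner, String name, double defaultValue )
    {
        String value = System.getProperty( owner.getName() + "." + name );
        try
        {
            return value == null ? defaultValue : Double.parseDouble( value );
        }
        catch ( NumberFormatException e )
        {
            return defaultValue;
        }
    }

    public static boolean flag( Class<?> owner, String name, boolean defaultValue )
    {
        String value = System.getProperty( owner.getName() + "." + name );
        return value == null ? defaultValue : Boolean.parseBoolean( value );
    }
}
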
BatchingMultipleIndexPopulator.java

@@ -70,10 +70,13 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
     static final String TASK_QUEUE_SIZE_NAME = "task_queue_size";
     static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes";
     static final String BATCH_SIZE_NAME = "batch_size";
+    static final String MAXIMUM_NUMBER_OF_WORKERS_NAME = "population_workers_maximum";
 
     private static final String EOL = System.lineSeparator();
     private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread";
 
+    private final int MAXIMUM_NUMBER_OF_WORKERS = FeatureToggles.getInteger( getClass(), MAXIMUM_NUMBER_OF_WORKERS_NAME,
+            Runtime.getRuntime().availableProcessors() - 1 );
     private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger( getClass(), TASK_QUEUE_SIZE_NAME,
             getNumberOfPopulationWorkers() * 2 );
     private final int AWAIT_TIMEOUT_MINUTES = FeatureToggles.getInteger( getClass(), AWAIT_TIMEOUT_MINUTES_NAME, 30 );
@@ -139,9 +142,9 @@ protected void populateFromQueue( long currentlyIndexedNodeId )
     @Override
     public String toString()
     {
-        String updatesString = batchedUpdates.entrySet()
+        String updatesString = batchedUpdates.values()
                 .stream()
-                .map( entry -> entry.getKey() + " - " + entry.getValue().size() + " updates" )
+                .map( entry -> entry.size() + " updates" )
                 .collect( joining( ", ", "[", "]" ) );
 
         return "BatchingMultipleIndexPopulator{activeTasks=" + activeTasks + ", executor=" + executor + ", " +
@@ -329,9 +332,9 @@ private static String allStackTraces()
      *
      * @return number of threads that will be used for index population
      */
-    private static int getNumberOfPopulationWorkers()
+    private int getNumberOfPopulationWorkers()
     {
-        return Math.max( 2, Runtime.getRuntime().availableProcessors() - 1 );
+        return Math.max( 2, MAXIMUM_NUMBER_OF_WORKERS );
     }
 
     /**
@@ -378,7 +381,7 @@ public void run() throws E
         {
             delegate.run();
             log.info( "Completed node store scan. " +
-                    "Flushing all pending deletes." + EOL + BatchingMultipleIndexPopulator.this );
+                    "Flushing all pending updates." + EOL + BatchingMultipleIndexPopulator.this );
             flushAll();
         }
         catch ( Throwable scanError )
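
For context on the defaults above, assuming the new population_workers_maximum toggle is left unset: the worker maximum falls back to availableProcessors() - 1, getNumberOfPopulationWorkers() clamps it to at least 2, and task_queue_size keeps its existing derivation as twice the worker count. A small sketch of that arithmetic:

// Sketch of the default sizing in BatchingMultipleIndexPopulator after this change,
// assuming the feature toggles are left unset so the defaults from the diff apply.
public class PopulationSizingSketch
{
    public static void main( String[] args )
    {
        int cpus = Runtime.getRuntime().availableProcessors();

        // Default for the new population_workers_maximum toggle.
        int maximumNumberOfWorkers = cpus - 1;

        // getNumberOfPopulationWorkers() now clamps the toggle value to at least 2 workers.
        int populationWorkers = Math.max( 2, maximumNumberOfWorkers );

        // task_queue_size is still derived as twice the worker count.
        int taskQueueSize = populationWorkers * 2;

        // e.g. on an 8-core machine: 7 workers, task queue size 14.
        System.out.printf( "cpus=%d workers=%d taskQueueSize=%d%n", cpus, populationWorkers, taskQueueSize );
    }
}
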
IndexWriterConfigs.java

@@ -36,6 +36,10 @@ public final class IndexWriterConfigs
 {
     private static final int MAX_BUFFERED_DOCS =
             FeatureToggles.getInteger( IndexWriterConfigs.class, "max_buffered_docs", 100000 );
+    private static final int POPULATION_MAX_BUFFERED_DOCS =
+            FeatureToggles.getInteger( IndexWriterConfigs.class, "population_max_buffered_docs", IndexWriterConfig.DISABLE_AUTO_FLUSH );
+    private static final int MAX_BUFFERED_DELETE_TERMS =
+            FeatureToggles.getInteger( IndexWriterConfigs.class, "max_buffered_delete_terms", 15000 );
     private static final int MERGE_POLICY_MERGE_FACTOR =
             FeatureToggles.getInteger( IndexWriterConfigs.class, "merge.factor", 2 );
     private static final double MERGE_POLICY_NO_CFS_RATIO =
@@ -45,8 +49,10 @@ public final class IndexWriterConfigs
     private static final boolean CODEC_BLOCK_TREE_ORDS_POSTING_FORMAT =
             FeatureToggles.flag( IndexWriterConfigs.class, "block.tree.ords.posting.format", true );
 
-    private static final int POPULATION_RAM_BUFFER_SIZE_MB =
-            FeatureToggles.getInteger( IndexWriterConfigs.class, "population.ram.buffer.size", 50 );
+    private static final double STANDARD_RAM_BUFFER_SIZE_MB = FeatureToggles.getDouble( IndexWriterConfigs.class,
+            "standard.ram.buffer.size", IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB );
+    private static final double POPULATION_RAM_BUFFER_SIZE_MB = FeatureToggles.getDouble( IndexWriterConfigs.class,
+            "population.ram.buffer.size", 50 );
 
     /**
      * Default postings format for schema and label scan store indexes.
@@ -63,8 +69,10 @@ public static IndexWriterConfig standard()
         IndexWriterConfig writerConfig = new IndexWriterConfig( LuceneDataSource.KEYWORD_ANALYZER );
 
         writerConfig.setMaxBufferedDocs( MAX_BUFFERED_DOCS );
+        writerConfig.setMaxBufferedDeleteTerms( MAX_BUFFERED_DELETE_TERMS );
         writerConfig.setIndexDeletionPolicy( new MultipleBackupDeletionPolicy() );
         writerConfig.setUseCompoundFile( true );
+        writerConfig.setRAMBufferSizeMB( STANDARD_RAM_BUFFER_SIZE_MB );
         writerConfig.setCodec(new Lucene54Codec()
         {
             @Override
@@ -87,7 +95,7 @@ public PostingsFormat getPostingsFormatForField( String field )
     public static IndexWriterConfig population()
     {
         IndexWriterConfig writerConfig = standard();
-        writerConfig.setMaxBufferedDocs( IndexWriterConfig.DISABLE_AUTO_FLUSH );
+        writerConfig.setMaxBufferedDocs( POPULATION_MAX_BUFFERED_DOCS );
         writerConfig.setRAMBufferSizeMB( POPULATION_RAM_BUFFER_SIZE_MB );
         return writerConfig;
     }
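
Taken together, population() now inherits the delete-term and RAM-buffer settings from standard() and keeps document-count auto-flush disabled by default, so flushing during population is driven by the RAM buffer. The sketch below approximates the resulting configuration directly against Lucene's IndexWriterConfig using the toggle defaults from this diff; the deletion policy and codec overrides from standard() are deliberately omitted, and a plain KeywordAnalyzer stands in for LuceneDataSource.KEYWORD_ANALYZER.

import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;

// Rough equivalent of IndexWriterConfigs.population() under the toggle defaults from this
// diff; the MultipleBackupDeletionPolicy and codec customisations from standard() are omitted.
public class PopulationConfigSketch
{
    public static IndexWriterConfig population()
    {
        IndexWriterConfig writerConfig = new IndexWriterConfig( new KeywordAnalyzer() );

        // Defaults inherited from standard().
        writerConfig.setMaxBufferedDeleteTerms( 15000 );   // max_buffered_delete_terms
        writerConfig.setUseCompoundFile( true );

        // population() overrides: document-count auto-flush stays disabled by default,
        // so flushing is governed by the RAM buffer size instead.
        writerConfig.setMaxBufferedDocs( IndexWriterConfig.DISABLE_AUTO_FLUSH ); // population_max_buffered_docs
        writerConfig.setRAMBufferSizeMB( 50 );             // population.ram.buffer.size
        return writerConfig;
    }
}
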
