Skip to content

Commit

Permalink
Number of workers dependend population queue size
Browse files Browse the repository at this point in the history
Queue size will be evaluated and will be equal to 2 * number_of_population_workers
  • Loading branch information
MishaDemianenko committed Mar 9, 2016
1 parent f42e12c commit f30a3ed
Showing 1 changed file with 13 additions and 2 deletions.
Expand Up @@ -75,7 +75,8 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread";

private final int QUEUE_THRESHOLD = FeatureToggles.getInteger( getClass(), QUEUE_THRESHOLD_NAME, 20_000 );
private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger( getClass(), TASK_QUEUE_SIZE_NAME, 10_000 );
private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger( getClass(), TASK_QUEUE_SIZE_NAME,
getNumberOfPopulationWorkers() * 2 );
private final int AWAIT_TIMEOUT_MINUTES = FeatureToggles.getInteger( getClass(), AWAIT_TIMEOUT_MINUTES_NAME, 30 );
private final int BATCH_SIZE = FeatureToggles.getInteger( getClass(), BATCH_SIZE_NAME, 10_000 );

Expand Down Expand Up @@ -304,7 +305,7 @@ private List<NodePropertyUpdate> newBatch()

private ExecutorService createThreadPool()
{
int threads = Math.max( 2, Runtime.getRuntime().availableProcessors() - 1 );
int threads = getNumberOfPopulationWorkers();
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>( TASK_QUEUE_SIZE );
ThreadFactory threadFactory = daemon( FLUSH_THREAD_NAME_PREFIX );
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
Expand All @@ -327,6 +328,16 @@ private static String allStackTraces()
.collect( joining() );
}

/**
* Calculate number of workers that will perform index population
*
* @return number of threads that will be used for index population
*/
private static int getNumberOfPopulationWorkers()
{
return Math.max( 2, Runtime.getRuntime().availableProcessors() - 1 );
}

/**
* An {@link IndexPopulation} that does not insert updates one by one into the index but instead adds them to the
* map containing batches of updates for each index.
Expand Down

0 comments on commit f30a3ed

Please sign in to comment.