Skip to content

Commit

Permalink
Small touch-ups based on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Aug 16, 2017
1 parent d7f6772 commit bcee58e
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 30 deletions.
Expand Up @@ -41,8 +41,16 @@
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

/**
* Utility for loading {@link SchemaIndexProvider} instances from {@link KernelExtensions}.
*/
public class SchemaIndexExtensionLoader
{
/**
* Used in scenarios where recovery isn't allowed.
*/
public static final RecoveryCleanupWorkCollector RECOVERY_PREVENTING_COLLECTOR = new RecoveryPreventingCollector();

public static SchemaIndexProviderMap loadSchemaIndexProviders( KernelExtensions extensions )
{
AllByPrioritySelectionStrategy<SchemaIndexProvider> indexProviderSelection = new AllByPrioritySelectionStrategy<>();
Expand Down Expand Up @@ -76,6 +84,4 @@ public void add( CleanupJob job )
}
}
}

public static final RecoveryCleanupWorkCollector RECOVERY_PREVENTING_COLLECTOR = new RecoveryPreventingCollector();
}
Expand Up @@ -30,17 +30,20 @@
/**
* Selects the candidate with highest priority (assumed to implement {@link Comparable}) and returns
* in {@link #select(Class, Iterable)}, but keeps the others for access too.
*
* @param <T> type of items expected to be provided into {@link #select(Class, Iterable)}. Due to signature of the
* {@link #select(Class, Iterable) select method} where an explicit and local {@code R} is defined a cast from
* {@code R} to {@code T} is required and so will fail if {@code T} isn't matching {@code R}.
*/
public class AllByPrioritySelectionStrategy<T> implements DependencyResolver.SelectionStrategy
public class AllByPrioritySelectionStrategy<T extends Comparable<T>> implements DependencyResolver.SelectionStrategy
{
@SuppressWarnings( "rawtypes" )
private List lowerPrioritizedCandidates = Collections.emptyList();
private List<T> lowerPrioritizedCandidates = Collections.emptyList();

@SuppressWarnings( {"rawtypes", "unchecked"} )
@SuppressWarnings( "unchecked" )
@Override
public <R> R select( Class<R> type, Iterable<R> candidates ) throws IllegalArgumentException
{
List<Comparable> all = (List<Comparable>) Iterables.asList( candidates );
List<T> all = (List<T>) Iterables.asList( candidates );
if ( all.isEmpty() )
{
throw new IllegalArgumentException( "Could not resolve dependency of type: " +
Expand All @@ -52,7 +55,6 @@ public <R> R select( Class<R> type, Iterable<R> candidates ) throws IllegalArgum
return highest;
}

@SuppressWarnings( "unchecked" )
public Iterable<T> lowerPrioritizedCandidates()
{
return lowerPrioritizedCandidates;
Expand Down
Expand Up @@ -64,6 +64,8 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
private static final String EOL = System.lineSeparator();
private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread";

static final int BATCH_SIZE = FeatureToggles.getInteger( BatchingMultipleIndexPopulator.class, BATCH_SIZE_NAME, 10_000 );

private final int MAXIMUM_NUMBER_OF_WORKERS = FeatureToggles.getInteger( getClass(), MAXIMUM_NUMBER_OF_WORKERS_NAME,
Runtime.getRuntime().availableProcessors() - 1 );
private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger( getClass(), TASK_QUEUE_SIZE_NAME,
Expand Down Expand Up @@ -151,39 +153,23 @@ private void awaitCompletion()
}
}

/**
* Add given {@link IndexEntryUpdate update} to the list of updates already present for the given
* {@link IndexPopulation population}. Flushes all updates if {@link #BATCH_SIZE} is reached.
*
* @param population the index population.
* @param update updates to add to the batch.
*/
private void batchUpdate( IndexPopulation population, IndexEntryUpdate<?> update )
{
if ( population.batch( update ) )
{
Collection<IndexEntryUpdate<?>> batchedUpdates = population.takeCurrentBatch();
flush( population, batchedUpdates );
}
}

/**
* Insert all batched updates into corresponding indexes.
*/
private void flushAll()
{
populations.forEach( population -> flush( population, population.takeCurrentBatch() ) );
populations.forEach( population -> flush( population ) );
}

/**
* Insert the given batch of updates into the index defined by the given {@link IndexPopulation}.
*
* @param population the index population.
* @param batch the list of updates to insert.
*/
private void flush( IndexPopulation population, Collection<IndexEntryUpdate<?>> batch )
private void flush( IndexPopulation population )
{
activeTasks.incrementAndGet();
Collection<IndexEntryUpdate<?>> batch = population.takeCurrentBatch();

executor.execute( () ->
{
Expand Down
Expand Up @@ -55,7 +55,7 @@

import static java.lang.String.format;
import static org.neo4j.collection.primitive.PrimitiveIntCollections.contains;
import static org.neo4j.kernel.impl.api.index.BatchingMultipleIndexPopulator.BATCH_SIZE_NAME;
import static org.neo4j.kernel.impl.api.index.BatchingMultipleIndexPopulator.BATCH_SIZE;
import static org.neo4j.kernel.impl.api.index.IndexPopulationFailure.failure;

/**
Expand Down Expand Up @@ -86,10 +86,8 @@
*/
public class MultipleIndexPopulator implements IndexPopulator
{

public static final String QUEUE_THRESHOLD_NAME = "queue_threshold";
private final int QUEUE_THRESHOLD = FeatureToggles.getInteger( getClass(), QUEUE_THRESHOLD_NAME, 20_000 );
private final int BATCH_SIZE = FeatureToggles.getInteger( BatchingMultipleIndexPopulator.class, BATCH_SIZE_NAME, 10_000 );

// Concurrency queue since multiple concurrent threads may enqueue updates into it. It is important for this queue
// to have fast #size() method since it might be drained in batches
Expand Down

0 comments on commit bcee58e

Please sign in to comment.