Skip to content

Commit

Permalink
Fix race between index population job and index map update
Browse files Browse the repository at this point in the history
Previously IndexingService on index creation was starting index population job
inside function that was doing index map state update.
And in cases when job were started before map state update was finished
the job will miss any updates to index map that was caused by index create
request that actually created that particular population job execution.
This PR changes that behaviour and will start job population, if required,
after index state map update.
  • Loading branch information
MishaDemianenko committed Nov 27, 2017
1 parent 12fd2b9 commit c91ea5e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 57 deletions.
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;


import org.neo4j.function.ThrowingFunction;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.TokenNameLookup; import org.neo4j.kernel.api.TokenNameLookup;
Expand Down Expand Up @@ -439,56 +440,9 @@ public Iterable<IndexEntryUpdate<LabelSchemaDescriptor>> convertToIndexUpdates(
*/ */
public void createIndexes( IndexRule... rules ) throws IOException public void createIndexes( IndexRule... rules ) throws IOException
{ {
indexMapRef.modify( indexMap -> IndexPopulationStarter populationStarter = new IndexPopulationStarter( rules );
{ indexMapRef.modify( populationStarter );
IndexPopulationJob populationJob = null; populationStarter.startPopulation();

for ( IndexRule rule : rules )
{
long ruleId = rule.getId();
IndexProxy index = indexMap.getIndexProxy( ruleId );
if ( index != null && state == State.NOT_STARTED )
{
// During recovery we might run into this scenario:
// - We're starting recovery on a database, where init() is called and all indexes that
// are found in the store, instantiated and put into the IndexMap. Among them is index X.
// - While we recover the database we bump into a transaction creating index Y, with the
// same IndexDescriptor, i.e. same label/property, as X. This is possible since this took
// place before the creation of X.
// - When Y is dropped in between this creation and the creation of X (it will have to be
// otherwise X wouldn't have had an opportunity to be created) the index is removed from
// the IndexMap, both by id AND descriptor.
//
// Because of the scenario above we need to put this created index into the IndexMap
// again, otherwise it will disappear from the IndexMap (at least for lookup by descriptor)
// and not be able to accept changes applied from recovery later on.
indexMap.putIndexProxy( ruleId, index );
continue;
}
final IndexDescriptor descriptor = rule.getIndexDescriptor();
SchemaIndexProvider.Descriptor providerDescriptor = rule.getProviderDescriptor();
boolean flipToTentative = rule.canSupportUniqueConstraint();
if ( state == State.RUNNING )
{
populationJob = populationJob == null ? newIndexPopulationJob() : populationJob;
index = indexProxyCreator.createPopulatingIndexProxy(
ruleId, descriptor, providerDescriptor, flipToTentative, monitor, populationJob );
index.start();
}
else
{
index = indexProxyCreator.createRecoveringIndexProxy( descriptor, providerDescriptor );
}

indexMap.putIndexProxy( rule.getId(), index );
}

if ( populationJob != null )
{
startIndexPopulation( populationJob );
}
return indexMap;
} );
} }


private void processUpdate( IndexUpdaterMap updaterMap, IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate ) private void processUpdate( IndexUpdaterMap updaterMap, IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate )
Expand Down Expand Up @@ -725,6 +679,70 @@ private void logIndexStateSummary( String method, Map<InternalIndexState,List<In
log.info( format( "IndexingService.%s: indexes not specifically mentioned above are %s", method, mostPopularState ) ); log.info( format( "IndexingService.%s: indexes not specifically mentioned above are %s", method, mostPopularState ) );
} }


private final class IndexPopulationStarter implements ThrowingFunction<IndexMap,IndexMap,IOException>
{
private final IndexRule[] rules;
private IndexPopulationJob populationJob;

IndexPopulationStarter( IndexRule[] rules )
{
this.rules = rules;
}

@Override
public IndexMap apply( IndexMap indexMap ) throws IOException
{
for ( IndexRule rule : rules )
{
long ruleId = rule.getId();
IndexProxy index = indexMap.getIndexProxy( ruleId );
if ( index != null && state == State.NOT_STARTED )
{
// During recovery we might run into this scenario:
// - We're starting recovery on a database, where init() is called and all indexes that
// are found in the store, instantiated and put into the IndexMap. Among them is index X.
// - While we recover the database we bump into a transaction creating index Y, with the
// same IndexDescriptor, i.e. same label/property, as X. This is possible since this took
// place before the creation of X.
// - When Y is dropped in between this creation and the creation of X (it will have to be
// otherwise X wouldn't have had an opportunity to be created) the index is removed from
// the IndexMap, both by id AND descriptor.
//
// Because of the scenario above we need to put this created index into the IndexMap
// again, otherwise it will disappear from the IndexMap (at least for lookup by descriptor)
// and not be able to accept changes applied from recovery later on.
indexMap.putIndexProxy( ruleId, index );
continue;
}
final IndexDescriptor descriptor = rule.getIndexDescriptor();
Descriptor providerDescriptor = rule.getProviderDescriptor();
boolean flipToTentative = rule.canSupportUniqueConstraint();
if ( state == State.RUNNING )
{
populationJob = populationJob == null ? newIndexPopulationJob() : populationJob;
index = indexProxyCreator.createPopulatingIndexProxy(
ruleId, descriptor, providerDescriptor, flipToTentative, monitor, populationJob );
index.start();
}
else
{
index = indexProxyCreator.createRecoveringIndexProxy( descriptor, providerDescriptor );
}

indexMap.putIndexProxy( rule.getId(), index );
}
return indexMap;
}

void startPopulation()
{
if ( populationJob != null )
{
startIndexPopulation( populationJob );
}
}
}

private final class IndexLogRecord private final class IndexLogRecord
{ {
private final long indexId; private final long indexId;
Expand Down
Expand Up @@ -87,9 +87,8 @@
import org.neo4j.test.rule.EmbeddedDatabaseRule; import org.neo4j.test.rule.EmbeddedDatabaseRule;
import org.neo4j.values.storable.Values; import org.neo4j.values.storable.Values;


import static org.junit.Assert.assertEquals;

import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;


//[NodePropertyUpdate[0, prop:0 add:Sweden, labelsBefore:[], labelsAfter:[0]]] //[NodePropertyUpdate[0, prop:0 add:Sweden, labelsBefore:[], labelsAfter:[0]]]
//[NodePropertyUpdate[1, prop:0 add:USA, labelsBefore:[], labelsAfter:[0]]] //[NodePropertyUpdate[1, prop:0 add:USA, labelsBefore:[], labelsAfter:[0]]]
Expand All @@ -110,12 +109,11 @@ public class MultiIndexPopulationConcurrentUpdatesIT
public EmbeddedDatabaseRule embeddedDatabase = new EmbeddedDatabaseRule(); public EmbeddedDatabaseRule embeddedDatabase = new EmbeddedDatabaseRule();


@Parameterized.Parameters( name = "{0}" ) @Parameterized.Parameters( name = "{0}" )
public static Collection<Object[]> parameters() public static Collection<SchemaIndexProvider.Descriptor> parameters()
{ {
return asList( return asList( LuceneSchemaIndexProviderFactory.PROVIDER_DESCRIPTOR,
new Object[]{LuceneSchemaIndexProviderFactory.PROVIDER_DESCRIPTOR}, NativeLuceneFusionSchemaIndexProviderFactory.DESCRIPTOR,
new Object[]{NativeLuceneFusionSchemaIndexProviderFactory.DESCRIPTOR}, InMemoryIndexProviderFactory.PROVIDER_DESCRIPTOR );
new Object[]{InMemoryIndexProviderFactory.PROVIDER_DESCRIPTOR} );
} }


@Parameterized.Parameter( 0 ) @Parameterized.Parameter( 0 )
Expand Down

0 comments on commit c91ea5e

Please sign in to comment.