Skip to content

Commit

Permalink
BatchInserterImpl always close populator on shutdown
Browse files Browse the repository at this point in the history
Previously if exceptions was thrown when rebuilding indexes, for example
because of uniqueness constraint violation, then populator would not be
closed and open index files would prevent page cache from shutting down.
  • Loading branch information
burqen committed Jul 11, 2018
1 parent 530a018 commit de7b003
Showing 1 changed file with 75 additions and 42 deletions.
Expand Up @@ -45,6 +45,7 @@
import org.neo4j.graphdb.schema.ConstraintDefinition;
import org.neo4j.graphdb.schema.IndexCreator;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.IteratorWrapper;
import org.neo4j.helpers.collection.Visitor;
Expand Down Expand Up @@ -490,59 +491,80 @@ private void repopulateAllIndexes() throws IOException, IndexEntryConflictExcept

final IndexRule[] rules = getIndexesNeedingPopulation();
final List<IndexPopulatorWithSchema> populators = new ArrayList<>( rules.length );

final SchemaDescriptor[] descriptors = new LabelSchemaDescriptor[rules.length];

for ( int i = 0; i < rules.length; i++ )
try
{
IndexRule rule = rules[i];
SchemaIndexDescriptor index = rule.getIndexDescriptor();
descriptors[i] = index.schema();
IndexPopulator populator = schemaIndexProviders.lookup( rule.getProviderDescriptor() )
.getPopulator( rule.getId(), index, new IndexSamplingConfig( config ) );
populator.create();
populators.add( new IndexPopulatorWithSchema( populator, index ) );
}
final SchemaDescriptor[] descriptors = new LabelSchemaDescriptor[rules.length];

Visitor<NodeUpdates, IOException> propertyUpdateVisitor = updates ->
{
// Do a lookup from which property has changed to a list of indexes worried about that property.
// We do not need to load additional properties as the NodeUpdates for a full node store scan already
// include all properties for the node.
for ( IndexEntryUpdate<IndexPopulatorWithSchema> indexUpdate : updates.forIndexKeys( populators ) )
for ( int i = 0; i < rules.length; i++ )
{
try
{
indexUpdate.indexKey().add( indexUpdate );
}
catch ( IndexEntryConflictException conflict )
IndexRule rule = rules[i];
SchemaIndexDescriptor index = rule.getIndexDescriptor();
descriptors[i] = index.schema();
IndexPopulator populator = schemaIndexProviders.lookup( rule.getProviderDescriptor() )
.getPopulator( rule.getId(), index, new IndexSamplingConfig( config ) );
populator.create();
populators.add( new IndexPopulatorWithSchema( populator, index ) );
}

Visitor<NodeUpdates,IOException> propertyUpdateVisitor = updates ->
{
// Do a lookup from which property has changed to a list of indexes worried about that property.
// We do not need to load additional properties as the NodeUpdates for a full node store scan already
// include all properties for the node.
for ( IndexEntryUpdate<IndexPopulatorWithSchema> indexUpdate : updates.forIndexKeys( populators ) )
{
throw conflict.notAllowed( indexUpdate.indexKey().index() );
try
{
indexUpdate.indexKey().add( indexUpdate );
}
catch ( IndexEntryConflictException conflict )
{
throw conflict.notAllowed( indexUpdate.indexKey().index() );
}
}
}
return true;
};
return true;
};

List<SchemaDescriptor> descriptorList = Arrays.asList( descriptors );
int[] labelIds = descriptorList.stream()
.mapToInt( SchemaDescriptor::keyId )
.toArray();
List<SchemaDescriptor> descriptorList = Arrays.asList( descriptors );
int[] labelIds = descriptorList.stream()
.mapToInt( SchemaDescriptor::keyId )
.toArray();

int[] propertyKeyIds = descriptorList.stream()
.flatMapToInt( d -> Arrays.stream( d.getPropertyIds() ) )
.toArray();
int[] propertyKeyIds = descriptorList.stream()
.flatMapToInt( d -> Arrays.stream( d.getPropertyIds() ) )
.toArray();

try ( InitialNodeLabelCreationVisitor labelUpdateVisitor = new InitialNodeLabelCreationVisitor() )
{
StoreScan<IOException> storeScan = indexStoreView.visitNodes( labelIds,
propertyKeyId -> PrimitiveIntCollections.contains( propertyKeyIds, propertyKeyId ),
propertyUpdateVisitor, labelUpdateVisitor, true );
storeScan.run();
try ( InitialNodeLabelCreationVisitor labelUpdateVisitor = new InitialNodeLabelCreationVisitor() )
{
StoreScan<IOException> storeScan = indexStoreView.visitNodes( labelIds,
propertyKeyId -> PrimitiveIntCollections.contains( propertyKeyIds, propertyKeyId ),
propertyUpdateVisitor, labelUpdateVisitor, true );
storeScan.run();

for ( IndexPopulatorWithSchema populator : populators )
{
populator.verifyDeferredConstraints( indexStoreView );
populator.setPopulationCompletedSuccessfully( true );
}
}
}
finally
{
Throwable throwable = null;
for ( IndexPopulatorWithSchema populator : populators )
{
populator.verifyDeferredConstraints( indexStoreView );
populator.close( true );
try
{
populator.close();
}
catch ( Throwable t )
{
throwable = Exceptions.chain( throwable, t );
}
if ( throwable != null )
{
throw new RuntimeException( throwable );
}
}
}
}
Expand Down Expand Up @@ -1308,6 +1330,7 @@ private static class IndexPopulatorWithSchema extends IndexPopulator.Adapter imp
private final IndexPopulator populator;
private final SchemaIndexDescriptor index;
private Collection<IndexEntryUpdate<?>> batchedUpdates = new ArrayList<>( batchSize );
private boolean populationCompletedSuccessfully;

IndexPopulatorWithSchema( IndexPopulator populator, SchemaIndexDescriptor index )
{
Expand Down Expand Up @@ -1344,6 +1367,16 @@ public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
populator.verifyDeferredConstraints( propertyAccessor );
}

void setPopulationCompletedSuccessfully( boolean value )
{
this.populationCompletedSuccessfully = value;
}

void close() throws IOException
{
close( populationCompletedSuccessfully );
}

@Override
public void close( boolean populationCompletedSuccessfully ) throws IOException
{
Expand Down

0 comments on commit de7b003

Please sign in to comment.