From de7b0034c1847c72ca017f2aa91f07208b3c6430 Mon Sep 17 00:00:00 2001 From: Anton Persson Date: Wed, 4 Jul 2018 09:02:17 +0200 Subject: [PATCH] BatchInserterImpl always close populator on shutdown Previously if exceptions was thrown when rebuilding indexes, for example because of uniqueness constraint violation, then populator would not be closed and open index files would prevent page cache from shutting down. --- .../internal/BatchInserterImpl.java | 117 +++++++++++------- 1 file changed, 75 insertions(+), 42 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java index afabb92b7c058..37d2e5baf2cd0 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java @@ -45,6 +45,7 @@ import org.neo4j.graphdb.schema.ConstraintDefinition; import org.neo4j.graphdb.schema.IndexCreator; import org.neo4j.graphdb.schema.IndexDefinition; +import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.collection.Iterables; import org.neo4j.helpers.collection.IteratorWrapper; import org.neo4j.helpers.collection.Visitor; @@ -490,59 +491,80 @@ private void repopulateAllIndexes() throws IOException, IndexEntryConflictExcept final IndexRule[] rules = getIndexesNeedingPopulation(); final List populators = new ArrayList<>( rules.length ); - - final SchemaDescriptor[] descriptors = new LabelSchemaDescriptor[rules.length]; - - for ( int i = 0; i < rules.length; i++ ) + try { - IndexRule rule = rules[i]; - SchemaIndexDescriptor index = rule.getIndexDescriptor(); - descriptors[i] = index.schema(); - IndexPopulator populator = schemaIndexProviders.lookup( rule.getProviderDescriptor() ) - .getPopulator( rule.getId(), index, new IndexSamplingConfig( config ) ); - populator.create(); - populators.add( new IndexPopulatorWithSchema( populator, index ) ); - } + final SchemaDescriptor[] descriptors = new LabelSchemaDescriptor[rules.length]; - Visitor propertyUpdateVisitor = updates -> - { - // Do a lookup from which property has changed to a list of indexes worried about that property. - // We do not need to load additional properties as the NodeUpdates for a full node store scan already - // include all properties for the node. - for ( IndexEntryUpdate indexUpdate : updates.forIndexKeys( populators ) ) + for ( int i = 0; i < rules.length; i++ ) { - try - { - indexUpdate.indexKey().add( indexUpdate ); - } - catch ( IndexEntryConflictException conflict ) + IndexRule rule = rules[i]; + SchemaIndexDescriptor index = rule.getIndexDescriptor(); + descriptors[i] = index.schema(); + IndexPopulator populator = schemaIndexProviders.lookup( rule.getProviderDescriptor() ) + .getPopulator( rule.getId(), index, new IndexSamplingConfig( config ) ); + populator.create(); + populators.add( new IndexPopulatorWithSchema( populator, index ) ); + } + + Visitor propertyUpdateVisitor = updates -> + { + // Do a lookup from which property has changed to a list of indexes worried about that property. + // We do not need to load additional properties as the NodeUpdates for a full node store scan already + // include all properties for the node. + for ( IndexEntryUpdate indexUpdate : updates.forIndexKeys( populators ) ) { - throw conflict.notAllowed( indexUpdate.indexKey().index() ); + try + { + indexUpdate.indexKey().add( indexUpdate ); + } + catch ( IndexEntryConflictException conflict ) + { + throw conflict.notAllowed( indexUpdate.indexKey().index() ); + } } - } - return true; - }; + return true; + }; - List descriptorList = Arrays.asList( descriptors ); - int[] labelIds = descriptorList.stream() - .mapToInt( SchemaDescriptor::keyId ) - .toArray(); + List descriptorList = Arrays.asList( descriptors ); + int[] labelIds = descriptorList.stream() + .mapToInt( SchemaDescriptor::keyId ) + .toArray(); - int[] propertyKeyIds = descriptorList.stream() - .flatMapToInt( d -> Arrays.stream( d.getPropertyIds() ) ) - .toArray(); + int[] propertyKeyIds = descriptorList.stream() + .flatMapToInt( d -> Arrays.stream( d.getPropertyIds() ) ) + .toArray(); - try ( InitialNodeLabelCreationVisitor labelUpdateVisitor = new InitialNodeLabelCreationVisitor() ) - { - StoreScan storeScan = indexStoreView.visitNodes( labelIds, - propertyKeyId -> PrimitiveIntCollections.contains( propertyKeyIds, propertyKeyId ), - propertyUpdateVisitor, labelUpdateVisitor, true ); - storeScan.run(); + try ( InitialNodeLabelCreationVisitor labelUpdateVisitor = new InitialNodeLabelCreationVisitor() ) + { + StoreScan storeScan = indexStoreView.visitNodes( labelIds, + propertyKeyId -> PrimitiveIntCollections.contains( propertyKeyIds, propertyKeyId ), + propertyUpdateVisitor, labelUpdateVisitor, true ); + storeScan.run(); + for ( IndexPopulatorWithSchema populator : populators ) + { + populator.verifyDeferredConstraints( indexStoreView ); + populator.setPopulationCompletedSuccessfully( true ); + } + } + } + finally + { + Throwable throwable = null; for ( IndexPopulatorWithSchema populator : populators ) { - populator.verifyDeferredConstraints( indexStoreView ); - populator.close( true ); + try + { + populator.close(); + } + catch ( Throwable t ) + { + throwable = Exceptions.chain( throwable, t ); + } + if ( throwable != null ) + { + throw new RuntimeException( throwable ); + } } } } @@ -1308,6 +1330,7 @@ private static class IndexPopulatorWithSchema extends IndexPopulator.Adapter imp private final IndexPopulator populator; private final SchemaIndexDescriptor index; private Collection> batchedUpdates = new ArrayList<>( batchSize ); + private boolean populationCompletedSuccessfully; IndexPopulatorWithSchema( IndexPopulator populator, SchemaIndexDescriptor index ) { @@ -1344,6 +1367,16 @@ public void verifyDeferredConstraints( PropertyAccessor propertyAccessor ) populator.verifyDeferredConstraints( propertyAccessor ); } + void setPopulationCompletedSuccessfully( boolean value ) + { + this.populationCompletedSuccessfully = value; + } + + void close() throws IOException + { + close( populationCompletedSuccessfully ); + } + @Override public void close( boolean populationCompletedSuccessfully ) throws IOException {