diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java index b995544e79f5b..50e59414bd561 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java @@ -183,6 +183,9 @@ RelationshipIterator nodeGetRelationships( long nodeId, long nodeGetFromUniqueIndexSeek( IndexDescriptor index, Object value ) throws IndexNotFoundKernelException, IndexBrokenKernelException; + int nodesCountIndexed( IndexDescriptor index, long nodeId, Object value ) + throws IndexNotFoundKernelException, IndexBrokenKernelException; + boolean nodeExists( long nodeId ); boolean relationshipExists( long relId ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java index 392db93caa297..bc76b4edacdc9 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java @@ -339,6 +339,13 @@ public long nodeGetFromUniqueIndexSeek( return nodeId; } + @Override + public int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value ) + throws IndexNotFoundKernelException, IndexBrokenKernelException + { + return entityReadOperations.nodesCountIndexed( statement, index, nodeId, value ); + } + @Override public boolean graphHasProperty( KernelStatement state, int propertyKeyId ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java index cbcfbd97c4c15..85ffbfa1dc93b 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java @@ -213,6 +213,14 @@ public long nodeGetFromUniqueIndexSeek( KernelStatement state, IndexDescriptor i return entityReadDelegate.nodeGetFromUniqueIndexSeek( state, index, value ); } + @Override + public int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value ) + throws IndexNotFoundKernelException, IndexBrokenKernelException + { + guard.check(); + return entityReadDelegate.nodesCountIndexed( statement, index, nodeId, value ); + } + @Override public boolean graphHasProperty( KernelStatement state, int propertyKeyId ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java index 51cb024e0f48a..79860c9a3be76 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java @@ -576,6 +576,14 @@ public Cursor nodeCursorGetFromUniqueIndexSeek( IndexDescriptor index, return dataRead().nodeCursorGetFromUniqueIndexSeek( statement, index, value ); } + @Override + public int nodesCountIndexed( IndexDescriptor index, long nodeId, Object value ) + throws IndexNotFoundKernelException, IndexBrokenKernelException + { + statement.assertOpen(); + return dataRead().nodesCountIndexed( statement, index, nodeId, value ); + } + // // diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java index 7de8d8e6c0bec..085eed2622937 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java @@ -830,6 +830,14 @@ public PrimitiveLongIterator nodesGetFromIndexScan( KernelStatement state, Index return filterIndexStateChangesForScanOrSeek( state, index, null, committed ); } + @Override + public int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value ) + throws IndexNotFoundKernelException, IndexBrokenKernelException + { + IndexReader reader = statement.getStoreStatement().getIndexReader( index ); + return reader.countIndexedNodes( nodeId, value ); + } + private PrimitiveLongIterator filterExactIndexMatches( final KernelStatement state, IndexDescriptor index, Object value, PrimitiveLongIterator committed ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexProxyCreator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexProxyCreator.java index 19963c3d48162..12197479ae0a1 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexProxyCreator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexProxyCreator.java @@ -97,7 +97,7 @@ public IndexProxy createPopulatingIndexProxy( final long ruleId, monitor.populationCompleteOn( descriptor ); OnlineIndexProxy onlineProxy = new OnlineIndexProxy( descriptor, config, onlineAccessorFromProvider( providerDescriptor, ruleId, - config, samplingConfig ), storeView, providerDescriptor + config, samplingConfig ), storeView, providerDescriptor, true ); if ( constraint ) { @@ -135,7 +135,7 @@ public IndexProxy createOnlineIndexProxy( long ruleId, IndexAccessor onlineAccessor = onlineAccessorFromProvider( providerDescriptor, ruleId, config, samplingConfig ); IndexProxy proxy; - proxy = new OnlineIndexProxy( descriptor, config, onlineAccessor, storeView, providerDescriptor ); + proxy = new OnlineIndexProxy( descriptor, config, onlineAccessor, storeView, providerDescriptor, false ); proxy = new ContractCheckingIndexProxy( proxy, true ); return proxy; } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxy.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxy.java index db5fd012f4a79..3431c5c0ac454 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxy.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxy.java @@ -46,14 +46,42 @@ public class OnlineIndexProxy implements IndexProxy private final IndexConfiguration configuration; private final IndexCountsRemover indexCountsRemover; + // About this flag: there are two online "modes", you might say... + // - One is the pure starting of an already online index which was cleanly shut down and all that. + // This scenario is simple and doesn't need this idempotency mode. + // - The other is the creation or starting from an uncompleted population, where there will be a point + // in the future where this index will flip from a populating index proxy to an online index proxy. + // This is the problematic part. You see... we have been accidentally relying on the short-lived node + // entity locks for this to work. The scenario where they have saved indexes from getting duplicate + // nodes in them (one from populator and the other from a "normal" update is where a populator is nearing + // its completion and wants to flip. Another thread is in the middle of applying a transaction which + // in the end will feed an update to this index. Index updates are applied after store updates, so + // the populator may see the created node and add it, index flips and then the updates comes in to the normal + // online index and gets added again. The read lock here will have the populator wait for the transaction + // to fully apply, e.g. also wait for the index update to reach the population job before adding that node + // and flipping (the update mechanism in a populator is idempotent). + // This strategy has changed slightly in 3.0 where transactions can be applied in whole batches + // and index updates for the whole batch will be applied in the end. This is fine for everything except + // the above scenario because the short-lived entity locks are per transaction, not per batch, and must + // be so to not interfere with transactions creating constraints inside this batch. We do need to apply + // index updates in batches because nowadays slave update pulling and application isn't special in any + // way, it's simply applying transactions in batches and this needs to be very fast to not have instances + // fall behind in a cluster. + // So the sum of this is that during the session (until the next restart of the db) an index gets created + // it will be in this forced idempotency mode where it applies additions idempotently, which may be + // slightly more costly, but shouldn't make that big of a difference hopefully. + private final boolean forcedIdempotentMode; + public OnlineIndexProxy( IndexDescriptor descriptor, IndexConfiguration configuration, IndexAccessor accessor, - IndexStoreView storeView, SchemaIndexProvider.Descriptor providerDescriptor ) + IndexStoreView storeView, SchemaIndexProvider.Descriptor providerDescriptor, + boolean forcedIdempotentMode ) { this.descriptor = descriptor; this.storeView = storeView; this.providerDescriptor = providerDescriptor; this.accessor = accessor; this.configuration = configuration; + this.forcedIdempotentMode = forcedIdempotentMode; this.indexCountsRemover = new IndexCountsRemover( storeView, descriptor ); } @@ -65,7 +93,7 @@ public void start() @Override public IndexUpdater newUpdater( final IndexUpdateMode mode ) { - return updateCountingUpdater( accessor.newUpdater( mode ) ); + return updateCountingUpdater( accessor.newUpdater( forcedIdempotentMode ? IndexUpdateMode.RECOVERY : mode ) ); } private IndexUpdater updateCountingUpdater( final IndexUpdater indexUpdater ) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java index 062ef342a0b3b..496a342471d38 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java @@ -100,6 +100,9 @@ PrimitiveLongIterator nodesGetFromIndexScan( KernelStatement state, IndexDescrip long nodeGetFromUniqueIndexSeek( KernelStatement state, IndexDescriptor index, Object value ) throws IndexNotFoundKernelException, IndexBrokenKernelException; + int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value ) + throws IndexNotFoundKernelException, IndexBrokenKernelException; + boolean graphHasProperty( KernelStatement state, int propertyKeyId ); Object graphGetProperty( KernelStatement state, int propertyKeyId ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java index 72cd318147a98..f06d48075a463 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexingServiceTest.java @@ -179,7 +179,7 @@ public void shouldBringIndexOnlineAndFlipOverToIndexAccessor() throws Exception InOrder order = inOrder( populator, accessor, updater); order.verify( populator ).create(); order.verify( populator ).close( true ); - order.verify( accessor ).newUpdater( IndexUpdateMode.ONLINE ); + order.verify( accessor ).newUpdater( IndexUpdateMode.RECOVERY ); order.verify( updater ).process( add( 10, "foo" ) ); order.verify( updater ).close(); } @@ -588,11 +588,11 @@ public void closingOfValidatedUpdatesShouldCloseUpdaters() throws Exception IndexAccessor accessor1 = mock( IndexAccessor.class ); IndexUpdater updater1 = mock( IndexUpdater.class ); - when( accessor1.newUpdater( IndexUpdateMode.ONLINE ) ).thenReturn( updater1 ); + when( accessor1.newUpdater( any( IndexUpdateMode.class ) ) ).thenReturn( updater1 ); IndexAccessor accessor2 = mock( IndexAccessor.class ); IndexUpdater updater2 = mock( IndexUpdater.class ); - when( accessor2.newUpdater( IndexUpdateMode.ONLINE ) ).thenReturn( updater2 ); + when( accessor2.newUpdater( any( IndexUpdateMode.class ) ) ).thenReturn( updater2 ); when( indexProvider.getOnlineAccessor( eq( 1L ), any( IndexConfiguration.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor1 ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxyTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxyTest.java index 58828e384030d..19640959ddb92 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxyTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxyTest.java @@ -44,7 +44,8 @@ public class OnlineIndexProxyTest public void shouldRemoveIndexCountsWhenTheIndexItselfIsDropped() throws IOException { // given - OnlineIndexProxy index = new OnlineIndexProxy( descriptor, config, accessor, storeView, providerDescriptor ); + OnlineIndexProxy index = new OnlineIndexProxy( descriptor, config, accessor, + storeView, providerDescriptor, false ); // when index.drop(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/HashBasedIndex.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/HashBasedIndex.java index 03a60e98c80fa..bb7f71677c78c 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/HashBasedIndex.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/HashBasedIndex.java @@ -240,7 +240,7 @@ InMemoryIndexImplementation snapshot() } @Override - public int countIndexedNodes( long nodeId, Object propertyValue ) + protected int doCountIndexedNodes( long nodeId, Object propertyValue ) { Set candidates = data().get( propertyValue ); return candidates != null && candidates.contains( nodeId ) ? 1 : 0; diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/InMemoryIndexImplementation.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/InMemoryIndexImplementation.java index ad545850e882d..afaa849d3a08d 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/InMemoryIndexImplementation.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/inmemory/InMemoryIndexImplementation.java @@ -48,6 +48,14 @@ final void remove( long nodeId, Object propertyValue ) doRemove( encode( propertyValue ), nodeId ); } + @Override + public final int countIndexedNodes( long nodeId, Object propertyValue ) + { + return doCountIndexedNodes( nodeId, encode( propertyValue ) ); + } + + protected abstract int doCountIndexedNodes( long nodeId, Object encode ); + abstract PrimitiveLongIterator doIndexSeek( Object propertyValue ); abstract boolean doAdd( Object propertyValue, long nodeId, boolean applyIdempotently ); diff --git a/community/neo4j/src/test/java/schema/IndexPopulationFlipRaceIT.java b/community/neo4j/src/test/java/schema/IndexPopulationFlipRaceIT.java new file mode 100644 index 0000000000000..833c833bf2ea9 --- /dev/null +++ b/community/neo4j/src/test/java/schema/IndexPopulationFlipRaceIT.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package schema; + +import org.junit.Rule; +import org.junit.Test; + +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Transaction; +import org.neo4j.helpers.collection.Pair; +import org.neo4j.kernel.api.KernelAPI; +import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.api.Statement; +import org.neo4j.kernel.api.index.IndexDescriptor; +import org.neo4j.test.DatabaseRule; +import org.neo4j.test.EmbeddedDatabaseRule; +import org.neo4j.test.RandomRule; + +import static org.junit.Assert.assertEquals; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import static org.neo4j.graphdb.Label.label; + +public class IndexPopulationFlipRaceIT +{ + private static final int NODES_PER_INDEX = 10; + + @Rule + public final DatabaseRule db = new EmbeddedDatabaseRule(); + @Rule + public final RandomRule random = new RandomRule(); + + @Test + public void shouldAtomicallyFlipMultipleIndexes() throws Exception + { + // A couple of times since this is probabilistic, but also because there seems to be a difference + // in timings between the first time and all others... which is perhaps super obvious to some, but not to me. + for ( int i = 0; i < 10; i++ ) + { + // GIVEN + createIndexesButDontWaitForThemToFullyPopulate( i ); + + // WHEN + Pair data = createDataThatGoesIntoToThoseIndexes( i ); + + // THEN + awaitIndexes(); + verifyThatThereAreExactlyOneIndexEntryPerNodeInTheIndexes( i, data ); + } + } + + private void awaitIndexes() + { + try ( Transaction tx = db.beginTx() ) + { + db.schema().awaitIndexesOnline( 10, SECONDS ); + tx.success(); + } + } + + private void createIndexesButDontWaitForThemToFullyPopulate( int i ) + { + try ( Transaction tx = db.beginTx() ) + { + db.schema().indexFor( labelA( i ) ).on( keyA( i ) ).create(); + + if ( random.nextBoolean() ) + { + db.schema().indexFor( labelB( i ) ).on( keyB( i ) ).create(); + } + else + { + db.schema().constraintFor( labelB( i ) ).assertPropertyIsUnique( keyB( i ) ).create(); + } + tx.success(); + } + } + + private String keyB( int i ) + { + return "key_b" + i; + } + + private Label labelB( int i ) + { + return label( "Label_b" + i ); + } + + private String keyA( int i ) + { + return "key_a" + i; + } + + private Label labelA( int i ) + { + return label( "Label_a" + i ); + } + + private Pair createDataThatGoesIntoToThoseIndexes( int i ) + { + long[] dataA = new long[NODES_PER_INDEX]; + long[] dataB = new long[NODES_PER_INDEX]; + for ( int t = 0; t < NODES_PER_INDEX; t++ ) + { + try ( Transaction tx = db.beginTx() ) + { + Node nodeA = db.createNode( labelA( i ) ); + nodeA.setProperty( keyA( i ), dataA[t] = nodeA.getId() ); + Node nodeB = db.createNode( labelB( i ) ); + nodeB.setProperty( keyB( i ), dataB[t] = nodeB.getId() ); + tx.success(); + } + } + return Pair.of( dataA, dataB ); + } + + private void verifyThatThereAreExactlyOneIndexEntryPerNodeInTheIndexes( int i, Pair data ) + throws Exception + { + try ( KernelTransaction tx = db.getDependencyResolver().resolveDependency( KernelAPI.class ).newTransaction(); + Statement statement = tx.acquireStatement() ) + { + int labelAId = statement.readOperations().labelGetForName( labelA( i ).name() ); + int keyAId = statement.readOperations().propertyKeyGetForName( keyA( i ) ); + int labelBId = statement.readOperations().labelGetForName( labelB( i ).name() ); + int keyBId = statement.readOperations().propertyKeyGetForName( keyB( i ) ); + + for ( int j = 0; j < NODES_PER_INDEX; j++ ) + { + long nodeAId = data.first()[j]; + assertEquals( 1, statement.readOperations().nodesCountIndexed( + new IndexDescriptor( labelAId, keyAId ), nodeAId, nodeAId ) ); + long nodeBId = data.other()[j]; + assertEquals( 1, statement.readOperations().nodesCountIndexed( + new IndexDescriptor( labelBId, keyBId ), nodeBId, nodeBId ) ); + } + } + } +}