Fixes a race issue with index population and concurrent updates
After an index has been fully populated it will now flip into an OnlineIndexProxy
which has a forced idempotency mode instead of the normal mode. The next restart
and clean start of such an online index will use the normal mode. More details as to
why this is needed can be found in OnlineIndexProxy, copied into this commit message:

There are two online "modes", you might say...
- One is the pure starting of an already online index which was cleanly shut down and all that.
  This scenario is simple and doesn't need this idempotency mode.
- The other is the creation or starting from an uncompleted population, where there will be a point
  in the future where this index will flip from a populating index proxy to an online index proxy.
  This is the problematic part. You see... we have been accidentally relying on the short-lived node
  entity locks for this to work. The scenario where they have saved indexes from getting duplicate
  nodes in them (one from the populator and the other from a "normal" update) is where a populator is nearing
  its completion and wants to flip. Another thread is in the middle of applying a transaction which
  in the end will feed an update to this index. Index updates are applied after store updates, so
  the populator may see the created node and add it, the index flips, and then the update comes in to the normal
  online index and the node gets added again. The read lock here will have the populator wait for the transaction
  to fully apply, i.e. also wait for the index update to reach the population job, before adding that node
  and flipping (the update mechanism in a populator is idempotent).
    This strategy has changed slightly in 3.0 where transactions can be applied in whole batches
  and index updates for the whole batch will be applied in the end. This is fine for everything except
  the above scenario because the short-lived entity locks are per transaction, not per batch, and must
  be so to not interfere with transactions creating constraints inside this batch. We do need to apply
  index updates in batches because nowadays slave update pulling and application isn't special in any
  way, it's simply applying transactions in batches and this needs to be very fast to not have instances
  fall behind in a cluster.
    So the sum of this is that during the session (until the next restart of the db) in which an index gets
  created, it will be in this forced idempotency mode where it applies additions idempotently, which may be
  slightly more costly, but hopefully shouldn't make that big of a difference.
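
To make "applies additions idempotently" concrete, here is a minimal, self-contained sketch (toy code, not the Neo4j implementation, which is far more involved) of the difference between a plain add and an idempotent add. With idempotent semantics, the duplicate add that the race above can produce (once from the populator, once more from the freshly flipped online index) collapses into a single entry:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory "index" showing why forcing idempotent additions neutralises the duplicate add.
public class IdempotentAddSketch
{
    // property value -> node ids; a List allows duplicates, mimicking a plain add
    private final Map<Object,List<Long>> plain = new HashMap<>();
    // property value -> node ids; a Set gives add-if-absent semantics, i.e. an idempotent add
    private final Map<Object,Set<Long>> idempotent = new HashMap<>();

    void plainAdd( Object value, long nodeId )
    {
        plain.computeIfAbsent( value, v -> new ArrayList<>() ).add( nodeId );
    }

    void idempotentAdd( Object value, long nodeId )
    {
        idempotent.computeIfAbsent( value, v -> new LinkedHashSet<>() ).add( nodeId );
    }

    public static void main( String[] args )
    {
        IdempotentAddSketch index = new IdempotentAddSketch();
        // The populator sees the freshly created node and adds it...
        index.plainAdd( "foo", 10L );
        index.idempotentAdd( "foo", 10L );
        // ...the index flips to online, and the in-flight batch delivers the same update again.
        index.plainAdd( "foo", 10L );
        index.idempotentAdd( "foo", 10L );

        System.out.println( index.plain.get( "foo" ).size() );      // 2 -> duplicate index entry
        System.out.println( index.idempotent.get( "foo" ).size() ); // 1 -> duplicate is harmless
    }
}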
tinwelint committed Jan 21, 2016
1 parent 368fb39 commit a1a1950
Showing 13 changed files with 241 additions and 9 deletions.
@@ -183,6 +183,9 @@ RelationshipIterator nodeGetRelationships( long nodeId,
long nodeGetFromUniqueIndexSeek( IndexDescriptor index, Object value ) throws IndexNotFoundKernelException,
IndexBrokenKernelException;

int nodesCountIndexed( IndexDescriptor index, long nodeId, Object value )
throws IndexNotFoundKernelException, IndexBrokenKernelException;

boolean nodeExists( long nodeId );

boolean relationshipExists( long relId );
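The hunk above adds nodesCountIndexed to the kernel read operations. The new IndexPopulationFlipRaceIT test at the bottom of this commit uses it to assert that each node ended up in its index exactly once; the snippet below is condensed from that test. It assumes db and nodeId are in scope and imports are in place, the label/property names are placeholders following the test's naming scheme, and the property value is the node id itself, which is why nodeId is passed as both the node and the value:

try ( KernelTransaction tx = db.getDependencyResolver().resolveDependency( KernelAPI.class ).newTransaction();
      Statement statement = tx.acquireStatement() )
{
    int labelId = statement.readOperations().labelGetForName( "Label_a0" );
    int keyId = statement.readOperations().propertyKeyGetForName( "key_a0" );

    // Exactly one index entry for this (node, value) pair, i.e. the populator and the
    // online updater did not both add the node.
    assertEquals( 1, statement.readOperations().nodesCountIndexed(
            new IndexDescriptor( labelId, keyId ), nodeId, nodeId ) );
}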
@@ -339,6 +339,13 @@ public long nodeGetFromUniqueIndexSeek(
return nodeId;
}

@Override
public int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value )
throws IndexNotFoundKernelException, IndexBrokenKernelException
{
return entityReadOperations.nodesCountIndexed( statement, index, nodeId, value );
}

@Override
public boolean graphHasProperty( KernelStatement state, int propertyKeyId )
{
@@ -213,6 +213,14 @@ public long nodeGetFromUniqueIndexSeek( KernelStatement state, IndexDescriptor i
return entityReadDelegate.nodeGetFromUniqueIndexSeek( state, index, value );
}

@Override
public int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value )
throws IndexNotFoundKernelException, IndexBrokenKernelException
{
guard.check();
return entityReadDelegate.nodesCountIndexed( statement, index, nodeId, value );
}

@Override
public boolean graphHasProperty( KernelStatement state, int propertyKeyId )
{
@@ -576,6 +576,14 @@ public Cursor<NodeItem> nodeCursorGetFromUniqueIndexSeek( IndexDescriptor index,
return dataRead().nodeCursorGetFromUniqueIndexSeek( statement, index, value );
}

@Override
public int nodesCountIndexed( IndexDescriptor index, long nodeId, Object value )
throws IndexNotFoundKernelException, IndexBrokenKernelException
{
statement.assertOpen();
return dataRead().nodesCountIndexed( statement, index, nodeId, value );
}

// </DataReadCursors>

// <SchemaRead>
@@ -830,6 +830,14 @@ public PrimitiveLongIterator nodesGetFromIndexScan( KernelStatement state, Index
return filterIndexStateChangesForScanOrSeek( state, index, null, committed );
}

@Override
public int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value )
throws IndexNotFoundKernelException, IndexBrokenKernelException
{
IndexReader reader = statement.getStoreStatement().getIndexReader( index );
return reader.countIndexedNodes( nodeId, value );
}

private PrimitiveLongIterator filterExactIndexMatches( final KernelStatement state, IndexDescriptor index,
Object value, PrimitiveLongIterator committed )
{
@@ -97,7 +97,7 @@ public IndexProxy createPopulatingIndexProxy( final long ruleId,
monitor.populationCompleteOn( descriptor );
OnlineIndexProxy onlineProxy = new OnlineIndexProxy(
descriptor, config, onlineAccessorFromProvider( providerDescriptor, ruleId,
- config, samplingConfig ), storeView, providerDescriptor
+ config, samplingConfig ), storeView, providerDescriptor, true
);
if ( constraint )
{
@@ -135,7 +135,7 @@ public IndexProxy createOnlineIndexProxy( long ruleId,
IndexAccessor onlineAccessor =
onlineAccessorFromProvider( providerDescriptor, ruleId, config, samplingConfig );
IndexProxy proxy;
- proxy = new OnlineIndexProxy( descriptor, config, onlineAccessor, storeView, providerDescriptor );
+ proxy = new OnlineIndexProxy( descriptor, config, onlineAccessor, storeView, providerDescriptor, false );
proxy = new ContractCheckingIndexProxy( proxy, true );
return proxy;
}
@@ -46,14 +46,42 @@ public class OnlineIndexProxy implements IndexProxy
private final IndexConfiguration configuration;
private final IndexCountsRemover indexCountsRemover;

// About this flag: there are two online "modes", you might say...
// - One is the pure starting of an already online index which was cleanly shut down and all that.
// This scenario is simple and doesn't need this idempotency mode.
// - The other is the creation or starting from an uncompleted population, where there will be a point
// in the future where this index will flip from a populating index proxy to an online index proxy.
// This is the problematic part. You see... we have been accidentally relying on the short-lived node
// entity locks for this to work. The scenario where they have saved indexes from getting duplicate
// nodes in them (one from the populator and the other from a "normal" update) is where a populator is nearing
// its completion and wants to flip. Another thread is in the middle of applying a transaction which
// in the end will feed an update to this index. Index updates are applied after store updates, so
// the populator may see the created node and add it, the index flips, and then the update comes in to the normal
// online index and the node gets added again. The read lock here will have the populator wait for the transaction
// to fully apply, i.e. also wait for the index update to reach the population job, before adding that node
// and flipping (the update mechanism in a populator is idempotent).
// This strategy has changed slightly in 3.0 where transactions can be applied in whole batches
// and index updates for the whole batch will be applied in the end. This is fine for everything except
// the above scenario because the short-lived entity locks are per transaction, not per batch, and must
// be so to not interfere with transactions creating constraints inside this batch. We do need to apply
// index updates in batches because nowadays slave update pulling and application isn't special in any
// way, it's simply applying transactions in batches and this needs to be very fast to not have instances
// fall behind in a cluster.
// So the sum of this is that during the session (until the next restart of the db) in which an index gets
// created, it will be in this forced idempotency mode where it applies additions idempotently, which may be
// slightly more costly, but hopefully shouldn't make that big of a difference.
private final boolean forcedIdempotentMode;

public OnlineIndexProxy( IndexDescriptor descriptor, IndexConfiguration configuration, IndexAccessor accessor,
- IndexStoreView storeView, SchemaIndexProvider.Descriptor providerDescriptor )
+ IndexStoreView storeView, SchemaIndexProvider.Descriptor providerDescriptor,
+ boolean forcedIdempotentMode )
{
this.descriptor = descriptor;
this.storeView = storeView;
this.providerDescriptor = providerDescriptor;
this.accessor = accessor;
this.configuration = configuration;
this.forcedIdempotentMode = forcedIdempotentMode;
this.indexCountsRemover = new IndexCountsRemover( storeView, descriptor );
}

@@ -65,7 +93,7 @@ public void start()
@Override
public IndexUpdater newUpdater( final IndexUpdateMode mode )
{
- return updateCountingUpdater( accessor.newUpdater( mode ) );
+ return updateCountingUpdater( accessor.newUpdater( forcedIdempotentMode ? IndexUpdateMode.RECOVERY : mode ) );
}

private IndexUpdater updateCountingUpdater( final IndexUpdater indexUpdater )
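The behavioural core of the change is the one-liner in newUpdater() above: while forcedIdempotentMode is set, whatever update mode is requested gets downgraded to IndexUpdateMode.RECOVERY, which (judging by the test changes in this commit and the applyIdempotently flag in the in-memory index further down) already means applying updates idempotently, so no new mode had to be introduced. A stripped-down sketch of just that decision, with toy stand-ins rather than the Neo4j classes:

// Toy illustration of the mode selection added to OnlineIndexProxy.newUpdater().
enum IndexUpdateMode { ONLINE, RECOVERY }

class ModeSelectionSketch
{
    static IndexUpdateMode effectiveMode( boolean forcedIdempotentMode, IndexUpdateMode requested )
    {
        // Until the next restart, an index created in this session forces the idempotent path.
        return forcedIdempotentMode ? IndexUpdateMode.RECOVERY : requested;
    }

    public static void main( String[] args )
    {
        // Index that just flipped from population in this session: always idempotent.
        System.out.println( effectiveMode( true, IndexUpdateMode.ONLINE ) );  // RECOVERY
        // Index started cleanly from an already-online state: requested mode is kept.
        System.out.println( effectiveMode( false, IndexUpdateMode.ONLINE ) ); // ONLINE
    }
}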
@@ -100,6 +100,9 @@ PrimitiveLongIterator nodesGetFromIndexScan( KernelStatement state, IndexDescrip
long nodeGetFromUniqueIndexSeek( KernelStatement state, IndexDescriptor index, Object value )
throws IndexNotFoundKernelException, IndexBrokenKernelException;

int nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long nodeId, Object value )
throws IndexNotFoundKernelException, IndexBrokenKernelException;

boolean graphHasProperty( KernelStatement state, int propertyKeyId );

Object graphGetProperty( KernelStatement state, int propertyKeyId );
@@ -179,7 +179,7 @@ public void shouldBringIndexOnlineAndFlipOverToIndexAccessor() throws Exception
InOrder order = inOrder( populator, accessor, updater);
order.verify( populator ).create();
order.verify( populator ).close( true );
- order.verify( accessor ).newUpdater( IndexUpdateMode.ONLINE );
+ order.verify( accessor ).newUpdater( IndexUpdateMode.RECOVERY );
order.verify( updater ).process( add( 10, "foo" ) );
order.verify( updater ).close();
}
@@ -588,11 +588,11 @@ public void closingOfValidatedUpdatesShouldCloseUpdaters() throws Exception

IndexAccessor accessor1 = mock( IndexAccessor.class );
IndexUpdater updater1 = mock( IndexUpdater.class );
- when( accessor1.newUpdater( IndexUpdateMode.ONLINE ) ).thenReturn( updater1 );
+ when( accessor1.newUpdater( any( IndexUpdateMode.class ) ) ).thenReturn( updater1 );

IndexAccessor accessor2 = mock( IndexAccessor.class );
IndexUpdater updater2 = mock( IndexUpdater.class );
- when( accessor2.newUpdater( IndexUpdateMode.ONLINE ) ).thenReturn( updater2 );
+ when( accessor2.newUpdater( any( IndexUpdateMode.class ) ) ).thenReturn( updater2 );

when( indexProvider.getOnlineAccessor( eq( 1L ), any( IndexConfiguration.class ),
any( IndexSamplingConfig.class ) ) ).thenReturn( accessor1 );
@@ -44,7 +44,8 @@ public class OnlineIndexProxyTest
public void shouldRemoveIndexCountsWhenTheIndexItselfIsDropped() throws IOException
{
// given
- OnlineIndexProxy index = new OnlineIndexProxy( descriptor, config, accessor, storeView, providerDescriptor );
+ OnlineIndexProxy index = new OnlineIndexProxy( descriptor, config, accessor,
+ storeView, providerDescriptor, false );

// when
index.drop();
@@ -240,7 +240,7 @@ InMemoryIndexImplementation snapshot()
}

@Override
- public int countIndexedNodes( long nodeId, Object propertyValue )
+ protected int doCountIndexedNodes( long nodeId, Object propertyValue )
{
Set<Long> candidates = data().get( propertyValue );
return candidates != null && candidates.contains( nodeId ) ? 1 : 0;
@@ -48,6 +48,14 @@ final void remove( long nodeId, Object propertyValue )
doRemove( encode( propertyValue ), nodeId );
}

@Override
public final int countIndexedNodes( long nodeId, Object propertyValue )
{
return doCountIndexedNodes( nodeId, encode( propertyValue ) );
}

protected abstract int doCountIndexedNodes( long nodeId, Object encode );

abstract PrimitiveLongIterator doIndexSeek( Object propertyValue );

abstract boolean doAdd( Object propertyValue, long nodeId, boolean applyIdempotently );
158 changes: 158 additions & 0 deletions community/neo4j/src/test/java/schema/IndexPopulationFlipRaceIT.java
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package schema;

import org.junit.Rule;
import org.junit.Test;

import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.test.DatabaseRule;
import org.neo4j.test.EmbeddedDatabaseRule;
import org.neo4j.test.RandomRule;

import static org.junit.Assert.assertEquals;

import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.graphdb.Label.label;

public class IndexPopulationFlipRaceIT
{
private static final int NODES_PER_INDEX = 10;

@Rule
public final DatabaseRule db = new EmbeddedDatabaseRule();
@Rule
public final RandomRule random = new RandomRule();

@Test
public void shouldAtomicallyFlipMultipleIndexes() throws Exception
{
// A couple of times since this is probabilistic, but also because there seems to be a difference
// in timings between the first time and all others... which is perhaps super obvious to some, but not to me.
for ( int i = 0; i < 10; i++ )
{
// GIVEN
createIndexesButDontWaitForThemToFullyPopulate( i );

// WHEN
Pair<long[],long[]> data = createDataThatGoesIntoToThoseIndexes( i );

// THEN
awaitIndexes();
verifyThatThereAreExactlyOneIndexEntryPerNodeInTheIndexes( i, data );
}
}

private void awaitIndexes()
{
try ( Transaction tx = db.beginTx() )
{
db.schema().awaitIndexesOnline( 10, SECONDS );
tx.success();
}
}

private void createIndexesButDontWaitForThemToFullyPopulate( int i )
{
try ( Transaction tx = db.beginTx() )
{
db.schema().indexFor( labelA( i ) ).on( keyA( i ) ).create();

if ( random.nextBoolean() )
{
db.schema().indexFor( labelB( i ) ).on( keyB( i ) ).create();
}
else
{
db.schema().constraintFor( labelB( i ) ).assertPropertyIsUnique( keyB( i ) ).create();
}
tx.success();
}
}

private String keyB( int i )
{
return "key_b" + i;
}

private Label labelB( int i )
{
return label( "Label_b" + i );
}

private String keyA( int i )
{
return "key_a" + i;
}

private Label labelA( int i )
{
return label( "Label_a" + i );
}

private Pair<long[],long[]> createDataThatGoesIntoToThoseIndexes( int i )
{
long[] dataA = new long[NODES_PER_INDEX];
long[] dataB = new long[NODES_PER_INDEX];
for ( int t = 0; t < NODES_PER_INDEX; t++ )
{
try ( Transaction tx = db.beginTx() )
{
Node nodeA = db.createNode( labelA( i ) );
nodeA.setProperty( keyA( i ), dataA[t] = nodeA.getId() );
Node nodeB = db.createNode( labelB( i ) );
nodeB.setProperty( keyB( i ), dataB[t] = nodeB.getId() );
tx.success();
}
}
return Pair.of( dataA, dataB );
}

private void verifyThatThereAreExactlyOneIndexEntryPerNodeInTheIndexes( int i, Pair<long[],long[]> data )
throws Exception
{
try ( KernelTransaction tx = db.getDependencyResolver().resolveDependency( KernelAPI.class ).newTransaction();
Statement statement = tx.acquireStatement() )
{
int labelAId = statement.readOperations().labelGetForName( labelA( i ).name() );
int keyAId = statement.readOperations().propertyKeyGetForName( keyA( i ) );
int labelBId = statement.readOperations().labelGetForName( labelB( i ).name() );
int keyBId = statement.readOperations().propertyKeyGetForName( keyB( i ) );

for ( int j = 0; j < NODES_PER_INDEX; j++ )
{
long nodeAId = data.first()[j];
assertEquals( 1, statement.readOperations().nodesCountIndexed(
new IndexDescriptor( labelAId, keyAId ), nodeAId, nodeAId ) );
long nodeBId = data.other()[j];
assertEquals( 1, statement.readOperations().nodesCountIndexed(
new IndexDescriptor( labelBId, keyBId ), nodeBId, nodeBId ) );
}
}
}
}
