Skip to content

Commit

Permalink
Track nodes changes that can influence index statistics in IndexStati…
Browse files Browse the repository at this point in the history
…sticsTest

In some randomised nodes changes it can happen that we will choose the node
that is already part of the index. That will generate index update on
transaction commit and will influence index updates statistics.
Those index updates were missed before and were causing sporadic test failures.
  • Loading branch information
MishaDemianenko committed Dec 27, 2017
1 parent d9be40f commit 03a8866
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 68 deletions.
Expand Up @@ -42,6 +42,8 @@


import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotFoundException;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.internal.kernel.api.exceptions.KernelException;
Expand Down Expand Up @@ -122,7 +124,8 @@ public void shouldProvideIndexStatisticsForDataCreatedWhenPopulationBeforeTheInd
createSomePersons(); createSomePersons();


// when // when
IndexDescriptor index = awaitOnline( createIndex( "Person", "name" ) ); IndexDescriptor index = createIndex( "Person", "name" );
awaitIndexesOnline();


// then // then
assertEquals( 0.75d, indexSelectivity( index ), DOUBLE_ERROR_TOLERANCE ); assertEquals( 0.75d, indexSelectivity( index ), DOUBLE_ERROR_TOLERANCE );
Expand All @@ -134,7 +137,8 @@ public void shouldProvideIndexStatisticsForDataCreatedWhenPopulationBeforeTheInd
public void shouldNotSeeDataCreatedAfterPopulation() throws KernelException public void shouldNotSeeDataCreatedAfterPopulation() throws KernelException
{ {
// given // given
IndexDescriptor index = awaitOnline( createIndex( "Person", "name" ) ); IndexDescriptor index = createIndex( "Person", "name" );
awaitIndexesOnline();


// when // when
createSomePersons(); createSomePersons();
Expand All @@ -151,7 +155,8 @@ public void shouldProvideIndexStatisticsForDataSeenDuringPopulationAndIgnoreData
{ {
// given // given
createSomePersons(); createSomePersons();
IndexDescriptor index = awaitOnline( createIndex( "Person", "name" ) ); IndexDescriptor index = createIndex( "Person", "name" );
awaitIndexesOnline();


// when // when
createSomePersons(); createSomePersons();
Expand All @@ -167,7 +172,9 @@ public void shouldRemoveIndexStatisticsAfterIndexIsDeleted() throws KernelExcept
{ {
// given // given
createSomePersons(); createSomePersons();
IndexDescriptor index = awaitOnline( createIndex( "Person", "name" ) ); IndexDescriptor index = createIndex( "Person", "name" );
awaitIndexesOnline();

SchemaStorage storage = new SchemaStorage( neoStores().getSchemaStore() ); SchemaStorage storage = new SchemaStorage( neoStores().getSchemaStore() );
long indexId = storage.indexGetForSchema( index ).getId(); long indexId = storage.indexGetForSchema( index ).getId();


Expand Down Expand Up @@ -198,7 +205,8 @@ public void shouldProvideIndexSelectivityWhenThereAreManyDuplicates() throws Exc
int created = repeatCreateNamedPeopleFor( NAMES.length * CREATION_MULTIPLIER ).length; int created = repeatCreateNamedPeopleFor( NAMES.length * CREATION_MULTIPLIER ).length;


// when // when
IndexDescriptor index = awaitOnline( createIndex( "Person", "name" ) ); IndexDescriptor index = createIndex( "Person", "name" );
awaitIndexesOnline();


// then // then
double expectedSelectivity = UNIQUE_NAMES / created; double expectedSelectivity = UNIQUE_NAMES / created;
Expand All @@ -216,7 +224,7 @@ public void shouldProvideIndexStatisticsWhenIndexIsBuiltViaPopulationAndConcurre
// when populating while creating // when populating while creating
IndexDescriptor index = createIndex( "Person", "name" ); IndexDescriptor index = createIndex( "Person", "name" );
final UpdatesTracker updatesTracker = executeCreations( index, CREATION_MULTIPLIER ); final UpdatesTracker updatesTracker = executeCreations( index, CREATION_MULTIPLIER );
awaitOnline( index ); awaitIndexesOnline();


// then // then
int seenWhilePopulating = initialNodes + updatesTracker.createdDuringPopulation(); int seenWhilePopulating = initialNodes + updatesTracker.createdDuringPopulation();
Expand All @@ -236,7 +244,7 @@ public void shouldProvideIndexStatisticsWhenIndexIsBuiltViaPopulationAndConcurre
// when populating while creating // when populating while creating
IndexDescriptor index = createIndex( "Person", "name" ); IndexDescriptor index = createIndex( "Person", "name" );
UpdatesTracker updatesTracker = executeCreationsAndDeletions( nodes, index, CREATION_MULTIPLIER ); UpdatesTracker updatesTracker = executeCreationsAndDeletions( nodes, index, CREATION_MULTIPLIER );
awaitOnline( index ); awaitIndexesOnline();


// then // then
int seenWhilePopulating = int seenWhilePopulating =
Expand All @@ -258,7 +266,7 @@ public void shouldProvideIndexStatisticsWhenIndexIsBuiltViaPopulationAndConcurre
// when populating while creating // when populating while creating
IndexDescriptor index = createIndex( "Person", "name" ); IndexDescriptor index = createIndex( "Person", "name" );
UpdatesTracker updatesTracker = executeCreationsAndUpdates( nodes, index, CREATION_MULTIPLIER ); UpdatesTracker updatesTracker = executeCreationsAndUpdates( nodes, index, CREATION_MULTIPLIER );
awaitOnline( index ); awaitIndexesOnline();


// then // then
int seenWhilePopulating = initialNodes + updatesTracker.createdDuringPopulation(); int seenWhilePopulating = initialNodes + updatesTracker.createdDuringPopulation();
Expand Down Expand Up @@ -294,7 +302,7 @@ public void shouldWorkWhileHavingHeavyConcurrentUpdates() throws Exception
{ {
result.add( future.get() ); result.add( future.get() );
} }
awaitOnline( index ); awaitIndexesOnline();


executorService.awaitTermination( 1, TimeUnit.SECONDS ); executorService.awaitTermination( 1, TimeUnit.SECONDS );
executorService.shutdown(); executorService.shutdown();
Expand All @@ -306,7 +314,7 @@ public void shouldWorkWhileHavingHeavyConcurrentUpdates() throws Exception
double expectedSelectivity = UNIQUE_NAMES / seenWhilePopulating; double expectedSelectivity = UNIQUE_NAMES / seenWhilePopulating;
assertCorrectIndexSelectivity( expectedSelectivity, indexSelectivity( index ), doubleTolerance ); assertCorrectIndexSelectivity( expectedSelectivity, indexSelectivity( index ), doubleTolerance );
assertCorrectIndexSize( "Tracker had " + result, seenWhilePopulating, indexSize( index ), tolerance ); assertCorrectIndexSize( "Tracker had " + result, seenWhilePopulating, indexSize( index ), tolerance );
int expectedIndexUpdates = result.deletedAfterPopulation() + result.createdAfterPopulation(); int expectedIndexUpdates = result.deletedAfterPopulation() + result.createdAfterPopulation() + result.updatedAfterPopulation();
assertCorrectIndexUpdates( "Tracker had " + result, expectedIndexUpdates, indexUpdates( index ), tolerance ); assertCorrectIndexUpdates( "Tracker had " + result, expectedIndexUpdates, indexUpdates( index ), tolerance );
} }


Expand All @@ -322,17 +330,20 @@ private void deleteNode( long nodeId ) throws KernelException
} }
} }


private void changeName( long nodeId, String propertyKeyName, Object newValue ) throws KernelException private boolean changeName( long nodeId, String propertyKeyName, Object newValue ) throws KernelException
{ {
boolean changeIndexedNode = false;
try ( Transaction tx = db.beginTx() ) try ( Transaction tx = db.beginTx() )
{ {
try ( Statement statement = bridge.get() ) Node node = db.getNodeById( nodeId );
if ( node.hasProperty( propertyKeyName ) )
{ {
int propertyKeyId = statement.tokenWriteOperations().propertyKeyGetOrCreateForName( propertyKeyName ); changeIndexedNode = true;
statement.dataWriteOperations().nodeSetProperty( nodeId, propertyKeyId, Values.of( newValue ) );
} }
node.setProperty( propertyKeyName, newValue );
tx.success(); tx.success();
} }
return changeIndexedNode;
} }


private int createNamedPeople( long[] nodes, int offset ) throws KernelException private int createNamedPeople( long[] nodes, int offset ) throws KernelException
Expand Down Expand Up @@ -507,41 +518,12 @@ private NeoStores neoStores()
.testAccessNeoStores(); .testAccessNeoStores();
} }


private IndexDescriptor awaitOnline( IndexDescriptor index ) throws KernelException private void awaitIndexesOnline()
{ {
long start = System.currentTimeMillis(); try ( Transaction transaction = db.beginTx() )
long end = start + TimeUnit.MINUTES.toMillis( 3 );
while ( System.currentTimeMillis() < end )
{ {
try ( Transaction tx = db.beginTx() ) db.schema().awaitIndexesOnline(3, TimeUnit.MINUTES );
{
try ( Statement statement = bridge.get() )
{
switch ( statement.readOperations().indexGetState( index ) )
{
case ONLINE:
return index;

case FAILED:
throw new IllegalStateException( "Index failed instead of becoming ONLINE" );

default:
break;
}
}
tx.success();

try
{
Thread.sleep( 100 );
}
catch ( InterruptedException e )
{
// ignored
}
}
} }
throw new IllegalStateException( "Index did not become ONLINE within reasonable time" );
} }


private UpdatesTracker executeCreations( IndexDescriptor index, int numberOfCreations ) throws KernelException private UpdatesTracker executeCreations( IndexDescriptor index, int numberOfCreations ) throws KernelException
Expand Down Expand Up @@ -584,13 +566,7 @@ private UpdatesTracker internalExecuteCreationsDeletionsAndUpdates( long[] nodes
int created = createNamedPeople( nodes, offset ); int created = createNamedPeople( nodes, offset );
offset += created; offset += created;
updatesTracker.increaseCreated( created ); updatesTracker.increaseCreated( created );

notifyIfPopulationCompleted( index, updatesTracker );
// check index online
if ( !updatesTracker.isPopulationCompleted() &&
indexOnlineMonitor.isIndexOnline( index ) )
{
updatesTracker.notifyPopulationCompleted();
}


// delete if allowed // delete if allowed
if ( allowDeletions && updatesTracker.created() % 5 == 0 ) if ( allowDeletions && updatesTracker.created() % 5 == 0 )
Expand All @@ -605,13 +581,7 @@ private UpdatesTracker internalExecuteCreationsDeletionsAndUpdates( long[] nodes
{ {
// ignore // ignore
} }

notifyIfPopulationCompleted( index, updatesTracker );
// check again index online
if ( !updatesTracker.isPopulationCompleted() &&
indexOnlineMonitor.isIndexOnline( index ) )
{
updatesTracker.notifyPopulationCompleted();
}
} }


// update if allowed // update if allowed
Expand All @@ -620,25 +590,37 @@ private UpdatesTracker internalExecuteCreationsDeletionsAndUpdates( long[] nodes
int randomIndex = random.nextInt( nodes.length ); int randomIndex = random.nextInt( nodes.length );
try try
{ {
changeName( nodes[randomIndex], "name", NAMES[randomIndex % NAMES.length] ); if ( changeName( nodes[randomIndex], "name", NAMES[randomIndex % NAMES.length] ) )
{
updatesTracker.increaseUpdated( 1 );
}
} }
catch ( EntityNotFoundException ex ) catch ( EntityNotFoundException | NotFoundException ex )
{ {
// ignore // ignore
} }

notifyIfPopulationCompleted( index, updatesTracker );
// check again index online
if ( !updatesTracker.isPopulationCompleted() && indexOnlineMonitor.isIndexOnline( index ) )
{
updatesTracker.notifyPopulationCompleted();
}
} }
} }
// make sure population complete has been notified // make sure population complete has been notified
updatesTracker.notifyPopulationCompleted(); updatesTracker.notifyPopulationCompleted();
return updatesTracker; return updatesTracker;
} }


private void notifyIfPopulationCompleted( IndexDescriptor index, UpdatesTracker updatesTracker )
{
if ( isCompletedPopulation( index, updatesTracker ) )
{
updatesTracker.notifyPopulationCompleted();
}
}

private boolean isCompletedPopulation( IndexDescriptor index, UpdatesTracker updatesTracker )
{
return !updatesTracker.isPopulationCompleted() &&
indexOnlineMonitor.isIndexOnline( index );
}

private void assertDoubleLongEquals( long expectedUniqueValue, long expectedSampledSize, private void assertDoubleLongEquals( long expectedUniqueValue, long expectedSampledSize,
DoubleLongRegister register ) DoubleLongRegister register )
{ {
Expand Down
Expand Up @@ -22,8 +22,10 @@
public class UpdatesTracker public class UpdatesTracker
{ {
private int created; private int created;
private int updated;
private int deleted; private int deleted;
private int createdDuringPopulation; private int createdDuringPopulation;
private int updatedDuringPopulation;
private int deletedDuringPopulation; private int deletedDuringPopulation;
private boolean populationCompleted; private boolean populationCompleted;


Expand All @@ -37,6 +39,11 @@ public void increaseDeleted( int num )
deleted += num; deleted += num;
} }


public void increaseUpdated( int num )
{
updated += num;
}

void notifyPopulationCompleted() void notifyPopulationCompleted()
{ {
if ( populationCompleted ) if ( populationCompleted )
Expand All @@ -46,6 +53,7 @@ void notifyPopulationCompleted()


populationCompleted = true; populationCompleted = true;
createdDuringPopulation = created; createdDuringPopulation = created;
updatedDuringPopulation = updated;
deletedDuringPopulation = deleted; deletedDuringPopulation = deleted;
} }


Expand All @@ -64,6 +72,11 @@ public int deleted()
return deleted; return deleted;
} }


public int getUpdated()
{
return updated;
}

public int createdDuringPopulation() public int createdDuringPopulation()
{ {
return createdDuringPopulation; return createdDuringPopulation;
Expand All @@ -74,6 +87,11 @@ public int deletedDuringPopulation()
return deletedDuringPopulation; return deletedDuringPopulation;
} }


public int getUpdatedDuringPopulation()
{
return updatedDuringPopulation;
}

public int createdAfterPopulation() public int createdAfterPopulation()
{ {
return created - createdDuringPopulation; return created - createdDuringPopulation;
Expand All @@ -84,13 +102,20 @@ public int deletedAfterPopulation()
return deleted - deletedDuringPopulation; return deleted - deletedDuringPopulation;
} }


public int updatedAfterPopulation()
{
return updated - updatedDuringPopulation;
}

public void add( UpdatesTracker updatesTracker ) public void add( UpdatesTracker updatesTracker )
{ {
assert isPopulationCompleted(); assert isPopulationCompleted();
assert updatesTracker.isPopulationCompleted(); assert updatesTracker.isPopulationCompleted();
this.created += updatesTracker.created; this.created += updatesTracker.created;
this.deleted += updatesTracker.deleted; this.deleted += updatesTracker.deleted;
this.updated += updatesTracker.updated;
this.createdDuringPopulation += updatesTracker.createdDuringPopulation; this.createdDuringPopulation += updatesTracker.createdDuringPopulation;
this.updatedDuringPopulation += updatesTracker.updatedDuringPopulation;
this.deletedDuringPopulation += updatesTracker.deletedDuringPopulation; this.deletedDuringPopulation += updatesTracker.deletedDuringPopulation;
} }


Expand All @@ -101,6 +126,7 @@ public String toString()
"created=" + created + "created=" + created +
", deleted=" + deleted + ", deleted=" + deleted +
", createdDuringPopulation=" + createdDuringPopulation + ", createdDuringPopulation=" + createdDuringPopulation +
", updatedDuringPopulation=" + updatedDuringPopulation +
", deletedDuringPopulation=" + deletedDuringPopulation + ", deletedDuringPopulation=" + deletedDuringPopulation +
", populationCompleted=" + populationCompleted + ", populationCompleted=" + populationCompleted +
'}'; '}';
Expand Down

0 comments on commit 03a8866

Please sign in to comment.