Skip to content

Commit

Permalink
QueryInvalidationIT made not flaky
Browse files Browse the repository at this point in the history
Flakiness was caused by the fact that code created index but did not wait for
this index to come online. Most of the times index appeared to be online before
any queries were executed, but sometimes flipping of index to ONLINE mode
interleaved with query executions. This caused flushes of schema and query
caches and failures of test assumptions about queries being re-planned.
  • Loading branch information
lutovich committed Aug 3, 2015
1 parent 6657a5d commit 9848f8f
Showing 1 changed file with 80 additions and 47 deletions.
Expand Up @@ -19,51 +19,50 @@
*/ */
package org.neo4j.cypher; package org.neo4j.cypher;


import org.junit.Rule;
import org.junit.Test;

import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.Test;


import org.neo4j.cypher.internal.compiler.v2_2.CypherCacheHitMonitor; import org.neo4j.cypher.internal.compiler.v2_2.CypherCacheHitMonitor;
import org.neo4j.cypher.internal.compiler.v2_2.ast.Statement; import org.neo4j.cypher.internal.compiler.v2_2.ast.Statement;
import org.neo4j.graphdb.ExecutionPlanDescription; import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
import org.neo4j.helpers.Pair; import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.DatabaseRule; import org.neo4j.test.DatabaseRule;
import org.neo4j.test.ImpermanentDatabaseRule; import org.neo4j.test.ImpermanentDatabaseRule;
import org.neo4j.test.RepeatRule;


import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;


import static org.neo4j.helpers.collection.IteratorUtil.single;

public class QueryInvalidationIT public class QueryInvalidationIT
{ {
public final @Rule DatabaseRule db = new ImpermanentDatabaseRule(); private static final int USERS = 10;
private static final int CONNECTIONS = 100;

@Rule
public final DatabaseRule db = new ImpermanentDatabaseRule();


@Test @Test
public void shouldRePlanAfterDataChangesFromAnEmptyDatabase() throws Exception public void shouldRePlanAfterDataChangesFromAnEmptyDatabase() throws Exception
{ {
// GIVEN // GIVEN
Random random = ThreadLocalRandom.current();
int USERS = 10, CONNECTIONS = 100;
TestMonitor monitor = new TestMonitor(); TestMonitor monitor = new TestMonitor();
db.resolveDependency( Monitors.class ).addMonitorListener( monitor ); db.resolveDependency( Monitors.class ).addMonitorListener( monitor );
// - setup schema - // - setup schema -
db.execute( "CREATE INDEX ON :User(userId)" ); createIndex();
// - execute the query without the existence data - // - execute the query without the existence data -
distantFriend( random, USERS ); executeDistantFriendsCountQuery( USERS );


long replanTime = System.currentTimeMillis() + 1_100; long replanTime = System.currentTimeMillis() + 1_800;


// - create data - // - create data -
createData( 0, USERS, CONNECTIONS, random ); createData( 0, USERS, CONNECTIONS );


// - after the query TTL has expired - // - after the query TTL has expired -
while ( System.currentTimeMillis() < replanTime ) while ( System.currentTimeMillis() < replanTime )
Expand All @@ -74,30 +73,28 @@ public void shouldRePlanAfterDataChangesFromAnEmptyDatabase() throws Exception
// WHEN // WHEN
monitor.reset(); monitor.reset();
// - execute the query again - // - execute the query again -
distantFriend( random, USERS ); executeDistantFriendsCountQuery( USERS );


// THEN // THEN
assertEquals( "Query should have been replanned.", 1, monitor.discards ); assertEquals( "Query should have been replanned.", 1, monitor.discards.get() );
} }


@Test @Test
public void shouldRePlanAfterDataChangesFromAPopulatedDatabase() throws Exception public void shouldRePlanAfterDataChangesFromAPopulatedDatabase() throws Exception
{ {
// GIVEN // GIVEN
Random random = ThreadLocalRandom.current();
int USERS = 10, CONNECTIONS = 100;
TestMonitor monitor = new TestMonitor(); TestMonitor monitor = new TestMonitor();
db.resolveDependency( Monitors.class ).addMonitorListener( monitor ); db.resolveDependency( Monitors.class ).addMonitorListener( monitor );
// - setup schema - // - setup schema -
db.execute( "CREATE INDEX ON :User(userId)" ); createIndex();
//create some data //create some data
createData( 0, USERS, CONNECTIONS, random ); createData( 0, USERS, CONNECTIONS );
distantFriend( random, USERS ); executeDistantFriendsCountQuery( USERS );


long replanTime = System.currentTimeMillis() + 1_100; long replanTime = System.currentTimeMillis() + 1_800;


//create more date //create more date
createData( USERS, USERS, CONNECTIONS, random ); createData( USERS, USERS, CONNECTIONS );


// - after the query TTL has expired - // - after the query TTL has expired -
while ( System.currentTimeMillis() < replanTime ) while ( System.currentTimeMillis() < replanTime )
Expand All @@ -108,68 +105,104 @@ public void shouldRePlanAfterDataChangesFromAPopulatedDatabase() throws Exceptio
// WHEN // WHEN
monitor.reset(); monitor.reset();
// - execute the query again - // - execute the query again -
distantFriend( random, USERS ); executeDistantFriendsCountQuery( USERS );


// THEN // THEN
assertEquals( "Query should have been replanned.", 1, monitor.discards ); assertEquals( "Query should have been replanned.", 1, monitor.discards.get() );
} }


private void createData(long startingUserId, int numUsers, int numConnections, Random random) private void createIndex()
{
try ( Transaction tx = db.beginTx() )
{
db.schema().indexFor( DynamicLabel.label( "User" ) ).on( "userId" ).create();
tx.success();
}
try ( Transaction tx = db.beginTx() )
{
db.schema().awaitIndexesOnline( 10, SECONDS );
tx.success();
}
}

private void createData( long startingUserId, int numUsers, int numConnections )
{ {
for ( long userId = startingUserId; userId < numUsers + startingUserId; userId++ ) for ( long userId = startingUserId; userId < numUsers + startingUserId; userId++ )
{ {
db.execute( "CREATE (newUser:User {userId: {userId}})", singletonMap( "userId", (Object) userId ) ); db.execute( "CREATE (newUser:User {userId: {userId}})", singletonMap( "userId", (Object) userId ) );
} }
Map<String, Object> params = new HashMap<>(); Map<String,Object> params = new HashMap<>();
for ( int i = 0; i < numConnections; i++ ) for ( int i = 0; i < numConnections; i++ )
{ {
long user1 = startingUserId + random.nextInt( numUsers ); long user1 = startingUserId + randomInt( numUsers );
long user2; long user2;
do do
{ {
user2 = startingUserId + random.nextInt( numUsers ); user2 = startingUserId + randomInt( numUsers );
} while ( user1 == user2 ); }
while ( user1 == user2 );
params.put( "user1", user1 ); params.put( "user1", user1 );
params.put( "user2", user2 ); params.put( "user2", user2 );
db.execute( "MATCH (user1:User { userId: {user1} }), (user2:User { userId: {user2} }) " + db.execute( "MATCH (user1:User { userId: {user1} }), (user2:User { userId: {user2} }) " +
"CREATE UNIQUE user1 -[:FRIEND]- user2", params ); "CREATE UNIQUE user1 -[:FRIEND]- user2", params );
} }
} }


private Pair<Long, ExecutionPlanDescription> distantFriend( Random random, int USERS ) private void executeDistantFriendsCountQuery( int userId )
{ {
Result result = db Map<String,Object> params = singletonMap( "userId", (Object) (long) randomInt( userId ) );
.execute( "MATCH (user:User { userId: {userId} } ) -[:FRIEND]- () -[:FRIEND]- (distantFriend) " +
"RETURN COUNT(distinct distantFriend)", try ( Result result = db.execute(
singletonMap( "userId", (Object) (long) random.nextInt( USERS ) ) ); "MATCH (user:User { userId: {userId} } ) -[:FRIEND]- () -[:FRIEND]- (distantFriend) " +
return Pair.of( (Long) single( single( result ).values() ), result.getExecutionPlanDescription() ); "RETURN COUNT(distinct distantFriend)", params ) )
{
while ( result.hasNext() )
{
result.next();
}
}
}

private static int randomInt( int max )
{
return ThreadLocalRandom.current().nextInt( max );
} }


private static class TestMonitor implements CypherCacheHitMonitor<Statement> private static class TestMonitor implements CypherCacheHitMonitor<Statement>
{ {
int hits, misses, discards; private final AtomicInteger hits = new AtomicInteger();
private final AtomicInteger misses = new AtomicInteger();
private final AtomicInteger discards = new AtomicInteger();

@Override
public void cacheHit( Statement key )
{
hits.incrementAndGet();
}


@Override @Override
public synchronized void cacheHit( Statement key ) public void cacheMiss( Statement key )
{ {
hits++; misses.incrementAndGet();
} }


@Override @Override
public synchronized void cacheMiss( Statement key ) public void cacheDiscard( Statement key )
{ {
misses++; discards.incrementAndGet();
} }


@Override @Override
public synchronized void cacheDiscard( Statement key ) public String toString()
{ {
discards++; return "TestMonitor{hits=" + hits + ", misses=" + misses + ", discards=" + discards + "}";
} }


public void reset() public void reset()
{ {
hits = misses = discards = 0; hits.set( 0 );
misses.set( 0 );
discards.set( 0 );
} }
} }
} }

0 comments on commit 9848f8f

Please sign in to comment.