Skip to content

Commit

Permalink
Create Constraint ourside bolt session in BoltCoreEdgeIT.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall authored and davidegrohmann committed Oct 12, 2016
1 parent 582c0c3 commit 2f8812b
Showing 1 changed file with 63 additions and 72 deletions.
Expand Up @@ -19,33 +19,26 @@
*/
package org.neo4j.coreedge.scenarios;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.stream.Collectors;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.consensus.roles.Role;
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.discovery.ClusterMember;
import org.neo4j.coreedge.discovery.CoreClusterMember;
import org.neo4j.coreedge.discovery.EdgeClusterMember;
import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory;
import org.neo4j.driver.internal.NetworkSession;
import org.neo4j.driver.internal.RoutingNetworkSession;
import org.neo4j.driver.internal.logging.JULogging;
Expand All @@ -64,27 +57,24 @@
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.FileUtils;
import org.neo4j.test.coreedge.ClusterRule;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class BoltCoreEdgeIT
{
private static final long DEFAULT_TIMEOUT_MS = 15_000;
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() )
.withNumberOfCoreMembers( 3 )
.withNumberOfEdgeMembers( 0 );
public final ClusterRule clusterRule = new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 );

private Cluster cluster;

Expand All @@ -99,74 +89,75 @@ public void setup() throws Exception
public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception
{
// given
cluster = clusterRule.startCluster();

CoreClusterMember leader = cluster.awaitLeader();

try ( Driver driver = GraphDatabase.driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ) )
cluster = clusterRule.withNumberOfEdgeMembers( 0 ).startCluster();
cluster.coreTx( ( db, tx ) ->
{
inExpirableSession( driver, ( d ) -> d.session( AccessMode.WRITE ), ( session ) ->
{
// when
session.run( "CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE" ).consume();
session.run( "MERGE (n:Person {name: 'Jim'})-[:BOOM]->(m)" ).consume();
Iterators.count( db.execute( "CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE" ) );
tx.success();
} );

Record record = session.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next();
// when
int count = executeWriteAndReadThroughBolt( cluster.awaitLeader() );

// then
assertEquals( 1, record.get( "count" ).asInt() );
return null;
} );
}
// then
assertEquals( 1, count );
}

@Test
public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfFollower() throws Exception
{
// given
cluster = clusterRule.startCluster();
cluster = clusterRule.withNumberOfEdgeMembers( 0 ).startCluster();
cluster.coreTx( ( db, tx ) ->
{
Iterators.count( db.execute( "CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE" ) );
tx.success();
} );

CoreClusterMember follower = cluster.getDbWithRole(Role.FOLLOWER);
// when
int count = executeWriteAndReadThroughBolt( cluster.getDbWithRole( Role.FOLLOWER ) );

try ( Driver driver = GraphDatabase.driver( follower.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ) )
// then
assertEquals( 1, count );
}

private int executeWriteAndReadThroughBolt( CoreClusterMember core ) throws TimeoutException, InterruptedException
{
try ( Driver driver = GraphDatabase.driver( core.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ) )
{
inExpirableSession( driver, ( d ) -> d.session( AccessMode.WRITE ), ( session ) ->
int count = inExpirableSession( driver, ( d ) -> d.session( AccessMode.WRITE ), ( session ) ->
{
// when
session.run( "CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE" ).consume();
session.run( "MERGE (n:Person {name: 'Jim'})-[:BOOM]->(m)" ).consume();

session.run( "MERGE (n:Person {name: 'Jim'})" ).consume();
Record record = session.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next();

// then
assertEquals( 1, record.get( "count" ).asInt() );
return null;
return record.get( "count" ).asInt();
} );

return count;
}
}

@Test
public void shouldNotBeAbleToWriteOnAReadSession() throws Exception
{
// given
cluster = clusterRule.startCluster();
cluster = clusterRule.withNumberOfEdgeMembers( 0 ).startCluster();

assertEventually( "Failed to execute write query on read server", () ->
{
triggerElection();
CoreClusterMember leader = cluster.awaitLeader();
Driver driver = GraphDatabase.driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) );

try ( Session session = driver.session(AccessMode.READ) )
try ( Session session = driver.session( AccessMode.READ ) )
{
// when
session.run( "CREATE (n:Person {name: 'Jim'})-[:BOOM]->(m)" ).consume();
session.run( "CREATE (n:Person {name: 'Jim'})" ).consume();
return false;
}
catch ( ClientException ex )
{
assertEquals( String.format( "Write queries cannot be performed in READ access mode.",
leader.boltAdvertisedAddress()), ex.getMessage() );
assertEquals( "Write queries cannot be performed in READ access mode.", ex.getMessage() );
return true;
}
finally
Expand All @@ -180,27 +171,27 @@ public void shouldNotBeAbleToWriteOnAReadSession() throws Exception
public void sessionShouldExpireOnLeaderSwitch() throws Exception
{
// given
cluster = clusterRule.startCluster();
cluster = clusterRule.withNumberOfEdgeMembers( 0 ).startCluster();

CoreClusterMember leader = cluster.awaitLeader();

Driver driver = GraphDatabase.driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) );
try ( Session session = driver.session() )
{
session.run( "CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE" ).consume();
session.run( "CREATE (n:Person {name: 'Jim'})" ).consume();

// when
switchLeader( leader );

session.run( "MERGE (n:Person {name: 'Jim'})-[:BOOM]->(m)" ).consume();
session.run( "CREATE (n:Person {name: 'Mark'})" ).consume();

fail( "Should have thrown exception" );
}
catch ( SessionExpiredException sep )
{
// then
assertEquals( String.format( "Server at %s no longer accepts writes",
leader.boltAdvertisedAddress() ), sep.getMessage() );
assertEquals( String.format( "Server at %s no longer accepts writes", leader.boltAdvertisedAddress() ),
sep.getMessage() );
}
finally
{
Expand All @@ -212,7 +203,7 @@ public void sessionShouldExpireOnLeaderSwitch() throws Exception
public void shouldPickANewServerToWriteToOnLeaderSwitch() throws Throwable
{
// given
cluster = clusterRule.startCluster();
cluster = clusterRule.withNumberOfEdgeMembers( 0 ).startCluster();

CoreClusterMember leader = cluster.awaitLeader();

Expand All @@ -235,8 +226,8 @@ public void shouldPickANewServerToWriteToOnLeaderSwitch() throws Throwable
thread.start();

Config config = Config.build().withLogging( new JULogging( Level.OFF ) ).toConfig();
try ( Driver driver = GraphDatabase.driver( leader.routingURI(),
AuthTokens.basic( "neo4j", "neo4j" ), config ) )
try ( Driver driver = GraphDatabase
.driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ), config ) )
{
boolean success = false;
Set<BoltServerAddress> seenAddresses = new HashSet<>();
Expand Down Expand Up @@ -306,7 +297,7 @@ public void sessionShouldExpireOnFailingReadQuery() throws Exception
return null;
} );

try ( Session readSession = driver.session( AccessMode.READ) )
try ( Session readSession = driver.session( AccessMode.READ ) )
{
// when
connectedServer( readSession ).shutdown();
Expand All @@ -318,7 +309,7 @@ public void sessionShouldExpireOnFailingReadQuery() throws Exception
catch ( SessionExpiredException sep )
{
// then
assertEquals( String.format("Server at %s is no longer available", coreServer.boltAdvertisedAddress()),
assertEquals( String.format( "Server at %s is no longer available", coreServer.boltAdvertisedAddress() ),
sep.getMessage() );
}
finally
Expand Down Expand Up @@ -361,8 +352,9 @@ public void shouldReadAndWriteToANewSessionCreatedAfterALeaderSwitch() throws Ex
catch ( SessionExpiredException sep )
{
// then
assertEquals( String.format( "Server at %s no longer accepts writes",
leader.boltAdvertisedAddress() ), sep.getMessage() );
assertEquals(
String.format( "Server at %s no longer accepts writes", leader.boltAdvertisedAddress() ),
sep.getMessage() );
}
return null;
} );
Expand All @@ -384,7 +376,7 @@ public void bookmarksShouldWorkWithDriverPinnedToSingleServer() throws Exception
{
// given
cluster = clusterRule.withNumberOfEdgeMembers( 1 ).startCluster();
CoreClusterMember leader = cluster.awaitLeader( );
CoreClusterMember leader = cluster.awaitLeader();

try ( Driver driver = GraphDatabase.driver( leader.directURI(), AuthTokens.basic( "neo4j", "neo4j" ) ) )
{
Expand Down Expand Up @@ -415,9 +407,9 @@ public void shouldUseBookmarkFromAReadSessionInAWriteSession() throws Exception
{
// given
cluster = clusterRule.withNumberOfEdgeMembers( 1 ).startCluster();
CoreClusterMember leader = cluster.awaitLeader( );
CoreClusterMember leader = cluster.awaitLeader();

try(Driver driver = GraphDatabase.driver( leader.directURI(), AuthTokens.basic( "neo4j", "neo4j" ) ))
try ( Driver driver = GraphDatabase.driver( leader.directURI(), AuthTokens.basic( "neo4j", "neo4j" ) ) )
{
inExpirableSession( driver, ( d ) -> d.session( AccessMode.WRITE ), ( session ) ->
{
Expand Down Expand Up @@ -464,7 +456,7 @@ public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Exception
// given
cluster = clusterRule.withNumberOfEdgeMembers( 1 ).startCluster();

CoreClusterMember leader = cluster.awaitLeader( );
CoreClusterMember leader = cluster.awaitLeader();
EdgeClusterMember edgeMember = cluster.getEdgeMemberById( 0 );

edgeMember.txPollingClient().pause();
Expand All @@ -490,7 +482,7 @@ public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Exception

driver = GraphDatabase.driver( edgeMember.directURI(), AuthTokens.basic( "neo4j", "neo4j" ) );

try ( Session session = driver.session(AccessMode.READ) )
try ( Session session = driver.session( AccessMode.READ ) )
{
try ( Transaction tx = session.beginTransaction( bookmark ) )
{
Expand All @@ -505,12 +497,11 @@ public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Exception
public void shouldSendRequestsToNewlyAddedServers() throws Throwable
{
// given
cluster = clusterRule
.withNumberOfEdgeMembers( 1 )
.withSharedCoreParams( stringMap( CoreEdgeClusterSettings.cluster_routing_ttl.name(), "1s") )
cluster = clusterRule.withNumberOfEdgeMembers( 1 )
.withSharedCoreParams( stringMap( CoreEdgeClusterSettings.cluster_routing_ttl.name(), "1s" ) )
.startCluster();

CoreClusterMember leader = cluster.awaitLeader( );
CoreClusterMember leader = cluster.awaitLeader();
Driver driver = GraphDatabase.driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) );

String bookmark = inExpirableSession( driver, ( d ) -> d.session( AccessMode.WRITE ), ( session ) ->
Expand All @@ -533,7 +524,7 @@ public void shouldSendRequestsToNewlyAddedServers() throws Throwable
}
for ( EdgeClusterMember edgeClusterMember : cluster.edgeMembers() )
{
unusedServers.add(edgeClusterMember.boltAdvertisedAddress());
unusedServers.add( edgeClusterMember.boltAdvertisedAddress() );
}

for ( int i = 1; i <= 3; i++ )
Expand Down Expand Up @@ -572,7 +563,7 @@ private void executeReadQuery( String bookmark, Session session )
}
}

private <T> T inExpirableSession( Driver driver, Function<Driver, Session> acquirer, Function<Session, T> op )
private <T> T inExpirableSession( Driver driver, Function<Driver,Session> acquirer, Function<Session,T> op )
throws TimeoutException, InterruptedException
{
long endTime = System.currentTimeMillis() + DEFAULT_TIMEOUT_MS;
Expand Down Expand Up @@ -602,7 +593,7 @@ private ClusterMember connectedServer( Session session ) throws NoSuchFieldExcep
String host = connection.address().host();
int port = connection.address().port();

return cluster.getMemberByBoltAddress( new AdvertisedSocketAddress( host, port ) ) ;
return cluster.getMemberByBoltAddress( new AdvertisedSocketAddress( host, port ) );
}

private void switchLeader( CoreClusterMember initialLeader )
Expand Down

0 comments on commit 2f8812b

Please sign in to comment.