Skip to content

Commit

Permalink
Making the assertions a bit more stable.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimwebber committed Oct 24, 2016
1 parent 9cc0bd2 commit b814ab6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 33 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
Expand All @@ -37,7 +38,7 @@
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnectors; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnectors;
import static org.neo4j.server.configuration.ClientConnectorSettings.httpConnector; import static org.neo4j.server.configuration.ClientConnectorSettings.httpConnector;


public class ClientConnectorAddresses public class ClientConnectorAddresses implements Iterable<ClientConnectorAddresses.ConnectorUri>
{ {
private final List<ConnectorUri> connectorUris; private final List<ConnectorUri> connectorUris;


Expand Down Expand Up @@ -110,6 +111,12 @@ static ClientConnectorAddresses fromString( String value )
.map( ConnectorUri::fromString ).collect( Collectors.toList() ) ); .map( ConnectorUri::fromString ).collect( Collectors.toList() ) );
} }


@Override
public Iterator<ConnectorUri> iterator()
{
return connectorUris.iterator();
}

public enum Scheme public enum Scheme
{ {
bolt, http, https bolt, http, https
Expand Down
Expand Up @@ -19,9 +19,12 @@
*/ */
package org.neo4j.causalclustering.scenarios; package org.neo4j.causalclustering.scenarios;


import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
Expand All @@ -31,21 +34,16 @@
import java.util.function.Function; import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;


import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.discovery.ClientConnectorAddresses;
import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.ClusterMember; import org.neo4j.causalclustering.discovery.ClusterMember;
import org.neo4j.causalclustering.discovery.CoreClusterMember; import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.ReadReplica; import org.neo4j.causalclustering.discovery.ReadReplica;
import org.neo4j.driver.internal.NetworkSession;
import org.neo4j.driver.internal.RoutingNetworkSession; import org.neo4j.driver.internal.RoutingNetworkSession;
import org.neo4j.driver.internal.logging.JULogging; import org.neo4j.driver.internal.logging.JULogging;
import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Config;
Expand All @@ -65,15 +63,14 @@


import static java.lang.String.format; import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;

import static org.neo4j.driver.v1.Values.parameters; import static org.neo4j.driver.v1.Values.parameters;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.test.assertion.Assert.assertEventually; import static org.neo4j.test.assertion.Assert.assertEventually;
Expand Down Expand Up @@ -133,15 +130,14 @@ private int executeWriteAndReadThroughBolt( CoreClusterMember core ) throws Time
{ {
try ( Driver driver = GraphDatabase.driver( core.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ) ) try ( Driver driver = GraphDatabase.driver( core.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ) )
{ {
int count = inExpirableSession( driver, ( d ) -> d.session( AccessMode.WRITE ), ( session ) ->
return inExpirableSession( driver, ( d ) -> d.session( AccessMode.WRITE ), ( session ) ->
{ {
// when // when
session.run( "MERGE (n:Person {name: 'Jim'})" ).consume(); session.run( "MERGE (n:Person {name: 'Jim'})" ).consume();
Record record = session.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next(); Record record = session.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next();
return record.get( "count" ).asInt(); return record.get( "count" ).asInt();
} ); } );

return count;
} }
} }


Expand All @@ -153,7 +149,7 @@ public void shouldNotBeAbleToWriteOnAReadSession() throws Exception


assertEventually( "Failed to execute write query on read server", () -> assertEventually( "Failed to execute write query on read server", () ->
{ {
switchLeader(cluster.awaitLeader()); switchLeader( cluster.awaitLeader() );
CoreClusterMember leader = cluster.awaitLeader(); CoreClusterMember leader = cluster.awaitLeader();
Driver driver = GraphDatabase.driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ); Driver driver = GraphDatabase.driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) );


Expand Down Expand Up @@ -327,8 +323,7 @@ public void sessionShouldExpireOnFailingReadQuery() throws Exception
catch ( SessionExpiredException sep ) catch ( SessionExpiredException sep )
{ {
// then // then
assertEquals( String.format( "Server at %s is no longer available", coreServer.boltAdvertisedAddress() ), assertThat( sep.getMessage(), containsString( "is no longer available" ) );
sep.getMessage() );
} }
finally finally
{ {
Expand Down Expand Up @@ -516,7 +511,7 @@ public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Exception
} }


@Test @Test
public void shouldSendRequestsToNewlyAddedServers() throws Throwable public void shouldSendRequestsToNewlyAddedReadReplicas() throws Throwable
{ {
// given // given
cluster = clusterRule.withNumberOfReadReplicas( 1 ) cluster = clusterRule.withNumberOfReadReplicas( 1 )
Expand All @@ -538,41 +533,37 @@ public void shouldSendRequestsToNewlyAddedServers() throws Throwable
} ); } );


// when // when
Set<String> unusedServers = new HashSet<>(); Set<String> readReplicas = new HashSet<>();


for ( CoreClusterMember coreClusterMember : cluster.coreMembers() )
{
unusedServers.add( coreClusterMember.boltAdvertisedAddress() );
}
for ( ReadReplica readReplica : cluster.readReplicas() ) for ( ReadReplica readReplica : cluster.readReplicas() )
{ {
unusedServers.add( readReplica.boltAdvertisedAddress() ); readReplicas.add( readReplica.boltAdvertisedAddress() );
} }


for ( int i = 1; i <= 3; i++ ) for ( int i = 10; i <= 13; i++ )
{ {
ReadReplica newEdge = cluster.addReadReplicaWithId( i ); ReadReplica newReadReplica = cluster.addReadReplicaWithId( i );
unusedServers.add( newEdge.boltAdvertisedAddress() ); readReplicas.add( newReadReplica.boltAdvertisedAddress() );
newEdge.start(); newReadReplica.start();
} }


assertEventually( "Failed to send requests to all servers", () -> assertEventually( "Failed to send requests to all servers", () ->
{ {
for ( int i = 0; i < cluster.coreMembers().size() + cluster.readReplicas().size(); i++ ) for ( int i = 0; i < cluster.readReplicas().size(); i++ ) // don't care about cores
{ {
try ( Session session = driver.session( AccessMode.READ ) ) try ( Session session = driver.session( AccessMode.READ ) )
{ {
BoltServerAddress boltServerAddress = ((RoutingNetworkSession) session).address(); BoltServerAddress boltServerAddress = ((RoutingNetworkSession) session).address();
executeReadQuery( bookmark, session ); executeReadQuery( bookmark, session );
unusedServers.remove( boltServerAddress.toString() ); readReplicas.remove( boltServerAddress.toString() );
} }
catch ( Throwable throwable ) catch ( Throwable throwable )
{ {
return false; return false;
} }
} }


return unusedServers.size() == 0; return readReplicas.size() == 0; // have sent something to all replicas
}, is( true ), 30, SECONDS ); }, is( true ), 30, SECONDS );
} }


Expand Down Expand Up @@ -682,7 +673,7 @@ private void switchLeader( CoreClusterMember initialLeader ) throws InterruptedE


try try
{ {
triggerElection(initialLeader); triggerElection( initialLeader );
} }
catch ( IOException | TimeoutException e ) catch ( IOException | TimeoutException e )
{ {
Expand All @@ -703,8 +694,7 @@ private CoreClusterMember triggerElection( CoreClusterMember initialLeader ) thr
if ( !coreClusterMember.equals( initialLeader ) ) if ( !coreClusterMember.equals( initialLeader ) )
{ {
coreClusterMember.raft().triggerElection(); coreClusterMember.raft().triggerElection();
CoreClusterMember coreClusterMember1 = cluster.awaitLeader(); return cluster.awaitLeader();
return coreClusterMember1;
} }
} }
return initialLeader; return initialLeader;
Expand Down

0 comments on commit b814ab6

Please sign in to comment.