From e7c48ebb5ee9a7eff9846388a37d05eb574f3ff1 Mon Sep 17 00:00:00 2001 From: Chris Gioran Date: Fri, 10 Mar 2017 20:15:29 +0200 Subject: [PATCH] Cluster formation decision depends on initial_hosts contents This change makes it so only instances listed in initial_hosts are considered for purposes of forming a cluster. This allows mismatching initial_hosts contents to be manipulated to make it harder for instances to start a cluster, for example slave_only instances to depend on the presence of a master but the master to be allowed to form the cluster regardless of the presence of other instances. The functionality is achieved with the introduction of a (backwards compatible) discovery header in the configurationRequest messages which keeps track of which instance has contacted which during the initial discovery round. Since sending of these messages depends only on the initial_hosts content, this effectively allows for filtering of configurationRequests based on initial_host content. --- .../neo4j/cluster/com/message/Message.java | 20 ++- .../context/ClusterContextImpl.java | 95 ++++++++++++-- .../protocol/cluster/ClusterContext.java | 6 + .../protocol/cluster/ClusterState.java | 97 +++++++++----- .../context/ClusterContextImplTest.java | 123 +++++++++++++++++- .../protocol/cluster/ClusterStateTest.java | 92 +++++++++++-- 6 files changed, 373 insertions(+), 60 deletions(-) diff --git a/enterprise/cluster/src/main/java/org/neo4j/cluster/com/message/Message.java b/enterprise/cluster/src/main/java/org/neo4j/cluster/com/message/Message.java index 299439342985f..4549e29a4cade 100644 --- a/enterprise/cluster/src/main/java/org/neo4j/cluster/com/message/Message.java +++ b/enterprise/cluster/src/main/java/org/neo4j/cluster/com/message/Message.java @@ -83,7 +83,6 @@ public static Message timeout( ME return timeout; } - // Standard headers public static final String CONVERSATION_ID = "conversation-id"; public static final String CREATED_BY = "created-by"; @@ -91,6 +90,9 @@ public static Message timeout( ME public static final String FROM = "from"; public static final String TO = "to"; public static final String INSTANCE_ID = "instance-id"; + // Should be present only in configurationRequest messages. Value is a comma separated list of instance ids. + // Added in 3.0.9. + public static final String DISCOVERED = "discovered"; private MESSAGETYPE messageType; private Object payload; @@ -136,7 +138,7 @@ public boolean isInternal() public String getHeader( String name ) throws IllegalArgumentException { - String value = headers.get( name ); + String value = getHeader( name, null ); if ( value == null ) { throw new IllegalArgumentException( "No such header:" + name ); @@ -144,8 +146,18 @@ public String getHeader( String name ) return value; } - public Message copyHeadersTo( Message message, - String... names ) + public String getHeader( String name, String defaultValue ) + { + String value = headers.get( name ); + if ( value == null ) + { + return defaultValue; + } + return value; + } + + public Message copyHeadersTo( Message message, + String... names ) { if ( names.length == 0 ) { diff --git a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImpl.java b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImpl.java index f06156754e973..17fd2b15f759d 100644 --- a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImpl.java +++ b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImpl.java @@ -23,9 +23,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; +import java.util.stream.Collectors; import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory; @@ -54,14 +58,28 @@ class ClusterContextImpl extends AbstractContextImpl implements ClusterContext { + private static final String DISCOVERY_HEADER_SEPARATOR = ","; + // ClusterContext private Iterable clusterListeners = Listeners.newListeners(); - private final List discoveredInstances = new ArrayList(); + /* + * Holds instances that we have contacted and which have contacted us. This is achieved by filtering on + * receipt via contactingInstances and the DISCOVERED header. + * Cleared at the end of each discovery round. + */ + private final List discoveredInstances = new LinkedList<>(); + + /* + * Holds instances that have contacted us, along with a set of the instances they have in turn been contacted + * from. This is used to determine which instances that have contacted us have received messages from us and thus + * are in our initial_hosts. This is used to filter who goes in discoveredInstances. + * This map is also used to create the DISCOVERED header, which is basically the keyset in string form. + */ + private final Map> contactingInstances = new HashMap<>(); + private Iterable joiningInstances; private ClusterMessage.ConfigurationResponseState joinDeniedConfigurationResponseState; - private final Map currentlyJoiningInstances = - new HashMap(); + private final Map currentlyJoiningInstances = new HashMap<>(); private final Executor executor; private final ObjectOutputStreamFactory objectOutputStreamFactory; @@ -333,10 +351,65 @@ public List getDiscoveredInstances() return discoveredInstances; } + @Override + public boolean haveWeContactedInstance( ClusterMessage.ConfigurationRequestState configurationRequested ) + { + return contactingInstances.containsKey( configurationRequested.getJoiningId() ) && contactingInstances.get( + configurationRequested.getJoiningId() ).contains( getMyId() ); + } + + @Override + public void addContactingInstance( ClusterMessage.ConfigurationRequestState instance, String discoveryHeader ) + { + Set contactsOfRemote = contactingInstances.computeIfAbsent( instance.getJoiningId(), k -> new + HashSet<>() ); + // Duplicates of previous calls will be ignored by virtue of this being a set + contactsOfRemote.addAll( parseDiscoveryHeader( discoveryHeader ) ); + } + + @Override + public String generateDiscoveryHeader() + { + /* + * Maps the keyset of contacting instances from InstanceId to strings, collects them in a Set and joins them + * in a string with the appropriate separator + */ + return String.join( DISCOVERY_HEADER_SEPARATOR, contactingInstances.keySet().stream().map( InstanceId::toString ).collect( Collectors.toSet() ) ); + } + + private Set parseDiscoveryHeader( String discoveryHeader ) + { + String[] instanceIds = discoveryHeader.split( DISCOVERY_HEADER_SEPARATOR ); + Set result = new HashSet<>(); + for ( String instanceId : instanceIds ) + { + try + { + result.add( new InstanceId( Integer.parseInt( instanceId.trim() ) ) ); + } + catch( NumberFormatException e ) + { + /* + * This will happen if the message did not contain a DISCOVERY header. There are two reasons for this. + * One, the first configurationRequest going out from every instance does have the header but + * it is empty, since it's sent before any configurationRequests are processed. + * The other is practically the backwards compatibility code for versions which do not carry this header. + * + * Since the header will be empty (default value for it is empty string), the split above will create + * an array with a single empty string. This fails the integer parse. + */ + getLog( getClass() ).debug( "Could not parse discovery header for contacted instances, its value was " + + discoveryHeader ); + } + } + return result; + } + @Override public String toString() { - return "Me: " + me + " Bound at: " + commonState.boundAt() + " Config:" + commonState.configuration(); + return "Me: " + me + " Bound at: " + commonState.boundAt() + " Config:" + commonState.configuration() + + " Current state: " + commonState; } @Override @@ -382,7 +455,7 @@ public Iterable getOtherInstances() public boolean isInstanceJoiningFromDifferentUri( org.neo4j.cluster.InstanceId joiningId, URI uri ) { return currentlyJoiningInstances.containsKey( joiningId ) - && !currentlyJoiningInstances.get( joiningId ).equals(uri); + && !currentlyJoiningInstances.get( joiningId ).equals( uri ); } @Override @@ -410,7 +483,7 @@ public void discoveredLastReceivedInstanceId( long id ) { learnerContext.setLastDeliveredInstanceId( id ); learnerContext.learnedInstanceId( id ); - learnerContext.setNextInstanceId( id + 1); + learnerContext.setNextInstanceId( id + 1 ); } @Override @@ -457,8 +530,8 @@ public boolean equals( Object o ) { return false; } - if ( discoveredInstances != null ? !discoveredInstances.equals( that.discoveredInstances ) : that - .discoveredInstances != null ) + if ( discoveredInstances != null ? !discoveredInstances.equals( that.discoveredInstances ) : + that.discoveredInstances != null ) { return false; } @@ -467,8 +540,8 @@ public boolean equals( Object o ) { return false; } - if ( joinDeniedConfigurationResponseState != null ? !joinDeniedConfigurationResponseState.equals( that - .joinDeniedConfigurationResponseState ) : that.joinDeniedConfigurationResponseState != null ) + if ( joinDeniedConfigurationResponseState != null ? !joinDeniedConfigurationResponseState.equals( + that.joinDeniedConfigurationResponseState ) : that.joinDeniedConfigurationResponseState != null ) { return false; } diff --git a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterContext.java b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterContext.java index 4320ab4449616..80128b14eff16 100644 --- a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterContext.java +++ b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterContext.java @@ -87,6 +87,12 @@ public interface ClusterContext List getDiscoveredInstances(); + boolean haveWeContactedInstance( ClusterMessage.ConfigurationRequestState configurationRequested ); + + void addContactingInstance( ClusterMessage.ConfigurationRequestState instance, String discoveryHeader ); + + String generateDiscoveryHeader(); + void setBoundAt( URI boundAt ); void joinDenied( ConfigurationResponseState configurationResponseState ); diff --git a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterState.java b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterState.java index 7b2b4fbd81dc2..369179abe4f3e 100644 --- a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterState.java +++ b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterState.java @@ -34,6 +34,8 @@ import org.neo4j.cluster.statemachine.State; import org.neo4j.helpers.collection.Iterables; +import static java.lang.String.format; +import static org.neo4j.cluster.com.message.Message.DISCOVERED; import static org.neo4j.cluster.com.message.Message.internal; import static org.neo4j.cluster.com.message.Message.respond; import static org.neo4j.cluster.com.message.Message.timeout; @@ -86,12 +88,18 @@ public enum ClusterState String name = ( String ) args[0]; URI[] clusterInstanceUris = ( URI[] ) args[1]; context.joining( name, Iterables.iterable( clusterInstanceUris ) ); + context.getLog( getClass() ).info( "Trying to join with DISCOVERY header " + context.generateDiscoveryHeader() ); for ( URI potentialClusterInstanceUri : clusterInstanceUris ) { + /* + * The DISCOVERY header is empty, since we haven't processed configurationRequests + * at all yet. However, we still send it out for consistency. + */ outgoing.offer( to( ClusterMessage.configurationRequest, potentialClusterInstanceUri, - new ClusterMessage.ConfigurationRequestState( context.getMyId(), context.boundAt() ) ) ); + new ClusterMessage.ConfigurationRequestState( context.getMyId(), context.boundAt() ) ) + .setHeader( DISCOVERED, context.generateDiscoveryHeader() ) ); } context.setTimeout( "discovery", timeout( ClusterMessage.configurationTimeout, message, @@ -126,6 +134,7 @@ public enum ClusterState MessageHolder outgoing ) throws Throwable { List discoveredInstances = context.getDiscoveredInstances(); + context.getLog( getClass() ).info( format( "Discovered instances are %s", discoveredInstances ) ); switch ( message.getMessageType() ) { case configurationResponse: @@ -150,9 +159,8 @@ public enum ClusterState if ( !memberList.containsKey( context.getMyId() ) || !memberList.get( context.getMyId() ).equals( context.boundAt() ) ) { - context.getLog( ClusterState.class ).info( String.format( "%s joining:%s, " + - "last delivered:%d", context.getMyId().toString(), - context.getConfiguration().toString(), + context.getLog( ClusterState.class ).info( format( "%s joining:%s, last delivered:%d", + context.getMyId().toString(), context.getConfiguration().toString(), state.getLatestReceivedInstanceId().getId() ) ); ClusterMessage.ConfigurationChangeState newState = new ClusterMessage.ConfigurationChangeState(); @@ -200,13 +208,16 @@ public enum ClusterState ClusterMessage.ConfigurationTimeoutState state = message.getPayload(); if ( state.getRemainingPings() > 0 ) { + context.getLog( getClass() ).info( format( "Trying to join with DISCOVERY header %s", + context.generateDiscoveryHeader() ) ); // Send out requests again for ( URI potentialClusterInstanceUri : context.getJoiningInstances() ) { outgoing.offer( to( ClusterMessage.configurationRequest, potentialClusterInstanceUri, new ClusterMessage.ConfigurationRequestState( - context.getMyId(), context.boundAt() ) ) ); + context.getMyId(), context.boundAt() ) ) + .setHeader( DISCOVERED, context.generateDiscoveryHeader() ) ); } context.setTimeout( "join", timeout( ClusterMessage.configurationTimeout, message, @@ -233,11 +244,11 @@ public enum ClusterState * does not contain us, ever. */ ClusterMessage.ConfigurationRequestState ourRequestState = - new ClusterMessage.ConfigurationRequestState(context.getMyId(), context.boundAt()); + new ClusterMessage.ConfigurationRequestState( context.getMyId(), context.boundAt() ); // No one to join with boolean imAlone = count(context.getJoiningInstances()) == 1 - && discoveredInstances.contains(ourRequestState) + && discoveredInstances.contains( ourRequestState ) && discoveredInstances.size() == 1; // Enough instances discovered (half or more - i don't count myself here) boolean haveDiscoveredMajority = @@ -259,14 +270,17 @@ public enum ClusterState else { discoveredInstances.clear(); - + context.getLog( getClass() ).info( format( + "Trying to join with DISCOVERY header %s", + context.generateDiscoveryHeader() ) ); // Someone else is supposed to create the cluster - restart the join discovery for ( URI potentialClusterInstanceUri : context.getJoiningInstances() ) { outgoing.offer( to( ClusterMessage.configurationRequest, potentialClusterInstanceUri, new ClusterMessage.ConfigurationRequestState( context.getMyId(), - context.boundAt() ) ) ); + context.boundAt() ) ) + .setHeader( DISCOVERED, context.generateDiscoveryHeader() )); } context.setTimeout( "discovery", timeout( ClusterMessage.configurationTimeout, message, @@ -289,9 +303,15 @@ public enum ClusterState // We're listening for existing clusters, but if all instances start up at the same time // and look for each other, this allows us to pick that up ClusterMessage.ConfigurationRequestState configurationRequested = message.getPayload(); - configurationRequested = new ClusterMessage.ConfigurationRequestState( configurationRequested.getJoiningId(), URI.create(message.getHeader( Message.FROM ) )); - - if ( !discoveredInstances.contains( configurationRequested )) + configurationRequested = new ClusterMessage.ConfigurationRequestState( + configurationRequested.getJoiningId(), + URI.create( message.getHeader( Message.FROM ) ) ); + // Make a note that this instance contacted us. + context.addContactingInstance( configurationRequested, message.getHeader( DISCOVERED, "" ) ); + context.getLog( getClass() ).info( format( "Received configuration request %s and " + + "the header was %s", configurationRequested, message.getHeader( DISCOVERED, "" ) ) ); + + if ( !discoveredInstances.contains( configurationRequested ) ) { for ( ClusterMessage.ConfigurationRequestState discoveredInstance : discoveredInstances ) @@ -300,25 +320,35 @@ public enum ClusterState { // we are done outgoing.offer( internal( ClusterMessage.joinFailure, - new IllegalStateException( String.format( + new IllegalStateException( format( "Failed to join cluster because I saw two instances with the " + - "same ServerId. " + - "One is %s. The other is %s", discoveredInstance, - configurationRequested ) ) ) ); + "same ServerId. One is %s. The other is %s", + discoveredInstance, configurationRequested ) ) ) ); return start; } } - discoveredInstances.add( configurationRequested ); + if ( context.haveWeContactedInstance( configurationRequested ) ) + { + context.getLog( getClass() ).info( format( "%s had header %s which " + + "contains us. This means we've contacted them and they are in our " + + "initial hosts.", configurationRequested, message.getHeader( DISCOVERED, "" ) ) ); + discoveredInstances.add( configurationRequested ); + } + else + { + context.getLog( getClass() ).warn( + format( "joining instance %s was not in %s, i will not consider it for " + + "purposes of cluster creation", + configurationRequested.getJoiningUri(), + context.getJoiningInstances() ) ); + } } break; } case joinDenied: { -// outgoing.offer( internal( ClusterMessage.joinFailure, -// new ClusterEntryDeniedException( context.me, context.configuration ) ) ); -// return start; - context.joinDenied( (ClusterMessage.ConfigurationResponseState) message.getPayload() ); + context.joinDenied( message.getPayload() ); return this; } } @@ -375,7 +405,8 @@ public enum ClusterState { outgoing.offer( to( ClusterMessage.configurationRequest, potentialClusterInstanceUri, - new ClusterMessage.ConfigurationRequestState( context.getMyId(), context.boundAt() ) ) ); + new ClusterMessage.ConfigurationRequestState( context.getMyId(), context.boundAt() ) ) + .setHeader( DISCOVERED, context.generateDiscoveryHeader() )); } context.setTimeout( "discovery", timeout( ClusterMessage.configurationTimeout, message, @@ -420,7 +451,8 @@ public enum ClusterState case configurationRequest: { ClusterMessage.ConfigurationRequestState request = message.getPayload(); - request = new ClusterMessage.ConfigurationRequestState( request.getJoiningId(), URI.create(message.getHeader( Message.FROM ) )); + request = new ClusterMessage.ConfigurationRequestState( request.getJoiningId(), + URI.create( message.getHeader( Message.FROM ) ) ); InstanceId joiningId = request.getJoiningId(); URI joiningUri = request.getJoiningUri(); @@ -440,16 +472,20 @@ public enum ClusterState { if(otherInstanceJoiningWithSameId) { - context.getLog( ClusterState.class ).info( "Denying entry to instance " + joiningId + " because another instance is currently joining with the same id."); + context.getLog( ClusterState.class ).info( format( "Denying entry to instance %s" + + " because another instance is currently joining with the same id.", + joiningId ) ); } else { - context.getLog( ClusterState.class ).info( "Denying entry to instance " + joiningId + " because that instance is already in the cluster."); + context.getLog( ClusterState.class ).info( format( "Denying entry to " + + "instance %s because that instance is already in the cluster.", joiningId ) ); } outgoing.offer( message.copyHeadersTo( respond( ClusterMessage.joinDenied, message, new ClusterMessage.ConfigurationResponseState( context.getConfiguration() .getRoles(), context.getConfiguration().getMembers(), - new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( context.getLastDeliveredInstanceId() ), + new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( + context.getLastDeliveredInstanceId() ), context.getConfiguration().getName() ) ) ) ); } else @@ -459,7 +495,8 @@ public enum ClusterState outgoing.offer( message.copyHeadersTo( respond( ClusterMessage.configurationResponse, message, new ClusterMessage.ConfigurationResponseState( context.getConfiguration() .getRoles(), context.getConfiguration().getMembers(), - new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( context.getLastDeliveredInstanceId() ), + new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( + context.getLastDeliveredInstanceId() ), context.getConfiguration().getName() ) ) ) ); } break; @@ -477,8 +514,8 @@ public enum ClusterState List nodeList = new ArrayList( context.getConfiguration().getMemberURIs() ); if ( nodeList.size() == 1 ) { - context.getLog( ClusterState.class ).info( "Shutting down cluster: " + context - .getConfiguration().getName() ); + context.getLog( ClusterState.class ).info( format( "Shutting down cluster: %s", + context.getConfiguration().getName() ) ); context.left(); return start; @@ -486,7 +523,7 @@ public enum ClusterState } else { - context.getLog( ClusterState.class ).info( "Leaving:" + nodeList ); + context.getLog( ClusterState.class ).info( format( "Leaving:%s", nodeList ) ); ClusterMessage.ConfigurationChangeState newState = new ClusterMessage .ConfigurationChangeState(); diff --git a/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImplTest.java b/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImplTest.java index bc5cb07006e79..e161a18a427f8 100644 --- a/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImplTest.java +++ b/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/ClusterContextImplTest.java @@ -19,24 +19,27 @@ */ package org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.context; -import java.net.URI; -import java.util.concurrent.Executor; - import org.junit.Test; import org.mockito.ArgumentCaptor; +import java.net.URI; +import java.util.concurrent.Executor; + import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectOutputStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.LearnerContext; import org.neo4j.cluster.protocol.cluster.ClusterConfiguration; import org.neo4j.cluster.protocol.cluster.ClusterContext; +import org.neo4j.cluster.protocol.cluster.ClusterMessage; import org.neo4j.cluster.protocol.heartbeat.HeartbeatContext; import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener; import org.neo4j.cluster.timeout.Timeouts; import org.neo4j.logging.NullLogProvider; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -248,4 +251,118 @@ timeouts, executor, mock( ObjectOutputStreamFactory.class ), mock( assertEquals( context.getLastElector(), elector ); assertEquals( context.getLastElectorVersion(), 8 ); } + + @Test + public void shouldGracefullyHandleEmptyDiscoveryHeader() throws Exception + { + // Given + InstanceId me = new InstanceId( 1 ); + InstanceId joining = new InstanceId( 2 ); + + CommonContextState commonContextState = mock( CommonContextState.class, RETURNS_MOCKS ); + Timeouts timeouts = mock( Timeouts.class ); + Executor executor = mock( Executor.class ); + + HeartbeatContext heartbeatContext = mock ( HeartbeatContext.class ); + + ClusterContext context = new ClusterContextImpl(me, commonContextState, NullLogProvider.getInstance(), + timeouts, executor, mock( ObjectOutputStreamFactory.class ), mock( + ObjectInputStreamFactory.class ), mock( LearnerContext.class ), heartbeatContext ); + + ClusterMessage.ConfigurationRequestState request = mock( ClusterMessage.ConfigurationRequestState.class ); + when ( request.getJoiningId() ).thenReturn( joining ); + + // When + // Instance 2 contacts us with a request but it is empty + context.addContactingInstance( request, "" ); + + // Then + // The discovery header we generate should still contain that instance + assertEquals( "2", context.generateDiscoveryHeader() ); + } + + @Test + public void shouldUpdateDiscoveryHeaderWithContactingInstances() throws Exception + { + // Given + InstanceId me = new InstanceId( 1 ); + InstanceId joiningOne = new InstanceId( 2 ); + InstanceId joiningTwo = new InstanceId( 3 ); + + CommonContextState commonContextState = mock( CommonContextState.class, RETURNS_MOCKS ); + Timeouts timeouts = mock( Timeouts.class ); + Executor executor = mock( Executor.class ); + + HeartbeatContext heartbeatContext = mock ( HeartbeatContext.class ); + + ClusterContext context = new ClusterContextImpl(me, commonContextState, NullLogProvider.getInstance(), + timeouts, executor, mock( ObjectOutputStreamFactory.class ), mock( + ObjectInputStreamFactory.class ), mock( LearnerContext.class ), heartbeatContext ); + + ClusterMessage.ConfigurationRequestState requestOne = mock( ClusterMessage.ConfigurationRequestState.class ); + when ( requestOne.getJoiningId() ).thenReturn( joiningOne ); + + ClusterMessage.ConfigurationRequestState requestTwo = mock( ClusterMessage.ConfigurationRequestState.class ); + when ( requestTwo.getJoiningId() ).thenReturn( joiningTwo ); + + // When + // Instance 2 contacts us twice and Instance 3 contacts us once + context.addContactingInstance( requestOne, "4, 5" ); // discovery headers are random here + context.addContactingInstance( requestOne, "4, 5" ); + context.addContactingInstance( requestTwo, "2, 5" ); + + // Then + // The discovery header we generate should still contain one copy of each instance + assertEquals( "2,3", context.generateDiscoveryHeader() ); + } + + @Test + public void shouldKeepTrackOfInstancesWeHaveContacted() throws Exception + { + // Given + InstanceId me = new InstanceId( 1 ); + InstanceId joiningOne = new InstanceId( 2 ); + InstanceId joiningTwo = new InstanceId( 3 ); + + CommonContextState commonContextState = mock( CommonContextState.class, RETURNS_MOCKS ); + Timeouts timeouts = mock( Timeouts.class ); + Executor executor = mock( Executor.class ); + + HeartbeatContext heartbeatContext = mock ( HeartbeatContext.class ); + + ClusterContext context = new ClusterContextImpl(me, commonContextState, NullLogProvider.getInstance(), + timeouts, executor, mock( ObjectOutputStreamFactory.class ), mock( + ObjectInputStreamFactory.class ), mock( LearnerContext.class ), heartbeatContext ); + + ClusterMessage.ConfigurationRequestState requestOne = mock( ClusterMessage.ConfigurationRequestState.class ); + when ( requestOne.getJoiningId() ).thenReturn( joiningOne ); + + ClusterMessage.ConfigurationRequestState requestTwo = mock( ClusterMessage.ConfigurationRequestState.class ); + when ( requestTwo.getJoiningId() ).thenReturn( joiningTwo ); + + // When + // Instance two contacts us but we are not in the header + context.addContactingInstance( requestOne, "4, 5" ); + // Then we haven't contacted instance 2 + assertFalse(context.haveWeContactedInstance( requestOne ) ); + + // When + // Instance 2 reports that we have contacted it after all + context.addContactingInstance( requestOne, "4, 5, 1" ); + // Then + assertTrue(context.haveWeContactedInstance( requestOne ) ); + + // When + // Instance 3 says we have contacted it + context.addContactingInstance( requestTwo, "2, 5, 1" ); + // Then + assertTrue( context.haveWeContactedInstance( requestTwo ) ); + + // When + // For some reason we are not in the header of 3 in subsequent responses (a delayed one, for example) + context.addContactingInstance( requestTwo, "2, 5" ); + // Then + // The state should still keep the fact we've contacted it already + assertTrue( context.haveWeContactedInstance( requestTwo ) ); + } } diff --git a/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/cluster/ClusterStateTest.java b/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/cluster/ClusterStateTest.java index 4db00bc23f622..5c4915a70d5aa 100644 --- a/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/cluster/ClusterStateTest.java +++ b/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/cluster/ClusterStateTest.java @@ -19,26 +19,19 @@ */ package org.neo4j.cluster.protocol.cluster; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.neo4j.cluster.com.message.Message.to; -import static org.neo4j.cluster.protocol.cluster.ClusterMessage.configurationRequest; -import static org.neo4j.cluster.protocol.cluster.ClusterMessage.joinDenied; +import org.junit.Test; +import org.mockito.ArgumentMatcher; import java.net.URI; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; -import org.junit.Test; -import org.mockito.ArgumentMatcher; import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.com.message.Message; +import org.neo4j.cluster.com.message.MessageHolder; import org.neo4j.cluster.com.message.MessageType; import org.neo4j.cluster.com.message.TrackingMessageHolder; import org.neo4j.cluster.protocol.cluster.ClusterMessage.ConfigurationRequestState; @@ -46,6 +39,21 @@ import org.neo4j.logging.NullLog; import org.neo4j.logging.NullLogProvider; +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.cluster.com.message.Message.DISCOVERED; +import static org.neo4j.cluster.com.message.Message.internal; +import static org.neo4j.cluster.com.message.Message.to; +import static org.neo4j.cluster.protocol.cluster.ClusterMessage.configurationRequest; +import static org.neo4j.cluster.protocol.cluster.ClusterMessage.configurationTimeout; +import static org.neo4j.cluster.protocol.cluster.ClusterMessage.joinDenied; + public class ClusterStateTest { @Test @@ -136,6 +144,66 @@ public void shouldNotDenyJoinToInstanceThatRejoinsBeforeTimingOut() throws Throw assertEquals( ClusterMessage.configurationResponse, response.getMessageType() ); } + @Test + public void discoveredInstancesShouldBeOnlyOnesWeHaveContactedDirectly() throws Throwable + { + // GIVEN + ClusterContext context = mock( ClusterContext.class ); + when( context.getLog( any( Class.class ) ) ).thenReturn( NullLog.getInstance() ); + when( context.getUriForId( id( 2 ) ) ).thenReturn( uri( 2 ) ); + + List discoveredInstances = new LinkedList<>(); + when( context.getDiscoveredInstances() ).thenReturn( discoveredInstances ); + + MessageHolder outgoing = mock( MessageHolder.class ); + ConfigurationRequestState configurationRequestFromTwo = configuration( 2 ); + Message message = to( configurationRequest, uri( 1 ), configurationRequestFromTwo ) + .setHeader( Message.FROM, uri( 2 ).toString() ); + + // WHEN + // We receive a configuration request from an instance which we haven't contacted + ClusterState.discovery.handle( context, message, outgoing ); + + // THEN + // It shouldn't be added to the discovered instances + assertTrue( discoveredInstances.isEmpty() ); + + // WHEN + // It subsequently contacts us + when( context.haveWeContactedInstance( configurationRequestFromTwo ) ).thenReturn( true ); + ClusterState.discovery.handle( context, message, outgoing ); + + // Then + assertTrue( discoveredInstances.contains( configurationRequestFromTwo ) ); + } + + @Test + public void shouldSetDiscoveryHeaderProperly() throws Throwable + { + // GIVEN + ClusterContext context = mock( ClusterContext.class ); + when( context.getLog( any( Class.class ) ) ).thenReturn( NullLog.getInstance() ); + when( context.getUriForId( id( 2 ) ) ).thenReturn( uri( 2 ) ); + when( context.getJoiningInstances() ).thenReturn( singletonList( uri( 2 ) ) ); + + List discoveredInstances = new LinkedList<>(); + when( context.getDiscoveredInstances() ).thenReturn( discoveredInstances ); + + TrackingMessageHolder outgoing = new TrackingMessageHolder(); + ClusterMessage.ConfigurationTimeoutState timeoutState = new ClusterMessage.ConfigurationTimeoutState( 3 ); + Message message = internal( configurationTimeout, timeoutState ); + String discoveryHeader = "1,2,3"; + when( context.generateDiscoveryHeader() ).thenReturn( discoveryHeader ); + + // WHEN + // We receive a configuration request from an instance which we haven't contacted + ClusterState.discovery.handle( context, message, outgoing ); + + // THEN + // It shouldn't be added to the discovered instances + assertEquals( discoveryHeader, outgoing.first().getHeader( DISCOVERED ) ); + } + private ConfigurationResponseState configurationResponseState( Map existingMembers ) { return new ConfigurationResponseState( Collections.emptyMap(),