Skip to content

Commit

Permalink
Cluster formation decision depends on initial_hosts contents
Browse files Browse the repository at this point in the history
This change makes it so only instances listed in initial_hosts are
 considered for purposes of forming a cluster. This allows mismatching
 initial_hosts contents to be manipulated to make it harder for
 instances to start a cluster, for example slave_only instances to
 depend on the presence of a master but the master to be allowed to
 form the cluster regardless of the presence of other instances.
The functionality is achieved with the introduction of a
 (backwards compatible) discovery header in the configurationRequest
 messages which keeps track of which instance has contacted which
 during the initial discovery round. Since sending of these messages
 depends only on the initial_hosts content, this effectively allows
 for filtering of configurationRequests based on initial_host content.
  • Loading branch information
digitalstain committed Mar 13, 2017
1 parent eaaca44 commit e7c48eb
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 60 deletions.
Expand Up @@ -83,14 +83,16 @@ public static <MESSAGETYPE extends MessageType> Message<MESSAGETYPE> timeout( ME
return timeout; return timeout;
} }



// Standard headers // Standard headers
public static final String CONVERSATION_ID = "conversation-id"; public static final String CONVERSATION_ID = "conversation-id";
public static final String CREATED_BY = "created-by"; public static final String CREATED_BY = "created-by";
public static final String TIMEOUT_COUNT = "timeout-count"; public static final String TIMEOUT_COUNT = "timeout-count";
public static final String FROM = "from"; public static final String FROM = "from";
public static final String TO = "to"; public static final String TO = "to";
public static final String INSTANCE_ID = "instance-id"; public static final String INSTANCE_ID = "instance-id";
// Should be present only in configurationRequest messages. Value is a comma separated list of instance ids.
// Added in 3.0.9.
public static final String DISCOVERED = "discovered";


private MESSAGETYPE messageType; private MESSAGETYPE messageType;
private Object payload; private Object payload;
Expand Down Expand Up @@ -136,16 +138,26 @@ public boolean isInternal()
public String getHeader( String name ) public String getHeader( String name )
throws IllegalArgumentException throws IllegalArgumentException
{ {
String value = headers.get( name ); String value = getHeader( name, null );
if ( value == null ) if ( value == null )
{ {
throw new IllegalArgumentException( "No such header:" + name ); throw new IllegalArgumentException( "No such header:" + name );
} }
return value; return value;
} }


public <MESSAGETYPE extends MessageType> Message<MESSAGETYPE> copyHeadersTo( Message<MESSAGETYPE> message, public String getHeader( String name, String defaultValue )
String... names ) {
String value = headers.get( name );
if ( value == null )
{
return defaultValue;
}
return value;
}

public <MSGTYPE extends MessageType> Message<MSGTYPE> copyHeadersTo( Message<MSGTYPE> message,
String... names )
{ {
if ( names.length == 0 ) if ( names.length == 0 )
{ {
Expand Down
Expand Up @@ -23,9 +23,13 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.stream.Collectors;


import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory;
Expand Down Expand Up @@ -54,14 +58,28 @@ class ClusterContextImpl
extends AbstractContextImpl extends AbstractContextImpl
implements ClusterContext implements ClusterContext
{ {
private static final String DISCOVERY_HEADER_SEPARATOR = ",";

// ClusterContext // ClusterContext
private Iterable<ClusterListener> clusterListeners = Listeners.newListeners(); private Iterable<ClusterListener> clusterListeners = Listeners.newListeners();
private final List<ClusterMessage.ConfigurationRequestState> discoveredInstances = new ArrayList<ClusterMessage /*
.ConfigurationRequestState>(); * Holds instances that we have contacted and which have contacted us. This is achieved by filtering on
* receipt via contactingInstances and the DISCOVERED header.
* Cleared at the end of each discovery round.
*/
private final List<ClusterMessage.ConfigurationRequestState> discoveredInstances = new LinkedList<>();

/*
* Holds instances that have contacted us, along with a set of the instances they have in turn been contacted
* from. This is used to determine which instances that have contacted us have received messages from us and thus
* are in our initial_hosts. This is used to filter who goes in discoveredInstances.
* This map is also used to create the DISCOVERED header, which is basically the keyset in string form.
*/
private final Map<InstanceId, Set<InstanceId>> contactingInstances = new HashMap<>();

private Iterable<URI> joiningInstances; private Iterable<URI> joiningInstances;
private ClusterMessage.ConfigurationResponseState joinDeniedConfigurationResponseState; private ClusterMessage.ConfigurationResponseState joinDeniedConfigurationResponseState;
private final Map<InstanceId, URI> currentlyJoiningInstances = private final Map<InstanceId, URI> currentlyJoiningInstances = new HashMap<>();
new HashMap<InstanceId, URI>();


private final Executor executor; private final Executor executor;
private final ObjectOutputStreamFactory objectOutputStreamFactory; private final ObjectOutputStreamFactory objectOutputStreamFactory;
Expand Down Expand Up @@ -333,10 +351,65 @@ public List<ClusterMessage.ConfigurationRequestState> getDiscoveredInstances()
return discoveredInstances; return discoveredInstances;
} }


@Override
public boolean haveWeContactedInstance( ClusterMessage.ConfigurationRequestState configurationRequested )
{
return contactingInstances.containsKey( configurationRequested.getJoiningId() ) && contactingInstances.get(
configurationRequested.getJoiningId() ).contains( getMyId() );
}

@Override
public void addContactingInstance( ClusterMessage.ConfigurationRequestState instance, String discoveryHeader )
{
Set<InstanceId> contactsOfRemote = contactingInstances.computeIfAbsent( instance.getJoiningId(), k -> new
HashSet<>() );
// Duplicates of previous calls will be ignored by virtue of this being a set
contactsOfRemote.addAll( parseDiscoveryHeader( discoveryHeader ) );
}

@Override
public String generateDiscoveryHeader()
{
/*
* Maps the keyset of contacting instances from InstanceId to strings, collects them in a Set and joins them
* in a string with the appropriate separator
*/
return String.join( DISCOVERY_HEADER_SEPARATOR, contactingInstances.keySet().stream().map( InstanceId::toString ).collect( Collectors.toSet() ) );
}

private Set<InstanceId> parseDiscoveryHeader( String discoveryHeader )
{
String[] instanceIds = discoveryHeader.split( DISCOVERY_HEADER_SEPARATOR );
Set<InstanceId> result = new HashSet<>();
for ( String instanceId : instanceIds )
{
try
{
result.add( new InstanceId( Integer.parseInt( instanceId.trim() ) ) );
}
catch( NumberFormatException e )
{
/*
* This will happen if the message did not contain a DISCOVERY header. There are two reasons for this.
* One, the first configurationRequest going out from every instance does have the header but
* it is empty, since it's sent before any configurationRequests are processed.
* The other is practically the backwards compatibility code for versions which do not carry this header.
*
* Since the header will be empty (default value for it is empty string), the split above will create
* an array with a single empty string. This fails the integer parse.
*/
getLog( getClass() ).debug( "Could not parse discovery header for contacted instances, its value was " +
discoveryHeader );
}
}
return result;
}

@Override @Override
public String toString() public String toString()
{ {
return "Me: " + me + " Bound at: " + commonState.boundAt() + " Config:" + commonState.configuration(); return "Me: " + me + " Bound at: " + commonState.boundAt() + " Config:" + commonState.configuration() +
" Current state: " + commonState;
} }


@Override @Override
Expand Down Expand Up @@ -382,7 +455,7 @@ public Iterable<org.neo4j.cluster.InstanceId> getOtherInstances()
public boolean isInstanceJoiningFromDifferentUri( org.neo4j.cluster.InstanceId joiningId, URI uri ) public boolean isInstanceJoiningFromDifferentUri( org.neo4j.cluster.InstanceId joiningId, URI uri )
{ {
return currentlyJoiningInstances.containsKey( joiningId ) return currentlyJoiningInstances.containsKey( joiningId )
&& !currentlyJoiningInstances.get( joiningId ).equals(uri); && !currentlyJoiningInstances.get( joiningId ).equals( uri );
} }


@Override @Override
Expand Down Expand Up @@ -410,7 +483,7 @@ public void discoveredLastReceivedInstanceId( long id )
{ {
learnerContext.setLastDeliveredInstanceId( id ); learnerContext.setLastDeliveredInstanceId( id );
learnerContext.learnedInstanceId( id ); learnerContext.learnedInstanceId( id );
learnerContext.setNextInstanceId( id + 1); learnerContext.setNextInstanceId( id + 1 );
} }


@Override @Override
Expand Down Expand Up @@ -457,8 +530,8 @@ public boolean equals( Object o )
{ {
return false; return false;
} }
if ( discoveredInstances != null ? !discoveredInstances.equals( that.discoveredInstances ) : that if ( discoveredInstances != null ? !discoveredInstances.equals( that.discoveredInstances ) :
.discoveredInstances != null ) that.discoveredInstances != null )
{ {
return false; return false;
} }
Expand All @@ -467,8 +540,8 @@ public boolean equals( Object o )
{ {
return false; return false;
} }
if ( joinDeniedConfigurationResponseState != null ? !joinDeniedConfigurationResponseState.equals( that if ( joinDeniedConfigurationResponseState != null ? !joinDeniedConfigurationResponseState.equals(
.joinDeniedConfigurationResponseState ) : that.joinDeniedConfigurationResponseState != null ) that.joinDeniedConfigurationResponseState ) : that.joinDeniedConfigurationResponseState != null )
{ {
return false; return false;
} }
Expand Down
Expand Up @@ -87,6 +87,12 @@ public interface ClusterContext


List<ClusterMessage.ConfigurationRequestState> getDiscoveredInstances(); List<ClusterMessage.ConfigurationRequestState> getDiscoveredInstances();


boolean haveWeContactedInstance( ClusterMessage.ConfigurationRequestState configurationRequested );

void addContactingInstance( ClusterMessage.ConfigurationRequestState instance, String discoveryHeader );

String generateDiscoveryHeader();

void setBoundAt( URI boundAt ); void setBoundAt( URI boundAt );


void joinDenied( ConfigurationResponseState configurationResponseState ); void joinDenied( ConfigurationResponseState configurationResponseState );
Expand Down

0 comments on commit e7c48eb

Please sign in to comment.