Skip to content

Commit

Permalink
Elector identification now happens more reliably
Browse files Browse the repository at this point in the history
When an instance joins an HA cluster, it assumes that all other
 members received in the configuration response are alive. This
 means that if the newly joined instance is the lowest numbered
 alive member, but lower numbered members exist that have failed,
 it will choose not to trigger elections. Updating the liveness
 state afterwards, through suspicion gossip, will not change that,
 since marking a member as failed does not result in elections if
 that member does not hold any roles.
This patch addresses the issue by making liveness information
 part of the configuration response and consulting that list on
 the first election request. By taking all alive instances and
 removing those the cluster reports as currently failed, we end up
 with a reliable list of alive members, which we can then check to
 see who the elector is.
  • Loading branch information
digitalstain committed Mar 5, 2018
1 parent e32a3b9 commit 34c1b31
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 104 deletions.
Expand Up @@ -171,6 +171,12 @@ public boolean shouldFilterContactingInstances()
return config.get( ClusterSettings.strict_initial_hosts ); return config.get( ClusterSettings.strict_initial_hosts );
} }


/**
 * Returns the failed set as reported by the heartbeat context.
 * This is a plain delegation to {@code heartbeatContext.getFailed()}.
 */
@Override
public Set<InstanceId> getFailedInstances()
{
return heartbeatContext.getFailed();
}

@Override @Override
public InstanceId getLastElector() public InstanceId getLastElector()
{ {
Expand Down Expand Up @@ -214,10 +220,19 @@ public void joining( String name, Iterable<URI> instanceList )
} }


@Override @Override
public void acquiredConfiguration( final Map<InstanceId, URI> memberList, final Map<String, InstanceId> roles ) public void acquiredConfiguration( final Map<InstanceId, URI> memberList, final Map<String, InstanceId> roles,
final Set<InstanceId> failedInstances )
{ {
commonState.configuration().setMembers( memberList ); commonState.configuration().setMembers( memberList );
commonState.configuration().setRoles( roles ); commonState.configuration().setRoles( roles );
for ( InstanceId failedInstance : failedInstances )
{
if ( !failedInstance.equals( me ) )
{
logProvider.getLog( ClusterContextImpl.class ).debug( "Adding instance " + failedInstance + " as failed from the start" );
heartbeatContext.failed( failedInstance );
}
}
} }


@Override @Override
Expand Down
Expand Up @@ -255,10 +255,14 @@ public org.neo4j.cluster.InstanceId getMyId()
@Override @Override
public boolean isElector() public boolean isElector()
{ {
// Only the first alive server should try elections. Everyone else waits // Only the first *alive* server should try elections. Everyone else waits
// This also takes into account the instances reported by the cluster join response as failed, to
// cover for the case where we just joined and our suspicions are not reliable yet.
List<org.neo4j.cluster.InstanceId> aliveInstances = asList( getAlive() ); List<org.neo4j.cluster.InstanceId> aliveInstances = asList( getAlive() );
aliveInstances.removeAll( getFailed() );
Collections.sort( aliveInstances ); Collections.sort( aliveInstances );
return aliveInstances.indexOf( getMyId() ) == 0; // Either we are the first one or the only one
return aliveInstances.indexOf( getMyId() ) == 0 || aliveInstances.isEmpty();
} }


@Override @Override
Expand Down Expand Up @@ -309,7 +313,7 @@ private static class Election
private Election( WinnerStrategy winnerStrategy ) private Election( WinnerStrategy winnerStrategy )
{ {
this.winnerStrategy = winnerStrategy; this.winnerStrategy = winnerStrategy;
this.votes = new HashMap<InstanceId, Vote>(); this.votes = new HashMap<>();
} }


private Election( WinnerStrategy winnerStrategy, HashMap<InstanceId, Vote> votes ) private Election( WinnerStrategy winnerStrategy, HashMap<InstanceId, Vote> votes )
Expand Down
Expand Up @@ -92,7 +92,7 @@ public boolean alive( InstanceId node )
Set<InstanceId> serverSuspicions = suspicionsFor( getMyId() ); Set<InstanceId> serverSuspicions = suspicionsFor( getMyId() );
boolean suspected = serverSuspicions.remove( node ); boolean suspected = serverSuspicions.remove( node );


if ( !isFailed( node ) && failed.remove( node ) ) if ( !isFailedBasedOnSuspicions( node ) && failed.remove( node ) )
{ {
getLog( HeartbeatContext.class ).info( "Notifying listeners that instance " + node + " is alive" ); getLog( HeartbeatContext.class ).info( "Notifying listeners that instance " + node + " is alive" );
heartBeatListeners.notify( executor, listener -> listener.alive( node ) ); heartBeatListeners.notify( executor, listener -> listener.alive( node ) );
Expand All @@ -113,7 +113,7 @@ public void suspect( InstanceId node )
getLog( HeartbeatContext.class ).info( getMyId() + "(me) is now suspecting " + node ); getLog( HeartbeatContext.class ).info( getMyId() + "(me) is now suspecting " + node );
} }


if ( isFailed( node ) && !failed.contains( node ) ) if ( isFailedBasedOnSuspicions( node ) && !failed.contains( node ) )
{ {
getLog( HeartbeatContext.class ).info( "Notifying listeners that instance " + node + " is failed" ); getLog( HeartbeatContext.class ).info( "Notifying listeners that instance " + node + " is failed" );
failed.add( node ); failed.add( node );
Expand Down Expand Up @@ -187,7 +187,7 @@ public void suspicions( InstanceId from, Set<InstanceId> suspicions )
* will be marked as failed (it has gathered enough suspicions) but we still need to process their messages, in * will be marked as failed (it has gathered enough suspicions) but we still need to process their messages, in
* order to mark as failed the other half. * order to mark as failed the other half.
*/ */
if ( isFailed( from ) && !failed.contains( from ) ) if ( isFailedBasedOnSuspicions( from ) && !failed.contains( from ) )
{ {
getLog( HeartbeatContext.class ).info( getLog( HeartbeatContext.class ).info(
"Ignoring suspicions from failed instance " + from + ": " + Iterables.toString( suspicions, "," ) ); "Ignoring suspicions from failed instance " + from + ": " + Iterables.toString( suspicions, "," ) );
Expand Down Expand Up @@ -221,7 +221,7 @@ public void suspicions( InstanceId from, Set<InstanceId> suspicions )
// Check if anyone is considered failed // Check if anyone is considered failed
for ( InstanceId node : suspicions ) for ( InstanceId node : suspicions )
{ {
if ( isFailed( node ) && !failed.contains( node ) ) if ( isFailedBasedOnSuspicions( node ) && !failed.contains( node ) )
{ {
failed.add( node ); failed.add( node );
heartBeatListeners.notify( executor, listener -> listener.failed( node ) ); heartBeatListeners.notify( executor, listener -> listener.failed( node ) );
Expand All @@ -238,7 +238,7 @@ public Set<InstanceId> getFailed()
@Override @Override
public Iterable<InstanceId> getAlive() public Iterable<InstanceId> getAlive()
{ {
return Iterables.filter( item -> !isFailed( item ), commonState.configuration().getMemberIds() ); return Iterables.filter( item -> !isFailedBasedOnSuspicions( item ), commonState.configuration().getMemberIds() );
} }


@Override @Override
Expand All @@ -264,7 +264,7 @@ public void serverLeftCluster( InstanceId node )
} }


@Override @Override
public boolean isFailed( InstanceId node ) public boolean isFailedBasedOnSuspicions( InstanceId node )
{ {
List<InstanceId> suspicionsForNode = getSuspicionsOf( node ); List<InstanceId> suspicionsForNode = getSuspicionsOf( node );
int countOfInstancesSuspectedByMe = getSuspicionsFor( getMyId() ).size(); int countOfInstancesSuspectedByMe = getSuspicionsFor( getMyId() ).size();
Expand Down Expand Up @@ -344,6 +344,12 @@ public long getLastLearnedInstanceId()
return learnerContext.getLastLearnedInstanceId(); return learnerContext.getLastLearnedInstanceId();
} }


/**
 * Adds the given instance directly to the failed set.
 * Unlike the suspicion-driven paths in this class, this method does not
 * consult suspicions and does not notify heartbeat listeners.
 *
 * @param instanceId the instance to record as failed
 */
@Override
public void failed( InstanceId instanceId )
{
failed.add( instanceId );
}

public HeartbeatContextImpl snapshot( CommonContextState commonStateSnapshot, LogProvider logging, public HeartbeatContextImpl snapshot( CommonContextState commonStateSnapshot, LogProvider logging,
Timeouts timeouts, Executor executor ) Timeouts timeouts, Executor executor )
{ {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;


import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.protocol.ConfigurationContext; import org.neo4j.cluster.protocol.ConfigurationContext;
Expand Down Expand Up @@ -53,7 +54,8 @@ public interface ClusterContext


void joining( String name, Iterable<URI> instanceList ); void joining( String name, Iterable<URI> instanceList );


void acquiredConfiguration( Map<InstanceId, URI> memberList, Map<String, InstanceId> roles ); void acquiredConfiguration( Map<InstanceId, URI> memberList, Map<String, InstanceId> roles,
Set<InstanceId> failedInstances );


void joined(); void joined();


Expand Down Expand Up @@ -124,4 +126,11 @@ public interface ClusterContext
void setLastElectorVersion( long lastElectorVersion ); void setLastElectorVersion( long lastElectorVersion );


boolean shouldFilterContactingInstances(); boolean shouldFilterContactingInstances();

/**
* @return The set of instances present in the failed set. This is not the same as the instances which are
* determined to be failed based on suspicions, as failed instance information can also come from the cluster
* configuration response at join time.
*/
Set<InstanceId> getFailedInstances();
} }
Expand Up @@ -23,6 +23,7 @@
import java.net.URI; import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;


import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.com.message.MessageType; import org.neo4j.cluster.com.message.MessageType;
Expand Down Expand Up @@ -125,14 +126,16 @@ public static class ConfigurationResponseState
private org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId latestReceivedInstanceId; private org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId latestReceivedInstanceId;
private Map<String, InstanceId> roles; private Map<String, InstanceId> roles;
private String clusterName; private String clusterName;
private Set<InstanceId> failedMembers;


public ConfigurationResponseState( Map<String, InstanceId> roles, Map<InstanceId, URI> nodes, public ConfigurationResponseState( Map<String, InstanceId> roles, Map<InstanceId, URI> nodes,
org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId latestReceivedInstanceId, org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId latestReceivedInstanceId,
String clusterName ) Set<InstanceId> failedMembers, String clusterName )
{ {
this.roles = roles; this.roles = roles;
this.nodes = nodes; this.nodes = nodes;
this.latestReceivedInstanceId = latestReceivedInstanceId; this.latestReceivedInstanceId = latestReceivedInstanceId;
this.failedMembers = failedMembers;
this.clusterName = clusterName; this.clusterName = clusterName;
} }


Expand All @@ -156,10 +159,15 @@ public String getClusterName()
return clusterName; return clusterName;
} }


/**
 * @return the set of failed members this response state was constructed with;
 * the stored reference is returned as-is, without copying
 */
public Set<InstanceId> getFailedMembers()
{
return failedMembers;
}

public ConfigurationResponseState snapshot() public ConfigurationResponseState snapshot()
{ {
return new ConfigurationResponseState( new HashMap<>(roles), new HashMap<>(nodes), return new ConfigurationResponseState( new HashMap<>( roles ), new HashMap<>( nodes ),
latestReceivedInstanceId, clusterName ); latestReceivedInstanceId, failedMembers, clusterName );
} }


@Override @Override
Expand All @@ -169,6 +177,7 @@ public String toString()
"nodes=" + nodes + "nodes=" + nodes +
", latestReceivedInstanceId=" + latestReceivedInstanceId + ", latestReceivedInstanceId=" + latestReceivedInstanceId +
", roles=" + roles + ", roles=" + roles +
", failed=" + failedMembers +
", clusterName='" + clusterName + '\'' + ", clusterName='" + clusterName + '\'' +
'}'; '}';
} }
Expand Down Expand Up @@ -200,6 +209,10 @@ public boolean equals( Object o )
{ {
return false; return false;
} }
if ( failedMembers != null ? !failedMembers.equals( that.failedMembers ) : that.failedMembers != null )
{
return false;
}
if ( roles != null ? !roles.equals( that.roles ) : that.roles != null ) if ( roles != null ? !roles.equals( that.roles ) : that.roles != null )
{ {
return false; return false;
Expand Down
Expand Up @@ -55,20 +55,20 @@ public enum ClusterState
{ {
@Override @Override
public State<?, ?> handle( ClusterContext context, Message<ClusterMessage> message, public State<?, ?> handle( ClusterContext context, Message<ClusterMessage> message,
MessageHolder outgoing ) throws Throwable MessageHolder outgoing )
{ {
switch ( message.getMessageType() ) switch ( message.getMessageType() )
{ {
case addClusterListener: case addClusterListener:
{ {
context.addClusterListener( message.<ClusterListener>getPayload() ); context.addClusterListener( message.getPayload() );


break; break;
} }


case removeClusterListener: case removeClusterListener:
{ {
context.removeClusterListener( message.<ClusterListener>getPayload() ); context.removeClusterListener( message.getPayload() );


break; break;
} }
Expand All @@ -84,10 +84,10 @@ public enum ClusterState
case join: case join:
{ {
// Send configuration request to all instances // Send configuration request to all instances
Object[] args = message.<Object[]>getPayload(); Object[] args = message.getPayload();
String name = (String) args[0]; String name = (String) args[0];
URI[] clusterInstanceUris = (URI[]) args[1]; URI[] clusterInstanceUris = (URI[]) args[1];
context.joining( name, Iterables.<URI,URI>iterable( clusterInstanceUris ) ); context.joining( name, Iterables.iterable( clusterInstanceUris ) );
context.getLog( getClass() ).info( "Trying to join with DISCOVERY header " + context.generateDiscoveryHeader() ); context.getLog( getClass() ).info( "Trying to join with DISCOVERY header " + context.generateDiscoveryHeader() );


for ( URI potentialClusterInstanceUri : clusterInstanceUris ) for ( URI potentialClusterInstanceUri : clusterInstanceUris )
Expand Down Expand Up @@ -154,10 +154,10 @@ public enum ClusterState
", got " + state.getClusterName() + "." ); ", got " + state.getClusterName() + "." );
} }


HashMap<InstanceId, URI> memberList = new HashMap<InstanceId, URI>( state.getMembers() ); HashMap<InstanceId, URI> memberList = new HashMap<>( state.getMembers() );
context.discoveredLastReceivedInstanceId( state.getLatestReceivedInstanceId().getId() ); context.discoveredLastReceivedInstanceId( state.getLatestReceivedInstanceId().getId() );


context.acquiredConfiguration( memberList, state.getRoles() ); context.acquiredConfiguration( memberList, state.getRoles(), state.getFailedMembers() );


if ( !memberList.containsKey( context.getMyId() ) || if ( !memberList.containsKey( context.getMyId() ) ||
!memberList.get( context.getMyId() ).equals( context.boundAt() ) ) !memberList.get( context.getMyId() ).equals( context.boundAt() ) )
Expand Down Expand Up @@ -379,7 +379,6 @@ public enum ClusterState
Message<ClusterMessage> message, Message<ClusterMessage> message,
MessageHolder outgoing MessageHolder outgoing
) )
throws Throwable
{ {
switch ( message.getMessageType() ) switch ( message.getMessageType() )
{ {
Expand Down Expand Up @@ -448,20 +447,20 @@ public enum ClusterState
{ {
@Override @Override
public State<?, ?> handle( ClusterContext context, Message<ClusterMessage> message, public State<?, ?> handle( ClusterContext context, Message<ClusterMessage> message,
MessageHolder outgoing ) throws Throwable MessageHolder outgoing )
{ {
switch ( message.getMessageType() ) switch ( message.getMessageType() )
{ {
case addClusterListener: case addClusterListener:
{ {
context.addClusterListener( message.<ClusterListener>getPayload() ); context.addClusterListener( message.getPayload() );


break; break;
} }


case removeClusterListener: case removeClusterListener:
{ {
context.removeClusterListener( message.<ClusterListener>getPayload() ); context.removeClusterListener( message.getPayload() );


break; break;
} }
Expand Down Expand Up @@ -504,6 +503,7 @@ public enum ClusterState
.getRoles(), context.getConfiguration().getMembers(), .getRoles(), context.getConfiguration().getMembers(),
new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId(
context.getLastDeliveredInstanceId() ), context.getLastDeliveredInstanceId() ),
context.getFailedInstances(),
context.getConfiguration().getName() ) ) ) ); context.getConfiguration().getName() ) ) ) );
} }
else else
Expand All @@ -515,6 +515,7 @@ public enum ClusterState
.getRoles(), context.getConfiguration().getMembers(), .getRoles(), context.getConfiguration().getMembers(),
new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId(
context.getLastDeliveredInstanceId() ), context.getLastDeliveredInstanceId() ),
context.getFailedInstances(),
context.getConfiguration().getName() ) ) ) ); context.getConfiguration().getName() ) ) ) );
} }
break; break;
Expand All @@ -529,7 +530,7 @@ public enum ClusterState


case leave: case leave:
{ {
List<URI> nodeList = new ArrayList<URI>( context.getConfiguration().getMemberURIs() ); List<URI> nodeList = new ArrayList<>( context.getConfiguration().getMemberURIs() );
if ( nodeList.size() == 1 ) if ( nodeList.size() == 1 )
{ {
context.getLog( ClusterState.class ).info( format( "Shutting down cluster: %s", context.getLog( ClusterState.class ).info( format( "Shutting down cluster: %s",
Expand Down Expand Up @@ -570,7 +571,6 @@ public enum ClusterState
Message<ClusterMessage> message, Message<ClusterMessage> message,
MessageHolder outgoing MessageHolder outgoing
) )
throws Throwable
{ {
switch ( message.getMessageType() ) switch ( message.getMessageType() )
{ {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;


import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.com.message.Message; import org.neo4j.cluster.com.message.Message;
Expand All @@ -49,7 +50,6 @@ public enum ElectionState
Message<ElectionMessage> message, Message<ElectionMessage> message,
MessageHolder outgoing MessageHolder outgoing
) )
throws Throwable
{ {
if ( message.getMessageType() == ElectionMessage.created ) if ( message.getMessageType() == ElectionMessage.created )
{ {
Expand All @@ -72,7 +72,6 @@ else if ( message.getMessageType() == ElectionMessage.join )
Message<ElectionMessage> message, Message<ElectionMessage> message,
MessageHolder outgoing MessageHolder outgoing
) )
throws Throwable
{ {
Log log = context.getLog( ElectionState.class ); Log log = context.getLog( ElectionState.class );
switch ( message.getMessageType() ) switch ( message.getMessageType() )
Expand Down Expand Up @@ -154,6 +153,7 @@ else if ( message.getMessageType() == ElectionMessage.join )


if ( isElector ) if ( isElector )
{ {
context.getLog( ElectionState.class ).info( "I am the elector, doing election..." );
// Start election process for all roles // Start election process for all roles
Iterable<ElectionRole> rolesRequiringElection = context.getPossibleRoles(); Iterable<ElectionRole> rolesRequiringElection = context.getPossibleRoles();
for ( ElectionRole role : rolesRequiringElection ) for ( ElectionRole role : rolesRequiringElection )
Expand Down Expand Up @@ -212,10 +212,23 @@ else if ( message.getMessageType() == ElectionMessage.join )
} }
else else
{ {
List<InstanceId> aliveInstances = Iterables.asList( context.getAlive() ); /*
Collections.sort( aliveInstances ); * We take alive instances as determined by suspicions and remove those that are
* marked as failed in the failed set. This is done so that an instance which
* just joined can use the failed set provided in the configuration response to
* correctly determine the instances that are failed and skip them.
* Basically, this is to solve an issue where if an instance joins and is the
* lowest numbered alive but not overall will not try to get the failed lower
* numbered one to do elections.
*/
Set<InstanceId> aliveInstances = Iterables.asSet( context.getAlive() );
aliveInstances.removeAll( context.getFailed() );
List<InstanceId> adjustedAlive = Iterables.asList( aliveInstances );
Collections.sort( adjustedAlive );

context.getLog( ElectionState.class ).info( "I am NOT the elector, sending to " + adjustedAlive );
outgoing.offer( message.setHeader( Message.TO, outgoing.offer( message.setHeader( Message.TO,
context.getUriForId( firstOrNull( aliveInstances ) ).toString() ) ); context.getUriForId( firstOrNull( adjustedAlive ) ).toString() ) );
} }
} }
break; break;
Expand Down

0 comments on commit 34c1b31

Please sign in to comment.