Skip to content

Commit

Permalink
Preserve step down term when old leader steps down in new term
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Apr 23, 2018
1 parent 29d491c commit 871a98e
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 40 deletions.
Expand Up @@ -39,13 +39,18 @@ public LeaderInfo( MemberId memberId, long term )
this( memberId, term, false );
}

public LeaderInfo( MemberId memberId, long term, boolean isSteppingDown )
private LeaderInfo( MemberId memberId, long term, boolean isSteppingDown )
{
this.memberId = memberId;
this.term = term;
this.isSteppingDown = isSteppingDown;
}

/**
 * Creates the step-down counterpart of this leader info.
 *
 * The returned instance has a {@code null} member id, the same term as this instance,
 * and its stepping-down flag set to {@code true}. This instance is not modified.
 *
 * @return a new {@link LeaderInfo} marking a step down in this instance's term
 */
public LeaderInfo stepDown()
{
return new LeaderInfo( null, this.term, true );
}

public boolean isSteppingDown()
{
return isSteppingDown;
Expand Down
Expand Up @@ -22,7 +22,7 @@
public interface LeaderListener
{
void onLeaderSwitch( LeaderInfo leaderInfo );
default void onLeaderStepDown( LeaderInfo leaderInfo )
default void onLeaderStepDown( long stepDownTerm )
{
}
}
Expand Up @@ -213,14 +213,11 @@ public synchronized ExposedRaftState state()

private void notifyLeaderChanges( Outcome outcome )
{
LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm(), outcome.isSteppingDown() );
LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() );

for ( LeaderListener listener : leaderListeners )
{
if ( outcome.isSteppingDown() )
{
listener.onLeaderStepDown( leaderInfo );
}
outcome.stepDownTerm().ifPresent( listener::onLeaderStepDown );
listener.onLeaderSwitch( leaderInfo );
}
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import org.neo4j.causalclustering.messaging.Message;
Expand All @@ -31,6 +32,7 @@
import org.neo4j.causalclustering.core.consensus.state.ReadableRaftState;
import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.register.Register;

import static java.util.Collections.emptySet;

Expand Down Expand Up @@ -72,7 +74,7 @@ public class Outcome implements Message, ConsensusOutcome
private FollowerStates<MemberId> followerStates;
private Collection<ShipCommand> shipCommands = new ArrayList<>();
private boolean electedLeader;
private boolean steppingDown;
private Optional<Long> steppingDownInTerm;
private Set<MemberId> heartbeatResponses;

public Outcome( Role currentRole, ReadableRaftState ctx )
Expand Down Expand Up @@ -103,6 +105,7 @@ public Outcome( Role nextRole, long term, MemberId leader, long leaderCommit, Me
this.shipCommands.addAll( shipCommands );
this.commitIndex = commitIndex;
this.isPreElection = isPreElection;
this.steppingDownInTerm = Optional.empty();
}

private void defaults( Role currentRole, ReadableRaftState ctx )
Expand All @@ -119,6 +122,7 @@ private void defaults( Role currentRole, ReadableRaftState ctx )
needsFreshSnapshot = false;

isPreElection = (currentRole == Role.FOLLOWER) && ctx.isPreElection();
steppingDownInTerm = Optional.empty();
preVotesForMe = isPreElection ? new HashSet<>( ctx.preVotesForMe() ) : emptySet();
votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : emptySet();
heartbeatResponses = (currentRole == Role.LEADER) ? new HashSet<>( ctx.heartbeatResponses() ) : emptySet();
Expand Down Expand Up @@ -196,14 +200,14 @@ public void addShipCommand( ShipCommand shipCommand )

public void electedLeader()
{
assert !steppingDown;
assert !isSteppingDown();
this.electedLeader = true;
}

public void steppingDown()
public void steppingDown( long stepDownTerm )
{
assert !electedLeader;
this.steppingDown = true;
steppingDownInTerm = Optional.of( stepDownTerm );
}

@Override
Expand All @@ -226,7 +230,7 @@ public String toString()
", followerStates=" + followerStates +
", shipCommands=" + shipCommands +
", electedLeader=" + electedLeader +
", steppingDown=" + steppingDown +
", steppingDown=" + steppingDownInTerm.map( t -> "true, steppingDownInTerm=" + t ).orElse( "false" ) +
'}';
}

Expand Down Expand Up @@ -303,7 +307,12 @@ public boolean isElectedLeader()

public boolean isSteppingDown()
{
return steppingDown;
return steppingDownInTerm.isPresent();
}

/**
 * Exposes the term recorded for a step down, if one was recorded on this outcome.
 *
 * The value is present once {@code steppingDown( long )} has been called, and then holds the
 * term passed to that call; the visible initialisation paths set it to empty.
 *
 * @return the step-down term, or an empty {@link Optional} when no step down was recorded
 */
public Optional<Long> stepDownTerm()
{
return steppingDownInTerm;
}

@Override
Expand Down Expand Up @@ -362,7 +371,7 @@ public boolean equals( Object o )
return term == outcome.term && leaderCommit == outcome.leaderCommit && commitIndex == outcome.commitIndex &&
renewElectionTimeout == outcome.renewElectionTimeout && needsFreshSnapshot == outcome.needsFreshSnapshot &&
isPreElection == outcome.isPreElection && lastLogIndexBeforeWeBecameLeader == outcome.lastLogIndexBeforeWeBecameLeader &&
electedLeader == outcome.electedLeader && steppingDown == outcome.steppingDown && nextRole == outcome.nextRole &&
electedLeader == outcome.electedLeader && nextRole == outcome.nextRole && Objects.equals( steppingDownInTerm, outcome.steppingDownInTerm ) &&
Objects.equals( leader, outcome.leader ) && Objects.equals( logCommands, outcome.logCommands ) &&
Objects.equals( outgoingMessages, outcome.outgoingMessages ) && Objects.equals( votedFor, outcome.votedFor ) &&
Objects.equals( preVotesForMe, outcome.preVotesForMe ) && Objects.equals( votesForMe, outcome.votesForMe ) &&
Expand All @@ -375,6 +384,6 @@ public int hashCode()
{
return Objects.hash( nextRole, term, leader, leaderCommit, logCommands, outgoingMessages, commitIndex, votedFor, renewElectionTimeout,
needsFreshSnapshot, isPreElection, preVotesForMe, votesForMe, lastLogIndexBeforeWeBecameLeader, followerStates, shipCommands, electedLeader,
steppingDown, heartbeatResponses );
steppingDownInTerm, heartbeatResponses );
}
}
Expand Up @@ -87,7 +87,7 @@ public Outcome handle( Heartbeat heartbeat ) throws IOException
return outcome;
}

stepDownToFollower( outcome );
stepDownToFollower( outcome, ctx.term() );
log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " + "%d) from %s",
heartbeat.leaderTerm(), ctx.term(), heartbeat.from() );
Heart.beat( ctx, outcome, heartbeat, log );
Expand All @@ -113,7 +113,7 @@ public Outcome handle( RaftMessages.Timeout.Election election )
{
if ( !isQuorum( ctx.votingMembers().size(), ctx.heartbeatResponses().size() ) )
{
stepDownToFollower( outcome );
stepDownToFollower( outcome, ctx.term() );
log.info( "Moving to FOLLOWER state after not receiving heartbeat responses in this election timeout " +
"period. Heartbeats received: %s", ctx.heartbeatResponses() );
}
Expand Down Expand Up @@ -142,7 +142,7 @@ else if ( req.leaderTerm() == ctx.term() )
else
{
// There is a new leader in a later term, we should revert to follower. (§5.1)
stepDownToFollower( outcome );
stepDownToFollower( outcome, ctx.term() );
log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " +
"%d) from %s", req.leaderTerm(), ctx.term(), req.from() );
Appending.handleAppendEntriesRequest( ctx, outcome, req, log );
Expand All @@ -162,7 +162,7 @@ public Outcome handle( RaftMessages.AppendEntries.Response response ) throws IOE
else if ( response.term() > ctx.term() )
{
outcome.setNextTerm( response.term() );
stepDownToFollower( outcome );
stepDownToFollower( outcome, ctx.term() );
log.info( "Moving to FOLLOWER state after receiving append response at term %d (my term is " +
"%d) from %s", response.term(), ctx.term(), response.from() );
outcome.replaceFollowerStates( new FollowerStates<>() );
Expand Down Expand Up @@ -235,7 +235,7 @@ public Outcome handle( RaftMessages.Vote.Request req ) throws IOException
{
if ( req.term() > ctx.term() )
{
stepDownToFollower( outcome );
stepDownToFollower( outcome, ctx.term() );
log.info(
"Moving to FOLLOWER state after receiving vote request at term %d (my term is " + "%d) from %s",
req.term(), ctx.term(), req.from() );
Expand Down Expand Up @@ -286,7 +286,7 @@ public Outcome handle( RaftMessages.PreVote.Request req ) throws IOException
{
if ( req.term() > ctx.term() )
{
stepDownToFollower( outcome );
stepDownToFollower( outcome, ctx.term() );
log.info( "Moving to FOLLOWER state after receiving pre vote request from %s at term %d (I am at %d)",
req.from(), req.term(), ctx.term() );
}
Expand All @@ -307,9 +307,9 @@ public Outcome handle( LogCompactionInfo logCompactionInfo )
return outcome;
}

private void stepDownToFollower( Outcome outcome )
private void stepDownToFollower( Outcome outcome, long inTerm )
{
outcome.steppingDown();
outcome.steppingDown( inTerm );
outcome.setNextRole( FOLLOWER );
outcome.setLeader( null );
}
Expand Down
Expand Up @@ -56,10 +56,10 @@ public interface CoreTopologyService extends TopologyService
* Set the leader memberId to null for a given database (i.e. Raft consensus group).
* This is intended to trigger state cleanup for informational procedures like {@link ClusterOverviewProcedure}
*
* @param stepDownLeaderInfo Information about the stepdown event, including term
* @param term The term for which this topology member should handle a stepdown
* @param dbName The database for which this topology member should handle a stepdown
*/
void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName );
void handleStepDown( long term, String dbName );

interface Listener
{
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.causalclustering.discovery;

import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicReference;
import com.hazelcast.core.IMap;
Expand Down Expand Up @@ -212,7 +213,7 @@ private static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance haz
return result;
}

static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName )
static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName, Log log )
{
IAtomicReference<LeaderInfo> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName );

Expand All @@ -225,9 +226,11 @@ static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderIn

boolean greaterTermExists = termComparison > 0;

boolean invalidTerm = greaterTermExists || ( termComparison == 0 && !leaderInfo.isSteppingDown() );
boolean sameTermButNoStepdown = termComparison == 0 && !leaderInfo.isSteppingDown();

boolean success = !( invalidTerm || noUpdate);
boolean invalidTerm = greaterTermExists || sameTermButNoStepdown;

boolean success = !( invalidTerm || noUpdate );

if ( !success )
{
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

Expand All @@ -34,7 +35,6 @@
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
Expand Down Expand Up @@ -141,12 +141,14 @@ public void setLeader( LeaderInfo leaderInfo, String dbName )
}

@Override
public void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName )
public void handleStepDown( long term, String dbName )
{
boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() );
if ( wasPreviousLeader )
boolean wasLeaderForTerm = Objects.equals( myself, leaderInfo.memberId() ) && term == leaderInfo.term();
if ( wasLeaderForTerm )
{
HazelcastClusterTopology.casLeaders( hazelcastInstance, stepDownLeaderInfo, dbName );
log.info( String.format( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " +
"to follower.", myself, leaderInfo.term() ) );
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo.stepDown(), dbName, log);
}
}

Expand Down Expand Up @@ -365,7 +367,8 @@ private void refreshRoles() throws InterruptedException

if ( leaderInfo.memberId() != null && leaderInfo.memberId().equals( myself ) )
{
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName );
log.info( "Leader %s updating leader info for database %s and term %s", myself, localDBName, leaderInfo.term() );
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName, log);
}

coreRoles = HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() );
Expand Down
Expand Up @@ -64,9 +64,9 @@ public void onLeaderSwitch( LeaderInfo leaderInfo )
}

@Override
public void onLeaderStepDown( LeaderInfo leaderInfo )
public void onLeaderStepDown( long stepDownTerm )
{
coreTopologyService.handleStepDown( leaderInfo, dbName );
coreTopologyService.handleStepDown( stepDownTerm, dbName );
}

@Override
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.causalclustering.discovery;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
Expand Down Expand Up @@ -143,12 +144,14 @@ public CoreTopology allCoreServers()
}

@Override
public void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName )
public void handleStepDown( long stepDownTerm, String dbName )
{
boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() );
if ( wasPreviousLeader )
boolean wasLeaderForTerm = Objects.equals( myself, leaderInfo.memberId() ) && stepDownTerm == leaderInfo.term();
if ( wasLeaderForTerm )
{
sharedDiscoveryService.casLeaders( stepDownLeaderInfo, dbName );
log.info( String.format( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " +
"to follower.", myself, leaderInfo.term() ) );
sharedDiscoveryService.casLeaders( leaderInfo.stepDown(), dbName );
}
}

Expand Down

0 comments on commit 871a98e

Please sign in to comment.