Skip to content

Commit

Permalink
Merge pull request #9027 from martinfurmanski/3.1-stepdown-cleanup
Browse files Browse the repository at this point in the history
align and improve heartbeat response handling
  • Loading branch information
jimwebber committed Mar 15, 2017
2 parents 1dd0163 + 657736e commit 5d3489c
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 66 deletions.
Expand Up @@ -31,6 +31,8 @@
import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates;
import org.neo4j.causalclustering.identity.MemberId;

import static java.util.Collections.emptySet;

/**
* Holds the outcome of a RAFT role's handling of a message. The role handling logic is stateless
* and responds to RAFT messages in the context of a supplied state. The outcome is later consumed
Expand Down Expand Up @@ -68,6 +70,7 @@ public class Outcome implements Message, ConsensusOutcome
private Collection<ShipCommand> shipCommands = new ArrayList<>();
private boolean electedLeader;
private boolean steppingDown;
private Set<MemberId> heartbeatResponses;

public Outcome( Role currentRole, ReadableRaftState ctx )
{
Expand All @@ -78,7 +81,7 @@ public Outcome( Role nextRole, long term, MemberId leader, long leaderCommit, Me
Set<MemberId> votesForMe, long lastLogIndexBeforeWeBecameLeader,
FollowerStates<MemberId> followerStates, boolean renewElectionTimeout,
Collection<RaftLogCommand> logCommands, Collection<RaftMessages.Directed> outgoingMessages,
Collection<ShipCommand> shipCommands, long commitIndex )
Collection<ShipCommand> shipCommands, long commitIndex, Set<MemberId> heartbeatResponses )
{
this.nextRole = nextRole;
this.term = term;
Expand All @@ -89,6 +92,7 @@ public Outcome( Role nextRole, long term, MemberId leader, long leaderCommit, Me
this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader;
this.followerStates = followerStates;
this.renewElectionTimeout = renewElectionTimeout;
this.heartbeatResponses = new HashSet<>( heartbeatResponses );

this.logCommands.addAll( logCommands );
this.outgoingMessages.addAll( outgoingMessages );
Expand All @@ -109,7 +113,8 @@ private void defaults( Role currentRole, ReadableRaftState ctx )
renewElectionTimeout = false;
needsFreshSnapshot = false;

votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : new HashSet<>();
votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : emptySet();
heartbeatResponses = (currentRole == Role.LEADER) ? new HashSet<>( ctx.heartbeatResponses() ) : emptySet();

lastLogIndexBeforeWeBecameLeader = (currentRole == Role.LEADER) ? ctx.lastLogIndexBeforeWeBecameLeader() : -1;
followerStates = (currentRole == Role.LEADER) ? ctx.followerStates() : new FollowerStates<>();
Expand Down Expand Up @@ -303,4 +308,14 @@ public void setCommitIndex( long commitIndex )
{
this.commitIndex = commitIndex;
}

public void addHeartbeatResponse( MemberId from )
{
this.heartbeatResponses.add( from );
}

public Set<MemberId> getHeartbeatResponses()
{
return heartbeatResponses;
}
}
Expand Up @@ -103,12 +103,13 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )

if ( isQuorum( ctx.votingMembers().size(), outcome.getVotesForMe().size() ) )
{

outcome.setLeader( ctx.myself() );
Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() );
Leader.sendHeartbeats( ctx, outcome );

outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() );
outcome.electedLeader();
outcome.renewElectionTimeout();
outcome.setNextRole( LEADER );
log.info( "Moving to LEADER state at term %d (I am %s), voted for by %s",
ctx.term(), ctx.myself(), outcome.getVotesForMe() );
Expand Down
Expand Up @@ -20,10 +20,7 @@
package org.neo4j.causalclustering.core.consensus.roles;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.neo4j.causalclustering.core.consensus.Followers;
import org.neo4j.causalclustering.core.consensus.RaftMessageHandler;
Expand All @@ -47,15 +44,12 @@

public class Leader implements RaftMessageHandler
{
private final Set<MemberId> heartbeatResponses = Collections.synchronizedSet( new HashSet<>() );
private boolean receivedHeartbeats = false;

private static Iterable<MemberId> replicationTargets( final ReadableRaftState ctx )
{
return new FilteringIterable<>( ctx.replicationMembers(), member -> !member.equals( ctx.myself() ) );
}

private static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) throws IOException
static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) throws IOException
{
long commitIndex = ctx.commitIndex();
long commitIndexTerm = ctx.entryLog().readEntryTerm( commitIndex );
Expand Down Expand Up @@ -97,24 +91,20 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx,

case HEARTBEAT_RESPONSE:
{
receivedHeartbeats = true;
heartbeatResponses.add( message.from() );
outcome.addHeartbeatResponse( message.from() );
break;
}

case ELECTION_TIMEOUT:
{
if ( !isQuorum( ctx.votingMembers().size(), heartbeatResponses.size() ) && receivedHeartbeats )
if ( !isQuorum( ctx.votingMembers().size(), ctx.heartbeatResponses().size() ) )
{
stepDownToFollower( outcome );
log.info( "Moving to FOLLOWER state after not receiving heartbeat responses in this election timeout " +
"period. Heartbeats received: %s", heartbeatResponses );

}
else
{
heartbeatResponses.clear();
"period. Heartbeats received: %s", ctx.heartbeatResponses() );
}

outcome.getHeartbeatResponses().clear();
break;
}

Expand Down Expand Up @@ -277,8 +267,6 @@ else if ( response.term() > ctx.term() )

private void stepDownToFollower( Outcome outcome )
{
heartbeatResponses.clear();
receivedHeartbeats = false;
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
outcome.setLeader( null );
Expand Down
Expand Up @@ -53,6 +53,7 @@ public class RaftState implements ReadableRaftState

private MemberId leader;
private Set<MemberId> votesForMe = new HashSet<>();
private Set<MemberId> heartbeatResponses = new HashSet<>();
private FollowerStates<MemberId> followerStates = new FollowerStates<>();
private long leaderCommit = -1;
private long commitIndex = -1;
Expand Down Expand Up @@ -140,6 +141,12 @@ public Set<MemberId> votesForMe()
return votesForMe;
}

@Override
public Set<MemberId> heartbeatResponses()
{
return heartbeatResponses;
}

@Override
public long lastLogIndexBeforeWeBecameLeader()
{
Expand Down Expand Up @@ -180,6 +187,7 @@ public void update( Outcome outcome ) throws IOException

leaderCommit = outcome.getLeaderCommit();
votesForMe = outcome.getVotesForMe();
heartbeatResponses = outcome.getHeartbeatResponses();
lastLogIndexBeforeWeBecameLeader = outcome.getLastLogIndexBeforeWeBecameLeader();
followerStates = outcome.getFollowerStates();

Expand Down
Expand Up @@ -43,6 +43,8 @@ public interface ReadableRaftState

Set<MemberId> votesForMe();

Set<MemberId> heartbeatResponses();

long lastLogIndexBeforeWeBecameLeader();

FollowerStates<MemberId> followerStates();
Expand Down
Expand Up @@ -42,21 +42,22 @@
public class ComparableRaftState implements ReadableRaftState
{
protected final MemberId myself;
private final Set votingMembers;
private final Set replicationMembers;
private final Set<MemberId> votingMembers;
private final Set<MemberId> replicationMembers;
private final Log log;
protected long term = 0;
protected MemberId leader;
private long leaderCommit = -1;
private MemberId votedFor = null;
private Set votesForMe = new HashSet<>();
private Set<MemberId> votesForMe = new HashSet<>();
private Set<MemberId> heartbeatResponses = new HashSet<>();
private long lastLogIndexBeforeWeBecameLeader = -1;
private FollowerStates followerStates = new FollowerStates<>();
private FollowerStates<MemberId> followerStates = new FollowerStates<>();
protected final RaftLog entryLog;
private final InFlightMap<RaftLogEntry> inFlightMap;
private long commitIndex = -1;

ComparableRaftState( MemberId myself, Set votingMembers, Set replicationMembers,
ComparableRaftState( MemberId myself, Set<MemberId> votingMembers, Set<MemberId> replicationMembers,
RaftLog entryLog, InFlightMap<RaftLogEntry> inFlightMap, LogProvider logProvider )
{
this.myself = myself;
Expand All @@ -67,7 +68,7 @@ public class ComparableRaftState implements ReadableRaftState
this.log = logProvider.getLog( getClass() );
}

public ComparableRaftState( ReadableRaftState original ) throws IOException
ComparableRaftState( ReadableRaftState original ) throws IOException
{
this( original.myself(), original.votingMembers(), original.replicationMembers(),
new ComparableRaftLog( original.entryLog() ), new InFlightMap<>(), NullLogProvider.getInstance() );
Expand All @@ -80,13 +81,13 @@ public MemberId myself()
}

@Override
public Set votingMembers()
public Set<MemberId> votingMembers()
{
return votingMembers;
}

@Override
public Set replicationMembers()
public Set<MemberId> replicationMembers()
{
return replicationMembers;
}
Expand Down Expand Up @@ -116,19 +117,25 @@ public MemberId votedFor()
}

@Override
public Set votesForMe()
public Set<MemberId> votesForMe()
{
return votesForMe;
}

@Override
public Set<MemberId> heartbeatResponses()
{
return heartbeatResponses;
}

@Override
public long lastLogIndexBeforeWeBecameLeader()
{
return lastLogIndexBeforeWeBecameLeader;
}

@Override
public FollowerStates followerStates()
public FollowerStates<MemberId> followerStates()
{
return followerStates;
}
Expand Down
Expand Up @@ -21,23 +21,24 @@

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;

import org.neo4j.causalclustering.core.consensus.NewLeaderBarrier;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.causalclustering.core.consensus.outcome.AppendLogEntry;
import org.neo4j.causalclustering.core.consensus.outcome.Outcome;
import org.neo4j.causalclustering.core.consensus.state.RaftState;
import org.neo4j.causalclustering.core.consensus.state.RaftStateBuilder;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.neo4j.causalclustering.core.consensus.TestMessageBuilders.voteResponse;
Expand All @@ -52,17 +53,20 @@ public class CandidateTest
{
private MemberId myself = member( 0 );
private MemberId member1 = member( 1 );

@Mock
private Inbound inbound;
private MemberId member2 = member( 2 );

private LogProvider logProvider = NullLogProvider.getInstance();

@Test
public void shouldBeElectedLeaderOnReceivingGrantedVoteResponseWithCurrentTerm() throws Exception
{
// given
RaftState state = newState();
RaftState state = RaftStateBuilder.raftState()
.term( 1 )
.myself( myself )
.votingMembers( member1, member2 )
.replicationMembers( member1, member2 )
.build();

// when
Outcome outcome = CANDIDATE.handler.handle( voteResponse()
Expand All @@ -73,8 +77,13 @@ public void shouldBeElectedLeaderOnReceivingGrantedVoteResponseWithCurrentTerm()

// then
assertEquals( LEADER, outcome.getRole() );
assertEquals( true, outcome.electionTimeoutRenewed() );
assertThat( outcome.getLogCommands(), hasItem( new AppendLogEntry( 0,
new RaftLogEntry( state.term(), new NewLeaderBarrier() ) ) ) );
assertThat( outcome.getOutgoingMessages(), hasItems(
new RaftMessages.Directed( member1, new RaftMessages.Heartbeat( myself, state.term(), -1, -1 ) ),
new RaftMessages.Directed( member2, new RaftMessages.Heartbeat( myself, state.term(), -1, -1 ) ) )
);
}

@Test
Expand Down
Expand Up @@ -353,7 +353,8 @@ public void leaderShouldSendHeartbeatsToAllClusterMembersOnReceiptOfHeartbeatTic
{
// given
RaftState state = raftState()
.votingMembers( asSet( myself, member1, member2 ) )
.votingMembers( myself, member1, member2 )
.replicationMembers( myself, member1, member2 )
.build();

Leader leader = new Leader();
Expand All @@ -377,7 +378,6 @@ public void leaderShouldStepDownWhenLackingHeartbeatResponses() throws Exception
.build();

Leader leader = new Leader();
leader.handle( new RaftMessages.HeartbeatResponse( myself ), state, log() );
leader.handle( new RaftMessages.Timeout.Election( myself ), state, log() );

// when
Expand All @@ -399,13 +399,41 @@ public void leaderShouldNotStepDownWhenReceivedQuorumOfHeartbeatResponses() thro
Leader leader = new Leader();

// when
leader.handle( new RaftMessages.HeartbeatResponse( member1 ), state, log() ); //Now we have quorum.
Outcome outcome = leader.handle( new RaftMessages.Timeout.Election( myself ), state, log() );
Outcome outcome = leader.handle( new RaftMessages.HeartbeatResponse( member1 ), state, log() );
state.update( outcome );

// we now have quorum and should not step down
outcome = leader.handle( new RaftMessages.Timeout.Election( myself ), state, log() );

// then
assertThat( outcome.getRole(), is( LEADER ) );
}

@Test
public void oldHeartbeatResponseShouldNotPreventStepdown() throws Exception
{
// given
RaftState state = raftState()
.votingMembers( asSet( myself, member1, member2 ) )
.build();

Leader leader = new Leader();

Outcome outcome = leader.handle( new RaftMessages.HeartbeatResponse( member1 ), state, log() );
state.update( outcome );

outcome = leader.handle( new RaftMessages.Timeout.Election( myself ), state, log() );
state.update( outcome );

assertThat( outcome.getRole(), is( LEADER ) );

// when
outcome = leader.handle( new RaftMessages.Timeout.Election( myself ), state, log() );

// then
assertThat( outcome.getRole(), is( FOLLOWER ) );
}

@Test
public void leaderShouldDecideToAppendToItsLogAndSendAppendEntriesMessageOnReceiptOfClientProposal()
throws Exception
Expand Down

0 comments on commit 5d3489c

Please sign in to comment.