Skip to content

Commit

Permalink
Raft pre elections should be compatible with refuse_to_be_leader
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkerr9000 committed Feb 6, 2018
1 parent 905fe9e commit dd114f8
Show file tree
Hide file tree
Showing 14 changed files with 859 additions and 322 deletions.
Expand Up @@ -74,7 +74,6 @@ public enum Timeouts implements TimerService.TimerName

private final LeaderAvailabilityTimers leaderAvailabilityTimers;
private RaftMembershipManager membershipManager;
private final boolean refuseToBecomeLeader;

private final VolatileFuture<MemberId> volatileLeader = new VolatileFuture<>( null );

Expand All @@ -84,10 +83,10 @@ public enum Timeouts implements TimerService.TimerName

private RaftLogShippingManager logShipping;

public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateStorage<VoteState> voteStorage,
RaftLog entryLog, LeaderAvailabilityTimers leaderAvailabilityTimers, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping,
InFlightCache inFlightCache, boolean refuseToBecomeLeader, boolean supportPreVoting, Monitors monitors )
public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateStorage<VoteState> voteStorage, RaftLog entryLog,
LeaderAvailabilityTimers leaderAvailabilityTimers, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LogProvider logProvider,
RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, InFlightCache inFlightCache, boolean refuseToBecomeLeader,
boolean supportPreVoting, Monitors monitors )
{
this.myself = myself;
this.leaderAvailabilityTimers = leaderAvailabilityTimers;
Expand All @@ -97,11 +96,9 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateS
this.log = logProvider.getLog( getClass() );

this.membershipManager = membershipManager;
this.refuseToBecomeLeader = refuseToBecomeLeader;

this.inFlightCache = inFlightCache;
this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightCache,
logProvider, supportPreVoting );
this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightCache, logProvider, supportPreVoting, refuseToBecomeLeader );

leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );
}
Expand All @@ -113,13 +110,8 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateS
*/
public synchronized void postRecoveryActions()
{
if ( !refuseToBecomeLeader )
{
leaderAvailabilityTimers.start(
this::electionTimeout,
clock -> handle( RaftMessages.ReceivedInstantAwareMessage.of( clock.instant(), new RaftMessages.Timeout.Heartbeat( myself ) ) )
);
}
leaderAvailabilityTimers.start( this::electionTimeout,
clock -> handle( RaftMessages.ReceivedInstantAwareMessage.of( clock.instant(), new RaftMessages.Timeout.Heartbeat( myself ) ) ) );

inFlightCache.enable();
}
Expand All @@ -139,10 +131,7 @@ private synchronized void electionTimeout( Clock clock ) throws IOException

public void triggerElection( Clock clock ) throws IOException
{
if ( !refuseToBecomeLeader )
{
handle( RaftMessages.ReceivedInstantAwareMessage.of( clock.instant(), new RaftMessages.Timeout.Election( myself ) ) );
}
}

public void panic()
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import org.neo4j.causalclustering.messaging.Message;
Expand Down Expand Up @@ -345,4 +346,35 @@ public Set<MemberId> getPreVotesForMe()
{
return preVotesForMe;
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
Outcome outcome = (Outcome) o;
return term == outcome.term && leaderCommit == outcome.leaderCommit && commitIndex == outcome.commitIndex &&
renewElectionTimeout == outcome.renewElectionTimeout && needsFreshSnapshot == outcome.needsFreshSnapshot &&
isPreElection == outcome.isPreElection && lastLogIndexBeforeWeBecameLeader == outcome.lastLogIndexBeforeWeBecameLeader &&
electedLeader == outcome.electedLeader && steppingDown == outcome.steppingDown && nextRole == outcome.nextRole &&
Objects.equals( leader, outcome.leader ) && Objects.equals( logCommands, outcome.logCommands ) &&
Objects.equals( outgoingMessages, outcome.outgoingMessages ) && Objects.equals( votedFor, outcome.votedFor ) &&
Objects.equals( preVotesForMe, outcome.preVotesForMe ) && Objects.equals( votesForMe, outcome.votesForMe ) &&
Objects.equals( followerStates, outcome.followerStates ) && Objects.equals( shipCommands, outcome.shipCommands ) &&
Objects.equals( heartbeatResponses, outcome.heartbeatResponses );
}

@Override
public int hashCode()
{
return Objects.hash( nextRole, term, leader, leaderCommit, logCommands, outgoingMessages, commitIndex, votedFor, renewElectionTimeout,
needsFreshSnapshot, isPreElection, preVotesForMe, votesForMe, lastLogIndexBeforeWeBecameLeader, followerStates, shipCommands, electedLeader,
steppingDown, heartbeatResponses );
}
}

0 comments on commit dd114f8

Please sign in to comment.