Skip to content

Commit

Permalink
Addressing review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Apr 23, 2018
1 parent 871a98e commit ce3d52b
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 46 deletions.
Expand Up @@ -46,6 +46,9 @@ private LeaderInfo( MemberId memberId, long term, boolean isSteppingDown )
this.isSteppingDown = isSteppingDown;
}

/**
* Produces a new LeaderInfo object for a step down event, setting memberId to null but maintaining the current term.
*/
public LeaderInfo stepDown()
{
return new LeaderInfo( null, this.term, true );
Expand Down
Expand Up @@ -19,10 +19,33 @@
*/
package org.neo4j.causalclustering.core.consensus;

import org.neo4j.causalclustering.core.consensus.outcome.Outcome;

public interface LeaderListener
{
void onLeaderSwitch( LeaderInfo leaderInfo );
/**
* Allows listeners to handle a leader step down for the given term.
* Note: actions taken as a result of a step down should typically happen *before* any
* actions taken as a result of the leader switch which has also, implicitly, taken place.
*
* @param stepDownTerm the term in which the step down event occurred.
*/
default void onLeaderStepDown( long stepDownTerm )
{
}

void onLeaderSwitch( LeaderInfo leaderInfo );

/**
* Standard catch-all method which delegates leader events to their appropriate handlers
* in the appropriate order, i.e. calls step down logic (if necessary) before leader switch
* logic.
*
* @param outcome The outcome which contains details of the leader event
*/
default void onLeaderEvent( Outcome outcome )
{
outcome.stepDownTerm().ifPresent( this::onLeaderStepDown );
onLeaderSwitch( new LeaderInfo( outcome.getLeader(), outcome.getTerm() ) );
}
}
Expand Up @@ -213,12 +213,9 @@ public synchronized ExposedRaftState state()

private void notifyLeaderChanges( Outcome outcome )
{
LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() );

for ( LeaderListener listener : leaderListeners )
{
outcome.stepDownTerm().ifPresent( listener::onLeaderStepDown );
listener.onLeaderSwitch( leaderInfo );
listener.onLeaderEvent( outcome );
}
}

Expand Down
Expand Up @@ -23,7 +23,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import org.neo4j.causalclustering.messaging.Message;
Expand All @@ -32,7 +32,6 @@
import org.neo4j.causalclustering.core.consensus.state.ReadableRaftState;
import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.register.Register;

import static java.util.Collections.emptySet;

Expand Down Expand Up @@ -74,7 +73,7 @@ public class Outcome implements Message, ConsensusOutcome
private FollowerStates<MemberId> followerStates;
private Collection<ShipCommand> shipCommands = new ArrayList<>();
private boolean electedLeader;
private Optional<Long> steppingDownInTerm;
private OptionalLong steppingDownInTerm;
private Set<MemberId> heartbeatResponses;

public Outcome( Role currentRole, ReadableRaftState ctx )
Expand Down Expand Up @@ -105,7 +104,7 @@ public Outcome( Role nextRole, long term, MemberId leader, long leaderCommit, Me
this.shipCommands.addAll( shipCommands );
this.commitIndex = commitIndex;
this.isPreElection = isPreElection;
this.steppingDownInTerm = Optional.empty();
this.steppingDownInTerm = OptionalLong.empty();
}

private void defaults( Role currentRole, ReadableRaftState ctx )
Expand All @@ -122,7 +121,7 @@ private void defaults( Role currentRole, ReadableRaftState ctx )
needsFreshSnapshot = false;

isPreElection = (currentRole == Role.FOLLOWER) && ctx.isPreElection();
steppingDownInTerm = Optional.empty();
steppingDownInTerm = OptionalLong.empty();
preVotesForMe = isPreElection ? new HashSet<>( ctx.preVotesForMe() ) : emptySet();
votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : emptySet();
heartbeatResponses = (currentRole == Role.LEADER) ? new HashSet<>( ctx.heartbeatResponses() ) : emptySet();
Expand Down Expand Up @@ -207,7 +206,7 @@ public void electedLeader()
public void steppingDown( long stepDownTerm )
{
assert !electedLeader;
steppingDownInTerm = Optional.of( stepDownTerm );
steppingDownInTerm = OptionalLong.of( stepDownTerm );
}

@Override
Expand All @@ -230,7 +229,7 @@ public String toString()
", followerStates=" + followerStates +
", shipCommands=" + shipCommands +
", electedLeader=" + electedLeader +
", steppingDown=" + steppingDownInTerm.map( t -> "true, steppingDownInTerm=" + t ).orElse( "false" ) +
", steppingDownInTerm=" + steppingDownInTerm +
'}';
}

Expand Down Expand Up @@ -310,7 +309,7 @@ public boolean isSteppingDown()
return steppingDownInTerm.isPresent();
}

public Optional<Long> stepDownTerm()
public OptionalLong stepDownTerm()
{
return steppingDownInTerm;
}
Expand Down Expand Up @@ -371,12 +370,12 @@ public boolean equals( Object o )
return term == outcome.term && leaderCommit == outcome.leaderCommit && commitIndex == outcome.commitIndex &&
renewElectionTimeout == outcome.renewElectionTimeout && needsFreshSnapshot == outcome.needsFreshSnapshot &&
isPreElection == outcome.isPreElection && lastLogIndexBeforeWeBecameLeader == outcome.lastLogIndexBeforeWeBecameLeader &&
electedLeader == outcome.electedLeader && nextRole == outcome.nextRole && Objects.equals( steppingDownInTerm, outcome.steppingDownInTerm ) &&
Objects.equals( leader, outcome.leader ) && Objects.equals( logCommands, outcome.logCommands ) &&
Objects.equals( outgoingMessages, outcome.outgoingMessages ) && Objects.equals( votedFor, outcome.votedFor ) &&
Objects.equals( preVotesForMe, outcome.preVotesForMe ) && Objects.equals( votesForMe, outcome.votesForMe ) &&
Objects.equals( followerStates, outcome.followerStates ) && Objects.equals( shipCommands, outcome.shipCommands ) &&
Objects.equals( heartbeatResponses, outcome.heartbeatResponses );
electedLeader == outcome.electedLeader && nextRole == outcome.nextRole &&
Objects.equals( steppingDownInTerm, outcome.steppingDownInTerm ) && Objects.equals( leader, outcome.leader ) &&
Objects.equals( logCommands, outcome.logCommands ) && Objects.equals( outgoingMessages, outcome.outgoingMessages ) &&
Objects.equals( votedFor, outcome.votedFor ) && Objects.equals( preVotesForMe, outcome.preVotesForMe ) &&
Objects.equals( votesForMe, outcome.votesForMe ) && Objects.equals( followerStates, outcome.followerStates ) &&
Objects.equals( shipCommands, outcome.shipCommands ) && Objects.equals( heartbeatResponses, outcome.heartbeatResponses );
}

@Override
Expand Down
Expand Up @@ -87,7 +87,7 @@ public Outcome handle( Heartbeat heartbeat ) throws IOException
return outcome;
}

stepDownToFollower( outcome, ctx.term() );
stepDownToFollower( outcome, ctx );
log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " + "%d) from %s",
heartbeat.leaderTerm(), ctx.term(), heartbeat.from() );
Heart.beat( ctx, outcome, heartbeat, log );
Expand All @@ -113,7 +113,7 @@ public Outcome handle( RaftMessages.Timeout.Election election )
{
if ( !isQuorum( ctx.votingMembers().size(), ctx.heartbeatResponses().size() ) )
{
stepDownToFollower( outcome, ctx.term() );
stepDownToFollower( outcome, ctx );
log.info( "Moving to FOLLOWER state after not receiving heartbeat responses in this election timeout " +
"period. Heartbeats received: %s", ctx.heartbeatResponses() );
}
Expand Down Expand Up @@ -142,7 +142,7 @@ else if ( req.leaderTerm() == ctx.term() )
else
{
// There is a new leader in a later term, we should revert to follower. (§5.1)
stepDownToFollower( outcome, ctx.term() );
stepDownToFollower( outcome, ctx );
log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " +
"%d) from %s", req.leaderTerm(), ctx.term(), req.from() );
Appending.handleAppendEntriesRequest( ctx, outcome, req, log );
Expand All @@ -162,7 +162,7 @@ public Outcome handle( RaftMessages.AppendEntries.Response response ) throws IOE
else if ( response.term() > ctx.term() )
{
outcome.setNextTerm( response.term() );
stepDownToFollower( outcome, ctx.term() );
stepDownToFollower( outcome, ctx );
log.info( "Moving to FOLLOWER state after receiving append response at term %d (my term is " +
"%d) from %s", response.term(), ctx.term(), response.from() );
outcome.replaceFollowerStates( new FollowerStates<>() );
Expand Down Expand Up @@ -235,7 +235,7 @@ public Outcome handle( RaftMessages.Vote.Request req ) throws IOException
{
if ( req.term() > ctx.term() )
{
stepDownToFollower( outcome, ctx.term() );
stepDownToFollower( outcome, ctx );
log.info(
"Moving to FOLLOWER state after receiving vote request at term %d (my term is " + "%d) from %s",
req.term(), ctx.term(), req.from() );
Expand Down Expand Up @@ -286,7 +286,7 @@ public Outcome handle( RaftMessages.PreVote.Request req ) throws IOException
{
if ( req.term() > ctx.term() )
{
stepDownToFollower( outcome, ctx.term() );
stepDownToFollower( outcome, ctx );
log.info( "Moving to FOLLOWER state after receiving pre vote request from %s at term %d (I am at %d)",
req.from(), req.term(), ctx.term() );
}
Expand All @@ -307,9 +307,9 @@ public Outcome handle( LogCompactionInfo logCompactionInfo )
return outcome;
}

private void stepDownToFollower( Outcome outcome, long inTerm )
private void stepDownToFollower( Outcome outcome, ReadableRaftState raftState )
{
outcome.steppingDown( inTerm );
outcome.steppingDown( raftState.term() );
outcome.setNextRole( FOLLOWER );
outcome.setLeader( null );
}
Expand Down
Expand Up @@ -213,26 +213,22 @@ private static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance haz
return result;
}

static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName, Log log )
static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName )
{
IAtomicReference<LeaderInfo> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName );

LeaderInfo current = leaderRef.get();
Optional<LeaderInfo> currentOpt = Optional.ofNullable( current );

boolean noUpdate = currentOpt.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) );
boolean sameLeader = currentOpt.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) );

int termComparison = currentOpt.map( l -> Long.compare( l.term(), leaderInfo.term() ) ).orElse( -1 );

boolean greaterTermExists = termComparison > 0;

boolean sameTermButNoStepdown = termComparison == 0 && !leaderInfo.isSteppingDown();

boolean invalidTerm = greaterTermExists || sameTermButNoStepdown;

boolean success = !( invalidTerm || noUpdate );

if ( !success )
if ( sameLeader || greaterTermExists || sameTermButNoStepdown )
{
return;
}
Expand Down
Expand Up @@ -146,9 +146,9 @@ public void handleStepDown( long term, String dbName )
boolean wasLeaderForTerm = Objects.equals( myself, leaderInfo.memberId() ) && term == leaderInfo.term();
if ( wasLeaderForTerm )
{
log.info( String.format( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " +
"to follower.", myself, leaderInfo.term() ) );
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo.stepDown(), dbName, log);
log.info( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " +
"to follower.", myself, leaderInfo.term() );
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo.stepDown(), dbName );
}
}

Expand Down Expand Up @@ -368,7 +368,7 @@ private void refreshRoles() throws InterruptedException
if ( leaderInfo.memberId() != null && leaderInfo.memberId().equals( myself ) )
{
log.info( "Leader %s updating leader info for database %s and term %s", myself, localDBName, leaderInfo.term() );
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName, log);
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName );
}

coreRoles = HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() );
Expand Down
Expand Up @@ -132,17 +132,15 @@ void casLeaders( LeaderInfo leaderInfo, String dbName )
{
Optional<LeaderInfo> current = Optional.ofNullable( leaderMap.get( dbName ) );

boolean noUpdate = current.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) );
boolean sameLeader = current.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) );

int termComparison = current.map( l -> Long.compare( l.term(), leaderInfo.term() ) ).orElse( -1 );

boolean greaterTermExists = termComparison > 0;

boolean invalidTerm = greaterTermExists || ( termComparison == 0 && !leaderInfo.isSteppingDown() );
boolean sameTermButNoStepDown = termComparison == 0 && !leaderInfo.isSteppingDown();

boolean success = !( invalidTerm || noUpdate);

if ( success )
if ( !( greaterTermExists || sameTermButNoStepDown || sameLeader ) )
{
leaderMap.put( dbName, leaderInfo );
}
Expand Down
Expand Up @@ -23,19 +23,20 @@
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.net.URI;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -57,6 +58,7 @@
import org.neo4j.internal.kernel.api.exceptions.ProcedureException;
import org.neo4j.internal.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AnonymousContext;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.test.causalclustering.ClusterRule;

Expand All @@ -66,6 +68,7 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.neo4j.causalclustering.discovery.RoleInfo.FOLLOWER;
import static org.neo4j.causalclustering.discovery.RoleInfo.LEADER;
import static org.neo4j.causalclustering.discovery.RoleInfo.READ_REPLICA;
Expand Down Expand Up @@ -271,10 +274,10 @@ public void shouldDiscoverRemovalOfCoreMembers() throws Exception
}

@Test
public void shouldHaveNoLeaderIfMajorityCoreMembersDead() throws Exception
public void shouldDiscoverTimeoutBasedLeaderStepdown() throws Exception
{
clusterRule.withNumberOfCoreMembers( 3 );
clusterRule.withNumberOfCoreMembers( 2 );
clusterRule.withNumberOfReadReplicas( 2 );

Cluster cluster = clusterRule.startCluster();
List<CoreClusterMember> followers = cluster.getAllMembersWithRole( Role.FOLLOWER );
Expand All @@ -284,6 +287,27 @@ public void shouldHaveNoLeaderIfMajorityCoreMembersDead() throws Exception
assertEventualOverview( cluster, containsRole( LEADER, 0 ), leader.serverId() );
}

/**
 * Verifies that the cluster overview reflects a leader change brought about by an
 * election triggered on a follower: after the election the overview must report
 * exactly one LEADER and (cores - 1) FOLLOWERs, and must differ from the overview
 * captured before the election.
 */
@Test
public void shouldDiscoverGreaterTermBasedLeaderStepdown() throws Exception
{
int originalCoreMembers = 3;
clusterRule.withNumberOfCoreMembers( originalCoreMembers );

Cluster cluster = clusterRule.startCluster();
CoreClusterMember leader = cluster.awaitLeader();
// NOTE(review): presumably this stops the current leader from winning the election
// triggered below, so that leadership has to move — confirm against the setting's docs.
leader.config().augment( CausalClusteringSettings.refuse_to_be_leader, Settings.TRUE );

// Snapshot of the overview, as seen from the original leader, before any election is triggered.
List<MemberInfo> preElectionOverview = clusterOverview( leader.database() );

CoreClusterMember follower = cluster.getMemberWithRole( Role.FOLLOWER );
follower.raft().triggerElection( Clock.systemUTC() );

// The overview is queried via the original leader's server id; it must show a single leader,
// the remaining cores as followers, and must no longer equal the pre-election snapshot.
assertEventualOverview( cluster, allOf(
containsRole( LEADER, 1 ),
containsRole( FOLLOWER, originalCoreMembers - 1 ),
not( equalTo( preElectionOverview ) ) ), leader.serverId() );
}

private void assertEventualOverview( Cluster cluster, Matcher<List<MemberInfo>> expected, int coreServerId )
throws KernelException, InterruptedException
{
Expand Down

0 comments on commit ce3d52b

Please sign in to comment.