Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Apr 19, 2018
1 parent dd58c92 commit 7270fd9
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 39 deletions.
Expand Up @@ -32,23 +32,23 @@ public class LeaderInfo implements Serializable


private final MemberId memberId; private final MemberId memberId;
private final long term; private final long term;
private boolean isStepDown; private boolean isSteppingDown;


public LeaderInfo( MemberId memberId, long term ) public LeaderInfo( MemberId memberId, long term )
{ {
this.memberId = memberId; this( memberId, term, false );
this.term = term;
this.isStepDown = false;
} }


public void stepDown() public LeaderInfo( MemberId memberId, long term, boolean isSteppingDown )
{ {
this.isStepDown = true; this.memberId = memberId;
this.term = term;
this.isSteppingDown = isSteppingDown;
} }


public boolean isStepDown() public boolean isSteppingDown()
{ {
return isStepDown; return isSteppingDown;
} }


public MemberId memberId() public MemberId memberId()
Expand Down
Expand Up @@ -213,17 +213,11 @@ public synchronized ExposedRaftState state()


private void notifyLeaderChanges( Outcome outcome ) private void notifyLeaderChanges( Outcome outcome )
{ {
LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() ); LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm(), outcome.isSteppingDown() );
boolean isStepDown = outcome.isSteppingDown();

if ( isStepDown )
{
leaderInfo.stepDown();
}


for ( LeaderListener listener : leaderListeners ) for ( LeaderListener listener : leaderListeners )
{ {
if ( isStepDown ) if ( outcome.isSteppingDown() )
{ {
listener.onLeaderStepDown( leaderInfo ); listener.onLeaderStepDown( leaderInfo );
} }
Expand Down Expand Up @@ -251,10 +245,7 @@ else if ( outcome.isSteppingDown() )


private boolean leaderChanged( Outcome outcome, MemberId oldLeader ) private boolean leaderChanged( Outcome outcome, MemberId oldLeader )
{ {
Optional<MemberId> oldLeaderOpt = Optional.ofNullable( oldLeader ); return !Objects.equals( oldLeader, outcome.getLeader() );
Optional<MemberId> outcomeLeaderOpt = Optional.ofNullable( outcome.getLeader() );

return !oldLeaderOpt.equals( outcomeLeaderOpt );
} }


public synchronized ConsensusOutcome handle( RaftMessages.RaftMessage incomingMessage ) throws IOException public synchronized ConsensusOutcome handle( RaftMessages.RaftMessage incomingMessage ) throws IOException
Expand Down
Expand Up @@ -56,10 +56,10 @@ public interface CoreTopologyService extends TopologyService
* Set the leader memberId to null for a given database (i.e. Raft consensus group). * Set the leader memberId to null for a given database (i.e. Raft consensus group).
* This is intended to trigger state cleanup for informational procedures like {@link ClusterOverviewProcedure} * This is intended to trigger state cleanup for informational procedures like {@link ClusterOverviewProcedure}
* *
* @param leaderInfo * @param stepDownLeaderInfo Information about the stepdown event, including term
* @param dbName * @param dbName The database for which this topology member should handle a stepdown
*/ */
void handleStepDown( LeaderInfo leaderInfo, String dbName ); void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName );


interface Listener interface Listener
{ {
Expand Down
Expand Up @@ -216,18 +216,25 @@ static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderIn
{ {
IAtomicReference<LeaderInfo> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); IAtomicReference<LeaderInfo> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName );


LeaderInfo expected = leaderRef.get(); LeaderInfo current = leaderRef.get();
Optional<LeaderInfo> currentOpt = Optional.ofNullable( current );


boolean noUpdate = Optional.ofNullable( expected ).map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) ); boolean noUpdate = currentOpt.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) );


boolean greaterOrEqualTermExists = Optional.ofNullable( expected ).map(l -> l.term() >= leaderInfo.term() ).orElse( false ); int termComparison = currentOpt.map( l -> Long.compare( l.term(), leaderInfo.term() ) ).orElse( -1 );


if ( (greaterOrEqualTermExists || noUpdate) && !leaderInfo.isStepDown() ) boolean greaterTermExists = termComparison > 0;

boolean invalidTerm = greaterTermExists || ( termComparison == 0 && !leaderInfo.isSteppingDown() );

boolean success = !( invalidTerm || noUpdate);

if ( !success )
{ {
return; return;
} }


leaderRef.compareAndSet( expected, leaderInfo ); leaderRef.compareAndSet( current, leaderInfo );
} }


private static Optional<LeaderInfo> getLeaderForDBName( HazelcastInstance hazelcastInstance, String dbName ) private static Optional<LeaderInfo> getLeaderForDBName( HazelcastInstance hazelcastInstance, String dbName )
Expand Down
Expand Up @@ -71,7 +71,6 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem
private static final long HAZELCAST_IS_HEALTHY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis( 10 ); private static final long HAZELCAST_IS_HEALTHY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis( 10 );
private static final int HAZELCAST_MIN_CLUSTER = 2; private static final int HAZELCAST_MIN_CLUSTER = 2;


private final int minimumConsensusSize;
private final Config config; private final Config config;
private final MemberId myself; private final MemberId myself;
private final Log log; private final Log log;
Expand Down Expand Up @@ -101,7 +100,6 @@ public HazelcastCoreTopologyService( Config config, MemberId myself, JobSchedule
TopologyServiceRetryStrategy topologyServiceRetryStrategy ) TopologyServiceRetryStrategy topologyServiceRetryStrategy )
{ {
this.config = config; this.config = config;
this.minimumConsensusSize = config.get( CausalClusteringSettings.minimum_core_cluster_size_at_runtime );
this.myself = myself; this.myself = myself;
this.listenerService = new CoreTopologyListenerService(); this.listenerService = new CoreTopologyListenerService();
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -143,12 +141,12 @@ public void setLeader( LeaderInfo leaderInfo, String dbName )
} }


@Override @Override
public void handleStepDown( LeaderInfo leaderInfo, String dbName ) public void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName )
{ {
boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() ); boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() );
if ( wasPreviousLeader ) if ( wasPreviousLeader )
{ {
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, dbName ); HazelcastClusterTopology.casLeaders( hazelcastInstance, stepDownLeaderInfo, dbName );
} }
} }


Expand Down
Expand Up @@ -395,7 +395,9 @@ public List<CoreClusterMember> getAllMembersWithAnyRole( String dbName, Role...
Set<Role> roleSet = Arrays.stream( roles ).collect( toSet() ); Set<Role> roleSet = Arrays.stream( roles ).collect( toSet() );


return coreMembers.values().stream() return coreMembers.values().stream()
.filter( m -> m.database() != null && m.dbName().equals( dbName ) && roleSet.contains( m.database().getRole() ) ) .filter( m -> m.database() != null )
.filter( m -> m.dbName().equals( dbName ) )
.filter( m -> roleSet.contains( m.database().getRole() ) )
.collect( Collectors.toList() ); .collect( Collectors.toList() );
} }


Expand Down
Expand Up @@ -91,7 +91,7 @@ public Map<MemberId,RoleInfo> allCoreRoles()
@Override @Override
public void setLeader( LeaderInfo newLeader, String dbName ) public void setLeader( LeaderInfo newLeader, String dbName )
{ {
if ( this.leaderInfo.term() < newLeader.term() ) if ( this.leaderInfo.term() < newLeader.term() && newLeader.memberId() != null )
{ {
this.leaderInfo = newLeader; this.leaderInfo = newLeader;
sharedDiscoveryService.casLeaders( newLeader, localDBName ); sharedDiscoveryService.casLeaders( newLeader, localDBName );
Expand Down Expand Up @@ -143,12 +143,12 @@ public CoreTopology allCoreServers()
} }


@Override @Override
public void handleStepDown( LeaderInfo leaderInfo, String dbName ) public void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName )
{ {
boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() ); boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() );
if ( wasPreviousLeader ) if ( wasPreviousLeader )
{ {
sharedDiscoveryService.casLeaders( leaderInfo, dbName ); sharedDiscoveryService.casLeaders( stepDownLeaderInfo, dbName );
} }
} }


Expand Down
Expand Up @@ -134,9 +134,13 @@ void casLeaders( LeaderInfo leaderInfo, String dbName )


boolean noUpdate = current.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) ); boolean noUpdate = current.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) );


boolean greaterOrEqualTermExists = current.map( l -> l.term() >= leaderInfo.term() ).orElse( false ); int termComparison = current.map( l -> Long.compare( l.term(), leaderInfo.term() ) ).orElse( -1 );


boolean success = !(greaterOrEqualTermExists || noUpdate) || leaderInfo.isStepDown(); boolean greaterTermExists = termComparison > 0;

boolean invalidTerm = greaterTermExists || ( termComparison == 0 && !leaderInfo.isSteppingDown() );

boolean success = !( invalidTerm || noUpdate);


if ( success ) if ( success )
{ {
Expand Down
Expand Up @@ -277,7 +277,6 @@ public void shouldHaveNoLeaderIfMajorityCoreMembersDead() throws Exception
clusterRule.withNumberOfCoreMembers( 2 ); clusterRule.withNumberOfCoreMembers( 2 );


Cluster cluster = clusterRule.startCluster(); Cluster cluster = clusterRule.startCluster();
//TODO: address proliferation of Role enums
List<CoreClusterMember> followers = cluster.getAllMembersWithRole( Role.FOLLOWER ); List<CoreClusterMember> followers = cluster.getAllMembersWithRole( Role.FOLLOWER );
CoreClusterMember leader = cluster.getMemberWithRole( Role.LEADER ); CoreClusterMember leader = cluster.getMemberWithRole( Role.LEADER );
followers.forEach( CoreClusterMember::shutdown ); followers.forEach( CoreClusterMember::shutdown );
Expand Down

0 comments on commit 7270fd9

Please sign in to comment.