Skip to content

Commit

Permalink
Change stepdown updates to be handled in RobustJobScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth authored and andrewkerr9000 committed May 4, 2018
1 parent 7438691 commit d453278
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
Expand Up @@ -91,6 +91,7 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem
private volatile CoreTopology coreTopology = CoreTopology.EMPTY; private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>(); private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile Map<MemberId,RoleInfo> coreRoles = Collections.emptyMap(); private volatile Map<MemberId,RoleInfo> coreRoles = Collections.emptyMap();
private volatile Optional<LeaderInfo> stepDownInfo = Optional.empty();


private Thread startingThread; private Thread startingThread;
private volatile boolean stopped; private volatile boolean stopped;
Expand Down Expand Up @@ -134,21 +135,26 @@ public boolean setClusterId( ClusterId clusterId, String dbName ) throws Interru
@Override @Override
public void setLeader( LeaderInfo leaderInfo, String dbName ) public void setLeader( LeaderInfo leaderInfo, String dbName )
{ {
if ( this.leaderInfo.term() < leaderInfo.term() ) if ( this.leaderInfo.term() < leaderInfo.term() && localDBName.equals( dbName ) )
{ {
log.info( "Leader %s updating leader info for database %s and term %s", myself, localDBName, leaderInfo.term() );
this.leaderInfo = leaderInfo; this.leaderInfo = leaderInfo;
} }
} }


@Override @Override
public void handleStepDown( long term, String dbName ) public void handleStepDown( long term, String dbName )
{ {
boolean wasLeaderForTerm = Objects.equals( myself, leaderInfo.memberId() ) && term == leaderInfo.term(); boolean wasLeaderForDbAndTerm =
if ( wasLeaderForTerm ) Objects.equals( myself, leaderInfo.memberId() ) &&
localDBName.equals( dbName ) &&
term == leaderInfo.term();

if ( wasLeaderForDbAndTerm )
{ {
log.info( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " + log.info( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " +
"to follower.", myself, leaderInfo.term() ); "to follower.", myself, leaderInfo.term() );
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo.stepDown(), dbName ); stepDownInfo = Optional.of( leaderInfo.stepDown() );
} }
} }


Expand Down Expand Up @@ -367,10 +373,11 @@ private void refreshRoles() throws InterruptedException


if ( leaderInfo.memberId() != null && leaderInfo.memberId().equals( myself ) ) if ( leaderInfo.memberId() != null && leaderInfo.memberId().equals( myself ) )
{ {
log.info( "Leader %s updating leader info for database %s and term %s", myself, localDBName, leaderInfo.term() );
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName ); HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName );
} }


stepDownInfo.ifPresent( s -> HazelcastClusterTopology.casLeaders( hazelcastInstance, s, localDBName ) );

coreRoles = HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() ); coreRoles = HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() );
} }


Expand Down
Expand Up @@ -314,7 +314,7 @@ private void assertEventualOverview( Cluster cluster, Matcher<List<MemberInfo>>
Function<List<MemberInfo>, String> printableMemberInfos = Function<List<MemberInfo>, String> printableMemberInfos =
memberInfos -> memberInfos.stream().map( MemberInfo::toString ).collect( Collectors.joining( ", " ) ); memberInfos -> memberInfos.stream().map( MemberInfo::toString ).collect( Collectors.joining( ", " ) );
assertEventually( memberInfos -> "should have overview from core " + coreServerId + " but view was " + printableMemberInfos.apply( memberInfos ), assertEventually( memberInfos -> "should have overview from core " + coreServerId + " but view was " + printableMemberInfos.apply( memberInfos ),
() -> clusterOverview( cluster.getCoreMemberById( coreServerId ).database() ), expected, 60, SECONDS ); () -> clusterOverview( cluster.getCoreMemberById( coreServerId ).database() ), expected, 90, SECONDS );
} }


@SafeVarargs @SafeVarargs
Expand Down

0 comments on commit d453278

Please sign in to comment.