Skip to content

Commit

Permalink
Reset step down info on refreshing roles.
Browse files Browse the repository at this point in the history
Make leader info and step down info AtomicReferences.
  • Loading branch information
andrewkerr9000 committed May 4, 2018
1 parent d453278 commit 2c62f35
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;


import com.hazelcast.config.InterfacesConfig; import com.hazelcast.config.InterfacesConfig;
import com.hazelcast.config.JoinConfig; import com.hazelcast.config.JoinConfig;
Expand Down Expand Up @@ -85,13 +86,14 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem
private String membershipRegistrationId; private String membershipRegistrationId;
private JobScheduler.JobHandle refreshJob; private JobScheduler.JobHandle refreshJob;


private volatile LeaderInfo leaderInfo = LeaderInfo.INITIAL; private final AtomicReference<LeaderInfo> leaderInfo = new AtomicReference<>( LeaderInfo.INITIAL );
private final AtomicReference<Optional<LeaderInfo>> stepDownInfo = new AtomicReference<>( Optional.empty() );

private volatile HazelcastInstance hazelcastInstance; private volatile HazelcastInstance hazelcastInstance;
private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY; private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY;
private volatile CoreTopology coreTopology = CoreTopology.EMPTY; private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>(); private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile Map<MemberId,RoleInfo> coreRoles = Collections.emptyMap(); private volatile Map<MemberId,RoleInfo> coreRoles = Collections.emptyMap();
private volatile Optional<LeaderInfo> stepDownInfo = Optional.empty();


private Thread startingThread; private Thread startingThread;
private volatile boolean stopped; private volatile boolean stopped;
Expand Down Expand Up @@ -133,28 +135,37 @@ public boolean setClusterId( ClusterId clusterId, String dbName ) throws Interru
} }


@Override @Override
public void setLeader( LeaderInfo leaderInfo, String dbName ) public void setLeader( LeaderInfo newLeaderInfo, String dbName )
{ {
if ( this.leaderInfo.term() < leaderInfo.term() && localDBName.equals( dbName ) ) leaderInfo.updateAndGet( currentLeaderInfo ->
{ {
log.info( "Leader %s updating leader info for database %s and term %s", myself, localDBName, leaderInfo.term() ); if ( currentLeaderInfo.term() < newLeaderInfo.term() && localDBName.equals( dbName ) )
this.leaderInfo = leaderInfo; {
} log.info( "Leader %s updating leader info for database %s and term %s", myself, localDBName, newLeaderInfo.term() );
return newLeaderInfo;
}
else
{
return currentLeaderInfo;
}
} );
} }


@Override @Override
public void handleStepDown( long term, String dbName ) public void handleStepDown( long term, String dbName )
{ {
LeaderInfo localLeaderInfo = leaderInfo.get();

boolean wasLeaderForDbAndTerm = boolean wasLeaderForDbAndTerm =
Objects.equals( myself, leaderInfo.memberId() ) && Objects.equals( myself, localLeaderInfo.memberId() ) &&
localDBName.equals( dbName ) && localDBName.equals( dbName ) &&
term == leaderInfo.term(); term == localLeaderInfo.term();


if ( wasLeaderForDbAndTerm ) if ( wasLeaderForDbAndTerm )
{ {
log.info( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " + log.info( "Step down event detected. This topology member, with MemberId %s, was leader in term %s, now moving " +
"to follower.", myself, leaderInfo.term() ); "to follower.", myself, localLeaderInfo.term() );
stepDownInfo = Optional.of( leaderInfo.stepDown() ); stepDownInfo.set( Optional.of( localLeaderInfo.stepDown() ) );
} }
} }


Expand Down Expand Up @@ -370,13 +381,18 @@ private Optional<AdvertisedSocketAddress> retrieveSocketAddress( MemberId member
private void refreshRoles() throws InterruptedException private void refreshRoles() throws InterruptedException
{ {
waitOnHazelcastInstanceCreation(); waitOnHazelcastInstanceCreation();
LeaderInfo localLeaderInfo = leaderInfo.get();
Optional<LeaderInfo> localStepDownInfo = stepDownInfo.get();


if ( leaderInfo.memberId() != null && leaderInfo.memberId().equals( myself ) ) if ( localStepDownInfo.isPresent() )
{ {
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName ); HazelcastClusterTopology.casLeaders( hazelcastInstance, localStepDownInfo.get(), localDBName );
stepDownInfo.compareAndSet( localStepDownInfo, Optional.empty() );
}
else if ( localLeaderInfo.memberId() != null && localLeaderInfo.memberId().equals( myself ) )
{
HazelcastClusterTopology.casLeaders( hazelcastInstance, localLeaderInfo, localDBName );
} }

stepDownInfo.ifPresent( s -> HazelcastClusterTopology.casLeaders( hazelcastInstance, s, localDBName ) );


coreRoles = HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() ); coreRoles = HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() );
} }
Expand Down
Expand Up @@ -36,7 +36,6 @@
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
Expand Down Expand Up @@ -67,7 +66,6 @@
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.neo4j.causalclustering.discovery.RoleInfo.FOLLOWER; import static org.neo4j.causalclustering.discovery.RoleInfo.FOLLOWER;
import static org.neo4j.causalclustering.discovery.RoleInfo.LEADER; import static org.neo4j.causalclustering.discovery.RoleInfo.LEADER;
Expand Down

0 comments on commit 2c62f35

Please sign in to comment.