Skip to content

Commit

Permalink
Update Shared Cluster Overview state in event of leader step down and…
Browse files Browse the repository at this point in the history
… read only
  • Loading branch information
hugofirth committed Apr 19, 2018
1 parent 606b2ec commit dd58c92
Show file tree
Hide file tree
Showing 20 changed files with 140 additions and 46 deletions.
Expand Up @@ -140,7 +140,7 @@ public void dataIsInAUsableStateAfterBackup() throws Exception
Cluster cluster = startCluster( recordFormat );

// and the database has indexes
ClusterHelper.createIndexes( cluster.getDbWithAnyRole( Role.LEADER ).database() );
ClusterHelper.createIndexes( cluster.getMemberWithAnyRole( Role.LEADER ).database() );

// and the database is being populated
AtomicBoolean populateDatabaseFlag = new AtomicBoolean( true );
Expand Down Expand Up @@ -229,7 +229,7 @@ public void reportsProgress() throws Exception
{
// given
Cluster cluster = startCluster( recordFormat );
ClusterHelper.createIndexes( cluster.getDbWithAnyRole( Role.LEADER ).database() );
ClusterHelper.createIndexes( cluster.getMemberWithAnyRole( Role.LEADER ).database() );
String customAddress = CausalClusteringTestHelpers.backupAddress( clusterLeader( cluster ).database() );

// and
Expand Down Expand Up @@ -356,7 +356,7 @@ public static DbRepresentation createSomeData( Cluster cluster )

/**
 * Looks up the core member that currently holds the {@link Role#LEADER} role.
 *
 * @param cluster the cluster to query
 * @return the result of {@code cluster.getMemberWithRole( Role.LEADER )}; that lookup yields
 *         {@code null} when no member currently reports the role
 */
private static CoreClusterMember clusterLeader( Cluster cluster )
{
    return cluster.getMemberWithRole( Role.LEADER );
}

public static DbRepresentation getBackupDbRepresentation( String name, File backupDir )
Expand Down
Expand Up @@ -102,7 +102,7 @@ static CoreGraphDatabase createSomeData( Cluster cluster ) throws Exception

/**
 * Reads the {@code causal_clustering.transaction_listen_address} setting of the current leader.
 *
 * @param cluster the cluster whose leader is queried
 * @return the leader's value for that setting
 */
static String backupAddress( Cluster cluster )
{
    // NOTE(review): the leader lookup can yield null when no leader is known; callers presumably
    // only invoke this on a cluster that already has a leader - confirm.
    return cluster.getMemberWithRole( Role.LEADER ).settingValue( "causal_clustering.transaction_listen_address" );
}

static String[] backupArguments( String from, File backupsDir, String name )
Expand Down
Expand Up @@ -32,11 +32,23 @@ public class LeaderInfo implements Serializable

private final MemberId memberId;
private final long term;
private boolean isStepDown;

/**
 * Creates leader information for the given member and term.
 * The step-down flag starts out cleared.
 *
 * @param memberId the member this info refers to
 * @param term the term this info refers to
 */
public LeaderInfo( MemberId memberId, long term )
{
    this.isStepDown = false;
    this.term = term;
    this.memberId = memberId;
}

/**
 * Flags this instance as describing a leader step-down.
 */
public void stepDown()
{
    isStepDown = true;
}

/**
 * @return {@code true} once {@link #stepDown()} has been called on this instance, {@code false} otherwise
 */
public boolean isStepDown()
{
    return this.isStepDown;
}

public MemberId memberId()
Expand Down
Expand Up @@ -22,4 +22,7 @@
/**
 * Callback interface for components that want to be told about leader changes.
 */
public interface LeaderListener
{
/**
 * Called with the leader information of a leader change.
 *
 * @param leaderInfo the leader information being published
 */
void onLeaderSwitch( LeaderInfo leaderInfo );
/**
 * Called when the published leader information describes a step-down.
 * The default implementation does nothing, so existing listeners need not override it.
 *
 * @param leaderInfo the leader information being published
 */
default void onLeaderStepDown( LeaderInfo leaderInfo )
{
}
}
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
Expand Down Expand Up @@ -213,8 +214,19 @@ public synchronized ExposedRaftState state()
/**
 * Publishes the leader and term carried by {@code outcome} to every registered {@link LeaderListener}.
 * <p>
 * When the outcome is a step-down, the published {@link LeaderInfo} is flagged accordingly and each
 * listener receives {@code onLeaderStepDown} immediately before its {@code onLeaderSwitch} call.
 *
 * @param outcome the outcome whose leader, term and step-down state are published
 */
private void notifyLeaderChanges( Outcome outcome )
{
    LeaderInfo info = new LeaderInfo( outcome.getLeader(), outcome.getTerm() );

    if ( !outcome.isSteppingDown() )
    {
        // Plain leader switch: a single callback per listener.
        for ( LeaderListener subscriber : leaderListeners )
        {
            subscriber.onLeaderSwitch( info );
        }
        return;
    }

    // Step-down: flag the info before any listener sees it, then deliver both callbacks per listener.
    info.stepDown();
    for ( LeaderListener subscriber : leaderListeners )
    {
        subscriber.onLeaderStepDown( info );
        subscriber.onLeaderSwitch( info );
    }
}
Expand All @@ -239,18 +251,10 @@ else if ( outcome.isSteppingDown() )

/**
 * Tells whether the leader carried by {@code outcome} differs from the previously known leader.
 * A {@code null} leader means "no leader", so a transition between a leader and no leader (in either
 * direction) counts as a change, while "no leader" on both sides does not.
 *
 * @param outcome the outcome holding the new leader, possibly {@code null}
 * @param oldLeader the previously known leader, possibly {@code null}
 * @return {@code true} if the two leaders are not equal
 */
private boolean leaderChanged( Outcome outcome, MemberId oldLeader )
{
    // Null-safe comparison; equivalent to comparing the two values wrapped in Optional.ofNullable.
    return !Objects.equals( oldLeader, outcome.getLeader() );
}

public synchronized ConsensusOutcome handle( RaftMessages.RaftMessage incomingMessage ) throws IOException
Expand Down
Expand Up @@ -52,6 +52,15 @@ public interface CoreTopologyService extends TopologyService
*/
void setLeader( LeaderInfo leaderInfo, String dbName );

/**
* Set the leader memberId to null for a given database (i.e. Raft consensus group).
* This is intended to trigger state cleanup for informational procedures like {@link ClusterOverviewProcedure}
* <p>
* NOTE(review): the implementations visible in this change only forward the update when the local member
* is the one recorded as the previous leader - confirm this is the intended contract for all implementations.
*
* @param leaderInfo the leader information published together with the step-down
* @param dbName the name of the database (i.e. Raft consensus group) whose leader stepped down
*/
void handleStepDown( LeaderInfo leaderInfo, String dbName );

interface Listener
{
void onCoreTopologyChange( CoreTopology coreTopology );
Expand Down
Expand Up @@ -222,7 +222,7 @@ static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderIn

boolean greaterOrEqualTermExists = Optional.ofNullable( expected ).map(l -> l.term() >= leaderInfo.term() ).orElse( false );

if ( greaterOrEqualTermExists || noUpdate )
if ( (greaterOrEqualTermExists || noUpdate) && !leaderInfo.isStepDown() )
{
return;
}
Expand Down
Expand Up @@ -71,6 +71,7 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem
private static final long HAZELCAST_IS_HEALTHY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis( 10 );
private static final int HAZELCAST_MIN_CLUSTER = 2;

private final int minimumConsensusSize;
private final Config config;
private final MemberId myself;
private final Log log;
Expand Down Expand Up @@ -100,6 +101,7 @@ public HazelcastCoreTopologyService( Config config, MemberId myself, JobSchedule
TopologyServiceRetryStrategy topologyServiceRetryStrategy )
{
this.config = config;
this.minimumConsensusSize = config.get( CausalClusteringSettings.minimum_core_cluster_size_at_runtime );
this.myself = myself;
this.listenerService = new CoreTopologyListenerService();
this.log = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -140,6 +142,16 @@ public void setLeader( LeaderInfo leaderInfo, String dbName )
}
}

/**
 * Forwards a step-down to the Hazelcast-backed leader state, but only if this member is the one
 * currently recorded as leader in this service.
 *
 * @param leaderInfo the leader information published with the step-down
 * @param dbName the database the step-down applies to
 */
@Override
public void handleStepDown( LeaderInfo leaderInfo, String dbName )
{
    // The parameter shadows the field: this.leaderInfo is the previously recorded leader.
    if ( !myself.equals( this.leaderInfo.memberId() ) )
    {
        return;
    }
    HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, dbName );
}

@Override
public Map<MemberId,RoleInfo> allCoreRoles()
{
Expand Down
Expand Up @@ -63,6 +63,12 @@ public void onLeaderSwitch( LeaderInfo leaderInfo )
coreTopologyService.setLeader( leaderInfo, dbName );
}

/**
 * Passes a leader step-down notification on to the topology service, together with this object's
 * {@code dbName}.
 *
 * @param leaderInfo the leader information delivered with the step-down notification
 */
@Override
public void onLeaderStepDown( LeaderInfo leaderInfo )
{
coreTopologyService.handleStepDown( leaderInfo, dbName );
}

@Override
public String dbName()
{
Expand Down
Expand Up @@ -80,9 +80,8 @@ public ClusterOverviewProcedure( TopologyService topologyService, LogProvider lo
public RawIterator<Object[],ProcedureException> apply(
Context ctx, Object[] input, ResourceTracker resourceTracker )
{
Map<MemberId,RoleInfo> roleMap = emptyMap();
Map<MemberId,RoleInfo> roleMap = topologyService.allCoreRoles();
List<ReadWriteEndPoint> endpoints = new ArrayList<>();
roleMap = topologyService.allCoreRoles();

CoreTopology coreTopology = topologyService.allCoreServers();
Set<MemberId> coreMembers = coreTopology.members().keySet();
Expand Down
Expand Up @@ -352,31 +352,51 @@ private void ensureDBName( String dbName ) throws IllegalArgumentException
}
}

public CoreClusterMember getDbWithRole( Role role )
public CoreClusterMember getMemberWithRole( Role role )
{
return getDbWithAnyRole( role );
return getMemberWithAnyRole( role );
}

public CoreClusterMember getDbWithRole( String dbName, Role role )
public List<CoreClusterMember> getAllMembersWithRole( Role role )
{
return getDbWithAnyRole( dbName, role );
return getAllMembersWithAnyRole( role );
}

public CoreClusterMember getDbWithAnyRole( Role... roles )
public CoreClusterMember getMemberWithRole( String dbName, Role role )
{
return getMemberWithAnyRole( dbName, role );
}

public List<CoreClusterMember> getAllMembersWithRole( String dbName, Role role )
{
return getAllMembersWithAnyRole( dbName, role );
}

public CoreClusterMember getMemberWithAnyRole( Role... roles )
{
String dbName = CausalClusteringSettings.database.getDefaultValue();
return getDbWithAnyRole( dbName, roles );
return getMemberWithAnyRole( dbName, roles );
}

public CoreClusterMember getDbWithAnyRole( String dbName, Role... roles )
public List<CoreClusterMember> getAllMembersWithAnyRole( Role... roles )
{
String dbName = CausalClusteringSettings.database.getDefaultValue();
return getAllMembersWithAnyRole( dbName, roles );
}

public CoreClusterMember getMemberWithAnyRole( String dbName, Role... roles )
{
return getAllMembersWithAnyRole( dbName, roles ).stream().findFirst().orElse( null );
}

public List<CoreClusterMember> getAllMembersWithAnyRole( String dbName, Role... roles )
{
ensureDBName( dbName );
Set<Role> roleSet = Arrays.stream( roles ).collect( toSet() );

Optional<CoreClusterMember> firstAppropriate = coreMembers.values().stream().filter( m ->
m.database() != null && m.dbName().equals( dbName ) && roleSet.contains( m.database().getRole() ) ).findFirst();

return firstAppropriate.orElse( null );
return coreMembers.values().stream()
.filter( m -> m.database() != null && m.dbName().equals( dbName ) && roleSet.contains( m.database().getRole() ) )
.collect( Collectors.toList() );
}

public CoreClusterMember awaitLeader() throws TimeoutException
Expand All @@ -401,12 +421,12 @@ public CoreClusterMember awaitLeader( long timeout, TimeUnit timeUnit ) throws T

/**
 * Waits until a core member with the given role can be found.
 *
 * @param role the role to wait for
 * @param timeout how long to wait
 * @param timeUnit the unit of {@code timeout}
 * @return the member found by {@code getMemberWithRole( role )} once it is non-null
 * @throws TimeoutException propagated from {@code await} - presumably when no such member shows up in time
 */
public CoreClusterMember awaitCoreMemberWithRole( Role role, long timeout, TimeUnit timeUnit ) throws TimeoutException
{
    return await( () -> getMemberWithRole( role ), notNull(), timeout, timeUnit );
}

/**
 * Waits until a core member of the named database with the given role can be found.
 *
 * @param dbName the database the member must belong to
 * @param role the role to wait for
 * @param timeout how long to wait
 * @param timeUnit the unit of {@code timeout}
 * @return the member found by {@code getMemberWithRole( dbName, role )} once it is non-null
 * @throws TimeoutException propagated from {@code await} - presumably when no such member shows up in time
 */
public CoreClusterMember awaitCoreMemberWithRole( String dbName, Role role, long timeout, TimeUnit timeUnit ) throws TimeoutException
{
    return await( () -> getMemberWithRole( dbName, role ), notNull(), timeout, timeUnit );
}

public int numberOfCoreMembersReportedByTopology()
Expand Down
Expand Up @@ -91,7 +91,7 @@ public Map<MemberId,RoleInfo> allCoreRoles()
@Override
public void setLeader( LeaderInfo newLeader, String dbName )
{
if ( this.leaderInfo.term() < newLeader.term() && newLeader.memberId() != null )
if ( this.leaderInfo.term() < newLeader.term() )
{
this.leaderInfo = newLeader;
sharedDiscoveryService.casLeaders( newLeader, localDBName );
Expand Down Expand Up @@ -142,6 +142,16 @@ public CoreTopology allCoreServers()
return this.coreTopology;
}

/**
 * Forwards a step-down to the shared discovery service, but only if this member is the one
 * currently recorded as leader in this client.
 *
 * @param leaderInfo the leader information published with the step-down
 * @param dbName the database the step-down applies to
 */
@Override
public void handleStepDown( LeaderInfo leaderInfo, String dbName )
{
    // The parameter shadows the field: this.leaderInfo is the previously recorded leader.
    MemberId recordedLeader = this.leaderInfo.memberId();
    if ( myself.equals( recordedLeader ) )
    {
        sharedDiscoveryService.casLeaders( leaderInfo, dbName );
    }
}

@Override
public String localDBName()
{
Expand Down
Expand Up @@ -136,7 +136,7 @@ void casLeaders( LeaderInfo leaderInfo, String dbName )

boolean greaterOrEqualTermExists = current.map( l -> l.term() >= leaderInfo.term() ).orElse( false );

boolean success = !(greaterOrEqualTermExists || noUpdate);
boolean success = !(greaterOrEqualTermExists || noUpdate) || leaderInfo.isStepDown();

if ( success )
{
Expand Down
Expand Up @@ -120,7 +120,7 @@ public void shouldBeAbleToAddAndRemoveCoreMembersUnderModestLoad()
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit( () ->
{
CoreGraphDatabase leader = cluster.getDbWithRole( Role.LEADER ).database();
CoreGraphDatabase leader = cluster.getMemberWithRole( Role.LEADER ).database();
try ( Transaction tx = leader.beginTx() )
{
leader.createNode();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -41,8 +42,10 @@
import java.util.stream.Stream;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.ClusterMember;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.RoleInfo;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.collection.RawIterator;
Expand All @@ -62,6 +65,7 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.neo4j.causalclustering.discovery.RoleInfo.FOLLOWER;
import static org.neo4j.causalclustering.discovery.RoleInfo.LEADER;
import static org.neo4j.causalclustering.discovery.RoleInfo.READ_REPLICA;
Expand Down Expand Up @@ -266,6 +270,21 @@ public void shouldDiscoverRemovalOfCoreMembers() throws Exception
}
}

/**
 * Starts a cluster, shuts down every core member that currently reports the FOLLOWER role, and then
 * waits for the overview observed from the former leader to match {@code containsRole( LEADER, 0 )}
 * (presumably: no member is listed as LEADER any more).
 */
@Test
public void shouldHaveNoLeaderIfMajorityCoreMembersDead() throws Exception
{
// NOTE(review): withNumberOfCoreMembers is called twice (3, then 2). If it is a plain setter the
// second call overrides the first; the second call may have been meant to configure something else
// (e.g. the number of read replicas) - confirm the intended cluster shape.
clusterRule.withNumberOfCoreMembers( 3 );
clusterRule.withNumberOfCoreMembers( 2 );

Cluster cluster = clusterRule.startCluster();
//TODO: address proliferation of Role enums
// Capture followers and leader before shutting anything down, while roles are still reported.
List<CoreClusterMember> followers = cluster.getAllMembersWithRole( Role.FOLLOWER );
CoreClusterMember leader = cluster.getMemberWithRole( Role.LEADER );
followers.forEach( CoreClusterMember::shutdown );

// The overview is queried through the former leader, identified by its server id.
assertEventualOverview( cluster, containsRole( LEADER, 0 ), leader.serverId() );
}

private void assertEventualOverview( Cluster cluster, Matcher<List<MemberInfo>> expected, int coreServerId )
throws KernelException, InterruptedException
{
Expand Down
Expand Up @@ -61,7 +61,7 @@ public void shouldNotAllowTheConsensusGroupToDropBelowMinimumConsensusGroupSize(
// when
for ( int i = 0; i < numberOfCoreSeversToRemove; i++ )
{
cluster.removeCoreMember( cluster.getDbWithRole( Role.LEADER ) );
cluster.removeCoreMember( cluster.getMemberWithRole( Role.LEADER ) );
cluster.awaitLeader( 30, SECONDS );
}

Expand Down

0 comments on commit dd58c92

Please sign in to comment.