Skip to content

Commit

Permalink
Re-added cluster overview on RRs
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Jul 3, 2018
1 parent 1ff9cc4 commit cd49a2b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.discovery.TopologyServiceMultiRetryStrategy;
import org.neo4j.causalclustering.discovery.TopologyServiceRetryStrategy;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.ReadReplicaRoleProcedure;
import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
Expand Down Expand Up @@ -152,6 +153,9 @@
*/
public class EnterpriseReadReplicaEditionModule extends EditionModule
{
private final TopologyService topologyService;
private final LogProvider logProvider;

public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, final DiscoveryServiceFactory discoveryServiceFactory, MemberId myself )
{
LogService logging = platformModule.logging;
Expand Down Expand Up @@ -209,7 +213,7 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,
publishEditionInfo( dependencies.resolveDependency( UsageData.class ), platformModule.databaseInfo, config );
commitProcessFactory = readOnly();

LogProvider logProvider = platformModule.logging.getInternalLogProvider();
logProvider = platformModule.logging.getInternalLogProvider();
LogProvider userLogProvider = platformModule.logging.getUserLogProvider();

logProvider.getLog( getClass() ).info( String.format( "Generated new id: %s", myself ) );
Expand All @@ -218,7 +222,7 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,

configureDiscoveryService( discoveryServiceFactory, dependencies, config, logProvider );

TopologyService topologyService = discoveryServiceFactory.topologyService( config, logProvider, platformModule.jobScheduler, myself, hostnameResolver,
topologyService = discoveryServiceFactory.topologyService( config, logProvider, platformModule.jobScheduler, myself, hostnameResolver,
resolveStrategy( config, logProvider ) );

life.add( dependencies.satisfyDependency( topologyService ) );
Expand Down Expand Up @@ -389,6 +393,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
procedures.registerProcedure( EnterpriseBuiltInDbmsProcedures.class, true );
procedures.registerProcedure( EnterpriseBuiltInProcedures.class, true );
procedures.register( new ReadReplicaRoleProcedure() );
procedures.register( new ClusterOverviewProcedure( topologyService, logProvider ) );
}

private void registerRecovery( final DatabaseInfo databaseInfo, LifeSupport life, final DependencyResolver dependencyResolver )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public interface ClusterMember<T extends GraphDatabaseAPI>

File homeDir();

int serverId();

default void updateConfig( Setting<?> setting, String value )
{
config().augment( setting, value );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public String toString()
return format( "CoreClusterMember{serverId=%d}", serverId );
}

@Override
public int serverId()
{
return serverId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public MemberId memberId()
return new MemberId( new UUID( ((long) serverId) << 32, 0 ) );
}

@Override
public int serverId()
{
return serverId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand All @@ -49,6 +51,7 @@
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.ClusterMember;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.ReadReplica;
import org.neo4j.causalclustering.discovery.RoleInfo;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.collection.RawIterator;
Expand Down Expand Up @@ -110,11 +113,8 @@ public void shouldDiscoverCoreMembers() throws Exception
containsMemberAddresses( cluster.coreMembers() ),
containsRole( LEADER, 1 ), containsRole( FOLLOWER, coreMembers - 1 ), doesNotContainRole( READ_REPLICA ) );

for ( int coreServerId = 0; coreServerId < coreMembers; coreServerId++ )
{
// then
assertEventualOverview( cluster, expected, coreServerId );
}
// then
assertAllEventualOverviews( cluster, expected );
}

@Test
Expand All @@ -133,11 +133,8 @@ public void shouldDiscoverCoreMembersAndReadReplicas() throws Exception
containsAllMemberAddresses( cluster.coreMembers(), cluster.readReplicas() ),
containsRole( LEADER, 1 ), containsRole( FOLLOWER, 2 ), containsRole( READ_REPLICA, replicaCount ) );

for ( int coreServerId = 0; coreServerId < coreMembers; coreServerId++ )
{
// then
assertEventualOverview( cluster, expected, coreServerId );
}
// then
assertAllEventualOverviews( cluster, expected );
}

@Test
Expand All @@ -158,11 +155,8 @@ public void shouldDiscoverReadReplicasAfterRestartingCores() throws Exception
containsAllMemberAddresses( cluster.coreMembers(), cluster.readReplicas() ),
containsRole( LEADER, 1 ), containsRole( FOLLOWER, coreMembers - 1 ), containsRole( READ_REPLICA, readReplicas ) );

for ( int coreServerId = 0; coreServerId < coreMembers; coreServerId++ )
{
// then
assertEventualOverview( cluster, expected, coreServerId );
}
// then
assertAllEventualOverviews( cluster, expected );
}

@Test
Expand All @@ -184,11 +178,8 @@ public void shouldDiscoverNewCoreMembers() throws Exception
containsMemberAddresses( cluster.coreMembers() ),
containsRole( LEADER, 1 ), containsRole( FOLLOWER, finalCoreMembers - 1 ) );

for ( int coreServerId = 0; coreServerId < finalCoreMembers; coreServerId++ )
{
// then
assertEventualOverview( cluster, expected, coreServerId );
}
// then
assertAllEventualOverviews( cluster, expected );
}

@Test
Expand All @@ -212,11 +203,8 @@ public void shouldDiscoverNewReadReplicas() throws Exception
containsRole( FOLLOWER, coreMembers - 1 ),
containsRole( READ_REPLICA, initialReadReplicas + 2 ) );

for ( int coreServerId = 0; coreServerId < coreMembers; coreServerId++ )
{
// then
assertEventualOverview( cluster, expected, coreServerId );
}
// then
assertAllEventualOverviews( cluster, expected );
}

@Test
Expand All @@ -230,20 +218,14 @@ public void shouldDiscoverRemovalOfReadReplicas() throws Exception

Cluster cluster = clusterRule.startCluster();

for ( int coreServerId = 0; coreServerId < coreMembers; coreServerId++ )
{
assertEventualOverview( cluster, containsRole( READ_REPLICA, initialReadReplicas ), coreServerId );
}
assertAllEventualOverviews( cluster, containsRole( READ_REPLICA, initialReadReplicas ) );

// when
cluster.removeReadReplicaWithMemberId( 0 );
cluster.removeReadReplicaWithMemberId( 1 );

for ( int coreServerId = 0; coreServerId < coreMembers; coreServerId++ )
{
// then
assertEventualOverview( cluster, containsRole( READ_REPLICA, initialReadReplicas - 2 ), coreServerId );
}
// then
assertAllEventualOverviews( cluster, containsRole( READ_REPLICA, initialReadReplicas - 2 ) );
}

@Test
Expand All @@ -256,22 +238,15 @@ public void shouldDiscoverRemovalOfCoreMembers() throws Exception

Cluster cluster = clusterRule.startCluster();

for ( int coreServerId = 0; coreServerId < coreMembers; coreServerId++ )
{
assertEventualOverview( cluster, allOf( containsRole( LEADER, 1 ), containsRole( FOLLOWER, coreMembers - 1 ) ),
coreServerId );
}
assertAllEventualOverviews( cluster, allOf( containsRole( LEADER, 1 ), containsRole( FOLLOWER, coreMembers - 1 ) ) );

// when
cluster.removeCoreMemberWithServerId( 0 );
cluster.removeCoreMemberWithServerId( 1 );

for ( int coreServerId = 2; coreServerId < coreMembers; coreServerId++ )
{
// then
assertEventualOverview( cluster, allOf( containsRole( LEADER, 1 ), containsRole( FOLLOWER, coreMembers - 1 - 2 ) ),
coreServerId );
}
// then
assertAllEventualOverviews( cluster, allOf( containsRole( LEADER, 1 ), containsRole( FOLLOWER, coreMembers - 1 - 2 ) ),
asSet( 0, 1 ), Collections.emptySet() );
}

@Test
Expand All @@ -285,7 +260,7 @@ public void shouldDiscoverTimeoutBasedLeaderStepdown() throws Exception
CoreClusterMember leader = cluster.getMemberWithRole( Role.LEADER );
followers.forEach( CoreClusterMember::shutdown );

assertEventualOverview( cluster, containsRole( LEADER, 0 ), leader.serverId() );
assertEventualOverview( containsRole( LEADER, 0 ), leader, "core" );
}

@Test
Expand All @@ -303,19 +278,47 @@ public void shouldDiscoverGreaterTermBasedLeaderStepdown() throws Exception
CoreClusterMember follower = cluster.getMemberWithRole( Role.FOLLOWER );
follower.raft().triggerElection( Clock.systemUTC() );

assertEventualOverview( cluster, allOf(
assertEventualOverview( allOf(
containsRole( LEADER, 1 ),
containsRole( FOLLOWER, originalCoreMembers - 1 ),
not( equalTo( preElectionOverview ) ) ), leader.serverId() );
not( equalTo( preElectionOverview ) ) ), leader, "core" );
}

private void assertEventualOverview( Cluster cluster, Matcher<List<MemberInfo>> expected, int coreServerId )
private void assertAllEventualOverviews( Cluster cluster, Matcher<List<MemberInfo>> expected ) throws KernelException, InterruptedException
{
assertAllEventualOverviews( cluster, expected, Collections.emptySet(), Collections.emptySet() );
}

private void assertAllEventualOverviews( Cluster cluster, Matcher<List<MemberInfo>> expected, Set<Integer> excludedCores, Set<Integer> excludedRRs )
throws KernelException, InterruptedException
{
for ( CoreClusterMember core : cluster.coreMembers() )
{
if ( !excludedCores.contains( core.serverId() ) )
{
assertEventualOverview( expected, core, "core" );
}

}

for ( ReadReplica rr : cluster.readReplicas() )
{
if ( !excludedRRs.contains( rr.serverId() ) )
{
assertEventualOverview( expected, rr, "rr" );
}
}
}

private void assertEventualOverview( Matcher<List<MemberInfo>> expected, ClusterMember<? extends GraphDatabaseFacade> member, String role )
throws KernelException, InterruptedException
{
Function<List<MemberInfo>, String> printableMemberInfos =
memberInfos -> memberInfos.stream().map( MemberInfo::toString ).collect( Collectors.joining( ", " ) );
assertEventually( memberInfos -> "should have overview from core " + coreServerId + " but view was " + printableMemberInfos.apply( memberInfos ),
() -> clusterOverview( cluster.getCoreMemberById( coreServerId ).database() ), expected, 90, SECONDS );

String message = String.format( "should have overview from %s %s, but view was ", role, member.serverId() );
assertEventually( memberInfos -> message + printableMemberInfos.apply( memberInfos ),
() -> clusterOverview( member.database() ), expected, 90, SECONDS );
}

@SafeVarargs
Expand Down

0 comments on commit cd49a2b

Please sign in to comment.