diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java index b97d77f6ef4dd..b60d72135db8b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java @@ -22,6 +22,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -44,10 +45,14 @@ import org.neo4j.logging.LogProvider; import static java.util.Comparator.comparing; +import static org.neo4j.helpers.collection.Iterables.asList; import static org.neo4j.helpers.collection.Iterators.asRawIterator; import static org.neo4j.helpers.collection.Iterators.map; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; +/** + * Overview procedure with added support for server tags. + */ public class ClusterOverviewProcedure extends CallableProcedure.BasicProcedure { private static final String[] PROCEDURE_NAMESPACE = {"dbms", "cluster"}; @@ -60,8 +65,10 @@ public ClusterOverviewProcedure( TopologyService topologyService, LeaderLocator leaderLocator, LogProvider logProvider ) { super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) ) - .out( "id", Neo4jTypes.NTString ).out( "addresses", Neo4jTypes.NTList( Neo4jTypes.NTString ) ) + .out( "id", Neo4jTypes.NTString ) + .out( "addresses", Neo4jTypes.NTList( Neo4jTypes.NTString ) ) .out( "role", Neo4jTypes.NTString ) + .out( "tags", Neo4jTypes.NTList( Neo4jTypes.NTString ) ) .description( "Overview of all currently accessible cluster members and their roles." ) .build() ); this.topologyService = topologyService; @@ -76,6 +83,7 @@ public RawIterator apply( Context ctx, Object[] inp CoreTopology coreTopology = topologyService.coreServers(); Set coreMembers = coreTopology.members(); MemberId leader = null; + try { leader = leaderLocator.getLeader(); @@ -87,36 +95,42 @@ public RawIterator apply( Context ctx, Object[] inp for ( MemberId memberId : coreMembers ) { - Optional clientConnectorAddresses = - coreTopology.find( memberId ).map( CoreServerInfo::connectors ); - if ( clientConnectorAddresses.isPresent() ) + Optional coreServerInfo = coreTopology.find( memberId ); + if ( coreServerInfo.isPresent() ) { Role role = memberId.equals( leader ) ? Role.LEADER : Role.FOLLOWER; - endpoints.add( new ReadWriteEndPoint( clientConnectorAddresses.get(), role, memberId.getUuid() ) ); + endpoints.add( new ReadWriteEndPoint( coreServerInfo.get().connectors(), role, memberId.getUuid(), asList( coreServerInfo.get().tags() ) ) ); } else { log.debug( "No Address found for " + memberId ); } } - for ( ReadReplicaInfo readReplicaInfo : topologyService.readReplicas().allMemberInfo() ) + + for ( Map.Entry readReplica : topologyService.readReplicas().replicaMembers().entrySet() ) { - endpoints.add( new ReadWriteEndPoint( readReplicaInfo.connectors(), Role.READ_REPLICA ) ); + ReadReplicaInfo readReplicaInfo = readReplica.getValue(); + endpoints.add( new ReadWriteEndPoint( readReplicaInfo.connectors(), Role.READ_REPLICA, readReplica.getKey().getUuid(), asList( readReplicaInfo.tags() ) ) ); } endpoints.sort( comparing( o -> o.addresses().toString() ) ); - return map( ( l ) -> new Object[]{l.identifier().toString(), l.addresses().uriList().stream().map( URI::toString ).toArray(), l.role().name()}, + return map( ( endpoint ) -> new Object[] + { + endpoint.memberId().toString(), + endpoint.addresses().uriList().stream().map( URI::toString ).toArray(), + endpoint.role().name(), + endpoint.tags() + }, asRawIterator( endpoints.iterator() ) ); } - private static class ReadWriteEndPoint + static class ReadWriteEndPoint { - private static final UUID ZERO_ID = new UUID( 0, 0 ); - private final ClientConnectorAddresses clientConnectorAddresses; private final Role role; - private final UUID identifier; + private final UUID memberId; + private final List tags; public ClientConnectorAddresses addresses() { @@ -128,21 +142,22 @@ public Role role() return role; } - UUID identifier() + UUID memberId() { - return identifier == null ? ZERO_ID : identifier; + return memberId; } - ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role ) + List tags() { - this( clientConnectorAddresses, role, null ); + return tags; } - ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role, UUID identifier ) + ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role, UUID memberId, List tags ) { this.clientConnectorAddresses = clientConnectorAddresses; this.role = role; - this.identifier = identifier; + this.memberId = memberId; + this.tags = tags; } } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java index b49ec03b45467..3ff4507f9665f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java @@ -34,6 +34,7 @@ import org.neo4j.causalclustering.core.state.ClusterStateDirectory; import org.neo4j.causalclustering.core.state.CoreState; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.cluster.ClusterSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.io.fs.DefaultFileSystemAbstraction; @@ -79,7 +80,7 @@ public CoreClusterMember( int serverId, int clusterSize, String initialMembers = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) ); - config.put( "dbms.mode", "CORE" ); + config.put( ClusterSettings.mode.name(), ClusterSettings.Mode.CORE.name() ); config.put( GraphDatabaseSettings.default_advertised_address.name(), "localhost" ); config.put( CausalClusteringSettings.initial_discovery_members.name(), initialMembers ); config.put( CausalClusteringSettings.discovery_listen_address.name(), "127.0.0.1:" + hazelcastPort ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java index 808fb44bec9dd..31a5f658f27d2 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java @@ -19,10 +19,18 @@ */ package org.neo4j.causalclustering.discovery; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import static java.util.Collections.singletonList; import static org.neo4j.causalclustering.discovery.ClientConnectorAddresses.Scheme.bolt; +import static org.neo4j.helpers.collection.Iterators.asSet; public class TestTopology { @@ -36,7 +44,8 @@ public static CoreServerInfo adressesForCore( int id ) AdvertisedSocketAddress raftServerAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); AdvertisedSocketAddress catchupServerAddress = new AdvertisedSocketAddress( "localhost", (4000 + id) ); AdvertisedSocketAddress boltServerAddress = new AdvertisedSocketAddress( "localhost", (5000 + id) ); - return new CoreServerInfo( raftServerAddress, catchupServerAddress, wrapAsClientConnectorAddresses( boltServerAddress ) ); + return new CoreServerInfo( raftServerAddress, catchupServerAddress, wrapAsClientConnectorAddresses( boltServerAddress ), + asSet( "core", "core" + id ) ); } public static ReadReplicaInfo addressesForReadReplica( int id ) @@ -45,6 +54,22 @@ public static ReadReplicaInfo addressesForReadReplica( int id ) ClientConnectorAddresses clientConnectorAddresses = new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ); - return new ReadReplicaInfo( clientConnectorAddresses, advertisedSocketAddress ); + return new ReadReplicaInfo( clientConnectorAddresses, advertisedSocketAddress, + asSet( "replica", "replica" + id ) ); + } + + public static Map readReplicaInfoMap( int... ids ) + { + return Arrays.stream( ids ).mapToObj( TestTopology::readReplicaInfo ).collect( Collectors + .toMap( ( p ) -> new MemberId( UUID.randomUUID() ), Function.identity() ) ); + } + + private static ReadReplicaInfo readReplicaInfo( int id ) + { + AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (6000 + id) ); + return new ReadReplicaInfo( + new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ), + new AdvertisedSocketAddress( "localhost", 4000 + id ), + asSet( "replica", "replica" + id ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java index b65c0746424e3..0a11508a66c92 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java @@ -19,12 +19,15 @@ */ package org.neo4j.causalclustering.discovery.procedures; -import org.hamcrest.collection.IsIterableContainingInOrder; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; import org.junit.Test; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.neo4j.causalclustering.core.consensus.LeaderLocator; @@ -34,14 +37,18 @@ import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.collection.RawIterator; +import org.neo4j.helpers.collection.Iterables; +import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.logging.NullLogProvider; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplica; import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; -import static org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1Test.readReplicaInfoMap; -import static org.neo4j.helpers.collection.Iterators.asList; +import static org.neo4j.helpers.collection.Iterators.asSet; public class ClusterOverviewProcedureTest { @@ -60,27 +67,92 @@ public void shouldProvideOverviewOfCoreServersAndReadReplicas() throws Exception coreMembers.put( follower1, adressesForCore( 1 ) ); coreMembers.put( follower2, adressesForCore( 2 ) ); - Map readReplicas = readReplicaInfoMap( 4, 5 ); + Map replicaMembers = new HashMap<>(); + MemberId replica4 = new MemberId( UUID.randomUUID() ); + MemberId replica5 = new MemberId( UUID.randomUUID() ); + + replicaMembers.put( replica4, addressesForReadReplica( 4 ) ); + replicaMembers.put( replica5, addressesForReadReplica( 5 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( null, false, coreMembers ) ); - when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( readReplicas ) ); + when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( replicaMembers ) ); LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( theLeader ); - ClusterOverviewProcedure procedure = new ClusterOverviewProcedure( topologyService, leaderLocator, - NullLogProvider.getInstance() ); + ClusterOverviewProcedure procedure = new ClusterOverviewProcedure( topologyService, leaderLocator, NullLogProvider.getInstance() ); // when - final List members = asList( procedure.apply( null, new Object[0] ) ); - - // then - assertThat( members, IsIterableContainingInOrder.contains( - new Object[]{theLeader.getUuid().toString(), new String[] {"bolt://localhost:5000"}, "LEADER"}, - new Object[]{follower1.getUuid().toString(), new String[] {"bolt://localhost:5001"}, "FOLLOWER"}, - new Object[]{follower2.getUuid().toString(), new String[] {"bolt://localhost:5002"}, "FOLLOWER"}, - new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:6004"}, "READ_REPLICA"}, - new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:6005"}, "READ_REPLICA"} - ) ); + final RawIterator members = procedure.apply( null, new Object[0] ); + + assertThat( members.next(), new IsRecord( theLeader.getUuid(), 5000, Role.LEADER, asSet( "core", "core0" ) ) ); + assertThat( members.next(), new IsRecord( follower1.getUuid(), 5001, Role.FOLLOWER, asSet( "core", "core1" ) ) ); + assertThat( members.next(), new IsRecord( follower2.getUuid(), 5002, Role.FOLLOWER, asSet( "core", "core2" ) ) ); + + assertThat( members.next(), new IsRecord( replica4.getUuid(), 6004, Role.READ_REPLICA, asSet( "replica", "replica4" ) ) ); + assertThat( members.next(), new IsRecord( replica5.getUuid(), 6005, Role.READ_REPLICA, asSet( "replica", "replica5" ) ) ); + + assertFalse( members.hasNext() ); + } + + class IsRecord extends TypeSafeMatcher + { + private final UUID memberId; + private final int boltPort; + private final Role role; + private final Set tags; + + IsRecord( UUID memberId, int boltPort, Role role, Set tags ) + { + this.memberId = memberId; + this.boltPort = boltPort; + this.role = role; + this.tags = tags; + } + + @Override + protected boolean matchesSafely( Object[] record ) + { + if ( record.length != 4 ) + { + return false; + } + + if ( !memberId.toString().equals( record[0] ) ) + { + return false; + } + + String[] boltAddresses = new String[]{"bolt://localhost:" + boltPort}; + + if ( !Arrays.equals( boltAddresses, ((Object[]) record[1]) ) ) + { + return false; + } + + if ( !role.name().equals( record[2] ) ) + { + return false; + } + + Set recordTags = Iterables.asSet( (List) record[3] ); + if ( !tags.equals( recordTags ) ) + { + return false; + } + + return true; + } + + @Override + public void describeTo( Description description ) + { + description.appendText( + "memberId=" + memberId + + ", boltPort=" + boltPort + + ", role=" + role + + ", tags=" + tags + + '}' ); + } } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java index c2034abb79c33..61ee2917127bc 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java @@ -32,17 +32,13 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; -import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.CoreTopologyService; -import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -55,7 +51,6 @@ import org.neo4j.kernel.configuration.Settings; import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; @@ -66,9 +61,9 @@ import static org.mockito.Mockito.when; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_routing_ttl; -import static org.neo4j.causalclustering.discovery.ClientConnectorAddresses.Scheme.bolt; import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplica; import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; +import static org.neo4j.causalclustering.discovery.TestTopology.readReplicaInfoMap; import static org.neo4j.causalclustering.identity.RaftTestMember.member; import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.helpers.collection.MapUtil.stringMap; @@ -370,20 +365,6 @@ private ClusterView run( GetServersProcedureV1 proc ) throws ProcedureException return ClusterView.parse( (List>) rows[1] ); } - public static Map readReplicaInfoMap( int... ids ) - { - return Arrays.stream( ids ).mapToObj( GetServersProcedureV1Test::readReplicaInfo ).collect( Collectors - .toMap( (p) -> new MemberId( UUID.randomUUID() ), Function.identity() ) ); - } - - private static ReadReplicaInfo readReplicaInfo( int id ) - { - AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (6000 + id) ); - return new ReadReplicaInfo( - new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ), - new AdvertisedSocketAddress( "localhost", 4000 + id )); - } - private static class ClusterView { private final Map> clusterView;