Skip to content

Commit

Permalink
cluster overview v2 procedure with tags
Browse files Browse the repository at this point in the history
The next version of cluster overview which shows the tags as well.
  • Loading branch information
martinfurmanski committed Feb 28, 2017
1 parent 922768d commit 66ca3ad
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 58 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
Expand All @@ -44,10 +45,14 @@
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.util.Comparator.comparing; import static java.util.Comparator.comparing;
import static org.neo4j.helpers.collection.Iterables.asList;
import static org.neo4j.helpers.collection.Iterators.asRawIterator; import static org.neo4j.helpers.collection.Iterators.asRawIterator;
import static org.neo4j.helpers.collection.Iterators.map; import static org.neo4j.helpers.collection.Iterators.map;
import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature;


/**
* Overview procedure with added support for server tags.
*/
public class ClusterOverviewProcedure extends CallableProcedure.BasicProcedure public class ClusterOverviewProcedure extends CallableProcedure.BasicProcedure
{ {
private static final String[] PROCEDURE_NAMESPACE = {"dbms", "cluster"}; private static final String[] PROCEDURE_NAMESPACE = {"dbms", "cluster"};
Expand All @@ -60,8 +65,10 @@ public ClusterOverviewProcedure( TopologyService topologyService,
LeaderLocator leaderLocator, LogProvider logProvider ) LeaderLocator leaderLocator, LogProvider logProvider )
{ {
super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) ) super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) )
.out( "id", Neo4jTypes.NTString ).out( "addresses", Neo4jTypes.NTList( Neo4jTypes.NTString ) ) .out( "id", Neo4jTypes.NTString )
.out( "addresses", Neo4jTypes.NTList( Neo4jTypes.NTString ) )
.out( "role", Neo4jTypes.NTString ) .out( "role", Neo4jTypes.NTString )
.out( "tags", Neo4jTypes.NTList( Neo4jTypes.NTString ) )
.description( "Overview of all currently accessible cluster members and their roles." ) .description( "Overview of all currently accessible cluster members and their roles." )
.build() ); .build() );
this.topologyService = topologyService; this.topologyService = topologyService;
Expand All @@ -76,6 +83,7 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp
CoreTopology coreTopology = topologyService.coreServers(); CoreTopology coreTopology = topologyService.coreServers();
Set<MemberId> coreMembers = coreTopology.members(); Set<MemberId> coreMembers = coreTopology.members();
MemberId leader = null; MemberId leader = null;

try try
{ {
leader = leaderLocator.getLeader(); leader = leaderLocator.getLeader();
Expand All @@ -87,36 +95,42 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp


for ( MemberId memberId : coreMembers ) for ( MemberId memberId : coreMembers )
{ {
Optional<ClientConnectorAddresses> clientConnectorAddresses = Optional<CoreServerInfo> coreServerInfo = coreTopology.find( memberId );
coreTopology.find( memberId ).map( CoreServerInfo::connectors ); if ( coreServerInfo.isPresent() )
if ( clientConnectorAddresses.isPresent() )
{ {
Role role = memberId.equals( leader ) ? Role.LEADER : Role.FOLLOWER; Role role = memberId.equals( leader ) ? Role.LEADER : Role.FOLLOWER;
endpoints.add( new ReadWriteEndPoint( clientConnectorAddresses.get(), role, memberId.getUuid() ) ); endpoints.add( new ReadWriteEndPoint( coreServerInfo.get().connectors(), role, memberId.getUuid(), asList( coreServerInfo.get().tags() ) ) );
} }
else else
{ {
log.debug( "No Address found for " + memberId ); log.debug( "No Address found for " + memberId );
} }
} }
for ( ReadReplicaInfo readReplicaInfo : topologyService.readReplicas().allMemberInfo() )
for ( Map.Entry<MemberId,ReadReplicaInfo> readReplica : topologyService.readReplicas().replicaMembers().entrySet() )
{ {
endpoints.add( new ReadWriteEndPoint( readReplicaInfo.connectors(), Role.READ_REPLICA ) ); ReadReplicaInfo readReplicaInfo = readReplica.getValue();
endpoints.add( new ReadWriteEndPoint( readReplicaInfo.connectors(), Role.READ_REPLICA, readReplica.getKey().getUuid(), asList( readReplicaInfo.tags() ) ) );
} }


endpoints.sort( comparing( o -> o.addresses().toString() ) ); endpoints.sort( comparing( o -> o.addresses().toString() ) );


return map( ( l ) -> new Object[]{l.identifier().toString(), l.addresses().uriList().stream().map( URI::toString ).toArray(), l.role().name()}, return map( ( endpoint ) -> new Object[]
{
endpoint.memberId().toString(),
endpoint.addresses().uriList().stream().map( URI::toString ).toArray(),
endpoint.role().name(),
endpoint.tags()
},
asRawIterator( endpoints.iterator() ) ); asRawIterator( endpoints.iterator() ) );
} }


private static class ReadWriteEndPoint static class ReadWriteEndPoint
{ {
private static final UUID ZERO_ID = new UUID( 0, 0 );

private final ClientConnectorAddresses clientConnectorAddresses; private final ClientConnectorAddresses clientConnectorAddresses;
private final Role role; private final Role role;
private final UUID identifier; private final UUID memberId;
private final List<String> tags;


public ClientConnectorAddresses addresses() public ClientConnectorAddresses addresses()
{ {
Expand All @@ -128,21 +142,22 @@ public Role role()
return role; return role;
} }


UUID identifier() UUID memberId()
{ {
return identifier == null ? ZERO_ID : identifier; return memberId;
} }


ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role ) List<String> tags()
{ {
this( clientConnectorAddresses, role, null ); return tags;
} }


ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role, UUID identifier ) ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role, UUID memberId, List<String> tags )
{ {
this.clientConnectorAddresses = clientConnectorAddresses; this.clientConnectorAddresses = clientConnectorAddresses;
this.role = role; this.role = role;
this.identifier = identifier; this.memberId = memberId;
this.tags = tags;
} }
} }
} }
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.causalclustering.core.state.ClusterStateDirectory; import org.neo4j.causalclustering.core.state.ClusterStateDirectory;
import org.neo4j.causalclustering.core.state.CoreState; import org.neo4j.causalclustering.core.state.CoreState;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.cluster.ClusterSettings;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.DefaultFileSystemAbstraction;
Expand Down Expand Up @@ -79,7 +80,7 @@ public CoreClusterMember( int serverId, int clusterSize,


String initialMembers = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) ); String initialMembers = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) );


config.put( "dbms.mode", "CORE" ); config.put( ClusterSettings.mode.name(), ClusterSettings.Mode.CORE.name() );
config.put( GraphDatabaseSettings.default_advertised_address.name(), "localhost" ); config.put( GraphDatabaseSettings.default_advertised_address.name(), "localhost" );
config.put( CausalClusteringSettings.initial_discovery_members.name(), initialMembers ); config.put( CausalClusteringSettings.initial_discovery_members.name(), initialMembers );
config.put( CausalClusteringSettings.discovery_listen_address.name(), "127.0.0.1:" + hazelcastPort ); config.put( CausalClusteringSettings.discovery_listen_address.name(), "127.0.0.1:" + hazelcastPort );
Expand Down
Expand Up @@ -19,10 +19,18 @@
*/ */
package org.neo4j.causalclustering.discovery; package org.neo4j.causalclustering.discovery;


import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;


import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.neo4j.causalclustering.discovery.ClientConnectorAddresses.Scheme.bolt; import static org.neo4j.causalclustering.discovery.ClientConnectorAddresses.Scheme.bolt;
import static org.neo4j.helpers.collection.Iterators.asSet;


public class TestTopology public class TestTopology
{ {
Expand All @@ -36,7 +44,8 @@ public static CoreServerInfo adressesForCore( int id )
AdvertisedSocketAddress raftServerAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); AdvertisedSocketAddress raftServerAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) );
AdvertisedSocketAddress catchupServerAddress = new AdvertisedSocketAddress( "localhost", (4000 + id) ); AdvertisedSocketAddress catchupServerAddress = new AdvertisedSocketAddress( "localhost", (4000 + id) );
AdvertisedSocketAddress boltServerAddress = new AdvertisedSocketAddress( "localhost", (5000 + id) ); AdvertisedSocketAddress boltServerAddress = new AdvertisedSocketAddress( "localhost", (5000 + id) );
return new CoreServerInfo( raftServerAddress, catchupServerAddress, wrapAsClientConnectorAddresses( boltServerAddress ) ); return new CoreServerInfo( raftServerAddress, catchupServerAddress, wrapAsClientConnectorAddresses( boltServerAddress ),
asSet( "core", "core" + id ) );
} }


public static ReadReplicaInfo addressesForReadReplica( int id ) public static ReadReplicaInfo addressesForReadReplica( int id )
Expand All @@ -45,6 +54,22 @@ public static ReadReplicaInfo addressesForReadReplica( int id )
ClientConnectorAddresses clientConnectorAddresses = new ClientConnectorAddresses( ClientConnectorAddresses clientConnectorAddresses = new ClientConnectorAddresses(
singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ); singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) );


return new ReadReplicaInfo( clientConnectorAddresses, advertisedSocketAddress ); return new ReadReplicaInfo( clientConnectorAddresses, advertisedSocketAddress,
asSet( "replica", "replica" + id ) );
}

public static Map<MemberId,ReadReplicaInfo> readReplicaInfoMap( int... ids )
{
return Arrays.stream( ids ).mapToObj( TestTopology::readReplicaInfo ).collect( Collectors
.toMap( ( p ) -> new MemberId( UUID.randomUUID() ), Function.identity() ) );
}

private static ReadReplicaInfo readReplicaInfo( int id )
{
AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (6000 + id) );
return new ReadReplicaInfo(
new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ),
new AdvertisedSocketAddress( "localhost", 4000 + id ),
asSet( "replica", "replica" + id ) );
} }
} }
Expand Up @@ -19,12 +19,15 @@
*/ */
package org.neo4j.causalclustering.discovery.procedures; package org.neo4j.causalclustering.discovery.procedures;


import org.hamcrest.collection.IsIterableContainingInOrder; import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test; import org.junit.Test;


import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID; import java.util.UUID;


import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.LeaderLocator;
Expand All @@ -34,14 +37,18 @@
import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.discovery.ReadReplicaInfo;
import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.discovery.ReadReplicaTopology;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.collection.RawIterator;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;


import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplica;
import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore;
import static org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1Test.readReplicaInfoMap; import static org.neo4j.helpers.collection.Iterators.asSet;
import static org.neo4j.helpers.collection.Iterators.asList;


public class ClusterOverviewProcedureTest public class ClusterOverviewProcedureTest
{ {
Expand All @@ -60,27 +67,92 @@ public void shouldProvideOverviewOfCoreServersAndReadReplicas() throws Exception
coreMembers.put( follower1, adressesForCore( 1 ) ); coreMembers.put( follower1, adressesForCore( 1 ) );
coreMembers.put( follower2, adressesForCore( 2 ) ); coreMembers.put( follower2, adressesForCore( 2 ) );


Map<MemberId,ReadReplicaInfo> readReplicas = readReplicaInfoMap( 4, 5 ); Map<MemberId,ReadReplicaInfo> replicaMembers = new HashMap<>();
MemberId replica4 = new MemberId( UUID.randomUUID() );
MemberId replica5 = new MemberId( UUID.randomUUID() );

replicaMembers.put( replica4, addressesForReadReplica( 4 ) );
replicaMembers.put( replica5, addressesForReadReplica( 5 ) );


when( topologyService.coreServers() ).thenReturn( new CoreTopology( null, false, coreMembers ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( null, false, coreMembers ) );
when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( readReplicas ) ); when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( replicaMembers ) );


LeaderLocator leaderLocator = mock( LeaderLocator.class ); LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( theLeader ); when( leaderLocator.getLeader() ).thenReturn( theLeader );


ClusterOverviewProcedure procedure = new ClusterOverviewProcedure( topologyService, leaderLocator, ClusterOverviewProcedure procedure = new ClusterOverviewProcedure( topologyService, leaderLocator, NullLogProvider.getInstance() );
NullLogProvider.getInstance() );


// when // when
final List<Object[]> members = asList( procedure.apply( null, new Object[0] ) ); final RawIterator<Object[],ProcedureException> members = procedure.apply( null, new Object[0] );


// then assertThat( members.next(), new IsRecord( theLeader.getUuid(), 5000, Role.LEADER, asSet( "core", "core0" ) ) );
assertThat( members, IsIterableContainingInOrder.contains( assertThat( members.next(), new IsRecord( follower1.getUuid(), 5001, Role.FOLLOWER, asSet( "core", "core1" ) ) );
new Object[]{theLeader.getUuid().toString(), new String[] {"bolt://localhost:5000"}, "LEADER"}, assertThat( members.next(), new IsRecord( follower2.getUuid(), 5002, Role.FOLLOWER, asSet( "core", "core2" ) ) );
new Object[]{follower1.getUuid().toString(), new String[] {"bolt://localhost:5001"}, "FOLLOWER"},
new Object[]{follower2.getUuid().toString(), new String[] {"bolt://localhost:5002"}, "FOLLOWER"}, assertThat( members.next(), new IsRecord( replica4.getUuid(), 6004, Role.READ_REPLICA, asSet( "replica", "replica4" ) ) );
new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:6004"}, "READ_REPLICA"}, assertThat( members.next(), new IsRecord( replica5.getUuid(), 6005, Role.READ_REPLICA, asSet( "replica", "replica5" ) ) );
new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:6005"}, "READ_REPLICA"}
) ); assertFalse( members.hasNext() );
}

class IsRecord extends TypeSafeMatcher<Object[]>
{
private final UUID memberId;
private final int boltPort;
private final Role role;
private final Set<String> tags;

IsRecord( UUID memberId, int boltPort, Role role, Set<String> tags )
{
this.memberId = memberId;
this.boltPort = boltPort;
this.role = role;
this.tags = tags;
}

@Override
protected boolean matchesSafely( Object[] record )
{
if ( record.length != 4 )
{
return false;
}

if ( !memberId.toString().equals( record[0] ) )
{
return false;
}

String[] boltAddresses = new String[]{"bolt://localhost:" + boltPort};

if ( !Arrays.equals( boltAddresses, ((Object[]) record[1]) ) )
{
return false;
}

if ( !role.name().equals( record[2] ) )
{
return false;
}

Set<String> recordTags = Iterables.asSet( (List<String>) record[3] );
if ( !tags.equals( recordTags ) )
{
return false;
}

return true;
}

@Override
public void describeTo( Description description )
{
description.appendText(
"memberId=" + memberId +
", boltPort=" + boltPort +
", role=" + role +
", tags=" + tags +
'}' );
}
} }
} }
Expand Up @@ -32,17 +32,13 @@
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;


import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.discovery.ClientConnectorAddresses;
import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreServerInfo;
import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.CoreTopology;
import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.discovery.CoreTopologyService;
import org.neo4j.causalclustering.discovery.ReadReplicaInfo;
import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.discovery.ReadReplicaTopology;
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
Expand All @@ -55,7 +51,6 @@
import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.configuration.Settings;


import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -66,9 +61,9 @@
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_routing_ttl; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_routing_ttl;
import static org.neo4j.causalclustering.discovery.ClientConnectorAddresses.Scheme.bolt;
import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplica; import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplica;
import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore;
import static org.neo4j.causalclustering.discovery.TestTopology.readReplicaInfoMap;
import static org.neo4j.causalclustering.identity.RaftTestMember.member; import static org.neo4j.causalclustering.identity.RaftTestMember.member;
import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.helpers.collection.Iterators.asList;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;
Expand Down Expand Up @@ -370,20 +365,6 @@ private ClusterView run( GetServersProcedureV1 proc ) throws ProcedureException
return ClusterView.parse( (List<Map<String,Object>>) rows[1] ); return ClusterView.parse( (List<Map<String,Object>>) rows[1] );
} }


public static Map<MemberId,ReadReplicaInfo> readReplicaInfoMap( int... ids )
{
return Arrays.stream( ids ).mapToObj( GetServersProcedureV1Test::readReplicaInfo ).collect( Collectors
.toMap( (p) -> new MemberId( UUID.randomUUID() ), Function.identity() ) );
}

private static ReadReplicaInfo readReplicaInfo( int id )
{
AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (6000 + id) );
return new ReadReplicaInfo(
new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ),
new AdvertisedSocketAddress( "localhost", 4000 + id ));
}

private static class ClusterView private static class ClusterView
{ {
private final Map<Role,Set<AdvertisedSocketAddress>> clusterView; private final Map<Role,Set<AdvertisedSocketAddress>> clusterView;
Expand Down

0 comments on commit 66ca3ad

Please sign in to comment.