Skip to content

Commit

Permalink
Change the format of the returned data to:
Browse files Browse the repository at this point in the history
C: RUN "CALL dbms.cluster.routing.getServers" {}
   PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
   RECORD [9223372036854775807, [
{"role: "WRITE", "addresses": ["127.0.0.1:9001"]},
{"role": "READ", "addresses": ["127.0.0.1:9002", "127.0.0.1:9003"]},
{"role": "ROUTE", "addresses": ["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}
]]
  • Loading branch information
jimwebber authored and Mark Needham committed Sep 21, 2016
1 parent cc70fd2 commit a0da033
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 92 deletions.
Expand Up @@ -59,7 +59,9 @@ public ClusterOverviewProcedure( CoreTopologyService discoveryService,
{ {
super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) ) super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) )
.out( "id", Neo4jTypes.NTString ).out( "address", Neo4jTypes.NTString ) .out( "id", Neo4jTypes.NTString ).out( "address", Neo4jTypes.NTString )
.out( "role", Neo4jTypes.NTString ).build() ); .out( "role", Neo4jTypes.NTString )
.description( "Overview of all currently accessible cluster members and their roles." )
.build() );
this.discoveryService = discoveryService; this.discoveryService = discoveryService;
this.leaderLocator = leaderLocator; this.leaderLocator = leaderLocator;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
Expand Down
Expand Up @@ -19,7 +19,11 @@
*/ */
package org.neo4j.coreedge.discovery.procedures; package org.neo4j.coreedge.discovery.procedures;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;


Expand All @@ -31,7 +35,6 @@
import org.neo4j.coreedge.discovery.EdgeAddresses; import org.neo4j.coreedge.discovery.EdgeAddresses;
import org.neo4j.coreedge.discovery.NoKnownAddressesException; import org.neo4j.coreedge.discovery.NoKnownAddressesException;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.Context; import org.neo4j.kernel.api.proc.Context;
Expand All @@ -43,24 +46,32 @@
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
import static java.util.stream.Stream.concat; import static java.util.stream.Stream.concat;

import static org.neo4j.helpers.collection.Iterators.asRawIterator;
import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature;


/*
C: RUN "CALL dbms.cluster.routing.getServers" {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [
{"role": "WRITE", "addresses": ["127.0.0.1:9001"]},
{"role": "READ", "addresses": ["127.0.0.1:9002", "127.0.0.1:9003"]},
{"role": "ROUTE", "addresses": ["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}
]]
*/
public class GetServersProcedure extends CallableProcedure.BasicProcedure public class GetServersProcedure extends CallableProcedure.BasicProcedure
{ {
public static final String NAME = "getServers"; public static final String NAME = "getServers";
private final CoreTopologyService discoveryService; private final CoreTopologyService discoveryService;
private final LeaderLocator leaderLocator; private final LeaderLocator leaderLocator;
private final Log log; private final Log log;


public GetServersProcedure( CoreTopologyService discoveryService, LeaderLocator leaderLocator, LogProvider public GetServersProcedure( CoreTopologyService discoveryService, LeaderLocator leaderLocator,
logProvider ) LogProvider logProvider )
{ {
super( procedureSignature( new QualifiedName( new String[]{"dbms", "cluster", "routing"}, NAME ) ) super( procedureSignature( new QualifiedName( new String[]{"dbms", "cluster", "routing"}, NAME ) )
.out( "address", Neo4jTypes.NTString ) .out( "ttl", Neo4jTypes.NTInteger ).out( "servers", Neo4jTypes.NTMap )
.out( "role", Neo4jTypes.NTString ) .description( "Provides recommendations about servers that support reads, writes, and can act as " +
.out( "expiry", Neo4jTypes.NTInteger) "routers." )
.build() ); .build() );


this.discoveryService = discoveryService; this.discoveryService = discoveryService;
Expand All @@ -69,21 +80,18 @@ public GetServersProcedure( CoreTopologyService discoveryService, LeaderLocator
} }


@Override @Override
public RawIterator<Object[], ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException
{ {
Set<ReadWriteRouteEndPoint> writeEndpoints = emptySet(); Set<ReadWriteRouteEndPoint> writeEndpoints = emptySet();
Set<ReadWriteRouteEndPoint> readEndpoints = emptySet(); Set<ReadWriteRouteEndPoint> readEndpoints = emptySet();
Set<ReadWriteRouteEndPoint> routeEndpoints = emptySet(); Set<ReadWriteRouteEndPoint> routeEndpoints = emptySet();
try try
{ {
readEndpoints = readEndpoints(); readEndpoints = readEndpoints();

routeEndpoints = routeEndpoints();
AdvertisedSocketAddress leaderAddress = AdvertisedSocketAddress leaderAddress =
discoveryService.coreServers().find( leaderLocator.getLeader() ).getBoltServer(); discoveryService.coreServers().find( leaderLocator.getLeader() ).getBoltServer();
writeEndpoints = writeEndpoints( leaderAddress ); writeEndpoints = writeEndpoints( leaderAddress );

routeEndpoints = routeEndpoints();

} }
catch ( NoLeaderFoundException | NoKnownAddressesException e ) catch ( NoLeaderFoundException | NoKnownAddressesException e )
{ {
Expand All @@ -95,10 +103,10 @@ public RawIterator<Object[], ProcedureException> apply( Context ctx, Object[] in


private Set<ReadWriteRouteEndPoint> routeEndpoints() private Set<ReadWriteRouteEndPoint> routeEndpoints()
{ {
Stream<AdvertisedSocketAddress> readCore = discoveryService.coreServers().addresses().stream() Stream<AdvertisedSocketAddress> routers =
.map( CoreAddresses::getBoltServer ); discoveryService.coreServers().addresses().stream().map( CoreAddresses::getBoltServer );


return readCore.map( ReadWriteRouteEndPoint::route ).collect( toSet() ); return routers.map( ReadWriteRouteEndPoint::route ).collect( toSet() );
} }


private Set<ReadWriteRouteEndPoint> writeEndpoints( AdvertisedSocketAddress leader ) private Set<ReadWriteRouteEndPoint> writeEndpoints( AdvertisedSocketAddress leader )
Expand All @@ -108,27 +116,63 @@ private Set<ReadWriteRouteEndPoint> writeEndpoints( AdvertisedSocketAddress lead


private Set<ReadWriteRouteEndPoint> readEndpoints() private Set<ReadWriteRouteEndPoint> readEndpoints()
{ {
Stream<AdvertisedSocketAddress> readEdge = discoveryService.edgeServers().members().stream() Stream<AdvertisedSocketAddress> readEdge =
.map( EdgeAddresses::getBoltAddress ); discoveryService.edgeServers().members().stream().map( EdgeAddresses::getBoltAddress );
Stream<AdvertisedSocketAddress> readCore = discoveryService.coreServers().addresses().stream() Stream<AdvertisedSocketAddress> readCore =
.map( CoreAddresses::getBoltServer ); discoveryService.coreServers().addresses().stream().map( CoreAddresses::getBoltServer );


return concat( readEdge, readCore ).map( ReadWriteRouteEndPoint::read ).collect( toSet() ); return concat( readEdge, readCore ).map( ReadWriteRouteEndPoint::read ).collect( toSet() );
} }


private RawIterator<Object[], ProcedureException> wrapUpEndpoints( Set<ReadWriteRouteEndPoint> routeEndpoints, private RawIterator<Object[],ProcedureException> wrapUpEndpoints( Set<ReadWriteRouteEndPoint> routeEndpoints,
Set<ReadWriteRouteEndPoint> writeEndpoints, Set<ReadWriteRouteEndPoint> writeEndpoints, Set<ReadWriteRouteEndPoint> readEndpoints )
Set<ReadWriteRouteEndPoint> readEndpoints )
{ {
return Iterators.map( ( readWriteRouteEndPoint ) -> new Object[]{readWriteRouteEndPoint.address(), Object[] routers = routeEndpoints.stream().map( ReadWriteRouteEndPoint::address ).sorted().toArray();
readWriteRouteEndPoint.type(), readWriteRouteEndPoint.expiry()}, Object[] readers = readEndpoints.stream().map( ReadWriteRouteEndPoint::address ).sorted().toArray();
asRawIterator( concat( routeEndpoints.stream(), Object[] writers = writeEndpoints.stream().map( ReadWriteRouteEndPoint::address ).sorted().toArray();
concat( writeEndpoints.stream(), readEndpoints.stream() ) ).iterator() ) );
List<Map<String,Object>> servers = new ArrayList<>();

if ( writers.length > 0 )
{
Map<String,Object> map = new TreeMap<>();

map.put( "role", Type.WRITE.name());
map.put( "addresses", writers );

servers.add( map );
}

if ( readers.length > 0 )
{
Map<String,Object> map = new TreeMap<>();

map.put( "role", Type.READ.name());
map.put( "addresses", readers);

servers.add( map );

}

if ( routers.length > 0 )
{
Map<String,Object> map = new TreeMap<>();

map.put( "role", Type.ROUTE.name());
map.put( "addresses", routers);

servers.add( map );
}

Object[] row = new Object[] { Long.MAX_VALUE, servers };
return RawIterator.<Object[], ProcedureException>of(row);
} }


public enum Type public enum Type
{ {
READ, WRITE, ROUTE READ,
WRITE,
ROUTE
} }


private static class ReadWriteRouteEndPoint private static class ReadWriteRouteEndPoint
Expand All @@ -146,11 +190,6 @@ public String type()
return type.toString().toUpperCase(); return type.toString().toUpperCase();
} }


long expiry()
{
return Long.MAX_VALUE;
}

ReadWriteRouteEndPoint( AdvertisedSocketAddress address, Type type ) ReadWriteRouteEndPoint( AdvertisedSocketAddress address, Type type )
{ {
this.address = address; this.address = address;
Expand All @@ -171,5 +210,11 @@ static ReadWriteRouteEndPoint route( AdvertisedSocketAddress address )
{ {
return new ReadWriteRouteEndPoint( address, Type.ROUTE ); return new ReadWriteRouteEndPoint( address, Type.ROUTE );
} }

@Override
public String toString()
{
return "ReadWriteRouteEndPoint{" + "address=" + address + ", type=" + type + '}';
}
} }
} }
Expand Up @@ -37,7 +37,9 @@ abstract class RoleProcedure extends CallableProcedure.BasicProcedure
RoleProcedure() RoleProcedure()
{ {
super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) ) super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) )
.out( OUTPUT_NAME, Neo4jTypes.NTString ).build() ); .out( OUTPUT_NAME, Neo4jTypes.NTString )
.description( "The role of a specific instance in the cluster." )
.build() );
} }


@Override @Override
Expand Down

0 comments on commit a0da033

Please sign in to comment.