Skip to content

Commit

Permalink
Fixed bug by adding all column headings to GetServersProcedure output.
Browse files Browse the repository at this point in the history
Made expiry part of the value type that represents an address/role/timeout triple.
  • Loading branch information
jimwebber committed Sep 18, 2016
1 parent ef770c8 commit 54ff2c3
Showing 1 changed file with 31 additions and 29 deletions.
Expand Up @@ -39,7 +39,6 @@
import org.neo4j.kernel.api.proc.QualifiedName;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.register.Register;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
Expand All @@ -59,7 +58,10 @@ public GetServersProcedure( CoreTopologyService discoveryService, LeaderLocator
logProvider )
{
super( procedureSignature( new QualifiedName( new String[]{"dbms", "cluster", "routing"}, NAME ) )
.out( "address", Neo4jTypes.NTString ).build() );
.out( "address", Neo4jTypes.NTString )
.out( "role", Neo4jTypes.NTString )
.out( "expiry", Neo4jTypes.NTInteger)
.build() );

this.discoveryService = discoveryService;
this.leaderLocator = leaderLocator;
Expand All @@ -69,9 +71,9 @@ public GetServersProcedure( CoreTopologyService discoveryService, LeaderLocator
@Override
public RawIterator<Object[], ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException
{
Set<ReadWriteEndPoint> writeEndpoints = emptySet();
Set<ReadWriteEndPoint> readEndpoints = emptySet();
Set<ReadWriteEndPoint> routeEndpoints = emptySet();
Set<ReadWriteRouteEndPoint> writeEndpoints = emptySet();
Set<ReadWriteRouteEndPoint> readEndpoints = emptySet();
Set<ReadWriteRouteEndPoint> routeEndpoints = emptySet();
try
{
readEndpoints = readEndpoints();
Expand All @@ -91,50 +93,45 @@ public RawIterator<Object[], ProcedureException> apply( Context ctx, Object[] in
return wrapUpEndpoints( routeEndpoints, writeEndpoints, readEndpoints );
}

private Set<ReadWriteEndPoint> routeEndpoints()
private Set<ReadWriteRouteEndPoint> routeEndpoints()
{
Stream<AdvertisedSocketAddress> readCore = discoveryService.coreServers().addresses().stream()
.map( CoreAddresses::getBoltServer );

return readCore.map( ReadWriteEndPoint::route ).collect( toSet() );
return readCore.map( ReadWriteRouteEndPoint::route ).collect( toSet() );
}

private Set<ReadWriteEndPoint> writeEndpoints( AdvertisedSocketAddress leader )
private Set<ReadWriteRouteEndPoint> writeEndpoints( AdvertisedSocketAddress leader )
{
return Stream.of( leader ).map( ReadWriteEndPoint::write ).collect( Collectors.toSet() );
return Stream.of( leader ).map( ReadWriteRouteEndPoint::write ).collect( Collectors.toSet() );
}

private Set<ReadWriteEndPoint> readEndpoints()
private Set<ReadWriteRouteEndPoint> readEndpoints()
{
Stream<AdvertisedSocketAddress> readEdge = discoveryService.edgeServers().members().stream()
.map( EdgeAddresses::getBoltAddress );
Stream<AdvertisedSocketAddress> readCore = discoveryService.coreServers().addresses().stream()
.map( CoreAddresses::getBoltServer );

return concat( readEdge, readCore ).map( ReadWriteEndPoint::read ).collect( toSet() );
return concat( readEdge, readCore ).map( ReadWriteRouteEndPoint::read ).collect( toSet() );
}

private RawIterator<Object[], ProcedureException> wrapUpEndpoints( Set<ReadWriteEndPoint> routeEndpoints,
Set<ReadWriteEndPoint> writeEndpoints,
Set<ReadWriteEndPoint> readEndpoints )
private RawIterator<Object[], ProcedureException> wrapUpEndpoints( Set<ReadWriteRouteEndPoint> routeEndpoints,
Set<ReadWriteRouteEndPoint> writeEndpoints,
Set<ReadWriteRouteEndPoint> readEndpoints )
{
return Iterators.map( ( readWriteEndPoint ) -> new Object[]{readWriteEndPoint.address(),
readWriteEndPoint.type(), expiry()},
return Iterators.map( ( readWriteRouteEndPoint ) -> new Object[]{readWriteRouteEndPoint.address(),
readWriteRouteEndPoint.type(), readWriteRouteEndPoint.expiry()},
asRawIterator( concat( routeEndpoints.stream(),
concat( writeEndpoints.stream(), readEndpoints.stream() ) ).iterator() ) );
}

private long expiry()
{
return Long.MAX_VALUE;
}

public enum Type
{
READ, WRITE, ROUTE
}

private static class ReadWriteEndPoint
private static class ReadWriteRouteEndPoint
{
private final AdvertisedSocketAddress address;
private final Type type;
Expand All @@ -149,25 +146,30 @@ public String type()
return type.toString().toUpperCase();
}

ReadWriteEndPoint( AdvertisedSocketAddress address, Type type )
long expiry()
{
return Long.MAX_VALUE;
}

ReadWriteRouteEndPoint( AdvertisedSocketAddress address, Type type )
{
this.address = address;
this.type = type;
}

public static ReadWriteEndPoint write( AdvertisedSocketAddress address )
public static ReadWriteRouteEndPoint write( AdvertisedSocketAddress address )
{
return new ReadWriteEndPoint( address, Type.WRITE );
return new ReadWriteRouteEndPoint( address, Type.WRITE );
}

public static ReadWriteEndPoint read( AdvertisedSocketAddress address )
public static ReadWriteRouteEndPoint read( AdvertisedSocketAddress address )
{
return new ReadWriteEndPoint( address, Type.READ );
return new ReadWriteRouteEndPoint( address, Type.READ );
}

static ReadWriteEndPoint route( AdvertisedSocketAddress address )
static ReadWriteRouteEndPoint route( AdvertisedSocketAddress address )
{
return new ReadWriteEndPoint( address, Type.ROUTE );
return new ReadWriteRouteEndPoint( address, Type.ROUTE );
}
}
}

0 comments on commit 54ff2c3

Please sign in to comment.