Skip to content

Commit

Permalink
Made changes requested in review by Martin Furmanski
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Mar 20, 2018
1 parent 7a50fff commit fedcaf8
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 33 deletions.
Expand Up @@ -26,26 +26,23 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.stream.Stream;


import org.neo4j.causalclustering.routing.Endpoint; import org.neo4j.causalclustering.routing.Endpoint;
import org.neo4j.causalclustering.routing.load_balancing.LoadBalancingProcessor; import org.neo4j.causalclustering.routing.load_balancing.LoadBalancingProcessor;
import org.neo4j.causalclustering.routing.load_balancing.LoadBalancingResult; import org.neo4j.causalclustering.routing.load_balancing.LoadBalancingResult;
import org.neo4j.causalclustering.routing.Role; import org.neo4j.causalclustering.routing.Role;
import org.neo4j.causalclustering.routing.procedure.RoutingResultFormat;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.SocketAddress; import org.neo4j.helpers.SocketAddress;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.neo4j.causalclustering.routing.Role.READ; import static org.neo4j.causalclustering.routing.Role.READ;
import static org.neo4j.causalclustering.routing.Role.ROUTE; import static org.neo4j.causalclustering.routing.Role.ROUTE;
import static org.neo4j.causalclustering.routing.Role.WRITE; import static org.neo4j.causalclustering.routing.Role.WRITE;
import static org.neo4j.causalclustering.routing.procedure.RoutingResultFormatHelper.parseEndpoints;


/** /**
* The result format of GetServersV1 and GetServersV2 procedures. * The result format of GetServersV1 and GetServersV2 procedures.
*/ */
public class ResultFormatV1 extends RoutingResultFormat public class ResultFormatV1
{ {
private static final String ROLE_KEY = "role"; private static final String ROLE_KEY = "role";
private static final String ADDRESSES_KEY = "addresses"; private static final String ADDRESSES_KEY = "addresses";
Expand Down
Expand Up @@ -32,12 +32,12 @@
public class MultiClusterRoutingResult implements RoutingResult public class MultiClusterRoutingResult implements RoutingResult
{ {
private final Map<String,List<Endpoint>> routers; private final Map<String,List<Endpoint>> routers;
private final long ttl; private final long timeToLiveMillis;


public MultiClusterRoutingResult( Map<String,List<Endpoint>> routers, long ttl ) public MultiClusterRoutingResult( Map<String,List<Endpoint>> routers, long timeToLiveMillis )
{ {
this.routers = routers; this.routers = routers;
this.ttl = ttl; this.timeToLiveMillis = timeToLiveMillis;
} }


public Map<String,List<Endpoint>> routers() public Map<String,List<Endpoint>> routers()
Expand All @@ -47,7 +47,7 @@ public Map<String,List<Endpoint>> routers()


public long ttlMillis() public long ttlMillis()
{ {
return ttl; return timeToLiveMillis;
} }


@Override @Override
Expand All @@ -62,14 +62,13 @@ public boolean equals( Object o )
return false; return false;
} }
MultiClusterRoutingResult that = (MultiClusterRoutingResult) o; MultiClusterRoutingResult that = (MultiClusterRoutingResult) o;
return ttl == that.ttl && Objects.equals( routers, that.routers ); return timeToLiveMillis == that.timeToLiveMillis && Objects.equals( routers, that.routers );
} }


@Override @Override
public int hashCode() public int hashCode()
{ {

return Objects.hash( routers, timeToLiveMillis );
return Objects.hash( routers, ttl );
} }
} }


Expand Up @@ -54,17 +54,17 @@ public class GetSubClusterRoutersProcedure implements CallableProcedure
procedureSignature( GET_SUB_CLUSTER_ROUTERS.fullyQualifiedProcedureName() ) procedureSignature( GET_SUB_CLUSTER_ROUTERS.fullyQualifiedProcedureName() )
.in( DATABASE.parameterName(), Neo4jTypes.NTString ) .in( DATABASE.parameterName(), Neo4jTypes.NTString )
.out( TTL.parameterName(), Neo4jTypes.NTInteger ) .out( TTL.parameterName(), Neo4jTypes.NTInteger )
.out( ROUTERS.parameterName(), Neo4jTypes.NTMap ) .out( ROUTERS.parameterName(), Neo4jTypes.NTList( Neo4jTypes.NTMap ) )
.description( DESCRIPTION ) .description( DESCRIPTION )
.build(); .build();


private final TopologyService topologyService; private final TopologyService topologyService;
private final Config config; private final long timeToLiveMillis;


public GetSubClusterRoutersProcedure( TopologyService topologyService, Config config ) public GetSubClusterRoutersProcedure( TopologyService topologyService, Config config )
{ {
this.topologyService = topologyService; this.topologyService = topologyService;
this.config = config; this.timeToLiveMillis = config.get( CausalClusteringSettings.cluster_routing_ttl ).toMillis();
} }


@Override @Override
Expand All @@ -79,12 +79,11 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp
@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
String dbName = (String) input[0]; String dbName = (String) input[0];
List<Endpoint> routers = routeEndpoints( dbName ); List<Endpoint> routers = routeEndpoints( dbName );
long ttl = config.get( CausalClusteringSettings.cluster_routing_ttl ).toMillis();


HashMap<String,List<Endpoint>> routerMap = new HashMap<>(); HashMap<String,List<Endpoint>> routerMap = new HashMap<>();
routerMap.put( dbName, routers ); routerMap.put( dbName, routers );


MultiClusterRoutingResult result = new MultiClusterRoutingResult( routerMap, ttl ); MultiClusterRoutingResult result = new MultiClusterRoutingResult( routerMap, timeToLiveMillis );
return RawIterator.<Object[], ProcedureException>of( MultiClusterRoutingResultFormat.build( result ) ); return RawIterator.<Object[], ProcedureException>of( MultiClusterRoutingResultFormat.build( result ) );
} }


Expand Down
Expand Up @@ -53,17 +53,17 @@ public class GetSuperClusterRoutersProcedure implements CallableProcedure
private final ProcedureSignature procedureSignature = private final ProcedureSignature procedureSignature =
procedureSignature( GET_SUPER_CLUSTER_ROUTERS.fullyQualifiedProcedureName() ) procedureSignature( GET_SUPER_CLUSTER_ROUTERS.fullyQualifiedProcedureName() )
.out( TTL.parameterName(), Neo4jTypes.NTInteger ) .out( TTL.parameterName(), Neo4jTypes.NTInteger )
.out( ROUTERS.parameterName(), Neo4jTypes.NTMap ) .out( ROUTERS.parameterName(), Neo4jTypes.NTList( Neo4jTypes.NTMap ) )
.description( DESCRIPTION ) .description( DESCRIPTION )
.build(); .build();


private final TopologyService topologyService; private final TopologyService topologyService;
private final Config config; private final long timeToLiveMillis;


public GetSuperClusterRoutersProcedure( TopologyService topologyService, Config config ) public GetSuperClusterRoutersProcedure( TopologyService topologyService, Config config )
{ {
this.topologyService = topologyService; this.topologyService = topologyService;
this.config = config; this.timeToLiveMillis = config.get( CausalClusteringSettings.cluster_routing_ttl ).toMillis();
} }


@Override @Override
Expand All @@ -76,8 +76,7 @@ public ProcedureSignature signature()
public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException
{ {
Map<String,List<Endpoint>> routersPerDb = routeEndpoints(); Map<String,List<Endpoint>> routersPerDb = routeEndpoints();
long ttl = config.get( CausalClusteringSettings.cluster_routing_ttl ).toMillis(); MultiClusterRoutingResult result = new MultiClusterRoutingResult( routersPerDb, timeToLiveMillis );
MultiClusterRoutingResult result = new MultiClusterRoutingResult( routersPerDb, ttl );
return RawIterator.<Object[], ProcedureException>of( MultiClusterRoutingResultFormat.build( result ) ); return RawIterator.<Object[], ProcedureException>of( MultiClusterRoutingResultFormat.build( result ) );
} }


Expand Down
Expand Up @@ -25,20 +25,19 @@
import java.util.TreeMap; import java.util.TreeMap;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;


import org.neo4j.causalclustering.routing.Endpoint; import org.neo4j.causalclustering.routing.Endpoint;
import org.neo4j.causalclustering.routing.Role; import org.neo4j.causalclustering.routing.Role;
import org.neo4j.causalclustering.routing.RoutingResult;
import org.neo4j.causalclustering.routing.multi_cluster.MultiClusterRoutingResult; import org.neo4j.causalclustering.routing.multi_cluster.MultiClusterRoutingResult;
import org.neo4j.causalclustering.routing.procedure.RoutingResultFormat; import static org.neo4j.causalclustering.routing.procedure.RoutingResultFormatHelper.parseEndpoints;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;


/** /**
* The result format of Get*ClusterRouting procedures. * The result format of {@link GetSubClusterRoutersProcedure} and
* {@link GetSuperClusterRoutersProcedure} procedures.
*/ */
public class MultiClusterRoutingResultFormat extends RoutingResultFormat public class MultiClusterRoutingResultFormat
{ {


private static final String DB_NAME_KEY = "database"; private static final String DB_NAME_KEY = "database";
Expand Down
Expand Up @@ -24,7 +24,6 @@


public interface ProcedureNamesEnum public interface ProcedureNamesEnum
{ {

String[] procedureNameSpace(); String[] procedureNameSpace();
String procedureName(); String procedureName();


Expand Down
Expand Up @@ -28,10 +28,10 @@


import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;


public abstract class RoutingResultFormat public final class RoutingResultFormatHelper
{ {


protected static List<Endpoint> parseEndpoints( Object[] addresses, Role role ) public static List<Endpoint> parseEndpoints( Object[] addresses, Role role )
{ {
return Stream.of( addresses ) return Stream.of( addresses )
.map( rawAddress -> parseAddress( (String) rawAddress ) ) .map( rawAddress -> parseAddress( (String) rawAddress ) )
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/ */
package org.neo4j.causalclustering.scenarios; package org.neo4j.causalclustering.scenarios;


import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
Expand Down Expand Up @@ -152,7 +150,7 @@ public void superCallShouldReturnAllRouters()
} }


@Test @Test
public void subCallShouldRerturnLocalRouters() public void subCallShouldReturnLocalRouters()
{ {
String dbName = getFirstDbName( dbNames ); String dbName = getFirstDbName( dbNames );
Stream<CoreGraphDatabase> members = dbNames.stream().map( n -> cluster.getDbWithAnyRole( n, Role.FOLLOWER, Role.LEADER ).database() ); Stream<CoreGraphDatabase> members = dbNames.stream().map( n -> cluster.getDbWithAnyRole( n, Role.FOLLOWER, Role.LEADER ).database() );
Expand Down

0 comments on commit fedcaf8

Please sign in to comment.