Skip to content

Commit

Permalink
server policies load balancing IT
Browse files Browse the repository at this point in the history
Tests that the GetServersV2 procedure with the server_policies
load balancing plugin hooked in has the expected behaviour when
it comes to the defined policies and in the face of failovers.
  • Loading branch information
martinfurmanski committed Feb 23, 2017
1 parent c14c847 commit 599922f
Show file tree
Hide file tree
Showing 10 changed files with 452 additions and 42 deletions.
Expand Up @@ -32,17 +32,17 @@ public class Endpoint
private final AdvertisedSocketAddress address;
private final Role role;

public String address()
{
return address.toString();
}

public Endpoint( AdvertisedSocketAddress address, Role role )
{
this.address = address;
this.role = role;
}

public AdvertisedSocketAddress address()
{
return address;
}

public static Endpoint write( AdvertisedSocketAddress address )
{
return new Endpoint( address, Role.WRITE );
Expand Down
Expand Up @@ -85,4 +85,15 @@ public int hashCode()
{
return Objects.hash( routeEndpoints, writeEndpoints, readEndpoints, timeToLiveMillis );
}

@Override
public String toString()
{
return "LoadBalancingResult{" +
"routeEndpoints=" + routeEndpoints +
", writeEndpoints=" + writeEndpoints +
", readEndpoints=" + readEndpoints +
", timeToLiveMillis=" + timeToLiveMillis +
'}';
}
}
Expand Up @@ -27,9 +27,9 @@

import static java.lang.String.format;

class Policies
public class Policies
{
static final String POLICY_KEY = "load_balancing.policy"; // TODO: move somewhere (driver support package?)
public static final String POLICY_KEY = "load_balancing.policy"; // TODO: move somewhere (driver support package?)
static final String DEFAULT_POLICY_NAME = "default";
static final Policy DEFAULT_POLICY = new FilteringPolicy( IdentityFilter.as() );

Expand Down
Expand Up @@ -94,8 +94,9 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp
List<Endpoint> writeEndpoints = writeEndpoints();
List<Endpoint> readEndpoints = readEndpoints();

return ResultFormatV1.build( new LoadBalancingResult( routeEndpoints, writeEndpoints, readEndpoints,
config.get( CausalClusteringSettings.cluster_routing_ttl ) ) );
return RawIterator.<Object[],ProcedureException>of( ResultFormatV1.build(
new LoadBalancingResult( routeEndpoints, writeEndpoints, readEndpoints,
config.get( CausalClusteringSettings.cluster_routing_ttl ) ) ) );
}

private Optional<AdvertisedSocketAddress> leaderBoltAddress()
Expand Down
Expand Up @@ -76,10 +76,10 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp

LoadBalancingProcessor.Result result = loadBalancingProcessor.run( clientContext );

return ResultFormatV1.build( new LoadBalancingResult(
return RawIterator.<Object[],ProcedureException>of( ResultFormatV1.build( new LoadBalancingResult(
result.routeEndpoints(),
result.writeEndpoints(),
result.readEndpoints(),
result.getTimeToLiveMillis() ) );
result.getTimeToLiveMillis() ) ) );
}
}
Expand Up @@ -21,6 +21,8 @@

import java.util.Arrays;

import static java.lang.String.join;

/**
* This is part of the cluster / driver interface specification and
* defines the procedure names involved in the load balancing solution.
Expand Down Expand Up @@ -60,4 +62,9 @@ public String[] fullyQualifiedProcedureName()
fullyQualifiedProcedureName[nameSpace.length] = name;
return fullyQualifiedProcedureName;
}

public String callName()
{
return join( ".", nameSpace ) + "." + name;
}
}
Expand Up @@ -20,6 +20,8 @@
package org.neo4j.causalclustering.load_balancing.procedure;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -29,9 +31,8 @@
import org.neo4j.causalclustering.load_balancing.Endpoint;
import org.neo4j.causalclustering.load_balancing.LoadBalancingResult;
import org.neo4j.causalclustering.load_balancing.Role;
import org.neo4j.collection.RawIterator;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.helpers.SocketAddress;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
Expand All @@ -42,16 +43,16 @@
/**
* The result format of GetServersV1 and GetServersV2 procedures.
*/
class ResultFormatV1
public class ResultFormatV1
{
private static final String ROLE_KEY = "role";
private static final String ADDRESSES_KEY = "addresses";

static RawIterator<Object[],ProcedureException> build( LoadBalancingResult result )
static Object[] build( LoadBalancingResult result )
{
Object[] routers = result.routeEndpoints().stream().map( Endpoint::address ).toArray();
Object[] readers = result.readEndpoints().stream().map( Endpoint::address ).toArray();
Object[] writers = result.writeEndpoints().stream().map( Endpoint::address ).toArray();
Object[] routers = result.routeEndpoints().stream().map( Endpoint::address ).map( SocketAddress::toString ).toArray();
Object[] readers = result.readEndpoints().stream().map( Endpoint::address ).map( SocketAddress::toString ).toArray();
Object[] writers = result.writeEndpoints().stream().map( Endpoint::address ).map( SocketAddress::toString ).toArray();

List<Map<String,Object>> servers = new ArrayList<>();

Expand Down Expand Up @@ -86,20 +87,16 @@ static RawIterator<Object[],ProcedureException> build( LoadBalancingResult resul
}

long timeToLiveSeconds = MILLISECONDS.toSeconds( result.getTimeToLiveMillis() );
Object[] row = new Object[]{timeToLiveSeconds, servers};
return RawIterator.<Object[],ProcedureException>of( row );
return new Object[]{timeToLiveSeconds, servers};
}

static LoadBalancingResult parse( RawIterator<Object[],ProcedureException> records ) throws ProcedureException
public static LoadBalancingResult parse( Object[] record )
{
Object[] record = records.next();

long timeToLiveSeconds = (long) record[0];
@SuppressWarnings( "unchecked" )
List<Map<String,Object>> endpointData = (List<Map<String,Object>>) record[1];

Map<Role,List<Endpoint>> endpoints = parse( endpointData );
assert !records.hasNext();

return new LoadBalancingResult(
endpoints.get( ROUTE ),
Expand All @@ -108,6 +105,14 @@ static LoadBalancingResult parse( RawIterator<Object[],ProcedureException> recor
timeToLiveSeconds * 1000 );
}

public static LoadBalancingResult parse( Map<String,Object> record )
{
return parse( new Object[]{
record.get( ParameterNames.TTL.parameterName() ),
record.get( ParameterNames.SERVERS.parameterName() )
} );
}

private static Map<Role,List<Endpoint>> parse( List<Map<String,Object>> result )
{
Map<Role,List<Endpoint>> endpoints = new HashMap<>();
Expand All @@ -117,6 +122,9 @@ private static Map<Role,List<Endpoint>> parse( List<Map<String,Object>> result )
List<Endpoint> addresses = parse( (Object[]) single.get( "addresses" ), role );
endpoints.put( role, addresses );
}

Arrays.stream( Role.values() ).forEach( r -> endpoints.putIfAbsent( r, Collections.emptyList() ) );

return endpoints;
}

Expand Down
Expand Up @@ -50,19 +50,19 @@ public class ReadReplica implements ClusterMember
private final DiscoveryServiceFactory discoveryServiceFactory;
private final File neo4jHome;
private final File storeDir;
private final int memberId;
private final int serverId;
private final String boltAdvertisedAddress;
private ReadReplicaGraphDatabase database;
private Monitors monitors;

public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discoveryServiceFactory,
public ReadReplica( File parentDir, int serverId, DiscoveryServiceFactory discoveryServiceFactory,
List<AdvertisedSocketAddress> coreMemberHazelcastAddresses, Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams, String recordFormat, Monitors monitors )
{
this.memberId = memberId;
int boltPort = 9000 + memberId;
int httpPort = 11000 + memberId;
int txPort = 20000 + memberId;
this.serverId = serverId;
int boltPort = 9000 + serverId;
int httpPort = 11000 + serverId;
int txPort = 20000 + serverId;

String initialHosts = coreMemberHazelcastAddresses.stream().map( AdvertisedSocketAddress::toString )
.collect( joining( "," ) );
Expand All @@ -77,7 +77,7 @@ public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discov

for ( Map.Entry<String,IntFunction<String>> entry : instanceExtraParams.entrySet() )
{
config.put( entry.getKey(), entry.getValue().apply( memberId ) );
config.put( entry.getKey(), entry.getValue().apply( serverId ) );
}

config.put( new BoltConnector( "bolt" ).type.name(), "BOLT" );
Expand All @@ -90,7 +90,7 @@ public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discov
config.put( new HttpConnector( "http", Encryption.NONE ).listen_address.name(), "127.0.0.1:" + httpPort );
config.put( new HttpConnector( "http", Encryption.NONE ).advertised_address.name(), "127.0.0.1:" + httpPort );

this.neo4jHome = new File( parentDir, "read-replica-" + memberId );
this.neo4jHome = new File( parentDir, "read-replica-" + serverId );
config.put( GraphDatabaseSettings.neo4j_home.name(), neo4jHome.getAbsolutePath() );

config.put( CausalClusteringSettings.transaction_listen_address.name(), "127.0.0.1:" + txPort );
Expand Down Expand Up @@ -155,7 +155,7 @@ public File storeDir()

public String toString()
{
return format( "ReadReplica{memberId=%d}", memberId );
return format( "ReadReplica{serverId=%d}", serverId );
}

public String directURI()
Expand All @@ -175,6 +175,11 @@ public void setUpstreamDatabaseSelectionStrategy( String key )

public Optional<MemberId> memberId()
{
return Optional.of( new MemberId( new UUID( memberId, 0 ) ) );
return Optional.of( new MemberId( new UUID( serverId, 0 ) ) );
}

public int serverId()
{
return serverId;
}
}
Expand Up @@ -25,15 +25,12 @@

import org.neo4j.causalclustering.load_balancing.Endpoint;
import org.neo4j.causalclustering.load_balancing.LoadBalancingResult;
import org.neo4j.collection.RawIterator;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.api.exceptions.ProcedureException;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class ResultFormatV1Test
{
Expand All @@ -58,13 +55,31 @@ public void shouldSerializeToAndFromRecordFormat() throws Exception
LoadBalancingResult original = new LoadBalancingResult( routers, writers, readers, ttlSeconds * 1000 );

// when
RawIterator<Object[],ProcedureException> records = ResultFormatV1.build( original );
Object[] record = ResultFormatV1.build( original );

// then
assertTrue( records.hasNext() );
LoadBalancingResult parsed = ResultFormatV1.parse( records );
LoadBalancingResult parsed = ResultFormatV1.parse( record );

assertEquals( original, parsed );
}

@Test
public void shouldSerializeToAndFromRecordFormatWithNoEntries() throws Exception
{
// given
List<Endpoint> writers = emptyList();
List<Endpoint> readers = emptyList();
List<Endpoint> routers = emptyList();

long ttlSeconds = 0;
LoadBalancingResult original = new LoadBalancingResult( routers, writers, readers, ttlSeconds * 1000 );

// when
Object[] record = ResultFormatV1.build( original );

// then
LoadBalancingResult parsed = ResultFormatV1.parse( record );

assertFalse( records.hasNext() );
assertEquals( original, parsed );
}
}

0 comments on commit 599922f

Please sign in to comment.