Skip to content

Commit

Permalink
introduce filters and tags for server policy load balancing
Browse files Browse the repository at this point in the history
This is the first step in building up the server-side policy based
load balancing strategy. The idea is that servers are tagged
with one or more tags and these are used in a server-side configured
scheme of filters to select those which match. Different policies can
select different sets of tags and there can be other filters attached
apart from tag-based filters.

Every policy has a name which the client can use to bind to a particular
policy by supplying that name in the client supplied context, as supported
by the V2 version of GetServers.
  • Loading branch information
martinfurmanski committed Feb 14, 2017
1 parent 39b77e7 commit 353fa2e
Show file tree
Hide file tree
Showing 41 changed files with 1,336 additions and 237 deletions.
Expand Up @@ -313,4 +313,17 @@ public class CausalClusteringSettings implements LoadableConfig
"upstream database server from which to pull transactional updates." ) "upstream database server from which to pull transactional updates." )
public static final Setting<List<String>> upstream_selection_strategy = public static final Setting<List<String>> upstream_selection_strategy =
setting( "causal_clustering.upstream_selection_strategy", list( ",", STRING ), "default" ); setting( "causal_clustering.upstream_selection_strategy", list( ",", STRING ), "default" );

@Description( "The load balancing plugin to use. This must be the same for all core servers participating in the cluster." )
public static Setting<String> load_balancing_plugin =
setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" );

@Description( "The configuration must be valid for the configured plugin." )
public static Setting<String> load_balancing_config =
setting( "causal_clustering.load_balancing.config", STRING, "" );

@Description( "Tags for the server used when configuring load balancing and replication policies." +
" Multiple tags can be configured by separating with a comma." )
public static Setting<List<String>> server_tags =
setting( "causal_clustering.server_tags", list( ",", STRING ), "" );
} }
Expand Up @@ -36,20 +36,9 @@ public ClusterTopology( CoreTopology coreTopology, ReadReplicaTopology readRepli


public Optional<CatchupServerAddress> find( MemberId upstream ) public Optional<CatchupServerAddress> find( MemberId upstream )
{ {
Optional<CoreAddresses> coreAddresses = coreTopology.find( upstream ); Optional<CatchupServerAddress> coreCatchupAddress = coreTopology.find( upstream ).map( a -> (CatchupServerAddress) a );
Optional<ReadReplicaAddresses> readReplicaAddresses = readReplicaTopology.findAddressFor( upstream ); Optional<CatchupServerAddress> readCatchupAddress = readReplicaTopology.find( upstream ).map( a -> (CatchupServerAddress) a );


if ( coreAddresses.isPresent() ) return coreCatchupAddress.map( Optional::of ).orElse( readCatchupAddress );
{
return Optional.of( coreAddresses.get() );
}
else if ( readReplicaAddresses.isPresent() )
{
return Optional.of( readReplicaAddresses.get() );
}
else
{
return Optional.empty();
}
} }
} }
Expand Up @@ -19,20 +19,32 @@
*/ */
package org.neo4j.causalclustering.discovery; package org.neo4j.causalclustering.discovery;


import java.util.Set;

import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;


public class CoreAddresses implements CatchupServerAddress, ClientConnector import static java.util.Collections.emptySet;

public class CoreServerInfo implements CatchupServerAddress, ClientConnector, TaggedServer
{ {
private final AdvertisedSocketAddress raftServer; private final AdvertisedSocketAddress raftServer;
private final AdvertisedSocketAddress catchupServer; private final AdvertisedSocketAddress catchupServer;
private final ClientConnectorAddresses clientConnectorAddresses; private final ClientConnectorAddresses clientConnectorAddresses;
private Set<String> tags;

public CoreServerInfo( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer,
ClientConnectorAddresses clientConnectors )
{
this( raftServer, catchupServer, clientConnectors, emptySet() );
}


public CoreAddresses( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer, public CoreServerInfo( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer,
ClientConnectorAddresses clientConnectorAddresses ) ClientConnectorAddresses clientConnectorAddresses, Set<String> tags )
{ {
this.raftServer = raftServer; this.raftServer = raftServer;
this.catchupServer = catchupServer; this.catchupServer = catchupServer;
this.clientConnectorAddresses = clientConnectorAddresses; this.clientConnectorAddresses = clientConnectorAddresses;
this.tags = tags;
} }


public AdvertisedSocketAddress getRaftServer() public AdvertisedSocketAddress getRaftServer()
Expand All @@ -46,18 +58,26 @@ public AdvertisedSocketAddress getCatchupServer()
return catchupServer; return catchupServer;
} }


@Override
public ClientConnectorAddresses connectors() public ClientConnectorAddresses connectors()
{ {
return clientConnectorAddresses; return clientConnectorAddresses;
} }


@Override
public Set<String> tags()
{
return tags;
}

@Override @Override
public String toString() public String toString()
{ {
return "CoreAddresses{" + return "CoreServerInfo{" +
"raftServer=" + raftServer + "raftServer=" + raftServer +
", catchupServer=" + catchupServer + ", catchupServer=" + catchupServer +
", clientConnectorAddresses=" + clientConnectorAddresses + ", clientConnectorAddresses=" + clientConnectorAddresses +
'}'; ", tags=" + tags +
'}';
} }
} }
Expand Up @@ -38,9 +38,9 @@ public class CoreTopology


private final ClusterId clusterId; private final ClusterId clusterId;
private final boolean canBeBootstrapped; private final boolean canBeBootstrapped;
private final Map<MemberId,CoreAddresses> coreMembers; private final Map<MemberId,CoreServerInfo> coreMembers;


public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map<MemberId,CoreAddresses> coreMembers ) public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map<MemberId,CoreServerInfo> coreMembers )
{ {
this.clusterId = clusterId; this.clusterId = clusterId;
this.canBeBootstrapped = canBeBootstrapped; this.canBeBootstrapped = canBeBootstrapped;
Expand All @@ -57,7 +57,7 @@ public ClusterId clusterId()
return clusterId; return clusterId;
} }


public Collection<CoreAddresses> addresses() public Collection<CoreServerInfo> allMemberInfo()
{ {
return coreMembers.values(); return coreMembers.values();
} }
Expand All @@ -67,7 +67,7 @@ public boolean canBeBootstrapped()
return canBeBootstrapped; return canBeBootstrapped;
} }


public Optional<CoreAddresses> find( MemberId memberId ) public Optional<CoreServerInfo> find( MemberId memberId )
{ {
return Optional.ofNullable( coreMembers.get( memberId ) ); return Optional.ofNullable( coreMembers.get( memberId ) );
} }
Expand Down Expand Up @@ -139,18 +139,18 @@ public String toString()
private class Difference private class Difference
{ {
private MemberId memberId; private MemberId memberId;
private CoreAddresses coreAddresses; private CoreServerInfo coreServerInfo;


Difference( MemberId memberId, CoreAddresses coreAddresses ) Difference( MemberId memberId, CoreServerInfo coreServerInfo )
{ {
this.memberId = memberId; this.memberId = memberId;
this.coreAddresses = coreAddresses; this.coreServerInfo = coreServerInfo;
} }


@Override @Override
public String toString() public String toString()
{ {
return String.format( "{memberId=%s, coreAddresses=%s}", memberId, coreAddresses ); return String.format( "{memberId=%s, coreServerInfo=%s}", memberId, coreServerInfo );
} }
} }
} }
Expand Up @@ -21,7 +21,9 @@


import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.MultiMap;


import java.util.List;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
Expand All @@ -37,6 +39,9 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;


class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologyService class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologyService
{ {
Expand All @@ -46,6 +51,7 @@ class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologySer
private final HazelcastConnector connector; private final HazelcastConnector connector;
private final RenewableTimeoutService renewableTimeoutService; private final RenewableTimeoutService renewableTimeoutService;
private final AdvertisedSocketAddress transactionSource; private final AdvertisedSocketAddress transactionSource;
private final List<String> tags;
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;
private RenewableTimeoutService.RenewableTimeout readReplicaRefreshTimer; private RenewableTimeoutService.RenewableTimeout readReplicaRefreshTimer;
private final long readReplicaTimeToLiveTimeout; private final long readReplicaTimeToLiveTimeout;
Expand All @@ -62,6 +68,7 @@ class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologySer
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config );
this.transactionSource = config.get( CausalClusteringSettings.transaction_advertised_address ); this.transactionSource = config.get( CausalClusteringSettings.transaction_advertised_address );
this.tags = config.get( CausalClusteringSettings.server_tags );
this.readReplicaTimeToLiveTimeout = readReplicaTimeToLiveTimeout; this.readReplicaTimeToLiveTimeout = readReplicaTimeToLiveTimeout;
this.myself = myself; this.myself = myself;
} }
Expand All @@ -71,7 +78,7 @@ public CoreTopology coreServers()
{ {
try try
{ {
return retry( ( hazelcastInstance ) -> HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ) ); return retry( ( hazelcastInstance ) -> getCoreTopology( hazelcastInstance, log ) );
} }
catch ( Exception e ) catch ( Exception e )
{ {
Expand All @@ -87,8 +94,7 @@ public ReadReplicaTopology readReplicas()
{ {
try try
{ {
return retry( ( hazelcastInstance ) -> HazelcastClusterTopology return retry( ( hazelcastInstance ) -> getReadReplicaTopology( hazelcastInstance, log ) );
.getReadReplicaTopology( hazelcastInstance, log ) );
} }
catch ( Exception e ) catch ( Exception e )
{ {
Expand Down Expand Up @@ -132,6 +138,9 @@ private Void addReadReplica( HazelcastInstance hazelcastInstance )
hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ) hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME )
.put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS ); .put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS );


MultiMap<String,String> tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME );
tags.forEach( tag -> tagsMap.put( uuid, tag ) );

return null; // return value not used. return null; // return value not used.
} }


Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.hazelcast.core.IAtomicReference; import com.hazelcast.core.IAtomicReference;
import com.hazelcast.core.IMap; import com.hazelcast.core.IMap;
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
import com.hazelcast.core.MultiMap;


import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
Expand All @@ -35,30 +36,32 @@
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toSet;
import static org.neo4j.helpers.SocketAddressFormat.socketAddress; import static org.neo4j.helpers.SocketAddressFormat.socketAddress;
import static org.neo4j.helpers.collection.Iterables.asSet;


class HazelcastClusterTopology class HazelcastClusterTopology
{ {
// hz client uuid string -> boltAddress string // per server attributes
static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read-replicas"; private static final String DISCOVERY_SERVER = "discovery_server"; // not currently used
static final String READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME = "read-replica-transaction-servers";
static final String READ_REPLICA_MEMBER_ID_MAP_NAME = "read-replica-member-ids";
private static final String CLUSTER_UUID = "cluster_uuid";
static final String MEMBER_UUID = "member_uuid"; static final String MEMBER_UUID = "member_uuid";
static final String TRANSACTION_SERVER = "transaction_server"; static final String TRANSACTION_SERVER = "transaction_server";
private static final String DISCOVERY_SERVER = "discovery_server";
static final String RAFT_SERVER = "raft_server"; static final String RAFT_SERVER = "raft_server";
static final String CLIENT_CONNECTOR_ADDRESSES = "client_connector_addresses"; static final String CLIENT_CONNECTOR_ADDRESSES = "client_connector_addresses";


// cluster-wide attributes
private static final String CLUSTER_UUID = "cluster_uuid";
static final String SERVER_TAGS_MULTIMAP_NAME = "tags";
static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read_replicas"; // hz client uuid string -> boltAddress string
static final String READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME = "read-replica-transaction-servers";
static final String READ_REPLICA_MEMBER_ID_MAP_NAME = "read-replica-member-ids";

static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastInstance, Log log ) static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastInstance, Log log )
{ {
Map<MemberId,ReadReplicaAddresses> readReplicas = emptyMap(); Map<MemberId,ReadReplicaInfo> readReplicas = emptyMap();


if ( hazelcastInstance != null ) if ( hazelcastInstance != null )
{ {
Expand All @@ -74,7 +77,7 @@ static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastIn


static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Log log ) static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Log log )
{ {
Map<MemberId,CoreAddresses> coreMembers = emptyMap(); Map<MemberId,CoreServerInfo> coreMembers = emptyMap();
boolean canBeBootstrapped = false; boolean canBeBootstrapped = false;
ClusterId clusterId = null; ClusterId clusterId = null;


Expand All @@ -83,7 +86,7 @@ static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Log lo
Set<Member> hzMembers = hazelcastInstance.getCluster().getMembers(); Set<Member> hzMembers = hazelcastInstance.getCluster().getMembers();
canBeBootstrapped = canBeBootstrapped( hzMembers ); canBeBootstrapped = canBeBootstrapped( hzMembers );


coreMembers = toCoreMemberMap( hzMembers, log ); coreMembers = toCoreMemberMap( hzMembers, log, hazelcastInstance );


clusterId = getClusterId( hazelcastInstance ); clusterId = getClusterId( hazelcastInstance );
} }
Expand All @@ -108,24 +111,24 @@ static boolean casClusterId( HazelcastInstance hazelcastInstance, ClusterId clus
return uuidReference.compareAndSet( null, clusterId.uuid() ) || uuidReference.get().equals( clusterId.uuid() ); return uuidReference.compareAndSet( null, clusterId.uuid() ) || uuidReference.get().equals( clusterId.uuid() );
} }


private static Map<MemberId,ReadReplicaAddresses> readReplicas( HazelcastInstance hazelcastInstance ) private static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance hazelcastInstance )
{ {
IMap<String/*uuid*/,String/*boltAddress*/> clientAddressMap = IMap<String/*uuid*/,String/*boltAddress*/> clientAddressMap =
hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ); hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME );


IMap<String,String> txServerMap = hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ); IMap<String,String> txServerMap = hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME );

IMap<String,String> memberIdMap = hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ); IMap<String,String> memberIdMap = hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME );
MultiMap<String,String> serverTags = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME );


Map<MemberId, ReadReplicaAddresses> result = new HashMap<>( ); Map<MemberId,ReadReplicaInfo> result = new HashMap<>();


for ( String hzUUID : clientAddressMap.keySet() ) for ( String hzUUID : clientAddressMap.keySet() )
{ {
ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.fromString( clientAddressMap.get( hzUUID ) ); ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.fromString( clientAddressMap.get( hzUUID ) );
AdvertisedSocketAddress catchupAddress = socketAddress( txServerMap.get( hzUUID ), AdvertisedSocketAddress::new ); AdvertisedSocketAddress catchupAddress = socketAddress( txServerMap.get( hzUUID ), AdvertisedSocketAddress::new );


result.put( new MemberId( UUID.fromString( memberIdMap.get( hzUUID ) ) ), result.put( new MemberId( UUID.fromString( memberIdMap.get( hzUUID ) ) ),
new ReadReplicaAddresses( clientConnectorAddresses, catchupAddress ) ) ; new ReadReplicaInfo( clientConnectorAddresses, catchupAddress, asSet( serverTags.get( hzUUID ) ) ) );
} }
return result; return result;
} }
Expand All @@ -136,36 +139,24 @@ private static boolean canBeBootstrapped( Set<Member> coreMembers )
return iterator.hasNext() && iterator.next().localMember(); return iterator.hasNext() && iterator.next().localMember();
} }


static Map<MemberId,CoreAddresses> toCoreMemberMap( Set<Member> members, Log log ) static Map<MemberId,CoreServerInfo> toCoreMemberMap( Set<Member> members, Log log, HazelcastInstance hazelcastInstance )
{ {
Map<MemberId,CoreAddresses> coreMembers = new HashMap<>(); Map<MemberId,CoreServerInfo> coreMembers = new HashMap<>();
MultiMap<String,String> serverTagsMMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME );


for ( Member member : members ) for ( Member member : members )
{ {
try try
{ {
Pair<MemberId,CoreAddresses> pair = extractMemberAttributesForCore( member ); MemberId memberId = new MemberId( UUID.fromString( member.getStringAttribute( MEMBER_UUID ) ) );
coreMembers.put( pair.first(), pair.other() );
}
catch ( IllegalArgumentException e )
{
log.warn( "Incomplete member attributes supplied from Hazelcast", e );
}
}


return coreMembers; CoreServerInfo coreServerInfo = new CoreServerInfo(
} socketAddress( member.getStringAttribute( RAFT_SERVER ), AdvertisedSocketAddress::new ),

socketAddress( member.getStringAttribute( TRANSACTION_SERVER ), AdvertisedSocketAddress::new ),
static Map<MemberId,CoreAddresses> toReadReplicaMemberMap( Set<Member> members, Log log ) ClientConnectorAddresses.fromString( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ),
{ asSet( serverTagsMMap.get( memberId.getUuid().toString() ) ) );
Map<MemberId,CoreAddresses> coreMembers = new HashMap<>();


for ( Member member : members ) coreMembers.put( memberId, coreServerInfo );
{
try
{
Pair<MemberId,CoreAddresses> pair = extractMemberAttributesForCore( member );
coreMembers.put( pair.first(), pair.other() );
} }
catch ( IllegalArgumentException e ) catch ( IllegalArgumentException e )
{ {
Expand Down Expand Up @@ -193,16 +184,7 @@ static MemberAttributeConfig buildMemberAttributesForCore( MemberId myself, Conf


ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.extractFromConfig( config );
memberAttributeConfig.setStringAttribute( CLIENT_CONNECTOR_ADDRESSES, clientConnectorAddresses.toString() ); memberAttributeConfig.setStringAttribute( CLIENT_CONNECTOR_ADDRESSES, clientConnectorAddresses.toString() );
return memberAttributeConfig;
}


static Pair<MemberId,CoreAddresses> extractMemberAttributesForCore( Member member ) return memberAttributeConfig;
{
MemberId memberId = new MemberId( UUID.fromString( member.getStringAttribute( MEMBER_UUID ) ) );

return Pair.of( memberId, new CoreAddresses(
socketAddress( member.getStringAttribute( RAFT_SERVER ), AdvertisedSocketAddress::new ),
socketAddress( member.getStringAttribute( TRANSACTION_SERVER ), AdvertisedSocketAddress::new ),
ClientConnectorAddresses.fromString( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ) ) );
} }
} }

0 comments on commit 353fa2e

Please sign in to comment.