Skip to content

Commit

Permalink
User-defined Upstream Selecton Strategy
Browse files Browse the repository at this point in the history
Uses the same DSL as the load balancer to allow users to describe their
upstream preferences.
  • Loading branch information
jimwebber authored and Mark Needham committed Mar 14, 2017
1 parent 723c471 commit 07acd85
Show file tree
Hide file tree
Showing 16 changed files with 274 additions and 63 deletions.
Expand Up @@ -317,6 +317,9 @@ public class CausalClusteringSettings implements LoadableConfig
public static final Setting<List<String>> upstream_selection_strategy =
setting( "causal_clustering.upstream_selection_strategy", list( ",", STRING ), "default" );

public static final Setting<String> user_defined_upstream_selection_strategy =
setting( "causal_clustering.user_defined_upstream_strategy", STRING, "" );

@Description( "Tags for the server used when configuring load balancing and replication policies." )
public static final Setting<List<String>> server_tags =
setting( "causal_clustering.server_tags", list( ",", STRING ), "" );
Expand Down
Expand Up @@ -33,7 +33,7 @@
import static java.lang.String.format;
import static java.util.Collections.singletonList;

class FilterConfigParser
public class FilterConfigParser
{
private static Filter<ServerInfo> filterFor( String filterName, String[] args ) throws InvalidFilterSpecification
{
Expand Down Expand Up @@ -84,7 +84,7 @@ private static Filter<ServerInfo> filterFor( String filterName, String[] args )
}
}

static Filter<ServerInfo> parse( String filterConfig ) throws InvalidFilterSpecification
public static Filter<ServerInfo> parse( String filterConfig ) throws InvalidFilterSpecification
{
if ( filterConfig.length() == 0 )
{
Expand Down
Expand Up @@ -22,22 +22,30 @@
import java.util.Objects;
import java.util.Set;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;

/**
* Hold the server information that is interesting for load balancing purposes.
*/
class ServerInfo
public class ServerInfo
{
private final AdvertisedSocketAddress boltAddress;
private MemberId memberId;
private Set<String> tags;

ServerInfo( AdvertisedSocketAddress boltAddress, Set<String> tags )
public ServerInfo( AdvertisedSocketAddress boltAddress, MemberId memberId, Set<String> tags )
{
this.boltAddress = boltAddress;
this.memberId = memberId;
this.tags = tags;
}

public MemberId memberId()
{
return memberId;
}

AdvertisedSocketAddress boltAddress()
{
return boltAddress;
Expand All @@ -56,22 +64,19 @@ public boolean equals( Object o )
if ( o == null || getClass() != o.getClass() )
{ return false; }
ServerInfo that = (ServerInfo) o;
return Objects.equals( boltAddress, that.boltAddress ) &&
return Objects.equals( boltAddress, that.boltAddress ) && Objects.equals( memberId, that.memberId ) &&
Objects.equals( tags, that.tags );
}

@Override
public int hashCode()
{
return Objects.hash( boltAddress, tags );
return Objects.hash( boltAddress, memberId, tags );
}

@Override
public String toString()
{
return "ServerInfo{" +
"boltAddress=" + boltAddress +
", tags=" + tags +
'}';
return "ServerInfo{" + "boltAddress=" + boltAddress + ", memberId=" + memberId + ", tags=" + tags + '}';
}
}
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.discovery.CoreServerInfo;
import org.neo4j.causalclustering.discovery.CoreTopology;
import org.neo4j.causalclustering.discovery.ReadReplicaTopology;
import org.neo4j.causalclustering.discovery.TopologyService;
Expand Down Expand Up @@ -130,8 +131,9 @@ private List<Endpoint> writeEndpoints( CoreTopology cores )

private List<Endpoint> readEndpoints( CoreTopology coreTopology, ReadReplicaTopology rrTopology, Policy policy )
{
Set<ServerInfo> possibleReaders = rrTopology.allMemberInfo().stream()
.map( info -> new ServerInfo( info.connectors().boltAddress(), info.tags() ) )
Set<ServerInfo> possibleReaders = rrTopology.members().entrySet().stream()
.map( entry -> new ServerInfo( entry.getValue().connectors().boltAddress(), entry.getKey(),
entry.getValue().tags() ) )
.collect( Collectors.toSet() );

if ( allowReadsOnFollowers || possibleReaders.size() == 0 )
Expand All @@ -147,9 +149,16 @@ private List<Endpoint> readEndpoints( CoreTopology coreTopology, ReadReplicaTopo
// we might end up using the leader for reading during this ttl, should be fine in general
}

possibleReaders.addAll( validCores.stream().map( coreTopology::find ).map( Optional::get )
.map( info -> new ServerInfo( info.connectors().boltAddress(), info.tags() ) )
.collect( Collectors.toSet() ) );
for ( MemberId validCore : validCores )
{
Optional<CoreServerInfo> coreServerInfo = coreTopology.find( validCore );
if ( coreServerInfo.isPresent() )
{
CoreServerInfo serverInfo = coreServerInfo.get();
possibleReaders.add(
new ServerInfo( serverInfo.connectors().boltAddress(), validCore, serverInfo.tags() ) );
}
}
}

Set<ServerInfo> readers = policy.apply( possibleReaders );
Expand Down
Expand Up @@ -28,11 +28,11 @@
import org.neo4j.helpers.Service;

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public class ConnectToRandomCoreServer extends UpstreamDatabaseSelectionStrategy
public class ConnectToRandomCoreServerStrategy extends UpstreamDatabaseSelectionStrategy
{
private final Random random = new Random();

public ConnectToRandomCoreServer()
public ConnectToRandomCoreServerStrategy()
{
super( "connect-to-random-core-server" );
}
Expand Down
Expand Up @@ -31,11 +31,11 @@
import org.neo4j.helpers.Service;

@Service.Implementation(UpstreamDatabaseSelectionStrategy.class)
public class ConnectWithinDataCenter extends UpstreamDatabaseSelectionStrategy
public class ConnectWithinDataCenterStrategy extends UpstreamDatabaseSelectionStrategy
{
private Random random = new Random();;
private Random random = new Random();

public ConnectWithinDataCenter()
public ConnectWithinDataCenterStrategy()
{
super( "connect-within-data-center" );
}
Expand Down
Expand Up @@ -248,7 +248,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
new StoreCopyProcess( fileSystem, pageCache, localDatabase, copiedStoreRecovery, remoteStore,
logProvider );

ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer();
ConnectToRandomCoreServerStrategy defaultStrategy = new ConnectToRandomCoreServerStrategy();
defaultStrategy.setTopologyService( topologyService );
defaultStrategy.setConfig( config );
defaultStrategy.setMyself( myself );
Expand Down
Expand Up @@ -25,11 +25,11 @@
import org.neo4j.helpers.Service;

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public class TypicallyConnectToRandomReadReplica extends UpstreamDatabaseSelectionStrategy
public class TypicallyConnectToRandomReadReplicaStrategy extends UpstreamDatabaseSelectionStrategy
{
private final ModuloCounter counter = new ModuloCounter( 10 );

public TypicallyConnectToRandomReadReplica()
public TypicallyConnectToRandomReadReplicaStrategy()
{
super( "typically-connect-to-random-read-replica" );
}
Expand Down
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.readreplica;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.CoreServerInfo;
import org.neo4j.causalclustering.discovery.CoreTopology;
import org.neo4j.causalclustering.discovery.ReadReplicaInfo;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.filters.Filter;
import org.neo4j.causalclustering.load_balancing.plugins.server_policies.FilterConfigParser;
import org.neo4j.causalclustering.load_balancing.plugins.server_policies.InvalidFilterSpecification;
import org.neo4j.causalclustering.load_balancing.plugins.server_policies.ServerInfo;
import org.neo4j.helpers.Service;

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public class UserDefinedConfigurationStrategy extends UpstreamDatabaseSelectionStrategy
{
public UserDefinedConfigurationStrategy()
{
super( "user-defined" );
}

@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
try
{
Filter<ServerInfo> filters = FilterConfigParser
.parse( config.get( CausalClusteringSettings.user_defined_upstream_selection_strategy ) );

Set<ServerInfo> possibleReaders = topologyService.readReplicas().members().entrySet().stream()
.map( entry -> new ServerInfo( entry.getValue().connectors().boltAddress(), entry.getKey(),
entry.getValue().tags() ) ).collect( Collectors.toSet() );

CoreTopology coreTopology = topologyService.coreServers();
for ( MemberId validCore : coreTopology.members().keySet() )
{
Optional<CoreServerInfo> coreServerInfo = coreTopology.find( validCore );
if ( coreServerInfo.isPresent() )
{
CoreServerInfo serverInfo = coreServerInfo.get();
possibleReaders.add(
new ServerInfo( serverInfo.connectors().boltAddress(), validCore, serverInfo.tags() ) );
}
}

return filters.apply( possibleReaders ).stream().map( ServerInfo::memberId ).findAny();
}
catch ( InvalidFilterSpecification invalidFilterSpecification )
{
return Optional.empty();
}
}
}
@@ -1,2 +1,2 @@
org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServer
org.neo4j.causalclustering.readreplica.ConnectWithinDataCenter
org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategy
org.neo4j.causalclustering.readreplica.ConnectWithinDataCenterStrategy
Expand Up @@ -23,7 +23,9 @@

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;

import static org.junit.Assert.assertEquals;
Expand All @@ -37,14 +39,14 @@ public void shouldReturnServersMatchingAnyTag() throws Exception
// given
AnyTagFilter tagFilter = new AnyTagFilter( asSet( "china-west", "europe" ) );

ServerInfo serverA = new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "china-west" ) );
ServerInfo serverB = new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "europe" ) );
ServerInfo serverC = new ServerInfo( new AdvertisedSocketAddress( "bolt", 3 ), asSet( "china", "china-west" ) );
ServerInfo serverD = new ServerInfo( new AdvertisedSocketAddress( "bolt", 4 ), asSet( "china-west", "china" ) );
ServerInfo serverE = new ServerInfo( new AdvertisedSocketAddress( "bolt", 5 ), asSet( "china-east", "asia" ) );
ServerInfo serverF = new ServerInfo( new AdvertisedSocketAddress( "bolt", 6 ), asSet( "europe-west" ) );
ServerInfo serverG = new ServerInfo( new AdvertisedSocketAddress( "bolt", 7 ), asSet( "china-west", "europe" ) );
ServerInfo serverH = new ServerInfo( new AdvertisedSocketAddress( "bolt", 8 ), asSet( "africa" ) );
ServerInfo serverA = new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), new MemberId( UUID.randomUUID() ), asSet( "china-west" ) );
ServerInfo serverB = new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), new MemberId( UUID.randomUUID() ), asSet( "europe" ) );
ServerInfo serverC = new ServerInfo( new AdvertisedSocketAddress( "bolt", 3 ), new MemberId( UUID.randomUUID() ), asSet( "china", "china-west" ) );
ServerInfo serverD = new ServerInfo( new AdvertisedSocketAddress( "bolt", 4 ), new MemberId( UUID.randomUUID() ), asSet( "china-west", "china" ) );
ServerInfo serverE = new ServerInfo( new AdvertisedSocketAddress( "bolt", 5 ), new MemberId( UUID.randomUUID() ), asSet( "china-east", "asia" ) );
ServerInfo serverF = new ServerInfo( new AdvertisedSocketAddress( "bolt", 6 ), new MemberId( UUID.randomUUID() ), asSet( "europe-west" ) );
ServerInfo serverG = new ServerInfo( new AdvertisedSocketAddress( "bolt", 7 ), new MemberId( UUID.randomUUID() ), asSet( "china-west", "europe" ) );
ServerInfo serverH = new ServerInfo( new AdvertisedSocketAddress( "bolt", 8 ), new MemberId( UUID.randomUUID() ), asSet( "africa" ) );

Set<ServerInfo> data = asSet( serverA, serverB, serverC, serverD, serverE, serverF, serverG, serverH );

Expand Down
Expand Up @@ -22,7 +22,9 @@
import org.junit.Test;

import java.util.Set;
import java.util.UUID;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.logging.Log;

Expand All @@ -46,8 +48,8 @@ public void shouldSupplyDefaultUnfilteredPolicyForEmptyContext() throws Exceptio
// when
Policy policy = policies.selectFor( emptyMap() );
Set<ServerInfo> input = asSet(
new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "tagA" ) ),
new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "tagB" ) )
new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), new MemberId( UUID.randomUUID() ), asSet( "tagA" ) ),
new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), new MemberId( UUID.randomUUID() ), asSet( "tagB" ) )
);

Set<ServerInfo> output = policy.apply( input );
Expand All @@ -66,8 +68,8 @@ public void shouldSupplyDefaultUnfilteredPolicyForUnknownPolicyName() throws Exc
// when
Policy policy = policies.selectFor( stringMap( Policies.POLICY_KEY, "unknown-policy" ) );
Set<ServerInfo> input = asSet(
new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "tagA" ) ),
new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "tagB" ) )
new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), new MemberId( UUID.randomUUID() ), asSet( "tagA" ) ),
new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), new MemberId( UUID.randomUUID() ), asSet( "tagB" ) )
);

Set<ServerInfo> output = policy.apply( input );
Expand Down
Expand Up @@ -41,8 +41,9 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.Iterators.asSet;

public class ConnectToRandomCoreServerTest
public class ConnectToRandomCoreServerStrategyTest
{
@Test
public void shouldConnectToRandomCoreServer() throws Exception
Expand All @@ -56,7 +57,7 @@ public void shouldConnectToRandomCoreServer() throws Exception
when( topologyService.coreServers() )
.thenReturn( fakeCoreTopology( memberId1, memberId2, memberId3 ) );

ConnectToRandomCoreServer connectionStrategy = new ConnectToRandomCoreServer();
ConnectToRandomCoreServerStrategy connectionStrategy = new ConnectToRandomCoreServerStrategy();
connectionStrategy.setTopologyService( topologyService );

// when
Expand All @@ -81,7 +82,7 @@ static CoreTopology fakeCoreTopology( MemberId... memberIds )
coreMembers.put( memberId, new CoreServerInfo( new AdvertisedSocketAddress( "localhost", 5000 + offset ),
new AdvertisedSocketAddress( "localhost", 6000 + offset ), new ClientConnectorAddresses(
singletonList( new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt,
new AdvertisedSocketAddress( "localhost", 7000 + offset ) ) ) ) ) );
new AdvertisedSocketAddress( "localhost", 7000 + offset ) ) ) ), asSet( "core" ) ) );

offset++;
}
Expand Down

0 comments on commit 07acd85

Please sign in to comment.