Skip to content

Commit

Permalink
Introduce ConnectRandomlyToServerGroupStrategy
Browse files Browse the repository at this point in the history
Shares logic with ConnectRandomlyWithinServerGroupStrategy.

Replace dependency setters on UpstreamDatabaseSelectionStrategy with a
single method that calls an init() method after setting fields.
This makes null fields explicit in calling code, and enables
post-injection set up.

Use java-8-matchers for test assertions on Optionals
  • Loading branch information
andrewkerr9000 committed Aug 24, 2017
1 parent a77d570 commit ef074fb
Show file tree
Hide file tree
Showing 17 changed files with 401 additions and 177 deletions.
8 changes: 8 additions & 0 deletions enterprise/causal-clustering/pom.xml
Expand Up @@ -192,6 +192,14 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>co.unruly</groupId>
<artifactId>java-8-matchers</artifactId>
<version>1.5</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down
Expand Up @@ -334,16 +334,22 @@ public enum DiscoveryType
setting( "causal_clustering.cluster_topology_refresh", DURATION, "5s", min( Duration.ofSeconds( 1 ) ) );

@Description( "An ordered list in descending preference of the strategy which read replicas use to choose " +
"upstream database server from which to pull transactional updates." )
"the upstream server from which to pull transactional updates." )
public static final Setting<List<String>> upstream_selection_strategy =
setting( "causal_clustering.upstream_selection_strategy", list( ",", STRING ), "default" );

@Description( "If the operator specifies the user-defined upstream database selection strategy in " +
"causal_clustering.upstream_selection_strategy then the configuration of the upstream dependencies is " +
"written here." )
@Description( "Configuration of a user-defined upstream selection strategy. " +
"The user-defined strategy is used if the list of strategies (`causal_clustering.upstream_selection_strategy`) " +
"includes the value `user_defined`. " )
public static final Setting<String> user_defined_upstream_selection_strategy =
setting( "causal_clustering.user_defined_upstream_strategy", STRING, "" );

@Description( "Comma separated list of groups to be used by the connect-randomly-to-server-group selection strategy. " +
"The connect-randomly-to-server-group strategy is used if the list of strategies (`causal_clustering.upstream_selection_strategy`) " +
"includes the value `connect-randomly-to-server-group`. " )
public static final Setting<List<String>> connect_randomly_to_server_group_strategy =
setting( "causal_clustering.connect-randomly-to-server-group", list( ",", STRING ), "" );

@Description( "A list of group names for the server used when configuring load balancing and replication policies." )
public static final Setting<List<String>> server_groups =
setting( "causal_clustering.server_groups", list( ",", STRING ), "" );
Expand Down
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.readreplica;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.discovery.ReadReplicaInfo;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;

class ConnectRandomlyToServerGroupImpl
{
private final List<String> groups;
private final TopologyService topologyService;
private final MemberId myself;
private final Random random = new Random();

ConnectRandomlyToServerGroupImpl( List<String> groups, TopologyService topologyService, MemberId myself )
{
this.groups = groups;
this.topologyService = topologyService;
this.myself = myself;
}

public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
Map<MemberId, ReadReplicaInfo> replicas = topologyService.readReplicas().members();

List<MemberId> choices = groups.stream()
.flatMap( group -> replicas.entrySet().stream().filter( isMyGroupAndNotMe( group ) ) )
.map( Map.Entry::getKey )
.collect( Collectors.toList() );

if ( choices.isEmpty() )
{
return Optional.empty();
}
else
{
return Optional.of( choices.get( random.nextInt( choices.size() ) ) );
}
}

private Predicate<Map.Entry<MemberId,ReadReplicaInfo>> isMyGroupAndNotMe( String group )
{
return entry -> entry.getValue().groups().contains( group ) && !entry.getKey().equals( myself );
}
}
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.readreplica;

import java.util.List;
import java.util.Optional;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;

// TODO deprecation warning
@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public class ConnectRandomlyToServerGroupStrategy extends UpstreamDatabaseSelectionStrategy
{
private ConnectRandomlyToServerGroupImpl strategyImpl;

public ConnectRandomlyToServerGroupStrategy()
{
super( "connect-randomly-to-server-group" );
}

@Override
void init()
{
List<String> groups = config.get( CausalClusteringSettings.connect_randomly_to_server_group_strategy );
strategyImpl = new ConnectRandomlyToServerGroupImpl( groups, topologyService,
myself );
}

@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
return strategyImpl.upstreamDatabase();
}
}
Expand Up @@ -19,57 +19,34 @@
*/
package org.neo4j.causalclustering.readreplica;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.ReadReplicaInfo;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;

// TODO deprecation warning
@Service.Implementation(UpstreamDatabaseSelectionStrategy.class)
public class ConnectRandomlyWithinServerGroupStrategy extends UpstreamDatabaseSelectionStrategy
{
private Random random = new Random();
private ConnectRandomlyToServerGroupImpl strategyImpl;

public ConnectRandomlyWithinServerGroupStrategy()
{
super( "connect-randomly-within-server-group" );
}

@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
void init()
{
Map<MemberId, ReadReplicaInfo> replicas = topologyService.readReplicas().members();

List<String> groups = config.get( CausalClusteringSettings.server_groups );
if ( groups.isEmpty() )
{
return Optional.empty();
}

List<Map.Entry<MemberId,ReadReplicaInfo>> choices = new ArrayList<>();

for ( String group : groups )
{
List<Map.Entry<MemberId,ReadReplicaInfo>> list = replicas.entrySet().stream()
.filter( entry -> entry.getValue().groups().contains( group ) && !entry.getKey().equals( myself ) )
.collect( Collectors.toList() );
choices.addAll( list );
}
strategyImpl = new ConnectRandomlyToServerGroupImpl( groups, topologyService, myself );
}

if ( choices.isEmpty() )
{
return Optional.empty();
}
else
{
return Optional.of( choices.get( random.nextInt( choices.size() ) ).getKey() );
}
@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
return strategyImpl.upstreamDatabase();
}
}
Expand Up @@ -264,9 +264,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
logProvider );

ConnectToRandomCoreServerStrategy defaultStrategy = new ConnectToRandomCoreServerStrategy();
defaultStrategy.setTopologyService( topologyService );
defaultStrategy.setConfig( config );
defaultStrategy.setMyself( myself );
defaultStrategy.inject( topologyService, config, myself );

UpstreamDatabaseStrategiesLoader loader;
if ( config.get( CausalClusteringSettings.multi_dc_license ) )
Expand Down
Expand Up @@ -20,6 +20,8 @@
package org.neo4j.causalclustering.readreplica;

import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
Expand All @@ -38,21 +40,17 @@ public UpstreamDatabaseSelectionStrategy( String key, String... altKeys )
}

// Service loaded can't inject this via the constructor
void setTopologyService( TopologyService topologyService )
void inject( TopologyService topologyService, Config config, MemberId myself )
{
this.topologyService = topologyService;
}

void setConfig( Config config )
{
this.config = config;
}

void setMyself( MemberId myself )
{
this.myself = myself;

init();
}

void init() {}

public abstract Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException;

@Override
Expand All @@ -63,20 +61,7 @@ public String toString()

private static String nicelyCommaSeparatedList( Iterable<String> keys )
{
StringBuilder sb = new StringBuilder();
for ( String key : keys )
{
sb.append( key );
sb.append( "," );
sb.append( " " );
}

int trimThese = sb.lastIndexOf( ", " );
if ( trimThese > 1 )
{
sb.replace( trimThese, sb.length(), "" );
}

return sb.toString();
return StreamSupport.stream( keys.spliterator(), false )
.collect( Collectors.joining( ", " ) );
}
}
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.TopologyService;
Expand Down Expand Up @@ -64,9 +65,7 @@ public Iterator<UpstreamDatabaseSelectionStrategy> iterator()
{
if ( candidate.getKeys().iterator().next().equals( key ) )
{
candidate.setTopologyService( topologyService );
candidate.setConfig( config );
candidate.setMyself( myself );
candidate.inject( topologyService, config, myself );
candidates.add( candidate );
}
}
Expand All @@ -85,20 +84,8 @@ private void log( LinkedHashSet<UpstreamDatabaseSelectionStrategy> candidates )

private static String nicelyCommaSeparatedList( Collection<UpstreamDatabaseSelectionStrategy> items )
{
StringBuilder sb = new StringBuilder();
for ( UpstreamDatabaseSelectionStrategy strategy : items )
{
sb.append( strategy.toString() );
sb.append( "," );
sb.append( " " );
}

int trimThese = sb.lastIndexOf( ", " );
if ( trimThese > 1 )
{
sb.replace( trimThese, sb.length(), "" );
}

return sb.toString();
return items.stream()
.map( UpstreamDatabaseSelectionStrategy::toString )
.collect( Collectors.joining( ", " ) );
}
}
@@ -1,2 +1,3 @@
org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategy
org.neo4j.causalclustering.readreplica.ConnectRandomlyWithinServerGroupStrategy
org.neo4j.causalclustering.readreplica.ConnectRandomlyToServerGroupStrategy

0 comments on commit ef074fb

Please sign in to comment.