Skip to content

Commit

Permalink
UserDefinedConfigurationStrategy should exclude executing server
Browse files Browse the repository at this point in the history
Introduce interfaces DiscoveryServerInfo (empty but useful for type
bounds) and Topology (extracting methods from Core and ReadReplica
Topology). Move *Topology tests to Topology.
  • Loading branch information
andrewkerr9000 committed Aug 24, 2017
1 parent ef074fb commit b15e27d
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 405 deletions.
Expand Up @@ -25,7 +25,7 @@


import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;


public class CoreServerInfo implements CatchupServerAddress, ClientConnector, GroupedServer public class CoreServerInfo implements DiscoveryServerInfo
{ {
private final AdvertisedSocketAddress raftServer; private final AdvertisedSocketAddress raftServer;
private final AdvertisedSocketAddress catchupServer; private final AdvertisedSocketAddress catchupServer;
Expand Down
Expand Up @@ -22,16 +22,14 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;


import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


import static java.lang.String.format; import static java.lang.String.format;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toSet;


public class CoreTopology public class CoreTopology implements Topology<CoreServerInfo>
{ {
static CoreTopology EMPTY = new CoreTopology( null, false, emptyMap() ); static CoreTopology EMPTY = new CoreTopology( null, false, emptyMap() );


Expand All @@ -46,6 +44,7 @@ public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map<MemberI
this.coreMembers = new HashMap<>( coreMembers ); this.coreMembers = new HashMap<>( coreMembers );
} }


@Override
public Map<MemberId,CoreServerInfo> members() public Map<MemberId,CoreServerInfo> members()
{ {
return coreMembers; return coreMembers;
Expand All @@ -61,31 +60,12 @@ public boolean canBeBootstrapped()
return canBeBootstrapped; return canBeBootstrapped;
} }


public Optional<CoreServerInfo> find( MemberId memberId )
{
return Optional.ofNullable( coreMembers.get( memberId ) );
}

@Override @Override
public String toString() public String toString()
{ {
return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), coreMembers ); return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), coreMembers );
} }


TopologyDifference difference( CoreTopology other )
{
Set<MemberId> members = coreMembers.keySet();
Set<MemberId> otherMembers = other.coreMembers.keySet();

Set<Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) )
.map( memberId -> Difference.asDifference( other, memberId ) ).collect( toSet() );

Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> Difference.asDifference( CoreTopology.this, memberId ) ).collect( toSet() );

return new TopologyDifference( added, removed );
}

public Optional<MemberId> anyCoreMemberId() public Optional<MemberId> anyCoreMemberId()
{ {
return coreMembers.keySet().stream().findAny(); return coreMembers.keySet().stream().findAny();
Expand Down
Expand Up @@ -32,12 +32,7 @@ private Difference( MemberId memberId, CatchupServerAddress server )
this.server = server; this.server = server;
} }


static Difference asDifference( CoreTopology topology, MemberId memberId ) static <T extends DiscoveryServerInfo> Difference asDifference( Topology<T> topology, MemberId memberId )
{
return new Difference( memberId, topology.find( memberId ).orElse( null ) );
}

static Difference asDifference( ReadReplicaTopology topology, MemberId memberId )
{ {
return new Difference( memberId, topology.find( memberId ).orElse( null ) ); return new Difference( memberId, topology.find( memberId ).orElse( null ) );
} }
Expand Down
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

public interface DiscoveryServerInfo extends CatchupServerAddress, ClientConnector, GroupedServer
{
}
Expand Up @@ -21,7 +21,7 @@


import java.util.Set; import java.util.Set;


interface GroupedServer public interface GroupedServer
{ {
Set<String> groups(); Set<String> groups();
} }
Expand Up @@ -25,7 +25,7 @@


import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;


public class ReadReplicaInfo implements CatchupServerAddress, ClientConnector, GroupedServer public class ReadReplicaInfo implements DiscoveryServerInfo
{ {
private final AdvertisedSocketAddress catchupServerAddress; private final AdvertisedSocketAddress catchupServerAddress;
private final ClientConnectorAddresses clientConnectorAddresses; private final ClientConnectorAddresses clientConnectorAddresses;
Expand Down
Expand Up @@ -22,17 +22,12 @@
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;


import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toSet;


import static org.neo4j.causalclustering.discovery.Difference.asDifference; public class ReadReplicaTopology implements Topology<ReadReplicaInfo>


public class ReadReplicaTopology
{ {
static final ReadReplicaTopology EMPTY = new ReadReplicaTopology( emptyMap() ); static final ReadReplicaTopology EMPTY = new ReadReplicaTopology( emptyMap() );


Expand All @@ -48,16 +43,12 @@ public Collection<ReadReplicaInfo> allMemberInfo()
return readReplicaMembers.values(); return readReplicaMembers.values();
} }


public Map<MemberId,ReadReplicaInfo> members() @Override
public Map<MemberId, ReadReplicaInfo> members()
{ {
return readReplicaMembers; return readReplicaMembers;
} }


Optional<ReadReplicaInfo> find( MemberId memberId )
{
return Optional.ofNullable( readReplicaMembers.get( memberId ) );
}

@Override @Override
public String toString() public String toString()
{ {
Expand All @@ -66,27 +57,6 @@ public String toString()


public Optional<MemberId> anyReadReplicaMemberId() public Optional<MemberId> anyReadReplicaMemberId()
{ {
if ( readReplicaMembers.keySet().size() == 0 ) return readReplicaMembers.keySet().stream().findAny();
{
return Optional.empty();
}
else
{
return readReplicaMembers.keySet().stream().findAny();
}
}

TopologyDifference difference( ReadReplicaTopology other )
{
Set<MemberId> members = readReplicaMembers.keySet();
Set<MemberId> otherMembers = other.readReplicaMembers.keySet();

Set<Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) )
.map( memberId -> asDifference( other, memberId ) ).collect( toSet() );

Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> asDifference( ReadReplicaTopology.this, memberId ) ).collect( toSet() );

return new TopologyDifference( added, removed );
} }
} }
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.neo4j.causalclustering.identity.MemberId;

import static java.util.stream.Collectors.toSet;

public interface Topology<T extends DiscoveryServerInfo>
{
Map<MemberId, T> members();

default TopologyDifference difference( Topology<T> other )
{
Set<MemberId> members = members().keySet();
Set<MemberId> otherMembers = other.members().keySet();

Set<Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) )
.map( memberId -> Difference.asDifference( other, memberId ) ).collect( toSet() );

Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> Difference.asDifference( this, memberId ) ).collect( toSet() );

return new TopologyDifference( added, removed );
}

default Optional<T> find( MemberId memberId )
{
return Optional.ofNullable( members().get( memberId ) );
}
}
Expand Up @@ -19,13 +19,16 @@
*/ */
package org.neo4j.causalclustering.readreplica; package org.neo4j.causalclustering.readreplica;


import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;


import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.DiscoveryServerInfo;
import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.Topology;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.filters.Filter; import org.neo4j.causalclustering.load_balancing.filters.Filter;
import org.neo4j.causalclustering.load_balancing.plugins.server_policies.FilterConfigParser; import org.neo4j.causalclustering.load_balancing.plugins.server_policies.FilterConfigParser;
Expand All @@ -46,30 +49,38 @@ public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionExc
{ {
try try
{ {
Filter<ServerInfo> filters = FilterConfigParser Filter<ServerInfo> filters = FilterConfigParser.parse( config.get( CausalClusteringSettings.user_defined_upstream_selection_strategy ) );
.parse( config.get( CausalClusteringSettings.user_defined_upstream_selection_strategy ) );


Set<ServerInfo> possibleReaders = topologyService.readReplicas().members().entrySet().stream() Set<ServerInfo> possibleServers = possibleServers();
.map( entry -> new ServerInfo( entry.getValue().connectors().boltAddress(), entry.getKey(),
entry.getValue().groups() ) ).collect( Collectors.toSet() );


CoreTopology coreTopology = topologyService.coreServers(); return filters.apply( possibleServers ).stream()
for ( MemberId validCore : coreTopology.members().keySet() ) .map( ServerInfo::memberId )
{ .filter( memberId -> !Objects.equals( myself, memberId ) )
Optional<CoreServerInfo> coreServerInfo = coreTopology.find( validCore ); .findFirst();
if ( coreServerInfo.isPresent() )
{
CoreServerInfo serverInfo = coreServerInfo.get();
possibleReaders.add(
new ServerInfo( serverInfo.connectors().boltAddress(), validCore, serverInfo.groups() ) );
}
}

return filters.apply( possibleReaders ).stream().map( ServerInfo::memberId ).findAny();
} }
catch ( InvalidFilterSpecification invalidFilterSpecification ) catch ( InvalidFilterSpecification invalidFilterSpecification )
{ {
return Optional.empty(); return Optional.empty();
} }
} }

private Set<ServerInfo> possibleServers()
{
Stream<Map.Entry<MemberId, ? extends DiscoveryServerInfo>> infoMap =
Stream.of( topologyService.readReplicas(), topologyService.coreServers() )
.map( Topology::members )
.map( Map::entrySet )
.flatMap( Set::stream );

return infoMap
.map( this::toServerInfo )
.collect( Collectors.toSet() );
}

private <T extends DiscoveryServerInfo> ServerInfo toServerInfo( Map.Entry<MemberId, T> entry )
{
T server = entry.getValue();
MemberId memberId = entry.getKey();
return new ServerInfo( server.connectors().boltAddress(), memberId, server.groups() );
}
} }

0 comments on commit b15e27d

Please sign in to comment.