diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreServerInfo.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreServerInfo.java index cab2703c10b3..fabfdb8036e7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreServerInfo.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreServerInfo.java @@ -25,7 +25,7 @@ import static java.util.Collections.emptySet; -public class CoreServerInfo implements CatchupServerAddress, ClientConnector, GroupedServer +public class CoreServerInfo implements DiscoveryServerInfo { private final AdvertisedSocketAddress raftServer; private final AdvertisedSocketAddress catchupServer; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java index fe84b1c834f4..b5fa19d5c8a9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java @@ -22,16 +22,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.Set; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import static java.lang.String.format; import static java.util.Collections.emptyMap; -import static java.util.stream.Collectors.toSet; -public class CoreTopology +public class CoreTopology implements Topology { static CoreTopology EMPTY = new CoreTopology( null, false, emptyMap() ); @@ -46,6 +44,7 @@ public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map( coreMembers ); } + @Override public Map members() { return coreMembers; @@ -61,31 +60,12 @@ public boolean canBeBootstrapped() return canBeBootstrapped; } - public Optional find( MemberId memberId ) - { - return Optional.ofNullable( coreMembers.get( memberId ) ); - } - @Override public String toString() { return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), coreMembers ); } - TopologyDifference difference( CoreTopology other ) - { - Set members = coreMembers.keySet(); - Set otherMembers = other.coreMembers.keySet(); - - Set added = otherMembers.stream().filter( m -> !members.contains( m ) ) - .map( memberId -> Difference.asDifference( other, memberId ) ).collect( toSet() ); - - Set removed = members.stream().filter( m -> !otherMembers.contains( m ) ) - .map( memberId -> Difference.asDifference( CoreTopology.this, memberId ) ).collect( toSet() ); - - return new TopologyDifference( added, removed ); - } - public Optional anyCoreMemberId() { return coreMembers.keySet().stream().findAny(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/Difference.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/Difference.java index 4bfd1ccf8fa5..3801b4f1d0a1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/Difference.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/Difference.java @@ -32,12 +32,7 @@ private Difference( MemberId memberId, CatchupServerAddress server ) this.server = server; } - static Difference asDifference( CoreTopology topology, MemberId memberId ) - { - return new Difference( memberId, topology.find( memberId ).orElse( null ) ); - } - - static Difference asDifference( ReadReplicaTopology topology, MemberId memberId ) + static Difference asDifference( Topology topology, MemberId memberId ) { return new Difference( memberId, topology.find( memberId ).orElse( null ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServerInfo.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServerInfo.java new file mode 100644 index 000000000000..623282b308a3 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServerInfo.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +public interface DiscoveryServerInfo extends CatchupServerAddress, ClientConnector, GroupedServer +{ +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/GroupedServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/GroupedServer.java index e3a049097b80..fd1140168beb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/GroupedServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/GroupedServer.java @@ -21,7 +21,7 @@ import java.util.Set; -interface GroupedServer +public interface GroupedServer { Set groups(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaInfo.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaInfo.java index 7852c45d5bb4..8f1ef5b967b8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaInfo.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaInfo.java @@ -25,7 +25,7 @@ import static java.util.Collections.emptySet; -public class ReadReplicaInfo implements CatchupServerAddress, ClientConnector, GroupedServer +public class ReadReplicaInfo implements DiscoveryServerInfo { private final AdvertisedSocketAddress catchupServerAddress; private final ClientConnectorAddresses clientConnectorAddresses; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index 1b4536b43fd5..70921b063a01 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -22,17 +22,12 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; -import java.util.Set; import org.neo4j.causalclustering.identity.MemberId; import static java.util.Collections.emptyMap; -import static java.util.stream.Collectors.toSet; -import static org.neo4j.causalclustering.discovery.Difference.asDifference; - - -public class ReadReplicaTopology +public class ReadReplicaTopology implements Topology { static final ReadReplicaTopology EMPTY = new ReadReplicaTopology( emptyMap() ); @@ -48,16 +43,12 @@ public Collection allMemberInfo() return readReplicaMembers.values(); } - public Map members() + @Override + public Map members() { return readReplicaMembers; } - Optional find( MemberId memberId ) - { - return Optional.ofNullable( readReplicaMembers.get( memberId ) ); - } - @Override public String toString() { @@ -66,27 +57,6 @@ public String toString() public Optional anyReadReplicaMemberId() { - if ( readReplicaMembers.keySet().size() == 0 ) - { - return Optional.empty(); - } - else - { - return readReplicaMembers.keySet().stream().findAny(); - } - } - - TopologyDifference difference( ReadReplicaTopology other ) - { - Set members = readReplicaMembers.keySet(); - Set otherMembers = other.readReplicaMembers.keySet(); - - Set added = otherMembers.stream().filter( m -> !members.contains( m ) ) - .map( memberId -> asDifference( other, memberId ) ).collect( toSet() ); - - Set removed = members.stream().filter( m -> !otherMembers.contains( m ) ) - .map( memberId -> asDifference( ReadReplicaTopology.this, memberId ) ).collect( toSet() ); - - return new TopologyDifference( added, removed ); + return readReplicaMembers.keySet().stream().findAny(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/Topology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/Topology.java new file mode 100644 index 000000000000..32ba1407c8ce --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/Topology.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.neo4j.causalclustering.identity.MemberId; + +import static java.util.stream.Collectors.toSet; + +public interface Topology +{ + Map members(); + + default TopologyDifference difference( Topology other ) + { + Set members = members().keySet(); + Set otherMembers = other.members().keySet(); + + Set added = otherMembers.stream().filter( m -> !members.contains( m ) ) + .map( memberId -> Difference.asDifference( other, memberId ) ).collect( toSet() ); + + Set removed = members.stream().filter( m -> !otherMembers.contains( m ) ) + .map( memberId -> Difference.asDifference( this, memberId ) ).collect( toSet() ); + + return new TopologyDifference( added, removed ); + } + + default Optional find( MemberId memberId ) + { + return Optional.ofNullable( members().get( memberId ) ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java index 0dfb1d250285..626a4c9d511e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java @@ -19,13 +19,16 @@ */ package org.neo4j.causalclustering.readreplica; +import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.causalclustering.discovery.CoreServerInfo; -import org.neo4j.causalclustering.discovery.CoreTopology; +import org.neo4j.causalclustering.discovery.DiscoveryServerInfo; +import org.neo4j.causalclustering.discovery.Topology; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.load_balancing.filters.Filter; import org.neo4j.causalclustering.load_balancing.plugins.server_policies.FilterConfigParser; @@ -46,30 +49,38 @@ public Optional upstreamDatabase() throws UpstreamDatabaseSelectionExc { try { - Filter filters = FilterConfigParser - .parse( config.get( CausalClusteringSettings.user_defined_upstream_selection_strategy ) ); + Filter filters = FilterConfigParser.parse( config.get( CausalClusteringSettings.user_defined_upstream_selection_strategy ) ); - Set possibleReaders = topologyService.readReplicas().members().entrySet().stream() - .map( entry -> new ServerInfo( entry.getValue().connectors().boltAddress(), entry.getKey(), - entry.getValue().groups() ) ).collect( Collectors.toSet() ); + Set possibleServers = possibleServers(); - CoreTopology coreTopology = topologyService.coreServers(); - for ( MemberId validCore : coreTopology.members().keySet() ) - { - Optional coreServerInfo = coreTopology.find( validCore ); - if ( coreServerInfo.isPresent() ) - { - CoreServerInfo serverInfo = coreServerInfo.get(); - possibleReaders.add( - new ServerInfo( serverInfo.connectors().boltAddress(), validCore, serverInfo.groups() ) ); - } - } - - return filters.apply( possibleReaders ).stream().map( ServerInfo::memberId ).findAny(); + return filters.apply( possibleServers ).stream() + .map( ServerInfo::memberId ) + .filter( memberId -> !Objects.equals( myself, memberId ) ) + .findFirst(); } catch ( InvalidFilterSpecification invalidFilterSpecification ) { return Optional.empty(); } } + + private Set possibleServers() + { + Stream> infoMap = + Stream.of( topologyService.readReplicas(), topologyService.coreServers() ) + .map( Topology::members ) + .map( Map::entrySet ) + .flatMap( Set::stream ); + + return infoMap + .map( this::toServerInfo ) + .collect( Collectors.toSet() ); + } + + private ServerInfo toServerInfo( Map.Entry entry ) + { + T server = entry.getValue(); + MemberId memberId = entry.getKey(); + return new ServerInfo( server.connectors().boltAddress(), memberId, server.groups() ); + } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java deleted file mode 100644 index 6d4b8a9176cb..000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.discovery; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.hamcrest.Matchers; -import org.junit.Test; - -import org.neo4j.causalclustering.identity.ClusterId; -import org.neo4j.causalclustering.identity.MemberId; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; - -public class CoreTopologyTest -{ - @Test - public void identicalTopologiesShouldHaveNoDifference() throws Exception - { - // given - UUID one = UUID.randomUUID(); - UUID two = UUID.randomUUID(); - - Map coreMembers = new HashMap<>(); - coreMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); - coreMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); - - CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers ); - - // when - TopologyDifference diff = topology.difference(topology); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 0 ) ); - } - - @Test - public void shouldDetectAddedMembers() throws Exception - { - // given - UUID one = UUID.randomUUID(); - UUID two = UUID.randomUUID(); - - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); - initialMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); - - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); - newMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); - - CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); - - // when - TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 1 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 0 ) ); - } - - @Test - public void shouldDetectRemovedMembers() throws Exception - { - // given - UUID one = UUID.randomUUID(); - UUID two = UUID.randomUUID(); - - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); - initialMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); - - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); - - CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); - - // when - TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 1 ) ); - } - - @Test - public void shouldDetectAddedAndRemovedMembers() throws Exception - { - // given - - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); - initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); - - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); - - CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); - - // when - TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 2 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 2 ) ); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplicaTopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplicaTopologyTest.java deleted file mode 100644 index 0f6501c5a8cb..000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplicaTopologyTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.discovery; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.hamcrest.Matchers; -import org.junit.Test; - -import org.neo4j.causalclustering.identity.MemberId; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; - -public class ReadReplicaTopologyTest -{ - @Test - public void identicalTopologiesShouldHaveNoDifference() throws Exception - { - // given - UUID one = UUID.randomUUID(); - UUID two = UUID.randomUUID(); - - Map readReplicaMembers = new HashMap<>(); - readReplicaMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) ); - readReplicaMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) ); - - ReadReplicaTopology topology = new ReadReplicaTopology( readReplicaMembers ); - - // when - TopologyDifference diff = topology.difference(topology); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 0 ) ); - } - - @Test - public void shouldDetectAddedMembers() throws Exception - { - // given - UUID one = UUID.randomUUID(); - UUID two = UUID.randomUUID(); - - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) ); - initialMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) ); - - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) ); - newMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) ); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) ); - - ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers ); - - // when - TopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers )); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 1 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 0 ) ); - } - - @Test - public void shouldDetectRemovedMembers() throws Exception - { - // given - UUID one = UUID.randomUUID(); - UUID two = UUID.randomUUID(); - - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) ); - initialMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) ); - - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) ); - - ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers ); - - // when - TopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers )); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 1 ) ); - } - - @Test - public void shouldDetectAddedAndRemovedMembers() throws Exception - { - // given - - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) ); - initialMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) ); - - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) ); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) ); - - ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers ); - - // when - TopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers )); - - // then - assertThat( diff.added().size(), Matchers.equalTo( 2 ) ); - assertThat( diff.removed().size(), Matchers.equalTo( 2 ) ); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TopologyTest.java new file mode 100644 index 000000000000..c5aed7f40b25 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TopologyTest.java @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.junit.Test; + +import org.neo4j.causalclustering.identity.MemberId; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; + +public class TopologyTest +{ + @Test + public void identicalTopologiesShouldHaveNoDifference() throws Exception + { + // given + Map readReplicaMembers = randomMembers( 5 ); + + TestTopology topology = new TestTopology( readReplicaMembers ); + + // when + TopologyDifference diff = topology.difference(topology); + + // then + assertThat( diff.added(), hasSize( 0 ) ); + assertThat( diff.removed(), hasSize( 0 ) ); + } + + @Test + public void shouldDetectAddedMembers() throws Exception + { + // given + Map initialMembers = randomMembers( 3 ); + + Map newMembers = new HashMap<>( initialMembers ); + int newMemberQuantity = 2; + IntStream.range( 0, newMemberQuantity ) + .forEach( ignored -> putRandomMember( newMembers ) ); + + TestTopology topology = new TestTopology( initialMembers ); + + // when + TopologyDifference diff = topology.difference(new TestTopology( newMembers )); + + // then + assertThat( diff.added(), hasSize( newMemberQuantity ) ); + assertThat( diff.removed(), hasSize( 0 ) ); + } + + @Test + public void shouldDetectRemovedMembers() throws Exception + { + Map initialMembers = randomMembers( 3 ); + + Map newMembers = new HashMap<>( initialMembers ); + int removedMemberQuantity = 2; + IntStream.range( 0, removedMemberQuantity ) + .forEach( ignored -> removeArbitraryMember( newMembers ) ); + + TestTopology topology = new TestTopology( initialMembers ); + + // when + TopologyDifference diff = topology.difference(new TestTopology( newMembers )); + + // then + assertThat( diff.added(), hasSize( 0 ) ); + assertThat( diff.removed(), hasSize( removedMemberQuantity ) ); + } + + @Test + public void shouldDetectAddedAndRemovedMembers() throws Exception + { + // given + int initialQuantity = 4; + int newQuantity = 5; + Map initialMembers = randomMembers( initialQuantity ); + Map newMembers = randomMembers( newQuantity ); + + TestTopology topology = new TestTopology( initialMembers ); + + // when + TopologyDifference diff = topology.difference(new TestTopology( newMembers )); + + // then + assertThat( diff.added(), hasSize( newQuantity ) ); + assertThat( diff.removed(), hasSize( initialQuantity ) ); + } + + private static class TestTopology implements Topology + { + private final Map members; + + private TestTopology( Map members ) + { + this.members = members; + } + + @Override + public Map members() + { + return members; + } + } + + private Map randomMembers( int quantity ) + { + return Stream.generate( UUID::randomUUID ) + .limit( quantity ) + .collect( Collectors.toMap( MemberId::new, ignored -> mock(ReadReplicaInfo.class) ) ); + + } + + private void putRandomMember( Map newmembers ) + { + newmembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) ); + } + + private void removeArbitraryMember( Map members ) + { + members.remove( + members.keySet().stream().findAny() + .orElseThrow( () -> new AssertionError( "Removing members of an empty map" ) ) + ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java index 3713e671aafa..4aeb7fc86414 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java @@ -21,12 +21,17 @@ import org.junit.Test; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; @@ -39,28 +44,29 @@ import org.neo4j.helpers.collection.MapUtil; import org.neo4j.kernel.configuration.Config; +import static co.unruly.matchers.OptionalMatchers.contains; +import static co.unruly.matchers.OptionalMatchers.empty; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isIn; +import static org.junit.Assert.assertThat; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap; import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.helpers.collection.MapUtil.stringMap; - public class UserDefinedConfigurationStrategyTest { @Test - public void shouldPickTheFirstMatchingServer() throws Exception + public void shouldPickTheFirstMatchingServerIfCore() throws Exception { // given MemberId theCoreMemberId = new MemberId( UUID.randomUUID() ); - TopologyService topologyService = fakeTopologyService( fakeCoreTopology( theCoreMemberId ), - fakeReadReplicaTopology( memberIDs( 100 ), new NoEastGroupGenerator() ) ); + TopologyService topologyService = + fakeTopologyService( fakeCoreTopology( theCoreMemberId ), fakeReadReplicaTopology( memberIDs( 100 ), this::noEastGroupGenerator ) ); UserDefinedConfigurationStrategy strategy = new UserDefinedConfigurationStrategy(); - Config config = Config.defaults() - .with( stringMap( CausalClusteringSettings.user_defined_upstream_selection_strategy.name(), - "groups(east); groups(core); halt()" ) ); + Config config = configWithFilter( "groups(east); groups(core); halt()" ); strategy.inject( topologyService, config, null ); @@ -69,36 +75,135 @@ public void shouldPickTheFirstMatchingServer() throws Exception Optional memberId = strategy.upstreamDatabase(); // then - assertEquals( theCoreMemberId, memberId.get() ); + assertThat( memberId, contains( theCoreMemberId ) ); } - private ReadReplicaTopology fakeReadReplicaTopology( MemberId[] readReplicaIds, NoEastGroupGenerator groupGenerator ) + @Test + public void shouldPickTheFirstMatchingServerIfReadReplica() throws Exception { - assert readReplicaIds.length > 0; + // given + MemberId[] readReplicaIds = memberIDs( 100 ); + TopologyService topologyService = fakeTopologyService( fakeCoreTopology( new MemberId( UUID.randomUUID() ) ), + fakeReadReplicaTopology( readReplicaIds, this::noEastGroupGenerator ) ); + + UserDefinedConfigurationStrategy strategy = new UserDefinedConfigurationStrategy(); + String wantedGroup = noEastGroup.get( 1 ); + Config config = configWithFilter( "groups(" + wantedGroup + "); halt()" ); + + strategy.inject( topologyService, config, null ); - Map readReplicas = new HashMap<>(); + //when - int offset = 0; + Optional memberId = strategy.upstreamDatabase(); - for ( MemberId memberId : readReplicaIds ) - { - readReplicas.put( memberId, new ReadReplicaInfo( new ClientConnectorAddresses( singletonList( - new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, - new AdvertisedSocketAddress( "localhost", 11000 + offset ) ) ) ), - new AdvertisedSocketAddress( "localhost", 10000 + offset ), groupGenerator.get( memberId ) ) ); + // then + assertThat( memberId, contains( isIn( readReplicaIds ) ) ); + assertThat( memberId.map( this::noEastGroupGenerator ), contains( equalTo( asSet( wantedGroup ) ) ) ); + } + + @Test + public void shouldReturnEmptyIfNoMatchingServers() throws Exception + { + // given + MemberId[] readReplicaIds = memberIDs( 100 ); + TopologyService topologyService = fakeTopologyService( fakeCoreTopology( new MemberId( UUID.randomUUID() ) ), + fakeReadReplicaTopology( readReplicaIds, this::noEastGroupGenerator ) ); + + UserDefinedConfigurationStrategy strategy = new UserDefinedConfigurationStrategy(); + String wantedGroup = eastGroup; + Config config = configWithFilter( "groups(" + wantedGroup + "); halt()" ); + + strategy.inject( topologyService, config, null ); + + //when + + Optional memberId = strategy.upstreamDatabase(); + + // then + assertThat( memberId, empty() ); + } + + @Test + public void shouldReturnEmptyIfInvalidFilterSpecification() throws Exception + { + // given + TopologyService topologyService = fakeTopologyService( fakeCoreTopology( new MemberId( UUID.randomUUID() ) ), + fakeReadReplicaTopology( memberIDs( 100 ), this::noEastGroupGenerator ) ); + + UserDefinedConfigurationStrategy strategy = new UserDefinedConfigurationStrategy(); + Config config = configWithFilter( "invalid filter specification" ); + + strategy.inject( topologyService, config, null ); + + //when + + Optional memberId = strategy.upstreamDatabase(); + + // then + assertThat( memberId, empty() ); + } + + @Test + public void shouldNotReturnSelf() throws Exception + { + // given + String wantedGroup = eastGroup; + MemberId[] readReplicaIds = memberIDs( 1 ); + TopologyService topologyService = fakeTopologyService( fakeCoreTopology( new MemberId( UUID.randomUUID() ) ), + fakeReadReplicaTopology( readReplicaIds, memberId -> asSet( wantedGroup ) ) ); + + UserDefinedConfigurationStrategy strategy = new UserDefinedConfigurationStrategy(); + Config config = configWithFilter( "groups(" + wantedGroup + "); halt()" ); + + strategy.inject( topologyService, config, readReplicaIds[0] ); - offset++; - } + //when + + Optional memberId = strategy.upstreamDatabase(); + + // then + assertThat( memberId, empty() ); + } + + private Config configWithFilter( String filter ) + { + return Config.defaults() + .with( stringMap( CausalClusteringSettings.user_defined_upstream_selection_strategy.name(), + filter ) ); + } + + static ReadReplicaTopology fakeReadReplicaTopology( MemberId... readReplicaIds ) + { + return fakeReadReplicaTopology( readReplicaIds, ignored -> Collections.emptySet() ); + } + + static ReadReplicaTopology fakeReadReplicaTopology( MemberId[] readReplicaIds, Function> groupGenerator ) + { + assert readReplicaIds.length > 0; + + final AtomicInteger offset = new AtomicInteger( 10_000 ); + + Function toReadReplicaInfo = memberId -> readReplicaInfo( memberId, offset, groupGenerator ); + + Map readReplicas = Stream.of( readReplicaIds ) + .collect( Collectors.toMap( Function.identity(), toReadReplicaInfo ) ); return new ReadReplicaTopology( readReplicas ); } + private static ReadReplicaInfo readReplicaInfo( MemberId memberId, AtomicInteger offset, Function> groupGenerator ) + { + return new ReadReplicaInfo( new ClientConnectorAddresses( singletonList( + new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, + new AdvertisedSocketAddress( "localhost", offset.getAndIncrement() ) ) ) ), + new AdvertisedSocketAddress( "localhost", offset.getAndIncrement() ), groupGenerator.apply( memberId ) ); + } + static TopologyService fakeTopologyService( CoreTopology coreTopology, ReadReplicaTopology readReplicaTopology ) { return new TopologyService() { - private Map catchupAddresses = - extractCatchupAddressesMap( coreTopology, readReplicaTopology ); + private Map catchupAddresses = extractCatchupAddressesMap( coreTopology, readReplicaTopology ); @Override public CoreTopology coreServers() @@ -146,45 +251,20 @@ public void shutdown() throws Throwable static MemberId[] memberIDs( int howMany ) { - MemberId[] result = new MemberId[howMany]; - - for ( int i = 0; i < howMany; i++ ) - { - result[i] = new MemberId( UUID.randomUUID() ); - } - - return result; + return Stream.generate( () -> new MemberId( UUID.randomUUID() ) ) + .limit( howMany ) + .toArray( MemberId[]::new ); } - static ReadReplicaTopology fakeReadReplicaTopology( MemberId... readReplicaIds ) - { - assert readReplicaIds.length > 0; - - Map readReplicas = new HashMap<>(); - - int offset = 0; - - for ( MemberId memberId : readReplicaIds ) - { - readReplicas.put( memberId, new ReadReplicaInfo( new ClientConnectorAddresses( singletonList( - new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, - new AdvertisedSocketAddress( "localhost", 11000 + offset ) ) ) ), - new AdvertisedSocketAddress( "localhost", 10000 + offset ) ) ); - - offset++; - } + private final String northGroup = "north"; + private final String southGroup = "south"; + private final String westGroup = "west"; + private final String eastGroup = "east"; + private final List noEastGroup = Arrays.asList( northGroup, southGroup, westGroup ); - return new ReadReplicaTopology( readReplicas ); - } - - private static class NoEastGroupGenerator + private Set noEastGroupGenerator( MemberId memberId ) { - private static final String[] SOME_ORDINALS = {"north", "south", "west"}; - private static final Random random = new Random(); - - public Set get( MemberId memberId ) - { - return asSet( SOME_ORDINALS[random.nextInt( SOME_ORDINALS.length )] ); - } + int index = Math.abs( memberId.hashCode() ) % noEastGroup.size(); + return asSet( noEastGroup.get( index ) ); } }