diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java index 17a3ff49351c4..8704e847f32d3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java @@ -66,11 +66,12 @@ public abstract class CoreClient extends LifecycleAdapter implements StoreFileRe private Outbound outbound; public CoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors, - int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService ) + int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService, + long logThresholdMillis ) { senderService = new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels ); - this.outbound = new CoreOutbound( discoveryService, senderService ); + this.outbound = new CoreOutbound( discoveryService, senderService, logProvider, logThresholdMillis ); this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/CoreToCoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/CoreToCoreClient.java index 07df3a2923bd3..f24b3583aaadd 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/CoreToCoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/CoreToCoreClient.java @@ -53,9 +53,11 @@ public class CoreToCoreClient extends CoreClient { public CoreToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors, - int maxQueueSize, NonBlockingChannels nonBlockingChannels, CoreTopologyService discoveryService ) + int maxQueueSize, NonBlockingChannels nonBlockingChannels, CoreTopologyService discoveryService, + long logThresholdMillis ) { - super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService ); + super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService, + logThresholdMillis ); } public static class ChannelInitializer extends io.netty.channel.ChannelInitializer diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java index 268dd1e7c2abc..aba483b6f13e8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java @@ -50,9 +50,11 @@ public class EdgeToCoreClient extends CoreClient { public EdgeToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors, - int maxQueueSize, NonBlockingChannels nonBlockingChannels, EdgeTopologyService discoveryService ) + int maxQueueSize, NonBlockingChannels nonBlockingChannels, EdgeTopologyService discoveryService, + long logThresholdMillis ) { - super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService ); + super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService, + logThresholdMillis ); } public static class ChannelInitializer extends io.netty.channel.ChannelInitializer diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClusterTopology.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClusterTopology.java index 31d5bbd6203d2..ac92ff9a82f70 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClusterTopology.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClusterTopology.java @@ -19,6 +19,7 @@ */ package org.neo4j.coreedge.discovery; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -45,7 +46,12 @@ public Set coreMembers() return coreMembers.keySet(); } - public Set edgeMembers() + public Collection coreMemberAddresses() + { + return coreMembers.values(); + } + + public Set edgeMemberAddresses() { return edgeAddresses; } @@ -55,9 +61,14 @@ boolean canBeBootstrapped() return canBeBootstrapped; } - public CoreAddresses coreAddresses(CoreMember coreMember) + public CoreAddresses coreAddresses( CoreMember coreMember ) throws NoKnownAddressesException { - return coreMembers.get( coreMember ); + CoreAddresses coreAddresses = coreMembers.get( coreMember ); + if ( coreAddresses == null ) + { + throw new NoKnownAddressesException(); + } + return coreAddresses; } @Override @@ -66,7 +77,7 @@ public String toString() return "TestOnlyClusterTopology{" + "coreMembers.size()=" + coreMembers.size() + ", bootstrappable=" + canBeBootstrapped() + - ", edgeMembers.size()=" + edgeAddresses.size() + + ", edgeMemberAddresses.size()=" + edgeAddresses.size() + '}'; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/NoKnownAddressesException.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/NoKnownAddressesException.java new file mode 100644 index 0000000000000..c8a685a7ad047 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/NoKnownAddressesException.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +public class NoKnownAddressesException extends Exception +{ +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java index a5ca519826cf1..807ae68ecddb1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java @@ -1,3 +1,22 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package org.neo4j.coreedge.discovery; import org.neo4j.kernel.lifecycle.Lifecycle; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/CoreOutbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/CoreOutbound.java index 82ac3c93101f5..c14d550251307 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/CoreOutbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/CoreOutbound.java @@ -19,46 +19,57 @@ */ package org.neo4j.coreedge.raft.net; +import java.time.Clock; import java.util.Collection; import org.neo4j.coreedge.discovery.CoreAddresses; +import org.neo4j.coreedge.discovery.NoKnownAddressesException; import org.neo4j.coreedge.discovery.TopologyService; import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.logging.LogProvider; public class CoreOutbound implements Outbound { private final TopologyService discoveryService; private final Outbound outbound; + private UnknownAddressMonitor unknownAddressMonitor; - public CoreOutbound( TopologyService discoveryService, Outbound outbound ) + public CoreOutbound( TopologyService discoveryService, Outbound outbound, + LogProvider logProvider, long logThresholdMillis ) { this.discoveryService = discoveryService; this.outbound = outbound; + this.unknownAddressMonitor = new UnknownAddressMonitor( + logProvider.getLog( this.getClass() ), Clock.systemUTC(), logThresholdMillis ); } @Override public void send( CoreMember to, Message message ) { - CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); - if ( coreAddresses != null ) + try { + CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); outbound.send( coreAddresses.getCoreServer(), message ); } - // Drop messages for servers that are missing from the cluster topology; - // discovery service thinks that they are offline, so it's not worth trying to send them anything. + catch ( NoKnownAddressesException e ) + { + unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to ); + } } @Override public void send( CoreMember to, Collection messages ) { - CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); - if ( coreAddresses != null ) + try { + CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); outbound.send( coreAddresses.getCoreServer(), messages ); } - // Drop messages for servers that are missing from the cluster topology; - // discovery service thinks that they are offline, so it's not worth trying to send them anything. + catch ( NoKnownAddressesException e ) + { + unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to ); + } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java index 908ae589a150c..394c831866997 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java @@ -19,16 +19,19 @@ */ package org.neo4j.coreedge.raft.net; +import java.time.Clock; import java.util.Collection; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.discovery.CoreAddresses; import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.discovery.NoKnownAddressesException; import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.RaftMessages.RaftMessage; import org.neo4j.coreedge.raft.RaftMessages.StoreIdAwareMessage; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.logging.LogProvider; import static java.util.stream.Collectors.toList; @@ -37,38 +40,45 @@ public class RaftOutbound implements Outbound private final CoreTopologyService discoveryService; private final Outbound outbound; private final LocalDatabase localDatabase; + private final UnknownAddressMonitor unknownAddressMonitor; public RaftOutbound( CoreTopologyService discoveryService, Outbound outbound, - LocalDatabase localDatabase ) + LocalDatabase localDatabase, LogProvider logProvider, long logThresholdMillis ) { this.discoveryService = discoveryService; this.outbound = outbound; this.localDatabase = localDatabase; + this.unknownAddressMonitor = new UnknownAddressMonitor( + logProvider.getLog( this.getClass() ), Clock.systemUTC(), logThresholdMillis ); } @Override public void send( CoreMember to, RaftMessage message ) { - CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); - if ( coreAddresses != null ) + try { + CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); outbound.send( coreAddresses.getRaftServer(), decorateWithStoreId( message ) ); } - // Drop messages for servers that are missing from the cluster topology; - // discovery service thinks that they are offline, so it's not worth trying to send them anything. + catch ( NoKnownAddressesException e ) + { + unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to ); + } } @Override public void send( CoreMember to, Collection messages ) { - CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); - if ( coreAddresses != null ) + try { + CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to ); outbound.send( coreAddresses.getRaftServer(), messages.stream().map( this::decorateWithStoreId ).collect( toList() ) ); } - // Drop messages for servers that are missing from the cluster topology; - // discovery service thinks that they are offline, so it's not worth trying to send them anything. + catch ( NoKnownAddressesException e ) + { + unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to ); + } } private StoreIdAwareMessage decorateWithStoreId( RaftMessage m ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/UnknownAddressMonitor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/UnknownAddressMonitor.java new file mode 100644 index 0000000000000..c0bf86ad23d4e --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/UnknownAddressMonitor.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.net; + +import java.time.Clock; +import java.util.HashMap; + +import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.logging.Log; + +public class UnknownAddressMonitor +{ + private final Log log; + private final Clock clock; + private final long logThreshold; + + private HashMap throttle = new HashMap<>( ); + + public UnknownAddressMonitor( Log log, Clock clock, long logThresholdMillis ) + { + this.log = log; + this.clock = clock; + this.logThreshold = logThresholdMillis; + } + + void logAttemptToSendToMemberWithNoKnownAddress( CoreMember to ) + { + long currentTime = clock.millis(); + Long lastLogged = throttle.get( to ); + if ( lastLogged == null || (currentTime - lastLogged) > logThreshold ) + { + log.info( "No address found for member %s, probably because the member has been shut down; " + + "dropping message.", to ); + throttle.put( to, currentTime ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index f538be8cbff3d..2551899c3e563 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -33,7 +33,6 @@ import org.neo4j.coreedge.raft.state.follower.FollowerState; import org.neo4j.coreedge.raft.state.follower.FollowerStates; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.coreedge.server.StoreId; import org.neo4j.helpers.collection.FilteringIterable; import org.neo4j.logging.Log; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java index 692ff5483f5e3..3ee286a381acd 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java @@ -217,6 +217,10 @@ public String toString() public static final Setting cluster_name = setting( "core_edge.cluster_name", STRING, "core-cluster", illegalValueMessage( "must be a valid cluster name", matches( ANY ) ) ); + @Description("Throttle limit for logging unknown cluster member address") + public static final Setting unknown_address_logging_throttle = + setting( "core_edge.unknown_address_logging_throttle", DURATION, "10000ms" ); + @Description( "Maximum transaction batch size for edge servers when applying transactions pulled from core servers." ) @Internal public static Setting edge_transaction_applier_batch_size = setting( "core_edge.edge_transaction_applier_batch_size", INTEGER, "16" ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedure.java index 4068285db840e..550793b00b4e4 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedure.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedure.java @@ -28,19 +28,20 @@ import org.neo4j.coreedge.discovery.CoreAddresses; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.EdgeAddresses; +import org.neo4j.coreedge.discovery.NoKnownAddressesException; import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.NoLeaderFoundException; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.helpers.collection.Iterators; import org.neo4j.kernel.api.exceptions.ProcedureException; -import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.Neo4jTypes; import org.neo4j.kernel.api.proc.ProcedureSignature; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toSet; import static org.neo4j.helpers.collection.Iterators.asRawIterator; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; @@ -65,25 +66,25 @@ public AcquireEndpointsProcedure( CoreTopologyService discoveryService, @Override public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException { + Set writeEndpoints = emptySet(); try { CoreMember leader = leaderLocator.getLeader(); AdvertisedSocketAddress leaderAddress = discoveryService.currentTopology().coreAddresses( leader ).getBoltServer(); - Set writeEndpoints = writeEndpoints( leaderAddress ); - Set readEndpoints = readEndpoints( leaderAddress ); - - log.info( "Write: %s, Read: %s", - writeEndpoints.stream().map( ReadWriteEndPoint::address ).collect( toSet() ), - readEndpoints.stream().map( ReadWriteEndPoint::address ).collect( toSet() ) ); - - return wrapUpEndpoints( writeEndpoints, readEndpoints ); + writeEndpoints = writeEndpoints( leaderAddress ); } - catch ( NoLeaderFoundException e ) + catch ( NoLeaderFoundException | NoKnownAddressesException e ) { - throw new ProcedureException( Status.Cluster.NoLeader, - "No write server found. This can happen during a leader switch. " ); + log.debug( "No write server found. This can happen during a leader switch." ); } + Set readEndpoints = readEndpoints(); + + log.info( "Write: %s, Read: %s", + writeEndpoints.stream().map( ReadWriteEndPoint::address ).collect( toSet() ), + readEndpoints.stream().map( ReadWriteEndPoint::address ).collect( toSet() ) ); + + return wrapUpEndpoints( writeEndpoints, readEndpoints ); } private Set writeEndpoints( AdvertisedSocketAddress leader ) @@ -98,17 +99,16 @@ private RawIterator wrapUpEndpoints( Set readEndpoints( AdvertisedSocketAddress leader ) throws NoLeaderFoundException + private Set readEndpoints() { ClusterTopology clusterTopology = discoveryService.currentTopology(); - Stream readEdge = clusterTopology.edgeMembers().stream() + Stream readEdge = clusterTopology.edgeMemberAddresses().stream() .map( EdgeAddresses::getBoltAddress ); - Stream readCore = clusterTopology.coreMembers().stream() - .map( clusterTopology::coreAddresses ).map( CoreAddresses::getBoltServer ); - Stream readLeader = Stream.of( leader ); + Stream readCore = clusterTopology.coreMemberAddresses().stream() + .map( CoreAddresses::getBoltServer ); - return Stream.concat( Stream.concat( readEdge, readCore ), readLeader ).map( ReadWriteEndPoint::read ) + return Stream.concat( readEdge, readCore ).map( ReadWriteEndPoint::read ) .limit( 1 ).collect( toSet() ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedure.java index 52d804f678668..f6516242d5a28 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedure.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedure.java @@ -19,27 +19,28 @@ */ package org.neo4j.coreedge.server.core; +import java.util.HashSet; import java.util.Set; import java.util.UUID; -import java.util.stream.Stream; import org.neo4j.collection.RawIterator; import org.neo4j.coreedge.discovery.ClusterTopology; import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.discovery.EdgeAddresses; +import org.neo4j.coreedge.discovery.NoKnownAddressesException; import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.NoLeaderFoundException; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.helpers.collection.Iterators; import org.neo4j.kernel.api.exceptions.ProcedureException; -import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.Neo4jTypes; import org.neo4j.kernel.api.proc.ProcedureSignature; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; -import static org.neo4j.coreedge.server.core.ClusterOverviewProcedure.ReadWriteEndPoint.follower; -import static org.neo4j.coreedge.server.core.ClusterOverviewProcedure.ReadWriteEndPoint.leader; import static org.neo4j.helpers.collection.Iterators.asRawIterator; +import static org.neo4j.helpers.collection.Iterators.map; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; public class ClusterOverviewProcedure extends CallableProcedure.BasicProcedure @@ -47,67 +48,73 @@ public class ClusterOverviewProcedure extends CallableProcedure.BasicProcedure public static final String NAME = "overview"; private final CoreTopologyService discoveryService; private final LeaderLocator leaderLocator; + private final Log log; ClusterOverviewProcedure( CoreTopologyService discoveryService, - LeaderLocator leaderLocator) + LeaderLocator leaderLocator, LogProvider logProvider ) { super( procedureSignature( new ProcedureSignature.ProcedureName( new String[]{"dbms", "cluster"}, NAME ) ) - .out( "id", Neo4jTypes.NTString ) - .out( "address", Neo4jTypes.NTString ) + .out( "id", Neo4jTypes.NTString ).out( "address", Neo4jTypes.NTString ) .out( "role", Neo4jTypes.NTString ).build() ); this.discoveryService = discoveryService; this.leaderLocator = leaderLocator; + this.log = logProvider.getLog( getClass() ); } @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException { + Set endpoints = new HashSet<>(); + ClusterTopology clusterTopology = discoveryService.currentTopology(); + Set coreMembers = clusterTopology.coreMembers(); + CoreMember leader = null; try { - CoreMember leader = leaderLocator.getLeader(); - ClusterTopology clusterTopology = discoveryService.currentTopology(); - - Set coreMembers = clusterTopology.coreMembers(); - - Stream leaderEndpoint = coreMembers.stream() - .filter( c -> c.equals( leader ) ) - .map( c -> leader( clusterTopology.coreAddresses( c ).getBoltServer(), c.getUuid() ) ); - - Stream followerEndpoints = coreMembers.stream() - .filter( c -> !c.equals( leader ) ) - .map( c -> follower( clusterTopology.coreAddresses( c ).getBoltServer(), c.getUuid() ) ); - - Stream readReplicaEndpoints = clusterTopology.edgeMembers().stream().map( m -> - ReadWriteEndPoint.readReplica( m.getBoltAddress() ) ); - - Stream allTheEndpoints = Stream.concat( leaderEndpoint, - Stream.concat( followerEndpoints, readReplicaEndpoints ) ); - - return Iterators.map( ( l ) -> new Object[]{l.identifier(), l.address(), l.type()}, - asRawIterator( allTheEndpoints.iterator() ) ); - + leader = leaderLocator.getLeader(); } catch ( NoLeaderFoundException e ) { - throw new ProcedureException( Status.Cluster.NoLeader, - "No write server found. This can happen during a leader switch. " ); + log.debug( "No write server found. This can happen during a leader switch." ); + } + + for ( CoreMember coreMember : coreMembers ) + { + AdvertisedSocketAddress boltServerAddress = null; + try + { + boltServerAddress = clusterTopology.coreAddresses( coreMember ).getBoltServer(); + } + catch ( NoKnownAddressesException e ) + { + log.debug( "Address found for " ); + } + Type type = coreMember.equals( leader ) ? Type.LEADER : Type.FOLLOWER; + endpoints.add( new ReadWriteEndPoint( boltServerAddress, type, coreMember.getUuid() ) ); } + for ( EdgeAddresses edgeAddresses : clusterTopology.edgeMemberAddresses() ) + { + endpoints.add( new ReadWriteEndPoint( edgeAddresses.getBoltAddress(), Type.READ_REPLICA, null ) ); + } + return map( ( l ) -> new Object[]{l.identifier(), l.address(), l.type()}, + asRawIterator( endpoints.iterator() ) ); } public enum Type { - LEADER, FOLLOWER, READ_REPLICA + LEADER, + FOLLOWER, + READ_REPLICA } - static class ReadWriteEndPoint + private static class ReadWriteEndPoint { private final AdvertisedSocketAddress address; private final Type type; - private final String identifier; + private final UUID identifier; public String address() { - return address.toString(); + return address == null ? null : address.toString(); } public String type() @@ -117,29 +124,15 @@ public String type() String identifier() { - return identifier ; + return identifier == null ? null : identifier.toString(); } - ReadWriteEndPoint(AdvertisedSocketAddress address, Type type, String identifier) + public ReadWriteEndPoint( AdvertisedSocketAddress address, Type type, UUID identifier ) { this.address = address; this.type = type; this.identifier = identifier; } - public static ReadWriteEndPoint leader( AdvertisedSocketAddress address, UUID identifier ) - { - return new ReadWriteEndPoint( address, Type.LEADER, identifier.toString() ); - } - - public static ReadWriteEndPoint follower( AdvertisedSocketAddress address, UUID identifier ) - { - return new ReadWriteEndPoint( address, Type.FOLLOWER, identifier.toString() ); - } - - static ReadWriteEndPoint readReplica(AdvertisedSocketAddress address) - { - return new ReadWriteEndPoint( address, Type.READ_REPLICA, null ); - } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java index 90ff1b1b038ff..a497bf109d55e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java @@ -67,8 +67,7 @@ public RawIterator apply( Context ctx, Object[] inp private Stream findAddresses() { ClusterTopology clusterTopology = discoveryService.currentTopology(); - return clusterTopology.coreMembers().stream() - .map( clusterTopology::coreAddresses ).map( CoreAddresses::getBoltServer ); + return clusterTopology.coreMemberAddresses().stream().map( CoreAddresses::getBoltServer ); } private int noOfAddressesToReturn( Object[] input ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 98ef12845148d..438271173365f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -98,7 +98,6 @@ import org.neo4j.coreedge.raft.state.term.MonitoredTermStateStorage; import org.neo4j.coreedge.raft.state.term.TermState; import org.neo4j.coreedge.raft.state.vote.VoteState; -import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.CoreMember.CoreMemberMarshal; @@ -233,13 +232,12 @@ public void registerProcedures( Procedures procedures ) final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal(); int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size ); + long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle ); final SenderService senderService = new SenderService( new RaftChannelInitializer( marshal ), logProvider, platformModule.monitors, maxQueueSize, new NonBlockingChannels() ); life.add( senderService ); - AdvertisedSocketAddress raftAddress = config.get( CoreEdgeClusterSettings.raft_advertised_address ); - final MessageLogger messageLogger; if ( config.get( CoreEdgeClusterSettings.raft_messages_log_enable ) ) { @@ -274,7 +272,7 @@ public void registerProcedures( Procedures procedures ) new CoreToCoreClient.ChannelInitializer( logProvider, nonBlockingChannels ); CoreToCoreClient coreToCoreClient = life.add( new CoreToCoreClient( logProvider, channelInitializer, platformModule.monitors, maxQueueSize, - nonBlockingChannels, discoveryService ) ); + nonBlockingChannels, discoveryService, logThresholdMillis ) ); channelInitializer.setOwner( coreToCoreClient ); StoreFetcher storeFetcher = new StoreFetcher( logProvider, fileSystem, platformModule.pageCache, @@ -284,8 +282,10 @@ public void registerProcedures( Procedures procedures ) GlobalSession myGlobalSession = new GlobalSession( UUID.randomUUID(), myself ); LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession ); ProgressTrackerImpl progressTracker = new ProgressTrackerImpl( myGlobalSession ); + RaftOutbound raftOutbound = + new RaftOutbound( discoveryService, senderService, localDatabase, logProvider, logThresholdMillis ); Outbound loggingOutbound = new LoggingOutbound<>( - new RaftOutbound( discoveryService, senderService, localDatabase ), myself, messageLogger ); + raftOutbound, myself, messageLogger ); RaftServer raftServer; CoreState coreState; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java index 27ee7ff1eafdc..ce080d01fa260 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java @@ -160,8 +160,10 @@ public EnterpriseEdgeEditionModule( final PlatformModule platformModule, NonBlockingChannels nonBlockingChannels = new NonBlockingChannels(); EdgeToCoreClient.ChannelInitializer channelInitializer = new EdgeToCoreClient.ChannelInitializer( logProvider, nonBlockingChannels ); int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size ); + long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle ); EdgeToCoreClient edgeToCoreClient = life.add( new EdgeToCoreClient( logProvider, - channelInitializer, platformModule.monitors, maxQueueSize, nonBlockingChannels, discoveryService ) ); + channelInitializer, platformModule.monitors, maxQueueSize, nonBlockingChannels, discoveryService, + logThresholdMillis ) ); channelInitializer.setOwner( edgeToCoreClient ); final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java index 2f213d129bacc..9553eafd49d3c 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java @@ -224,7 +224,7 @@ public void shouldRegisterEdgeServerInTopology() throws Exception client.registerEdgeServer( new AdvertisedSocketAddress( "localhost:7000" ) ); // then - assertEquals( 1, client.currentTopology().edgeMembers().size() ); + assertEquals( 1, client.currentTopology().edgeMemberAddresses().size() ); } private Member makeMember( int id ) throws UnknownHostException diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/UnknownAddressMonitorTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/UnknownAddressMonitorTest.java new file mode 100644 index 0000000000000..e544c1ceb66b4 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/UnknownAddressMonitorTest.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.net; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import org.neo4j.logging.Log; +import org.neo4j.time.FakeClock; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.neo4j.coreedge.server.RaftTestMember.member; + +public class UnknownAddressMonitorTest +{ + @Test + public void shouldLogFirstFailure() throws Exception + { + // given + Log log = mock( Log.class ); + UnknownAddressMonitor logger = new UnknownAddressMonitor( log, new FakeClock(), 10000 ); + + // when + logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) ); + + // then + verify( log ).info( anyString(), eq( member( 0 ) ) ); + } + + @Test + public void shouldThrottleLogging() throws Exception + { + // given + Log log = mock( Log.class ); + FakeClock clock = new FakeClock(); + UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 10000 ); + + // when + logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) ); + clock.forward( 1000, TimeUnit.MILLISECONDS ); + logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) ); + + // then + verify( log, times( 1 ) ).info( anyString(), eq( member( 0 ) ) ); + } + + @Test + public void shouldResumeLoggingAfterQuietPeriod() throws Exception + { + // given + Log log = mock( Log.class ); + FakeClock clock = new FakeClock(); + UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 10000 ); + + // when + logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) ); + clock.forward( 11000, TimeUnit.MILLISECONDS ); + logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) ); + + // then + verify( log, times( 2 ) ).info( anyString(), eq( member( 0 ) ) ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedureTest.java index 5616c7a7446c9..0b318d76428b1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedureTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedureTest.java @@ -33,16 +33,14 @@ import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.NoLeaderFoundException; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.api.exceptions.ProcedureException; -import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.logging.NullLogProvider; import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.neo4j.coreedge.server.RaftTestMember.member; import static org.neo4j.coreedge.server.core.DiscoverMembersProcedureTest.addresses; import static org.neo4j.coreedge.server.core.DiscoverMembersProcedureTest.coreAddresses; import static org.neo4j.helpers.collection.Iterators.asList; @@ -56,7 +54,7 @@ public void shouldRecommendTheCoreLeaderForWriteAndEdgeForRead() throws Exceptio final CoreTopologyService topologyService = mock( CoreTopologyService.class ); Map coreMembers = new HashMap<>(); - CoreMember theLeader = new CoreMember( UUID.randomUUID() ); + CoreMember theLeader = member( 0 ); coreMembers.put( theLeader, coreAddresses( 0 ) ); final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 1 ) ); @@ -85,7 +83,7 @@ public void shouldOnlyRecommendOneReadServerEvenIfMultipleAreAvailable() throws final CoreTopologyService topologyService = mock( CoreTopologyService.class ); Map coreMembers = new HashMap<>(); - CoreMember theLeader = new CoreMember( UUID.randomUUID() ); + CoreMember theLeader = member( 0 ); coreMembers.put( theLeader, coreAddresses( 0 ) ); final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 1, 2, 3 ) ); @@ -111,7 +109,7 @@ public void shouldReturnCoreServerAsReadServerIfNoEdgeServersAvailable() throws final CoreTopologyService topologyService = mock( CoreTopologyService.class ); Map coreMembers = new HashMap<>(); - CoreMember theLeader = new CoreMember( UUID.randomUUID() ); + CoreMember theLeader = member( 0 ); coreMembers.put( theLeader, coreAddresses( 0 ) ); final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses() ); @@ -127,30 +125,63 @@ public void shouldReturnCoreServerAsReadServerIfNoEdgeServersAvailable() throws final List members = asList( procedure.apply( null, new Object[0] ) ); // then - List readAddresses = members.stream().filter( row -> row[1].equals( "read" ) ).collect( toList() ); + MatcherAssert.assertThat( members, containsInAnyOrder( + new Object[]{coreAddresses( 0 ).getRaftServer().toString(), "write"}, + new Object[]{coreAddresses( 0 ).getRaftServer().toString(), "read"} + ) ); + } + + @Test + public void shouldReturnNoWriteEndpointsIfThereIsNoLeader() throws Exception + { + // given + final CoreTopologyService topologyService = mock( CoreTopologyService.class ); - assertEquals( 1, readAddresses.size() ); - assertArrayEquals( readAddresses.get( 0 ), new Object[]{coreAddresses( 0 ).getRaftServer().toString(), "read"} ); + Map coreMembers = new HashMap<>(); + coreMembers.put( member( 0 ), coreAddresses( 0 ) ); + + final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses() ); + + when( topologyService.currentTopology() ).thenReturn( clusterTopology ); + + LeaderLocator leaderLocator = mock( LeaderLocator.class ); + when( leaderLocator.getLeader() ).thenThrow( new NoLeaderFoundException() ); + + AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( topologyService, leaderLocator, + NullLogProvider.getInstance() ); + + // when + final List members = asList( procedure.apply( null, new Object[0] ) ); + + // then + assertEquals( 1, members.stream().filter( row1 -> row1[1].equals( "read" ) ).collect( toList() ).size() ); + assertEquals( 0, members.stream().filter( row1 -> row1[1].equals( "write" ) ).collect( toList() ).size() ); } @Test - public void shouldThrowExceptionIfThereIsNoLeader() throws Exception + public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws Exception { // given + final CoreTopologyService topologyService = mock( CoreTopologyService.class ); + + Map coreMembers = new HashMap<>(); + coreMembers.put( member( 0 ), coreAddresses( 0 ) ); + + final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses() ); + + when( topologyService.currentTopology() ).thenReturn( clusterTopology ); + LeaderLocator leaderLocator = mock( LeaderLocator.class ); - when( leaderLocator.getLeader() ).thenThrow( NoLeaderFoundException.class ); + when( leaderLocator.getLeader() ).thenReturn( member( 1 ) ); - AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( - mock( CoreTopologyService.class ), leaderLocator, NullLogProvider.getInstance() ); + AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( topologyService, leaderLocator, + NullLogProvider.getInstance() ); // when - try - { - procedure.apply( null, new Object[]{"bam"} ); - } - catch ( ProcedureException e ) - { - assertEquals( Status.Cluster.NoLeader, e.status() ); - } + final List members = asList( procedure.apply( null, new Object[0] ) ); + + // then + assertEquals( 1, members.stream().filter( row1 -> row1[1].equals( "read" ) ).collect( toList() ).size() ); + assertEquals( 0, members.stream().filter( row1 -> row1[1].equals( "write" ) ).collect( toList() ).size() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedureTest.java index 7201801ce393e..385238a76877d 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedureTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/ClusterOverviewProcedureTest.java @@ -19,28 +19,26 @@ */ package org.neo4j.coreedge.server.core; +import org.junit.Test; + import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import org.junit.Test; - import org.neo4j.coreedge.discovery.ClusterTopology; import org.neo4j.coreedge.discovery.CoreAddresses; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.EdgeAddresses; import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.logging.NullLogProvider; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - import static org.neo4j.coreedge.server.core.DiscoverMembersProcedureTest.addresses; import static org.neo4j.coreedge.server.core.DiscoverMembersProcedureTest.coreAddresses; import static org.neo4j.helpers.collection.Iterators.asList; @@ -70,7 +68,8 @@ public void shouldRecommendTheCoreLeaderForWriteAndEdgeForRead() throws Exceptio LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( theLeader ); - ClusterOverviewProcedure procedure = new ClusterOverviewProcedure( topologyService, leaderLocator ); + ClusterOverviewProcedure procedure = new ClusterOverviewProcedure( topologyService, leaderLocator, + NullLogProvider.getInstance() ); // when final List members = asList( procedure.apply( null, new Object[0] ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java index 830da3eef5180..8c7375939377e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; import org.neo4j.coreedge.discovery.ClusterTopology; @@ -43,6 +42,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.neo4j.coreedge.server.RaftTestMember.member; import static org.neo4j.helpers.collection.Iterators.asList; public class DiscoverMembersProcedureTest @@ -54,11 +54,11 @@ public void shouldOnlyReturnCoreMembers() throws Exception final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class ); Map coreMembers = new HashMap<>(); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 1 ) ); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 2 ) ); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 3 ) ); + coreMembers.put( member( 0 ), coreAddresses( 0 ) ); + coreMembers.put( member( 1 ), coreAddresses( 1 ) ); + coreMembers.put( member( 2 ), coreAddresses( 2 ) ); - final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 4, 5, 6 ) ); + final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 3, 4, 5 ) ); when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); final DiscoverMembersProcedure proc = @@ -68,9 +68,11 @@ public void shouldOnlyReturnCoreMembers() throws Exception final List members = asList( proc.apply( null, new Object[0] ) ); // then - assertThat( members, containsInAnyOrder( new Object[]{coreAddresses( 1 ).getRaftServer().toString()}, - new Object[]{coreAddresses( 2 ).getRaftServer().toString()}, - new Object[]{coreAddresses( 3 ).getRaftServer().toString()} ) ); + assertThat( members, containsInAnyOrder( + new Object[]{coreAddresses( 0 ).getRaftServer().toString()}, + new Object[]{coreAddresses( 1 ).getRaftServer().toString()}, + new Object[]{coreAddresses( 2 ).getRaftServer().toString()} ) + ); } @Test @@ -79,9 +81,9 @@ public void shouldReturnSelfIfOnlyMemberOfTheCluster() throws Exception final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class ); Map coreMembers = new HashMap<>(); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 1 ) ); + coreMembers.put( member( 0 ), coreAddresses( 0 ) ); - final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 4, 5, 6 ) ); + final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 3, 4, 5 ) ); when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService, NullLogProvider.getInstance() ); @@ -90,7 +92,7 @@ public void shouldReturnSelfIfOnlyMemberOfTheCluster() throws Exception final List members = asList( proc.apply( null, new Object[0] ) ); // then - assertArrayEquals( members.get( 0 ), new Object[]{coreAddresses( 1 ).getRaftServer().toString()} ); + assertArrayEquals( members.get( 0 ), new Object[]{coreAddresses( 0 ).getRaftServer().toString()} ); } @Test @@ -100,11 +102,11 @@ public void shouldReturnLimitedNumberOfAddresses() throws Exception final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class ); Map coreMembers = new HashMap<>(); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 1 ) ); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 2 ) ); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 3 ) ); + coreMembers.put( member( 0 ), coreAddresses( 0 ) ); + coreMembers.put( member( 1 ), coreAddresses( 1 ) ); + coreMembers.put( member( 2 ), coreAddresses( 2 ) ); - final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 4, 5, 6 ) ); + final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 3, 4, 5) ); when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); final DiscoverMembersProcedure proc = @@ -118,24 +120,24 @@ public void shouldReturnLimitedNumberOfAddresses() throws Exception } @Test - public void shouldReturnAllAddressesForStupidLimit() throws Exception + public void shouldReturnAllAddressesWhenLimitInNotNumeric() throws Exception { // given final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class ); Map coreMembers = new HashMap<>(); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 1 ) ); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 2 ) ); - coreMembers.put( new CoreMember( UUID.randomUUID() ), coreAddresses( 3 ) ); + coreMembers.put( member( 0 ), coreAddresses( 0 ) ); + coreMembers.put( member( 1 ), coreAddresses( 1 ) ); + coreMembers.put( member( 2 ), coreAddresses( 2 ) ); - final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 4, 5, 6 ) ); + final ClusterTopology clusterTopology = new ClusterTopology( false, coreMembers, addresses( 3, 4, 5 ) ); when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService, NullLogProvider.getInstance() ); // when - final List members = asList( proc.apply( null, new Object[]{"bam"} ) ); + final List members = asList( proc.apply( null, new Object[]{"not numeric"} ) ); // then assertEquals( 3, members.size() );