diff --git a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java index c155932e455de..deca80a9189da 100644 --- a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java +++ b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java @@ -458,6 +458,28 @@ public Code code() } } + enum Cluster implements Status + { + // transient errors + NoLeader( TransientError, + "No leader available at the moment. Retrying your request at a later time may succeed." ), + + ; + + private final Code code; + + @Override + public Code code() + { + return code; + } + + Cluster( Classification classification, String description ) + { + this.code = new Code( classification, this, description ); + } + } + Code code(); class Code diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java index c9f22e53ea138..ba491647de9ae 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java @@ -93,7 +93,8 @@ private Set toCoreMembers( Set members ) { coreMembers.add( new CoreMember( new AdvertisedSocketAddress( member.getStringAttribute( TRANSACTION_SERVER ) ), - new AdvertisedSocketAddress( member.getStringAttribute( RAFT_SERVER ) ) + new AdvertisedSocketAddress( member.getStringAttribute( RAFT_SERVER ) ), + new AdvertisedSocketAddress( member.getStringAttribute( BOLT_SERVER ) ) ) ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java index 504615e67c02f..332c1e566aa8f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java @@ -21,11 +21,11 @@ import java.util.Set; +import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftInstance.BootstrapException; import org.neo4j.coreedge.raft.log.RaftLogCompactedException; -import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.membership.CoreMemberSet; +import org.neo4j.coreedge.server.CoreMember; import org.neo4j.kernel.lifecycle.LifecycleAdapter; public class RaftDiscoveryServiceConnector extends LifecycleAdapter implements CoreTopologyService.Listener diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index c0d99ed03a18f..5bc5ecb5a526b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -19,6 +19,14 @@ */ package org.neo4j.coreedge.raft; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.function.Supplier; + import org.neo4j.coreedge.helper.VolatileFuture; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCompactedException; @@ -42,16 +50,9 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; -import java.util.function.Supplier; - import static java.lang.String.format; import static java.util.Collections.singletonList; + import static org.neo4j.coreedge.raft.roles.Role.LEADER; /** @@ -203,7 +204,7 @@ public MEMBER getLeader() throws NoLeaderFoundException return waitForLeader( 0, member -> member != null ); } - public MEMBER waitForLeader( long timeoutMillis, Predicate predicate ) throws NoLeaderFoundException + private MEMBER waitForLeader( long timeoutMillis, Predicate predicate ) throws NoLeaderFoundException { try { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java index f47d3f7674b78..6dbbf233aae4e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java @@ -145,8 +145,9 @@ private CoreMember retrieveMember( ByteBuf buffer ) throws UnsupportedEncodingEx AdvertisedSocketAddress coreAddress = marshal.unmarshal( buffer ); AdvertisedSocketAddress raftAddress = marshal.unmarshal( buffer ); + AdvertisedSocketAddress boltAddress = marshal.unmarshal( buffer ); - return new CoreMember( coreAddress, raftAddress ); + return new CoreMember( coreAddress, raftAddress, boltAddress ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java index aa22c13a52ee8..55ebdb06fd47f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java @@ -85,8 +85,8 @@ else if ( message instanceof RaftMessages.AppendEntries.Request ) } else if ( message instanceof RaftMessages.AppendEntries.Response ) { - RaftMessages.AppendEntries.Response appendResponse = (RaftMessages.AppendEntries - .Response) message; + RaftMessages.AppendEntries.Response appendResponse = + (RaftMessages.AppendEntries.Response) message; buf.writeLong( appendResponse.term() ); buf.writeBoolean( appendResponse.success() ); @@ -126,5 +126,6 @@ private void writeMember( CoreMember member, ByteBuf buffer ) throws Unsupported marshal.marshal( member.getCoreAddress(), buffer ); marshal.marshal( member.getRaftAddress(), buffer ); + marshal.marshal( member.getBoltAddress(), buffer ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreMember.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreMember.java index 3a521826916e2..80eea0c2ece60 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreMember.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreMember.java @@ -36,17 +36,21 @@ public class CoreMember { private final AdvertisedSocketAddress coreAddress; private final AdvertisedSocketAddress raftAddress; + private final AdvertisedSocketAddress boltAddress; - public CoreMember( AdvertisedSocketAddress coreAddress, AdvertisedSocketAddress raftAddress ) + public CoreMember( AdvertisedSocketAddress coreAddress, AdvertisedSocketAddress raftAddress, + AdvertisedSocketAddress boltAddress) { this.coreAddress = coreAddress; this.raftAddress = raftAddress; + this.boltAddress = boltAddress; } @Override public String toString() { - return format( "CoreMember{coreAddress=%s, raftAddress=%s}", coreAddress, raftAddress ); + return format( "CoreMember{coreAddress=%s, raftAddress=%s, boltAddress=%s}", + coreAddress, raftAddress, boltAddress ); } @Override @@ -62,13 +66,14 @@ public boolean equals( Object o ) } CoreMember that = (CoreMember) o; return Objects.equals( coreAddress, that.coreAddress ) && - Objects.equals( raftAddress, that.raftAddress ); + Objects.equals( raftAddress, that.raftAddress ) && + Objects.equals( boltAddress, that.boltAddress ); } @Override public int hashCode() { - return Objects.hash( coreAddress, raftAddress ); + return Objects.hash( coreAddress, raftAddress, boltAddress ); } public AdvertisedSocketAddress getCoreAddress() @@ -81,6 +86,11 @@ public AdvertisedSocketAddress getRaftAddress() return raftAddress; } + public AdvertisedSocketAddress getBoltAddress() + { + return boltAddress; + } + /** * Format: * ┌────────────────────────────────────────────┐ @@ -95,6 +105,13 @@ public AdvertisedSocketAddress getRaftAddress() * │ │port 4 bytes││ * │ └─────────────────────────────┘│ * └────────────────────────────────────────────┘ + * │bolt address ┌─────────────────────────────┐│ + * │ │hostnameLength 4 bytes││ + * │ │hostnameBytes variable││ + * │ │port 4 bytes││ + * │ └─────────────────────────────┘│ + * └────────────────────────────────────────────┘ + * *

* This Marshal implementation can also serialize and deserialize null values. They are encoded as a CoreMember * with empty strings in the address fields, so they still adhere to the format displayed above. @@ -116,11 +133,13 @@ public void marshal( CoreMember member, ByteBuffer buffer ) { byteBufMarshal.marshal( NULL_ADDRESS, buffer ); byteBufMarshal.marshal( NULL_ADDRESS, buffer ); + byteBufMarshal.marshal( NULL_ADDRESS, buffer ); } else { byteBufMarshal.marshal( member.getCoreAddress(), buffer ); byteBufMarshal.marshal( member.getRaftAddress(), buffer ); + byteBufMarshal.marshal( member.getBoltAddress(), buffer ); } } @@ -129,14 +148,16 @@ public void marshal( CoreMember member, ByteBuf buffer ) { byteBufMarshal.marshal( member.getCoreAddress(), buffer ); byteBufMarshal.marshal( member.getRaftAddress(), buffer ); + byteBufMarshal.marshal( member.getBoltAddress(), buffer ); } public CoreMember unmarshal( ByteBuffer buffer ) { AdvertisedSocketAddress coreAddress = byteBufMarshal.unmarshal( buffer ); AdvertisedSocketAddress raftAddress = byteBufMarshal.unmarshal( buffer ); + AdvertisedSocketAddress boltAddress = byteBufMarshal.unmarshal( buffer ); - return dealWithPossibleNullAddress( coreAddress, raftAddress ); + return dealWithPossibleNullAddress( coreAddress, raftAddress, boltAddress ); } @Override @@ -144,7 +165,8 @@ public CoreMember unmarshal( ByteBuf source ) { AdvertisedSocketAddress coreAddress = byteBufMarshal.unmarshal( source ); AdvertisedSocketAddress raftAddress = byteBufMarshal.unmarshal( source ); - return dealWithPossibleNullAddress( coreAddress, raftAddress ); + AdvertisedSocketAddress boltAddress = byteBufMarshal.unmarshal( source ); + return dealWithPossibleNullAddress( coreAddress, raftAddress, boltAddress ); } @Override @@ -154,11 +176,13 @@ public void marshal( CoreMember member, WritableChannel channel ) throws IOExcep { channelMarshal.marshal( NULL_ADDRESS, channel ); channelMarshal.marshal( NULL_ADDRESS, channel ); + channelMarshal.marshal( NULL_ADDRESS, channel ); } else { channelMarshal.marshal( member.getCoreAddress(), channel ); channelMarshal.marshal( member.getRaftAddress(), channel ); + channelMarshal.marshal( member.getBoltAddress(), channel ); } } @@ -167,21 +191,26 @@ public CoreMember unmarshal( ReadableChannel source ) throws IOException { AdvertisedSocketAddress coreAddress = channelMarshal.unmarshal( source ); AdvertisedSocketAddress raftAddress = channelMarshal.unmarshal( source ); - return dealWithPossibleNullAddress( coreAddress, raftAddress ); - + AdvertisedSocketAddress boltAddress = channelMarshal.unmarshal( source ); + return dealWithPossibleNullAddress( coreAddress, raftAddress, boltAddress ); } - private CoreMember dealWithPossibleNullAddress( AdvertisedSocketAddress coreAddress, AdvertisedSocketAddress - raftAddress ) + private CoreMember dealWithPossibleNullAddress( AdvertisedSocketAddress coreAddress, + AdvertisedSocketAddress raftAddress, + AdvertisedSocketAddress boltAddress) { - if ( coreAddress == null || raftAddress == null || (coreAddress.equals( NULL_ADDRESS ) && raftAddress - .equals( NULL_ADDRESS )) ) + if ( coreAddress == null || + raftAddress == null || + boltAddress == null || + (coreAddress.equals( NULL_ADDRESS ) && + raftAddress.equals( NULL_ADDRESS ) && + boltAddress.equals( NULL_ADDRESS )) ) { return null; } else { - return new CoreMember( coreAddress, raftAddress ); + return new CoreMember( coreAddress, raftAddress, boltAddress ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedure.java new file mode 100644 index 0000000000000..5a799a0c8bb97 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedure.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.server.core; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.neo4j.collection.RawIterator; +import org.neo4j.coreedge.discovery.ClusterTopology; +import org.neo4j.coreedge.discovery.ReadOnlyTopologyService; +import org.neo4j.coreedge.raft.LeaderLocator; +import org.neo4j.coreedge.raft.NoLeaderFoundException; +import org.neo4j.coreedge.server.AdvertisedSocketAddress; +import org.neo4j.coreedge.server.BoltAddress; +import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.helpers.collection.Iterators; +import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.api.proc.CallableProcedure; +import org.neo4j.kernel.api.proc.ProcedureSignature; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +import static java.lang.Integer.parseInt; +import static java.util.stream.Collectors.toSet; + +import static org.neo4j.helpers.collection.Iterators.asList; +import static org.neo4j.helpers.collection.Iterators.asRawIterator; + +public class AcquireEndpointsProcedure extends CallableProcedure.BasicProcedure +{ + public static final String NAME = "acquireEndpoints"; + private final ReadOnlyTopologyService discoveryService; + private final LeaderLocator leaderLocator; + private final Log log; + + public AcquireEndpointsProcedure( ReadOnlyTopologyService discoveryService, + LeaderLocator leaderLocator, LogProvider logProvider ) + { + super( new ProcedureSignature( + new ProcedureSignature.ProcedureName( new String[]{"dbms", "cluster"}, NAME ) ) ); + this.discoveryService = discoveryService; + this.leaderLocator = leaderLocator; + this.log = logProvider.getLog( getClass() ); + } + + @Override + public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + { + try + { + AdvertisedSocketAddress leader = leaderLocator.getLeader().getBoltAddress(); + Set writeEndpoints = writeEndpoints( leader ); + Set readEndpoints = readEndpoints( leader ); + + log.info( "Write: %s, Read: %s", + writeEndpoints.stream().map( ReadWriteEndPoint::address ).collect( toSet() ), + readEndpoints.stream().map( ReadWriteEndPoint::address ).collect( toSet() ) ); + + return wrapUpEndpoints( writeEndpoints, readEndpoints ); + } + catch ( NoLeaderFoundException e ) + { + throw new ProcedureException( Status.Cluster.NoLeader, + "No write server found. This can happen during a leader switch. " ); + } + } + + private Set writeEndpoints( AdvertisedSocketAddress leader ) + { + return Stream.of( leader ).map( ReadWriteEndPoint::write ).collect( Collectors.toSet() ); + } + + private RawIterator wrapUpEndpoints( Set writeEndpoints, + Set readEndpoints ) + { + return Iterators.map( ( l ) -> new Object[]{l.address(), l.type()}, + asRawIterator( Stream.concat( writeEndpoints.stream(), readEndpoints.stream() ).iterator() ) ); + } + + private Set readEndpoints( AdvertisedSocketAddress leader ) throws NoLeaderFoundException + { + ClusterTopology clusterTopology = discoveryService.currentTopology(); + + Stream readEdge = boltAddressesFor( clusterTopology.edgeMembers() ); + Stream readCore = boltAddressesFor( clusterTopology.boltCoreMembers() ); + Stream readLeader = Stream.of( leader ); + + return Stream.concat(Stream.concat( readEdge, readCore ), readLeader).map( ReadWriteEndPoint::read ) + .limit( 1 ).collect( toSet() ); + } + + private Stream boltAddressesFor( Set boltAddresses ) + { + return boltAddresses.stream().map( BoltAddress::getBoltAddress ); + } + + public enum Type + { + READ, WRITE + } + + static class ReadWriteEndPoint + { + private final AdvertisedSocketAddress address; + private final Type type; + + public String address() + { + return address.toString(); + } + + public String type() + { + return type.toString().toLowerCase(); + } + + public ReadWriteEndPoint( AdvertisedSocketAddress address, Type type ) + { + this.address = address; + this.type = type; + } + + public static ReadWriteEndPoint write( AdvertisedSocketAddress address ) + { + return new ReadWriteEndPoint( address, Type.WRITE ); + } + + public static ReadWriteEndPoint read( AdvertisedSocketAddress address ) + { + return new ReadWriteEndPoint( address, Type.READ ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverConsistencyLevelsProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverConsistencyLevelsProcedure.java deleted file mode 100644 index e159a128aee1b..0000000000000 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverConsistencyLevelsProcedure.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.server.core; - -import java.util.Arrays; - -import org.neo4j.collection.RawIterator; -import org.neo4j.helpers.collection.Iterators; -import org.neo4j.kernel.api.exceptions.ProcedureException; -import org.neo4j.kernel.api.proc.CallableProcedure; -import org.neo4j.kernel.api.proc.ProcedureSignature; - -class DiscoverConsistencyLevelsProcedure extends CallableProcedure.BasicProcedure -{ - DiscoverConsistencyLevelsProcedure() - { - super( new ProcedureSignature( - new ProcedureSignature.ProcedureName( new String[]{"dbms", "cluster"}, "discoverConsistencyLevels" ) - ) ); - } - - @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException - { - - RawIterator consistencyLevels = - Iterators.asRawIterator( - Arrays.asList( EnterpriseCoreEditionModule.ConsistencyLevel.values() ).iterator() ); - - return Iterators.map( ( l ) -> new Object[]{l.name()}, consistencyLevels ); - } -} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java index b9acbb867d754..b5090f00ce2ee 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedure.java @@ -19,68 +19,73 @@ */ package org.neo4j.coreedge.server.core; -import java.util.Arrays; import java.util.Set; import java.util.stream.Stream; import org.neo4j.collection.RawIterator; -import org.neo4j.coreedge.discovery.ClusterTopology; -import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.discovery.ReadOnlyTopologyService; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.BoltAddress; import org.neo4j.helpers.collection.Iterators; import org.neo4j.kernel.api.exceptions.ProcedureException; -import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.ProcedureSignature; +import org.neo4j.kernel.impl.logging.LogService; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; -class DiscoverMembersProcedure extends CallableProcedure.BasicProcedure +import static java.lang.Integer.parseInt; +import static java.util.stream.Collectors.toSet; + +public class DiscoverMembersProcedure extends CallableProcedure.BasicProcedure { - private final CoreTopologyService discoveryService; + public static final String NAME = "discoverMembers"; + private final ReadOnlyTopologyService discoveryService; + private final Log log; - DiscoverMembersProcedure( CoreTopologyService discoveryService ) + public DiscoverMembersProcedure( ReadOnlyTopologyService discoveryService, LogProvider logProvider ) { super( new ProcedureSignature( - new ProcedureSignature.ProcedureName( new String[]{"dbms", "cluster"}, "discoverMembers" ) ) ); + new ProcedureSignature.ProcedureName( new String[]{"dbms", "cluster"}, NAME ) ) ); this.discoveryService = discoveryService; + this.log = logProvider.getLog( getClass() ); } @Override public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException { - final String consistencyLevelInput = input[0].toString(); - EnterpriseCoreEditionModule.ConsistencyLevel consistencyLevel; - try - { - consistencyLevel = EnterpriseCoreEditionModule.ConsistencyLevel.valueOf( consistencyLevelInput ); - } - catch ( IllegalArgumentException badEnum ) + Set addresses = findAddresses().map( BoltAddress::getBoltAddress ) + .limit( noOfAddressesToReturn( input ) ).collect( toSet() ); + log.info( "Discovery members: %s", addresses.stream().collect( toSet() ) ); + return wrapUpAddresses( addresses ); + } + + private Stream findAddresses() + { + return discoveryService.currentTopology().boltCoreMembers().stream(); + } + + private int noOfAddressesToReturn( Object[] input ) + { + if ( input.length == 0 ) { - throw new ProcedureException( Status.Procedure.ProcedureCallFailed, errorMessage( consistencyLevelInput ) ); + return Integer.MAX_VALUE; } - ClusterTopology clusterTopology = discoveryService.currentTopology(); - - if ( EnterpriseCoreEditionModule.ConsistencyLevel.RYOW_CORE.equals( consistencyLevel ) ) + try { - return wrapUpAddresses( clusterTopology.boltCoreMembers() ); + return parseInt( input[0].toString() ); } - else // RYOW_EDGE otherwise + catch ( NumberFormatException e ) { - return wrapUpAddresses( clusterTopology.edgeMembers() ); + return Integer.MAX_VALUE; } } - private static RawIterator wrapUpAddresses( Set boltAddresses ) - { - Stream members = boltAddresses.stream().map( BoltAddress::getBoltAddress ); - return Iterators.map( ( l ) -> new Object[]{l.toString()}, Iterators.asRawIterator( members.iterator() ) ); - } - - private String errorMessage( String consistencyLevel ) + private static RawIterator wrapUpAddresses( + Set boltAddresses ) { - return String.format( "Invalid consistency level provided: [%s]. Valid consistency levels are: %s", - consistencyLevel , - Arrays.toString( EnterpriseCoreEditionModule.ConsistencyLevel.values() ) ); + return Iterators.map( ( l ) -> new Object[]{l.toString()}, + Iterators.asRawIterator( boltAddresses.stream().iterator() ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index e1d93f61f47dc..3862cf4644a64 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -110,6 +110,7 @@ import org.neo4j.coreedge.server.core.locks.LeaderOnlyLockManager; import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenState; import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenStateMachine; +import org.neo4j.coreedge.server.edge.EnterpriseEdgeEditionModule; import org.neo4j.coreedge.server.logging.BetterMessageLogger; import org.neo4j.coreedge.server.logging.MessageLogger; import org.neo4j.coreedge.server.logging.NullMessageLogger; @@ -172,6 +173,7 @@ public class EnterpriseCoreEditionModule private final CoreState coreState; private final CoreMember myself; private final CoreTopologyService discoveryService; + private final LogProvider logProvider; @Override public CoreMember id() @@ -217,23 +219,17 @@ public static RaftLogImplementation fromString( String value ) } } - public enum ConsistencyLevel - { - RYOW_CORE, - RYOW_EDGE - } - @Override public void registerProcedures( Procedures procedures ) { try { - procedures.register( new DiscoverConsistencyLevelsProcedure() ); - procedures.register( new DiscoverMembersProcedure( discoveryService ) ); + procedures.register( new DiscoverMembersProcedure( discoveryService, logProvider ) ); + procedures.register( new AcquireEndpointsProcedure( discoveryService, raft, logProvider ) ); } catch ( ProcedureException e ) { - e.printStackTrace(); + throw new RuntimeException( e ); } } @@ -258,7 +254,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, final LifeSupport life = platformModule.life; final GraphDatabaseFacade graphDatabaseFacade = platformModule.graphDatabaseFacade; - LogProvider logProvider = logging.getInternalLogProvider(); + logProvider = logging.getInternalLogProvider(); final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); @@ -275,7 +271,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, myself = new CoreMember( config.get( CoreEdgeClusterSettings.transaction_advertised_address ), - config.get( CoreEdgeClusterSettings.raft_advertised_address ) + config.get( CoreEdgeClusterSettings.raft_advertised_address ), + new AdvertisedSocketAddress( EnterpriseEdgeEditionModule.extractBoltAddress(config).toString()) ); final MessageLogger messageLogger; @@ -352,7 +349,8 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session } CoreStateApplier applier = new CoreStateApplier( logProvider ); - CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher, coreToCoreClient, logProvider ); + CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher, coreToCoreClient, + logProvider ); coreState = new CoreState( raftLog, config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java index 5e6fdcc17b75a..79673bcb85129 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java @@ -96,6 +96,8 @@ */ public class EnterpriseEdgeEditionModule extends EditionModule { + private EdgeTopologyService discoveryService; + public EnterpriseEdgeEditionModule( final PlatformModule platformModule, DiscoveryServiceFactory discoveryServiceFactory ) { @@ -154,7 +156,7 @@ public EnterpriseEdgeEditionModule( final PlatformModule platformModule, LogProvider logProvider = platformModule.logging.getInternalLogProvider(); - EdgeTopologyService discoveryService = discoveryServiceFactory.edgeDiscoveryService( config, logProvider); + discoveryService = discoveryServiceFactory.edgeDiscoveryService( config, logProvider); life.add(dependencies.satisfyDependency( discoveryService )); Supplier transactionApplierSupplier = diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java index 8f46566ba5454..74f1112489971 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java @@ -122,7 +122,7 @@ txPuller, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ), private Set coreMembers( AdvertisedSocketAddress coreServerAddress ) { final Set coreMembers = new HashSet<>(); - coreMembers.add( new CoreMember( coreServerAddress, null ) ); + coreMembers.add( new CoreMember( coreServerAddress, null, null ) ); return coreMembers; } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/TestOnlyCoreTopologyService.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/TestOnlyCoreTopologyService.java index 6259376cab477..129771c1e274f 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/TestOnlyCoreTopologyService.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/TestOnlyCoreTopologyService.java @@ -19,25 +19,16 @@ */ package org.neo4j.coreedge.discovery; -import java.util.List; - import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.BoltAddress; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.coreedge.server.core.NoBoltConnectivityException; import org.neo4j.coreedge.server.edge.EnterpriseEdgeEditionModule; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.helpers.HostnamePort; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; -import static java.util.stream.Collectors.toList; - -import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT; import static org.neo4j.helpers.Listeners.notifyListeners; import static org.neo4j.helpers.collection.Iterables.firstOrNull; -import static org.neo4j.kernel.configuration.GroupSettingSupport.enumerate; class TestOnlyCoreTopologyService extends LifecycleAdapter implements CoreTopologyService { @@ -70,7 +61,8 @@ private CoreMember toCoreMember( Config config ) { return new CoreMember( config.get( CoreEdgeClusterSettings.transaction_advertised_address ), - config.get( CoreEdgeClusterSettings.raft_advertised_address ) + config.get( CoreEdgeClusterSettings.raft_advertised_address ), + new AdvertisedSocketAddress( EnterpriseEdgeEditionModule.extractBoltAddress( config ).toString()) ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java index 855fb56bebd23..a3efa0cc17680 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java @@ -29,17 +29,16 @@ import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCompactedException; import org.neo4j.coreedge.raft.membership.RaftTestGroup; -import org.neo4j.coreedge.server.RaftTestMember; -import org.neo4j.coreedge.server.RaftTestMemberSetBuilder; -import org.neo4j.coreedge.raft.net.LoggingOutbound; import org.neo4j.coreedge.raft.net.Inbound; +import org.neo4j.coreedge.raft.net.LoggingOutbound; import org.neo4j.coreedge.raft.net.Outbound; import org.neo4j.coreedge.raft.roles.Role; -import org.neo4j.coreedge.server.logging.MessageLogger; +import org.neo4j.coreedge.server.RaftTestMember; +import org.neo4j.coreedge.server.RaftTestMemberSetBuilder; import org.neo4j.coreedge.server.logging.NullMessageLogger; -import org.neo4j.helpers.Clock; import static java.lang.String.format; + import static org.neo4j.coreedge.server.RaftTestMember.member; public class RaftTestFixture @@ -47,17 +46,6 @@ public class RaftTestFixture private Members members = new Members(); public RaftTestFixture( DirectNetworking net, int expectedClusterSize, long... ids ) - { - this( Clock.SYSTEM_CLOCK, net, new NullMessageLogger<>(), expectedClusterSize, ids ); - } - - public RaftTestFixture( Clock clock, DirectNetworking net, int expectedClusterSize, long... ids ) - { - this( clock, net, new NullMessageLogger<>(), expectedClusterSize, ids ); - } - - public RaftTestFixture( Clock clock, DirectNetworking net, MessageLogger logger, - int expectedClusterSize, long... ids ) { for ( long id : ids ) { @@ -69,13 +57,14 @@ public RaftTestFixture( Clock clock, DirectNetworking net, MessageLogger outbound = new LoggingOutbound<>( net.new Outbound( id ), fixtureMember.member, new NullMessageLogger<>() ); + Outbound outbound = new LoggingOutbound<>( net.new Outbound( id ), fixtureMember.member, + new NullMessageLogger<>() ); fixtureMember.raftInstance = new RaftInstanceBuilder<>( fixtureMember.member, expectedClusterSize, RaftTestMemberSetBuilder.INSTANCE ) .inbound( inbound ) .outbound( outbound ) - .raftLog ( fixtureMember.raftLog ) + .raftLog( fixtureMember.raftLog ) .timeoutService( fixtureMember.timeoutService ) .build(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftContentByteBufferMarshalTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftContentByteBufferMarshalTest.java index 16512f5eb3162..11a3c1847b872 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftContentByteBufferMarshalTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftContentByteBufferMarshalTest.java @@ -48,7 +48,8 @@ public class RaftContentByteBufferMarshalTest { private CoreMember coreMember = new CoreMember( new AdvertisedSocketAddress( "core:1" ), - new AdvertisedSocketAddress( "raft:1" ) + new AdvertisedSocketAddress( "raft:1" ), + new AdvertisedSocketAddress( "bolt:1" ) ); @Test @@ -58,10 +59,10 @@ public void shouldSerializeMemberSet() throws Exception CoreReplicatedContentMarshal serializer = new CoreReplicatedContentMarshal(); CoreMemberSet in = new CoreMemberSet( asSet( new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:1002" ) + new AdvertisedSocketAddress( "host1:1002" ), new AdvertisedSocketAddress( "host1:1003" ) ), new CoreMember( new AdvertisedSocketAddress( "host2:1002" ), - new AdvertisedSocketAddress( "host2:1002" ) + new AdvertisedSocketAddress( "host2:1002" ), new AdvertisedSocketAddress( "host2:1003" ) ) ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/CoreMemberMarshalTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/CoreMemberMarshalTest.java index 91e86afd08bfc..00933b186538b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/CoreMemberMarshalTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/CoreMemberMarshalTest.java @@ -38,7 +38,7 @@ public void shouldSerializeAndDeserializeUsingByteBuffer() throws Exception CoreMember.CoreMemberMarshal marshal = new CoreMember.CoreMemberMarshal(); final CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) ); + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); // when final ByteBuffer buffer = ByteBuffer.allocate( 1_000 ); @@ -57,12 +57,12 @@ public void shouldManageNull() throws Exception CoreMember.CoreMemberMarshal marshal = new CoreMember.CoreMemberMarshal(); final CoreMember aRealMember = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) ); + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); final CoreMember aNullMember = null; final CoreMember anotherRealMember = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) ); + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); // when final ByteBuffer buffer = ByteBuffer.allocate( 1_000 ); @@ -89,7 +89,7 @@ public void shouldReturnNullForHalfWrittenInstance() throws Exception // a CoreMember and a ByteBuffer to write it to CoreMember.CoreMemberMarshal marshal = new CoreMember.CoreMemberMarshal(); final CoreMember aRealMember = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) ); + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); final ByteBuffer buffer = ByteBuffer.allocate( 1_000 ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java index fab8d1047fb5f..14624b6d952c3 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java @@ -93,7 +93,7 @@ public void shouldEncodeAndDecodeVoteRequest() { // given CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), - new AdvertisedSocketAddress( "host1:9001" ) ); + new AdvertisedSocketAddress( "host1:9001" ), new AdvertisedSocketAddress( "host1:9002" ) ); RaftMessages.Vote.Request request = new RaftMessages.Vote.Request<>( member, 1, member, 1, 1 ); // when @@ -109,7 +109,7 @@ public void shouldEncodeAndDecodeVoteResponse() { // given CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), - new AdvertisedSocketAddress( "host1:9001" ) ); + new AdvertisedSocketAddress( "host1:9001" ), new AdvertisedSocketAddress( "host1:9002" ) ); RaftMessages.Vote.Response response = new RaftMessages.Vote.Response<>( member, 1, true ); // when @@ -125,7 +125,7 @@ public void shouldEncodeAndDecodeAppendEntriesRequest() { // given CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), - new AdvertisedSocketAddress( "host1:9001" ) ); + new AdvertisedSocketAddress( "host1:9001" ), new AdvertisedSocketAddress( "host1:9002" ) ); RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) ); RaftMessages.AppendEntries.Request request = new RaftMessages.AppendEntries.Request<>( member, 1, 1, 99, new RaftLogEntry[] { logEntry }, 1 ); @@ -143,7 +143,7 @@ public void shouldEncodeAndDecodeAppendEntriesResponse() { // given CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), - new AdvertisedSocketAddress( "host1:9001" ) ); + new AdvertisedSocketAddress( "host1:9001" ), new AdvertisedSocketAddress( "host1:9002" ) ); RaftMessages.AppendEntries.Response response = new RaftMessages.AppendEntries.Response<>( member, 1, false, -1, 0 ); @@ -160,7 +160,7 @@ public void shouldEncodeAndDecodeNewEntryRequest() { // given CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), - new AdvertisedSocketAddress( "host1:9001" ) ); + new AdvertisedSocketAddress( "host1:9001" ), new AdvertisedSocketAddress( "host1:9002" ) ); RaftMessages.NewEntry.Request request = new RaftMessages.NewEntry.Request<>( member, ReplicatedInteger.valueOf( 12 ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java index 098393536b7c8..db4f64abc51cd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java @@ -49,7 +49,7 @@ public class RaftMessageEncodingDecodingTest public void shouldSerializeAppendRequestWithMultipleEntries() throws Exception { CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), - new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); + new AdvertisedSocketAddress( "127.0.0.2:5001" ), new AdvertisedSocketAddress( "127.0.0.3:5001" ) ); RaftMessages.AppendEntries.Request request = new AppendEntriesRequestBuilder() .from( sender ) .leader( sender ) @@ -65,7 +65,7 @@ public void shouldSerializeAppendRequestWithMultipleEntries() throws Exception public void shouldSerializeAppendRequestWithNoEntries() throws Exception { CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), - new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); + new AdvertisedSocketAddress( "127.0.0.2:5001" ), new AdvertisedSocketAddress( "127.0.0.3:5001" ) ); RaftMessages.AppendEntries.Request request = new AppendEntriesRequestBuilder() .from( sender ) .leader( sender ) @@ -79,7 +79,7 @@ public void shouldSerializeAppendRequestWithNoEntries() throws Exception public void shouldSerializeAppendResponse() throws Exception { CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), - new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); + new AdvertisedSocketAddress( "127.0.0.2:5001" ), new AdvertisedSocketAddress( "127.0.0.3:5001" ) ); RaftMessages.AppendEntries.Response request = new AppendEntriesResponseBuilder() .from( sender ) .success() @@ -102,7 +102,7 @@ public void shouldSerializeHeartbeats( ) throws Exception // When CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), - new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); + new AdvertisedSocketAddress( "127.0.0.2:5001" ), new AdvertisedSocketAddress( "127.0.0.3:5001" ) ); RaftMessages.Heartbeat message = new RaftMessages.Heartbeat<>( sender, 1, 2, 3 ); encoder.encode( setupContext(), message, resultingBuffers ); @@ -121,7 +121,7 @@ public void shouldSerializeHeartbeats( ) throws Exception public void shouldSerializeVoteRequest() throws Exception { CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), - new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); + new AdvertisedSocketAddress( "127.0.0.2:5001" ), new AdvertisedSocketAddress( "127.0.0.3:5001" ) ); RaftMessages.Vote.Request request = new VoteRequestBuilder<>() .candidate( sender ) .from( sender ) @@ -136,7 +136,7 @@ public void shouldSerializeVoteRequest() throws Exception public void shouldSerializeVoteResponse() throws Exception { CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), - new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); + new AdvertisedSocketAddress( "127.0.0.2:5001" ), new AdvertisedSocketAddress( "127.0.0.3:5001" ) ); RaftMessages.Vote.Response request = new VoteResponseBuilder<>() .from( sender ) .grant() diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/CoreReplicatedContentByteBufferMarshalTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/CoreReplicatedContentByteBufferMarshalTest.java index e003c6c954b6a..f707e8fc7ee93 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/CoreReplicatedContentByteBufferMarshalTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/CoreReplicatedContentByteBufferMarshalTest.java @@ -83,10 +83,10 @@ public void shouldMarshalMemberSet() throws Exception // given ByteBuf buffer = Unpooled.buffer(); ReplicatedContent message = new CoreMemberSet( asSet( - new CoreMember( new AdvertisedSocketAddress( "host_a:1" ), new AdvertisedSocketAddress( "host_a:2" ) - ), + new CoreMember( new AdvertisedSocketAddress( "host_a:1" ), new AdvertisedSocketAddress( "host_a:2" ), + new AdvertisedSocketAddress( "host_a:3" ) ), new CoreMember( new AdvertisedSocketAddress( "host_b:101" ), - new AdvertisedSocketAddress( "host_b:102" ) ) + new AdvertisedSocketAddress( "host_b:102" ), new AdvertisedSocketAddress( "host_c:102" ) ) ) ); // when @@ -104,7 +104,8 @@ public void shouldMarshalIdRangeRequest() throws Exception ReplicatedContent message = new ReplicatedIdAllocationRequest( new CoreMember( new AdvertisedSocketAddress( "host_a:1" ), - new AdvertisedSocketAddress( "host_a:2" ) + new AdvertisedSocketAddress( "host_a:2" ), + new AdvertisedSocketAddress( "host_a:3" ) ), IdType.PROPERTY, 100, 200 ); // when diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java index 69748546abbac..b820ab24f9bf5 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java @@ -33,7 +33,10 @@ public class ReplicatedIdAllocationStateMachineTest { - private CoreMember me = new CoreMember( new AdvertisedSocketAddress( "a:1" ), new AdvertisedSocketAddress( "a:2" ) + private CoreMember me = new CoreMember( + new AdvertisedSocketAddress( "a:1" ), + new AdvertisedSocketAddress( "a:2" ), + new AdvertisedSocketAddress( "a:3" ) ); private IdType someType = IdType.NODE; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java index 617d0d5ed7855..ad57f03805420 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java @@ -37,9 +37,11 @@ public class ReplicatedIdRangeAcquirerTest { private final CoreMember memberA = - new CoreMember( new AdvertisedSocketAddress( "a:1" ), new AdvertisedSocketAddress( "a:2" ) ); + new CoreMember( new AdvertisedSocketAddress( "a:1" ), new AdvertisedSocketAddress( "a:2" ), + new AdvertisedSocketAddress( "a:3" ) ); private final CoreMember memberB = - new CoreMember( new AdvertisedSocketAddress( "b:1" ), new AdvertisedSocketAddress( "b:2" ) ); + new CoreMember( new AdvertisedSocketAddress( "b:1" ), new AdvertisedSocketAddress( "b:2" ), + new AdvertisedSocketAddress( "c:2" )); private final ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( new InMemoryStateStorage<>( new IdAllocationState() ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/LocalSessionPoolTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/LocalSessionPoolTest.java index 09d807cf6d717..984ff203fdbb1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/LocalSessionPoolTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/LocalSessionPoolTest.java @@ -32,7 +32,7 @@ public class LocalSessionPoolTest { private CoreMember coreMember = new CoreMember( new AdvertisedSocketAddress( "core:1" ), - new AdvertisedSocketAddress( "raft:1" ) ); + new AdvertisedSocketAddress( "raft:1" ), new AdvertisedSocketAddress( "bolt:1" )); private GlobalSession globalSession = new GlobalSession<>( UUID.randomUUID(), coreMember ); @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/VoteStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/VoteStateTest.java index d21461889159c..9a832bf46aae5 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/VoteStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/VoteStateTest.java @@ -38,8 +38,10 @@ public void shouldStoreVote() throws Exception { // given VoteState voteState = new VoteState<>(); - CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) ); + CoreMember member = new CoreMember( + new AdvertisedSocketAddress( "host1:1001" ), + new AdvertisedSocketAddress( "host1:2001" ), + new AdvertisedSocketAddress( "host1:3001" )); // when voteState.update( member, 0 ); @@ -64,10 +66,10 @@ public void shouldUpdateVote() throws Exception // given VoteState voteState = new VoteState<>(); CoreMember member1 = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); CoreMember member2 = new CoreMember( new AdvertisedSocketAddress( "host2:1001" ), - new AdvertisedSocketAddress( "host2:2001" ) + new AdvertisedSocketAddress( "host2:2001" ), new AdvertisedSocketAddress( "host2:3001" ) ); // when @@ -84,7 +86,7 @@ public void shouldClearVote() throws Exception // given VoteState voteState = new VoteState<>(); CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:2001" ) ); voteState.update( member, 0 ); @@ -101,10 +103,10 @@ public void shouldNotUpdateVoteForSameTerm() throws Exception // given VoteState voteState = new VoteState<>(); CoreMember member1 = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); CoreMember member2 = new CoreMember( new AdvertisedSocketAddress( "host2:1001" ), - new AdvertisedSocketAddress( "host2:2001" ) + new AdvertisedSocketAddress( "host2:2001" ), new AdvertisedSocketAddress( "host2:3001" ) ); voteState.update( member1, 0 ); @@ -127,7 +129,7 @@ public void shouldNotClearVoteForSameTerm() throws Exception // given VoteState voteState = new VoteState<>(); CoreMember member1 = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); voteState.update( member1, 0 ); @@ -150,10 +152,10 @@ public void shouldReportNoUpdateWhenVoteStateUnchanged() throws Exception // given VoteState voteState = new VoteState<>(); CoreMember member1 = new CoreMember( new AdvertisedSocketAddress( "host1:1001" ), - new AdvertisedSocketAddress( "host1:2001" ) + new AdvertisedSocketAddress( "host1:2001" ), new AdvertisedSocketAddress( "host1:3001" ) ); CoreMember member2 = new CoreMember( new AdvertisedSocketAddress( "host2:1001" ), - new AdvertisedSocketAddress( "host2:2001" ) + new AdvertisedSocketAddress( "host2:2001" ), new AdvertisedSocketAddress( "host2:3001" ) ); // when diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterDiscoveryIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterDiscoveryIT.java new file mode 100644 index 0000000000000..9f8d9a9490d28 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterDiscoveryIT.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.scenarios; + +import java.io.File; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.server.core.AcquireEndpointsProcedure; +import org.neo4j.coreedge.server.core.DiscoverMembersProcedure; +import org.neo4j.kernel.api.KernelAPI; +import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.api.KernelTransaction.Type; +import org.neo4j.kernel.api.Statement; +import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; +import org.neo4j.test.rule.TargetDirectory; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import static org.neo4j.helpers.collection.Iterators.asList; +import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureName; +import static org.neo4j.kernel.api.security.AccessMode.Static.READ; + +public class ClusterDiscoveryIT +{ + @Rule + public final TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() ); + + private Cluster cluster; + + @After + public void shutdown() throws ExecutionException, InterruptedException + { + if ( cluster != null ) + { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void shouldDiscoverCoreClusterMembers() throws Exception + { + // given + File dbDir = dir.directory(); + + System.out.println( "dbDir = " + dbDir ); + + // when + cluster = Cluster.start( dbDir, 3, 0 ); + + // then + + List currentMembers; + for ( int i = 0; i < 3; i++ ) + { + currentMembers = discoverClusterMembers( cluster.getCoreServerById( i ) ); + assertThat( currentMembers, containsInAnyOrder( + new Object[]{"127.0.0.1:8000"}, + new Object[]{"127.0.0.1:8001"}, + new Object[]{"127.0.0.1:8002"} ) ); + } + + System.exit(1); + } + + @Test + public void shouldFindReadAndWriteServers() throws Exception + { + // given + File dbDir = dir.directory(); + + System.out.println( "dbDir = " + dbDir ); + + // when + cluster = Cluster.start( dbDir, 3, 1 ); + + // then + + List currentMembers; + for ( int i = 0; i < 3; i++ ) + { + currentMembers = endPoints( cluster.getCoreServerById( i ) ); + + assertEquals(1, currentMembers.stream().filter( x -> x[1].equals( "write" ) ).count()); + assertEquals(1, currentMembers.stream().filter( x -> x[1].equals( "read" ) ).count()); + } + + System.exit(1); + } + + @Test + public void shouldNotBeAbleToDiscoverFromEdgeMembers() throws Exception + { + // given + File dbDir = dir.directory(); + cluster = Cluster.start( dbDir, 3, 2 ); + + try + { + // when + discoverClusterMembers( cluster.getEdgeServerById( 0 ) ); + fail( "Should not be able to discover members from edge members" ); + } + catch ( ProcedureException ex ) + { + // then + assertThat( ex.getMessage(), containsString( "There is no procedure with the name" ) ); + } + } + + private List discoverClusterMembers( GraphDatabaseFacade db ) throws TransactionFailureException, org + .neo4j.kernel.api.exceptions.ProcedureException + { + KernelAPI kernel = db.getDependencyResolver().resolveDependency( KernelAPI.class ); + KernelTransaction transaction = kernel.newTransaction( Type.implicit, READ ); + Statement statement = transaction.acquireStatement(); + + // when + return asList( statement.readOperations().procedureCallRead( + procedureName( "dbms", "cluster", DiscoverMembersProcedure.NAME ), + new Object[0] ) ); + } + + private List endPoints( GraphDatabaseFacade db ) throws TransactionFailureException, org + .neo4j.kernel.api.exceptions.ProcedureException + { + KernelAPI kernel = db.getDependencyResolver().resolveDependency( KernelAPI.class ); + KernelTransaction transaction = kernel.newTransaction( Type.implicit, READ ); + Statement statement = transaction.acquireStatement(); + + // when + return asList( statement.readOperations().procedureCallRead( + procedureName( "dbms", "cluster", AcquireEndpointsProcedure.NAME ), + new Object[0] ) ); + } + +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerDiscoveryIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerDiscoveryIT.java deleted file mode 100644 index dbd9998bf7e19..0000000000000 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerDiscoveryIT.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.scenarios; - -import java.io.File; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ExecutionException; - -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import org.neo4j.coreedge.discovery.Cluster; -import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; -import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory; -import org.neo4j.coreedge.discovery.TestOnlyDiscoveryServiceFactory; -import org.neo4j.coreedge.server.core.CoreGraphDatabase; -import org.neo4j.kernel.api.KernelAPI; -import org.neo4j.kernel.api.KernelTransaction; -import org.neo4j.kernel.api.KernelTransaction.Type; -import org.neo4j.kernel.api.Statement; -import org.neo4j.test.rule.TargetDirectory; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; - -import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.ConsistencyLevel.RYOW_CORE; -import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.ConsistencyLevel.RYOW_EDGE; -import static org.neo4j.helpers.collection.Iterators.asList; -import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureName; -import static org.neo4j.kernel.api.security.AccessMode.Static.READ; - -@RunWith( Parameterized.class ) -public class CoreServerDiscoveryIT -{ - @Rule - public final TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() ); - - private Cluster cluster; - - @Parameterized.Parameter - public DiscoveryServiceFactory discoveryServiceFactory; - - @Parameterized.Parameters( name = "{0}" ) - public static Iterable params() - { - return Arrays.asList( new Object[][]{ - {new HazelcastDiscoveryServiceFactory()}, - {new TestOnlyDiscoveryServiceFactory()}, - } ); - } - - @After - public void shutdown() throws ExecutionException, InterruptedException - { - if ( cluster != null ) - { - cluster.shutdown(); - cluster = null; - } - } - - @Test - public void shouldDiscoverClusterMembers() throws Exception - { - // given - File dbDir = dir.directory(); - cluster = Cluster.start( dbDir, 3, 2, discoveryServiceFactory ); - - CoreGraphDatabase db = cluster.getCoreServerById( 0 ); - KernelAPI kernel = db.getDependencyResolver().resolveDependency( KernelAPI.class ); - KernelTransaction transaction = kernel.newTransaction( Type.implicit, READ ); - Statement statement = transaction.acquireStatement(); - - // when - List levels = asList( statement.readOperations() - .procedureCallRead( procedureName( "dbms", "cluster", "discoverConsistencyLevels" ), new Object[0] ) ); - - // then - assertThat( levels, containsInAnyOrder( - new Object[]{RYOW_CORE.name()}, - new Object[]{RYOW_EDGE.name()} - ) ); - - // when - List coreMembers = asList( statement.readOperations().procedureCallRead( - procedureName( "dbms", "cluster", "discoverMembers" ), - new Object[]{RYOW_CORE.name()} ) ); - - // then - assertEquals( 3, coreMembers.size() ); - - assertThat( coreMembers, containsInAnyOrder( - new Object[]{"127.0.0.1:8000"}, - new Object[]{"127.0.0.1:8001"}, - new Object[]{"127.0.0.1:8002"} - ) ); - - // when - List edgeMembers = asList( statement.readOperations().procedureCallRead( - procedureName( "dbms", "cluster", "discoverMembers" ), - new Object[]{RYOW_EDGE.name()} ) ); - - // then - assertEquals( 2, edgeMembers.size() ); - - assertThat( edgeMembers, containsInAnyOrder( - new Object[]{"127.0.0.1:9000"}, - new Object[]{"127.0.0.1:9001"} - ) ); - } -} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedureTest.java new file mode 100644 index 0000000000000..ba3f18d452d09 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/AcquireEndpointsProcedureTest.java @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.server.core; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.hamcrest.MatcherAssert; +import org.junit.Test; + +import org.neo4j.coreedge.discovery.ClusterTopology; +import org.neo4j.coreedge.discovery.ReadOnlyTopologyService; +import org.neo4j.coreedge.raft.LeaderLocator; +import org.neo4j.coreedge.raft.NoLeaderFoundException; +import org.neo4j.coreedge.server.AdvertisedSocketAddress; +import org.neo4j.coreedge.server.BoltAddress; +import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.logging.NullLogProvider; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.neo4j.helpers.collection.Iterators.asList; + +public class AcquireEndpointsProcedureTest +{ + @Test + public void shouldRecommendTheCoreLeaderForWriteAndEdgeForRead() throws Exception + { + // given + final ReadOnlyTopologyService topologyService = mock( ReadOnlyTopologyService.class ); + + final ClusterTopology clusterTopology = mock( ClusterTopology.class ); + when( topologyService.currentTopology() ).thenReturn( clusterTopology ); + + when( clusterTopology.edgeMembers() ).thenReturn( addresses( 1 ) ); + + LeaderLocator leaderLocator = mock(LeaderLocator.class); + CoreMember theLeader = coreMemberAtBoltPort(9000); + when(leaderLocator.getLeader()).thenReturn( theLeader ); + + AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( topologyService, leaderLocator, + NullLogProvider.getInstance() ); + + // when + final List members = asList( procedure.apply( null, new Object[0] ) ); + + // then + MatcherAssert.assertThat( members, containsInAnyOrder( + new Object[]{"127.0.0.1:9000", "write"}, + new Object[]{"127.0.0.1:3001", "read"} + ) ); + } + + @Test + public void shouldOnlyRecommendOneReadServerEvenIfMultipleAreAvailable() throws Exception + { + // given + final ReadOnlyTopologyService topologyService = mock( ReadOnlyTopologyService.class ); + + final ClusterTopology clusterTopology = mock( ClusterTopology.class ); + when( topologyService.currentTopology() ).thenReturn( clusterTopology ); + + when( clusterTopology.edgeMembers() ).thenReturn( addresses( 1, 2, 3 ) ); + + LeaderLocator leaderLocator = mock(LeaderLocator.class); + CoreMember theLeader = coreMemberAtBoltPort(9000); + when(leaderLocator.getLeader()).thenReturn( theLeader ); + + AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( topologyService, leaderLocator, NullLogProvider.getInstance() ); + + // when + final List members = asList( procedure.apply( null, new Object[0] ) ); + + // then + assertEquals(1, members.stream().filter( row -> row[1].equals( "read" ) ).count()); + } + + @Test + public void shouldReturnCoreServerAsReadServerIfNoEdgeServersAvailable() throws Exception + { + // given + final ReadOnlyTopologyService topologyService = mock( ReadOnlyTopologyService.class ); + + final ClusterTopology clusterTopology = mock( ClusterTopology.class ); + when( topologyService.currentTopology() ).thenReturn( clusterTopology ); + + when( clusterTopology.edgeMembers() ).thenReturn( Collections.emptySet() ); + when( clusterTopology.boltCoreMembers() ).thenReturn( addresses( 1 ) ); + + LeaderLocator leaderLocator = mock(LeaderLocator.class); + CoreMember theLeader = coreMemberAtBoltPort(9000); + when(leaderLocator.getLeader()).thenReturn( theLeader ); + + AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( topologyService, leaderLocator, NullLogProvider.getInstance() ); + + // when + final List members = asList( procedure.apply( null, new Object[0] ) ); + + // then + List readAddresses = members.stream().filter( row -> row[1].equals( "read" ) ).collect( toList() ); + + assertEquals(1, readAddresses.size()); + assertArrayEquals(readAddresses.get( 0 ), new Object[]{"127.0.0.1:3001", "read"}); + } + + @Test + public void shouldReturnLeaderAsReadServerIfThereAreNoCoreOrEdgeServers() throws Exception + { + // given + final ReadOnlyTopologyService topologyService = mock( ReadOnlyTopologyService.class ); + + final ClusterTopology clusterTopology = mock( ClusterTopology.class ); + when( topologyService.currentTopology() ).thenReturn( clusterTopology ); + + when( clusterTopology.edgeMembers() ).thenReturn( Collections.emptySet() ); + when( clusterTopology.boltCoreMembers() ).thenReturn( Collections.emptySet() ); + + LeaderLocator leaderLocator = mock(LeaderLocator.class); + CoreMember theLeader = coreMemberAtBoltPort(9000); + when(leaderLocator.getLeader()).thenReturn( theLeader ); + + AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( topologyService, leaderLocator, NullLogProvider.getInstance() ); + + // when + final List members = asList( procedure.apply( null, new Object[0] ) ); + + // then + List readAddresses = members.stream().filter( row -> row[1].equals( "read" ) ).collect( toList() ); + + assertEquals(1, readAddresses.size()); + assertArrayEquals(readAddresses.get( 0 ), new Object[]{"127.0.0.1:9000", "read"}); + } + + @Test + public void shouldThrowExceptionIfThereIsNoLeader() throws Exception + { + // given + LeaderLocator leaderLocator = mock(LeaderLocator.class); + when(leaderLocator.getLeader()).thenThrow( NoLeaderFoundException.class ); + + AcquireEndpointsProcedure procedure = new AcquireEndpointsProcedure( + mock( ReadOnlyTopologyService.class ), leaderLocator, NullLogProvider.getInstance() ); + + // when + try + { + procedure.apply( null, new Object[] { "bam" } ); + } + catch ( ProcedureException e ) + { + assertEquals( Status.Cluster.NoLeader, e.status() ); + } + } + + private CoreMember coreMemberAtBoltPort( int boltPort) + { + return new CoreMember( + null, + null, + new AdvertisedSocketAddress( "127.0.0.1:" + boltPort) ); + } + + private Set addresses( int... ids ) + { + return Arrays.stream(ids ).mapToObj( id -> + new BoltAddress( new AdvertisedSocketAddress( "127.0.0.1:" + (3000 + id) ) ) ).collect( toSet() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverConsistencyLevelsProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverConsistencyLevelsProcedureTest.java deleted file mode 100644 index 9a57406abc3d2..0000000000000 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverConsistencyLevelsProcedureTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.server.core; - -import java.util.List; - -import org.junit.Test; - -import org.neo4j.collection.RawIterator; -import org.neo4j.kernel.api.exceptions.ProcedureException; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; - -import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.ConsistencyLevel.RYOW_CORE; -import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.ConsistencyLevel.RYOW_EDGE; -import static org.neo4j.helpers.collection.Iterators.asList; - -public class DiscoverConsistencyLevelsProcedureTest -{ - @Test - public void shouldRoundTripConsistencyLevelEnum() throws Exception - { - // given - DiscoverConsistencyLevelsProcedure proc = new DiscoverConsistencyLevelsProcedure(); - - // when - final RawIterator rawIterator = proc.apply( null, new Object[0] ); - final List levels = asList( rawIterator ); - - // then - assertThat( levels, containsInAnyOrder( - new Object[]{RYOW_CORE.name()}, - new Object[]{RYOW_EDGE.name()} - ) ); - } -} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java index 0709a6c08942a..4931814ce884e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/DiscoverMembersProcedureTest.java @@ -26,39 +26,38 @@ import org.junit.Test; import org.neo4j.coreedge.discovery.ClusterTopology; -import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.discovery.ReadOnlyTopologyService; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.BoltAddress; -import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.logging.NullLogProvider; -import static junit.framework.TestCase.fail; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.ConsistencyLevel.RYOW_CORE; -import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.ConsistencyLevel.RYOW_EDGE; import static org.neo4j.helpers.collection.Iterators.asList; public class DiscoverMembersProcedureTest { @Test - public void shouldDiscoverCoreMachinesBoltAddresses() throws Exception + public void shouldOnlyReturnCoreMembers() throws Exception { // given - final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class ); + final ReadOnlyTopologyService coreTopologyService = mock( ReadOnlyTopologyService.class ); final ClusterTopology clusterTopology = mock( ClusterTopology.class ); when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); - when( clusterTopology.boltCoreMembers() ).thenReturn( boltCoreMembers( 1, 2, 3 ) ); + when( clusterTopology.boltCoreMembers() ).thenReturn( addresses( 1, 2, 3 ) ); + when( clusterTopology.edgeMembers() ).thenReturn( addresses( 4, 5, 6 ) ); - final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService ); + final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService, NullLogProvider.getInstance() ); // when - final List members = asList( proc.apply( null, new Object[]{RYOW_CORE.name()} ) ); + final List members = asList( proc.apply( null, new Object[0] ) ); // then assertThat( members, containsInAnyOrder( @@ -68,62 +67,67 @@ public void shouldDiscoverCoreMachinesBoltAddresses() throws Exception } @Test - public void shouldDiscoverEdgeMachinesBoltAddresses() throws Exception + public void shouldReturnSelfIfOnlyMemberOfTheCluster() throws Exception { - // given - final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class ); + final ReadOnlyTopologyService coreTopologyService = mock( ReadOnlyTopologyService.class ); final ClusterTopology clusterTopology = mock( ClusterTopology.class ); when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); - when( clusterTopology.edgeMembers() ).thenReturn( edgeMembers( 1, 2, 3, 4, 5 ) ); + when( clusterTopology.boltCoreMembers() ).thenReturn( addresses( 1 ) ); - final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService ); + final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService, NullLogProvider.getInstance() ); // when - final List members = asList( proc.apply( null, new - Object[]{RYOW_EDGE.name()} ) ); + final List members = asList( proc.apply( null, new Object[0] ) ); // then - assertThat( members, containsInAnyOrder( - new Object[]{"localhost:3001"}, - new Object[]{"localhost:3002"}, - new Object[]{"localhost:3003"}, - new Object[]{"localhost:3004"}, - new Object[]{"localhost:3005"} ) ); + assertArrayEquals( members.get( 0 ), new Object[]{"localhost:3001"} ); } @Test - public void shouldThrowExceptionForUnknownConsistencyLevel() throws Exception + public void shouldReturnLimitedNumberOfAddresses() throws Exception { // given - try - { - // when - new DiscoverMembersProcedure( null ).apply( null, new Object[]{"FOOBAR"} ); - fail( "ProcedureException should have been thrown" ); - } - catch ( ProcedureException expected ) - { - // then - assertThat( expected.getMessage(), containsString( RYOW_CORE.name() ) ); - assertThat( expected.getMessage(), containsString( RYOW_EDGE.name() ) ); - } + final ReadOnlyTopologyService coreTopologyService = mock( ReadOnlyTopologyService.class ); + + final ClusterTopology clusterTopology = mock( ClusterTopology.class ); + when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); + + when( clusterTopology.boltCoreMembers() ).thenReturn( addresses( 1, 2, 3 ) ); + when( clusterTopology.edgeMembers() ).thenReturn( addresses( 4, 5, 6 ) ); + + final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService, NullLogProvider.getInstance() ); + + // when + final List members = asList( proc.apply( null, new Object[] { 1 } ) ); + + // then + assertEquals( 1, members.size() ); } - private Set edgeMembers( int... ids ) + @Test + public void shouldReturnAllAddressesForStupidLimit() throws Exception { - final HashSet edgeMembers = new HashSet<>(); + // given + final ReadOnlyTopologyService coreTopologyService = mock( ReadOnlyTopologyService.class ); - for ( int id : ids ) - { - edgeMembers.add( new BoltAddress( new AdvertisedSocketAddress( "localhost:" + (3000 + id) ) ) ); - } + final ClusterTopology clusterTopology = mock( ClusterTopology.class ); + when( coreTopologyService.currentTopology() ).thenReturn( clusterTopology ); + + when( clusterTopology.boltCoreMembers() ).thenReturn( addresses( 1, 2, 3 ) ); + when( clusterTopology.edgeMembers() ).thenReturn( addresses( 4, 5, 6 ) ); + + final DiscoverMembersProcedure proc = new DiscoverMembersProcedure( coreTopologyService, NullLogProvider.getInstance() ); - return edgeMembers; + // when + final List members = asList( proc.apply( null, new Object[] { "bam" } ) ); + + // then + assertEquals( 3, members.size() ); } - private Set boltCoreMembers( int... ids ) + private Set addresses( int... ids ) { final HashSet coreMembers = new HashSet<>();