From f399eecf78214ec990e87e29bab984446909e267 Mon Sep 17 00:00:00 2001 From: Alistair Jones Date: Fri, 23 Sep 2016 13:41:05 +0100 Subject: [PATCH] Expose all connector addresses in dbms.cluster.overview --- .../ClientConnectorSettings.java | 19 ++ .../catchup/tx/TxPullRequestHandler.java | 7 - .../discovery/ClientConnectorAddresses.java | 177 ++++++++++++++++++ .../coreedge/discovery/CoreAddresses.java | 10 +- .../discovery/DiscoveryServiceFactory.java | 3 +- .../coreedge/discovery/EdgeAddresses.java | 14 +- .../coreedge/discovery/HazelcastClient.java | 14 +- .../discovery/HazelcastClusterTopology.java | 26 ++- .../HazelcastDiscoveryServiceFactory.java | 6 +- .../procedures/ClusterOverviewProcedure.java | 31 +-- .../procedures/GetServersProcedure.java | 15 +- .../edge/EnterpriseEdgeEditionModule.java | 10 +- .../ClientConnectorAddressesTest.java | 69 +++++++ .../coreedge/discovery/ClusterMember.java | 2 + .../coreedge/discovery/CoreClusterMember.java | 11 +- .../coreedge/discovery/EdgeClusterMember.java | 12 +- .../discovery/HazelcastClientTest.java | 82 ++++---- .../HazelcastClusterTopologyTest.java | 15 +- .../discovery/SharedDiscoveryCoreClient.java | 5 +- .../discovery/SharedDiscoveryEdgeClient.java | 6 +- .../discovery/SharedDiscoveryService.java | 9 +- .../ClusterOverviewProcedureTest.java | 12 +- .../procedures/GetServersProcedureTest.java | 15 +- .../coreedge/scenarios/ClusterOverviewIT.java | 118 ++++++++---- .../text/enterprise/conf/neo4j.conf | 15 +- 25 files changed, 519 insertions(+), 184 deletions(-) create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClientConnectorAddresses.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClientConnectorAddressesTest.java diff --git a/community/dbms/src/main/java/org/neo4j/server/configuration/ClientConnectorSettings.java b/community/dbms/src/main/java/org/neo4j/server/configuration/ClientConnectorSettings.java index a75c433c17caf..609765b60cb83 100644 --- a/community/dbms/src/main/java/org/neo4j/server/configuration/ClientConnectorSettings.java +++ b/community/dbms/src/main/java/org/neo4j/server/configuration/ClientConnectorSettings.java @@ -1,3 +1,22 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ package org.neo4j.server.configuration; import java.util.List; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestHandler.java index 3b7b75e45a16d..3e5ecd6e12411 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestHandler.java @@ -119,11 +119,4 @@ else if ( transactionIdStore.getLastCommittedTransactionId() >= firstTxId ) monitor.increment(); protocol.expect( State.MESSAGE_TYPE ); } - - @Override - public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) - { - cause.printStackTrace(); - ctx.close(); - } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClientConnectorAddresses.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClientConnectorAddresses.java new file mode 100644 index 0000000000000..7b6717613825c --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/ClientConnectorAddresses.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.server.configuration.ClientConnectorSettings.HttpConnector.Encryption; + +import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.bolt; +import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.http; +import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.https; +import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnectors; +import static org.neo4j.server.configuration.ClientConnectorSettings.httpConnector; + +public class ClientConnectorAddresses +{ + private final List connectorUris; + + public ClientConnectorAddresses( List connectorUris ) + { + this.connectorUris = connectorUris; + } + + static ClientConnectorAddresses extractFromConfig( Config config ) + { + List connectorUris = new ArrayList<>(); + + connectorUris.add( new ConnectorUri( bolt, boltConnectors( config ).stream().findFirst() + .map( boltConnector -> config.get( boltConnector.advertised_address ) ).orElseThrow( () -> + new IllegalArgumentException( "A Bolt connector must be configured to run a cluster" ) ) ) ); + + connectorUris.add( new ConnectorUri( http, config.get( httpConnector( config, Encryption.NONE ).orElseThrow( + () -> new IllegalArgumentException( "An HTTP connector must be configured to run the server" ) ) + .advertised_address ) ) ); + + httpConnector( config, Encryption.TLS ) + .map( ( connector ) -> config.get( connector.advertised_address ) ) + .ifPresent( httpsAddress -> connectorUris.add( new ConnectorUri( https, httpsAddress ) ) ); + + return new ClientConnectorAddresses( connectorUris ); + } + + public AdvertisedSocketAddress getBoltAddress() + { + return connectorUris.stream().filter( connectorUri -> connectorUri.scheme == bolt ).findFirst().orElseThrow( + () -> new IllegalArgumentException( "A Bolt connector must be configured to run a cluster" ) ) + .socketAddress; + } + + public List uriList() + { + return connectorUris.stream().map( ConnectorUri::toUri ).collect( Collectors.toList() ); + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + ClientConnectorAddresses that = (ClientConnectorAddresses) o; + return Objects.equals( connectorUris, that.connectorUris ); + } + + @Override + public int hashCode() + { + return Objects.hash( connectorUris ); + } + + @Override + public String toString() + { + return connectorUris.stream().map( ConnectorUri::toString ).collect( Collectors.joining( "," ) ); + } + + static ClientConnectorAddresses fromString( String value ) + { + return new ClientConnectorAddresses( Stream.of( value.split( "," ) ) + .map( ConnectorUri::fromString ).collect( Collectors.toList() ) ); + } + + public enum Scheme + { + bolt, http, https + } + + public static class ConnectorUri + { + private final Scheme scheme; + private final AdvertisedSocketAddress socketAddress; + + public ConnectorUri( Scheme scheme, AdvertisedSocketAddress socketAddress ) + { + this.scheme = scheme; + this.socketAddress = socketAddress; + } + + private URI toUri() + { + try + { + return new URI( scheme.name().toLowerCase(), null, socketAddress.getHostname(), socketAddress.getPort(), + null, null, null ); + } + catch ( URISyntaxException e ) + { + throw new IllegalArgumentException( e ); + } + } + + @Override + public String toString() + { + return toUri().toString(); + } + + private static ConnectorUri fromString( String string ) + { + URI uri = URI.create( string ); + return new ConnectorUri( Scheme.valueOf( uri.getScheme() ), + new AdvertisedSocketAddress( uri.getHost(), uri.getPort() ) ); + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + ConnectorUri that = (ConnectorUri) o; + return scheme == that.scheme && + Objects.equals( socketAddress, that.socketAddress ); + } + + @Override + public int hashCode() + { + return Objects.hash( scheme, socketAddress ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreAddresses.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreAddresses.java index bcb494229aced..07fd43727d2c7 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreAddresses.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreAddresses.java @@ -25,14 +25,14 @@ public class CoreAddresses { private final AdvertisedSocketAddress raftServer; private final AdvertisedSocketAddress catchupServer; - private final AdvertisedSocketAddress boltServer; + private final ClientConnectorAddresses clientConnectorAddresses; public CoreAddresses( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer, - AdvertisedSocketAddress boltServer ) + ClientConnectorAddresses clientConnectorAddresses ) { this.raftServer = raftServer; this.catchupServer = catchupServer; - this.boltServer = boltServer; + this.clientConnectorAddresses = clientConnectorAddresses; } public AdvertisedSocketAddress getRaftServer() @@ -45,8 +45,8 @@ public AdvertisedSocketAddress getCatchupServer() return catchupServer; } - public AdvertisedSocketAddress getBoltServer() + public ClientConnectorAddresses getClientConnectorAddresses() { - return boltServer; + return clientConnectorAddresses; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java index dedb481a4d35f..df21b48f5f1d2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java @@ -21,7 +21,6 @@ import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.logging.LogProvider; @@ -31,6 +30,6 @@ public interface DiscoveryServiceFactory CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler, LogProvider logProvider, LogProvider userLogProvider ); - TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, + TopologyService edgeDiscoveryService( Config config, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/EdgeAddresses.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/EdgeAddresses.java index 4c911efee6ddf..6222f0be192c5 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/EdgeAddresses.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/EdgeAddresses.java @@ -19,25 +19,23 @@ */ package org.neo4j.coreedge.discovery; -import org.neo4j.helpers.AdvertisedSocketAddress; - public class EdgeAddresses { - private final AdvertisedSocketAddress boltAddress; + private final ClientConnectorAddresses clientConnectorAddresses; - public EdgeAddresses( AdvertisedSocketAddress boltAddress ) + public EdgeAddresses( ClientConnectorAddresses clientConnectorAddresses ) { - this.boltAddress = boltAddress; + this.clientConnectorAddresses = clientConnectorAddresses; } - public AdvertisedSocketAddress getBoltAddress() + public ClientConnectorAddresses getClientConnectorAddresses() { - return boltAddress; + return clientConnectorAddresses; } @Override public String toString() { - return String.format( "EdgeAddresses{boltAddress=%s}", boltAddress ); + return String.format( "EdgeAddresses{clientConnectorAddresses=%s}", clientConnectorAddresses ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java index 4112e7f5761c0..e8d652ca52955 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java @@ -28,7 +28,7 @@ import com.hazelcast.spi.exception.RetryableIOException; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService; -import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -41,7 +41,7 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService { static final RenewableTimeoutService.TimeoutName REFRESH_EDGE = () -> "Refresh Edge"; private final Log log; - private final AdvertisedSocketAddress boltAddress; + private final ClientConnectorAddresses connectorAddresses; private final HazelcastConnector connector; private final RenewableTimeoutService renewableTimeoutService; private HazelcastInstance hazelcastInstance; @@ -49,14 +49,14 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService private final long edgeTimeToLiveTimeout; private final long edgeRefreshRate; - HazelcastClient( HazelcastConnector connector, LogProvider logProvider, AdvertisedSocketAddress boltAddress, + HazelcastClient( HazelcastConnector connector, LogProvider logProvider, Config config, RenewableTimeoutService renewableTimeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate ) { this.connector = connector; this.renewableTimeoutService = renewableTimeoutService; this.edgeRefreshRate = edgeRefreshRate; this.log = logProvider.getLog( getClass() ); - this.boltAddress = boltAddress; + this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); this.edgeTimeToLiveTimeout = edgeTimeToLiveTimeout; } @@ -88,12 +88,12 @@ public void start() throws Throwable private Object addEdgeServer( HazelcastInstance hazelcastInstance ) { String uuid = hazelcastInstance.getLocalEndpoint().getUuid(); - String address = boltAddress.toString(); + String addresses = connectorAddresses.toString(); - log.debug( "Adding edge server into cluster (%s -> %s)", uuid, address ); + log.debug( "Adding edge server into cluster (%s -> %s)", uuid, addresses ); return hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME ) - .put( uuid, address, edgeTimeToLiveTimeout, MILLISECONDS ); + .put( uuid, addresses, edgeTimeToLiveTimeout, MILLISECONDS ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java index 3e6b4b22bedf6..f6714d0309b87 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClusterTopology.java @@ -25,8 +25,13 @@ import java.util.Set; import java.util.UUID; +import com.hazelcast.config.MemberAttributeConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IAtomicReference; +import com.hazelcast.core.IMap; +import com.hazelcast.core.Member; + import org.neo4j.coreedge.core.CoreEdgeClusterSettings; -import org.neo4j.coreedge.edge.EnterpriseEdgeEditionModule; import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; @@ -34,12 +39,6 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Log; -import com.hazelcast.config.MemberAttributeConfig; -import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.core.IAtomicReference; -import com.hazelcast.core.IMap; -import com.hazelcast.core.Member; - import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toSet; @@ -54,7 +53,7 @@ class HazelcastClusterTopology static final String TRANSACTION_SERVER = "transaction_server"; static final String DISCOVERY_SERVER = "discovery_server"; static final String RAFT_SERVER = "raft_server"; - static final String BOLT_SERVER = "bolt_server"; + static final String CLIENT_CONNECTOR_ADDRESSES = "client_connector_addresses"; static EdgeTopology getEdgeTopology( HazelcastInstance hazelcastInstance, Log log ) { @@ -117,8 +116,7 @@ private static Set edgeMembers( HazelcastInstance hazelcastInstan return edgeServerMap .entrySet().stream() - .map( entry -> new EdgeAddresses( - socketAddress( entry.getValue() /*boltAddress*/, AdvertisedSocketAddress::new ) ) ) + .map( entry -> new EdgeAddresses( ClientConnectorAddresses.fromString( entry.getValue() ) ) ) .collect( toSet() ); } @@ -164,8 +162,8 @@ static MemberAttributeConfig buildMemberAttributes( MemberId myself, Config conf AdvertisedSocketAddress raftAddress = config.get( CoreEdgeClusterSettings.raft_advertised_address ); memberAttributeConfig.setStringAttribute( RAFT_SERVER, raftAddress.toString() ); - AdvertisedSocketAddress boltAddress = EnterpriseEdgeEditionModule.extractBoltAddress( config ); - memberAttributeConfig.setStringAttribute( BOLT_SERVER, boltAddress.toString() ); + ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); + memberAttributeConfig.setStringAttribute( CLIENT_CONNECTOR_ADDRESSES, clientConnectorAddresses.toString() ); return memberAttributeConfig; } @@ -176,7 +174,7 @@ static Pair extractMemberAttributes( Member member ) return Pair.of( memberId, new CoreAddresses( socketAddress( member.getStringAttribute( RAFT_SERVER ), AdvertisedSocketAddress::new ), socketAddress( member.getStringAttribute( TRANSACTION_SERVER ), AdvertisedSocketAddress::new ), - socketAddress( member.getStringAttribute( BOLT_SERVER ), AdvertisedSocketAddress::new ) ) - ); + ClientConnectorAddresses.fromString( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ) + ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java index 384f0e92ad71b..1a8abbeafc3d6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java @@ -22,7 +22,6 @@ import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.logging.LogProvider; @@ -38,12 +37,13 @@ public CoreTopologyService coreTopologyService( Config config, MemberId myself, } @Override - public TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, + public TopologyService edgeDiscoveryService( Config config, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate ) { configureHazelcast( config ); - return new HazelcastClient( new HazelcastClientConnector( config ), logProvider, boltAddress, timeoutService, + + return new HazelcastClient( new HazelcastClientConnector( config ), logProvider, config, timeoutService, edgeTimeToLiveTimeout, edgeRefreshRate ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedure.java index 05c75b337bcc2..b5857bf3697b8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedure.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedure.java @@ -19,6 +19,7 @@ */ package org.neo4j.coreedge.discovery.procedures; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -28,12 +29,12 @@ import org.neo4j.collection.RawIterator; import org.neo4j.coreedge.core.consensus.LeaderLocator; import org.neo4j.coreedge.core.consensus.NoLeaderFoundException; +import org.neo4j.coreedge.discovery.ClientConnectorAddresses; import org.neo4j.coreedge.discovery.CoreTopology; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.EdgeAddresses; import org.neo4j.coreedge.discovery.NoKnownAddressesException; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.Context; @@ -58,7 +59,7 @@ public ClusterOverviewProcedure( CoreTopologyService discoveryService, LeaderLocator leaderLocator, LogProvider logProvider ) { super( procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) ) - .out( "id", Neo4jTypes.NTString ).out( "address", Neo4jTypes.NTString ) + .out( "id", Neo4jTypes.NTString ).out( "address", Neo4jTypes.NTList( Neo4jTypes.NTString ) ) .out( "role", Neo4jTypes.NTString ) .description( "Overview of all currently accessible cluster members and their roles." ) .build() ); @@ -85,26 +86,26 @@ public RawIterator apply( Context ctx, Object[] inp for ( MemberId memberId : coreMembers ) { - AdvertisedSocketAddress boltServerAddress = null; + ClientConnectorAddresses clientConnectorAddresses = null; try { - boltServerAddress = coreTopology.find( memberId ).getBoltServer(); + clientConnectorAddresses = coreTopology.find( memberId ).getClientConnectorAddresses(); } catch ( NoKnownAddressesException e ) { log.debug( "No Address found for " + memberId ); } Role role = memberId.equals( leader ) ? Role.LEADER : Role.FOLLOWER; - endpoints.add( new ReadWriteEndPoint( boltServerAddress, role, memberId.getUuid() ) ); + endpoints.add( new ReadWriteEndPoint( clientConnectorAddresses, role, memberId.getUuid() ) ); } for ( EdgeAddresses edgeAddresses : discoveryService.edgeServers().members() ) { - endpoints.add( new ReadWriteEndPoint( edgeAddresses.getBoltAddress(), Role.READ_REPLICA ) ); + endpoints.add( new ReadWriteEndPoint( edgeAddresses.getClientConnectorAddresses(), Role.READ_REPLICA ) ); } - Collections.sort( endpoints, ( o1, o2 ) -> o1.address().toString().compareTo( o2.address().toString() ) ); + Collections.sort( endpoints, ( o1, o2 ) -> o1.addresses().toString().compareTo( o2.addresses().toString() ) ); - return map( ( l ) -> new Object[]{l.identifier().toString(), l.address().toString(), l.role().name()}, + return map( ( l ) -> new Object[]{l.identifier().toString(), l.addresses().uriList().stream().map( URI::toString ).toArray(), l.role().name()}, asRawIterator( endpoints.iterator() ) ); } @@ -112,13 +113,13 @@ private static class ReadWriteEndPoint { private static final UUID ZERO_ID = new UUID( 0, 0 ); - private final AdvertisedSocketAddress address; + private final ClientConnectorAddresses clientConnectorAddresses; private final Role role; private final UUID identifier; - public AdvertisedSocketAddress address() + public ClientConnectorAddresses addresses() { - return address; + return clientConnectorAddresses; } public Role role() @@ -131,14 +132,14 @@ UUID identifier() return identifier == null ? ZERO_ID : identifier; } - ReadWriteEndPoint( AdvertisedSocketAddress address, Role role ) + ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role ) { - this( address, role, null ); + this( clientConnectorAddresses, role, null ); } - ReadWriteEndPoint( AdvertisedSocketAddress address, Role role, UUID identifier ) + ReadWriteEndPoint( ClientConnectorAddresses clientConnectorAddresses, Role role, UUID identifier ) { - this.address = address; + this.clientConnectorAddresses = clientConnectorAddresses; this.role = role; this.identifier = identifier; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedure.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedure.java index b6b60ef26e69b..b2237ee6030b3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedure.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedure.java @@ -31,7 +31,7 @@ import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.consensus.LeaderLocator; import org.neo4j.coreedge.core.consensus.NoLeaderFoundException; -import org.neo4j.coreedge.discovery.CoreAddresses; +import org.neo4j.coreedge.discovery.ClientConnectorAddresses; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.EdgeAddresses; import org.neo4j.coreedge.discovery.NoKnownAddressesException; @@ -92,7 +92,8 @@ public RawIterator apply( Context ctx, Object[] inp try { AdvertisedSocketAddress leaderAddress = - discoveryService.coreServers().find( leaderLocator.getLeader() ).getBoltServer(); + discoveryService.coreServers().find( leaderLocator.getLeader() ) + .getClientConnectorAddresses().getBoltAddress(); writeEndpoints = writeEndpoints( leaderAddress ); } catch ( NoLeaderFoundException | NoKnownAddressesException e ) @@ -106,7 +107,8 @@ public RawIterator apply( Context ctx, Object[] inp private Set routeEndpoints() { Stream routers = - discoveryService.coreServers().addresses().stream().map( CoreAddresses::getBoltServer ); + discoveryService.coreServers().addresses().stream() + .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); return routers.map( ReadWriteRouteEndPoint::route ).collect( toSet() ); } @@ -119,9 +121,12 @@ private Set writeEndpoints( AdvertisedSocketAddress lead private Set readEndpoints() { Stream readEdge = - discoveryService.edgeServers().members().stream().map( EdgeAddresses::getBoltAddress ); + discoveryService.edgeServers().members().stream() + .map( EdgeAddresses::getClientConnectorAddresses ) + .map( ClientConnectorAddresses::getBoltAddress ); Stream readCore = - discoveryService.coreServers().addresses().stream().map( CoreAddresses::getBoltServer ); + discoveryService.coreServers().addresses().stream() + .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); return concat( readEdge, readCore ).map( ReadWriteRouteEndPoint::read ).collect( toSet() ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java index b58b8df98755d..3a24ec3c8f3f7 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java @@ -192,8 +192,7 @@ protected Log authManagerLog() long edgeRefreshRate = config.get( CoreEdgeClusterSettings.edge_refresh_rate ); TopologyService discoveryService = discoveryServiceFactory.edgeDiscoveryService( config, - extractBoltAddress( config ), logProvider, refreshEdgeTimeoutService, edgeTimeToLiveTimeout, - edgeRefreshRate ); + logProvider, refreshEdgeTimeoutService, edgeTimeToLiveTimeout, edgeRefreshRate ); life.add( dependencies.satisfyDependency( discoveryService ) ); Clock clock = Clocks.systemClock(); @@ -251,13 +250,6 @@ protected Log authManagerLog() dependencies.satisfyDependency( createSessionTracker() ); } - public static AdvertisedSocketAddress extractBoltAddress( Config config ) - { - return boltConnectors( config ).stream().findFirst() - .map( boltConnector -> config.get( boltConnector.advertised_address ) ).orElseThrow( () -> - new IllegalArgumentException( "A Bolt connector must be configured to run a cluster" ) ); - } - private void registerRecovery( final DatabaseInfo databaseInfo, LifeSupport life, final DependencyResolver dependencyResolver ) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClientConnectorAddressesTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClientConnectorAddressesTest.java new file mode 100644 index 0000000000000..7359005bd5cba --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClientConnectorAddressesTest.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +import org.junit.Test; + +import org.neo4j.coreedge.discovery.ClientConnectorAddresses.ConnectorUri; +import org.neo4j.helpers.AdvertisedSocketAddress; + +import static java.util.Arrays.asList; + +import static org.junit.Assert.assertEquals; + +import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.bolt; +import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.http; +import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.https; + +public class ClientConnectorAddressesTest +{ + @Test + public void shouldSerializeToString() throws Exception + { + // given + ClientConnectorAddresses connectorAddresses = new ClientConnectorAddresses( asList( + new ConnectorUri( bolt, new AdvertisedSocketAddress( "host", 1 ) ), + new ConnectorUri( http, new AdvertisedSocketAddress( "host", 2 ) ), + new ConnectorUri( https, new AdvertisedSocketAddress( "host", 3 ) ) ) + ); + + // when + ClientConnectorAddresses out = ClientConnectorAddresses.fromString( connectorAddresses.toString() ); + + // then + assertEquals( connectorAddresses, out ); + } + + @Test + public void shouldSerializeWithNoHttpsAddress() throws Exception + { + // given + ClientConnectorAddresses connectorAddresses = new ClientConnectorAddresses( asList( + new ConnectorUri( bolt, new AdvertisedSocketAddress( "host", 1 ) ), + new ConnectorUri( http, new AdvertisedSocketAddress( "host", 2 ) ) + ) ); + + // when + ClientConnectorAddresses out = ClientConnectorAddresses.fromString( connectorAddresses.toString() ); + + // then + assertEquals( connectorAddresses, out ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java index bb6086aecd621..eaf6f0fc53f01 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java @@ -28,4 +28,6 @@ public interface ClusterMember void shutdown(); T database(); + + ClientConnectorAddresses clientConnectorAddresses(); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java index ab8bb78c36775..e12f91e9c1661 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java @@ -31,16 +31,18 @@ import org.neo4j.coreedge.core.consensus.log.segmented.FileNames; import org.neo4j.coreedge.core.state.CoreState; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.GraphDatabaseDependencies; +import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Level; import org.neo4j.server.configuration.ClientConnectorSettings; import org.neo4j.server.configuration.ClientConnectorSettings.HttpConnector.Encryption; import static java.lang.String.format; import static java.util.stream.Collectors.joining; + import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME; import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME; import static org.neo4j.helpers.collection.MapUtil.stringMap; @@ -83,7 +85,6 @@ public CoreClusterMember( int serverId, int clusterSize, config.put( GraphDatabaseSettings.record_format.name(), recordFormat ); config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); - config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "0.0.0.0:" + boltPort ); config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).listen_address.name(), "127.0.0.1:" + boltPort ); boltAdvertisedAddress = "127.0.0.1:" + boltPort; config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address.name(), boltAdvertisedAddress ); @@ -183,4 +184,10 @@ public int serverId() { return serverId; } + + @Override + public ClientConnectorAddresses clientConnectorAddresses() + { + return ClientConnectorAddresses.extractFromConfig( new Config( this.config ) ); + } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java index 546a516f61b52..933468e841910 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java @@ -29,6 +29,7 @@ import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.GraphDatabaseDependencies; +import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Level; import org.neo4j.server.configuration.ClientConnectorSettings; import org.neo4j.server.configuration.ClientConnectorSettings.HttpConnector.Encryption; @@ -51,7 +52,7 @@ public EdgeClusterMember( File parentDir, int memberId, DiscoveryServiceFactory Map> instanceExtraParams, String recordFormat ) { this.memberId = memberId; - int boltPort = 8900 + memberId; + int boltPort = 9000 + memberId; int httpPort = 11000 + memberId; String initialHosts = coreMemberHazelcastAddresses.stream() @@ -72,12 +73,13 @@ public EdgeClusterMember( File parentDir, int memberId, DiscoveryServiceFactory config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); - config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "0.0.0.0:" + boltPort ); + config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).listen_address.name(), "127.0.0.1:" + boltPort ); boltAdvertisedAddress = "127.0.0.1:" + boltPort; config.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address.name(), boltAdvertisedAddress ); config.put( new ClientConnectorSettings.HttpConnector( "http", Encryption.NONE ).type.name(), "HTTP" ); config.put( new ClientConnectorSettings.HttpConnector( "http", Encryption.NONE ).enabled.name(), "true" ); config.put( new ClientConnectorSettings.HttpConnector( "http", Encryption.NONE ).listen_address.name(), "127.0.0.1:" + httpPort ); + config.put( new ClientConnectorSettings.HttpConnector( "http", Encryption.NONE ).advertised_address.name(), "127.0.0.1:" + httpPort ); File neo4jHome = new File( parentDir, "server-edge-" + memberId ); config.put( GraphDatabaseSettings.logs_directory.name(), new File( neo4jHome, "logs" ).getAbsolutePath() ); @@ -121,6 +123,12 @@ public EdgeGraphDatabase database() return database; } + @Override + public ClientConnectorAddresses clientConnectorAddresses() + { + return ClientConnectorAddresses.extractFromConfig( new Config( this.config ) ); + } + public File storeDir() { return storeDir; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java index b7c86c9d60508..1ab29b339aba2 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java @@ -19,6 +19,27 @@ */ package org.neo4j.coreedge.discovery; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Spliterator; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; + import com.hazelcast.core.Client; import com.hazelcast.core.ClientService; import com.hazelcast.core.Cluster; @@ -48,34 +69,15 @@ import com.hazelcast.query.Predicate; import org.junit.Test; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Spliterator; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Function; - import org.neo4j.coreedge.core.consensus.schedule.ControlledRenewableTimeoutService; -import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; import static java.lang.String.format; + import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -84,8 +86,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import static org.neo4j.coreedge.discovery.HazelcastClient.REFRESH_EDGE; -import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.BOLT_SERVER; +import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.CLIENT_CONNECTOR_ADDRESSES; import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.MEMBER_UUID; import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.RAFT_SERVER; import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.TRANSACTION_SERVER; @@ -93,14 +96,28 @@ public class HazelcastClientTest { - private static final AdvertisedSocketAddress ADDRESS = new AdvertisedSocketAddress( "localhost", 7000 ); + private Config config() + { + Config defaults = Config.defaults(); + + HashMap settings = new HashMap<>(); + settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address.name(), "bolt:3001" ); + + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).type.name(), "HTTP" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).enabled.name(), "true" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).advertised_address.name(), "http:3001" ); + + return defaults.augment( settings ); + } @Test public void shouldReturnTopologyUsingHazelcastMembers() throws Exception { // given HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), ADDRESS, new + HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new ControlledRenewableTimeoutService(), 60_000, 5_000 ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); @@ -128,7 +145,7 @@ public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Exception { // given HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), ADDRESS, new + HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new ControlledRenewableTimeoutService(), 60_000, 5_000 ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); @@ -171,7 +188,7 @@ public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast() throws Excep when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); - HazelcastClient client = new HazelcastClient( connector, logProvider, ADDRESS, new + HazelcastClient client = new HazelcastClient( connector, logProvider, config(), new ControlledRenewableTimeoutService(), 60_000, 5_000 ); com.hazelcast.core.Cluster cluster = mock( Cluster.class ); @@ -193,7 +210,7 @@ public void shouldReturnEmptyTopologyIfInitiallyConnectedToHazelcastButItsNowUna { // given HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), ADDRESS, new + HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new ControlledRenewableTimeoutService(), 60_000, 5_000 ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); @@ -215,7 +232,7 @@ public void shouldReconnectIfHazelcastUnavailable() throws Exception { // given HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), ADDRESS, new + HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new ControlledRenewableTimeoutService(), 60_000, 5_000 ); HazelcastInstance hazelcastInstance1 = mock( HazelcastInstance.class ); @@ -285,7 +302,7 @@ public void shouldRegisterEdgeServerInTopology() throws Throwable when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); ControlledRenewableTimeoutService renewableTimeoutService = new ControlledRenewableTimeoutService(); - HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), ADDRESS, + HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), renewableTimeoutService, 60_000, 5_000 ); hazelcastClient.start(); @@ -329,7 +346,7 @@ public void shouldRemoveEdgeServersOnGracefulShutdown() throws Throwable when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); ControlledRenewableTimeoutService renewableTimeoutService = new ControlledRenewableTimeoutService(); - HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), ADDRESS, + HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), renewableTimeoutService, 60_000, 5_000 ); hazelcastClient.start(); @@ -349,7 +366,8 @@ private Member makeMember( int id ) throws UnknownHostException when( member.getStringAttribute( MEMBER_UUID ) ).thenReturn( UUID.randomUUID().toString() ); when( member.getStringAttribute( TRANSACTION_SERVER ) ).thenReturn( format( "host%d:%d", id, (7000 + id) ) ); when( member.getStringAttribute( RAFT_SERVER ) ).thenReturn( format( "host%d:%d", id, (6000 + id) ) ); - when( member.getStringAttribute( BOLT_SERVER ) ).thenReturn( format( "host%d:%d", id, (5000 + id) ) ); + when( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ) + .thenReturn( format( "bolt://host%d:%d,http://host%d:%d", id, (5000 + id), id, (5000 + id) ) ); return member; } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java index e496d409a67c3..482c47153367a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java @@ -62,6 +62,9 @@ public void shouldStoreMemberIdentityAndAddressesAsMemberAttributes() throws Exc settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address.name(), "bolt:3001" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).type.name(), "HTTP" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).enabled.name(), "true" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).advertised_address.name(), "http:3001" ); config.augment( settings ); // when @@ -74,7 +77,7 @@ public void shouldStoreMemberIdentityAndAddressesAsMemberAttributes() throws Exc CoreAddresses addresses = extracted.other(); assertEquals( new AdvertisedSocketAddress( "tx", 1001 ), addresses.getCatchupServer() ); assertEquals( new AdvertisedSocketAddress( "raft", 2001 ), addresses.getRaftServer() ); - assertEquals( new AdvertisedSocketAddress( "bolt", 3001 ), addresses.getBoltServer() ); + assertEquals( new AdvertisedSocketAddress( "bolt", 3001 ), addresses.getClientConnectorAddresses().getBoltAddress() ); } @Test @@ -94,6 +97,10 @@ public void shouldCollectMembersAsAMap() throws Exception settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address.name(), "bolt:" + (i + 1) ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).type.name(), "HTTP" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).enabled.name(), "true" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).advertised_address.name(), "http:" + (i + 1) ); + config.augment( settings ); Map attributes = buildMemberAttributes( memberId, config ).getAttributes(); hazelcastMembers.add( new MemberImpl( new Address( "localhost", i ), null, attributes, false ) ); @@ -110,7 +117,7 @@ public void shouldCollectMembersAsAMap() throws Exception CoreAddresses coreAddresses = coreMemberMap.get( coreMembers.get( i ) ); assertEquals( new AdvertisedSocketAddress( "tx", (i + 1) ), coreAddresses.getCatchupServer() ); assertEquals( new AdvertisedSocketAddress( "raft", (i + 1) ), coreAddresses.getRaftServer() ); - assertEquals( new AdvertisedSocketAddress( "bolt", (i + 1) ), coreAddresses.getBoltServer() ); + assertEquals( new AdvertisedSocketAddress( "bolt", (i + 1) ), coreAddresses.getClientConnectorAddresses().getBoltAddress() ); } } @@ -129,6 +136,10 @@ public void shouldLogAndExcludeMembersWithMissingAttributes() throws Exception settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); settings.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address.name(), "bolt:" + (i + 1) ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).type.name(), "HTTP" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).enabled.name(), "true" ); + settings.put( new GraphDatabaseSettings.BoltConnector( "http" ).advertised_address.name(), "http:" + (i + 1) ); + config.augment( settings ); Map attributes = buildMemberAttributes( memberId, config ).getAttributes(); if ( i == 2 ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java index 1bc739b1f2eda..b1f739a64deef 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java @@ -26,7 +26,6 @@ import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.coreedge.edge.EnterpriseEdgeEditionModule; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; @@ -118,8 +117,8 @@ private static CoreAddresses extractAddresses( Config config ) { AdvertisedSocketAddress raftAddress = config.get( CoreEdgeClusterSettings.raft_advertised_address ); AdvertisedSocketAddress transactionSource = config.get( CoreEdgeClusterSettings.transaction_advertised_address ); - AdvertisedSocketAddress boltAddress = EnterpriseEdgeEditionModule.extractBoltAddress( config ); + ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); - return new CoreAddresses( raftAddress, transactionSource, boltAddress ); + return new CoreAddresses( raftAddress, transactionSource, clientConnectorAddresses ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java index 37c4480e6e692..9be316a2dc95a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java @@ -19,7 +19,7 @@ */ package org.neo4j.coreedge.discovery; -import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -30,11 +30,11 @@ class SharedDiscoveryEdgeClient extends LifecycleAdapter implements TopologyServ private final EdgeAddresses addresses; private final Log log; - SharedDiscoveryEdgeClient( SharedDiscoveryService sharedDiscoveryService, AdvertisedSocketAddress boltAddress, + SharedDiscoveryEdgeClient( SharedDiscoveryService sharedDiscoveryService, Config config, LogProvider logProvider ) { this.sharedDiscoveryService = sharedDiscoveryService; - this.addresses = new EdgeAddresses( boltAddress ); + this.addresses = new EdgeAddresses( ClientConnectorAddresses.extractFromConfig( config ) ); this.log = logProvider.getLog( getClass() ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java index 54b44590b4ad5..4f4d1d40ec58d 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java @@ -33,7 +33,6 @@ import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService; import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.logging.LogProvider; @@ -62,11 +61,11 @@ public CoreTopologyService coreTopologyService( Config config, MemberId myself, } @Override - public TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, - LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, - long edgeRefreshRate ) + public TopologyService edgeDiscoveryService( Config config, LogProvider logProvider, + DelayedRenewableTimeoutService timeoutService, + long edgeTimeToLiveTimeout, long edgeRefreshRate ) { - return new SharedDiscoveryEdgeClient( this, boltAddress, logProvider ); + return new SharedDiscoveryEdgeClient( this, config, logProvider ); } void waitForClusterFormation() throws InterruptedException diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedureTest.java index 98ea708c8a33e..545d03932881e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedureTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/ClusterOverviewProcedureTest.java @@ -48,7 +48,7 @@ public class ClusterOverviewProcedureTest { @Test - public void shouldProvideOverivewOfCoreAndEdgeServers() throws Exception + public void shouldProvideOverviewOfCoreAndEdgeServers() throws Exception { // given final CoreTopologyService topologyService = mock( CoreTopologyService.class ); @@ -78,11 +78,11 @@ public void shouldProvideOverivewOfCoreAndEdgeServers() throws Exception // then assertThat( members, IsIterableContainingInOrder.contains( - new Object[]{theLeader.getUuid().toString(), "localhost:3000", "LEADER"}, - new Object[]{follower1.getUuid().toString(), "localhost:3001", "FOLLOWER"}, - new Object[]{follower2.getUuid().toString(), "localhost:3002", "FOLLOWER"}, - new Object[]{"00000000-0000-0000-0000-000000000000", "localhost:3004", "READ_REPLICA"}, - new Object[]{"00000000-0000-0000-0000-000000000000", "localhost:3005", "READ_REPLICA"} + new Object[]{theLeader.getUuid().toString(), new String[] {"bolt://localhost:3000"}, "LEADER"}, + new Object[]{follower1.getUuid().toString(), new String[] {"bolt://localhost:3001"}, "FOLLOWER"}, + new Object[]{follower2.getUuid().toString(), new String[] {"bolt://localhost:3002"}, "FOLLOWER"}, + new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:3004"}, "READ_REPLICA"}, + new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:3005"}, "READ_REPLICA"} ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedureTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedureTest.java index 2024f0e34a925..5e1478d9a30cb 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedureTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/procedures/GetServersProcedureTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,6 +33,8 @@ import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.consensus.LeaderLocator; import org.neo4j.coreedge.core.consensus.NoLeaderFoundException; +import org.neo4j.coreedge.discovery.ClientConnectorAddresses; +import org.neo4j.coreedge.discovery.ClientConnectorAddresses.ConnectorUri; import org.neo4j.coreedge.discovery.CoreAddresses; import org.neo4j.coreedge.discovery.CoreTopology; import org.neo4j.coreedge.discovery.CoreTopologyService; @@ -47,6 +50,8 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -54,6 +59,8 @@ import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + +import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.bolt; import static org.neo4j.coreedge.identity.RaftTestMember.member; import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.kernel.api.proc.Neo4jTypes.NTMap; @@ -261,7 +268,7 @@ public void shouldReturnTheCoreLeaderForWriteAndEdgesAndCoresForReads() throws E assertThat( readServers.get( "role" ), equalTo( "READ" ) ); assertThat( asList( readServers.get( "addresses" ) ), containsInAnyOrder( coreAddresses( 0 ).getRaftServer().toString(), - edgeAddresses( 1 ).getBoltAddress().toString() ) ); + edgeAddresses( 1 ).getClientConnectorAddresses().getBoltAddress().toString() ) ); Map routingServers = servers.get( 2 ); assertThat( routingServers.get( "role" ), equalTo( "ROUTE" ) ); @@ -403,12 +410,14 @@ static Set addresses( int... ids ) static CoreAddresses coreAddresses( int id ) { AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); - return new CoreAddresses( advertisedSocketAddress, advertisedSocketAddress, advertisedSocketAddress ); + return new CoreAddresses( advertisedSocketAddress, advertisedSocketAddress, + new ClientConnectorAddresses( singletonList( new ConnectorUri( bolt, advertisedSocketAddress ) ) ) ); } private static EdgeAddresses edgeAddresses( int id ) { AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); - return new EdgeAddresses( advertisedSocketAddress ); + return new EdgeAddresses( + new ClientConnectorAddresses( singletonList( new ConnectorUri( bolt, advertisedSocketAddress ) ) ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java index 37e591e44faca..d45907dd64889 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java @@ -19,14 +19,18 @@ */ package org.neo4j.coreedge.scenarios; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.Set; +import org.hamcrest.Description; import org.hamcrest.FeatureMatcher; import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -34,6 +38,7 @@ import org.neo4j.collection.RawIterator; import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.ClusterMember; import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory; import org.neo4j.coreedge.discovery.SharedDiscoveryService; import org.neo4j.coreedge.discovery.procedures.ClusterOverviewProcedure; @@ -48,10 +53,16 @@ import org.neo4j.test.coreedge.ClusterRule; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.neo4j.coreedge.discovery.procedures.Role.FOLLOWER; +import static org.neo4j.coreedge.discovery.procedures.Role.LEADER; +import static org.neo4j.coreedge.discovery.procedures.Role.READ_REPLICA; +import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureName; import static org.neo4j.kernel.api.security.AccessMode.Static.READ; import static org.neo4j.test.assertion.Assert.assertEventually; @@ -101,8 +112,8 @@ public void shouldDiscoverCoreMembers() throws Exception Cluster cluster = clusterRule.startCluster(); Matcher> expected = allOf( - containsAddress( "127.0.0.1:8000" ), containsAddress( "127.0.0.1:8001" ), containsAddress( "127.0.0.1:8002" ), - containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 2 ), doesNotContainRole( Role.READ_REPLICA ) ); + containsMemberAddresses( cluster.coreMembers() ), + containsRole( LEADER, 1 ), containsRole( FOLLOWER, 2 ), doesNotContainRole( READ_REPLICA ) ); for ( int coreServerId = 0; coreServerId < 3; coreServerId++ ) { @@ -122,9 +133,8 @@ public void shouldDiscoverCoreAndEdgeMembers() throws Exception Cluster cluster = clusterRule.startCluster(); Matcher> expected = allOf( - containsAddress( "127.0.0.1:8000" ), containsAddress( "127.0.0.1:8001" ), containsAddress( "127.0.0.1:8002" ), - containsAddress( "127.0.0.1:9000" ), containsAddress( "127.0.0.1:9001" ), containsAddress( "127.0.0.1:9002" ), - containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 2 ), containsRole( Role.READ_REPLICA, 3 ) ); + containsAllMemberAddresses( cluster.coreMembers(), cluster.edgeMembers() ), + containsRole( LEADER, 1 ), containsRole( FOLLOWER, 2 ), containsRole( READ_REPLICA, 3 ) ); for ( int coreServerId = 0; coreServerId < 3; coreServerId++ ) { @@ -146,9 +156,8 @@ public void shouldDiscoverEdgeMembersAfterRestartingCores() throws Exception cluster.startCoreMembers(); Matcher> expected = allOf( - containsAddress( "127.0.0.1:8000" ), containsAddress( "127.0.0.1:8001" ), containsAddress( "127.0.0.1:8002" ), - containsAddress( "127.0.0.1:9000" ), containsAddress( "127.0.0.1:9001" ), containsAddress( "127.0.0.1:9002" ), - containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 2 ), containsRole( Role.READ_REPLICA, 3 ) ); + containsAllMemberAddresses( cluster.coreMembers(), cluster.edgeMembers() ), + containsRole( LEADER, 1 ), containsRole( FOLLOWER, 2 ), containsRole( READ_REPLICA, 3 ) ); for ( int coreServerId = 0; coreServerId < 3; coreServerId++ ) { @@ -171,9 +180,8 @@ public void shouldDiscoverNewCoreMembers() throws Exception cluster.addCoreMemberWithId( 4 ).start(); Matcher> expected = allOf( - containsAddress( "127.0.0.1:8000" ), containsAddress( "127.0.0.1:8001" ), containsAddress( "127.0.0.1:8002" ), - containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 4 ), - containsAddress( "127.0.0.1:8003" ), containsAddress( "127.0.0.1:8004" ) ); // new core members + containsMemberAddresses( cluster.coreMembers() ), + containsRole( LEADER, 1 ), containsRole( FOLLOWER, 4 ) ); for ( int coreServerId = 0; coreServerId < 5; coreServerId++ ) { @@ -196,10 +204,8 @@ public void shouldDiscoverNewEdgeMembers() throws Exception cluster.addEdgeMemberWithId( 4 ).start(); Matcher> expected = allOf( - containsAddress( "127.0.0.1:8000" ), containsAddress( "127.0.0.1:8001" ), containsAddress( "127.0.0.1:8002" ), - containsAddress( "127.0.0.1:9000" ), containsAddress( "127.0.0.1:9001" ), containsAddress( "127.0.0.1:9002" ), - containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 2 ), containsRole( Role.READ_REPLICA, 5 ), - containsAddress( "127.0.0.1:9003" ), containsAddress( "127.0.0.1:9004" ) ); // new edge members + containsAllMemberAddresses( cluster.coreMembers(), cluster.edgeMembers() ), + containsRole( LEADER, 1 ), containsRole( FOLLOWER, 2 ), containsRole( READ_REPLICA, 5 ) ); for ( int coreServerId = 0; coreServerId < 3; coreServerId++ ) { @@ -219,7 +225,7 @@ public void shouldDiscoverRemovalOfEdgeMembers() throws Exception for ( int coreServerId = 0; coreServerId < 3; coreServerId++ ) { - assertEventualOverview( cluster, containsRole( Role.READ_REPLICA, 3 ), coreServerId ); + assertEventualOverview( cluster, containsRole( READ_REPLICA, 3 ), coreServerId ); } // when @@ -229,7 +235,7 @@ public void shouldDiscoverRemovalOfEdgeMembers() throws Exception for ( int coreServerId = 0; coreServerId < 3; coreServerId++ ) { // then - assertEventualOverview( cluster, containsRole( Role.READ_REPLICA, 1 ), coreServerId ); + assertEventualOverview( cluster, containsRole( READ_REPLICA, 1 ), coreServerId ); } } @@ -244,7 +250,7 @@ public void shouldDiscoverRemovalOfCoreMembers() throws Exception for ( int coreServerId = 0; coreServerId < 5; coreServerId++ ) { - assertEventualOverview( cluster, allOf( containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 4 ) ), + assertEventualOverview( cluster, allOf( containsRole( LEADER, 1 ), containsRole( FOLLOWER, 4 ) ), coreServerId ); } @@ -255,7 +261,7 @@ public void shouldDiscoverRemovalOfCoreMembers() throws Exception for ( int coreServerId = 2; coreServerId < 5; coreServerId++ ) { // then - assertEventualOverview( cluster, allOf( containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 2 ) ), + assertEventualOverview( cluster, allOf( containsRole( LEADER, 1 ), containsRole( FOLLOWER, 2 ) ), coreServerId ); } } @@ -267,16 +273,44 @@ private void assertEventualOverview( Cluster cluster, Matcher> () -> clusterOverview( cluster.getCoreMemberById( coreServerId ).database() ), expected, 60, SECONDS ); } - private Matcher> containsAddress( String expectedAddress ) + private Matcher> containsAllMemberAddresses( + Collection... members ) { - return new FeatureMatcher,Long>( equalTo( 1L ), expectedAddress, "count" ) + ArrayList clusterMembers = new ArrayList<>(); + for ( Collection member : members ) { - @Override - protected Long featureValueOf( List overview ) - { - return overview.stream().filter( info -> info.address.equals( expectedAddress ) ).count(); - } - }; + clusterMembers.addAll( member ); + } + return containsMemberAddresses( clusterMembers ); + } + + private Matcher> containsMemberAddresses( Collection members ) + { + return containsInAnyOrder( members.stream().map( coreClusterMember -> + new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( MemberInfo item ) + { + Set addresses = asSet(item.addresses); + for ( URI uri : coreClusterMember.clientConnectorAddresses().uriList() ) + { + if (!addresses.contains( uri.toString() )) + { + return false; + } + } + return true; + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "MemberInfo with addresses: " ) + .appendValue( coreClusterMember.clientConnectorAddresses().getBoltAddress() ); + } + } + ).collect( toList() ) ); } private Matcher> containsRole( Role expectedRole, long expectedCount ) @@ -311,20 +345,23 @@ private List clusterOverview( GraphDatabaseFacade db ) while ( itr.hasNext() ) { Object[] row = itr.next(); - infos.add( new MemberInfo( (String) row[1], Role.valueOf( (String) row[2] ) ) ); + Object[] addresses = (Object[]) row[1]; + infos.add( new MemberInfo( Arrays.copyOf( addresses, addresses.length, String[].class ), + Role.valueOf( (String) row[2] ) ) ); } } + return infos; } private static class MemberInfo { - private final String address; + private final String[] addresses; private final Role role; - MemberInfo( String address, Role role ) + MemberInfo( String[] addresses, Role role ) { - this.address = address; + this.addresses = addresses; this.role = role; } @@ -332,27 +369,28 @@ private static class MemberInfo public boolean equals( Object o ) { if ( this == o ) - { return true; } + { + return true; + } if ( o == null || getClass() != o.getClass() ) - { return false; } + { + return false; + } MemberInfo that = (MemberInfo) o; - return Objects.equals( address, that.address ) && - Objects.equals( role, that.role ); + return Arrays.equals( addresses, that.addresses ) && + role == that.role; } @Override public int hashCode() { - return Objects.hash( address, role ); + return Objects.hash( addresses, role ); } @Override public String toString() { - return "MemberInfo{" + - "address='" + address + '\'' + - ", role=" + role + - '}'; + return String.format( "MemberInfo{addresses='%s', role=%s}", addresses, role ); } } } diff --git a/packaging/standalone/standalone-enterprise/src/main/distribution/text/enterprise/conf/neo4j.conf b/packaging/standalone/standalone-enterprise/src/main/distribution/text/enterprise/conf/neo4j.conf index 2fbc2e9431563..c81813704dc70 100644 --- a/packaging/standalone/standalone-enterprise/src/main/distribution/text/enterprise/conf/neo4j.conf +++ b/packaging/standalone/standalone-enterprise/src/main/distribution/text/enterprise/conf/neo4j.conf @@ -68,21 +68,17 @@ dbms.directories.import=import # individual advertised_address. # Bolt connector -dbms.connector.bolt.type=BOLT -dbms.connector.bolt.enabled=true +#dbms.connector.bolt.enabled=true #dbms.connector.bolt.tls_level=OPTIONAL #dbms.connector.bolt.listen_address=:7687 # HTTP Connector -dbms.connector.http.type=HTTP -dbms.connector.http.enabled=true +#dbms.connector.http.enabled=true #dbms.connector.http.listen_address=:#{default.http.port} # HTTPS Connector -dbms.connector.https.type=HTTP -dbms.connector.https.enabled=true -dbms.connector.https.encryption=TLS -dbms.connector.https.listen_address=:#{default.https.port} +#dbms.connector.https.enabled=true +#dbms.connector.https.listen_address=:#{default.https.port} # Number of Neo4j worker threads. #dbms.threads.worker_count= @@ -211,9 +207,6 @@ dbms.connector.https.listen_address=:#{default.https.port} #Raft log pruning frequncy. #core_edge.raft_log_pruning_frequency=10m -#RAFT log pruning strategy. -#core_edge.raft_log_prune_strategy=1g size - #The size to allow the raft log to grow before rotating. #core_edge.raft_log_rotation_size=250M