Skip to content

Commit

Permalink
Expose all connector addresses in dbms.cluster.overview
Browse files Browse the repository at this point in the history
  • Loading branch information
apcj committed Oct 3, 2016
1 parent 75b29e3 commit f399eec
Show file tree
Hide file tree
Showing 25 changed files with 519 additions and 184 deletions.
@@ -1,3 +1,22 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.server.configuration;

import java.util.List;
Expand Down
Expand Up @@ -119,11 +119,4 @@ else if ( transactionIdStore.getLastCommittedTransactionId() >= firstTxId )
monitor.increment();
protocol.expect( State.MESSAGE_TYPE );
}

@Override
public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause )
{
cause.printStackTrace();
ctx.close();
}
}
@@ -0,0 +1,177 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.server.configuration.ClientConnectorSettings.HttpConnector.Encryption;

import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.bolt;
import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.http;
import static org.neo4j.coreedge.discovery.ClientConnectorAddresses.Scheme.https;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnectors;
import static org.neo4j.server.configuration.ClientConnectorSettings.httpConnector;

public class ClientConnectorAddresses
{
private final List<ConnectorUri> connectorUris;

public ClientConnectorAddresses( List<ConnectorUri> connectorUris )
{
this.connectorUris = connectorUris;
}

static ClientConnectorAddresses extractFromConfig( Config config )
{
List<ConnectorUri> connectorUris = new ArrayList<>();

connectorUris.add( new ConnectorUri( bolt, boltConnectors( config ).stream().findFirst()
.map( boltConnector -> config.get( boltConnector.advertised_address ) ).orElseThrow( () ->
new IllegalArgumentException( "A Bolt connector must be configured to run a cluster" ) ) ) );

connectorUris.add( new ConnectorUri( http, config.get( httpConnector( config, Encryption.NONE ).orElseThrow(
() -> new IllegalArgumentException( "An HTTP connector must be configured to run the server" ) )
.advertised_address ) ) );

httpConnector( config, Encryption.TLS )
.map( ( connector ) -> config.get( connector.advertised_address ) )
.ifPresent( httpsAddress -> connectorUris.add( new ConnectorUri( https, httpsAddress ) ) );

return new ClientConnectorAddresses( connectorUris );
}

public AdvertisedSocketAddress getBoltAddress()
{
return connectorUris.stream().filter( connectorUri -> connectorUri.scheme == bolt ).findFirst().orElseThrow(
() -> new IllegalArgumentException( "A Bolt connector must be configured to run a cluster" ) )
.socketAddress;
}

public List<URI> uriList()
{
return connectorUris.stream().map( ConnectorUri::toUri ).collect( Collectors.toList() );
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
ClientConnectorAddresses that = (ClientConnectorAddresses) o;
return Objects.equals( connectorUris, that.connectorUris );
}

@Override
public int hashCode()
{
return Objects.hash( connectorUris );
}

@Override
public String toString()
{
return connectorUris.stream().map( ConnectorUri::toString ).collect( Collectors.joining( "," ) );
}

static ClientConnectorAddresses fromString( String value )
{
return new ClientConnectorAddresses( Stream.of( value.split( "," ) )
.map( ConnectorUri::fromString ).collect( Collectors.toList() ) );
}

public enum Scheme
{
bolt, http, https
}

public static class ConnectorUri
{
private final Scheme scheme;
private final AdvertisedSocketAddress socketAddress;

public ConnectorUri( Scheme scheme, AdvertisedSocketAddress socketAddress )
{
this.scheme = scheme;
this.socketAddress = socketAddress;
}

private URI toUri()
{
try
{
return new URI( scheme.name().toLowerCase(), null, socketAddress.getHostname(), socketAddress.getPort(),
null, null, null );
}
catch ( URISyntaxException e )
{
throw new IllegalArgumentException( e );
}
}

@Override
public String toString()
{
return toUri().toString();
}

private static ConnectorUri fromString( String string )
{
URI uri = URI.create( string );
return new ConnectorUri( Scheme.valueOf( uri.getScheme() ),
new AdvertisedSocketAddress( uri.getHost(), uri.getPort() ) );
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
ConnectorUri that = (ConnectorUri) o;
return scheme == that.scheme &&
Objects.equals( socketAddress, that.socketAddress );
}

@Override
public int hashCode()
{
return Objects.hash( scheme, socketAddress );
}
}
}
Expand Up @@ -25,14 +25,14 @@ public class CoreAddresses
{
private final AdvertisedSocketAddress raftServer;
private final AdvertisedSocketAddress catchupServer;
private final AdvertisedSocketAddress boltServer;
private final ClientConnectorAddresses clientConnectorAddresses;

public CoreAddresses( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer,
AdvertisedSocketAddress boltServer )
ClientConnectorAddresses clientConnectorAddresses )
{
this.raftServer = raftServer;
this.catchupServer = catchupServer;
this.boltServer = boltServer;
this.clientConnectorAddresses = clientConnectorAddresses;
}

public AdvertisedSocketAddress getRaftServer()
Expand All @@ -45,8 +45,8 @@ public AdvertisedSocketAddress getCatchupServer()
return catchupServer;
}

public AdvertisedSocketAddress getBoltServer()
public ClientConnectorAddresses getClientConnectorAddresses()
{
return boltServer;
return clientConnectorAddresses;
}
}
Expand Up @@ -21,7 +21,6 @@

import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.logging.LogProvider;
Expand All @@ -31,6 +30,6 @@ public interface DiscoveryServiceFactory
CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider );

TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider,
TopologyService edgeDiscoveryService( Config config, LogProvider logProvider,
DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate );
}
Expand Up @@ -19,25 +19,23 @@
*/
package org.neo4j.coreedge.discovery;

import org.neo4j.helpers.AdvertisedSocketAddress;

public class EdgeAddresses
{
private final AdvertisedSocketAddress boltAddress;
private final ClientConnectorAddresses clientConnectorAddresses;

public EdgeAddresses( AdvertisedSocketAddress boltAddress )
public EdgeAddresses( ClientConnectorAddresses clientConnectorAddresses )
{
this.boltAddress = boltAddress;
this.clientConnectorAddresses = clientConnectorAddresses;
}

public AdvertisedSocketAddress getBoltAddress()
public ClientConnectorAddresses getClientConnectorAddresses()
{
return boltAddress;
return clientConnectorAddresses;
}

@Override
public String toString()
{
return String.format( "EdgeAddresses{boltAddress=%s}", boltAddress );
return String.format( "EdgeAddresses{clientConnectorAddresses=%s}", clientConnectorAddresses );
}
}
Expand Up @@ -28,7 +28,7 @@
import com.hazelcast.spi.exception.RetryableIOException;

import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand All @@ -41,22 +41,22 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService
{
static final RenewableTimeoutService.TimeoutName REFRESH_EDGE = () -> "Refresh Edge";
private final Log log;
private final AdvertisedSocketAddress boltAddress;
private final ClientConnectorAddresses connectorAddresses;
private final HazelcastConnector connector;
private final RenewableTimeoutService renewableTimeoutService;
private HazelcastInstance hazelcastInstance;
private RenewableTimeoutService.RenewableTimeout edgeRefreshTimer;
private final long edgeTimeToLiveTimeout;
private final long edgeRefreshRate;

HazelcastClient( HazelcastConnector connector, LogProvider logProvider, AdvertisedSocketAddress boltAddress,
HazelcastClient( HazelcastConnector connector, LogProvider logProvider, Config config,
RenewableTimeoutService renewableTimeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate )
{
this.connector = connector;
this.renewableTimeoutService = renewableTimeoutService;
this.edgeRefreshRate = edgeRefreshRate;
this.log = logProvider.getLog( getClass() );
this.boltAddress = boltAddress;
this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config );
this.edgeTimeToLiveTimeout = edgeTimeToLiveTimeout;
}

Expand Down Expand Up @@ -88,12 +88,12 @@ public void start() throws Throwable
private Object addEdgeServer( HazelcastInstance hazelcastInstance )
{
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
String address = boltAddress.toString();
String addresses = connectorAddresses.toString();

log.debug( "Adding edge server into cluster (%s -> %s)", uuid, address );
log.debug( "Adding edge server into cluster (%s -> %s)", uuid, addresses );

return hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME )
.put( uuid, address, edgeTimeToLiveTimeout, MILLISECONDS );
.put( uuid, addresses, edgeTimeToLiveTimeout, MILLISECONDS );
}

@Override
Expand Down

0 comments on commit f399eec

Please sign in to comment.