Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/2.3' into 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Nov 12, 2015
2 parents d4e469c + 97bb67c commit 3f9b46c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 33 deletions.
Expand Up @@ -19,15 +19,6 @@
*/
package org.neo4j.cluster.com;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
Expand All @@ -46,6 +37,14 @@
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

import org.neo4j.cluster.com.message.Message;
import org.neo4j.cluster.com.message.MessageProcessor;
import org.neo4j.cluster.com.message.MessageSource;
Expand Down Expand Up @@ -102,16 +101,17 @@ public interface NetworkChannelsListener
private ServerBootstrap serverBootstrap;
private Iterable<MessageProcessor> processors = Listeners.newListeners();

private Monitor monitor;
private Configuration config;
private Log msgLog;
private final Monitor monitor;
private final Configuration config;
private final Log msgLog;

private Map<URI, Channel> connections = new ConcurrentHashMap<URI, Channel>();
private final Map<URI, Channel> connections = new ConcurrentHashMap<>();
private Iterable<NetworkChannelsListener> listeners = Listeners.newListeners();

volatile boolean bindingDetected = false;

private volatile boolean paused;
private int port;

public NetworkReceiver( Monitor monitor, Configuration config, LogProvider logProvider )
{
Expand Down Expand Up @@ -147,14 +147,16 @@ public void start()
int maxPort = ports.length == 2 ? ports[1] : minPort;

// Try all ports in the given range
listen( minPort, maxPort );
port = listen( minPort, maxPort );

msgLog.debug( "Started NetworkReceiver at " + config.clusterServer().getHost() + ":" + port );
}

@Override
public void stop()
throws Throwable
{
msgLog.debug( "Shutting down NetworkReceiver" );
msgLog.debug( "Shutting down NetworkReceiver at " + config.clusterServer().getHost() + ":" + port );

channels.close().awaitUninterruptibly();
serverBootstrap.releaseExternalResources();
Expand All @@ -172,8 +174,8 @@ public void setPaused(boolean paused)
this.paused = paused;
}

private void listen( int minPort, int maxPort )
throws URISyntaxException, ChannelException, UnknownHostException
private int listen( int minPort, int maxPort )
throws URISyntaxException, ChannelException
{
ChannelException ex = null;
for ( int checkPort = minPort; checkPort <= maxPort; checkPort++ )
Expand All @@ -197,7 +199,7 @@ private void listen( int minPort, int maxPort )
listeningAt( getURI( localAddress ) );

channels.add( listenChannel );
return;
return checkPort;
}
catch ( ChannelException e )
{
Expand All @@ -210,6 +212,7 @@ private void listen( int minPort, int maxPort )
}

// MessageSource implementation
@Override
public void addMessageProcessor( MessageProcessor processor )
{
processors = Listeners.addListener( processor, processors );
Expand Down Expand Up @@ -238,7 +241,7 @@ public void receive( Message message )
}
}

private URI getURI( InetSocketAddress address ) throws URISyntaxException
private URI getURI( InetSocketAddress address )
{
String uri;

Expand Down
Expand Up @@ -101,18 +101,18 @@ public interface NetworkChannelsListener

// Sending
// One executor for each receiving instance, so that one blocking instance cannot block others receiving messages
private Map<URI, ExecutorService> senderExecutors = new HashMap<URI, ExecutorService>();
private Set<URI> failedInstances = new HashSet<URI>(); // Keeps track of what instances we have failed to open
private final Map<URI, ExecutorService> senderExecutors = new HashMap<URI, ExecutorService>();
private final Set<URI> failedInstances = new HashSet<URI>(); // Keeps track of what instances we have failed to open
// connections to
private ClientBootstrap clientBootstrap;

private final Monitor monitor;
private Configuration config;
private final Configuration config;
private final NetworkReceiver receiver;
private Log msgLog;
private final Log msgLog;
private URI me;

private Map<URI, Channel> connections = new ConcurrentHashMap<URI, Channel>();
private final Map<URI, Channel> connections = new ConcurrentHashMap<URI, Channel>();
private Iterable<NetworkChannelsListener> listeners = Listeners.newListeners();

private volatile boolean paused;
Expand Down Expand Up @@ -163,6 +163,13 @@ public void start()
Executors.newFixedThreadPool( 2, daemon( "Cluster client worker", monitor ) ), 2 ) );
clientBootstrap.setOption( "tcpNoDelay", true );
clientBootstrap.setPipelineFactory( new NetworkNodePipelineFactory() );

msgLog.debug( "Started NetworkSender for " + toString( config ) );
}

private String toString( Configuration config )
{
return "defaultPort:" + config.defaultPort() + ", port:" + config.port();
}

@Override
Expand All @@ -188,7 +195,7 @@ public void stop()

channels.close().awaitUninterruptibly();
clientBootstrap.releaseExternalResources();
msgLog.debug( "Shutting down NetworkSender complete" );
msgLog.debug( "Shutting down NetworkSender for " + toString( config ) + " complete" );
}

@Override
Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.cluster.modeswitch.HighAvailabilityModeSwitcher;

import static java.lang.String.format;

/**
* Keeps a list of members, their roles and availability for display for example in JMX or REST.
* <p>
Expand Down Expand Up @@ -125,4 +127,18 @@ private static ClusterMember updateRole( ClusterMember member, HighAvailabilityM
return member.unavailable();
}
}

@Override
public String toString()
{
StringBuilder buf = new StringBuilder();
for ( ClusterMember clusterMember : getMembers() )
{
buf.append( " " ).append( clusterMember.getInstanceId() ).append( ":" )
.append( clusterMember.getHARole() )
.append( " (is alive = " ).append( clusterMember.isAlive() ).append( ")" )
.append( format( "%n" ) );
}
return buf.toString();
}
}
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

Expand Down Expand Up @@ -91,6 +92,7 @@
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableMap;

import static org.neo4j.helpers.ArrayUtil.contains;
import static org.neo4j.helpers.collection.Iterables.count;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
Expand Down Expand Up @@ -531,14 +533,7 @@ public static String stateToString( ManagedCluster cluster )
.append( " (" ).append( client.getClusterServer() ).append( "):" ).append( "\n" );

ClusterMembers members = database.getDependencyResolver().resolveDependency( ClusterMembers.class );

for ( ClusterMember clusterMember : members.getMembers() )
{
buf.append( " " ).append( clusterMember.getInstanceId() ).append( ":" )
.append( clusterMember.getHARole() )
.append( " (is alive = " ).append( clusterMember.isAlive() ).append( ")" )
.append( "\n" );
}
buf.append( members );
}

return buf.toString();
Expand Down

0 comments on commit 3f9b46c

Please sign in to comment.