Skip to content

Commit

Permalink
Resolve on connect in NonBlockingChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Jul 18, 2017
1 parent d2f8aa9 commit 874c10a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 25 deletions.
Expand Up @@ -25,12 +25,12 @@
import io.netty.util.concurrent.FutureListener;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.causalclustering.messaging.monitoring.MessageQueueMonitor;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.logging.Log;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -46,14 +46,14 @@ class NonBlockingChannel
private final Log log;
private Channel nettyChannel;
private Bootstrap bootstrap;
private InetSocketAddress destination;
private AdvertisedSocketAddress destination;
private Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
private volatile boolean stillRunning = true;
private final MessageQueueMonitor monitor;
private final int maxQueueSize;
private FutureListener<Void> errorListener;

NonBlockingChannel( Bootstrap bootstrap, final InetSocketAddress destination,
NonBlockingChannel( Bootstrap bootstrap, final AdvertisedSocketAddress destination,
final Log log, MessageQueueMonitor monitor, int maxQueueSize )
{
this.bootstrap = bootstrap;
Expand Down Expand Up @@ -105,7 +105,7 @@ private void messageSendingThreadWork()
{
nettyChannel.close();
messageQueue.clear();
monitor.queueSize(destination, messageQueue.size());
monitor.queueSize( destination, messageQueue.size() );
}
}

Expand Down Expand Up @@ -173,12 +173,13 @@ private void ensureConnected() throws IOException
{
if ( nettyChannel != null && !nettyChannel.isOpen() )
{
log.warn( String.format( "Lost connection to: %s (%s)", destination, nettyChannel.remoteAddress() ) );
nettyChannel = null;
}

while ( nettyChannel == null && stillRunning )
{
ChannelFuture channelFuture = bootstrap.connect( destination );
ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() );

Channel channel = channelFuture.awaitUninterruptibly().channel();
if ( channelFuture.isSuccess() )
Expand Down
Expand Up @@ -93,7 +93,7 @@ private NonBlockingChannel channel( AdvertisedSocketAddress to )

if ( nonBlockingChannel == null )
{
nonBlockingChannel = new NonBlockingChannel( bootstrap, to.socketAddress(), log, monitor, maxQueueSize ) ;
nonBlockingChannel = new NonBlockingChannel( bootstrap, to, log, monitor, maxQueueSize ) ;
NonBlockingChannel existingNonBlockingChannel = nonBlockingChannels.putIfAbsent( to, nonBlockingChannel );

if ( existingNonBlockingChannel != null )
Expand All @@ -107,7 +107,7 @@ private NonBlockingChannel channel( AdvertisedSocketAddress to )
}
}

monitor.register( to.socketAddress() );
monitor.register( to );
return nonBlockingChannel;
}

Expand Down
Expand Up @@ -19,18 +19,18 @@
*/
package org.neo4j.causalclustering.messaging.monitoring;

import java.net.InetSocketAddress;
import org.neo4j.helpers.AdvertisedSocketAddress;

public interface MessageQueueMonitor
{
Long droppedMessages();

void droppedMessage( InetSocketAddress destination );
void droppedMessage( AdvertisedSocketAddress destination );

void queueSize( InetSocketAddress destination, long size );
void queueSize( AdvertisedSocketAddress destination, long size );

Long queueSizes();

void register( InetSocketAddress to );
void register( AdvertisedSocketAddress to );
}

Expand Up @@ -19,13 +19,13 @@
*/
package org.neo4j.metrics.source.causalclustering;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

import org.neo4j.causalclustering.messaging.monitoring.MessageQueueMonitor;
import org.neo4j.helpers.AdvertisedSocketAddress;

public class MessageQueueMonitorMetric implements MessageQueueMonitor
{
Expand All @@ -39,13 +39,13 @@ public Long droppedMessages()
}

@Override
public void droppedMessage( InetSocketAddress destination )
public void droppedMessage( AdvertisedSocketAddress destination )
{
droppedMessages.get( destination.toString() ).increment();
}

@Override
public void queueSize( InetSocketAddress destination, long size )
public void queueSize( AdvertisedSocketAddress destination, long size )
{
queueSize.get( destination.toString() ).set( size );
}
Expand All @@ -57,14 +57,14 @@ public Long queueSizes()
}

@Override
public void register( InetSocketAddress destination )
public void register( AdvertisedSocketAddress destination )
{
if ( !droppedMessages.containsKey( destination.toString() ) )
{
droppedMessages.put( destination.toString(), new LongAdder() );
}

if ( !queueSize.containsKey( destination.getHostString() ) )
if ( !queueSize.containsKey( destination.toString() ) )
{
queueSize.put( destination.toString(), new AtomicLong() );
}
Expand Down
Expand Up @@ -19,10 +19,9 @@
*/
package org.neo4j.metrics.source;

import java.net.InetSocketAddress;

import org.junit.Test;

import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.metrics.source.causalclustering.MessageQueueMonitorMetric;

import static org.hamcrest.Matchers.equalTo;
Expand All @@ -35,9 +34,9 @@ public void shouldCalculateTotalDroppedMessages() throws Exception
{
// given
MessageQueueMonitorMetric metric = new MessageQueueMonitorMetric();
InetSocketAddress one = new InetSocketAddress( 1 );
InetSocketAddress three = new InetSocketAddress( 3 );
InetSocketAddress two = new InetSocketAddress( 2 );
AdvertisedSocketAddress one = new AdvertisedSocketAddress( "hostname", 1 );
AdvertisedSocketAddress three = new AdvertisedSocketAddress( "hostname", 3 );
AdvertisedSocketAddress two = new AdvertisedSocketAddress( "hostname", 2 );

// when
metric.register( one );
Expand All @@ -57,9 +56,9 @@ public void shouldCalculateTotalQueueSize() throws Exception
{
// given
MessageQueueMonitorMetric metric = new MessageQueueMonitorMetric();
InetSocketAddress one = new InetSocketAddress( 1 );
InetSocketAddress two = new InetSocketAddress( 2 );
InetSocketAddress three = new InetSocketAddress( 3 );
AdvertisedSocketAddress one = new AdvertisedSocketAddress( "hostname", 1 );
AdvertisedSocketAddress two = new AdvertisedSocketAddress( "hostname", 2 );
AdvertisedSocketAddress three = new AdvertisedSocketAddress( "hostname", 3 );

// when
metric.register( one );
Expand All @@ -71,6 +70,6 @@ public void shouldCalculateTotalQueueSize() throws Exception
metric.queueSize( three, 7 );

// then
assertThat( metric.queueSizes(), equalTo(18L) );
assertThat( metric.queueSizes(), equalTo( 18L ) );
}
}

0 comments on commit 874c10a

Please sign in to comment.