diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannel.java index 0b6dceb70bb19..10bc4bd440fd5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannel.java @@ -25,12 +25,12 @@ import io.netty.util.concurrent.FutureListener; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.LockSupport; import org.neo4j.causalclustering.messaging.monitoring.MessageQueueMonitor; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.logging.Log; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -46,14 +46,14 @@ class NonBlockingChannel private final Log log; private Channel nettyChannel; private Bootstrap bootstrap; - private InetSocketAddress destination; + private AdvertisedSocketAddress destination; private Queue messageQueue = new ConcurrentLinkedQueue<>(); private volatile boolean stillRunning = true; private final MessageQueueMonitor monitor; private final int maxQueueSize; private FutureListener errorListener; - NonBlockingChannel( Bootstrap bootstrap, final InetSocketAddress destination, + NonBlockingChannel( Bootstrap bootstrap, final AdvertisedSocketAddress destination, final Log log, MessageQueueMonitor monitor, int maxQueueSize ) { this.bootstrap = bootstrap; @@ -105,7 +105,7 @@ private void messageSendingThreadWork() { nettyChannel.close(); messageQueue.clear(); - monitor.queueSize(destination, messageQueue.size()); + monitor.queueSize( destination, messageQueue.size() ); } } @@ -173,12 +173,13 @@ private void ensureConnected() throws IOException { if ( nettyChannel != null && !nettyChannel.isOpen() ) { + log.warn( String.format( "Lost connection to: %s (%s)", destination, nettyChannel.remoteAddress() ) ); nettyChannel = null; } while ( nettyChannel == null && stillRunning ) { - ChannelFuture channelFuture = bootstrap.connect( destination ); + ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() ); Channel channel = channelFuture.awaitUninterruptibly().channel(); if ( channelFuture.isSuccess() ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java index 95215e0071fea..2add579147dab 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java @@ -93,7 +93,7 @@ private NonBlockingChannel channel( AdvertisedSocketAddress to ) if ( nonBlockingChannel == null ) { - nonBlockingChannel = new NonBlockingChannel( bootstrap, to.socketAddress(), log, monitor, maxQueueSize ) ; + nonBlockingChannel = new NonBlockingChannel( bootstrap, to, log, monitor, maxQueueSize ) ; NonBlockingChannel existingNonBlockingChannel = nonBlockingChannels.putIfAbsent( to, nonBlockingChannel ); if ( existingNonBlockingChannel != null ) @@ -107,7 +107,7 @@ private NonBlockingChannel channel( AdvertisedSocketAddress to ) } } - monitor.register( to.socketAddress() ); + monitor.register( to ); return nonBlockingChannel; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/monitoring/MessageQueueMonitor.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/monitoring/MessageQueueMonitor.java index 3c76c0109cbc3..a4a4d4bcf9e05 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/monitoring/MessageQueueMonitor.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/monitoring/MessageQueueMonitor.java @@ -19,18 +19,18 @@ */ package org.neo4j.causalclustering.messaging.monitoring; -import java.net.InetSocketAddress; +import org.neo4j.helpers.AdvertisedSocketAddress; public interface MessageQueueMonitor { Long droppedMessages(); - void droppedMessage( InetSocketAddress destination ); + void droppedMessage( AdvertisedSocketAddress destination ); - void queueSize( InetSocketAddress destination, long size ); + void queueSize( AdvertisedSocketAddress destination, long size ); Long queueSizes(); - void register( InetSocketAddress to ); + void register( AdvertisedSocketAddress to ); } diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/MessageQueueMonitorMetric.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/MessageQueueMonitorMetric.java index 82b0fae59d2a8..fb68e243ad54e 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/MessageQueueMonitorMetric.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/MessageQueueMonitorMetric.java @@ -19,13 +19,13 @@ */ package org.neo4j.metrics.source.causalclustering; -import java.net.InetSocketAddress; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import org.neo4j.causalclustering.messaging.monitoring.MessageQueueMonitor; +import org.neo4j.helpers.AdvertisedSocketAddress; public class MessageQueueMonitorMetric implements MessageQueueMonitor { @@ -39,13 +39,13 @@ public Long droppedMessages() } @Override - public void droppedMessage( InetSocketAddress destination ) + public void droppedMessage( AdvertisedSocketAddress destination ) { droppedMessages.get( destination.toString() ).increment(); } @Override - public void queueSize( InetSocketAddress destination, long size ) + public void queueSize( AdvertisedSocketAddress destination, long size ) { queueSize.get( destination.toString() ).set( size ); } @@ -57,14 +57,14 @@ public Long queueSizes() } @Override - public void register( InetSocketAddress destination ) + public void register( AdvertisedSocketAddress destination ) { if ( !droppedMessages.containsKey( destination.toString() ) ) { droppedMessages.put( destination.toString(), new LongAdder() ); } - if ( !queueSize.containsKey( destination.getHostString() ) ) + if ( !queueSize.containsKey( destination.toString() ) ) { queueSize.put( destination.toString(), new AtomicLong() ); } diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java index 5c1fba21bfb4a..41cdd9b77d3a8 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java @@ -19,10 +19,9 @@ */ package org.neo4j.metrics.source; -import java.net.InetSocketAddress; - import org.junit.Test; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.metrics.source.causalclustering.MessageQueueMonitorMetric; import static org.hamcrest.Matchers.equalTo; @@ -35,9 +34,9 @@ public void shouldCalculateTotalDroppedMessages() throws Exception { // given MessageQueueMonitorMetric metric = new MessageQueueMonitorMetric(); - InetSocketAddress one = new InetSocketAddress( 1 ); - InetSocketAddress three = new InetSocketAddress( 3 ); - InetSocketAddress two = new InetSocketAddress( 2 ); + AdvertisedSocketAddress one = new AdvertisedSocketAddress( "hostname", 1 ); + AdvertisedSocketAddress three = new AdvertisedSocketAddress( "hostname", 3 ); + AdvertisedSocketAddress two = new AdvertisedSocketAddress( "hostname", 2 ); // when metric.register( one ); @@ -57,9 +56,9 @@ public void shouldCalculateTotalQueueSize() throws Exception { // given MessageQueueMonitorMetric metric = new MessageQueueMonitorMetric(); - InetSocketAddress one = new InetSocketAddress( 1 ); - InetSocketAddress two = new InetSocketAddress( 2 ); - InetSocketAddress three = new InetSocketAddress( 3 ); + AdvertisedSocketAddress one = new AdvertisedSocketAddress( "hostname", 1 ); + AdvertisedSocketAddress two = new AdvertisedSocketAddress( "hostname", 2 ); + AdvertisedSocketAddress three = new AdvertisedSocketAddress( "hostname", 3 ); // when metric.register( one ); @@ -71,6 +70,6 @@ public void shouldCalculateTotalQueueSize() throws Exception metric.queueSize( three, 7 ); // then - assertThat( metric.queueSizes(), equalTo(18L) ); + assertThat( metric.queueSizes(), equalTo( 18L ) ); } }