From 3cdb352a7541338df31aa6b3c4414199c024a551 Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Fri, 29 Jan 2016 16:27:49 +0000 Subject: [PATCH] Adding queue metrics to core-edge * Track the number of messages in the outbound queue * Track the number of dropped messages due to the queue being full * Make queue size configurable --- .../catchup/storecopy/edge/CoreClient.java | 5 +- .../storecopy/edge/EdgeToCoreClient.java | 4 +- .../coreedge/raft/net/NonBlockingChannel.java | 19 ++++- .../net/monitoring/MessageQueueMonitor.java | 36 ++++++++++ .../server/CoreEdgeClusterSettings.java | 4 ++ .../neo4j/coreedge/server/SenderService.java | 50 +++++++------ .../server/core/CoreToCoreClient.java | 4 +- .../core/EnterpriseCoreEditionModule.java | 5 +- .../edge/EnterpriseEdgeEditionModule.java | 4 +- .../coreedge/server/ChannelKeepAliveTest.java | 8 +-- .../org/neo4j/metrics/source/CoreMetrics.java | 11 +++ .../source/MessageQueueMonitorMetric.java | 72 +++++++++++++++++++ .../org/neo4j/metrics/CoreEdgeMetricsIT.java | 54 ++++++++------ .../source/MessageQueueMonitorMetricTest.java | 55 ++++++++++++++ 14 files changed, 272 insertions(+), 59 deletions(-) create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/monitoring/MessageQueueMonitor.java create mode 100644 enterprise/metrics/src/main/java/org/neo4j/metrics/source/MessageQueueMonitorMetric.java create mode 100644 enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/CoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/CoreClient.java index 79669590573e5..e18ae56ca2a79 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/CoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/CoreClient.java @@ -52,9 +52,10 @@ public abstract class CoreClient 
extends LifecycleAdapter implements StoreFileRe private SenderService senderService; public CoreClient( LogProvider logProvider, ExpiryScheduler expiryScheduler, Expiration expiration, - ChannelInitializer channelInitializer, Monitors monitors ) + ChannelInitializer channelInitializer, Monitors monitors, int maxQueueSize ) { - this.senderService = new SenderService( expiryScheduler, expiration, channelInitializer, logProvider ); + this.senderService = new SenderService( expiryScheduler, expiration, channelInitializer, logProvider, + monitors, maxQueueSize ); this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java index 8becf704d4d21..e90d79fb62f82 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/edge/EdgeToCoreClient.java @@ -45,9 +45,9 @@ public class EdgeToCoreClient extends CoreClient { public EdgeToCoreClient( LogProvider logProvider, ExpiryScheduler expiryScheduler, Expiration expiration, - ChannelInitializer channelInitializer, Monitors monitors ) + ChannelInitializer channelInitializer, Monitors monitors, int maxQueueSize ) { - super( logProvider, expiryScheduler, expiration, channelInitializer, monitors ); + super( logProvider, expiryScheduler, expiration, channelInitializer, monitors, maxQueueSize ); } public static class ChannelInitializer extends io.netty.channel.ChannelInitializer diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/NonBlockingChannel.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/NonBlockingChannel.java index da1a9cce9f982..48fee269202a6 100644 --- 
a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/NonBlockingChannel.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/NonBlockingChannel.java @@ -32,6 +32,7 @@ import io.netty.channel.ChannelHandler; import io.netty.util.concurrent.FutureListener; +import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor; import org.neo4j.coreedge.server.Disposable; import org.neo4j.helpers.collection.IteratorUtil; import org.neo4j.logging.Log; @@ -41,7 +42,6 @@ public class NonBlockingChannel implements Disposable { - private static final int MAX_QUEUE_SIZE = 64; private static final int CONNECT_BACKOFF_IN_MS = 250; /* This pause is a maximum for retrying in case of a park/unpark race as well as for any other abnormal situations. */ @@ -53,13 +53,19 @@ public class NonBlockingChannel implements Disposable private Queue messageQueue = new ConcurrentLinkedQueue<>(); private volatile boolean stillRunning = true; private ChannelHandler keepAliveHandler; + private final MessageQueueMonitor monitor; + private final int maxQueueSize; FutureListener errorListener; - public NonBlockingChannel( Bootstrap bootstrap, final InetSocketAddress destination, ChannelHandler keepAliveHandler, final Log log ) + public NonBlockingChannel( Bootstrap bootstrap, final InetSocketAddress destination, + ChannelHandler keepAliveHandler, final Log log, MessageQueueMonitor monitor, + int maxQueueSize) { this.bootstrap = bootstrap; this.destination = destination; this.keepAliveHandler = keepAliveHandler; + this.monitor = monitor; + this.maxQueueSize = maxQueueSize; this.errorListener = future -> { if ( !future.isSuccess() ) @@ -103,6 +109,7 @@ private void messageSendingThreadWork() { nettyChannel.close(); messageQueue.clear(); + monitor.queueSize(destination, messageQueue.size()); } } @@ -133,10 +140,15 @@ public void send( Object msg ) throw new IllegalStateException( "sending on disposed channel" ); } - if ( messageQueue.size() < MAX_QUEUE_SIZE ) + if ( 
messageQueue.size() < maxQueueSize ) { messageQueue.offer( msg ); LockSupport.unpark( messageSendingThread ); + monitor.queueSize( destination, messageQueue.size()); + } + else + { + monitor.droppedMessage(destination); } } @@ -155,6 +167,7 @@ private boolean sendMessages() throws IOException write.addListener( errorListener ); messageQueue.poll(); + monitor.queueSize( destination, messageQueue.size()); sentSomething = true; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/monitoring/MessageQueueMonitor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/monitoring/MessageQueueMonitor.java new file mode 100644 index 0000000000000..0ecabb243ac55 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/monitoring/MessageQueueMonitor.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.coreedge.raft.net.monitoring; + +import java.net.InetSocketAddress; + +public interface MessageQueueMonitor +{ + Long droppedMessages(); + + void droppedMessage( InetSocketAddress destination ); + + void queueSize( InetSocketAddress destination, long size ); + + Long queueSizes(); + + void register( InetSocketAddress to ); +} + diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java index 638a757bf9a01..4e80dc4ad7f30 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java @@ -159,4 +159,8 @@ public String toString() @Description("The maximum file size before the replicated lock token state file is rotated (in unit of entries)") public static final Setting replicated_lock_token_state_size = setting( "core_edge.replicated_lock_token_state_size", INTEGER, "1000" ); + + @Description("The number of messages waiting to be sent to other servers in the cluster") + public static final Setting outgoing_queue_size = + setting( "core_edge.outgoing_queue_size", INTEGER, "64" ); } \ No newline at end of file diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java index 5cafd9419c30e..e4f5f29ff60a1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java @@ -36,9 +36,11 @@ import org.neo4j.coreedge.raft.net.NonBlockingChannel; import org.neo4j.coreedge.raft.net.Outbound; +import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor; import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.kernel.impl.util.JobScheduler; import 
org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -47,27 +49,33 @@ public class SenderService extends LifecycleAdapter implements Outbound { private final Expiration expiration; - private final ConcurrentHashMap> lazyChannelMap = + private final ConcurrentHashMap lazyChannelMap = new ConcurrentHashMap<>(); private final ExpiryScheduler scheduler; private final ChannelInitializer channelInitializer; private final ReadWriteLock serviceLock = new ReentrantReadWriteLock(); private final Log log; + private final Monitors monitors; private JobScheduler.JobHandle jobHandle; private boolean senderServiceRunning; private Bootstrap bootstrap; private NioEventLoopGroup eventLoopGroup; + private int maxQueueSize; public SenderService( ExpiryScheduler expiryScheduler, - Expiration expiration, - ChannelInitializer channelInitializer, - LogProvider logProvider ) + Expiration expiration, + ChannelInitializer channelInitializer, + LogProvider logProvider, + Monitors monitors, + int maxQueueSize) { this.expiration = expiration; this.scheduler = expiryScheduler; this.channelInitializer = channelInitializer; this.log = logProvider.getLog( getClass() ); + this.monitors = monitors; + this.maxQueueSize = maxQueueSize; } @Override @@ -81,8 +89,10 @@ public void send( AdvertisedSocketAddress to, Serializable... 
messages ) return; } - Timestamped lazyChannel = getAndUpdateLife( to ); + MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class ); + TimestampedNonBlockingChannel lazyChannel = getAndUpdateLife( to, monitor ); NonBlockingChannel nonBlockingChannel = lazyChannel.get(); + monitor.register(to.socketAddress()); for ( Object msg : messages ) { nonBlockingChannel.send( msg ); @@ -99,18 +109,18 @@ public int activeChannelCount() return lazyChannelMap.size(); } - private Timestamped getAndUpdateLife( AdvertisedSocketAddress to ) + private TimestampedNonBlockingChannel getAndUpdateLife( AdvertisedSocketAddress to, MessageQueueMonitor monitor ) { - Timestamped timestampedLazyChannel = lazyChannelMap.get( to ); + TimestampedNonBlockingChannel timestampedLazyChannel = lazyChannelMap.get( to ); if ( timestampedLazyChannel == null ) { Expiration.ExpirationTime expirationTime = expiration.new ExpirationTime(); - timestampedLazyChannel = new Timestamped<>( expirationTime, + timestampedLazyChannel = new TimestampedNonBlockingChannel( expirationTime, new NonBlockingChannel( bootstrap, to.socketAddress(), - new InboundKeepAliveHandler( expirationTime ), log ) ); + new InboundKeepAliveHandler( expirationTime ), log, monitor, maxQueueSize ) ); - Timestamped existingTimestampedLazyChannel = + TimestampedNonBlockingChannel existingTimestampedLazyChannel = lazyChannelMap.putIfAbsent( to, timestampedLazyChannel ); if ( existingTimestampedLazyChannel != null ) @@ -164,10 +174,10 @@ public synchronized void stop() jobHandle = null; } - Iterator> itr = lazyChannelMap.values().iterator(); + Iterator itr = lazyChannelMap.values().iterator(); while ( itr.hasNext() ) { - Timestamped timestampedChannel = itr.next(); + TimestampedNonBlockingChannel timestampedChannel = itr.next(); timestampedChannel.get().dispose(); itr.remove(); } @@ -189,10 +199,10 @@ public synchronized void stop() private synchronized void reapDeadChannels() { - Iterator> itr = 
lazyChannelMap.values().iterator(); + Iterator itr = lazyChannelMap.values().iterator(); while ( itr.hasNext() ) { - Timestamped timestampedChannel = itr.next(); + TimestampedNonBlockingChannel timestampedChannel = itr.next(); serviceLock.writeLock().lock(); try @@ -210,20 +220,20 @@ private synchronized void reapDeadChannels() } } - private final class Timestamped + private final class TimestampedNonBlockingChannel { private final Expiration.ExpirationTime endOfLife; - private T timestampedThing; + private NonBlockingChannel channel; - public Timestamped( Expiration.ExpirationTime endOfLife, T timestampedThing ) + public TimestampedNonBlockingChannel( Expiration.ExpirationTime endOfLife, NonBlockingChannel channel ) { this.endOfLife = endOfLife; - this.timestampedThing = timestampedThing; + this.channel = channel; } - public T get() + public NonBlockingChannel get() { - return timestampedThing; + return channel; } public Expiration.ExpirationTime getEndOfLife() diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreToCoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreToCoreClient.java index 4b26a4bfd5331..b72e0ef942938 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreToCoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreToCoreClient.java @@ -46,9 +46,9 @@ public class CoreToCoreClient extends CoreClient { public CoreToCoreClient( LogProvider logProvider, ExpiryScheduler expiryScheduler, Expiration expiration, - ChannelInitializer channelInitializer, Monitors monitors ) + ChannelInitializer channelInitializer, Monitors monitors, int maxQueueSize ) { - super( logProvider, expiryScheduler, expiration, channelInitializer, monitors ); + super( logProvider, expiryScheduler, expiration, channelInitializer, monitors, maxQueueSize ); } public static class ChannelInitializer extends io.netty.channel.ChannelInitializer diff --git 
a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 30e4fd8442f1c..fb2eacd319895 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -166,9 +166,10 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, life.add( dependencies.satisfyDependency( discoveryService ) ); final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal(); + int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size ); final SenderService senderService = new SenderService( new ExpiryScheduler( platformModule.jobScheduler ), new Expiration( SYSTEM_CLOCK ), - new RaftChannelInitializer( marshal ), logProvider ); + new RaftChannelInitializer( marshal ), logProvider, platformModule.monitors, maxQueueSize ); life.add( senderService ); final CoreMember myself = new CoreMember( @@ -347,7 +348,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, CoreToCoreClient.ChannelInitializer channelInitializer = new CoreToCoreClient.ChannelInitializer( logProvider ); CoreToCoreClient coreToCoreClient = life.add( new CoreToCoreClient( logProvider, expiryScheduler, expiration, - channelInitializer, platformModule.monitors ) ); + channelInitializer, platformModule.monitors, maxQueueSize ) ); channelInitializer.setOwner( coreToCoreClient ); long leaderLockTokenTimeout = config.get( CoreEdgeClusterSettings.leader_lock_token_timeout ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java index f714cf5f5fca6..b0e74e6e01b62 100644 --- 
a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java @@ -35,6 +35,7 @@ import org.neo4j.coreedge.catchup.tx.edge.TxPullClient; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; import org.neo4j.coreedge.discovery.EdgeDiscoveryService; +import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.Expiration; import org.neo4j.coreedge.server.ExpiryScheduler; import org.neo4j.graphdb.DependencyResolver; @@ -145,8 +146,9 @@ public void assertSchemaWritesAllowed() throws InvalidTransactionTypeKernelExcep Expiration expiration = new Expiration( SYSTEM_CLOCK ); EdgeToCoreClient.ChannelInitializer channelInitializer = new EdgeToCoreClient.ChannelInitializer( logProvider ); + int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size ); EdgeToCoreClient edgeToCoreClient = life.add( new EdgeToCoreClient( logProvider, expiryScheduler, expiration, - channelInitializer, platformModule.monitors ) ); + channelInitializer, platformModule.monitors, maxQueueSize ) ); channelInitializer.setOwner( edgeToCoreClient ); Supplier transactionIdStoreSupplier = diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/ChannelKeepAliveTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/ChannelKeepAliveTest.java index 78f4aa64959f8..be55db61b0cd2 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/ChannelKeepAliveTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/ChannelKeepAliveTest.java @@ -37,11 +37,8 @@ import io.netty.handler.codec.LengthFieldPrepender; import org.junit.Test; -import org.neo4j.coreedge.server.AdvertisedSocketAddress; -import org.neo4j.coreedge.server.ExpiryScheduler; -import org.neo4j.coreedge.server.Expiration; -import org.neo4j.coreedge.server.SenderService; import 
org.neo4j.helpers.FakeClock; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; import org.neo4j.test.OnDemandJobScheduler; @@ -117,7 +114,8 @@ public void shouldReapChannelOnlyAfterItHasExpired() throws Throwable OnDemandJobScheduler onDemandJobScheduler = new OnDemandJobScheduler(); SenderService senderService = new SenderService( new ExpiryScheduler( onDemandJobScheduler ), - new Expiration( fakeClock, 2, MINUTES ), discardClientInitializer(), NullLogProvider.getInstance() ); + new Expiration( fakeClock, 2, MINUTES ), discardClientInitializer(), NullLogProvider.getInstance(), + new Monitors(), 64); senderService.start(); senderService.send( new AdvertisedSocketAddress( serverAddress ), "GO!" ); diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreMetrics.java index 83351f089a5b7..9b69264828865 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreMetrics.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreMetrics.java @@ -51,6 +51,10 @@ public class CoreMetrics extends LifecycleAdapter public static final String TX_RETRIES = name( CORE_EDGE_PREFIX, "tx_retries" ); @Documented("Is this server the leader?") public static final String IS_LEADER = name( CORE_EDGE_PREFIX, "is_leader" ); + @Documented("How many RAFT messages were dropped?") + public static final String DROPPED_MESSAGES = name( CORE_EDGE_PREFIX, "dropped_messages" ); + @Documented("How many RAFT messages are queued up?") + public static final String QUEUE_SIZE = name( CORE_EDGE_PREFIX, "queue_sizes" ); private Monitors monitors; private MetricRegistry registry; @@ -62,6 +66,7 @@ public class CoreMetrics extends LifecycleAdapter private final LeaderNotFoundMetric leaderNotFoundMetric = new LeaderNotFoundMetric(); private final TxPullRequestsMetric txPullRequestsMetric = new TxPullRequestsMetric(); private final TxRetryMetric 
txRetryMetric = new TxRetryMetric(); + private final MessageQueueMonitorMetric messageQueueMetric = new MessageQueueMonitorMetric(); public CoreMetrics( Monitors monitors, MetricRegistry registry, Supplier coreMetaData ) { @@ -79,6 +84,7 @@ public void start() throws Throwable monitors.addMonitorListener( leaderNotFoundMetric ); monitors.addMonitorListener( txPullRequestsMetric ); monitors.addMonitorListener( txRetryMetric ); + monitors.addMonitorListener( messageQueueMetric ); registry.register( COMMIT_INDEX, (Gauge) raftLogCommitIndexMetric::commitIndex ); registry.register( APPEND_INDEX, (Gauge) raftLogAppendIndexMetric::appendIndex ); @@ -87,6 +93,8 @@ public void start() throws Throwable registry.register( TX_PULL_REQUESTS_RECEIVED, (Gauge) txPullRequestsMetric::txPullRequestsReceived ); registry.register( TX_RETRIES, (Gauge) txRetryMetric::transactionsRetries ); registry.register( IS_LEADER, new LeaderGauge() ); + registry.register( DROPPED_MESSAGES, (Gauge) messageQueueMetric::droppedMessages ); + registry.register( QUEUE_SIZE, (Gauge) messageQueueMetric::queueSizes ); } @Override @@ -99,6 +107,8 @@ public void stop() throws IOException registry.remove( TX_PULL_REQUESTS_RECEIVED ); registry.remove( TX_RETRIES ); registry.remove( IS_LEADER ); + registry.remove( DROPPED_MESSAGES ); + registry.remove( QUEUE_SIZE ); monitors.removeMonitorListener( raftLogCommitIndexMetric ); monitors.removeMonitorListener( raftLogAppendIndexMetric ); @@ -106,6 +116,7 @@ public void stop() throws IOException monitors.removeMonitorListener( leaderNotFoundMetric ); monitors.removeMonitorListener( txPullRequestsMetric ); monitors.removeMonitorListener( txRetryMetric ); + monitors.removeMonitorListener( messageQueueMetric ); } private class LeaderGauge implements Gauge diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/MessageQueueMonitorMetric.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/MessageQueueMonitorMetric.java new file mode 100644 
index 0000000000000..99f6d27f17da9 --- /dev/null +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/MessageQueueMonitorMetric.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.metrics.source; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor; + +public class MessageQueueMonitorMetric implements MessageQueueMonitor +{ + private Map droppedMessages = new TreeMap<>(); + private Map queueSize = new TreeMap<>(); + + @Override + public Long droppedMessages() + { + return droppedMessages.values().stream().mapToLong( LongAdder::longValue ).sum(); + } + + @Override + public void droppedMessage( InetSocketAddress destination ) + { + droppedMessages.get( destination.toString() ).increment(); + } + + @Override + public void queueSize( InetSocketAddress destination, long size ) + { + queueSize.get( destination.toString() ).set( size ); + } + + @Override + public Long queueSizes() + { + return queueSize.values().stream().mapToLong( AtomicLong::get ).sum(); + } + + @Override + public void 
register( InetSocketAddress destination ) + { + if ( !droppedMessages.containsKey( destination.toString() ) ) + { + droppedMessages.put( destination.toString(), new LongAdder() ); + } + + if ( !queueSize.containsKey( destination.toString() ) ) + { + queueSize.put( destination.toString(), new AtomicLong() ); + } + } +} diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java index fcebb85287a0e..f38220af90b88 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java @@ -42,12 +42,14 @@ import org.neo4j.tooling.GlobalGraphOperations; import static java.util.concurrent.TimeUnit.SECONDS; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; + import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_advertised_address; import static org.neo4j.graphdb.Label.label; import static org.neo4j.helpers.collection.Iterables.count; @@ -81,7 +83,7 @@ public void shouldMonitorCoreEdge() throws Exception // then for ( final CoreGraphDatabase db : cluster.coreServers() ) { - assertAllNodesVisible(db); + assertAllNodesVisible( db ); } for ( final EdgeGraphDatabase db : cluster.edgeServers() ) @@ -91,23 +93,23 @@ public void shouldMonitorCoreEdge() throws Exception File coreServerMetricsDir = new File( cluster.getCoreServerById( 0 ).getStoreDir(), "metrics" ); - assertEventually("append index eventually accurate", + assertEventually( "append index eventually accurate", () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.APPEND_INDEX ) ), - greaterThan ( 0L ), 5, 
TimeUnit.SECONDS ); - assertEventually("commit index eventually accurate", + assertEventually( "commit index eventually accurate", () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.COMMIT_INDEX ) ), - greaterThan ( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), 5, TimeUnit.SECONDS ); - assertEventually("term eventually accurate", + assertEventually( "term eventually accurate", () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TERM ) ), - greaterThan ( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), 5, TimeUnit.SECONDS ); - assertEventually("leader not found eventually accurate", + assertEventually( "leader not found eventually accurate", () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.LEADER_NOT_FOUND ) ), - equalTo ( 0L ), 5, TimeUnit.SECONDS ); + equalTo( 0L ), 5, TimeUnit.SECONDS ); - assertEventually("tx pull requests received eventually accurate", + assertEventually( "tx pull requests received eventually accurate", () -> { long total = 0; @@ -118,29 +120,37 @@ public void shouldMonitorCoreEdge() throws Exception } return total; }, - greaterThan ( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), 5, TimeUnit.SECONDS ); - assertEventually("tx retries eventually accurate", + assertEventually( "tx retries eventually accurate", () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TX_RETRIES ) ), - equalTo ( 0L ), 5, TimeUnit.SECONDS ); + equalTo( 0L ), 5, TimeUnit.SECONDS ); - assertEventually("is leader eventually accurate", + assertEventually( "is leader eventually accurate", () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.IS_LEADER ) ), - greaterThanOrEqualTo ( 0L ), 5, TimeUnit.SECONDS ); + greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS ); File edgeServerMetricsDir = new File( cluster.getEdgeServerById( 0 ).getStoreDir(), "metrics" ); - assertEventually("pull update request registered", + assertEventually( "pull update request registered", () -> readLastValue( 
metricsCsv( edgeServerMetricsDir, PULL_UPDATES ) ), - greaterThan ( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), 5, TimeUnit.SECONDS ); - assertEventually("pull update request registered", + assertEventually( "pull update request registered", () -> readLastValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_REQUESTED ) ), - greaterThan ( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), 5, TimeUnit.SECONDS ); - assertEventually("pull update response received", + assertEventually( "pull update response received", () -> readLastValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_RECEIVED ) ), - greaterThan ( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), 5, TimeUnit.SECONDS ); + + assertEventually( "dropped messages eventually accurate", + () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.DROPPED_MESSAGES ) ), + equalTo( 0L ), 5, TimeUnit.SECONDS ); + + assertEventually( "queue size eventually accurate", + () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.QUEUE_SIZE ) ), + equalTo( 0L ), 5, TimeUnit.SECONDS ); cluster.shutdown(); } @@ -154,7 +164,7 @@ private void assertAllNodesVisible( GraphDatabaseFacade db ) throws Exception Config config = db.getDependencyResolver().resolveDependency( Config.class ); assertEventually( "node to appear on core server " + config.get( raft_advertised_address ), nodeCount, - greaterThan( 0L ), 15, SECONDS ); + greaterThan( 0L ), 15, SECONDS ); for ( Node node : GlobalGraphOperations.at( db ).getAllNodes() ) { diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java new file mode 100644 index 0000000000000..0f8d0c66ccb10 --- /dev/null +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/source/MessageQueueMonitorMetricTest.java @@ -0,0 +1,55 @@ +package org.neo4j.metrics.source; + +import java.net.InetSocketAddress; + 
+import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class MessageQueueMonitorMetricTest +{ + @Test + public void shouldCalculateTotalDroppedMessages() throws Exception + { + // given + MessageQueueMonitorMetric metric = new MessageQueueMonitorMetric(); + InetSocketAddress one = new InetSocketAddress( 1 ); + InetSocketAddress three = new InetSocketAddress( 3 ); + InetSocketAddress two = new InetSocketAddress( 2 ); + + // when + metric.register( one ); + metric.register( two ); + metric.register( three ); + + metric.droppedMessage( one ); + metric.droppedMessage( two ); + metric.droppedMessage( three ); + + // then + assertThat( metric.droppedMessages(), equalTo( 3L ) ); + } + + @Test + public void shouldCalculateTotalQueueSize() throws Exception + { + // given + MessageQueueMonitorMetric metric = new MessageQueueMonitorMetric(); + InetSocketAddress one = new InetSocketAddress( 1 ); + InetSocketAddress two = new InetSocketAddress( 2 ); + InetSocketAddress three = new InetSocketAddress( 3 ); + + // when + metric.register( one ); + metric.register( two ); + metric.register( three ); + + metric.queueSize( one, 5 ); + metric.queueSize( two, 6 ); + metric.queueSize( three, 7 ); + + // then + assertThat( metric.queueSizes(), equalTo(18L) ); + } +} \ No newline at end of file