Skip to content

Commit

Permalink
Adding queue metrics to core-edge
Browse files Browse the repository at this point in the history
* Track the number of messages in the outbound queue
* Track the number of dropped messages due to the queue being full
* Make queue size configurable
  • Loading branch information
Mark Needham committed Jan 29, 2016
1 parent 7cb2c3c commit 3cdb352
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 59 deletions.
Expand Up @@ -52,9 +52,10 @@ public abstract class CoreClient extends LifecycleAdapter implements StoreFileRe
private SenderService senderService;

public CoreClient( LogProvider logProvider, ExpiryScheduler expiryScheduler, Expiration expiration,
ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors )
ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors, int maxQueueSize )
{
this.senderService = new SenderService( expiryScheduler, expiration, channelInitializer, logProvider );
this.senderService = new SenderService( expiryScheduler, expiration, channelInitializer, logProvider,
monitors, maxQueueSize );
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
}

Expand Down
Expand Up @@ -45,9 +45,9 @@
public class EdgeToCoreClient extends CoreClient
{
public EdgeToCoreClient( LogProvider logProvider, ExpiryScheduler expiryScheduler, Expiration expiration,
ChannelInitializer channelInitializer, Monitors monitors )
ChannelInitializer channelInitializer, Monitors monitors, int maxQueueSize )
{
super( logProvider, expiryScheduler, expiration, channelInitializer, monitors );
super( logProvider, expiryScheduler, expiration, channelInitializer, monitors, maxQueueSize );
}

public static class ChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel>
Expand Down
Expand Up @@ -32,6 +32,7 @@
import io.netty.channel.ChannelHandler;
import io.netty.util.concurrent.FutureListener;

import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor;
import org.neo4j.coreedge.server.Disposable;
import org.neo4j.helpers.collection.IteratorUtil;
import org.neo4j.logging.Log;
Expand All @@ -41,7 +42,6 @@

public class NonBlockingChannel implements Disposable
{
private static final int MAX_QUEUE_SIZE = 64;
private static final int CONNECT_BACKOFF_IN_MS = 250;
/* This pause is a maximum for retrying in case of a park/unpark race as well as for any other abnormal
situations. */
Expand All @@ -53,13 +53,19 @@ public class NonBlockingChannel implements Disposable
private Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
private volatile boolean stillRunning = true;
private ChannelHandler keepAliveHandler;
private final MessageQueueMonitor monitor;
private final int maxQueueSize;
FutureListener<Void> errorListener;

public NonBlockingChannel( Bootstrap bootstrap, final InetSocketAddress destination, ChannelHandler keepAliveHandler, final Log log )
public NonBlockingChannel( Bootstrap bootstrap, final InetSocketAddress destination,
ChannelHandler keepAliveHandler, final Log log, MessageQueueMonitor monitor,
int maxQueueSize)
{
this.bootstrap = bootstrap;
this.destination = destination;
this.keepAliveHandler = keepAliveHandler;
this.monitor = monitor;
this.maxQueueSize = maxQueueSize;

this.errorListener = future -> {
if ( !future.isSuccess() )
Expand Down Expand Up @@ -103,6 +109,7 @@ private void messageSendingThreadWork()
{
nettyChannel.close();
messageQueue.clear();
monitor.queueSize(destination, messageQueue.size());
}
}

Expand Down Expand Up @@ -133,10 +140,15 @@ public void send( Object msg )
throw new IllegalStateException( "sending on disposed channel" );
}

if ( messageQueue.size() < MAX_QUEUE_SIZE )
if ( messageQueue.size() < maxQueueSize )
{
messageQueue.offer( msg );
LockSupport.unpark( messageSendingThread );
monitor.queueSize( destination, messageQueue.size());
}
else
{
monitor.droppedMessage(destination);
}
}

Expand All @@ -155,6 +167,7 @@ private boolean sendMessages() throws IOException
write.addListener( errorListener );

messageQueue.poll();
monitor.queueSize( destination, messageQueue.size());
sentSomething = true;
}

Expand Down
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.net.monitoring;

import java.net.InetSocketAddress;

/**
 * Monitor for the outbound message queues that are kept per destination address.
 * <p>
 * {@code NonBlockingChannel} reports to it whenever its queue changes size or a message is
 * discarded, and {@code SenderService} announces each destination it is sending to.
 * What an implementation does with these notifications (e.g. how the values returned by
 * {@link #queueSizes()} and {@link #droppedMessages()} are aggregated) is not defined here.
 */
public interface MessageQueueMonitor
{
    /**
     * Announces a destination whose queue is going to be monitored.
     * {@code SenderService.send} calls this with the socket address of the target on every send,
     * so implementations should expect repeated calls for the same address.
     *
     * @param to socket address of the destination
     */
    void register( InetSocketAddress to );

    /**
     * Reports the current number of messages queued for a destination. {@code NonBlockingChannel}
     * calls this after enqueueing a message, after removing a sent message, and after clearing
     * the queue.
     *
     * @param destination socket address the queue belongs to
     * @param size number of messages in that queue at the time of the call
     */
    void queueSize( InetSocketAddress destination, long size );

    /**
     * Reports that a single message for the given destination was discarded.
     * {@code NonBlockingChannel.send} calls this when the queue has already reached its
     * configured maximum size.
     *
     * @param destination socket address the discarded message was meant for
     */
    void droppedMessage( InetSocketAddress destination );

    /**
     * @return the recorded queue size; presumably the total over all registered destinations
     * (NOTE(review): aggregation and nullability depend on the implementation - confirm)
     */
    Long queueSizes();

    /**
     * @return the recorded number of dropped messages; presumably the total over all destinations
     * (NOTE(review): aggregation and nullability depend on the implementation - confirm)
     */
    Long droppedMessages();
}

Expand Up @@ -159,4 +159,8 @@ public String toString()
@Description("The maximum file size before the replicated lock token state file is rotated (in unit of entries)")
public static final Setting<Integer> replicated_lock_token_state_size =
setting( "core_edge.replicated_lock_token_state_size", INTEGER, "1000" );

// Upper bound for the outgoing message queue; the configured value is passed down to each
// NonBlockingChannel as its maxQueueSize, and messages offered beyond it are dropped.
@Description("The maximum number of messages waiting to be sent to each of the other servers in the cluster; " +
        "further messages are dropped once this limit is reached")
public static final Setting<Integer> outgoing_queue_size =
        setting( "core_edge.outgoing_queue_size", INTEGER, "64" );
}
Expand Up @@ -36,9 +36,11 @@

import org.neo4j.coreedge.raft.net.NonBlockingChannel;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand All @@ -47,27 +49,33 @@
public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress>
{
private final Expiration expiration;
private final ConcurrentHashMap<AdvertisedSocketAddress,Timestamped<NonBlockingChannel>> lazyChannelMap =
private final ConcurrentHashMap<AdvertisedSocketAddress,TimestampedNonBlockingChannel> lazyChannelMap =
new ConcurrentHashMap<>();
private final ExpiryScheduler scheduler;
private final ChannelInitializer<SocketChannel> channelInitializer;
private final ReadWriteLock serviceLock = new ReentrantReadWriteLock();
private final Log log;
private final Monitors monitors;

private JobScheduler.JobHandle jobHandle;
private boolean senderServiceRunning;
private Bootstrap bootstrap;
private NioEventLoopGroup eventLoopGroup;
private int maxQueueSize;

public SenderService( ExpiryScheduler expiryScheduler,
Expiration expiration,
ChannelInitializer<SocketChannel> channelInitializer,
LogProvider logProvider )
Expiration expiration,
ChannelInitializer<SocketChannel> channelInitializer,
LogProvider logProvider,
Monitors monitors,
int maxQueueSize)
{
this.expiration = expiration;
this.scheduler = expiryScheduler;
this.channelInitializer = channelInitializer;
this.log = logProvider.getLog( getClass() );
this.monitors = monitors;
this.maxQueueSize = maxQueueSize;
}

@Override
Expand All @@ -81,8 +89,10 @@ public void send( AdvertisedSocketAddress to, Serializable... messages )
return;
}

Timestamped<NonBlockingChannel> lazyChannel = getAndUpdateLife( to );
MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class );
TimestampedNonBlockingChannel lazyChannel = getAndUpdateLife( to, monitor );
NonBlockingChannel nonBlockingChannel = lazyChannel.get();
monitor.register(to.socketAddress());
for ( Object msg : messages )
{
nonBlockingChannel.send( msg );
Expand All @@ -99,18 +109,18 @@ public int activeChannelCount()
return lazyChannelMap.size();
}

private Timestamped<NonBlockingChannel> getAndUpdateLife( AdvertisedSocketAddress to )
private TimestampedNonBlockingChannel getAndUpdateLife( AdvertisedSocketAddress to, MessageQueueMonitor monitor )
{
Timestamped<NonBlockingChannel> timestampedLazyChannel = lazyChannelMap.get( to );
TimestampedNonBlockingChannel timestampedLazyChannel = lazyChannelMap.get( to );

if ( timestampedLazyChannel == null )
{
Expiration.ExpirationTime expirationTime = expiration.new ExpirationTime();
timestampedLazyChannel = new Timestamped<>( expirationTime,
timestampedLazyChannel = new TimestampedNonBlockingChannel( expirationTime,
new NonBlockingChannel( bootstrap, to.socketAddress(),
new InboundKeepAliveHandler( expirationTime ), log ) );
new InboundKeepAliveHandler( expirationTime ), log, monitor, maxQueueSize ) );

Timestamped<NonBlockingChannel> existingTimestampedLazyChannel =
TimestampedNonBlockingChannel existingTimestampedLazyChannel =
lazyChannelMap.putIfAbsent( to, timestampedLazyChannel );

if ( existingTimestampedLazyChannel != null )
Expand Down Expand Up @@ -164,10 +174,10 @@ public synchronized void stop()
jobHandle = null;
}

Iterator<Timestamped<NonBlockingChannel>> itr = lazyChannelMap.values().iterator();
Iterator<TimestampedNonBlockingChannel> itr = lazyChannelMap.values().iterator();
while ( itr.hasNext() )
{
Timestamped<NonBlockingChannel> timestampedChannel = itr.next();
TimestampedNonBlockingChannel timestampedChannel = itr.next();
timestampedChannel.get().dispose();
itr.remove();
}
Expand All @@ -189,10 +199,10 @@ public synchronized void stop()

private synchronized void reapDeadChannels()
{
Iterator<Timestamped<NonBlockingChannel>> itr = lazyChannelMap.values().iterator();
Iterator<TimestampedNonBlockingChannel> itr = lazyChannelMap.values().iterator();
while ( itr.hasNext() )
{
Timestamped<NonBlockingChannel> timestampedChannel = itr.next();
TimestampedNonBlockingChannel timestampedChannel = itr.next();

serviceLock.writeLock().lock();
try
Expand All @@ -210,20 +220,20 @@ private synchronized void reapDeadChannels()
}
}

private final class Timestamped<T extends Disposable>
private final class TimestampedNonBlockingChannel
{
private final Expiration.ExpirationTime endOfLife;
private T timestampedThing;
private NonBlockingChannel channel;

public Timestamped( Expiration.ExpirationTime endOfLife, T timestampedThing )
public TimestampedNonBlockingChannel( Expiration.ExpirationTime endOfLife, NonBlockingChannel channel )
{
this.endOfLife = endOfLife;
this.timestampedThing = timestampedThing;
this.channel = channel;
}

public T get()
public NonBlockingChannel get()
{
return timestampedThing;
return channel;
}

public Expiration.ExpirationTime getEndOfLife()
Expand Down
Expand Up @@ -46,9 +46,9 @@
public class CoreToCoreClient extends CoreClient
{
public CoreToCoreClient( LogProvider logProvider, ExpiryScheduler expiryScheduler, Expiration expiration,
ChannelInitializer channelInitializer, Monitors monitors )
ChannelInitializer channelInitializer, Monitors monitors, int maxQueueSize )
{
super( logProvider, expiryScheduler, expiration, channelInitializer, monitors );
super( logProvider, expiryScheduler, expiration, channelInitializer, monitors, maxQueueSize );
}

public static class ChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel>
Expand Down
Expand Up @@ -166,9 +166,10 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
life.add( dependencies.satisfyDependency( discoveryService ) );

final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal();
int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );
final SenderService senderService = new SenderService(
new ExpiryScheduler( platformModule.jobScheduler ), new Expiration( SYSTEM_CLOCK ),
new RaftChannelInitializer( marshal ), logProvider );
new RaftChannelInitializer( marshal ), logProvider, platformModule.monitors, maxQueueSize );
life.add( senderService );

final CoreMember myself = new CoreMember(
Expand Down Expand Up @@ -347,7 +348,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

CoreToCoreClient.ChannelInitializer channelInitializer = new CoreToCoreClient.ChannelInitializer( logProvider );
CoreToCoreClient coreToCoreClient = life.add( new CoreToCoreClient( logProvider, expiryScheduler, expiration,
channelInitializer, platformModule.monitors ) );
channelInitializer, platformModule.monitors, maxQueueSize ) );
channelInitializer.setOwner( coreToCoreClient );

long leaderLockTokenTimeout = config.get( CoreEdgeClusterSettings.leader_lock_token_timeout );
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.coreedge.catchup.tx.edge.TxPullClient;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.EdgeDiscoveryService;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.coreedge.server.Expiration;
import org.neo4j.coreedge.server.ExpiryScheduler;
import org.neo4j.graphdb.DependencyResolver;
Expand Down Expand Up @@ -145,8 +146,9 @@ public void assertSchemaWritesAllowed() throws InvalidTransactionTypeKernelExcep
Expiration expiration = new Expiration( SYSTEM_CLOCK );

EdgeToCoreClient.ChannelInitializer channelInitializer = new EdgeToCoreClient.ChannelInitializer( logProvider );
int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );
EdgeToCoreClient edgeToCoreClient = life.add( new EdgeToCoreClient( logProvider, expiryScheduler, expiration,
channelInitializer, platformModule.monitors ) );
channelInitializer, platformModule.monitors, maxQueueSize ) );
channelInitializer.setOwner( edgeToCoreClient );

Supplier<TransactionIdStore> transactionIdStoreSupplier =
Expand Down
Expand Up @@ -37,11 +37,8 @@
import io.netty.handler.codec.LengthFieldPrepender;
import org.junit.Test;

import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.ExpiryScheduler;
import org.neo4j.coreedge.server.Expiration;
import org.neo4j.coreedge.server.SenderService;
import org.neo4j.helpers.FakeClock;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler;

Expand Down Expand Up @@ -117,7 +114,8 @@ public void shouldReapChannelOnlyAfterItHasExpired() throws Throwable

OnDemandJobScheduler onDemandJobScheduler = new OnDemandJobScheduler();
SenderService senderService = new SenderService( new ExpiryScheduler( onDemandJobScheduler ),
new Expiration( fakeClock, 2, MINUTES ), discardClientInitializer(), NullLogProvider.getInstance() );
new Expiration( fakeClock, 2, MINUTES ), discardClientInitializer(), NullLogProvider.getInstance(),
new Monitors(), 64);

senderService.start();
senderService.send( new AdvertisedSocketAddress( serverAddress ), "GO!" );
Expand Down

0 comments on commit 3cdb352

Please sign in to comment.