Skip to content

Commit

Permalink
Disable channel auto-read when bolt messages are queued up
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Jan 8, 2018
1 parent 3dc349f commit 59ac1d2
Show file tree
Hide file tree
Showing 12 changed files with 721 additions and 17 deletions.
Expand Up @@ -36,6 +36,7 @@
import org.neo4j.bolt.transport.NettyServer;
import org.neo4j.bolt.transport.NettyServer.ProtocolInitializer;
import org.neo4j.bolt.transport.SocketTransport;
import org.neo4j.bolt.v1.runtime.BoltChannelAutoReadLimiter;
import org.neo4j.bolt.v1.runtime.BoltConnectionDescriptor;
import org.neo4j.bolt.v1.runtime.BoltFactory;
import org.neo4j.bolt.v1.runtime.BoltFactoryImpl;
Expand Down Expand Up @@ -231,7 +232,9 @@ private Map<Long, BiFunction<Channel, Boolean, BoltProtocol>> newVersions(
{
BoltConnectionDescriptor descriptor = new BoltConnectionDescriptor(
channel.remoteAddress(), channel.localAddress() );
BoltWorker worker = workerFactory.newWorker( descriptor, channel::close );
BoltChannelAutoReadLimiter limiter =
new BoltChannelAutoReadLimiter( channel, logging.getInternalLog( BoltChannelAutoReadLimiter.class ) );
BoltWorker worker = workerFactory.newWorker( descriptor, limiter, channel::close );
return new BoltProtocolV1( worker, channel, logging );
}
);
Expand Down
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime;

import io.netty.channel.Channel;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.logging.Log;

import static java.util.Objects.requireNonNull;

public class BoltChannelAutoReadLimiter implements BoltWorkerQueueMonitor
{
private static final int defaultLowWatermark = Integer.getInteger( "org.neo4j.bolt.autoread.loWatermark", 100 );
private static final int defaultHighWatermark = Integer.getInteger( "org.neo4j.bolt.autoread.hiWatermark", 300 );

private final AtomicInteger queueSize = new AtomicInteger( 0 );
private final Channel channel;
private final Log log;
private final int lowWatermark;
private final int highWatermark;

public BoltChannelAutoReadLimiter( Channel channel, Log log )
{
this( channel, log, defaultLowWatermark, defaultHighWatermark );
}

public BoltChannelAutoReadLimiter( Channel channel, Log log, int lowWatermark, int highWatermark)
{
if ( highWatermark <= 0 )
{
throw new IllegalArgumentException( "invalid highWatermark value" );
}

if ( lowWatermark < 0 || lowWatermark >= highWatermark )
{
throw new IllegalArgumentException( "invalid lowWatermark value" );
}

this.channel = requireNonNull( channel );
this.log = log;
this.lowWatermark = lowWatermark;
this.highWatermark = highWatermark;
}

@Override
public void enqueued( Job job )
{
checkLimitsOnEnqueue( queueSize.incrementAndGet() );
}

@Override
public void dequeued( Job job )
{
checkLimitsOnDequeue( queueSize.decrementAndGet() );
}

@Override
public void drained( Collection<Job> jobs )
{
checkLimitsOnDequeue( queueSize.addAndGet( -jobs.size() ) );
}

private void checkLimitsOnEnqueue( int currentSize )
{
if (currentSize > highWatermark && channel.config().isAutoRead())
{
if (log != null)
{
log.warn( "Channel [%s]: client produced %d messages on the worker queue, auto-read is being disabled.", channel.id(), currentSize );
}

channel.config().setAutoRead( false );
}
}

private void checkLimitsOnDequeue( int currentSize )
{
if (currentSize <= lowWatermark && !channel.config().isAutoRead())
{
if (log != null)
{
log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.id(), currentSize );
}

channel.config().setAutoRead( true );
}
}

}
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime;

import java.util.Collection;

public interface BoltWorkerQueueMonitor
{

void enqueued(Job job);

void dequeued(Job job);

void drained(Collection<Job> jobs);

}
Expand Up @@ -45,13 +45,13 @@ public MonitoredWorkerFactory( Monitors monitors, WorkerFactory delegate, Clock
}

@Override
public BoltWorker newWorker( BoltConnectionDescriptor connectionDescriptor, Runnable onClose )
public BoltWorker newWorker( BoltConnectionDescriptor connectionDescriptor, BoltWorkerQueueMonitor queueMonitor, Runnable onClose )
{
if ( monitors.hasListeners( SessionMonitor.class ) )
{
return new MonitoredBoltWorker( monitor, delegate.newWorker( connectionDescriptor, onClose ), clock );
return new MonitoredBoltWorker( monitor, delegate.newWorker( connectionDescriptor, queueMonitor, onClose ), clock );
}
return delegate.newWorker( connectionDescriptor, onClose );
return delegate.newWorker( connectionDescriptor, queueMonitor, onClose );
}

static class MonitoredBoltWorker implements BoltWorker
Expand Down
Expand Up @@ -27,13 +27,14 @@ public interface WorkerFactory
{
default BoltWorker newWorker( BoltConnectionDescriptor connectionDescriptor )
{
return newWorker( connectionDescriptor, null );
return newWorker( connectionDescriptor, null, null );
}

/**
* @param connectionDescriptor describes the underlying medium (TCP, HTTP, ...)
* @param queueMonitor object that will be notified of changes (enqueue, dequeue, drain) in worker job queue
* @param onClose callback for closing the underlying connection in case of protocol violation.
* @return a new job queue
*/
BoltWorker newWorker( BoltConnectionDescriptor connectionDescriptor, Runnable onClose );
BoltWorker newWorker( BoltConnectionDescriptor connectionDescriptor, BoltWorkerQueueMonitor queueMonitor, Runnable onClose );
}
Expand Up @@ -20,16 +20,19 @@
package org.neo4j.bolt.v1.runtime.concurrent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.neo4j.bolt.v1.runtime.BoltConnectionAuthFatality;
import org.neo4j.bolt.v1.runtime.BoltConnectionFatality;
import org.neo4j.bolt.v1.runtime.BoltProtocolBreachFatality;
import org.neo4j.bolt.v1.runtime.BoltStateMachine;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.BoltWorkerQueueMonitor;
import org.neo4j.bolt.v1.runtime.Job;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;
Expand All @@ -39,21 +42,28 @@
*/
class RunnableBoltWorker implements Runnable, BoltWorker
{
private static final int workQueueSize = Integer.getInteger( "org.neo4j.bolt.workQueueSize", 100 );
private static final int workQueueMaxBatchSize = Integer.getInteger( "org.neo4j.bolt.workQueueMaxBatchSize", 100 );
static final int workQueuePollDuration = Integer.getInteger( "org.neo4j.bolt.workQueuePollDuration", 10 );

private final BlockingQueue<Job> jobQueue = new ArrayBlockingQueue<>( workQueueSize );
private final BlockingQueue<Job> jobQueue = new LinkedBlockingQueue<>();
private final BoltStateMachine machine;
private final Log log;
private final Log userLog;
private final BoltWorkerQueueMonitor queueMonitor;

private volatile boolean keepRunning = true;

RunnableBoltWorker( BoltStateMachine machine, LogService logging )
{
this( machine, logging, null );
}

RunnableBoltWorker( BoltStateMachine machine, LogService logging, BoltWorkerQueueMonitor queueMonitor )
{
this.machine = machine;
this.log = logging.getInternalLog( getClass() );
this.userLog = logging.getUserLog( getClass() );
this.queueMonitor = queueMonitor;
}

/**
Expand All @@ -68,6 +78,7 @@ public void enqueue( Job job )
try
{
jobQueue.put( job );
notifyEnqueued( job );
}
catch ( InterruptedException e )
{
Expand All @@ -80,7 +91,7 @@ public void enqueue( Job job )
@Override
public void run()
{
List<Job> batch = new ArrayList<>( workQueueSize );
List<Job> batch = new ArrayList<>( workQueueMaxBatchSize );

try
{
Expand All @@ -89,11 +100,13 @@ public void run()
Job job = jobQueue.poll( workQueuePollDuration, TimeUnit.SECONDS );
if ( job != null )
{
notifyDequeued( job );
execute( job );

for ( int jobCount = jobQueue.drainTo( batch ); keepRunning && jobCount > 0;
jobCount = jobQueue.drainTo( batch ) )
for ( int jobCount = jobQueue.drainTo( batch, workQueueMaxBatchSize ); keepRunning && jobCount > 0;
jobCount = jobQueue.drainTo( batch, workQueueMaxBatchSize ) )
{
notifyDrained( batch );
executeBatch( batch );
}
}
Expand Down Expand Up @@ -169,4 +182,29 @@ private void closeStateMachine()
log.error( "Unable to close Bolt session '" + machine.key() + "'", t );
}
}

private void notifyEnqueued( Job job )
{
if ( queueMonitor != null )
{
queueMonitor.enqueued( job );
}
}

private void notifyDequeued( Job job )
{
if ( queueMonitor != null )
{
queueMonitor.dequeued( job );
}
}

private void notifyDrained( List<Job> jobs )
{
if ( queueMonitor != null )
{
queueMonitor.drained( jobs );
}
}

}
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.bolt.v1.runtime.BoltFactory;
import org.neo4j.bolt.v1.runtime.BoltStateMachine;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.BoltWorkerQueueMonitor;
import org.neo4j.bolt.v1.runtime.WorkerFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.JobScheduler;
Expand Down Expand Up @@ -60,10 +61,10 @@ public ThreadedWorkerFactory( BoltFactory connector, JobScheduler scheduler, Log
}

@Override
public BoltWorker newWorker( BoltConnectionDescriptor connectionDescriptor, Runnable onClose )
public BoltWorker newWorker( BoltConnectionDescriptor connectionDescriptor, BoltWorkerQueueMonitor queueMonitor, Runnable onClose )
{
BoltStateMachine machine = connector.newMachine( connectionDescriptor, onClose, clock );
RunnableBoltWorker worker = new RunnableBoltWorker( machine, logging );
RunnableBoltWorker worker = new RunnableBoltWorker( machine, logging, queueMonitor );

scheduler.schedule( sessionWorker, worker, stringMap( THREAD_ID, machine.key() ) );

Expand Down

0 comments on commit 59ac1d2

Please sign in to comment.