Skip to content

Commit

Permalink
Halt connection when write throttle is held for a configured duration
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Jan 18, 2018
1 parent 9aeaa23 commit c0a70bd
Show file tree
Hide file tree
Showing 17 changed files with 493 additions and 72 deletions.
10 changes: 5 additions & 5 deletions community/bolt/pom.xml
Expand Up @@ -86,6 +86,11 @@
<artifactId>annotations</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!--Test dependencies-->
<dependency>
<groupId>junit</groupId>
Expand Down Expand Up @@ -168,10 +173,5 @@
<artifactId>websocket-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -148,7 +148,7 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
WorkerFactory workerFactory = createWorkerFactory( boltFactory, scheduler, dependencies, logService, clock );
ConnectorPortRegister connectionRegister = dependencies.connectionRegister();

TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config );
TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config, clock );
BoltProtocolHandlerFactory handlerFactory = createHandlerFactory( workerFactory, throttleGroup, logService );

Map<BoltConnector, ProtocolInitializer> connectors = config.enabledBoltConnectors().stream()
Expand Down
Expand Up @@ -35,8 +35,9 @@ public interface TransportThrottle
* Apply throttling logic for the given channel..
*
* @param channel the netty channel to which this throttling logic should be applied
* @throws TransportThrottleException when throttle decides this connection should be halted
*/
void acquire( Channel channel );
void acquire( Channel channel ) throws TransportThrottleException;

/**
* Release throttling for the given channel (if applied)..
Expand Down
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.transport;

import org.neo4j.bolt.v1.runtime.BoltConnectionFatality;

public class TransportThrottleException extends BoltConnectionFatality
{

public TransportThrottleException( String message )
{
super( message );
}

}
Expand Up @@ -20,10 +20,8 @@
package org.neo4j.bolt.transport;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

import java.util.ArrayList;
import java.util.List;
import java.time.Clock;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -45,9 +43,9 @@ private TransportThrottleGroup()
this.writeThrottle = NoOpTransportThrottle.INSTANCE;
}

public TransportThrottleGroup( Config config )
public TransportThrottleGroup( Config config, Clock clock )
{
this.writeThrottle = createWriteThrottle( config );
this.writeThrottle = createWriteThrottle( config, clock );
}

public TransportThrottle writeThrottle()
Expand All @@ -65,12 +63,13 @@ public void uninstall( Channel channel )
writeThrottle.uninstall( channel );
}

private static TransportThrottle createWriteThrottle( Config config )
private static TransportThrottle createWriteThrottle( Config config, Clock clock )
{
if ( config.get( GraphDatabaseSettings.bolt_write_throttle ) )
{
return new TransportWriteThrottle( config.get( GraphDatabaseSettings.bolt_write_buffer_low_water_mark ),
config.get( GraphDatabaseSettings.bolt_write_buffer_high_water_mark ) );
config.get( GraphDatabaseSettings.bolt_write_buffer_high_water_mark ), clock,
config.get( GraphDatabaseSettings.bolt_write_throttle_max_duration ) );
}

return NoOpTransportThrottle.INSTANCE;
Expand Down
Expand Up @@ -26,7 +26,10 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.time.DurationFormatUtils;

import java.time.Clock;
import java.time.Duration;
import java.util.function.Supplier;

/**
Expand All @@ -36,21 +39,26 @@
*/
public class TransportWriteThrottle implements TransportThrottle
{
private static final AttributeKey<ThrottleLock> LOCK_KEY = AttributeKey.valueOf( "BOLT.WRITE_THROTTLE.LOCK" );
static final AttributeKey<ThrottleLock> LOCK_KEY = AttributeKey.valueOf( "BOLT.WRITE_THROTTLE.LOCK" );
static final AttributeKey<Boolean> MAX_DURATION_EXCEEDED_KEY = AttributeKey.valueOf( "BOLT.WRITE_THROTTLE.MAX_DURATION_EXCEEDED" );
private final int lowWaterMark;
private final int highWaterMark;
private final Clock clock;
private final long maxLockDuration;
private final Supplier<ThrottleLock> lockSupplier;
private final ChannelInboundHandler listener;

public TransportWriteThrottle( int lowWaterMark, int highWaterMark )
public TransportWriteThrottle( int lowWaterMark, int highWaterMark, Clock clock, Duration maxLockDuration )
{
this( lowWaterMark, highWaterMark, () -> new DefaultThrottleLock() );
this( lowWaterMark, highWaterMark, clock, maxLockDuration, () -> new DefaultThrottleLock() );
}

public TransportWriteThrottle( int lowWaterMark, int highWaterMark, Supplier<ThrottleLock> lockSupplier )
public TransportWriteThrottle( int lowWaterMark, int highWaterMark, Clock clock, Duration maxLockDuration, Supplier<ThrottleLock> lockSupplier )
{
this.lowWaterMark = lowWaterMark;
this.highWaterMark = highWaterMark;
this.clock = clock;
this.maxLockDuration = maxLockDuration.toMillis();
this.lockSupplier = lockSupplier;
this.listener = new ChannelStatusListener();
}
Expand All @@ -66,19 +74,43 @@ public void install( Channel channel )
}

@Override
public void acquire( Channel channel )
public void acquire( Channel channel ) throws TransportThrottleException
{
ThrottleLock lock = channel.attr( LOCK_KEY ).get();

while ( channel.isOpen() && !channel.isWritable() )
if ( !isDurationAlreadyExceeded( channel ) )
{
try
{
lock.lock( channel, 1000 );
}
catch ( InterruptedException ex )
ThrottleLock lock = channel.attr( LOCK_KEY ).get();

long startTimeMillis = 0;
while ( channel.isOpen() && !channel.isWritable() )
{
Thread.currentThread().interrupt();
if ( maxLockDuration > 0 )
{
long currentTimeMillis = clock.millis();
if ( startTimeMillis == 0 )
{
startTimeMillis = currentTimeMillis;
}
else
{
if ( currentTimeMillis - startTimeMillis > maxLockDuration )
{
setDurationExceeded( channel );

throw new TransportThrottleException( String.format(
"Bolt connection [%s] will be closed because the client did not consume outgoing buffers for %s which is not expected.",
channel.remoteAddress(), DurationFormatUtils.formatDurationHMS( maxLockDuration ) ) );
}
}
}

try
{
lock.lock( channel, 1000 );
}
catch ( InterruptedException ex )
{
Thread.currentThread().interrupt();
}
}
}
}
Expand All @@ -100,6 +132,18 @@ public void uninstall( Channel channel )
channel.attr( LOCK_KEY ).set( null );
}

private static boolean isDurationAlreadyExceeded( Channel channel )
{
Boolean marker = channel.attr( MAX_DURATION_EXCEEDED_KEY ).get();

return marker != null && marker.booleanValue();
}

private static void setDurationExceeded( Channel channel )
{
channel.attr( MAX_DURATION_EXCEEDED_KEY ).set( Boolean.TRUE );
}

@ChannelHandler.Sharable
private class ChannelStatusListener extends ChannelInboundHandlerAdapter
{
Expand Down
Expand Up @@ -98,7 +98,7 @@ private void checkLimitsOnEnqueue( int currentSize )
{
if ( log != null )
{
log.warn( "Channel [%s]: client produced %d messages on the worker queue, auto-read is being disabled.", channel.id(), currentSize );
log.warn( "Channel [%s]: client produced %d messages on the worker queue, auto-read is being disabled.", channel.remoteAddress(), currentSize );
}

channel.config().setAutoRead( false );
Expand All @@ -111,7 +111,7 @@ private void checkLimitsOnDequeue( int currentSize )
{
if ( log != null )
{
log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.id(), currentSize );
log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.remoteAddress(), currentSize );
}

channel.config().setAutoRead( true );
Expand Down
Expand Up @@ -24,7 +24,7 @@
*/
public class BoltConnectionFatality extends Exception
{
BoltConnectionFatality( String message )
protected BoltConnectionFatality( String message )
{
super( message );
}
Expand Down
Expand Up @@ -19,6 +19,9 @@
*/
package org.neo4j.bolt.v1.runtime;

import org.apache.commons.lang3.exception.ExceptionUtils;

import java.io.IOException;
import java.time.Clock;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -375,15 +378,9 @@ public State init( BoltStateMachine machine, String userAgent,
}
return READY;
}
catch ( AuthenticationException | AuthProviderTimeoutException | AuthProviderFailedException e )
{
fail( machine, Neo4jError.fatalFrom( e.status(), e.getMessage() ) );
throw new BoltConnectionAuthFatality( e.getMessage() );
}
catch ( Throwable t )
{
fail( machine, Neo4jError.fatalFrom( Status.General.UnknownError, t.getMessage() ) );
throw new BoltConnectionFatality( t.getMessage() );
return handleFailure( machine, t, true );
}
}
},
Expand Down Expand Up @@ -411,13 +408,11 @@ public State run( BoltStateMachine machine, String statement,
}
catch ( AuthorizationExpiredException e )
{
fail( machine, Neo4jError.fatalFrom( e ) );
throw new BoltConnectionAuthFatality( e.getMessage() );
return handleFailure( machine, e, true );
}
catch ( Throwable e )
catch ( Throwable t )
{
fail( machine, Neo4jError.from( e ) );
return FAILED;
return handleFailure( machine, t );
}
}

Expand Down Expand Up @@ -465,13 +460,11 @@ public State pullAll( BoltStateMachine machine ) throws BoltConnectionFatality
}
catch ( AuthorizationExpiredException e )
{
fail( machine, Neo4jError.fatalFrom( e ) );
throw new BoltConnectionAuthFatality( e.getMessage() );
return handleFailure( machine, e, true );
}
catch ( Throwable e )
{
fail( machine, Neo4jError.from( e ) );
return FAILED;
return handleFailure( machine, e );
}
}

Expand All @@ -487,13 +480,11 @@ public State discardAll( BoltStateMachine machine ) throws BoltConnectionFatalit
}
catch ( AuthorizationExpiredException e )
{
fail( machine, Neo4jError.fatalFrom( e ) );
throw new BoltConnectionAuthFatality( e.getMessage() );
return handleFailure( machine, e, true );
}
catch ( Throwable e )
catch ( Throwable t )
{
fail( machine, Neo4jError.from( e ) );
return FAILED;
return handleFailure( machine, t );
}
}
},
Expand Down Expand Up @@ -662,12 +653,43 @@ State resetMachine( BoltStateMachine machine ) throws BoltConnectionFatality
machine.ctx.statementProcessor.reset();
return READY;
}
catch ( Throwable e )
catch ( Throwable t )
{
return handleFailure( machine, t, true );
}
}
}

private static State handleFailure( BoltStateMachine machine, Throwable t ) throws BoltConnectionFatality
{
return handleFailure( machine, t, false );
}

private static State handleFailure( BoltStateMachine machine, Throwable t, boolean fatal ) throws BoltConnectionFatality
{
if ( ExceptionUtils.indexOfType( t, BoltConnectionFatality.class ) != -1 )
{
fatal = true;
}

return handleFailure( machine, t, fatal ? Neo4jError.fatalFrom( t ) : Neo4jError.from( t ) );
}

private static State handleFailure( BoltStateMachine machine, Throwable t, Neo4jError error ) throws BoltConnectionFatality
{
fail( machine, error );

if ( error.isFatal() )
{
if ( ExceptionUtils.indexOfType( t, AuthorizationExpiredException.class ) != -1 )
{
fail( machine, Neo4jError.fatalFrom( e ) );
throw new BoltConnectionFatality( e.getMessage() );
throw new BoltConnectionAuthFatality( t.getMessage() );
}

throw new BoltConnectionFatality( t.getMessage() );
}

return State.FAILED;
}

private static void fail( BoltStateMachine machine, Neo4jError neo4jError )
Expand Down

0 comments on commit c0a70bd

Please sign in to comment.