diff --git a/community/bolt/pom.xml b/community/bolt/pom.xml index 9171ddadbcf18..5aac8b3f0e114 100644 --- a/community/bolt/pom.xml +++ b/community/bolt/pom.xml @@ -86,6 +86,11 @@ annotations + + org.apache.commons + commons-lang3 + + junit @@ -168,10 +173,5 @@ websocket-client test - - org.apache.commons - commons-lang3 - test - diff --git a/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java b/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java index a1cf8b74d2c96..969a296a5b26b 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java @@ -148,7 +148,7 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) WorkerFactory workerFactory = createWorkerFactory( boltFactory, scheduler, dependencies, logService, clock ); ConnectorPortRegister connectionRegister = dependencies.connectionRegister(); - TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config ); + TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config, clock ); BoltProtocolHandlerFactory handlerFactory = createHandlerFactory( workerFactory, throttleGroup, logService ); Map connectors = config.enabledBoltConnectors().stream() diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottle.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottle.java index 76b7bdeb1ba8f..268eb39cce477 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottle.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottle.java @@ -35,8 +35,9 @@ public interface TransportThrottle * Apply throttling logic for the given channel.. * * @param channel the netty channel to which this throttling logic should be applied + * @throws TransportThrottleException when throttle decides this connection should be halted */ - void acquire( Channel channel ); + void acquire( Channel channel ) throws TransportThrottleException; /** * Release throttling for the given channel (if applied).. diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottleException.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottleException.java new file mode 100644 index 0000000000000..4d005aa35df19 --- /dev/null +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottleException.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.transport; + +import org.neo4j.bolt.v1.runtime.BoltConnectionFatality; + +public class TransportThrottleException extends BoltConnectionFatality +{ + + public TransportThrottleException( String message ) + { + super( message ); + } + +} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottleGroup.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottleGroup.java index 47bf7ddada36f..58cb884e0fff9 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottleGroup.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportThrottleGroup.java @@ -20,10 +20,8 @@ package org.neo4j.bolt.transport; import io.netty.channel.Channel; -import io.netty.util.AttributeKey; -import java.util.ArrayList; -import java.util.List; +import java.time.Clock; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.kernel.configuration.Config; @@ -45,9 +43,9 @@ private TransportThrottleGroup() this.writeThrottle = NoOpTransportThrottle.INSTANCE; } - public TransportThrottleGroup( Config config ) + public TransportThrottleGroup( Config config, Clock clock ) { - this.writeThrottle = createWriteThrottle( config ); + this.writeThrottle = createWriteThrottle( config, clock ); } public TransportThrottle writeThrottle() @@ -65,12 +63,13 @@ public void uninstall( Channel channel ) writeThrottle.uninstall( channel ); } - private static TransportThrottle createWriteThrottle( Config config ) + private static TransportThrottle createWriteThrottle( Config config, Clock clock ) { if ( config.get( GraphDatabaseSettings.bolt_write_throttle ) ) { return new TransportWriteThrottle( config.get( GraphDatabaseSettings.bolt_write_buffer_low_water_mark ), - config.get( GraphDatabaseSettings.bolt_write_buffer_high_water_mark ) ); + config.get( GraphDatabaseSettings.bolt_write_buffer_high_water_mark ), clock, + config.get( GraphDatabaseSettings.bolt_write_throttle_max_duration ) ); } return NoOpTransportThrottle.INSTANCE; diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportWriteThrottle.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportWriteThrottle.java index 3db2950da96ac..ac020a9eb0515 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportWriteThrottle.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportWriteThrottle.java @@ -26,7 +26,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.WriteBufferWaterMark; import io.netty.util.AttributeKey; +import org.apache.commons.lang3.time.DurationFormatUtils; +import java.time.Clock; +import java.time.Duration; import java.util.function.Supplier; /** @@ -36,21 +39,26 @@ */ public class TransportWriteThrottle implements TransportThrottle { - private static final AttributeKey LOCK_KEY = AttributeKey.valueOf( "BOLT.WRITE_THROTTLE.LOCK" ); + static final AttributeKey LOCK_KEY = AttributeKey.valueOf( "BOLT.WRITE_THROTTLE.LOCK" ); + static final AttributeKey MAX_DURATION_EXCEEDED_KEY = AttributeKey.valueOf( "BOLT.WRITE_THROTTLE.MAX_DURATION_EXCEEDED" ); private final int lowWaterMark; private final int highWaterMark; + private final Clock clock; + private final long maxLockDuration; private final Supplier lockSupplier; private final ChannelInboundHandler listener; - public TransportWriteThrottle( int lowWaterMark, int highWaterMark ) + public TransportWriteThrottle( int lowWaterMark, int highWaterMark, Clock clock, Duration maxLockDuration ) { - this( lowWaterMark, highWaterMark, () -> new DefaultThrottleLock() ); + this( lowWaterMark, highWaterMark, clock, maxLockDuration, () -> new DefaultThrottleLock() ); } - public TransportWriteThrottle( int lowWaterMark, int highWaterMark, Supplier lockSupplier ) + public TransportWriteThrottle( int lowWaterMark, int highWaterMark, Clock clock, Duration maxLockDuration, Supplier lockSupplier ) { this.lowWaterMark = lowWaterMark; this.highWaterMark = highWaterMark; + this.clock = clock; + this.maxLockDuration = maxLockDuration.toMillis(); this.lockSupplier = lockSupplier; this.listener = new ChannelStatusListener(); } @@ -66,19 +74,43 @@ public void install( Channel channel ) } @Override - public void acquire( Channel channel ) + public void acquire( Channel channel ) throws TransportThrottleException { - ThrottleLock lock = channel.attr( LOCK_KEY ).get(); - - while ( channel.isOpen() && !channel.isWritable() ) + if ( !isDurationAlreadyExceeded( channel ) ) { - try - { - lock.lock( channel, 1000 ); - } - catch ( InterruptedException ex ) + ThrottleLock lock = channel.attr( LOCK_KEY ).get(); + + long startTimeMillis = 0; + while ( channel.isOpen() && !channel.isWritable() ) { - Thread.currentThread().interrupt(); + if ( maxLockDuration > 0 ) + { + long currentTimeMillis = clock.millis(); + if ( startTimeMillis == 0 ) + { + startTimeMillis = currentTimeMillis; + } + else + { + if ( currentTimeMillis - startTimeMillis > maxLockDuration ) + { + setDurationExceeded( channel ); + + throw new TransportThrottleException( String.format( + "Bolt connection [%s] will be closed because the client did not consume outgoing buffers for %s which is not expected.", + channel.remoteAddress(), DurationFormatUtils.formatDurationHMS( maxLockDuration ) ) ); + } + } + } + + try + { + lock.lock( channel, 1000 ); + } + catch ( InterruptedException ex ) + { + Thread.currentThread().interrupt(); + } } } } @@ -100,6 +132,18 @@ public void uninstall( Channel channel ) channel.attr( LOCK_KEY ).set( null ); } + private static boolean isDurationAlreadyExceeded( Channel channel ) + { + Boolean marker = channel.attr( MAX_DURATION_EXCEEDED_KEY ).get(); + + return marker != null && marker.booleanValue(); + } + + private static void setDurationExceeded( Channel channel ) + { + channel.attr( MAX_DURATION_EXCEEDED_KEY ).set( Boolean.TRUE ); + } + @ChannelHandler.Sharable private class ChannelStatusListener extends ChannelInboundHandlerAdapter { diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiter.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiter.java index 182638edc9299..c6faa5857060b 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiter.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiter.java @@ -98,7 +98,7 @@ private void checkLimitsOnEnqueue( int currentSize ) { if ( log != null ) { - log.warn( "Channel [%s]: client produced %d messages on the worker queue, auto-read is being disabled.", channel.id(), currentSize ); + log.warn( "Channel [%s]: client produced %d messages on the worker queue, auto-read is being disabled.", channel.remoteAddress(), currentSize ); } channel.config().setAutoRead( false ); @@ -111,7 +111,7 @@ private void checkLimitsOnDequeue( int currentSize ) { if ( log != null ) { - log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.id(), currentSize ); + log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.remoteAddress(), currentSize ); } channel.config().setAutoRead( true ); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltConnectionFatality.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltConnectionFatality.java index a527647b22de2..7efe6eaa5d3a2 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltConnectionFatality.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltConnectionFatality.java @@ -24,7 +24,7 @@ */ public class BoltConnectionFatality extends Exception { - BoltConnectionFatality( String message ) + protected BoltConnectionFatality( String message ) { super( message ); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java index 4a11ed208cfdc..ddf1994a1edc0 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java @@ -19,6 +19,9 @@ */ package org.neo4j.bolt.v1.runtime; +import org.apache.commons.lang3.exception.ExceptionUtils; + +import java.io.IOException; import java.time.Clock; import java.util.Map; import java.util.UUID; @@ -375,15 +378,9 @@ public State init( BoltStateMachine machine, String userAgent, } return READY; } - catch ( AuthenticationException | AuthProviderTimeoutException | AuthProviderFailedException e ) - { - fail( machine, Neo4jError.fatalFrom( e.status(), e.getMessage() ) ); - throw new BoltConnectionAuthFatality( e.getMessage() ); - } catch ( Throwable t ) { - fail( machine, Neo4jError.fatalFrom( Status.General.UnknownError, t.getMessage() ) ); - throw new BoltConnectionFatality( t.getMessage() ); + return handleFailure( machine, t, true ); } } }, @@ -411,13 +408,11 @@ public State run( BoltStateMachine machine, String statement, } catch ( AuthorizationExpiredException e ) { - fail( machine, Neo4jError.fatalFrom( e ) ); - throw new BoltConnectionAuthFatality( e.getMessage() ); + return handleFailure( machine, e, true ); } - catch ( Throwable e ) + catch ( Throwable t ) { - fail( machine, Neo4jError.from( e ) ); - return FAILED; + return handleFailure( machine, t ); } } @@ -465,13 +460,11 @@ public State pullAll( BoltStateMachine machine ) throws BoltConnectionFatality } catch ( AuthorizationExpiredException e ) { - fail( machine, Neo4jError.fatalFrom( e ) ); - throw new BoltConnectionAuthFatality( e.getMessage() ); + return handleFailure( machine, e, true ); } catch ( Throwable e ) { - fail( machine, Neo4jError.from( e ) ); - return FAILED; + return handleFailure( machine, e ); } } @@ -487,13 +480,11 @@ public State discardAll( BoltStateMachine machine ) throws BoltConnectionFatalit } catch ( AuthorizationExpiredException e ) { - fail( machine, Neo4jError.fatalFrom( e ) ); - throw new BoltConnectionAuthFatality( e.getMessage() ); + return handleFailure( machine, e, true ); } - catch ( Throwable e ) + catch ( Throwable t ) { - fail( machine, Neo4jError.from( e ) ); - return FAILED; + return handleFailure( machine, t ); } } }, @@ -662,12 +653,43 @@ State resetMachine( BoltStateMachine machine ) throws BoltConnectionFatality machine.ctx.statementProcessor.reset(); return READY; } - catch ( Throwable e ) + catch ( Throwable t ) + { + return handleFailure( machine, t, true ); + } + } + } + + private static State handleFailure( BoltStateMachine machine, Throwable t ) throws BoltConnectionFatality + { + return handleFailure( machine, t, false ); + } + + private static State handleFailure( BoltStateMachine machine, Throwable t, boolean fatal ) throws BoltConnectionFatality + { + if ( ExceptionUtils.indexOfType( t, BoltConnectionFatality.class ) != -1 ) + { + fatal = true; + } + + return handleFailure( machine, t, fatal ? Neo4jError.fatalFrom( t ) : Neo4jError.from( t ) ); + } + + private static State handleFailure( BoltStateMachine machine, Throwable t, Neo4jError error ) throws BoltConnectionFatality + { + fail( machine, error ); + + if ( error.isFatal() ) + { + if ( ExceptionUtils.indexOfType( t, AuthorizationExpiredException.class ) != -1 ) { - fail( machine, Neo4jError.fatalFrom( e ) ); - throw new BoltConnectionFatality( e.getMessage() ); + throw new BoltConnectionAuthFatality( t.getMessage() ); } + + throw new BoltConnectionFatality( t.getMessage() ); } + + return State.FAILED; } private static void fail( BoltStateMachine machine, Neo4jError neo4jError ) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/ChunkedOutput.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/ChunkedOutput.java index f92747ad42d68..e8c240f5506b1 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/ChunkedOutput.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/ChunkedOutput.java @@ -27,11 +27,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.bolt.transport.TransportThrottle; +import org.neo4j.bolt.transport.TransportThrottleException; import org.neo4j.bolt.transport.TransportThrottleGroup; +import org.neo4j.bolt.v1.messaging.BoltIOException; import org.neo4j.bolt.v1.messaging.BoltResponseMessageBoundaryHook; import org.neo4j.bolt.v1.packstream.PackOutput; import org.neo4j.bolt.v1.packstream.PackOutputClosedException; import org.neo4j.bolt.v1.packstream.PackStream; +import org.neo4j.bolt.v1.runtime.BoltConnectionFatality; +import org.neo4j.kernel.api.exceptions.Status; import static java.lang.Math.max; @@ -82,7 +86,14 @@ public synchronized PackOutput flush() throws IOException // check for and apply write throttles if ( throttleGroup != null ) { - throttleGroup.writeThrottle().acquire( channel ); + try + { + throttleGroup.writeThrottle().acquire( channel ); + } + catch ( TransportThrottleException ex ) + { + throw new BoltIOException( Status.Request.InvalidUsage, ex.getMessage(), ex ); + } } // Local copy and clear the buffer field. This ensures that the buffer is not re-released if the flush call fails diff --git a/community/bolt/src/test/java/org/neo4j/bolt/transport/TransportWriteThrottleTest.java b/community/bolt/src/test/java/org/neo4j/bolt/transport/TransportWriteThrottleTest.java index 91c176d8ced62..8064ed8455ff0 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/transport/TransportWriteThrottleTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/transport/TransportWriteThrottleTest.java @@ -32,17 +32,30 @@ import org.junit.Test; import org.mockito.Answers; import org.mockito.ArgumentCaptor; - +import org.w3c.dom.Attr; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Clock; +import java.time.Duration; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.bolt.v1.runtime.BoltConnectionFatality; import org.neo4j.test.rule.concurrent.OtherThreadRule; +import org.neo4j.time.Clocks; +import org.neo4j.time.FakeClock; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -78,10 +91,15 @@ public void setup() lockAttribute = mock( Attribute.class ); when( lockAttribute.get() ).thenReturn( lock ); + Attribute durationExceedAttribute = mock( Attribute.class ); + when( durationExceedAttribute.get() ).thenReturn( null ); + channel = mock( SocketChannel.class, Answers.RETURNS_MOCKS ); when( channel.config() ).thenReturn( config ); when( channel.isOpen() ).thenReturn( true ); - when( channel.attr( any() ) ).thenReturn( lockAttribute ); + when( channel.remoteAddress() ).thenReturn( InetSocketAddress.createUnresolved( "localhost", 32000 ) ); + when( channel.attr( TransportWriteThrottle.LOCK_KEY ) ).thenReturn( lockAttribute ); + when( channel.attr( TransportWriteThrottle.MAX_DURATION_EXCEEDED_KEY ) ).thenReturn( durationExceedAttribute ); ChannelPipeline pipeline = channel.pipeline(); when( channel.pipeline() ).thenReturn( pipeline ); @@ -115,7 +133,11 @@ public void shouldNotLockWhenWritable() throws Exception when( channel.isWritable() ).thenReturn( true ); // when - Future future = Executors.newSingleThreadExecutor().submit( () -> throttle.acquire( channel ) ); + Future future = otherThread.execute( state -> + { + throttle.acquire( channel ); + return null; + } ); // expect try @@ -140,7 +162,11 @@ public void shouldLockWhenNotWritable() throws Exception when( channel.isWritable() ).thenReturn( false ); // when - Future future = Executors.newSingleThreadExecutor().submit( () -> throttle.acquire( channel ) ); + Future future = otherThread.execute( state -> + { + throttle.acquire( channel ); + return null; + } ); // expect try @@ -203,12 +229,45 @@ public void shouldResumeWhenWritabilityChanged() throws Exception assertThat( lockOverride.unlockCallCount(), is( 1 ) ); } + @Test + public void shouldThrowThrottleExceptionWhenMaxDurationIsReached() throws Exception + { + // given + TestThrottleLock lockOverride = new TestThrottleLock(); + FakeClock clock = Clocks.fakeClock( 1, TimeUnit.SECONDS ); + TransportThrottle throttle = newThrottleAndInstall( channel, lockOverride, clock, Duration.ofSeconds( 5 ) ); + when( channel.isWritable() ).thenReturn( false ); + + // when + Future future = otherThread.execute( state -> + { + throttle.acquire( channel ); + return null; + } ); + + otherThread.get().waitUntilWaiting(); + clock.forward( 6, TimeUnit.SECONDS ); + + // expect + try + { + future.get( 1, TimeUnit.MINUTES ); + + fail( "expecting ExecutionException" ); + } + catch ( ExecutionException ex ) + { + assertThat( ex.getCause(), instanceOf( TransportThrottleException.class ) ); + assertThat( ex.getMessage(), containsString( "will be closed because the client did not consume outgoing buffers for" ) ); + } + } + private TransportThrottle newThrottle() { - return newThrottle( null ); + return newThrottle( null, Clocks.systemClock(), Duration.ZERO ); } - private TransportThrottle newThrottle( ThrottleLock lockOverride ) + private TransportThrottle newThrottle( ThrottleLock lockOverride, Clock clock, Duration maxLockDuration ) { if ( lockOverride != null ) { @@ -217,7 +276,7 @@ private TransportThrottle newThrottle( ThrottleLock lockOverride ) when( lockAttribute.get() ).thenReturn( lockOverride ); } - return new TransportWriteThrottle( 64, 256, () -> lock ); + return new TransportWriteThrottle( 64, 256, clock, maxLockDuration, () -> lock ); } private TransportThrottle newThrottleAndInstall( Channel channel ) @@ -227,7 +286,12 @@ private TransportThrottle newThrottleAndInstall( Channel channel ) private TransportThrottle newThrottleAndInstall( Channel channel, ThrottleLock lockOverride ) { - TransportThrottle throttle = newThrottle( lockOverride ); + return newThrottleAndInstall( channel, lockOverride, Clocks.systemClock(), Duration.ZERO ); + } + + private TransportThrottle newThrottleAndInstall( Channel channel, ThrottleLock lockOverride, Clock clock, Duration maxLockDuration ) + { + TransportThrottle throttle = newThrottle( lockOverride, clock, maxLockDuration ); throttle.install( channel ); @@ -243,7 +307,7 @@ private static class TestThrottleLock implements ThrottleLock @Override public void lock( Channel channel, long timeout ) throws InterruptedException { - actualLock.lock( channel, 0 ); + actualLock.lock( channel, timeout ); lockCount.incrementAndGet(); } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiterTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiterTest.java index 3f3817f7c5dde..1f665ac7898a0 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiterTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltChannelAutoReadLimiterTest.java @@ -100,7 +100,7 @@ public void shouldDisableAutoReadWhenAtHighWatermark() limiter.enqueued( job ); assertFalse( channel.config().isAutoRead() ); - verify( log ).warn( contains( "disabled" ), eq( channel.id() ), eq( 3 ) ); + verify( log ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); } @Test @@ -117,7 +117,7 @@ public void shouldDisableAutoReadOnlyOnceWhenAboveHighWatermark() limiter.enqueued( job ); assertFalse( channel.config().isAutoRead() ); - verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.id() ), eq( 3 ) ); + verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); } @Test @@ -134,8 +134,8 @@ public void shouldEnableAutoReadWhenAtLowWatermark() limiter.dequeued( job ); assertTrue( channel.config().isAutoRead() ); - verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.id() ), eq( 3 ) ); - verify( log, times( 1 ) ).warn( contains( "enabled" ), eq( channel.id() ), eq( 1 ) ); + verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); + verify( log, times( 1 ) ).warn( contains( "enabled" ), eq( channel.remoteAddress() ), eq( 1 ) ); } @Test @@ -153,8 +153,8 @@ public void shouldEnableAutoReadOnlyOnceWhenBelowLowWatermark() limiter.dequeued( job ); assertTrue( channel.config().isAutoRead() ); - verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.id() ), eq( 3 ) ); - verify( log, times( 1 ) ).warn( contains( "enabled" ), eq( channel.id() ), eq( 1 ) ); + verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); + verify( log, times( 1 ) ).warn( contains( "enabled" ), eq( channel.remoteAddress() ), eq( 1 ) ); } @Test diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltThrottleMaxDurationIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltThrottleMaxDurationIT.java new file mode 100644 index 0000000000000..aa32a505460cd --- /dev/null +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltThrottleMaxDurationIT.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.v1.transport.integration; + +import org.apache.commons.lang3.StringUtils; +import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.net.SocketException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.neo4j.bolt.v1.runtime.BoltChannelAutoReadLimiter; +import org.neo4j.bolt.v1.runtime.BoltConnectionFatality; +import org.neo4j.bolt.v1.runtime.WorkerFactory; +import org.neo4j.bolt.v1.transport.socket.client.SecureSocketConnection; +import org.neo4j.bolt.v1.transport.socket.client.SecureWebSocketConnection; +import org.neo4j.bolt.v1.transport.socket.client.SocketConnection; +import org.neo4j.bolt.v1.transport.socket.client.TransportConnection; +import org.neo4j.bolt.v1.transport.socket.client.WebSocketConnection; +import org.neo4j.collection.RawIterator; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.helpers.HostnamePort; +import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.api.proc.CallableProcedure; +import org.neo4j.kernel.api.proc.Context; +import org.neo4j.kernel.api.proc.Neo4jTypes; +import org.neo4j.kernel.api.proc.ProcedureSignature; +import org.neo4j.kernel.impl.proc.Procedures; +import org.neo4j.kernel.impl.util.ValueUtils; +import org.neo4j.kernel.internal.GraphDatabaseAPI; +import org.neo4j.logging.AssertableLogProvider; +import org.neo4j.test.TestGraphDatabaseFactory; +import org.neo4j.test.assertion.Assert; +import org.neo4j.test.matchers.CommonMatchers; +import org.neo4j.test.matchers.ExceptionMessageMatcher; +import org.neo4j.test.mockito.matcher.LogMatchers; +import org.neo4j.test.rule.concurrent.OtherThreadRule; +import org.neo4j.test.rule.fs.EphemeralFileSystemRule; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.CoreMatchers.both; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.neo4j.bolt.v1.messaging.message.DiscardAllMessage.discardAll; +import static org.neo4j.bolt.v1.messaging.message.InitMessage.init; +import static org.neo4j.bolt.v1.messaging.message.PullAllMessage.pullAll; +import static org.neo4j.bolt.v1.messaging.message.RunMessage.run; +import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgSuccess; +import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives; +import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; +import static org.neo4j.test.matchers.CommonMatchers.matchesExceptionMessage; + +public class BoltThrottleMaxDurationIT +{ + private AssertableLogProvider logProvider; + private EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule(); + private Neo4jWithSocket server = new Neo4jWithSocket( getClass(), getTestGraphDatabaseFactory(), + fsRule::get, getSettingsFunction() ); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule( fsRule ).around( server ); + @Rule + public OtherThreadRule otherThread = new OtherThreadRule<>( 5, TimeUnit.MINUTES ); + + public TransportConnection connection = new SocketConnection(); + + private HostnamePort address; + + protected TestGraphDatabaseFactory getTestGraphDatabaseFactory() + { + TestGraphDatabaseFactory factory = new TestGraphDatabaseFactory(); + + logProvider = new AssertableLogProvider(); + + factory.setInternalLogProvider( logProvider ); + //factory.setUserLogProvider( logProvider ); + + return factory; + + } + + protected Consumer> getSettingsFunction() + { + return settings -> + { + settings.put( GraphDatabaseSettings.auth_enabled.name(), "false" ); + settings.put( GraphDatabaseSettings.bolt_write_throttle_max_duration.name(), "30s" ); + }; + } + + @Before + public void setup() throws Exception + { + address = server.lookupDefaultConnector(); + } + + @After + public void after() throws Exception + { + if ( connection != null ) + { + connection.disconnect(); + } + } + + @Test + public void sendingButNotReceivingClientShouldBeKilledWhenWriteThrottleMaxDurationIsReached() throws Exception + { + int numberOfRunDiscardPairs = 10_000; + String largeString = StringUtils.repeat( " ", 8 * 1024 ); + + connection.connect( address ) + .send( TransportTestUtil.acceptedVersions( 1, 0, 0, 0 ) ) + .send( TransportTestUtil.chunk( + init( "TestClient/1.1", emptyMap() ) ) ); + + assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, eventuallyReceives( msgSuccess() ) ); + + Future sender = otherThread.execute( state -> + { + for ( int i = 0; i < numberOfRunDiscardPairs; i++ ) + { + connection.send( TransportTestUtil.chunk( + run( "RETURN $data as data", ValueUtils.asMapValue( singletonMap( "data", largeString ) ) ), + pullAll() + ) ); + } + + return null; + } ); + + try + { + otherThread.get().awaitFuture( sender ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), Matchers.instanceOf( SocketException.class ) ); + } + + logProvider.assertAtLeastOnce( + AssertableLogProvider.inLog( Matchers.containsString( WorkerFactory.class.getPackage().getName() ) ).error( containsString( "crashed" ), + matchesExceptionMessage( containsString( "will be closed because the client did not consume outgoing buffers for " ) ) ) ); + } + +} diff --git a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java index d2f47b0e71f59..1dcf14b549a66 100644 --- a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java +++ b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java @@ -103,7 +103,9 @@ enum Request implements Status "The client provided a request that was missing required fields, or had values that are not allowed." ), TransactionRequired( ClientError, "The request cannot be performed outside of a transaction, and there is no transaction present to " + - "use. Wrap your request in a transaction and retry." ); + "use. Wrap your request in a transaction and retry." ), + InvalidUsage( ClientError, // TODO: see above + "The client made a request but did not consume outgoing buffers in a timely fashion." ); private final Code code; @Override diff --git a/community/common/src/test/java/org/neo4j/test/matchers/CommonMatchers.java b/community/common/src/test/java/org/neo4j/test/matchers/CommonMatchers.java index 10dc6f0b5520e..a218ee4454990 100644 --- a/community/common/src/test/java/org/neo4j/test/matchers/CommonMatchers.java +++ b/community/common/src/test/java/org/neo4j/test/matchers/CommonMatchers.java @@ -48,6 +48,17 @@ public static Matcher> matchesOneToOneInAnyOrder( Matche return new MatchesOneToOneInAnyOrder<>( expectedMatchers ); } + /** + * Checks that an exception message matches given matcher + * + * @param matcher + * @return + */ + public static Matcher matchesExceptionMessage( Matcher matcher ) + { + return new ExceptionMessageMatcher( matcher ); + } + /** * Checks that exception has expected array or suppressed exceptions. * diff --git a/community/common/src/test/java/org/neo4j/test/matchers/ExceptionMessageMatcher.java b/community/common/src/test/java/org/neo4j/test/matchers/ExceptionMessageMatcher.java new file mode 100644 index 0000000000000..403a953f5d0f6 --- /dev/null +++ b/community/common/src/test/java/org/neo4j/test/matchers/ExceptionMessageMatcher.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.test.matchers; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +public class ExceptionMessageMatcher extends TypeSafeMatcher +{ + private final Matcher matcher; + + public ExceptionMessageMatcher( Matcher matcher ) + { + this.matcher = matcher; + } + + @Override + protected boolean matchesSafely( Throwable throwable ) + { + return matcher.matches( throwable.getMessage() ); + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "expect message to be " ).appendDescriptionOf( matcher ); + } + +} diff --git a/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java b/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java index c15cef01f905d..cd496b53ada21 100644 --- a/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java +++ b/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java @@ -791,6 +791,13 @@ public enum LabelIndex buildSetting( "unsupported.dbms.bolt.write_throttle.low_watermark", INTEGER, String.valueOf( ByteUnit.kibiBytes( 128 ) ) ).constraint( range( (int) ByteUnit.kibiBytes( 16 ), Integer.MAX_VALUE ) ).build(); + @Description( "When the total time write throttle lock is held exceeds this value, the corresponding bolt channel will be aborted. Setting " + + " this to 0 will disable this behaviour." ) + @Internal + public static final Setting bolt_write_throttle_max_duration = + buildSetting( "unsupported.dbms.bolt.write_throttle.max_duration", DURATION, "15m" ).constraint( + min( Duration.ofSeconds( 30 ) ) ).build(); + @Description( "Create an archive of an index before re-creating it if failing to load on startup." ) @Internal public static final Setting archive_failed_index = setting(