From a8384d201bb5a2f39a2a4c7394a3f70a544c500e Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Sun, 11 Feb 2018 23:29:34 +0000 Subject: [PATCH] Make ReadLimiter tests compatible with BoltConnection --- .../runtime/BoltConnectionReadLimiter.java | 2 +- ...ava => BoltConnectionReadLimiterTest.java} | 60 ++++++++++--------- .../runtime/ExecutorBoltSchedulerTest.java | 12 +--- 3 files changed, 36 insertions(+), 38 deletions(-) rename community/bolt/src/test/java/org/neo4j/bolt/runtime/{BoltChannelAutoReadLimiterTest.java => BoltConnectionReadLimiterTest.java} (81%) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionReadLimiter.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionReadLimiter.java index 65fbbcaf03132..5998718554086 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionReadLimiter.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionReadLimiter.java @@ -107,7 +107,7 @@ private void checkLimitsOnDequeue( BoltConnection connection, int currentSize ) { if ( log != null ) { - log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.remoteAddress(), currentSize ); + log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.remoteAddress(), lowWatermark ); } channel.config().setAutoRead( true ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/runtime/BoltChannelAutoReadLimiterTest.java b/community/bolt/src/test/java/org/neo4j/bolt/runtime/BoltConnectionReadLimiterTest.java similarity index 81% rename from community/bolt/src/test/java/org/neo4j/bolt/runtime/BoltChannelAutoReadLimiterTest.java rename to community/bolt/src/test/java/org/neo4j/bolt/runtime/BoltConnectionReadLimiterTest.java index 9151901a17a20..b0a25ca4fb476 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/runtime/BoltChannelAutoReadLimiterTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/runtime/BoltConnectionReadLimiterTest.java @@ -24,8 +24,11 @@ import org.junit.Before; import org.junit.Test; +import java.util.Arrays; + import org.neo4j.bolt.v1.runtime.Job; import org.neo4j.logging.Log; +import org.neo4j.util.FeatureToggles; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.startsWith; @@ -41,18 +44,24 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -public class BoltChannelAutoReadLimiterTest +public class BoltConnectionReadLimiterTest { - /*private static final Job job = s -> s.run( "INIT", null, null ); + private static final Job job = s -> s.run( "INIT", null, null ); + private BoltConnection connection; private Channel channel; private Log log; @Before public void setup() { - this.channel = new EmbeddedChannel(); - this.log = mock( Log.class ); + channel = new EmbeddedChannel(); + log = mock( Log.class ); + + connection = mock( BoltConnection.class ); + when( connection.id() ).thenReturn( channel.id().asLongText() ); + when( connection.channel() ).thenReturn( channel ); } @Test @@ -82,7 +91,7 @@ public void shouldNotDisableAutoReadBelowHighWatermark() assertTrue( channel.config().isAutoRead() ); - limiter.enqueued( job ); + limiter.enqueued( connection, job ); assertTrue( channel.config().isAutoRead() ); verify( log, never() ).warn( anyString(), any(), any() ); @@ -95,9 +104,9 @@ public void shouldDisableAutoReadWhenAtHighWatermark() assertTrue( channel.config().isAutoRead() ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.enqueued( job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); assertFalse( channel.config().isAutoRead() ); verify( log ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); @@ -110,11 +119,11 @@ public void shouldDisableAutoReadOnlyOnceWhenAboveHighWatermark() assertTrue( channel.config().isAutoRead() ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.enqueued( job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); assertFalse( channel.config().isAutoRead() ); verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); @@ -127,11 +136,10 @@ public void shouldEnableAutoReadWhenAtLowWatermark() assertTrue( channel.config().isAutoRead() ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.dequeued( job ); - limiter.dequeued( job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.drained( connection, Arrays.asList( job, job ) ); assertTrue( channel.config().isAutoRead() ); verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); @@ -145,12 +153,10 @@ public void shouldEnableAutoReadOnlyOnceWhenBelowLowWatermark() assertTrue( channel.config().isAutoRead() ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.enqueued( job ); - limiter.dequeued( job ); - limiter.dequeued( job ); - limiter.dequeued( job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.enqueued( connection, job ); + limiter.drained( connection, Arrays.asList( job, job, job ) ); assertTrue( channel.config().isAutoRead() ); verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) ); @@ -229,12 +235,12 @@ public void shouldNotAcceptNegativeHighWatermark() private BoltConnectionReadLimiter newLimiter( int low, int high ) { - return new BoltConnectionReadLimiter( channel, log, low, high ); + return new BoltConnectionReadLimiter( log, low, high ); } private BoltConnectionReadLimiter newLimiterWithDefaults() { - return new BoltConnectionReadLimiter( channel, log ); - }*/ + return new BoltConnectionReadLimiter( log ); + } } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java index 5cccf0b0efd44..1f39d50b5577e 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java @@ -165,11 +165,7 @@ public void failingJobShouldLogAndStopConnection() throws Throwable { String id = UUID.randomUUID().toString(); BoltConnection connection = newConnection( id ); - //when( connection.processNextBatch() ).thenThrow( new RuntimeException( "some unexpected error" ) ); - doAnswer( inv -> - { - throw new RuntimeException( "some unexpected error" ); - } ).when( connection ).processNextBatch(); + when( connection.processNextBatch() ).thenThrow( new RuntimeException( "some unexpected error" ) ); boltScheduler.start(); boltScheduler.created( connection ); @@ -191,11 +187,7 @@ public void successfulJobsShouldTriggerSchedulingOfPendingJobs() throws Throwabl String id = UUID.randomUUID().toString(); BoltConnection connection = newConnection( id ); AtomicInteger counter = new AtomicInteger( 0 ); - doAnswer( inv -> - { - counter.incrementAndGet(); - return true; - } ).when( connection ).processNextBatch(); + doAnswer( inv -> counter.incrementAndGet() > 0 ).when( connection ).processNextBatch(); when( connection.hasPendingJobs() ).thenReturn( true ).thenReturn( false ); boltScheduler.start();