Skip to content

Commit

Permalink
Make ReadLimiter tests compatible with BoltConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Mar 2, 2018
1 parent ff206b9 commit a8384d2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 38 deletions.
Expand Up @@ -107,7 +107,7 @@ private void checkLimitsOnDequeue( BoltConnection connection, int currentSize )
{
if ( log != null )
{
log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.remoteAddress(), currentSize );
log.warn( "Channel [%s]: consumed messages on the worker queue below %d, auto-read is being enabled.", channel.remoteAddress(), lowWatermark );
}

channel.config().setAutoRead( true );
Expand Down
Expand Up @@ -24,8 +24,11 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;

import org.neo4j.bolt.v1.runtime.Job;
import org.neo4j.logging.Log;
import org.neo4j.util.FeatureToggles;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
Expand All @@ -41,18 +44,24 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class BoltChannelAutoReadLimiterTest
public class BoltConnectionReadLimiterTest
{
/*private static final Job job = s -> s.run( "INIT", null, null );
private static final Job job = s -> s.run( "INIT", null, null );
private BoltConnection connection;
private Channel channel;
private Log log;

@Before
public void setup()
{
this.channel = new EmbeddedChannel();
this.log = mock( Log.class );
channel = new EmbeddedChannel();
log = mock( Log.class );

connection = mock( BoltConnection.class );
when( connection.id() ).thenReturn( channel.id().asLongText() );
when( connection.channel() ).thenReturn( channel );
}

@Test
Expand Down Expand Up @@ -82,7 +91,7 @@ public void shouldNotDisableAutoReadBelowHighWatermark()

assertTrue( channel.config().isAutoRead() );

limiter.enqueued( job );
limiter.enqueued( connection, job );

assertTrue( channel.config().isAutoRead() );
verify( log, never() ).warn( anyString(), any(), any() );
Expand All @@ -95,9 +104,9 @@ public void shouldDisableAutoReadWhenAtHighWatermark()

assertTrue( channel.config().isAutoRead() );

limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );

assertFalse( channel.config().isAutoRead() );
verify( log ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) );
Expand All @@ -110,11 +119,11 @@ public void shouldDisableAutoReadOnlyOnceWhenAboveHighWatermark()

assertTrue( channel.config().isAutoRead() );

limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );

assertFalse( channel.config().isAutoRead() );
verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) );
Expand All @@ -127,11 +136,10 @@ public void shouldEnableAutoReadWhenAtLowWatermark()

assertTrue( channel.config().isAutoRead() );

limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( job );
limiter.dequeued( job );
limiter.dequeued( job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.drained( connection, Arrays.asList( job, job ) );

assertTrue( channel.config().isAutoRead() );
verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) );
Expand All @@ -145,12 +153,10 @@ public void shouldEnableAutoReadOnlyOnceWhenBelowLowWatermark()

assertTrue( channel.config().isAutoRead() );

limiter.enqueued( job );
limiter.enqueued( job );
limiter.enqueued( job );
limiter.dequeued( job );
limiter.dequeued( job );
limiter.dequeued( job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.enqueued( connection, job );
limiter.drained( connection, Arrays.asList( job, job, job ) );

assertTrue( channel.config().isAutoRead() );
verify( log, times( 1 ) ).warn( contains( "disabled" ), eq( channel.remoteAddress() ), eq( 3 ) );
Expand Down Expand Up @@ -229,12 +235,12 @@ public void shouldNotAcceptNegativeHighWatermark()

private BoltConnectionReadLimiter newLimiter( int low, int high )
{
return new BoltConnectionReadLimiter( channel, log, low, high );
return new BoltConnectionReadLimiter( log, low, high );
}

private BoltConnectionReadLimiter newLimiterWithDefaults()
{
return new BoltConnectionReadLimiter( channel, log );
}*/
return new BoltConnectionReadLimiter( log );
}

}
Expand Up @@ -165,11 +165,7 @@ public void failingJobShouldLogAndStopConnection() throws Throwable
{
String id = UUID.randomUUID().toString();
BoltConnection connection = newConnection( id );
//when( connection.processNextBatch() ).thenThrow( new RuntimeException( "some unexpected error" ) );
doAnswer( inv ->
{
throw new RuntimeException( "some unexpected error" );
} ).when( connection ).processNextBatch();
when( connection.processNextBatch() ).thenThrow( new RuntimeException( "some unexpected error" ) );

boltScheduler.start();
boltScheduler.created( connection );
Expand All @@ -191,11 +187,7 @@ public void successfulJobsShouldTriggerSchedulingOfPendingJobs() throws Throwabl
String id = UUID.randomUUID().toString();
BoltConnection connection = newConnection( id );
AtomicInteger counter = new AtomicInteger( 0 );
doAnswer( inv ->
{
counter.incrementAndGet();
return true;
} ).when( connection ).processNextBatch();
doAnswer( inv -> counter.incrementAndGet() > 0 ).when( connection ).processNextBatch();
when( connection.hasPendingJobs() ).thenReturn( true ).thenReturn( false );

boltScheduler.start();
Expand Down

0 comments on commit a8384d2

Please sign in to comment.