Skip to content

Commit

Permalink
Improved testability of BufferReusingChunkingChannelBuffer
Browse files Browse the repository at this point in the history
Now it accepts a factory of ChannelBuffers which makes testing more pleasant
compared to method overriding.
Rewritten corresponding test to use this new ability.
  • Loading branch information
lutovich committed Oct 21, 2015
1 parent 0946638 commit df3041a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 14 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.backup;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.LinkedBlockingQueue;

import org.neo4j.com.ChunkingChannelBuffer;
import org.neo4j.function.Factory;

/**
* {@linkplain ChunkingChannelBuffer Chunking buffer} that is able to reuse up to {@link #MAX_WRITE_AHEAD_CHUNKS}
Expand All @@ -43,19 +45,37 @@
*/
class BufferReusingChunkingChannelBuffer extends ChunkingChannelBuffer
{
private static final Factory<ChannelBuffer> DEFAULT_CHANNEL_BUFFER_FACTORY = new Factory<ChannelBuffer>()
{
@Override
public ChannelBuffer newInstance()
{
return ChannelBuffers.dynamicBuffer();
}
};

private final Factory<ChannelBuffer> bufferFactory;
private final Queue<ChannelBuffer> freeBuffers = new LinkedBlockingQueue<>( MAX_WRITE_AHEAD_CHUNKS );

BufferReusingChunkingChannelBuffer( ChannelBuffer initialBuffer, Channel channel, int capacity,
byte internalProtocolVersion, byte applicationProtocolVersion )
{
this( initialBuffer, DEFAULT_CHANNEL_BUFFER_FACTORY, channel, capacity, internalProtocolVersion,
applicationProtocolVersion );
}

BufferReusingChunkingChannelBuffer( ChannelBuffer initialBuffer, Factory<ChannelBuffer> bufferFactory,
Channel channel, int capacity, byte internalProtocolVersion, byte applicationProtocolVersion )
{
super( initialBuffer, channel, capacity, internalProtocolVersion, applicationProtocolVersion );
this.bufferFactory = bufferFactory;
}

@Override
protected ChannelBuffer newChannelBuffer()
{
ChannelBuffer buffer = freeBuffers.poll();
return (buffer == null) ? createNewChannelBuffer() : buffer;
return (buffer == null) ? bufferFactory.newInstance() : buffer;
}

@Override
Expand All @@ -72,9 +92,4 @@ public void operationComplete( ChannelFuture future ) throws Exception
}
};
}

ChannelBuffer createNewChannelBuffer()
{
return super.newChannelBuffer();
}
}
Expand Up @@ -25,31 +25,37 @@
import org.jboss.netty.channel.ChannelFuture;
import org.junit.Test;

import org.neo4j.function.Factory;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class BufferReusingChunkingChannelBufferTest
{
@Test
@SuppressWarnings( "unchecked" )
public void newBuffersAreCreatedIfNoFreeBuffersAreAvailable()
{
BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBufferSpy( 10 );
CountingChannelBufferFactory bufferFactory = new CountingChannelBufferFactory();
BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBuffer( 10, bufferFactory );

buffer.writeLong( 1 );
buffer.writeLong( 2 );
buffer.writeLong( 3 );

verify( buffer, times( 3 ) ).createNewChannelBuffer();
assertEquals( 3, bufferFactory.instancesCreated );
}

@Test
@SuppressWarnings( "unchecked" )
public void freeBuffersAreReused() throws Exception
{
BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBufferSpy( 10 );
CountingChannelBufferFactory bufferFactory = new CountingChannelBufferFactory();
BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBuffer( 10, bufferFactory );

buffer.writeLong( 1 );
buffer.writeLong( 2 );
Expand All @@ -62,16 +68,17 @@ public void freeBuffersAreReused() throws Exception
buffer.writeLong( 4 );

// 2 buffers were created
verify( buffer, times( 2 ) ).createNewChannelBuffer();
assertEquals( 2, bufferFactory.instancesCreated );

// and 2 buffers were reused
verify( reusedBuffer1 ).writeLong( 3 );
verify( reusedBuffer2 ).writeLong( 4 );
}

private static BufferReusingChunkingChannelBuffer newBufferReusingChunkingChannelBufferSpy( int capacity )
private static BufferReusingChunkingChannelBuffer newBufferReusingChunkingChannelBuffer( int capacity,
CountingChannelBufferFactory bufferFactory )
{
ChannelBuffer initialBuffer = ChannelBuffers.dynamicBuffer();
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();

Channel channel = mock( Channel.class );
ChannelFuture channelFuture = mock( ChannelFuture.class );
Expand All @@ -80,7 +87,7 @@ private static BufferReusingChunkingChannelBuffer newBufferReusingChunkingChanne
when( channel.isBound() ).thenReturn( true );
when( channel.write( anyObject() ) ).thenReturn( channelFuture );

return spy( new BufferReusingChunkingChannelBuffer( initialBuffer, channel, capacity, (byte) 1, (byte) 1 ) );
return new BufferReusingChunkingChannelBuffer( buffer, bufferFactory, channel, capacity, (byte) 1, (byte) 1 );
}

private static ChannelBuffer triggerOperationCompleteCallback( BufferReusingChunkingChannelBuffer buffer )
Expand All @@ -95,4 +102,16 @@ private static ChannelBuffer triggerOperationCompleteCallback( BufferReusingChun
buffer.newChannelFutureListener( reusedBuffer ).operationComplete( channelFuture );
return reusedBuffer;
}

private static class CountingChannelBufferFactory implements Factory<ChannelBuffer>
{
int instancesCreated;

@Override
public ChannelBuffer newInstance()
{
instancesCreated++;
return ChannelBuffers.dynamicBuffer();
}
}
}

0 comments on commit df3041a

Please sign in to comment.