diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/BackupServer.java b/enterprise/backup/src/main/java/org/neo4j/backup/BackupServer.java index 284f60197b27d..308c9cf0e59bd 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/BackupServer.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/BackupServer.java @@ -19,9 +19,11 @@ */ package org.neo4j.backup; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.neo4j.backup.BackupClient.BackupRequestType; +import org.neo4j.com.ChunkingChannelBuffer; import org.neo4j.com.Client; import org.neo4j.com.Protocol; import org.neo4j.com.ProtocolVersion; @@ -36,34 +38,64 @@ import static org.neo4j.helpers.Clock.SYSTEM_CLOCK; -class BackupServer extends Server +class BackupServer extends Server { + private static final long DEFAULT_OLD_CHANNEL_THRESHOLD = Client.DEFAULT_READ_RESPONSE_TIMEOUT_SECONDS * 1000; + private static final int DEFAULT_MAX_CONCURRENT_TX = 3; + + private static final BackupRequestType[] contexts = BackupRequestType.values(); + static final byte PROTOCOL_VERSION = 1; - private final BackupRequestType[] contexts = BackupRequestType.values(); - static int DEFAULT_PORT = 6362; + static final int DEFAULT_PORT = 6362; static final int FRAME_LENGTH = Protocol.MEGA * 4; - public BackupServer( TheBackupInterface requestTarget, final HostnamePort server, - Logging logging, ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor ) + BackupServer( TheBackupInterface requestTarget, HostnamePort server, Logging logging, + ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor ) + { + super( requestTarget, newBackupConfig( FRAME_LENGTH, server ), logging, FRAME_LENGTH, + new ProtocolVersion( PROTOCOL_VERSION, ProtocolVersion.INTERNAL_PROTOCOL_VERSION ), + TxChecksumVerifier.ALWAYS_MATCH, SYSTEM_CLOCK, byteCounterMonitor, requestMonitor ); + } + + @Override + protected RequestType getRequestContext( byte id ) + { + return contexts[id]; + } + + @Override + protected void finishOffChannel( Channel channel, RequestContext context ) { - super( requestTarget, new Configuration() + } + + @Override + protected ChunkingChannelBuffer newChunkingBuffer( ChannelBuffer bufferToWriteTo, Channel channel, int capacity, + byte internalProtocolVersion, byte applicationProtocolVersion ) + { + return new BufferReusingChunkingChannelBuffer( bufferToWriteTo, channel, capacity, internalProtocolVersion, + applicationProtocolVersion ); + } + + private static Configuration newBackupConfig( final int chunkSize, final HostnamePort server ) + { + return new Configuration() { @Override public long getOldChannelThreshold() { - return Client.DEFAULT_READ_RESPONSE_TIMEOUT_SECONDS * 1000; + return DEFAULT_OLD_CHANNEL_THRESHOLD; } @Override public int getMaxConcurrentTransactions() { - return 3; + return DEFAULT_MAX_CONCURRENT_TX; } @Override public int getChunkSize() { - return FRAME_LENGTH; + return chunkSize; } @Override @@ -71,25 +103,6 @@ public HostnamePort getServerAddress() { return server; } - }, logging, FRAME_LENGTH, new ProtocolVersion( PROTOCOL_VERSION, - ProtocolVersion.INTERNAL_PROTOCOL_VERSION ), - TxChecksumVerifier.ALWAYS_MATCH, SYSTEM_CLOCK, byteCounterMonitor, requestMonitor ); - } - - @Override - protected void responseWritten( RequestType type, Channel channel, - RequestContext context ) - { - } - - @Override - protected RequestType getRequestContext( byte id ) - { - return contexts[id]; - } - - @Override - protected void finishOffChannel( Channel channel, RequestContext context ) - { + }; } } diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/BufferReusingChunkingChannelBuffer.java b/enterprise/backup/src/main/java/org/neo4j/backup/BufferReusingChunkingChannelBuffer.java new file mode 100644 index 0000000000000..11a1c9ea21914 --- /dev/null +++ b/enterprise/backup/src/main/java/org/neo4j/backup/BufferReusingChunkingChannelBuffer.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.backup; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; + +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.neo4j.com.ChunkingChannelBuffer; + +/** + * {@linkplain ChunkingChannelBuffer Chunking buffer} that is able to reuse up to {@link #MAX_WRITE_AHEAD_CHUNKS} + * netty channel buffers. + *

+ * Buffer is considered to be free when future corresponding to the call {@link Channel#write(Object)} is completed. + * Argument to {@link Channel#write(Object)} is {@link ChannelBuffer}. + * Method {@link ChannelFutureListener#operationComplete(ChannelFuture)} is called upon future completion and + * than {@link ChannelBuffer} is returned to the queue of free buffers. + *

+ * Allocation of buffers is traded for allocation of {@link ChannelFutureListener}s that returned buffers to the + * queue of free buffers. + */ +class BufferReusingChunkingChannelBuffer extends ChunkingChannelBuffer +{ + private final Queue freeBuffers = new LinkedBlockingQueue<>( MAX_WRITE_AHEAD_CHUNKS ); + + BufferReusingChunkingChannelBuffer( ChannelBuffer initialBuffer, Channel channel, int capacity, + byte internalProtocolVersion, byte applicationProtocolVersion ) + { + super( initialBuffer, channel, capacity, internalProtocolVersion, applicationProtocolVersion ); + } + + @Override + protected ChannelBuffer newChannelBuffer() + { + ChannelBuffer buffer = freeBuffers.poll(); + return (buffer == null) ? createNewChannelBuffer() : buffer; + } + + @Override + protected ChannelFutureListener newChannelFutureListener( final ChannelBuffer buffer ) + { + return new ChannelFutureListener() + { + @Override + public void operationComplete( ChannelFuture future ) throws Exception + { + buffer.clear(); + freeBuffers.offer( buffer ); + BufferReusingChunkingChannelBuffer.super.operationComplete( future ); + } + }; + } + + ChannelBuffer createNewChannelBuffer() + { + return super.newChannelBuffer(); + } +} diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/BufferReusingChunkingChannelBufferTest.java b/enterprise/backup/src/test/java/org/neo4j/backup/BufferReusingChunkingChannelBufferTest.java new file mode 100644 index 0000000000000..f2747f14e2cd3 --- /dev/null +++ b/enterprise/backup/src/test/java/org/neo4j/backup/BufferReusingChunkingChannelBufferTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.backup; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.junit.Test; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BufferReusingChunkingChannelBufferTest +{ + @Test + public void newBuffersAreCreatedIfNoFreeBuffersAreAvailable() + { + BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBufferSpy( 10 ); + + buffer.writeLong( 1 ); + buffer.writeLong( 2 ); + buffer.writeLong( 3 ); + + verify( buffer, times( 3 ) ).createNewChannelBuffer(); + } + + @Test + public void freeBuffersAreReused() throws Exception + { + BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBufferSpy( 10 ); + + buffer.writeLong( 1 ); + buffer.writeLong( 2 ); + + // return 2 buffers to the pool + ChannelBuffer reusedBuffer1 = triggerOperationCompleteCallback( buffer ); + ChannelBuffer reusedBuffer2 = triggerOperationCompleteCallback( buffer ); + + buffer.writeLong( 3 ); + buffer.writeLong( 4 ); + + // 2 buffers were created + verify( buffer, times( 2 ) ).createNewChannelBuffer(); + + // and 2 buffers were reused + verify( reusedBuffer1 ).writeLong( 3 ); + verify( reusedBuffer2 ).writeLong( 4 ); + } + + private static BufferReusingChunkingChannelBuffer newBufferReusingChunkingChannelBufferSpy( int capacity ) + { + ChannelBuffer initialBuffer = ChannelBuffers.dynamicBuffer(); + + Channel channel = mock( Channel.class ); + ChannelFuture channelFuture = mock( ChannelFuture.class ); + when( channel.isOpen() ).thenReturn( true ); + when( channel.isConnected() ).thenReturn( true ); + when( channel.isBound() ).thenReturn( true ); + when( channel.write( anyObject() ) ).thenReturn( channelFuture ); + + return spy( new BufferReusingChunkingChannelBuffer( initialBuffer, channel, capacity, (byte) 1, (byte) 1 ) ); + } + + private static ChannelBuffer triggerOperationCompleteCallback( BufferReusingChunkingChannelBuffer buffer ) + throws Exception + { + ChannelBuffer reusedBuffer = spy( ChannelBuffers.dynamicBuffer() ); + + ChannelFuture channelFuture = mock( ChannelFuture.class ); + when( channelFuture.isDone() ).thenReturn( true ); + when( channelFuture.isSuccess() ).thenReturn( true ); + + buffer.newChannelFutureListener( reusedBuffer ).operationComplete( channelFuture ); + return reusedBuffer; + } +} diff --git a/enterprise/com/src/main/java/org/neo4j/com/ChunkingChannelBuffer.java b/enterprise/com/src/main/java/org/neo4j/com/ChunkingChannelBuffer.java index 80631b6551809..244af54a054ec 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/ChunkingChannelBuffer.java +++ b/enterprise/com/src/main/java/org/neo4j/com/ChunkingChannelBuffer.java @@ -62,7 +62,8 @@ public class ChunkingChannelBuffer implements ChannelBuffer, ChannelFutureListen static final byte CONTINUATION_MORE = 1; static final byte OUTCOME_SUCCESS = 0; static final byte OUTCOME_FAILURE = 1; - private static final int MAX_WRITE_AHEAD_CHUNKS = 5; + + protected static final int MAX_WRITE_AHEAD_CHUNKS = 5; private ChannelBuffer buffer; private final Channel channel; @@ -535,12 +536,16 @@ private void sendChunkIfNeeded( int bytesPlus ) { setContinuation( CONTINUATION_MORE ); writeCurrentChunk(); - // TODO Reuse buffers? - buffer = ChannelBuffers.dynamicBuffer(); + buffer = newChannelBuffer(); addRoomForContinuationHeader(); } } + protected ChannelBuffer newChannelBuffer() + { + return ChannelBuffers.dynamicBuffer( capacity ); + } + private void writeCurrentChunk() { if ( !channel.isOpen() || !channel.isConnected() || !channel.isBound() ) @@ -548,10 +553,15 @@ private void writeCurrentChunk() waitForClientToCatchUpOnReadingChunks(); ChannelFuture future = channel.write( buffer ); - future.addListener( this ); + future.addListener( newChannelFutureListener( buffer ) ); writeAheadCounter.incrementAndGet(); } + protected ChannelFutureListener newChannelFutureListener( ChannelBuffer buffer ) + { + return this; + } + private void waitForClientToCatchUpOnReadingChunks() { // Wait until channel gets disconnected or client catches up. diff --git a/enterprise/com/src/main/java/org/neo4j/com/Server.java b/enterprise/com/src/main/java/org/neo4j/com/Server.java index ded3d61fa436e..062f23ad8a15b 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/Server.java +++ b/enterprise/com/src/main/java/org/neo4j/com/Server.java @@ -517,7 +517,7 @@ protected void handleRequest( ChannelBuffer buffer, final Channel channel ) } bufferToWriteTo.clear(); - final ChunkingChannelBuffer chunkingBuffer = new ChunkingChannelBuffer( bufferToWriteTo, channel, chunkSize, + ChunkingChannelBuffer chunkingBuffer = newChunkingBuffer( bufferToWriteTo, channel, chunkSize, getInternalProtocolVersion(), applicationProtocolVersion ); submitSilent( targetCallExecutor, new TargetCaller( type, channel, context, chunkingBuffer, bufferToReadFrom ) ); @@ -642,9 +642,15 @@ public Map getConnectedSlaveChannels() private ChunkingChannelBuffer newChunkingBuffer( Channel channel ) { - return new ChunkingChannelBuffer( ChannelBuffers.dynamicBuffer(), - channel, - chunkSize, getInternalProtocolVersion(), applicationProtocolVersion ); + return newChunkingBuffer( ChannelBuffers.dynamicBuffer(), channel, chunkSize, getInternalProtocolVersion(), + applicationProtocolVersion ); + } + + protected ChunkingChannelBuffer newChunkingBuffer( ChannelBuffer bufferToWriteTo, Channel channel, int capacity, + byte internalProtocolVersion, byte applicationProtocolVersion ) + { + return new ChunkingChannelBuffer( bufferToWriteTo, channel, capacity, internalProtocolVersion, + applicationProtocolVersion ); } private class TargetCaller implements Response.Handler, Runnable