Skip to content

Commit

Permalink
Reuse netty buffers during backup
Browse files Browse the repository at this point in the history
During backup, bytes from store files are transferred using netty
ChannelBuffers. Such buffers were created for each chunk of data that
was send out. Default chunk size is 4M so for big stores amount of
created buffers was huge.

This commit introduces a chunking channel buffer that is able to reuse
ChannelBuffers. Currently it is used only for backup because excessive
buffer creation was only noticed there.

BufferReusingChunkingChannelBuffer uses a queue of free buffers and
subscribes to 'write completed' notifications with a listener that
clears the used buffer and puts it on the queue of free buffers.
Essentially this chunking channel buffer trades allocation of dynamic
netty buffers for allocation of ChannelFutureListeners. This is a right
thing to do because ChannelBuffer is an array based structure while
ChannelFutureListener is only an anonymous class that captures a
single ChannelBuffer.

This change was tested on a 2.5GB store with full backups running in a
tight loop. 1 minute JFR recording showed following results:
 * without buffer reuse: 14592 byte[] instances, total size 26.05GB
 * with buffer reuse: 2436 byte[] instances, total size 9.13GB

So improvement in object allocation would probably be only visible for
big stores and full backups.
  • Loading branch information
lutovich committed Oct 19, 2015
1 parent ceedb2a commit 0946638
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 37 deletions.
71 changes: 42 additions & 29 deletions enterprise/backup/src/main/java/org/neo4j/backup/BackupServer.java
Expand Up @@ -19,9 +19,11 @@
*/
package org.neo4j.backup;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;

import org.neo4j.backup.BackupClient.BackupRequestType;
import org.neo4j.com.ChunkingChannelBuffer;
import org.neo4j.com.Client;
import org.neo4j.com.Protocol;
import org.neo4j.com.ProtocolVersion;
Expand All @@ -36,60 +38,71 @@

import static org.neo4j.helpers.Clock.SYSTEM_CLOCK;

class BackupServer extends Server<TheBackupInterface, Object>
class BackupServer extends Server<TheBackupInterface,Object>
{
private static final long DEFAULT_OLD_CHANNEL_THRESHOLD = Client.DEFAULT_READ_RESPONSE_TIMEOUT_SECONDS * 1000;
private static final int DEFAULT_MAX_CONCURRENT_TX = 3;

private static final BackupRequestType[] contexts = BackupRequestType.values();

static final byte PROTOCOL_VERSION = 1;
private final BackupRequestType[] contexts = BackupRequestType.values();
static int DEFAULT_PORT = 6362;
static final int DEFAULT_PORT = 6362;
static final int FRAME_LENGTH = Protocol.MEGA * 4;

public BackupServer( TheBackupInterface requestTarget, final HostnamePort server,
Logging logging, ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor )
BackupServer( TheBackupInterface requestTarget, HostnamePort server, Logging logging,
ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor )
{
super( requestTarget, newBackupConfig( FRAME_LENGTH, server ), logging, FRAME_LENGTH,
new ProtocolVersion( PROTOCOL_VERSION, ProtocolVersion.INTERNAL_PROTOCOL_VERSION ),
TxChecksumVerifier.ALWAYS_MATCH, SYSTEM_CLOCK, byteCounterMonitor, requestMonitor );
}

@Override
protected RequestType<TheBackupInterface> getRequestContext( byte id )
{
return contexts[id];
}

@Override
protected void finishOffChannel( Channel channel, RequestContext context )
{
super( requestTarget, new Configuration()
}

@Override
protected ChunkingChannelBuffer newChunkingBuffer( ChannelBuffer bufferToWriteTo, Channel channel, int capacity,
byte internalProtocolVersion, byte applicationProtocolVersion )
{
return new BufferReusingChunkingChannelBuffer( bufferToWriteTo, channel, capacity, internalProtocolVersion,
applicationProtocolVersion );
}

private static Configuration newBackupConfig( final int chunkSize, final HostnamePort server )
{
return new Configuration()
{
@Override
public long getOldChannelThreshold()
{
return Client.DEFAULT_READ_RESPONSE_TIMEOUT_SECONDS * 1000;
return DEFAULT_OLD_CHANNEL_THRESHOLD;
}

@Override
public int getMaxConcurrentTransactions()
{
return 3;
return DEFAULT_MAX_CONCURRENT_TX;
}

@Override
public int getChunkSize()
{
return FRAME_LENGTH;
return chunkSize;
}

@Override
public HostnamePort getServerAddress()
{
return server;
}
}, logging, FRAME_LENGTH, new ProtocolVersion( PROTOCOL_VERSION,
ProtocolVersion.INTERNAL_PROTOCOL_VERSION ),
TxChecksumVerifier.ALWAYS_MATCH, SYSTEM_CLOCK, byteCounterMonitor, requestMonitor );
}

@Override
protected void responseWritten( RequestType<TheBackupInterface> type, Channel channel,
RequestContext context )
{
}

@Override
protected RequestType<TheBackupInterface> getRequestContext( byte id )
{
return contexts[id];
}

@Override
protected void finishOffChannel( Channel channel, RequestContext context )
{
};
}
}
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.backup;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.neo4j.com.ChunkingChannelBuffer;

/**
* {@linkplain ChunkingChannelBuffer Chunking buffer} that is able to reuse up to {@link #MAX_WRITE_AHEAD_CHUNKS}
* netty channel buffers.
* <p>
* Buffer is considered to be free when future corresponding to the call {@link Channel#write(Object)} is completed.
* Argument to {@link Channel#write(Object)} is {@link ChannelBuffer}.
* Method {@link ChannelFutureListener#operationComplete(ChannelFuture)} is called upon future completion and
* than {@link ChannelBuffer} is returned to the queue of free buffers.
* <p>
* Allocation of buffers is traded for allocation of {@link ChannelFutureListener}s that returned buffers to the
* queue of free buffers.
*/
class BufferReusingChunkingChannelBuffer extends ChunkingChannelBuffer
{
private final Queue<ChannelBuffer> freeBuffers = new LinkedBlockingQueue<>( MAX_WRITE_AHEAD_CHUNKS );

BufferReusingChunkingChannelBuffer( ChannelBuffer initialBuffer, Channel channel, int capacity,
byte internalProtocolVersion, byte applicationProtocolVersion )
{
super( initialBuffer, channel, capacity, internalProtocolVersion, applicationProtocolVersion );
}

@Override
protected ChannelBuffer newChannelBuffer()
{
ChannelBuffer buffer = freeBuffers.poll();
return (buffer == null) ? createNewChannelBuffer() : buffer;
}

@Override
protected ChannelFutureListener newChannelFutureListener( final ChannelBuffer buffer )
{
return new ChannelFutureListener()
{
@Override
public void operationComplete( ChannelFuture future ) throws Exception
{
buffer.clear();
freeBuffers.offer( buffer );
BufferReusingChunkingChannelBuffer.super.operationComplete( future );
}
};
}

ChannelBuffer createNewChannelBuffer()
{
return super.newChannelBuffer();
}
}
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.backup;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.junit.Test;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class BufferReusingChunkingChannelBufferTest
{
@Test
public void newBuffersAreCreatedIfNoFreeBuffersAreAvailable()
{
BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBufferSpy( 10 );

buffer.writeLong( 1 );
buffer.writeLong( 2 );
buffer.writeLong( 3 );

verify( buffer, times( 3 ) ).createNewChannelBuffer();
}

@Test
public void freeBuffersAreReused() throws Exception
{
BufferReusingChunkingChannelBuffer buffer = newBufferReusingChunkingChannelBufferSpy( 10 );

buffer.writeLong( 1 );
buffer.writeLong( 2 );

// return 2 buffers to the pool
ChannelBuffer reusedBuffer1 = triggerOperationCompleteCallback( buffer );
ChannelBuffer reusedBuffer2 = triggerOperationCompleteCallback( buffer );

buffer.writeLong( 3 );
buffer.writeLong( 4 );

// 2 buffers were created
verify( buffer, times( 2 ) ).createNewChannelBuffer();

// and 2 buffers were reused
verify( reusedBuffer1 ).writeLong( 3 );
verify( reusedBuffer2 ).writeLong( 4 );
}

private static BufferReusingChunkingChannelBuffer newBufferReusingChunkingChannelBufferSpy( int capacity )
{
ChannelBuffer initialBuffer = ChannelBuffers.dynamicBuffer();

Channel channel = mock( Channel.class );
ChannelFuture channelFuture = mock( ChannelFuture.class );
when( channel.isOpen() ).thenReturn( true );
when( channel.isConnected() ).thenReturn( true );
when( channel.isBound() ).thenReturn( true );
when( channel.write( anyObject() ) ).thenReturn( channelFuture );

return spy( new BufferReusingChunkingChannelBuffer( initialBuffer, channel, capacity, (byte) 1, (byte) 1 ) );
}

private static ChannelBuffer triggerOperationCompleteCallback( BufferReusingChunkingChannelBuffer buffer )
throws Exception
{
ChannelBuffer reusedBuffer = spy( ChannelBuffers.dynamicBuffer() );

ChannelFuture channelFuture = mock( ChannelFuture.class );
when( channelFuture.isDone() ).thenReturn( true );
when( channelFuture.isSuccess() ).thenReturn( true );

buffer.newChannelFutureListener( reusedBuffer ).operationComplete( channelFuture );
return reusedBuffer;
}
}
Expand Up @@ -62,7 +62,8 @@ public class ChunkingChannelBuffer implements ChannelBuffer, ChannelFutureListen
static final byte CONTINUATION_MORE = 1;
static final byte OUTCOME_SUCCESS = 0;
static final byte OUTCOME_FAILURE = 1;
private static final int MAX_WRITE_AHEAD_CHUNKS = 5;

protected static final int MAX_WRITE_AHEAD_CHUNKS = 5;

private ChannelBuffer buffer;
private final Channel channel;
Expand Down Expand Up @@ -535,23 +536,32 @@ private void sendChunkIfNeeded( int bytesPlus )
{
setContinuation( CONTINUATION_MORE );
writeCurrentChunk();
// TODO Reuse buffers?
buffer = ChannelBuffers.dynamicBuffer();
buffer = newChannelBuffer();
addRoomForContinuationHeader();
}
}

protected ChannelBuffer newChannelBuffer()
{
return ChannelBuffers.dynamicBuffer( capacity );
}

private void writeCurrentChunk()
{
if ( !channel.isOpen() || !channel.isConnected() || !channel.isBound() )
throw new ComException( "Channel has been closed, so no need to try to write to it anymore. Client closed it?" );

waitForClientToCatchUpOnReadingChunks();
ChannelFuture future = channel.write( buffer );
future.addListener( this );
future.addListener( newChannelFutureListener( buffer ) );
writeAheadCounter.incrementAndGet();
}

protected ChannelFutureListener newChannelFutureListener( ChannelBuffer buffer )
{
return this;
}

private void waitForClientToCatchUpOnReadingChunks()
{
// Wait until channel gets disconnected or client catches up.
Expand Down
14 changes: 10 additions & 4 deletions enterprise/com/src/main/java/org/neo4j/com/Server.java
Expand Up @@ -517,7 +517,7 @@ protected void handleRequest( ChannelBuffer buffer, final Channel channel )
}

bufferToWriteTo.clear();
final ChunkingChannelBuffer chunkingBuffer = new ChunkingChannelBuffer( bufferToWriteTo, channel, chunkSize,
ChunkingChannelBuffer chunkingBuffer = newChunkingBuffer( bufferToWriteTo, channel, chunkSize,
getInternalProtocolVersion(), applicationProtocolVersion );
submitSilent( targetCallExecutor, new TargetCaller( type, channel, context, chunkingBuffer,
bufferToReadFrom ) );
Expand Down Expand Up @@ -642,9 +642,15 @@ public Map<Channel,RequestContext> getConnectedSlaveChannels()

private ChunkingChannelBuffer newChunkingBuffer( Channel channel )
{
return new ChunkingChannelBuffer( ChannelBuffers.dynamicBuffer(),
channel,
chunkSize, getInternalProtocolVersion(), applicationProtocolVersion );
return newChunkingBuffer( ChannelBuffers.dynamicBuffer(), channel, chunkSize, getInternalProtocolVersion(),
applicationProtocolVersion );
}

protected ChunkingChannelBuffer newChunkingBuffer( ChannelBuffer bufferToWriteTo, Channel channel, int capacity,
byte internalProtocolVersion, byte applicationProtocolVersion )
{
return new ChunkingChannelBuffer( bufferToWriteTo, channel, capacity, internalProtocolVersion,
applicationProtocolVersion );
}

private class TargetCaller implements Response.Handler, Runnable
Expand Down

0 comments on commit 0946638

Please sign in to comment.