Skip to content

Commit

Permalink
Merge pull request #5 from komamitsu/buffer_pool
Browse files Browse the repository at this point in the history
Buffer pool
  • Loading branch information
komamitsu committed Oct 18, 2015
2 parents f4ecd8c + 5ea073b commit 645e374
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 58 deletions.
8 changes: 2 additions & 6 deletions src/main/java/org/komamitsu/fluency/buffer/Buffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public abstract class Buffer<T extends Buffer.Config>
private static final Logger LOG = LoggerFactory.getLogger(Buffer.class);
protected static final Charset CHARSET = Charset.forName("ASCII");
protected final T bufferConfig;
protected final AtomicInteger allocatedSize = new AtomicInteger();
protected final ThreadLocal<ObjectMapper> objectMapperHolder = new ThreadLocal<ObjectMapper>() {
@Override
protected ObjectMapper initialValue()
Expand Down Expand Up @@ -67,10 +66,7 @@ public void close(Sender sender)
protected abstract void closeInternal(Sender sender)
throws IOException;

public int getAllocatedSize()
{
return allocatedSize.get();
}
public abstract long getAllocatedSize();

public int getMaxSize()
{
Expand All @@ -84,7 +80,7 @@ public float getBufferUsage()

public abstract static class Config<T extends Buffer, C extends Config>
{
protected int maxBufferSize = 16 * 1024 * 1024;
protected int maxBufferSize = 512 * 1024 * 1024;
protected boolean ackResponseMode = false;

public int getMaxBufferSize()
Expand Down
110 changes: 110 additions & 0 deletions src/main/java/org/komamitsu/fluency/buffer/BufferPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.komamitsu.fluency.buffer;

import org.msgpack.core.annotations.VisibleForTesting;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class BufferPool
{
@VisibleForTesting
final Map<Integer, LinkedBlockingQueue<ByteBuffer>> bufferPool = new HashMap<Integer, LinkedBlockingQueue<ByteBuffer>>();
private final AtomicLong allocatedSize = new AtomicLong();
private final int initialBufferSize;
private final int maxBufferSize;

public BufferPool(int initialBufferSize, int maxBufferSize)
{
this.initialBufferSize = initialBufferSize;
this.maxBufferSize = maxBufferSize;
}

public ByteBuffer acquireBuffer(int bufferSize)
{
int normalizedBufferSize = initialBufferSize;
while (normalizedBufferSize < bufferSize) {
normalizedBufferSize *= 2;
}

LinkedBlockingQueue<ByteBuffer> buffers;
synchronized (bufferPool) {
buffers = bufferPool.get(normalizedBufferSize);
if (buffers == null) {
buffers = new LinkedBlockingQueue<ByteBuffer>();
bufferPool.put(normalizedBufferSize, buffers);
}
}

ByteBuffer buffer = buffers.poll();
if (buffer != null) {
return buffer;
}

/*
synchronized (allocatedSize) {
if (allocatedSize.get() + normalizedBufferSize > maxBufferSize) {
return null; // `null` means the buffer is full.
}
allocatedSize.addAndGet(normalizedBufferSize);
return ByteBuffer.allocateDirect(normalizedBufferSize);
}
*/

while (true) {
long currentAllocatedSize = allocatedSize.get();
if (currentAllocatedSize + normalizedBufferSize > maxBufferSize) {
releaseBuffers();
return null; // `null` means the buffer is full.
}
if (currentAllocatedSize == allocatedSize.getAndAdd(normalizedBufferSize)) {
return ByteBuffer.allocateDirect(normalizedBufferSize);
}
allocatedSize.getAndAdd(-normalizedBufferSize);
}
}

public void returnBuffer(ByteBuffer byteBuffer)
{
LinkedBlockingQueue<ByteBuffer> buffers = bufferPool.get(byteBuffer.capacity());
if (buffers == null) {
throw new IllegalStateException("`buffers` shouldn't be null");
}

byteBuffer.position(0);
byteBuffer.limit(byteBuffer.capacity());
buffers.offer(byteBuffer);
}

public long getAllocatedSize()
{
return allocatedSize.get();
}

public void releaseBuffers()
{
synchronized (bufferPool) {
for (Map.Entry<Integer, LinkedBlockingQueue<ByteBuffer>> entry : bufferPool.entrySet()) {
ByteBuffer buffer;
while ((buffer = entry.getValue().poll()) != null) {
allocatedSize.addAndGet(-buffer.capacity());
}
}
}
}

@Override
public String toString()
{
return "BufferPool{" +
"bufferPool=" + bufferPool +
", allocatedSize=" + allocatedSize +
", initialBufferSize=" + initialBufferSize +
", maxBufferSize=" + maxBufferSize +
'}';
}
}
12 changes: 8 additions & 4 deletions src/main/java/org/komamitsu/fluency/buffer/MessageBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.komamitsu.fluency.sender.Sender;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MessageBuffer
extends Buffer<MessageBuffer.Config>
{
private static final Logger LOG = LoggerFactory.getLogger(MessageBuffer.class);
private final AtomicInteger allocatedSize = new AtomicInteger();
private final LinkedBlockingQueue<ByteBuffer> messages = new LinkedBlockingQueue<ByteBuffer>();
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Expand Down Expand Up @@ -89,6 +87,12 @@ public void closeInternal(Sender sender)
messages.clear();
}

@Override
public long getAllocatedSize()
{
return allocatedSize.get();
}

public static class Config extends Buffer.Config<MessageBuffer, Config>
{
@Override
Expand Down
80 changes: 42 additions & 38 deletions src/main/java/org/komamitsu/fluency/buffer/PackedForwardBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,17 @@
import org.komamitsu.fluency.sender.Sender;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class PackedForwardBuffer
Expand All @@ -27,10 +23,12 @@ public class PackedForwardBuffer
private static final Logger LOG = LoggerFactory.getLogger(PackedForwardBuffer.class);
private final Map<String, RetentionBuffer> retentionBuffers = new HashMap<String, RetentionBuffer>();
private final LinkedBlockingQueue<TaggableBuffer> flushableBuffers = new LinkedBlockingQueue<TaggableBuffer>();
private final BufferPool bufferPool;

private PackedForwardBuffer(PackedForwardBuffer.Config bufferConfig)
{
super(bufferConfig);
bufferPool = new BufferPool(bufferConfig.getInitialBufferSize(), bufferConfig.getMaxBufferSize());
}

private synchronized RetentionBuffer prepareBuffer(String tag, int writeSize)
Expand All @@ -41,28 +39,23 @@ private synchronized RetentionBuffer prepareBuffer(String tag, int writeSize)
return retentionBuffer;
}

int origRetentionBufferSize;
int newRetentionBufferSize;
if (retentionBuffer == null) {
origRetentionBufferSize = 0;
newRetentionBufferSize = bufferConfig.getInitialBufferSize();
}
else{
origRetentionBufferSize = retentionBuffer.getByteBuffer().capacity();
newRetentionBufferSize = (int) (retentionBuffer.getByteBuffer().capacity() * bufferConfig.getBufferExpandRatio());
}

while (newRetentionBufferSize < writeSize) {
newRetentionBufferSize *= bufferConfig.getBufferExpandRatio();
}

int delta = newRetentionBufferSize - origRetentionBufferSize;
if (allocatedSize.get() + delta > bufferConfig.getMaxBufferSize()) {
throw new BufferFullException("Buffer is full. bufferConfig=" + bufferConfig + ", allocatedSize=" + allocatedSize);
ByteBuffer acquiredBuffer = bufferPool.acquireBuffer(newRetentionBufferSize);
if (acquiredBuffer == null) {
throw new BufferFullException("Buffer is full. bufferConfig=" + bufferConfig + ", bufferPool=" + bufferPool);
}
allocatedSize.addAndGet(delta);

RetentionBuffer newBuffer = new RetentionBuffer(ByteBuffer.allocateDirect(newRetentionBufferSize));
RetentionBuffer newBuffer = new RetentionBuffer(acquiredBuffer);
if (retentionBuffer != null) {
retentionBuffer.getByteBuffer().flip();
newBuffer.getByteBuffer().put(retentionBuffer.getByteBuffer());
Expand Down Expand Up @@ -137,34 +130,38 @@ public void flushInternal(Sender sender, boolean force)

TaggableBuffer flushableBuffer = null;
while ((flushableBuffer = flushableBuffers.poll()) != null) {
allocatedSize.addAndGet(-flushableBuffer.getByteBuffer().capacity());
// TODO: Reuse MessagePacker
ByteArrayOutputStream header = new ByteArrayOutputStream();
MessagePacker messagePacker = MessagePack.newDefaultPacker(header);
LOG.trace("flushInternal(): bufferUsage={}, flushableBuffer={}", getBufferUsage(), flushableBuffer);
String tag = flushableBuffer.getTag();
ByteBuffer byteBuffer = flushableBuffer.getByteBuffer();
if (bufferConfig.isAckResponseMode()) {
messagePacker.packArrayHeader(3);
}
else {
messagePacker.packArrayHeader(2);
}
messagePacker.packString(tag);
messagePacker.packRawStringHeader(byteBuffer.position());
messagePacker.flush();

synchronized (sender) {
ByteBuffer headerBuffer = ByteBuffer.wrap(header.toByteArray());
byteBuffer.flip();
try {
// TODO: Reuse MessagePacker
ByteArrayOutputStream header = new ByteArrayOutputStream();
MessagePacker messagePacker = MessagePack.newDefaultPacker(header);
LOG.trace("flushInternal(): bufferUsage={}, flushableBuffer={}", getBufferUsage(), flushableBuffer);
String tag = flushableBuffer.getTag();
ByteBuffer byteBuffer = flushableBuffer.getByteBuffer();
if (bufferConfig.isAckResponseMode()) {
String uuid = UUID.randomUUID().toString();
sender.sendWithAck(Arrays.asList(headerBuffer, byteBuffer), uuid.getBytes(CHARSET));
messagePacker.packArrayHeader(3);
}
else {
sender.send(Arrays.asList(headerBuffer, byteBuffer));
messagePacker.packArrayHeader(2);
}
messagePacker.packString(tag);
messagePacker.packRawStringHeader(byteBuffer.position());
messagePacker.flush();

synchronized (sender) {
ByteBuffer headerBuffer = ByteBuffer.wrap(header.toByteArray());
byteBuffer.flip();
if (bufferConfig.isAckResponseMode()) {
String uuid = UUID.randomUUID().toString();
sender.sendWithAck(Arrays.asList(headerBuffer, byteBuffer), uuid.getBytes(CHARSET));
}
else {
sender.send(Arrays.asList(headerBuffer, byteBuffer));
}
}
}
finally {
bufferPool.returnBuffer(flushableBuffer.getByteBuffer());
}
}
}

Expand All @@ -175,6 +172,13 @@ public synchronized void closeInternal(Sender sender)
moveRetentionBuffersToFlushable(true);
retentionBuffers.clear();
flush(sender, true);
bufferPool.releaseBuffers();
}

@Override
public long getAllocatedSize()
{
return bufferPool.getAllocatedSize();
}

private static class RetentionBuffer
Expand Down Expand Up @@ -240,10 +244,10 @@ public String toString()

public static class Config extends Buffer.Config<PackedForwardBuffer, Config>
{
private int initialBufferSize = 512 * 1024;
private int initialBufferSize = 1024 * 1024;
private float bufferExpandRatio = 2.0f;
private int bufferRetentionSize = 4 * 1024 * 1024;
private int bufferRetentionTimeMillis = 500;
private int bufferRetentionTimeMillis = 400;

public int getInitialBufferSize()
{
Expand Down

0 comments on commit 645e374

Please sign in to comment.