Skip to content

Commit

Permalink
Customizable estimation for messages written outside the EventLoop
Browse files Browse the repository at this point in the history
Motivation:

Estimation algorithm currently used for WriteTasks is complicated and
wrong. Additionally, some code relies on outbound buffer size
incremented only on actual writes to the outbound buffer.

Modifications:

- Throw away the old estimator and replace with a simple algorithm that
  uses the client-provided estimator along with a statically configured
  WriteTask overhead (io.netty.transport.writeTaskSizeOverhead system
  property with the default value of 48 bytes)
- Add a io.netty.transport.estimateSizeOnSubmit boolean system property
  allowing the clients to disable the message estimation outside the
  event loop

Result:

Task estimation is user controllable and produces better results by
default
  • Loading branch information
Alexey Ermakov authored and normanmaurer committed Dec 23, 2015
1 parent 6177738 commit d2ddb52
Showing 1 changed file with 23 additions and 106 deletions.
Expand Up @@ -15,26 +15,17 @@
*/
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;

import static io.netty.channel.DefaultChannelPipeline.*;

Expand Down Expand Up @@ -849,23 +840,12 @@ private static void safeExecute(EventExecutor executor, Runnable runnable, Chann

abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode<Runnable> implements Runnable {

private static final FastThreadLocal<Map<Class<?>, Integer>> CLASS_SIZES =
new FastThreadLocal<Map<Class<?>, Integer>>() {
@Override
protected Map<Class<?>, Integer> initialValue() throws Exception {
Map<Class<?>, Integer> map = new WeakHashMap<Class<?>, Integer>();
map.put(void.class, 0);
map.put(byte.class, 1);
map.put(char.class, 2);
map.put(short.class, 2);
map.put(boolean.class, 4); // Probably an integer.
map.put(int.class, 4);
map.put(float.class, 4);
map.put(long.class, 8);
map.put(double.class, 8);
return map;
}
};
private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);

// Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);

private AbstractChannelHandlerContext ctx;
private Object msg;
Expand All @@ -876,97 +856,34 @@ private AbstractWriteTask(Recycler.Handle handle) {
super(handle);
}

private static int estimateSize(Object o, Map<Class<?>, Integer> classSizes) {
int answer = 8 + estimateSize(o.getClass(), classSizes, null);

if (o instanceof ByteBuf) {
answer += ((ByteBuf) o).readableBytes();
} else if (o instanceof ByteBufHolder) {
answer += ((ByteBufHolder) o).content().readableBytes();
} else if (o instanceof FileRegion) {
// nothing to add.
} else if (o instanceof byte[]) {
answer += ((byte[]) o).length;
} else if (o instanceof ByteBuffer) {
answer += ((ByteBuffer) o).remaining();
} else if (o instanceof CharSequence) {
answer += ((CharSequence) o).length() << 1;
} else if (o instanceof Iterable<?>) {
for (Object m : (Iterable<?>) o) {
answer += estimateSize(m, classSizes);
}
}

return align(answer);
}

private static int estimateSize(Class<?> clazz, Map<Class<?>, Integer> classSizes,
Set<Class<?>> visitedClasses) {
Integer objectSize = classSizes.get(clazz);
if (objectSize != null) {
return objectSize;
}

if (visitedClasses != null) {
if (visitedClasses.contains(clazz)) {
return 0;
}
} else {
visitedClasses = new HashSet<Class<?>>();
}

visitedClasses.add(clazz);

int answer = 8; // Basic overhead.
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields();
for (Field f : fields) {
if ((f.getModifiers() & Modifier.STATIC) != 0) {
// Ignore static fields.
continue;
}

answer += estimateSize(f.getType(), classSizes, visitedClasses);
}
}

visitedClasses.remove(clazz);

// Some alignment.
answer = align(answer);

// Put the final answer.
classSizes.put(clazz, answer);
return answer;
}

private static int align(int size) {
return size + 8 - (size & 7);
}

protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = ctx.pipeline.channel.estimatorHandle().size(msg) + estimateSize(task, CLASS_SIZES.get());

ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(task.size);
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();

// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
buffer.incrementPendingOutboundBytes(task.size);
} else {
task.size = 0;
}
} else {
task.size = 0;
}
}

@Override
public final void run() {
try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
write(ctx, msg, promise);
} finally {
Expand Down

0 comments on commit d2ddb52

Please sign in to comment.