diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 22fdb864b8d..b57990e64c1 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -15,26 +15,17 @@ */ package io.netty.channel; -import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufHolder; import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.RecyclableMpscLinkedQueueNode; import io.netty.util.internal.StringUtil; +import io.netty.util.internal.SystemPropertyUtil; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; import static io.netty.channel.DefaultChannelPipeline.*; @@ -849,23 +840,12 @@ private static void safeExecute(EventExecutor executor, Runnable runnable, Chann abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode implements Runnable { - private static final FastThreadLocal, Integer>> CLASS_SIZES = - new FastThreadLocal, Integer>>() { - @Override - protected Map, Integer> initialValue() throws Exception { - Map, Integer> map = new WeakHashMap, Integer>(); - map.put(void.class, 0); - map.put(byte.class, 1); - map.put(char.class, 2); - map.put(short.class, 2); - map.put(boolean.class, 4); // Probably an integer. - map.put(int.class, 4); - map.put(float.class, 4); - map.put(long.class, 8); - map.put(double.class, 8); - return map; - } - }; + private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = + SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); + + // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment + private static final int WRITE_TASK_OVERHEAD = + SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); private AbstractChannelHandlerContext ctx; private Object msg; @@ -876,97 +856,34 @@ private AbstractWriteTask(Recycler.Handle handle) { super(handle); } - private static int estimateSize(Object o, Map, Integer> classSizes) { - int answer = 8 + estimateSize(o.getClass(), classSizes, null); - - if (o instanceof ByteBuf) { - answer += ((ByteBuf) o).readableBytes(); - } else if (o instanceof ByteBufHolder) { - answer += ((ByteBufHolder) o).content().readableBytes(); - } else if (o instanceof FileRegion) { - // nothing to add. - } else if (o instanceof byte[]) { - answer += ((byte[]) o).length; - } else if (o instanceof ByteBuffer) { - answer += ((ByteBuffer) o).remaining(); - } else if (o instanceof CharSequence) { - answer += ((CharSequence) o).length() << 1; - } else if (o instanceof Iterable) { - for (Object m : (Iterable) o) { - answer += estimateSize(m, classSizes); - } - } - - return align(answer); - } - - private static int estimateSize(Class clazz, Map, Integer> classSizes, - Set> visitedClasses) { - Integer objectSize = classSizes.get(clazz); - if (objectSize != null) { - return objectSize; - } - - if (visitedClasses != null) { - if (visitedClasses.contains(clazz)) { - return 0; - } - } else { - visitedClasses = new HashSet>(); - } - - visitedClasses.add(clazz); - - int answer = 8; // Basic overhead. - for (Class c = clazz; c != null; c = c.getSuperclass()) { - Field[] fields = c.getDeclaredFields(); - for (Field f : fields) { - if ((f.getModifiers() & Modifier.STATIC) != 0) { - // Ignore static fields. - continue; - } - - answer += estimateSize(f.getType(), classSizes, visitedClasses); - } - } - - visitedClasses.remove(clazz); - - // Some alignment. - answer = align(answer); - - // Put the final answer. - classSizes.put(clazz, answer); - return answer; - } - - private static int align(int size) { - return size + 8 - (size & 7); - } - protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { task.ctx = ctx; task.msg = msg; task.promise = promise; - task.size = ctx.pipeline.channel.estimatorHandle().size(msg) + estimateSize(task, CLASS_SIZES.get()); - ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.incrementPendingOutboundBytes(task.size); + if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { + ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); + + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD; + buffer.incrementPendingOutboundBytes(task.size); + } else { + task.size = 0; + } + } else { + task.size = 0; } } @Override public final void run() { try { - if (size > 0) { - ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.decrementPendingOutboundBytes(size); - } + ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); + // Check for null as it may be set to null if the channel is closed already + if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) { + buffer.decrementPendingOutboundBytes(size); } write(ctx, msg, promise); } finally {