Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect HTTP/2 connection window size #1489

Merged
merged 3 commits into from Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,6 +28,7 @@
import com.linecorp.armeria.common.ProtocolViolationException;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.internal.ArmeriaHttpUtil;
import com.linecorp.armeria.internal.InboundTrafficController;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -61,7 +62,7 @@ private enum State {
private State state = State.NEED_HEADERS;

Http1ResponseDecoder(Channel channel) {
super(channel);
super(channel, InboundTrafficController.ofHttp1(channel));
}

@Override
Expand Down
Expand Up @@ -36,7 +36,7 @@ final class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
super(decoder, encoder, initialSettings);

this.clientFactory = clientFactory;
responseDecoder = new Http2ResponseDecoder(channel, encoder());
responseDecoder = new Http2ResponseDecoder(channel, encoder(), clientFactory);
connection().addListener(responseDecoder);
decoder().frameListener(responseDecoder);

Expand Down
Expand Up @@ -32,6 +32,7 @@
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.internal.ArmeriaHttpUtil;
import com.linecorp.armeria.internal.Http2GoAwayHandler;
import com.linecorp.armeria.internal.InboundTrafficController;
import com.linecorp.armeria.unsafe.ByteBufHttpData;

import io.netty.buffer.ByteBuf;
Expand All @@ -56,8 +57,9 @@ final class Http2ResponseDecoder extends HttpResponseDecoder implements Http2Con
private final Http2ConnectionEncoder encoder;
private final Http2GoAwayHandler goAwayHandler;

Http2ResponseDecoder(Channel channel, Http2ConnectionEncoder encoder) {
super(channel);
Http2ResponseDecoder(Channel channel, Http2ConnectionEncoder encoder, HttpClientFactory clientFactory) {
super(channel,
InboundTrafficController.ofHttp2(channel, clientFactory.http2InitialConnectionWindowSize()));
conn = encoder.connection();
this.encoder = encoder;
goAwayHandler = new Http2GoAwayHandler();
Expand Down
Expand Up @@ -47,6 +47,7 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.ChannelUtil;
import com.linecorp.armeria.internal.Http1ClientCodec;
import com.linecorp.armeria.internal.ReadSuppressingHandler;
import com.linecorp.armeria.internal.TrafficLoggingHandler;
Expand Down Expand Up @@ -177,6 +178,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock

// Configure the pipeline.
final Channel ch = ctx.channel();
ChannelUtil.disableWriterBufferWatermark(ch);

final ChannelPipeline p = ch.pipeline();
p.addLast(new FlushConsolidationHandler());
Expand Down
Expand Up @@ -52,9 +52,9 @@ abstract class HttpResponseDecoder {
private final InboundTrafficController inboundTrafficController;
private boolean disconnectWhenFinished;

HttpResponseDecoder(Channel channel) {
HttpResponseDecoder(Channel channel, InboundTrafficController inboundTrafficController) {
this.channel = channel;
inboundTrafficController = new InboundTrafficController(channel);
this.inboundTrafficController = inboundTrafficController;
}

final Channel channel() {
Expand Down
Expand Up @@ -29,10 +29,6 @@
interface HttpSession {

HttpSession INACTIVE = new HttpSession() {

private final InboundTrafficController inboundTrafficController =
new InboundTrafficController(null, 0, 0);

@Nullable
@Override
public SessionProtocol protocol() {
Expand All @@ -46,7 +42,7 @@ public boolean isActive() {

@Override
public InboundTrafficController inboundTrafficController() {
return inboundTrafficController;
return InboundTrafficController.disabled();
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/com/linecorp/armeria/internal/ChannelUtil.java
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;

public final class ChannelUtil {
Expand All @@ -42,6 +43,9 @@ public final class ChannelUtil {
}
}

private static final WriteBufferWaterMark DISABLED_WRITE_BUFFER_WATERMARK =
new WriteBufferWaterMark(0, Integer.MAX_VALUE);

public static Class<? extends EventLoopGroup> epollEventLoopClass() {
return EPOLL_EVENT_LOOP_CLASS;
}
Expand All @@ -67,5 +71,13 @@ public static CompletableFuture<Void> close(Iterable<? extends Channel> channels
return future;
}

/**
* Disables the write buffer water mark of the specified {@link Channel}, because we do not use this
* feature at all and thus we do not want {@code channelWritabilityChanged} events triggered often.
*/
public static void disableWriterBufferWatermark(Channel channel) {
channel.config().setWriteBufferWaterMark(DISABLED_WRITE_BUFFER_WATERMARK);
}

private ChannelUtil() {}
}
Expand Up @@ -21,6 +21,7 @@
import javax.annotation.Nullable;

import com.google.common.base.MoreObjects;
import com.google.common.math.IntMath;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
Expand All @@ -29,23 +30,38 @@ public final class InboundTrafficController extends AtomicInteger {

private static final long serialVersionUID = 420503276551000218L;

private static final InboundTrafficController DISABLED = new InboundTrafficController(null, 0, 0);
private static int numDeferredReads;

public static int numDeferredReads() {
return numDeferredReads;
}

public static InboundTrafficController ofHttp1(Channel channel) {
return new InboundTrafficController(channel, 128 * 1024, 64 * 1024);
}

public static InboundTrafficController ofHttp2(Channel channel, int connectionWindowSize) {
// Compensate for protocol overhead traffic incurred by frame headers, etc.
// This is a very rough estimate, but it should not hurt.
connectionWindowSize = IntMath.saturatedAdd(connectionWindowSize, 1024);

final int highWatermark = Math.max(connectionWindowSize, 128 * 1024);
final int lowWatermark = highWatermark >>> 1;
return new InboundTrafficController(channel, highWatermark, lowWatermark);
}

public static InboundTrafficController disabled() {
return DISABLED;
}

@Nullable
private final ChannelConfig cfg;
private final int highWatermark;
private final int lowWatermark;
private volatile boolean suspended;

public InboundTrafficController(@Nullable Channel channel) {
this(channel, 128 * 1024, 64 * 1024);
}

public InboundTrafficController(@Nullable Channel channel, int highWatermark, int lowWatermark) {
private InboundTrafficController(@Nullable Channel channel, int highWatermark, int lowWatermark) {
cfg = channel != null ? channel.config() : null;
this.highWatermark = highWatermark;
this.lowWatermark = lowWatermark;
Expand Down
Expand Up @@ -84,7 +84,7 @@ final class Http1RequestDecoder extends ChannelDuplexHandler {
Http1ObjectEncoder writer) {
this.cfg = cfg;
this.scheme = scheme;
inboundTrafficController = new InboundTrafficController(channel);
inboundTrafficController = InboundTrafficController.ofHttp1(channel);
this.writer = writer;
}

Expand Down
Expand Up @@ -66,7 +66,8 @@ final class Http2RequestDecoder extends Http2EventAdapter {
this.cfg = cfg;
this.channel = channel;
this.writer = writer;
inboundTrafficController = new InboundTrafficController(channel);
inboundTrafficController =
InboundTrafficController.ofHttp2(channel, cfg.http2InitialConnectionWindowSize());
goAwayHandler = new Http2GoAwayHandler();
}

Expand Down
Expand Up @@ -37,6 +37,7 @@

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.ChannelUtil;
import com.linecorp.armeria.internal.Http1ObjectEncoder;
import com.linecorp.armeria.internal.ReadSuppressingHandler;
import com.linecorp.armeria.internal.TrafficLoggingHandler;
Expand Down Expand Up @@ -123,6 +124,8 @@ final class HttpServerPipelineConfigurator extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
ChannelUtil.disableWriterBufferWatermark(ch);

final ChannelPipeline p = ch.pipeline();
p.addLast(new FlushConsolidationHandler());
p.addLast(ReadSuppressingHandler.INSTANCE);
Expand Down
Expand Up @@ -120,7 +120,7 @@ public boolean isActive() {

@Override
public InboundTrafficController inboundTrafficController() {
return HttpSession.INACTIVE.inboundTrafficController();
return InboundTrafficController.disabled();
}

@Override
Expand Down