Skip to content

Commit

Permalink
Fix delayed notification of session creation failure and log GOAWAY f…
Browse files Browse the repository at this point in the history
…rames

Motivation:

When a connection is closed before session protocol negotiation succeeds
or fails, the Promise of session creation is not notified immediately
but only after the session creation timeout occurs. As a result, the
notification of session creation due to an unexpected disconnection can
take up to as long as connection timeout (3.2 seconds by default)

Also, we need to log more:

- the information about GOAWAY frames we sent and received
- the current session protocol when an unexpected exception occurred

Modifications:

- Try to reject the session creation promise when a connection has been
  closed.
- Add Http2GoAwayListener and add it to both client and server
  connections so that we have more information about GOAWAY frames we
  send and receive.
- Log the current session protocol when logging an unexpected exception
- Log the pre-upgrade request and the removal of the upgrade stream to
  see if there's a case where the upgrade stream is removed before
  sending the first response
- Miscellaneous:
  - Do not propagate IdleStateEvent in Http*IdleTimeoutHandler.

Result:

- Session creation failure is notified much sooner in the situation
  described above.
- We will have more information about GOAWAY frames and unexpected
  behaviors related with session protocol negotiation
  • Loading branch information
trustin committed Apr 8, 2016
1 parent 87be49e commit 74e18db
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws
if (pendingResCount == 0 && evt.isFirst()) {
logger.debug("{} Closing due to idleness", ctx.channel());
ctx.close();
return;
}

ctx.fireUserEventTriggered(evt);
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/com/linecorp/armeria/client/HttpConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.http.AbstractHttpToHttp2ConnectionHandler;
import com.linecorp.armeria.common.http.Http1ClientCodec;
import com.linecorp.armeria.common.http.Http2GoAwayListener;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.NativeLibraries;

Expand Down Expand Up @@ -205,7 +206,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
return;
}

addBeforeSessionHandler(pipeline, newHttp2ConnectionHandler());
addBeforeSessionHandler(pipeline, newHttp2ConnectionHandler(ch));
protocol = H2;
} else {
if (httpPreference != HttpPreference.HTTP1_REQUIRED) {
Expand All @@ -226,7 +227,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Exceptions.logIfUnexpected(logger, ctx.channel(), cause);
Exceptions.logIfUnexpected(logger, ctx.channel(), null, cause);
ctx.close();
}
});
Expand Down Expand Up @@ -255,7 +256,7 @@ private void configureAsHttp(Channel ch) {
if (attemptUpgrade) {
Http1ClientCodec http1Codec = newHttp1Codec();
Http2ClientUpgradeCodec http2ClientUpgradeCodec =
new Http2ClientUpgradeCodec(newHttp2ConnectionHandler());
new Http2ClientUpgradeCodec(newHttp2ConnectionHandler(ch));
HttpClientUpgradeHandler http2UpgradeHandler =
new HttpClientUpgradeHandler(http1Codec, http2ClientUpgradeCodec, options.maxFrameLength());

Expand Down Expand Up @@ -460,9 +461,10 @@ private void retryWithH1C(ChannelHandlerContext ctx) {
}
}

private Http2ConnectionHandler newHttp2ConnectionHandler() {
private Http2ConnectionHandler newHttp2ConnectionHandler(Channel ch) {
final boolean validateHeaders = false;
final Http2Connection conn = new DefaultHttp2Connection(false);
conn.addListener(new Http2GoAwayListener(ch));
final InboundHttp2ToHttpAdapter listener = new InboundHttp2ToHttpAdapterBuilder(conn)
.propagateSettings(true).validateHttpHeaders(validateHeaders)
.maxContentLength(options.maxFrameLength()).build();
Expand Down
17 changes: 12 additions & 5 deletions src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ static HttpSession get(Channel ch) {
private WaitsHolder waitsHolder = PreNegotiationWaitsHolder.INSTANCE;
private volatile boolean active = true;
private int numRequestsSent;
private boolean needsRetryWithH1C;

HttpSessionHandler(HttpSessionChannelFactory channelFactory,
Promise<Channel> sessionPromise, ScheduledFuture<?> timeoutFuture) {
Expand Down Expand Up @@ -173,8 +174,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ReferenceCountUtil.release(msg);
}
} else {
//if invocation not found, we just bypass message to next
ctx.fireChannelRead(msg);
logger.warn("{} Received a response without a matching request: {}", ctx.channel(), msg);
}

if (isDisconnectionRequired(response)) {
Expand Down Expand Up @@ -218,25 +218,32 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

if (evt == RETRY_WITH_H1C) {
// Protocol upgrade has failed, but needs to retry.
needsRetryWithH1C = true;
timeoutFuture.cancel(false);
ctx.close();
channelFactory.connect(ctx.channel().remoteAddress(), SessionProtocol.H1C, sessionPromise);
return;
}

logger.debug("Swallowing a user event: {}", evt);
logger.warn("{} Unexpected user event: {}", ctx.channel(), evt);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
failPendingResponses(ClosedSessionException.INSTANCE);
ctx.fireChannelInactive();

if (!needsRetryWithH1C) {
// Cancel the timeout and reject the sessionPromise just in case the connection has been closed
// even before the session protocol negotiation is done.
timeoutFuture.cancel(false);
sessionPromise.tryFailure(ClosedSessionException.INSTANCE);
}
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Exceptions.logIfUnexpected(logger, ctx.channel(), cause);
Exceptions.logIfUnexpected(logger, ctx.channel(), protocol(), cause);
if (ctx.channel().isActive()) {
ctx.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.http;

import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Stream;

/**
* A {@link Http2Connection.Listener} that logs the received GOAWAY frames and makes sure disconnection.
*/
public class Http2GoAwayListener extends Http2ConnectionAdapter {

private static final Logger logger = LoggerFactory.getLogger(Http2GoAwayListener.class);

private final Channel ch;
private boolean goAwaySent;

public Http2GoAwayListener(Channel ch) {
this.ch = ch;
}

@Override
public void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
goAwaySent = true;
onGoAway("Sent", lastStreamId, errorCode, debugData);
}

@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
onGoAway("Received", lastStreamId, errorCode, debugData);

// Send a GOAWAY back to the peer and close the connection gracefully if we did not send GOAWAY yet.
// This will make sure that the connection is always closed after receiving GOAWAY,
// because otherwise we have to wait until the peer who sent GOAWAY to us closes the connection.
if (!goAwaySent) {
ch.close();
}
}

private void onGoAway(String sentOrReceived, int lastStreamId, long errorCode, ByteBuf debugData) {
if (errorCode != Http2Error.NO_ERROR.code()) {
if (logger.isWarnEnabled()) {
logger.warn("{} {} a GOAWAY frame: lastStreamId={}, errorCode={}, debugData=\"{}\" (Hex: {})",
ch, sentOrReceived, lastStreamId, errorStr(errorCode),
debugData.toString(StandardCharsets.UTF_8),
ByteBufUtil.hexDump(debugData));
}
} else {
if (logger.isInfoEnabled()) {
logger.debug("{} {} a GOAWAY frame: lastStreamId={}, errorCode=NO_ERROR",
ch, sentOrReceived, lastStreamId);
}
}
}

private static String errorStr(long errorCode) {
final Http2Error error = Http2Error.valueOf(errorCode);
return error != null ? error.toString() + '(' + errorCode + ')'
: "UNKNOWN(" + errorCode + ')';
}

@Override
public void onStreamRemoved(Http2Stream stream) {
if (stream.id() == 1) {
logger.debug("{} HTTP/2 upgrade stream removed: {}", ch, stream.state());
}
}
}
17 changes: 13 additions & 4 deletions src/main/java/com/linecorp/armeria/common/util/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;

import com.linecorp.armeria.client.ClosedSessionException;
import com.linecorp.armeria.common.SessionProtocol;

import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
Expand All @@ -46,23 +47,31 @@ public final class Exceptions {
/**
* Logs the specified exception if it is {@linkplain #isExpected(Throwable)} unexpected}.
*/
public static void logIfUnexpected(Logger logger, Channel ch, Throwable cause) {
public static void logIfUnexpected(Logger logger, Channel ch, SessionProtocol protocol, Throwable cause) {
if (!logger.isWarnEnabled() || isExpected(cause)) {
return;
}

logger.warn("{} Unexpected exception:", ch, cause);
logger.warn("{}[{}] Unexpected exception:",
ch, protocolName(protocol), cause);
}

/**
* Logs the specified exception if it is {@linkplain #isExpected(Throwable)} unexpected}.
*/
public static void logIfUnexpected(Logger logger, Channel ch, String debugData, Throwable cause) {
public static void logIfUnexpected(Logger logger, Channel ch, SessionProtocol protocol,
String debugData, Throwable cause) {

if (!logger.isWarnEnabled() || isExpected(cause)) {
return;
}

logger.warn("{} Unexpected exception: {}", ch, debugData, cause);
logger.warn("{}[{}] Unexpected exception: {}",
ch, protocolName(protocol), debugData, cause);
}

private static String protocolName(SessionProtocol protocol) {
return protocol != null ? protocol.uriText() : "<unknown>";
}

/**
Expand Down
66 changes: 43 additions & 23 deletions src/main/java/com/linecorp/armeria/server/HttpServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ final class HttpServerHandler extends ChannelInboundHandlerAdapter {
final Throwable cause = future.cause();
final Channel ch = future.channel();
if (cause != null) {
Exceptions.logIfUnexpected(logger, ch, cause);
Exceptions.logIfUnexpected(logger, ch, protocol(ch), cause);
}
safeClose(ch);
};
Expand All @@ -77,11 +77,22 @@ final class HttpServerHandler extends ChannelInboundHandlerAdapter {
final Throwable cause = future.cause();
if (cause != null) {
final Channel ch = future.channel();
Exceptions.logIfUnexpected(logger, ch, cause);
Exceptions.logIfUnexpected(logger, ch, protocol(ch), cause);
safeClose(ch);
}
};

private static SessionProtocol protocol(Channel ch) {
final HttpServerHandler handler = ch.pipeline().get(HttpServerHandler.class);
final SessionProtocol protocol;
if (handler != null) {
protocol = handler.protocol;
} else {
protocol = null;
}
return protocol;
}

static void safeClose(Channel ch) {
if (!ch.isActive()) {
return;
Expand Down Expand Up @@ -110,7 +121,7 @@ static void safeClose(Channel ch) {
private static final Exception SERVICE_NOT_FOUND = new ServiceNotFoundException();

private final ServerConfig config;
private SessionProtocol sessionProtocol;
private SessionProtocol protocol;

private boolean isReading;

Expand All @@ -137,13 +148,13 @@ static void safeClose(Channel ch) {

private boolean handledLastRequest;

HttpServerHandler(ServerConfig config, SessionProtocol sessionProtocol) {
assert sessionProtocol == SessionProtocol.H1 ||
sessionProtocol == SessionProtocol.H1C ||
sessionProtocol == SessionProtocol.H2;
HttpServerHandler(ServerConfig config, SessionProtocol protocol) {
assert protocol == SessionProtocol.H1 ||
protocol == SessionProtocol.H1C ||
protocol == SessionProtocol.H2;

this.config = requireNonNull(config, "config");
this.sessionProtocol = requireNonNull(sessionProtocol, "protocol");
this.protocol = requireNonNull(protocol, "protocol");
}

@Override
Expand All @@ -159,16 +170,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

private void handleHttp2Settings(ChannelHandlerContext ctx, Http2Settings h2settings) {
logger.debug("{} HTTP/2 settings: {}", ctx.channel(), h2settings);

useHeadOfLineBlocking = false;
switch (sessionProtocol) {
case H1:
sessionProtocol = SessionProtocol.H2;
break;
case H1C:
sessionProtocol = SessionProtocol.H2C;
break;
}
}

private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
Expand Down Expand Up @@ -217,7 +218,7 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest req) throw
final ServiceCodec codec = service.codec();
final Promise<Object> promise = ctx.executor().newPromise();
final DecodeResult decodeResult = codec.decodeRequest(
serviceCfg, ctx.channel(), sessionProtocol,
serviceCfg, ctx.channel(), protocol,
hostname, path, mappedPath, req.content(), req, promise);

switch (decodeResult.type()) {
Expand Down Expand Up @@ -423,12 +424,12 @@ private void respond(ChannelHandlerContext ctx, int reqSeq, FullHttpRequest req,
// A response to a HEAD request must have no content.
content = Unpooled.EMPTY_BUFFER;
if (cause != null) {
Exceptions.logIfUnexpected(logger, ctx.channel(), errorMessage(status), cause);
Exceptions.logIfUnexpected(logger, ctx.channel(), protocol, errorMessage(status), cause);
}
} else {
final String msg = errorMessage(status);
if (cause != null) {
Exceptions.logIfUnexpected(logger, ctx.channel(), msg, cause);
Exceptions.logIfUnexpected(logger, ctx.channel(), protocol, msg, cause);
}
content = Unpooled.copiedBuffer(msg, StandardCharsets.UTF_8);
}
Expand Down Expand Up @@ -526,22 +527,41 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (evt instanceof UpgradeEvent) {
assert !isReading;

// Upgraded to HTTP/2.
useHeadOfLineBlocking = false;
switch (protocol) {
case H1:
protocol = SessionProtocol.H2;
break;
case H1C:
protocol = SessionProtocol.H2C;
break;
}

final FullHttpRequest req = ((UpgradeEvent) evt).upgradeRequest();
req.headers().set(STREAM_ID, "1");

// Remove the headers related with the upgrade.
req.headers().remove(HttpHeaderNames.CONNECTION);
req.headers().remove(HttpHeaderNames.UPGRADE);
req.headers().remove(Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER);

logger.debug("{} Handling the pre-upgrade request ({}): {}",
ctx.channel(), ((UpgradeEvent) evt).protocol(), req);

// Set the stream ID of the pre-upgrade request, which is always 1.
req.headers().set(STREAM_ID, "1");

channelRead(ctx, req);
channelReadComplete(ctx);
return;
}

logger.warn("{} Unexpected user event: {}", ctx.channel(), evt);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Exceptions.logIfUnexpected(logger, ctx.channel(), cause);
Exceptions.logIfUnexpected(logger, ctx.channel(), protocol, cause);
if (ctx.channel().isActive()) {
ctx.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws
if (pendingResCount == 0 && evt.isFirst()) {
logger.debug("{} Closing due to idleness", ctx.channel());
ctx.close();
return;
}

ctx.fireUserEventTriggered(evt);
Expand Down
Loading

0 comments on commit 74e18db

Please sign in to comment.