Skip to content

Commit

Permalink
Improve the cleanup of streaming responses (#1119)
Browse files Browse the repository at this point in the history
Motivation:

A successfully sent response can be logged with AbortedStreamException
or ClosedChannelException due to race conditions.

Modifications:

- Make HttpResponseSubscriber handle AbortedStreamException and
  ClosedChannelException better so that they are not logged
  unnecessarily
- Make HttpServerHandler clean up unfinished HTTP/1 responses a little
  bit later so that they are not aborted too early.
- Miscellaneous:
  - Fix a bug in DeferredStreamMessage where NeverInvokedSubscriber is
    invoked, which happens when a DeferredStreamMessage is delegated to
    another DeferredStreamMessage.

Result:

- Much less chance of redundant AbortedStreamException or
  ClosedChannelException
  • Loading branch information
trustin committed Apr 5, 2018
1 parent c2123c6 commit 0ce1fb5
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 51 deletions.
Expand Up @@ -55,6 +55,12 @@ public class DeferredStreamMessage<T> extends AbstractStreamMessage<T> {
AtomicIntegerFieldUpdater.newUpdater(
DeferredStreamMessage.class, "subscribedToDelegate");

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<DeferredStreamMessage>
abortPendingUpdater =
AtomicIntegerFieldUpdater.newUpdater(
DeferredStreamMessage.class, "abortPending");

@Nullable
@SuppressWarnings("unused") // Updated only via delegateUpdater
private volatile StreamMessage<T> delegate;
Expand All @@ -73,7 +79,7 @@ public class DeferredStreamMessage<T> extends AbstractStreamMessage<T> {
// Only accessed from subscription's executor.
private long pendingDemand;

private volatile boolean abortPending;
private volatile int abortPending; // 0 - false, 1 - true

// Only accessed from subscription's executor.
private boolean cancelPending;
Expand All @@ -91,7 +97,7 @@ protected void delegate(StreamMessage<T> delegate) {
throw new IllegalStateException("delegate set already");
}

if (abortPending) {
if (abortPending != 0) {
delegate.abort();
}

Expand Down Expand Up @@ -199,9 +205,14 @@ private void doCancel() {
try {
delegateSubscription.cancel();
} finally {
final SubscriptionImpl subscription = this.subscription;
assert subscription != null;
subscription.clearSubscriber();
// Clear the subscriber when we become sure that the delegate will not produce events anymore.
final StreamMessage<T> delegate = this.delegate;
assert delegate != null;
if (delegate.isComplete()) {
subscription.clearSubscriber();
} else {
delegate.completionFuture().whenComplete((u1, u2) -> subscription.clearSubscriber());
}
}
} else {
cancelPending = true;
Expand Down Expand Up @@ -250,7 +261,9 @@ private void safeOnSubscribeToDelegate() {

@Override
public void abort() {
abortPending = true;
if (!abortPendingUpdater.compareAndSet(this, 0, 1)) {
return;
}

final SubscriptionImpl newSubscription = new SubscriptionImpl(
this, AbortingSubscriber.get(), ImmediateEventExecutor.INSTANCE, false);
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.server;

import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -36,6 +37,8 @@
import com.linecorp.armeria.common.HttpStatusClass;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.internal.Http1ObjectEncoder;
import com.linecorp.armeria.internal.HttpObjectEncoder;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -209,7 +212,7 @@ public void onNext(HttpObject o) {
return;
}

write(o, endOfStream, true);
write(o, endOfStream);
}

@Override
Expand All @@ -232,6 +235,11 @@ public void onError(Throwable cause) {
failAndRespond(cause,
AggregatedHttpMessage.of(((HttpStatusException) cause).httpStatus()),
Http2Error.CANCEL);
} else if (cause instanceof AbortedStreamException) {
// One of the two cases:
// - Client closed the connection too early.
// - Response publisher aborted the stream.
failAndReset((AbortedStreamException) cause);
} else {
logger.warn("{} Unexpected exception from a service or a response publisher: {}",
ctx.channel(), service(), cause);
Expand All @@ -254,37 +262,54 @@ public void onComplete() {
}

if (state != State.DONE) {
write(HttpData.EMPTY_DATA, true, true);
write(HttpData.EMPTY_DATA, true);
}
}

private void write(HttpObject o, boolean endOfStream, boolean flush) {
private void write(HttpObject o, boolean endOfStream) {
final Channel ch = ctx.channel();
if (endOfStream) {
setDone();
}

ch.eventLoop().execute(() -> write0(o, endOfStream, flush));
ch.eventLoop().execute(() -> write0(o, endOfStream));
}

private void write0(HttpObject o, boolean endOfStream, boolean flush) {
private void write0(HttpObject o, boolean endOfStream) {
final ChannelFuture future;
final boolean wroteEmptyData;
if (o instanceof HttpData) {
final HttpData data = (HttpData) o;
wroteEmptyData = data.isEmpty();
future = responseEncoder.writeData(ctx, req.id(), req.streamId(), data, endOfStream);
logBuilder().increaseResponseLength(data.length());
} else if (o instanceof HttpHeaders) {
wroteEmptyData = false;
future = responseEncoder.writeHeaders(ctx, req.id(), req.streamId(), (HttpHeaders) o, endOfStream);
} else {
// Should never reach here because we did validation in onNext().
throw new Error();
}

future.addListener((ChannelFuture f) -> {
final boolean isSuccess;
if (f.isSuccess()) {
isSuccess = true;
} else {
// If 1) the last chunk we attempted to send was empty,
// 2) the connection has been closed,
// 3) and the protocol is HTTP/1,
// it is very likely that a client closed the connection after receiving the complete content,
// which is not really a problem.
isSuccess = endOfStream && wroteEmptyData &&
f.cause() instanceof ClosedChannelException &&
responseEncoder instanceof Http1ObjectEncoder;
}

// Write an access log if:
// - every message has been sent successfully.
// - any write operation is failed with a cause.
if (f.isSuccess()) {
if (isSuccess) {
if (endOfStream && tryComplete()) {
logBuilder().endResponse();
accessLogWriter.accept(reqCtx.log());
Expand All @@ -304,18 +329,14 @@ private void write0(HttpObject o, boolean endOfStream, boolean flush) {
HttpServerHandler.CLOSE_ON_FAILURE.operationComplete(f);
});

if (flush) {
ctx.flush();
}

if (state == State.DONE) {
subscription.cancel();
}
ctx.flush();
}

private void setDone() {
private State setDone() {
cancelTimeout();
state = State.DONE;
final State oldState = this.state;
this.state = State.DONE;
return oldState;
}

private void failAndRespond(Throwable cause, AggregatedHttpMessage message, Http2Error error) {
Expand All @@ -325,15 +346,14 @@ private void failAndRespond(Throwable cause, AggregatedHttpMessage message, Http
logBuilder().responseHeaders(headers);
logBuilder().increaseResponseLength(content.length());

final State state = this.state; // Keep the state before calling fail() because it updates state.
setDone();
final State oldState = setDone();
subscription.cancel();

final int id = req.id();
final int streamId = req.streamId();

final ChannelFuture future;
if (wroteNothing(state)) {
if (wroteNothing(oldState)) {
// Did not write anything yet; we can send an error response instead of resetting the stream.
if (content.isEmpty()) {
future = responseEncoder.writeHeaders(ctx, id, streamId, headers, true);
Expand All @@ -346,7 +366,21 @@ private void failAndRespond(Throwable cause, AggregatedHttpMessage message, Http
future = responseEncoder.writeReset(ctx, id, streamId, error);
}

if (state != State.DONE) {
addCallbackAndFlush(cause, oldState, future);
}

private void failAndReset(AbortedStreamException cause) {
final State oldState = setDone();
subscription.cancel();

final ChannelFuture future =
responseEncoder.writeReset(ctx, req.id(), req.streamId(), Http2Error.CANCEL);

addCallbackAndFlush(cause, oldState, future);
}

private void addCallbackAndFlush(Throwable cause, State oldState, ChannelFuture future) {
if (oldState != State.DONE) {
future.addListener(unused -> {
// Write an access log always with a cause. Respect the first specified cause.
if (tryComplete()) {
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.net.InetSocketAddress;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -190,16 +191,37 @@ public int unfinishedRequests() {
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
unfinishedResponses.keySet().forEach(StreamMessage::abort);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Give the unfinished streaming responses a chance to close themselves before we abort them,
// so that successful responses are not aborted due to a race condition like the following:
//
// 1) A publisher of a response stream sends the complete response
// but does not call StreamWriter.close() just yet.
// 2) An HTTP/1 client receives the complete response and closes the connection, which is totally fine.
// 3) The response stream is aborted once the server detects the disconnection.
// 4) The publisher calls StreamWriter.close() but it's aborted already.
//
// To reduce the chance of such situation, we wait a little bit before aborting unfinished responses.

switch (protocol) {
case H1C:
case H1:
// XXX(trustin): How much time is 'a little bit'?
ctx.channel().eventLoop().schedule(this::cleanup, 1, TimeUnit.SECONDS);
break;
default:
// HTTP/2 is unaffected by this issue because a client is expected to wait for a frame with
// endOfStream set.
cleanup();
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
private void cleanup() {
if (responseEncoder != null) {
responseEncoder.close();
}

unfinishedResponses.keySet().forEach(StreamMessage::abort);
}

@Override
Expand Down Expand Up @@ -366,17 +388,15 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th
}
})).exceptionally(CompletionActions::log);

res.completionFuture().handle(voidFunction((ret, cause) -> {
res.completionFuture().handleAsync(voidFunction((ret, cause) -> {
req.abort();
// NB: logBuilder.endResponse() is called by HttpResponseSubscriber below.
eventLoop.execute(() -> {
gracefulShutdownSupport.dec();
unfinishedResponses.remove(res);
if (--unfinishedRequests == 0 && handledLastRequest) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(CLOSE);
}
});
})).exceptionally(CompletionActions::log);
gracefulShutdownSupport.dec();
unfinishedResponses.remove(res);
if (--unfinishedRequests == 0 && handledLastRequest) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(CLOSE);
}
}), eventLoop).exceptionally(CompletionActions::log);

assert responseEncoder != null;
final HttpResponseSubscriber resSubscriber =
Expand Down
Expand Up @@ -282,16 +282,17 @@ private void invoke(ServiceRequestContext ctx, HttpResponseWriter res,
}

final HttpHeaders headers = toResponseHeaders(transport);
res.write(headers);
for (;;) {
final HttpData data = out.poll();
if (data == null || !res.tryWrite(data)) {
break;
if (res.tryWrite(headers)) {
for (;;) {
final HttpData data = out.poll();
if (data == null || !res.tryWrite(data)) {
break;
}
}
}
res.close();
} catch (Throwable t) {
logger.warn("{} Failed to produce a response:", ctx, t);
} finally {
res.close();
}
}
Expand Down
Expand Up @@ -391,16 +391,17 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc
try {
coyoteAdapter.service(coyoteReq, coyoteRes);
final HttpHeaders headers = convertResponse(coyoteRes);
res.write(headers);
for (;;) {
final HttpData d = data.poll();
if (d == null || !res.tryWrite(d)) {
break;
if (res.tryWrite(headers)) {
for (;;) {
final HttpData d = data.poll();
if (d == null || !res.tryWrite(d)) {
break;
}
}
}
res.close();
} catch (Throwable t) {
logger.warn("{} Failed to produce a response:", ctx, t);
} finally {
res.close();
}
});
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.junit.ClassRule;
import org.junit.Test;

Expand Down Expand Up @@ -98,6 +99,7 @@ public void jarBasedWebApp() throws Exception {
.startsWith("application/java");
assertThat(res.getFirstHeader(HttpHeaderNames.CONTENT_LENGTH.toString()).getValue())
.isEqualTo("1361");
EntityUtils.consume(res.getEntity());
}
}
}
Expand All @@ -111,6 +113,7 @@ public void jarBasedWebAppWithAlternativeRoot() throws Exception {
.startsWith("application/java");
assertThat(res.getFirstHeader(HttpHeaderNames.CONTENT_LENGTH.toString()).getValue())
.isEqualTo("1361");
EntityUtils.consume(res.getEntity());
}
}
}
Expand Down

0 comments on commit 0ce1fb5

Please sign in to comment.