Skip to content

Commit

Permalink
Fix a bug where ClosedSessionException is set to responseCause fo…
Browse files Browse the repository at this point in the history
…r a success response. (#4632)

Motivation:

It would be legitimate that an HTTP/1 client closes a connection after data is fully received. Although the connection is closed, the stream on the server side can be still open. Because `HttpResponseSubscriber` may be complete after fully sending data and then receiving `onComplete()` signal. Receiving `onComplete()`, `HttpResponseSubscriber` tries to write an empty chunk as a mark of EOS to the disconnected channel. https://github.com/line/armeria/blob/fee87f8da942d0e6343e814489524b73db60ef3b/core/src/main/java/com/linecorp/armeria/server/HttpResponseSubscriber.java#L317-L320

The write attempt would fail with `ClosedSessionException` and the failure is set to `responseCause`. Practically, `responseCause` is meaningless and false positive.

Modifications:

- Check a connection is active as well when `Http1ObjectEncoder` determines a session is writable.
- Do not send EOS and mark a request as success when an HTTP/1 session is inactive when handling `onComplete()` in `HttpResponseSubscriber`

Result:

You no longer see `ClosedSessionException` when a connection is closed after a response data has been fully sent in HTTP/1.
  • Loading branch information
ikhoon committed Feb 8, 2023
1 parent bfc1924 commit 3713d52
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public final boolean isWritable(int id, int streamId) {
}

protected final boolean isWritable(int id) {
return id < minClosedId;
return id < minClosedId && !isClosed();
}

protected final void updateClosedId(int id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.CancellationException;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpHeaders;
Expand All @@ -39,6 +40,7 @@
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.stream.ClosedStreamException;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.common.Http1ObjectEncoder;
Expand Down Expand Up @@ -314,14 +316,36 @@ public void onComplete() {
responseEncoder.writeTrailers(req.id(), req.streamId(), additionalTrailers)
.addListener(writeHeadersFutureListener(true));
ctx.flush();
} else if (responseEncoder.isWritable(req.id(), req.streamId())) {
responseEncoder.writeData(req.id(), req.streamId(), HttpData.empty(), true)
.addListener(writeDataFutureListener(true, true));
ctx.flush();
} else {
if (isWritable()) {
responseEncoder.writeData(req.id(), req.streamId(), HttpData.empty(), true)
.addListener(writeDataFutureListener(true, true));
ctx.flush();
} else {
if (!reqCtx.sessionProtocol().isMultiplex()) {
// An HTTP/1 connection is closed by a remote peer after all data is sent,
// so we can assume the HTTP/1 request is complete successfully.
succeed();
} else {
fail(ClosedStreamException.get());
}
}
}
}
}

private void succeed() {
if (tryComplete(null)) {
final Throwable capturedException = CapturedServiceException.get(reqCtx);
if (capturedException != null) {
endLogRequestAndResponse(capturedException);
} else {
endLogRequestAndResponse();
}
maybeWriteAccessLog();
}
}

@Override
void fail(Throwable cause) {
if (tryComplete(cause)) {
Expand Down Expand Up @@ -434,9 +458,13 @@ public void operationComplete(ChannelFuture future) throws Exception {
// 3) and the protocol is HTTP/1,
// it is very likely that a client closed the connection after receiving the
// complete content, which is not really a problem.
final Throwable cause = future.cause();
isSuccess = endOfStream && wroteEmptyData &&
future.cause() instanceof ClosedChannelException &&
responseEncoder instanceof Http1ObjectEncoder;
responseEncoder instanceof Http1ObjectEncoder &&
(cause instanceof ClosedChannelException ||
// A ClosedSessionException may be raised by HttpObjectEncoder
// if a channel was closed.
cause instanceof ClosedSessionException);
}
handleWriteComplete(future, endOfStream, isSuccess);
}
Expand All @@ -451,15 +479,7 @@ void handleWriteComplete(ChannelFuture future, boolean endOfStream, boolean isSu
maybeLogFirstResponseBytesTransferred();

if (endOfStream) {
if (tryComplete(null)) {
final Throwable capturedException = CapturedServiceException.get(reqCtx);
if (capturedException != null) {
endLogRequestAndResponse(capturedException);
} else {
endLogRequestAndResponse();
}
maybeWriteAccessLog();
}
succeed();
}

if (!isSubscriptionCompleted) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.CountDownLatch;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpResponseWriter;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.SplitHttpResponse;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.internal.testing.FlakyTest;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

@FlakyTest
class Http1ServerEarlyDisconnectionTest {

@RegisterExtension
static ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.service("/", (ctx, req) -> {
final HttpResponseWriter writer = HttpResponse.streaming();
writer.write(ResponseHeaders.builder(200)
.contentLength(10)
.build());
ctx.blockingTaskExecutor().execute(() -> {
writer.write(HttpData.ofUtf8("0123456789"));

// Wait for the client to close the connection.
// Note: The sleep duration should be less than 1 second after which `ServerHandler`
// calls `cleanup()` to remove `unfinishedRequests`.
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

writer.close();
});

return writer;
});
}
};

private static CountDownLatch latch;

@BeforeAll
static void beforeAll() {
latch = new CountDownLatch(1);
}

@Test
void closeConnectionWhenAllContentAreReceived() throws InterruptedException {
final ClientFactory clientFactory = ClientFactory.builder().build();
final WebClient client = WebClient.builder(server.uri(SessionProtocol.H1C))
.factory(clientFactory)
.build();
final HttpResponse response = client.get("/");
final SplitHttpResponse split = response.split();
final ResponseHeaders headers = split.headers().join();
final long contentLength = headers.contentLength();
split.body().subscribe(new Subscriber<HttpData>() {

private int received;

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(HttpData httpData) {
received += httpData.length();
if (received >= contentLength) {
// All data is received, so it should be safe to close the connection.
clientFactory.close();
latch.countDown();
}
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

final ServiceRequestContext ctx = server.requestContextCaptor().take();
final RequestLog log = ctx.log().whenComplete().join();
assertThat(log.responseCause()).isNull();
}
}

0 comments on commit 3713d52

Please sign in to comment.