Skip to content
This repository has been archived by the owner on Sep 28, 2021. It is now read-only.

improve robustness of AsyncContextOngoingRequest #159

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,28 @@ public ServerInfo serverInfo() {

@Override
public void reply(Response<ByteString> response) {
final HttpServletResponse httpResponse = (HttpServletResponse) asyncContext.getResponse();

final StatusType status = response.status();
httpResponse.setStatus(status.code(), status.reasonPhrase());

response.headers().forEach(httpResponse::addHeader);

response.payload().ifPresent(payload -> {
try {
payload.write(httpResponse.getOutputStream());
} catch (IOException e) {
LOGGER.error("Failed to write response", e);
}
});

asyncContext.complete();
try {
final HttpServletResponse httpResponse = (HttpServletResponse) asyncContext.getResponse();

final StatusType status = response.status();
httpResponse.setStatus(status.code(), status.reasonPhrase());

response.headers().forEach(httpResponse::addHeader);

response.payload().ifPresent(payload -> {
try {
payload.write(httpResponse.getOutputStream());
} catch (IOException e) {
LOGGER.warn("Failed to write response", e);
}
});

asyncContext.complete();
} catch (Exception e) {
// this can happen 'normally' when the request has timed out, for instance. In this case,
// an IllegalStateException will be thrown.
LOGGER.warn("Error sending response", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -196,7 +198,7 @@ public void shouldHandleMissingCallingServiceHeader() throws Exception {
// note: this test may fail when running in IntelliJ, due to
// https://youtrack.jetbrains.com/issue/IDEA-122783
@Test
public void shouldLogErrorWritingResponse() throws Exception {
public void shouldLogWarningOnErrorWritingResponse() throws Exception {
HttpServletResponse spy = spy(response);
when(asyncContext.getResponse()).thenReturn(spy);
doReturn(outputStream).when(spy).getOutputStream();
Expand All @@ -221,13 +223,80 @@ public InetSocketAddress socketAddress() {
ongoingRequest.reply(Response.forPayload(ByteString.encodeUtf8("floop")));

List<LoggingEvent> events = testLogger.getLoggingEvents().stream()
.filter(event -> event.getLevel() == Level.ERROR)
.filter(event -> event.getLevel() == Level.WARN)
.filter(event -> event.getMessage().contains("Failed to write response"))
.collect(Collectors.toList());

assertThat(events, hasSize(1));
}

// note: this test may fail when running in IntelliJ, due to
// https://youtrack.jetbrains.com/issue/IDEA-122783
@Test
public void shouldLogWarningOnFailureToGetAsyncContextResponse() throws Exception {
when(asyncContext.getResponse()).thenThrow(new IllegalStateException("context completed test"));

ApolloRequestHandler.AsyncContextOngoingRequest ongoingRequest = new ApolloRequestHandler.AsyncContextOngoingRequest(
new ServerInfo() {
@Override
public String id() {
return "14";
}

@Override
public InetSocketAddress socketAddress() {
return InetSocketAddress.createUnresolved("localhost", 888);
}
},
Request.forUri("http://localhost:888"),
asyncContext, 9123
);

ongoingRequest.reply(Response.forPayload(ByteString.encodeUtf8("floop")));

List<String> events = testLogger.getLoggingEvents().stream()
.filter(event -> event.getLevel() == Level.WARN)
.map(LoggingEvent::getMessage)
.collect(Collectors.toList());

assertThat(events, hasSize(1));
assertThat(events, hasItem(containsString("Error sending response")));
}

// note: this test may fail when running in IntelliJ, due to
// https://youtrack.jetbrains.com/issue/IDEA-122783
@Test
public void shouldLogWarningOnFailureToCompleteAsyncContext() throws Exception {
when(asyncContext.getResponse()).thenReturn(response);
doThrow(new IllegalStateException("completed test")).when(asyncContext).complete();

ApolloRequestHandler.AsyncContextOngoingRequest ongoingRequest = new ApolloRequestHandler.AsyncContextOngoingRequest(
new ServerInfo() {
@Override
public String id() {
return "14";
}

@Override
public InetSocketAddress socketAddress() {
return InetSocketAddress.createUnresolved("localhost", 888);
}
},
Request.forUri("http://localhost:888"),
asyncContext, 9123
);

ongoingRequest.reply(Response.forPayload(ByteString.encodeUtf8("floop")));

List<String> events = testLogger.getLoggingEvents().stream()
.filter(event -> event.getLevel() == Level.WARN)
.map(LoggingEvent::getMessage)
.collect(Collectors.toList());

assertThat(events, hasSize(1));
assertThat(events, hasItem(containsString("Error sending response")));
}

@Test
public void shouldForwardRepliesToJetty() throws Exception {
requestHandler.handle("/floop", baseRequest, httpServletRequest, response);
Expand Down