Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure HttpServerMetricsRecorder#recordServerConnectionInactive/Close is invoked for websockets #3229

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ final class WebsocketServerOperations extends HttpServerOperations
else {
removeHandler(NettyPipeline.HttpTrafficHandler);
removeHandler(NettyPipeline.AccessLogHandler);
removeHandler(NettyPipeline.HttpMetricsHandler);
ChannelHandler handler = channel.pipeline().get(NettyPipeline.HttpMetricsHandler);
if (handler != null) {
replaceHandler(NettyPipeline.HttpMetricsHandler,
new WebsocketHttpServerMetricsHandler((AbstractHttpServerMetricsHandler) handler));
}

handshakerResult = channel.newPromise();
HttpRequest request = new DefaultFullHttpRequest(replaced.version(),
Expand Down Expand Up @@ -295,4 +299,70 @@ public String selectedSubprotocol() {
static final AtomicIntegerFieldUpdater<WebsocketServerOperations> CLOSE_SENT =
AtomicIntegerFieldUpdater.newUpdater(WebsocketServerOperations.class,
"closeSent");

static final class WebsocketHttpServerMetricsHandler extends AbstractHttpServerMetricsHandler {

final HttpServerMetricsRecorder recorder;

WebsocketHttpServerMetricsHandler(AbstractHttpServerMetricsHandler copy) {
super(copy);
this.recorder = copy.recorder();
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
try {
if (channelOpened && recorder instanceof MicrometerHttpServerMetricsRecorder) {
// For custom user recorders, we don't propagate the channelInactive event, because this will be done
// by the ChannelMetricsHandler itself. ChannelMetricsHandler is only present when the recorder is
// not our MicrometerHttpServerMetricsRecorder. See HttpServerConfig class.
channelOpened = false;
// Always use the real connection local address without any proxy information
recorder.recordServerConnectionClosed(ctx.channel().localAddress());
}

if (channelActivated) {
channelActivated = false;
// Always use the real connection local address without any proxy information
recorder.recordServerConnectionInactive(ctx.channel().localAddress());
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}
finally {
ctx.fireChannelInactive();
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}

@Override
public HttpServerMetricsRecorder recorder() {
return recorder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ static void createSelfSignedCertificate() throws CertificateException {
* <li> /5 is used by testServerConnectionsRecorder test</li>
* <li> /6 is used by testServerConnectionsMicrometerConnectionClose test</li>
* <li> /7 is used by testServerConnectionsRecorderConnectionClose test</li>
* <li> /8 is used by testServerConnectionsWebsocketMicrometer test</li>
* <li> /9 is used by testServerConnectionsWebsocketRecorder test</li>
* </ul>
*/
@BeforeEach
Expand Down Expand Up @@ -200,7 +202,12 @@ void setUp() {
.get("/7", (req, res) -> {
checkServerConnectionsRecorder(req);
return Mono.delay(Duration.ofMillis(200)).then(res.send());
}));
})
.get("/8", (req, res) -> res.sendWebsocket((in, out) ->
out.sendString(Mono.just("Hello World!").doOnNext(b -> checkServerConnectionsMicrometer(req)))))
.get("/9", (req, res) -> res.sendWebsocket((in, out) ->
out.sendString(Mono.just("Hello World!").doOnNext(b -> checkServerConnectionsRecorder(req)))))
);

provider = ConnectionProvider.create("HttpMetricsHandlerTests", 1);
httpClient = createClient(provider, () -> disposableServer.address())
Expand Down Expand Up @@ -746,6 +753,28 @@ void testServerConnectionsMicrometerConnectionClose(HttpProtocol[] serverProtoco
}
}

@Test
void testServerConnectionsWebsocketMicrometer() throws Exception {
disposableServer = httpServer
.doOnConnection(cnx -> ServerCloseHandler.INSTANCE.register(cnx.channel()))
.bindNow();

String address = formatSocketAddress(disposableServer.address());

httpClient.websocket()
.uri("/8")
.handle((in, out) -> in.receive().aggregate().asString())
.as(StepVerifier::create)
.expectNext("Hello World!")
.expectComplete()
.verify(Duration.ofSeconds(30));

// make sure the client socket is closed on the server side before checking server metrics
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
assertGauge(registry, SERVER_CONNECTIONS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
}

@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
Expand Down Expand Up @@ -849,6 +878,32 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols
}
}

@Test
void testServerConnectionsWebsocketRecorder() throws Exception {
ServerRecorder.INSTANCE.reset();
disposableServer = httpServer.metrics(true, ServerRecorder.supplier(), Function.identity())
.doOnConnection(cnx -> ServerCloseHandler.INSTANCE.register(cnx.channel()))
.bindNow();

String address = formatSocketAddress(disposableServer.address());

httpClient.websocket()
.uri("/9")
.handle((in, out) -> in.receive().aggregate().asString())
.as(StepVerifier::create)
.expectNext("Hello World!")
.expectComplete()
.verify(Duration.ofSeconds(30));

// make sure the client socket is closed on the server side before checking server metrics
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertThat(ServerRecorder.INSTANCE.error.get()).isNull();
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
}

@Test
void testIssue896() throws Exception {
disposableServer = httpServer.noSSL()
Expand Down