From 13e26e18358f18a7ec01471d64fc7cbcee67973e Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 29 Apr 2024 14:35:00 +0300 Subject: [PATCH 1/3] Cache the request info when collecting metrics Do not search for HttpServerOperations every time when a request info is needed. --- .../AbstractHttpServerMetricsHandler.java | 101 ++++++++++++------ .../ContextAwareHttpServerMetricsHandler.java | 14 ++- .../MicrometerHttpServerMetricsHandler.java | 12 +-- 3 files changed, 83 insertions(+), 44 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index ad6645b75..5f5ac5353 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -30,6 +30,7 @@ import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.annotation.Nullable; +import reactor.util.context.ContextView; import java.net.SocketAddress; import java.time.Duration; @@ -53,12 +54,21 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler { boolean channelActivated; boolean channelOpened; + ContextView contextView; + long dataReceived; long dataReceivedTime; long dataSent; long dataSentTime; + boolean initialized; + + String method; + String path; + SocketAddress remoteSocketAddress; + String status; + final Function methodTagValue; final Function uriTagValue; @@ -72,10 +82,16 @@ protected AbstractHttpServerMetricsHandler( protected AbstractHttpServerMetricsHandler(AbstractHttpServerMetricsHandler copy) { this.channelActivated = copy.channelActivated; this.channelOpened = copy.channelOpened; + this.contextView = copy.contextView; this.dataReceived = copy.dataReceived; this.dataReceivedTime = copy.dataReceivedTime; this.dataSent = copy.dataSent; this.dataSentTime = copy.dataSentTime; + this.initialized = copy.initialized; + this.method = copy.method; + this.path = copy.path; + this.remoteSocketAddress = copy.remoteSocketAddress; + this.status = copy.status; this.methodTagValue = copy.methodTagValue; this.uriTagValue = copy.uriTagValue; } @@ -138,8 +154,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); if (channelOps instanceof HttpServerOperations) { HttpServerOperations ops = (HttpServerOperations) channelOps; - startWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), - methodTagValue.apply(ops.method().name()), ops.status().codeAsText().toString()); + if (!initialized) { + method = methodTagValue.apply(ops.method().name()); + path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path); + // Always take the remote address from the operations in order to consider proxy information + // Use remoteSocketAddress() in order to obtain UDS info + remoteSocketAddress = ops.remoteSocketAddress(); + initialized = true; + } + if (contextView == null) { + contextView = ops.currentContext(); + } + status = ops.status().codeAsText().toString(); + startWrite(ops); } } @@ -149,10 +176,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) promise.addListener(future -> { ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); if (channelOps instanceof HttpServerOperations) { - HttpServerOperations ops = (HttpServerOperations) channelOps; try { - recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), - methodTagValue.apply(ops.method().name()), ops.status().codeAsText().toString()); + recordWrite((HttpServerOperations) channelOps); } catch (RuntimeException e) { // Allow request-response exchange to continue, unaffected by metrics problem @@ -163,8 +188,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } recordInactiveConnectionOrStream(ctx.channel()); - - dataSent = 0; }); } } @@ -182,12 +205,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { + HttpServerOperations ops = null; try { if (msg instanceof HttpRequest) { + reset(); ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); if (channelOps instanceof HttpServerOperations) { - HttpServerOperations ops = (HttpServerOperations) channelOps; - startRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), methodTagValue.apply(ops.method().name())); + ops = (HttpServerOperations) channelOps; + if (!initialized) { + method = methodTagValue.apply(ops.method().name()); + path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path); + // Always take the remote address from the operations in order to consider proxy information + // Use remoteSocketAddress() in order to obtain UDS info + remoteSocketAddress = ops.remoteSocketAddress(); + initialized = true; + } + startRead(ops); } channelActivated = true; @@ -204,13 +237,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { dataReceived += extractProcessedDataFromBuffer(msg); if (msg instanceof LastHttpContent) { - ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); - if (channelOps instanceof HttpServerOperations) { - HttpServerOperations ops = (HttpServerOperations) channelOps; - recordRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), methodTagValue.apply(ops.method().name())); - } - - dataReceived = 0; + recordRead(); } } catch (RuntimeException e) { @@ -221,17 +248,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } ctx.fireChannelRead(msg); + + if (ops != null) { + // ContextView is available only when a subscription to the I/O Handler happens + contextView = ops.currentContext(); + } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { try { - ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); - if (channelOps instanceof HttpServerOperations) { - HttpServerOperations ops = (HttpServerOperations) channelOps; - // Always take the remote address from the operations in order to consider proxy information - recordException(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path)); - } + recordException(); } catch (RuntimeException e) { // Allow request-response exchange to continue, unaffected by metrics problem @@ -255,21 +282,21 @@ else if (msg instanceof ByteBuf) { protected abstract HttpServerMetricsRecorder recorder(); - protected void recordException(HttpServerOperations ops, String path) { + protected void recordException() { // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info - recorder().incrementErrorsCount(ops.remoteSocketAddress(), path); + recorder().incrementErrorsCount(remoteSocketAddress, path); } - protected void recordRead(HttpServerOperations ops, String path, String method) { + protected void recordRead() { recorder().recordDataReceivedTime(path, method, Duration.ofNanos(System.nanoTime() - dataReceivedTime)); // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info - recorder().recordDataReceived(ops.remoteSocketAddress(), path, dataReceived); + recorder().recordDataReceived(remoteSocketAddress, path, dataReceived); } - protected void recordWrite(HttpServerOperations ops, String path, String method, String status) { + protected void recordWrite(HttpServerOperations ops) { Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime); recorder().recordDataSentTime(path, method, status, dataSentTimeDuration); @@ -282,7 +309,7 @@ protected void recordWrite(HttpServerOperations ops, String path, String method, // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info - recorder().recordDataSent(ops.remoteSocketAddress(), path, dataSent); + recorder().recordDataSent(remoteSocketAddress, path, dataSent); } protected void recordActiveConnection(SocketAddress localAddress) { @@ -301,11 +328,11 @@ protected void recordClosedStream(SocketAddress localAddress) { recorder().recordStreamClosed(localAddress); } - protected void startRead(HttpServerOperations ops, String path, String method) { + protected void startRead(HttpServerOperations ops) { dataReceivedTime = System.nanoTime(); } - protected void startWrite(HttpServerOperations ops, String path, String method, String status) { + protected void startWrite(HttpServerOperations ops) { dataSentTime = System.nanoTime(); } @@ -331,6 +358,20 @@ void recordInactiveConnectionOrStream(Channel channel) { } } + void reset() { + // There is no need to reset 'channelActivated' and 'channelOpened' + contextView = null; + dataReceived = 0; + dataReceivedTime = 0; + dataSent = 0; + dataSentTime = 0; + initialized = false; + method = null; + path = null; + remoteSocketAddress = null; + status = null; + } + static final Set STANDARD_METHODS; static { Set standardMethods = new HashSet<>(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java index b3096afe6..6e1f3f5fc 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java @@ -50,26 +50,24 @@ protected ContextAwareHttpServerMetricsRecorder recorder() { } @Override - protected void recordException(HttpServerOperations ops, String path) { + protected void recordException() { // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info - recorder().incrementErrorsCount(ops.currentContext(), ops.remoteSocketAddress(), path); + recorder().incrementErrorsCount(contextView, remoteSocketAddress, path); } @Override - protected void recordRead(HttpServerOperations ops, String path, String method) { - ContextView contextView = ops.currentContext(); + protected void recordRead() { recorder().recordDataReceivedTime(contextView, path, method, Duration.ofNanos(System.nanoTime() - dataReceivedTime)); // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info - recorder().recordDataReceived(contextView, ops.remoteSocketAddress(), path, dataReceived); + recorder().recordDataReceived(contextView, remoteSocketAddress, path, dataReceived); } @Override - protected void recordWrite(HttpServerOperations ops, String path, String method, String status) { - ContextView contextView = ops.currentContext(); + protected void recordWrite(HttpServerOperations ops) { Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime); recorder().recordDataSentTime(contextView, path, method, status, dataSentTimeDuration); @@ -83,6 +81,6 @@ protected void recordWrite(HttpServerOperations ops, String path, String method, // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info - recorder().recordDataSent(contextView, ops.remoteSocketAddress(), path, dataSent); + recorder().recordDataSent(contextView, remoteSocketAddress, path, dataSent); } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsHandler.java index 567599d0b..ec393f15d 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsHandler.java @@ -85,13 +85,13 @@ protected HttpServerMetricsRecorder recorder() { } @Override - protected void recordWrite(HttpServerOperations ops, String path, String method, String status) { + protected void recordWrite(HttpServerOperations ops) { Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime); recorder().recordDataSentTime(path, method, status, dataSentTimeDuration); // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info - recorder().recordDataSent(ops.remoteSocketAddress(), path, dataSent); + recorder().recordDataSent(remoteSocketAddress, path, dataSent); // Cannot invoke the recorder anymore: // 1. The recorder is one instance only, it is invoked for all requests that can happen @@ -108,8 +108,8 @@ protected void recordWrite(HttpServerOperations ops, String path, String method, } @Override - protected void startRead(HttpServerOperations ops, String path, String method) { - super.startRead(ops, path, method); + protected void startRead(HttpServerOperations ops) { + super.startRead(ops); responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, method, path, ops); responseTimeObservation = Observation.createNotStarted(this.responseTimeName, responseTimeHandlerContext, OBSERVATION_REGISTRY); @@ -119,8 +119,8 @@ protected void startRead(HttpServerOperations ops, String path, String method) { // response @Override - protected void startWrite(HttpServerOperations ops, String path, String method, String status) { - super.startWrite(ops, path, method, status); + protected void startWrite(HttpServerOperations ops) { + super.startWrite(ops); if (responseTimeObservation == null) { responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, method, path, ops); From 5e3918fc875229e9ea6d81ac847aa8d87f98005f Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 29 Apr 2024 22:02:53 +0300 Subject: [PATCH 2/3] Address feedback --- .../server/AbstractHttpServerMetricsHandler.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 5f5ac5353..1c1b4e8e3 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -212,14 +212,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); if (channelOps instanceof HttpServerOperations) { ops = (HttpServerOperations) channelOps; - if (!initialized) { - method = methodTagValue.apply(ops.method().name()); - path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path); - // Always take the remote address from the operations in order to consider proxy information - // Use remoteSocketAddress() in order to obtain UDS info - remoteSocketAddress = ops.remoteSocketAddress(); - initialized = true; - } + method = methodTagValue.apply(ops.method().name()); + path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path); + // Always take the remote address from the operations in order to consider proxy information + // Use remoteSocketAddress() in order to obtain UDS info + remoteSocketAddress = ops.remoteSocketAddress(); + initialized = true; startRead(ops); } From 20e2e7972ec91d816b9e7cd2c409d34cc1beea2a Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 30 Apr 2024 09:45:35 +0300 Subject: [PATCH 3/3] Address feedback --- .../http/server/AbstractHttpServerMetricsHandler.java | 9 +++++++-- .../server/ContextAwareHttpServerMetricsHandler.java | 5 +++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 1c1b4e8e3..fb9b41457 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -30,6 +30,7 @@ import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; import reactor.util.context.ContextView; import java.net.SocketAddress; @@ -163,7 +164,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) initialized = true; } if (contextView == null) { - contextView = ops.currentContext(); + contextView(ops); } status = ops.status().codeAsText().toString(); startWrite(ops); @@ -249,7 +250,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { if (ops != null) { // ContextView is available only when a subscription to the I/O Handler happens - contextView = ops.currentContext(); + contextView(ops); } } @@ -280,6 +281,10 @@ else if (msg instanceof ByteBuf) { protected abstract HttpServerMetricsRecorder recorder(); + protected void contextView(HttpServerOperations ops) { + this.contextView = Context.empty(); + } + protected void recordException() { // Always take the remote address from the operations in order to consider proxy information // Use remoteSocketAddress() in order to obtain UDS info diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java index 6e1f3f5fc..1c259147c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/ContextAwareHttpServerMetricsHandler.java @@ -49,6 +49,11 @@ protected ContextAwareHttpServerMetricsRecorder recorder() { return recorder; } + @Override + protected void contextView(HttpServerOperations ops) { + this.contextView = ops.currentContext(); + } + @Override protected void recordException() { // Always take the remote address from the operations in order to consider proxy information