Skip to content

Commit

Permalink
fix #926 Ensure URI tag used for micrometer does not contain the query
Browse files Browse the repository at this point in the history
Use "path" instead of "uri".
Calculate "path" only once.
  • Loading branch information
violetagg committed Dec 10, 2019
1 parent 5949bf7 commit dc3fb4b
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 86 deletions.
19 changes: 2 additions & 17 deletions src/main/java/reactor/netty/http/HttpInfos.java
Expand Up @@ -15,7 +15,6 @@
*/
package reactor.netty.http;

import java.net.URI;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -64,23 +63,9 @@ public interface HttpInfos {
/**
* Returns a normalized {@link #uri()} without the leading and trailing '/' if present
*
* @return a normalized {@link #uri()} without the leading and trailing
* @return a normalized {@link #uri()} without the leading and trailing '/' if present
*/
default String path() {
String uri = URI.create(uri()).getPath();
if (!uri.isEmpty()) {
if(uri.charAt(0) == '/'){
uri = uri.substring(1);
if(uri.length() <= 1){
return uri;
}
}
if(uri.charAt(uri.length() - 1) == '/'){
return uri.substring(0, uri.length() - 1);
}
}
return uri;
}
String path();

/**
* Returns the resolved target address
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/reactor/netty/http/HttpOperations.java
Expand Up @@ -16,6 +16,7 @@

package reactor.netty.http;

import java.net.URI;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -290,6 +291,27 @@ protected final boolean markSentHeaderAndBody() {
return HTTP_STATE.compareAndSet(this, READY, BODY_SENT);
}

/**
* Returns a normalized uri without the leading and trailing '/' if present
*
* @return a normalized uri without the leading and trailing '/' if present
*/
public static String resolvePath(String uri) {
String path = URI.create(uri).getPath();
if (!path.isEmpty()) {
if(path.charAt(0) == '/'){
path = path.substring(1);
if(path.length() <= 1){
return path;
}
}
if(path.charAt(path.length() - 1) == '/'){
return path.substring(0, path.length() - 1);
}
}
return path;
}

/**
* Outbound Netty HttpMessage
*
Expand Down
Expand Up @@ -80,6 +80,7 @@
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.channel.MicrometerChannelMetricsRecorder;
import reactor.netty.http.HttpOperations;
import reactor.netty.http.HttpResources;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.InetSocketAddressUtil;
Expand Down Expand Up @@ -532,6 +533,8 @@ Publisher<Void> requestWithBody(HttpClientOperations ch) {
.setProtocolVersion(HttpVersion.HTTP_1_1)
.headers();

ch.path = HttpOperations.resolvePath(ch.uri());

if (defaultHeaders != null) {
headers.set(defaultHeaders);
}
Expand Down
Expand Up @@ -36,6 +36,7 @@
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.http.Cookies;
import reactor.netty.http.HttpOperations;
import reactor.netty.tcp.ProxyProvider;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;
Expand Down Expand Up @@ -69,6 +70,7 @@ final static class PreparingHttpClientRequest implements HttpClientRequest {
final HttpHeaders headers;
final HttpMethod method;
final String uri;
final String path;
final Context context;
final ClientCookieDecoder cookieDecoder;
final boolean isWebsocket;
Expand All @@ -78,6 +80,7 @@ final static class PreparingHttpClientRequest implements HttpClientRequest {
this.headers = c.headers;
this.cookieDecoder = c.cookieDecoder;
this.uri = c.uri;
this.path = HttpOperations.resolvePath(this.uri);
this.method = c.method;
this.isWebsocket = c.websocketSubprotocols != null;
}
Expand Down Expand Up @@ -163,6 +166,11 @@ public String uri() {
return uri;
}

@Override
public String path() {
return path;
}

@Override
public HttpVersion version() {
return HttpVersion.HTTP_1_1;
Expand Down
Expand Up @@ -33,9 +33,11 @@
*/
final class HttpClientMetricsHandler extends ChannelDuplexHandler {

HttpRequest request;
String path;

HttpResponse response;
String method;

String status;


long dataReceived;
Expand All @@ -58,7 +60,12 @@ final class HttpClientMetricsHandler extends ChannelDuplexHandler {
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof HttpRequest) {
request = (HttpRequest) msg;
ChannelOperations<?,?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpClientOperations) {
HttpClientOperations ops = (HttpClientOperations) channelOps;
path = "/" + ops.path;
method = ops.method().name();
}

dataSentTime = System.nanoTime();
}
Expand All @@ -75,11 +82,10 @@ else if (msg instanceof ByteBuf) {
SocketAddress address = ctx.channel().remoteAddress();

recorder.recordDataSentTime(address,
request.uri(),
request.method().name(),
path, method,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder.recordDataSent(address, request.uri(), dataSent);
recorder.recordDataSent(address, path, dataSent);
});
}
//"FutureReturnValueIgnored" this is deliberate
Expand All @@ -89,7 +95,8 @@ else if (msg instanceof ByteBuf) {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse) {
response = (HttpResponse) msg;
HttpResponse response = (HttpResponse) msg;
status = response.status().codeAsText().toString();

dataReceivedTime = System.nanoTime();
}
Expand All @@ -104,18 +111,14 @@ else if (msg instanceof ByteBuf) {
if (msg instanceof LastHttpContent) {
SocketAddress address = ctx.channel().remoteAddress();
recorder.recordDataReceivedTime(address,
request.uri(),
request.method().name(),
response.status().codeAsText().toString(),
path, method, status,
Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder.recordResponseTime(address,
request.uri(),
request.method().name(),
response.status().codeAsText().toString(),
path, method, status,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder.recordDataReceived(address, request.uri(), dataReceived);
recorder.recordDataReceived(address, path, dataReceived);
reset();
}

Expand All @@ -125,7 +128,7 @@ else if (msg instanceof ByteBuf) {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recorder.incrementErrorsCount(ctx.channel().remoteAddress(),
request != null ? request.uri() : resolveUri(ctx));
path != null ? path : resolveUri(ctx));

ctx.fireExceptionCaught(cause);
}
Expand All @@ -141,8 +144,9 @@ private String resolveUri(ChannelHandlerContext ctx) {
}

private void reset() {
request = null;
response = null;
path = null;
method = null;
status = null;
dataReceived = 0;
dataSent = 0;
dataReceivedTime = 0;
Expand Down
Expand Up @@ -98,6 +98,7 @@ class HttpClientOperations extends HttpOperations<NettyInbound, NettyOutbound>

Supplier<String>[] redirectedFrom = EMPTY_REDIRECTIONS;
String resourceUrl;
String path;

volatile ResponseState responseState;

Expand Down Expand Up @@ -427,6 +428,11 @@ public final String uri() {
return this.nettyRequest.uri();
}

@Override
public final String path() {
return this.path;
}

@Override
public String resourceUrl() {
return resourceUrl;
Expand Down
Expand Up @@ -42,10 +42,6 @@ final class HttpServerMetricsHandler extends ChannelDuplexHandler {

long dataSentTime;

String uri;

String method;


final HttpServerMetricsRecorder recorder;

Expand Down Expand Up @@ -78,28 +74,25 @@ else if (msg instanceof ByteBuf) {
ChannelOperations<?,?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
String path = "/" + ops.path;
String method = ops.method().name();
String status = ops.status().codeAsText().toString();
recorder.recordDataSentTime(
ops.uri(),
ops.method().name(),
ops.status().codeAsText().toString(),
path, method, status,
Duration.ofNanos(System.nanoTime() - dataSentTime));

if (dataReceivedTime != 0) {
recorder.recordResponseTime(
ops.uri(),
ops.method().name(),
ops.status().codeAsText().toString(),
path, method, status,
Duration.ofNanos(System.nanoTime() - dataReceivedTime));
}
else {
recorder.recordResponseTime(
ops.uri(),
ops.method().name(),
ops.status().codeAsText().toString(),
path, method, status,
Duration.ofNanos(System.nanoTime() - dataSentTime));
}

recorder.recordDataSent(ops.remoteAddress(), ops.uri(), dataSent);
recorder.recordDataSent(ops.remoteAddress(), path, dataSent);

dataSent = 0;
}
Expand All @@ -114,10 +107,6 @@ else if (msg instanceof ByteBuf) {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
dataReceivedTime = System.nanoTime();
HttpRequest request = (HttpRequest) msg;
uri = request.uri();
method = request.method()
.name();
}

if (msg instanceof ByteBufHolder) {
Expand All @@ -128,9 +117,15 @@ else if (msg instanceof ByteBuf) {
}

if (msg instanceof LastHttpContent) {
recorder.recordDataReceivedTime(uri, method, Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder.recordDataReceived(ctx.channel().remoteAddress(), uri, dataReceived);
ChannelOperations<?,?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
String path = "/" + ops.path;
String method = ops.method().name();
recorder.recordDataReceivedTime(path, method, Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder.recordDataReceived(ctx.channel().remoteAddress(), path, dataReceived);
}

dataReceived = 0;
}
Expand All @@ -140,8 +135,10 @@ else if (msg instanceof ByteBuf) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (uri != null) {
recorder.incrementErrorsCount(ctx.channel().remoteAddress(), uri);
ChannelOperations<?,?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
recorder.incrementErrorsCount(ctx.channel().remoteAddress(), "/" + ops.path);
}

ctx.fireExceptionCaught(cause);
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/reactor/netty/http/server/HttpServerOperations.java
Expand Up @@ -93,6 +93,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
final HttpHeaders responseHeaders;
final Cookies cookieHolder;
final HttpRequest nettyRequest;
final String path;
final ConnectionInfo connectionInfo;
final ServerCookieEncoder cookieEncoder;
final ServerCookieDecoder cookieDecoder;
Expand All @@ -109,6 +110,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
this.nettyResponse = replaced.nettyResponse;
this.paramsResolver = replaced.paramsResolver;
this.nettyRequest = replaced.nettyRequest;
this.path = replaced.path;
this.compressionPredicate = replaced.compressionPredicate;
this.cookieEncoder = replaced.cookieEncoder;
this.cookieDecoder = replaced.cookieDecoder;
Expand All @@ -123,6 +125,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
ServerCookieDecoder decoder) {
super(c, listener);
this.nettyRequest = nettyRequest;
this.path = resolvePath(nettyRequest.uri());
this.nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
this.responseHeaders = nettyResponse.headers();
this.responseHeaders.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
Expand Down Expand Up @@ -407,6 +410,14 @@ public String uri() {
throw new IllegalStateException("request not parsed");
}

@Override
public String path() {
if (path != null) {
return path;
}
throw new IllegalStateException("request not parsed");
}

@Override
public HttpVersion version() {
if (nettyRequest != null) {
Expand Down

0 comments on commit dc3fb4b

Please sign in to comment.