Skip to content

Commit

Permalink
Polish
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 13, 2023
1 parent 780e487 commit aebdd91
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public void channelActive(ChannelHandlerContext ctx) {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelActive();
Expand All @@ -78,10 +78,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelInactive();
Expand All @@ -97,9 +97,9 @@ public void channelRegistered(ChannelHandlerContext ctx) {
}
if (ctx.pipeline().get(NettyPipeline.SslHandler) != null) {
ctx.pipeline()
.addBefore(NettyPipeline.SslHandler,
NettyPipeline.TlsMetricsHandler,
tlsMetricsHandler());
.addBefore(NettyPipeline.SslHandler,
NettyPipeline.TlsMetricsHandler,
tlsMetricsHandler());
}

ctx.fireChannelRegistered();
Expand All @@ -123,10 +123,10 @@ else if (msg instanceof DatagramPacket) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireChannelRead(msg);
Expand All @@ -151,10 +151,10 @@ else if (msg instanceof DatagramPacket) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

//"FutureReturnValueIgnored" this is deliberate
Expand All @@ -167,10 +167,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireExceptionCaught(cause);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,12 +91,13 @@ static class TlsMetricsHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
long tlsHandshakeTimeStart = System.nanoTime();
ctx.pipeline().get(SslHandler.class)
.handshakeFuture()
.addListener(f -> {
ctx.pipeline().remove(this);
recordTlsHandshakeTime(ctx, tlsHandshakeTimeStart, f.isSuccess() ? SUCCESS : ERROR);
});
ctx.pipeline()
.get(SslHandler.class)
.handshakeFuture()
.addListener(f -> {
ctx.pipeline().remove(this);
recordTlsHandshakeTime(ctx, tlsHandshakeTimeStart, f.isSuccess() ? SUCCESS : ERROR);
});
ctx.fireChannelActive();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static reactor.netty.Metrics.STATUS;
import static reactor.netty.Metrics.TLS_HANDSHAKE_TIME;
import static reactor.netty.Metrics.URI;
import static reactor.netty.Metrics.formatSocketAddress;

/**
* A {@link ChannelMetricsRecorder} implementation for integration with Micrometer.
Expand Down Expand Up @@ -83,7 +84,7 @@ public MicrometerChannelMetricsRecorder(String name, String protocol) {

@Override
public void recordDataReceived(SocketAddress remoteAddress, long bytes) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCache, address,
key -> filter(DistributionSummary.builder(name + DATA_RECEIVED)
.baseUnit(BYTES_UNIT)
Expand All @@ -97,7 +98,7 @@ public void recordDataReceived(SocketAddress remoteAddress, long bytes) {

@Override
public void recordDataSent(SocketAddress remoteAddress, long bytes) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCache, address,
key -> filter(DistributionSummary.builder(name + DATA_SENT)
.baseUnit(BYTES_UNIT)
Expand All @@ -111,7 +112,7 @@ public void recordDataSent(SocketAddress remoteAddress, long bytes) {

@Override
public void incrementErrorsCount(SocketAddress remoteAddress) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
Counter c = MapUtils.computeIfAbsent(errorsCache, address,
key -> filter(Counter.builder(name + ERRORS)
.description(ERRORS_DESCRIPTION)
Expand All @@ -124,7 +125,7 @@ public void incrementErrorsCount(SocketAddress remoteAddress) {

@Override
public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
MeterKey meterKey = new MeterKey(null, address, null, status);
Timer timer = MapUtils.computeIfAbsent(tlsHandshakeTimeCache, meterKey,
key -> filter(Timer.builder(name + TLS_HANDSHAKE_TIME)
Expand All @@ -138,7 +139,7 @@ public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, S

@Override
public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
MeterKey meterKey = new MeterKey(null, address, null, status);
Timer timer = MapUtils.computeIfAbsent(connectTimeCache, meterKey,
key -> filter(Timer.builder(name + CONNECT_TIME)
Expand All @@ -152,7 +153,7 @@ public void recordConnectTime(SocketAddress remoteAddress, Duration time, String

@Override
public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
MeterKey meterKey = new MeterKey(null, address, null, status);
Timer timer = MapUtils.computeIfAbsent(addressResolverTimeCache, meterKey,
key -> filter(Timer.builder(name + ADDRESS_RESOLVER)
Expand Down Expand Up @@ -200,14 +201,15 @@ protected String protocol() {

@Nullable
LongAdder getTotalConnectionsAdder(SocketAddress serverAddress) {
String address = reactor.netty.Metrics.formatSocketAddress(serverAddress);
String address = formatSocketAddress(serverAddress);
return MapUtils.computeIfAbsent(totalConnectionsCache, address,
key -> {
LongAdder totalConnectionsAdder = new LongAdder();
Gauge gauge = filter(Gauge.builder(name + CONNECTIONS_TOTAL, totalConnectionsAdder, LongAdder::longValue)
.description(TOTAL_CONNECTIONS_DESCRIPTION)
.tags(URI, protocol, LOCAL_ADDRESS, address)
.register(REGISTRY));
Gauge gauge = filter(
Gauge.builder(name + CONNECTIONS_TOTAL, totalConnectionsAdder, LongAdder::longValue)
.description(TOTAL_CONNECTIONS_DESCRIPTION)
.tags(URI, protocol, LOCAL_ADDRESS, address)
.register(REGISTRY));
return gauge != null ? totalConnectionsAdder : null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
});
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
Expand All @@ -138,7 +138,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

if (msg instanceof LastHttpContent) {
// Detect if we have received an early response before the request has been fully flushed.
// In this case, invoke recordwrite now (because next we will reset all class fields).
// In this case, invoke #recordWrite now (because next we will reset all class fields).
lastReadSeq = (lastReadSeq + 1) & 0x7F_FF_FF_FF;
if ((lastReadSeq > lastWriteSeq) || (lastReadSeq == 0 && lastWriteSeq == Integer.MAX_VALUE)) {
lastWriteSeq = (lastWriteSeq + 1) & 0x7F_FF_FF_FF;
Expand All @@ -149,10 +149,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
ctx.fireChannelRead(msg);
}
Expand All @@ -163,10 +163,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recordException(ctx);
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
ctx.fireExceptionCaught(cause);
}
Expand Down Expand Up @@ -241,6 +241,6 @@ private void reset() {
dataSent = 0;
dataReceivedTime = 0;
dataSentTime = 0;
// don't reset lastWriteSeq and lastReadSeq, which must be incremented for ever
// don't reset lastWriteSeq and lastReadSeq, which must be incremented forever
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,8 +59,7 @@ protected void recordException(ChannelHandlerContext ctx) {
@Override
protected void recordWrite(SocketAddress address) {
if (contextView != null) {
recorder.recordDataSentTime(contextView, address,
path, method,
recorder.recordDataSentTime(contextView, address, path, method,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder.recordDataSent(contextView, address, path, dataSent);
Expand All @@ -73,12 +72,10 @@ protected void recordWrite(SocketAddress address) {
@Override
protected void recordRead(SocketAddress address) {
if (contextView != null) {
recorder.recordDataReceivedTime(contextView, address,
path, method, status,
recorder.recordDataReceivedTime(contextView, address, path, method, status,
Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder.recordResponseTime(contextView, address,
path, method, status,
recorder.recordResponseTime(contextView, address, path, method, status,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder.recordDataReceived(contextView, address, path, dataReceived);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tags;
import reactor.netty.Metrics;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;

import java.net.SocketAddress;
Expand All @@ -32,6 +31,7 @@
import static reactor.netty.Metrics.PENDING_STREAMS;
import static reactor.netty.Metrics.REGISTRY;
import static reactor.netty.Metrics.REMOTE_ADDRESS;
import static reactor.netty.Metrics.formatSocketAddress;

final class MicrometerHttp2ConnectionProviderMeterRegistrar {
static final String ACTIVE_CONNECTIONS_DESCRIPTION =
Expand All @@ -48,7 +48,7 @@ private MicrometerHttp2ConnectionProviderMeterRegistrar() {
}

void registerMetrics(String poolName, String id, SocketAddress remoteAddress, InstrumentedPool.PoolMetrics metrics) {
String addressAsString = Metrics.formatSocketAddress(remoteAddress);
String addressAsString = formatSocketAddress(remoteAddress);
Tags tags = Tags.of(ID, id, REMOTE_ADDRESS, addressAsString, NAME, poolName);

Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::acquiredSize)
Expand All @@ -73,7 +73,7 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In
}

void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) {
String addressAsString = Metrics.formatSocketAddress(remoteAddress);
String addressAsString = formatSocketAddress(remoteAddress);
Tags tags = Tags.of(ID, id, REMOTE_ADDRESS, addressAsString, NAME, poolName);

REGISTRY.remove(new Meter.Id(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, tags, null, null, Meter.Type.GAUGE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package reactor.netty.http.client;

import io.micrometer.core.instrument.Timer;
import reactor.netty.Metrics;
import reactor.netty.channel.MeterKey;
import reactor.netty.http.MicrometerHttpMetricsRecorder;
import reactor.netty.internal.util.MapUtils;
Expand All @@ -33,6 +32,7 @@
import static reactor.netty.Metrics.RESPONSE_TIME;
import static reactor.netty.Metrics.STATUS;
import static reactor.netty.Metrics.URI;
import static reactor.netty.Metrics.formatSocketAddress;

/**
* @author Violeta Georgieva
Expand All @@ -48,7 +48,7 @@ private MicrometerHttpClientMetricsRecorder() {

@Override
public void recordDataReceivedTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) {
String address = Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
MeterKey meterKey = new MeterKey(uri, address, method, status);
Timer dataReceivedTime = MapUtils.computeIfAbsent(dataReceivedTimeCache, meterKey,
key -> filter(Timer.builder(name() + DATA_RECEIVED_TIME)
Expand All @@ -62,7 +62,7 @@ public void recordDataReceivedTime(SocketAddress remoteAddress, String uri, Stri

@Override
public void recordDataSentTime(SocketAddress remoteAddress, String uri, String method, Duration time) {
String address = Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
MeterKey meterKey = new MeterKey(uri, address, method, null);
Timer dataSentTime = MapUtils.computeIfAbsent(dataSentTimeCache, meterKey,
key -> filter(Timer.builder(name() + DATA_SENT_TIME)
Expand All @@ -76,7 +76,7 @@ public void recordDataSentTime(SocketAddress remoteAddress, String uri, String m

@Override
public void recordResponseTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) {
String address = Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
MeterKey meterKey = new MeterKey(uri, address, method, status);
Timer responseTime = MapUtils.computeIfAbsent(responseTimeCache, meterKey,
key -> filter(Timer.builder(name() + RESPONSE_TIME)
Expand Down

0 comments on commit aebdd91

Please sign in to comment.