Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable HttpClient/HttpServer metrics when protocol is H2/H2C #2066

Merged
merged 8 commits into from
Mar 9, 2022
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,18 @@ protected AbstractHttpClientMetricsHandler(@Nullable Function<String, String> ur
this.uriTagValue = uriTagValue;
}

protected AbstractHttpClientMetricsHandler(AbstractHttpClientMetricsHandler copy) {
this.contextView = copy.contextView;
this.dataReceived = copy.dataReceived;
this.dataReceivedTime = copy.dataReceivedTime;
this.dataSent = copy.dataSent;
this.dataSentTime = copy.dataSentTime;
this.method = copy.method;
this.path = copy.path;
this.status = copy.status;
this.uriTagValue = copy.uriTagValue;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,11 @@ final class ContextAwareHttpClientMetricsHandler extends AbstractHttpClientMetri
this.recorder = recorder;
}

ContextAwareHttpClientMetricsHandler(ContextAwareHttpClientMetricsHandler copy) {
super(copy);
this.recorder = copy.recorder;
}

@Override
protected ContextAwareHttpClientMetricsRecorder recorder() {
return recorder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyPipeline;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
Expand Down Expand Up @@ -99,9 +100,15 @@ protected CoreSubscriber<PooledRef<Connection>> createDisposableAcquire(
long pendingAcquireTimeout,
InstrumentedPool<Connection> pool,
MonoSink<Connection> sink) {
boolean acceptGzip = config instanceof HttpClientConfig && ((HttpClientConfig) config).acceptGzip;
boolean acceptGzip = false;
ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null;
Function<String, String> uriTagValue = null;
if (config instanceof HttpClientConfig) {
acceptGzip = ((HttpClientConfig) config).acceptGzip;
uriTagValue = ((HttpClientConfig) config).uriTagValue;
}
return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(),
acceptGzip, pendingAcquireTimeout, pool, sink);
acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, sink, uriTagValue);
}

@Override
Expand Down Expand Up @@ -194,10 +201,12 @@ static final class DisposableAcquire
final ConnectionObserver obs;
final ChannelOperations.OnSetup opsFactory;
final boolean acceptGzip;
final ChannelMetricsRecorder metricsRecorder;
final long pendingAcquireTimeout;
final InstrumentedPool<Connection> pool;
final boolean retried;
final MonoSink<Connection> sink;
final Function<String, String> uriTagValue;

PooledRef<Connection> pooledRef;
Subscription subscription;
Expand All @@ -206,28 +215,34 @@ static final class DisposableAcquire
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
@Nullable ChannelMetricsRecorder metricsRecorder,
long pendingAcquireTimeout,
InstrumentedPool<Connection> pool,
MonoSink<Connection> sink) {
MonoSink<Connection> sink,
@Nullable Function<String, String> uriTagValue) {
this.cancellations = Disposables.composite();
this.obs = obs;
this.opsFactory = opsFactory;
this.acceptGzip = acceptGzip;
this.metricsRecorder = metricsRecorder;
this.pendingAcquireTimeout = pendingAcquireTimeout;
this.pool = pool;
this.retried = false;
this.sink = sink;
this.uriTagValue = uriTagValue;
}

DisposableAcquire(DisposableAcquire parent) {
this.cancellations = parent.cancellations;
this.obs = parent.obs;
this.opsFactory = parent.opsFactory;
this.acceptGzip = parent.acceptGzip;
this.metricsRecorder = parent.metricsRecorder;
this.pendingAcquireTimeout = parent.pendingAcquireTimeout;
this.pool = parent.pool;
this.retried = true;
this.sink = parent.sink;
this.uriTagValue = parent.uriTagValue;
}

@Override
Expand Down Expand Up @@ -280,7 +295,7 @@ else if (p.state != null) {
return;
}

HttpClientConfig.openStream(channel, this, obs, opsFactory, acceptGzip)
HttpClientConfig.openStream(channel, this, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue)
.addListener(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.Http2ConnectionProvider.OWNER;
Expand Down Expand Up @@ -553,7 +554,7 @@ static void configureHttp11OrH2CleartextPipeline(
Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build();

Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip));
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue));

HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

Expand Down Expand Up @@ -607,11 +608,17 @@ static void configureHttp11Pipeline(ChannelPipeline p,
}
}

static Future<Http2StreamChannel> openStream(Channel channel, Http2ConnectionProvider.DisposableAcquire owner,
ConnectionObserver observer, ChannelOperations.OnSetup opsFactory, boolean acceptGzip) {
static Future<Http2StreamChannel> openStream(
Channel channel,
Http2ConnectionProvider.DisposableAcquire owner,
ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable Function<String, String> uriTagValue) {
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel);
bootstrap.option(ChannelOption.AUTO_READ, false);
bootstrap.handler(new H2Codec(owner, observer, opsFactory, acceptGzip));
bootstrap.handler(new H2Codec(owner, observer, opsFactory, acceptGzip, metricsRecorder, uriTagValue));
return bootstrap.open();
}

Expand Down Expand Up @@ -648,12 +655,21 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter {

final boolean acceptGzip;
final Http2FrameCodec http2FrameCodec;
final ChannelMetricsRecorder metricsRecorder;
final ChannelOperations.OnSetup opsFactory;
final Function<String, String> uriTagValue;

H2CleartextCodec(Http2FrameCodec http2FrameCodec, ChannelOperations.OnSetup opsFactory, boolean acceptGzip) {
H2CleartextCodec(
Http2FrameCodec http2FrameCodec,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable Function<String, String> uriTagValue) {
this.acceptGzip = acceptGzip;
this.http2FrameCodec = http2FrameCodec;
this.metricsRecorder = metricsRecorder;
this.opsFactory = opsFactory;
this.uriTagValue = uriTagValue;
}

@Override
Expand All @@ -672,11 +688,12 @@ public void handlerAdded(ChannelHandlerContext ctx) {
if (responseTimeoutHandler != null) {
pipeline.remove(NettyPipeline.ResponseTimeoutHandler);
http2MultiplexHandler = new Http2MultiplexHandler(new H2Codec(opsFactory, acceptGzip),
new H2Codec(owner, obs, opsFactory, acceptGzip, responseTimeoutHandler.getReaderIdleTimeInMillis()));
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder,
responseTimeoutHandler.getReaderIdleTimeInMillis(), uriTagValue));
}
else {
http2MultiplexHandler = new Http2MultiplexHandler(new H2Codec(opsFactory, acceptGzip),
new H2Codec(owner, obs, opsFactory, acceptGzip));
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue));
}
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec)
.addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, http2MultiplexHandler);
Expand All @@ -690,51 +707,59 @@ public void handlerAdded(ChannelHandlerContext ctx) {

static final class H2Codec extends ChannelInitializer<Channel> {
final boolean acceptGzip;
final ChannelMetricsRecorder metricsRecorder;
final ConnectionObserver observer;
final ChannelOperations.OnSetup opsFactory;
final Http2ConnectionProvider.DisposableAcquire owner;
final long responseTimeoutMillis;
final Function<String, String> uriTagValue;

H2Codec(boolean acceptGzip) {
// Handle inbound streams (server pushes)
// TODO this is not supported
this(null, null, null, acceptGzip, -1);
this(null, null, null, acceptGzip, null, -1, null);
}

H2Codec(@Nullable ChannelOperations.OnSetup opsFactory, boolean acceptGzip) {
// Handle inbound streams (server pushes)
// TODO this is not supported
this(null, null, opsFactory, acceptGzip, -1);
this(null, null, opsFactory, acceptGzip, null, -1, null);
}

H2Codec(
@Nullable Http2ConnectionProvider.DisposableAcquire owner,
@Nullable ConnectionObserver observer,
@Nullable ChannelOperations.OnSetup opsFactory,
boolean acceptGzip) {
boolean acceptGzip,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this(owner, observer, opsFactory, acceptGzip, -1);
this(owner, observer, opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue);
}

H2Codec(
@Nullable Http2ConnectionProvider.DisposableAcquire owner,
@Nullable ConnectionObserver observer,
@Nullable ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
long responseTimeoutMillis) {
@Nullable ChannelMetricsRecorder metricsRecorder,
long responseTimeoutMillis,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this.acceptGzip = acceptGzip;
this.metricsRecorder = metricsRecorder;
this.observer = observer;
this.opsFactory = opsFactory;
this.owner = owner;
this.responseTimeoutMillis = responseTimeoutMillis;
this.uriTagValue = uriTagValue;
}

@Override
protected void initChannel(Channel ch) {
if (observer != null && opsFactory != null && owner != null) {
Http2ConnectionProvider.registerClose(ch, owner);
addStreamHandlers(ch, observer.then(StreamConnectionObserver.INSTANCE), opsFactory);
addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory);
}
else {
// Handle server pushes (inbound streams)
Expand All @@ -753,6 +778,27 @@ void addStreamHandlers(Channel ch, ConnectionObserver obs, ChannelOperations.OnS

ChannelOperations.addReactiveBridge(ch, opsFactory, obs);

if (metricsRecorder != null) {
if (metricsRecorder instanceof HttpClientMetricsRecorder) {
ChannelHandler handler;
Channel parent = ch.parent();
ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler);
if (existingHandler != null) {
// This use case can happen only in HTTP/2 clear text connection upgrade
parent.pipeline().remove(NettyPipeline.HttpMetricsHandler);
handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ?
new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler) :
new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler);
}
else {
handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ?
new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue) :
new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue);
}
pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler);
}
}

if (responseTimeoutMillis > -1) {
Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
Expand Down Expand Up @@ -937,7 +983,16 @@ public void onUncaughtException(Connection connection, Throwable error) {

static final class StreamConnectionObserver implements ConnectionObserver {

static final StreamConnectionObserver INSTANCE = new StreamConnectionObserver();
final Context context;

StreamConnectionObserver(Context context) {
this.context = context;
}

@Override
public Context currentContext() {
return context;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,11 @@ final class HttpClientMetricsHandler extends AbstractHttpClientMetricsHandler {
this.recorder = recorder;
}

HttpClientMetricsHandler(HttpClientMetricsHandler copy) {
super(copy);
this.recorder = copy.recorder;
}

@Override
protected HttpClientMetricsRecorder recorder() {
return recorder;
Expand Down