From 22bc533a1b56f36e00bd1894fec8fb4d7bc31e2a Mon Sep 17 00:00:00 2001 From: Jaehong Kim Date: Tue, 7 Feb 2023 15:14:07 +0900 Subject: [PATCH] [#9702] Update reactor-netty plugin --- .../spring-webflux-plugin-testweb/pom.xml | 4 + .../plugin/SpringWebfluxPluginController.java | 46 ++++- .../plugin/reactor/netty/HttpCallContext.java | 87 +++++++++ .../netty/HttpCallContextAccessor.java | 23 +++ .../reactor/netty/ReactorNettyPlugin.java | 24 +++ .../AbstractHttpServerHandleInterceptor.java | 107 ++++++----- .../ChannelOperationsInterceptor.java | 36 ++++ ...HandlerObserverConstructorInterceptor.java | 63 +++++++ ...ndlerObserverOnStateChangeInterceptor.java | 174 ++++++++++++++++++ ...erverHandleHttpServerStateInterceptor.java | 41 ++--- .../HttpServerHandleStateInterceptor.java | 43 ++--- 11 files changed, 544 insertions(+), 104 deletions(-) create mode 100644 plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContext.java create mode 100644 plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContextAccessor.java create mode 100644 plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverConstructorInterceptor.java create mode 100644 plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverOnStateChangeInterceptor.java diff --git a/agent-testweb/spring-webflux-plugin-testweb/pom.xml b/agent-testweb/spring-webflux-plugin-testweb/pom.xml index 0446c53be872..b6eaace97eb6 100644 --- a/agent-testweb/spring-webflux-plugin-testweb/pom.xml +++ b/agent-testweb/spring-webflux-plugin-testweb/pom.xml @@ -31,6 +31,10 @@ spring-boot-autoconfigure ${spring.boot.version} + + com.navercorp.pinpoint + pinpoint-agent-testweb-commons + \ No newline at end of file diff --git a/agent-testweb/spring-webflux-plugin-testweb/src/main/java/com/pinpoint/test/plugin/SpringWebfluxPluginController.java b/agent-testweb/spring-webflux-plugin-testweb/src/main/java/com/pinpoint/test/plugin/SpringWebfluxPluginController.java index e5aa3cb0e8b9..b9bca65d1abb 100644 --- a/agent-testweb/spring-webflux-plugin-testweb/src/main/java/com/pinpoint/test/plugin/SpringWebfluxPluginController.java +++ b/agent-testweb/spring-webflux-plugin-testweb/src/main/java/com/pinpoint/test/plugin/SpringWebfluxPluginController.java @@ -16,6 +16,9 @@ package com.pinpoint.test.plugin; +import com.pinpoint.test.common.view.ApiLinkPage; +import com.pinpoint.test.common.view.HrefTag; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpMethod; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.bind.annotation.GetMapping; @@ -24,19 +27,60 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.method.HandlerMethod; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.result.method.RequestMappingInfo; +import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + @RestController public class SpringWebfluxPluginController { + + private final RequestMappingHandlerMapping handlerMapping; + + @Autowired + public SpringWebfluxPluginController(RequestMappingHandlerMapping handlerMapping) { + this.handlerMapping = handlerMapping; + } + + @GetMapping("/") + String welcome() { + Map handlerMethods = this.handlerMapping.getHandlerMethods(); + List list = new ArrayList<>(); + for (RequestMappingInfo info : handlerMethods.keySet()) { + for (String path : info.getDirectPaths()) { + list.add(HrefTag.of(path)); + } + } + list.sort(Comparator.comparing(HrefTag::getPath)); + return new ApiLinkPage("spring-webflux-plugin-testweb") + .addHrefTag(list) + .build(); + } + @GetMapping("/server/welcome/**") - public Mono welcome(ServerWebExchange exchange) { + public Mono serverWelcome(ServerWebExchange exchange) { exchange.getAttributes().put("pinpoint.metric.uri-template", "/test"); return Mono.just("Welcome Home"); } + @GetMapping("/server/wait/3s") + public Mono wait3(ServerWebExchange exchange) { + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + } + return Mono.just("Welcome Home"); + } + @PostMapping("/server/post") public Mono welcome(@RequestBody String body) { return Mono.just("Post=" + body); diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContext.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContext.java new file mode 100644 index 000000000000..50ee47915ac4 --- /dev/null +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContext.java @@ -0,0 +1,87 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.plugin.reactor.netty; + +public class HttpCallContext { + private long readBeginTime; + private long readEndTime; + private boolean readFail; + + private long writeBeginTime; + private long writeEndTime; + private boolean writeFail; + + public void setReadBeginTime(long readBeginTime) { + this.readBeginTime = readBeginTime; + } + + public void setReadEndTime(long readEndTime) { + this.readEndTime = readEndTime; + } + + public boolean isReadFail() { + return readFail; + } + + public void setReadFail(boolean readFail) { + this.readFail = readFail; + } + + public void setWriteBeginTime(long writeBeginTime) { + this.writeBeginTime = writeBeginTime; + } + + public void setWriteEndTime(long writeEndTime) { + this.writeEndTime = writeEndTime; + } + + public boolean isWriteFail() { + return writeFail; + } + + public void setWriteFail(boolean writeFail) { + this.writeFail = writeFail; + } + + public long getWriteElapsedTime() { + if (writeBeginTime == 0) { + return 0; + } + long result = writeEndTime - writeBeginTime; + return result > 0 ? result : 0; + } + + public long getReadElapsedTime() { + if (readBeginTime == 0) { + return 0; + } + long result = readEndTime - readBeginTime; + return result > 0 ? result : 0; + } + + @Override + public String toString() { + return "HttpCallContext{" + + "readBeginTime=" + readBeginTime + + ", readEndTime=" + readEndTime + + ", readFail=" + readFail + + ", writeBeginTime=" + writeBeginTime + + ", writeEndTime=" + writeEndTime + + ", writeFail=" + writeFail + + '}'; + } +} diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContextAccessor.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContextAccessor.java new file mode 100644 index 000000000000..d9a5c8254279 --- /dev/null +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/HttpCallContextAccessor.java @@ -0,0 +1,23 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.plugin.reactor.netty; + +public interface HttpCallContextAccessor { + void _$PINPOINT$_setHttpCallContext(HttpCallContext httpCallContext); + + HttpCallContext _$PINPOINT$_getHttpCallContext(); +} diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java index 6425e2bffc12..b3e95ce3794e 100644 --- a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java @@ -50,6 +50,8 @@ import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnOutboundCompleteInterceptor; import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnOutboundErrorInterceptor; import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsSendInterceptor; +import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpIOHandlerObserverConstructorInterceptor; +import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpIOHandlerObserverOnStateChangeInterceptor; import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpServerHandleHttpServerStateInterceptor; import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpServerHandleStateInterceptor; import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpTcpClientConnectInterceptor; @@ -101,6 +103,8 @@ public void setup(ProfilerPluginSetupContext context) { transformTemplate.transform("reactor.netty.http.client.HttpClientConnect$HttpTcpClient", HttpTcpClientTransform.class); transformTemplate.transform("reactor.netty.http.client.HttpClientConnect$HttpClientHandler", HttpClientHandleTransform.class); transformTemplate.transform("reactor.netty.http.client.HttpClientOperations", HttpClientOperationsTransform.class); + transformTemplate.transform("reactor.netty.http.client.HttpClientConnect$MonoHttpConnect", FluxAndMonoTransform.class); + transformTemplate.transform("reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver", HttpIOHandlerObserverTransform.class); } transformTemplate.transform("reactor.netty.ByteBufFlux", FluxAndMonoOperatorTransform.class); @@ -257,6 +261,26 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin } } + public static class HttpIOHandlerObserverTransform implements TransformCallback { + @Override + public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { + InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer); + target.addField(AsyncContextAccessor.class); + target.addField(HttpCallContextAccessor.class); + + final InstrumentMethod constructorMethod = target.getConstructor("reactor.core.publisher.MonoSink", "reactor.netty.http.client.HttpClientConnect$HttpClientHandler"); + if (constructorMethod != null) { + constructorMethod.addInterceptor(HttpIOHandlerObserverConstructorInterceptor.class); + } + final InstrumentMethod onStateChangeMethod = target.getDeclaredMethod("onStateChange", "reactor.netty.Connection", "reactor.netty.ConnectionObserver$State"); + if (onStateChangeMethod != null) { + onStateChangeMethod.addInterceptor(HttpIOHandlerObserverOnStateChangeInterceptor.class); + } + + return target.toBytecode(); + } + } + public static class FluxAndMonoTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/AbstractHttpServerHandleInterceptor.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/AbstractHttpServerHandleInterceptor.java index c01aa5e3d202..a35d9fa9432b 100644 --- a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/AbstractHttpServerHandleInterceptor.java +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/AbstractHttpServerHandleInterceptor.java @@ -23,12 +23,14 @@ import com.navercorp.pinpoint.bootstrap.context.AsyncContextUtils; import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder; +import com.navercorp.pinpoint.bootstrap.context.SpanRecorder; import com.navercorp.pinpoint.bootstrap.context.Trace; import com.navercorp.pinpoint.bootstrap.context.TraceContext; import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor; import com.navercorp.pinpoint.bootstrap.logging.PLogger; import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory; import com.navercorp.pinpoint.bootstrap.plugin.RequestRecorderFactory; +import com.navercorp.pinpoint.bootstrap.plugin.http.HttpStatusCodeRecorder; import com.navercorp.pinpoint.bootstrap.plugin.request.RequestAdaptor; import com.navercorp.pinpoint.bootstrap.plugin.request.ServerCookieRecorder; import com.navercorp.pinpoint.bootstrap.plugin.request.ServerHeaderRecorder; @@ -37,6 +39,7 @@ import com.navercorp.pinpoint.bootstrap.plugin.request.util.ParameterRecorder; import com.navercorp.pinpoint.bootstrap.plugin.response.ServletResponseListener; import com.navercorp.pinpoint.bootstrap.plugin.response.ServletResponseListenerBuilder; +import com.navercorp.pinpoint.common.util.ArrayArgumentUtils; import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants; import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyPluginConfig; import io.netty.handler.codec.http.HttpResponseStatus; @@ -56,6 +59,10 @@ public abstract class AbstractHttpServerHandleInterceptor implements AroundInter private final ServletRequestListener servletRequestListener; private final ServletResponseListener servletResponseListener; + abstract boolean isReceived(Object[] args); + + abstract boolean isClosed(Object[] args); + public AbstractHttpServerHandleInterceptor(TraceContext traceContext, MethodDescriptor descriptor, RequestRecorderFactory requestRecorderFactory) { this.traceContext = traceContext; this.methodDescriptor = descriptor; @@ -89,53 +96,57 @@ public void before(Object target, Object[] args) { logger.beforeInterceptor(target, args); } - if (traceContext.currentRawTraceObject() != null) { - if (isDisconnecting(args)) { - final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(args, 0); - if (asyncContext != null) { - if (AsyncContextUtils.asyncStateFinish(asyncContext)) { - if (isDebug) { - logger.debug("Finished asyncState. asyncTraceId={}", asyncContext); - } - } - } + try { + if (isReceived(args)) { + received(args); + } else if (isClosed(args)) { + closed(args); + } + } catch (Throwable t) { + if (isInfo) { + logger.info("Failed to servlet request event handle.", t); } - // duplicate trace. + } + } + + private void received(final Object[] args) { + final HttpServerRequest request = ArrayArgumentUtils.getArgument(args, 0, HttpServerRequest.class); + if (request == null) { + return; + } + final HttpServerResponse response = ArrayArgumentUtils.getArgument(args, 0, HttpServerResponse.class); + if (response == null) { return; } - try { - if (Boolean.FALSE == isReceived(args)) { - // invalid args - return; - } + this.servletRequestListener.initialized(request, ReactorNettyConstants.REACTOR_NETTY_INTERNAL, this.methodDescriptor); + this.servletResponseListener.initialized(response, ReactorNettyConstants.REACTOR_NETTY_INTERNAL, this.methodDescriptor); //must after request listener due to trace block begin - final HttpServerRequest request = (HttpServerRequest) args[0]; - final HttpServerResponse response = (HttpServerResponse) args[0]; - this.servletRequestListener.initialized(request, ReactorNettyConstants.REACTOR_NETTY_INTERNAL, this.methodDescriptor); - this.servletResponseListener.initialized(response, ReactorNettyConstants.REACTOR_NETTY_INTERNAL, this.methodDescriptor); //must after request listener due to trace block begin + // Set end-point + final Trace trace = this.traceContext.currentTraceObject(); + if (trace == null) { + return; + } - // Set end-point - final Trace trace = this.traceContext.currentTraceObject(); - if (trace == null) { - return; + final SpanEventRecorder recorder = trace.currentSpanEventRecorder(); + if (recorder != null) { + // make asynchronous trace-id + final boolean asyncStateSupport = enableAsyncEndPoint; + final AsyncContext asyncContext = recorder.recordNextAsyncContext(asyncStateSupport); + ((AsyncContextAccessor) args[0])._$PINPOINT$_setAsyncContext(asyncContext); + if (isDebug) { + logger.debug("Set asyncContext to args[0]. asyncContext={}", asyncContext); } + } + } - final SpanEventRecorder recorder = trace.currentSpanEventRecorder(); - if (recorder != null) { - // make asynchronous trace-id - final boolean asyncStateSupport = enableAsyncEndPoint; - final AsyncContext asyncContext = recorder.recordNextAsyncContext(asyncStateSupport); - ((AsyncContextAccessor) args[0])._$PINPOINT$_setAsyncContext(asyncContext); - if (isDebug) { - logger.debug("Set asyncContext to args[0]. asyncContext={}", asyncContext); - } - } - } catch (Throwable t) { - if (isInfo) { - logger.info("Failed to servlet request event handle.", t); - } + private void closed(final Object[] args) { + final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(args, 0); + if (asyncContext == null) { + return; } + // closed + AsyncContextUtils.asyncStateFinish(asyncContext); } @Override @@ -145,15 +156,9 @@ public void after(Object target, Object[] args, Object result, Throwable throwab } try { - if (Boolean.FALSE == isReceived(args)) { - return; + if (isReceived(args)) { + received(args, throwable); } - - final HttpServerRequest request = (HttpServerRequest) args[0]; - final HttpServerResponse response = (HttpServerResponse) args[0]; - final int statusCode = getStatusCode(response); - this.servletResponseListener.destroyed(response, throwable, statusCode); //must before request listener due to trace block ending - this.servletRequestListener.destroyed(request, throwable, statusCode); } catch (Throwable t) { if (isInfo) { logger.info("Failed to servlet request event handle.", t); @@ -161,9 +166,13 @@ public void after(Object target, Object[] args, Object result, Throwable throwab } } - abstract boolean isReceived(Object[] args); - - abstract boolean isDisconnecting(Object[] args); + private void received(Object[] args, Throwable throwable) { + final HttpServerRequest request = (HttpServerRequest) args[0]; + final HttpServerResponse response = (HttpServerResponse) args[0]; + final int statusCode = getStatusCode(response); + this.servletResponseListener.destroyed(response, throwable, statusCode); //must before request listener due to trace block ending + this.servletRequestListener.destroyed(request, throwable, statusCode); + } private int getStatusCode(final HttpServerResponse response) { try { diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/ChannelOperationsInterceptor.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/ChannelOperationsInterceptor.java index 6c31de28935f..a1eaa3500d36 100644 --- a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/ChannelOperationsInterceptor.java +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/ChannelOperationsInterceptor.java @@ -16,20 +16,31 @@ package com.navercorp.pinpoint.plugin.reactor.netty.interceptor; +import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils; +import com.navercorp.pinpoint.bootstrap.config.ProfilerConfig; import com.navercorp.pinpoint.bootstrap.context.AsyncContext; import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder; +import com.navercorp.pinpoint.bootstrap.context.SpanRecorder; +import com.navercorp.pinpoint.bootstrap.context.Trace; import com.navercorp.pinpoint.bootstrap.context.TraceContext; import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventEndPointInterceptor; +import com.navercorp.pinpoint.bootstrap.plugin.http.HttpStatusCodeRecorder; import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants; +import io.netty.handler.codec.http.HttpResponseStatus; +import reactor.netty.http.server.HttpServerResponse; /** * @author jaehong.kim */ public class ChannelOperationsInterceptor extends AsyncContextSpanEventEndPointInterceptor { + private final HttpStatusCodeRecorder httpStatusCodeRecorder; + public ChannelOperationsInterceptor(TraceContext traceContext, MethodDescriptor descriptor) { super(traceContext, descriptor); + final ProfilerConfig config = traceContext.getProfilerConfig(); + this.httpStatusCodeRecorder = new HttpStatusCodeRecorder(config.getHttpStatusCodeErrors()); } @Override @@ -41,5 +52,30 @@ public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] a recorder.recordApi(methodDescriptor); recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_INTERNAL); recorder.recordException(throwable); + + if (target instanceof HttpServerResponse) { + final HttpServerResponse httpServerResponse = (HttpServerResponse) target; + final int statusCode = getStatusCode(httpServerResponse); + + final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target); + if (asyncContext != null) { + final Trace trace = asyncContext.currentAsyncTraceObject(); + if (trace != null) { + final SpanRecorder spanRecorder = trace.getSpanRecorder(); + httpStatusCodeRecorder.record(spanRecorder, statusCode); + } + } + } + } + + private int getStatusCode(final HttpServerResponse response) { + try { + HttpResponseStatus status = response.status(); + if (status != null) { + return status.code(); + } + } catch (Exception ignored) { + } + return 0; } } diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverConstructorInterceptor.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverConstructorInterceptor.java new file mode 100644 index 000000000000..b637b556f0dd --- /dev/null +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverConstructorInterceptor.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.plugin.reactor.netty.interceptor; + +import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils; +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor; +import com.navercorp.pinpoint.bootstrap.logging.PLogger; +import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory; +import com.navercorp.pinpoint.plugin.reactor.netty.HttpCallContext; +import com.navercorp.pinpoint.plugin.reactor.netty.HttpCallContextAccessor; + +public class HttpIOHandlerObserverConstructorInterceptor implements AroundInterceptor { + private final PLogger logger = PLoggerFactory.getLogger(getClass()); + private final boolean isDebug = logger.isDebugEnabled(); + + public HttpIOHandlerObserverConstructorInterceptor() { + } + + @Override + public void before(Object target, Object[] args) { + } + + @Override + public void after(Object target, Object[] args, Object result, Throwable throwable) { + if (isDebug) { + logger.afterInterceptor(target, args, result, throwable); + } + + if (throwable != null) { + return; + } + + try { + final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(args, 1); + if (asyncContext == null) { + return; + } + AsyncContextAccessorUtils.setAsyncContext(asyncContext, target); + if (target instanceof HttpCallContextAccessor) { + ((HttpCallContextAccessor) target)._$PINPOINT$_setHttpCallContext(new HttpCallContext()); + } + } catch (Throwable th) { + if (logger.isWarnEnabled()) { + logger.warn("AFTER error. Caused:{}", th.getMessage(), th); + } + } + } +} diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverOnStateChangeInterceptor.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverOnStateChangeInterceptor.java new file mode 100644 index 000000000000..9170cebb0c8d --- /dev/null +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpIOHandlerObserverOnStateChangeInterceptor.java @@ -0,0 +1,174 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.plugin.reactor.netty.interceptor; + +import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils; +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; +import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder; +import com.navercorp.pinpoint.bootstrap.context.TraceContext; +import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventSimpleAroundInterceptor; +import com.navercorp.pinpoint.common.trace.AnnotationKey; +import com.navercorp.pinpoint.common.util.ArrayArgumentUtils; +import com.navercorp.pinpoint.common.util.IntBooleanIntBooleanValue; +import com.navercorp.pinpoint.plugin.reactor.netty.HttpCallContext; +import com.navercorp.pinpoint.plugin.reactor.netty.HttpCallContextAccessor; +import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants; +import reactor.netty.ConnectionObserver; + +public class HttpIOHandlerObserverOnStateChangeInterceptor extends AsyncContextSpanEventSimpleAroundInterceptor { + // The request has been prepared and ready for I/O handler to be invoked + private static final String REQUEST_PREPARED = "[request_prepared]"; + // The request has been sent + private static final String REQUEST_SENT = "[request_sent]"; + // The request has been sent but the response has not been fully received and the connection has been prematurely closed + private static final String RESPONSE_INCOMPLETE = "[response_incomplete]"; + // The response status and headers have been received + private static final String RESPONSE_RECEIVED = "[response_received]"; + // The response fully received + private static final String RESPONSE_COMPLETED = "[response_completed]"; + // Propagated when a connection has been established and is available + private static final String CONNECTED = "[connected]"; + // Propagated when a connection is bound to a channelOperation and ready for user interaction + private static final String CONFIGURED = "[configured]"; + // Propagated when a connection has been reused / acquired (keep-alive or pooling) + private static final String ACQUIRED = "[acquired]"; + // Propagated when a connection has been released but not fully closed (keep-alive or pooling) + private static final String RELEASED = "[released]"; + // Propagated when a connection is being fully closed + private static final String DISCONNECTING = "[disconnecting]"; + + public HttpIOHandlerObserverOnStateChangeInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) { + super(traceContext, methodDescriptor); + } + + @Override + public AsyncContext getAsyncContext(Object target, Object[] args) { + final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target); + if (asyncContext == null) { + return null; + } + // for compatibility. + final Object state = ArrayArgumentUtils.getArgument(args, 1, Object.class); + if (state == null) { + return null; + } + + final String rawState = state.toString(); + if (isReady(rawState) || isClosed(rawState)) { + return asyncContext; + } + + if (target instanceof HttpCallContextAccessor) { + final HttpCallContext callContext = ((HttpCallContextAccessor) target)._$PINPOINT$_getHttpCallContext(); + setupHttpCall(callContext, rawState); + } + + return null; + } + + @Override + public void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) { + // for compatibility. + final Object state = ArrayArgumentUtils.getArgument(args, 1, Object.class); + if (state == null) { + return; + } + final String rawState = state.toString(); + if (isReady(rawState)) { + final String value = "READY " + rawState; + recorder.recordAttribute(AnnotationKey.HTTP_INTERNAL_DISPLAY, value); + } else if (isClosed(rawState)) { + if (target instanceof HttpCallContextAccessor) { + final HttpCallContext httpCallContext = ((HttpCallContextAccessor) target)._$PINPOINT$_getHttpCallContext(); + final IntBooleanIntBooleanValue value = new IntBooleanIntBooleanValue((int) httpCallContext.getWriteElapsedTime(), httpCallContext.isWriteFail(), (int) httpCallContext.getReadElapsedTime(), httpCallContext.isReadFail()); + recorder.recordAttribute(AnnotationKey.HTTP_IO, value); + // Clear HttpCallContext + ((HttpCallContextAccessor) target)._$PINPOINT$_setHttpCallContext(null); + } + final String value = "CLOSED " + rawState; + recorder.recordAttribute(AnnotationKey.HTTP_INTERNAL_DISPLAY, value); + } + } + + @Override + public AsyncContext getAsyncContext(Object target, Object[] args, Object result, Throwable throwable) { + final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target); + if (asyncContext == null) { + return null; + } + + // for compatibility. + final Object state = ArrayArgumentUtils.getArgument(args, 1, Object.class); + if (state == null) { + return null; + } + + final String rawState = state.toString(); + if (isReady(rawState) || isClosed(rawState)) { + return asyncContext; + } + + return null; + } + + @Override + public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) { + recorder.recordException(throwable); + recorder.recordApi(methodDescriptor); + recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_CLIENT_INTERNAL); + } + + boolean isReady(String state) { + if (state == null) { + return false; + } + if (state.equals(CONNECTED) || state.equals(ACQUIRED)) { + return true; + } + return false; + } + + boolean isClosed(String state) { + if (state == null) { + return false; + } + + if (state.equals(RELEASED) || state.equals(DISCONNECTING)) { + return true; + } + return false; + } + + void setupHttpCall(HttpCallContext httpCallContext, String state) { + if (httpCallContext == null || state == null) { + return; + } + + if (state.equals(REQUEST_PREPARED)) { + httpCallContext.setWriteBeginTime(System.currentTimeMillis()); + } else if (state.equals(REQUEST_SENT)) { + httpCallContext.setWriteEndTime(System.currentTimeMillis()); + } else if (state.equals(RESPONSE_RECEIVED)) { + httpCallContext.setReadBeginTime(System.currentTimeMillis()); + } else if (state.equals(RESPONSE_COMPLETED)) { + httpCallContext.setReadEndTime(System.currentTimeMillis()); + } else if (state.equals(RESPONSE_INCOMPLETE)) { + httpCallContext.setReadFail(Boolean.TRUE); + } + } +} diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleHttpServerStateInterceptor.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleHttpServerStateInterceptor.java index 358c5142f34a..afb988923eab 100644 --- a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleHttpServerStateInterceptor.java +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleHttpServerStateInterceptor.java @@ -20,6 +20,7 @@ import com.navercorp.pinpoint.bootstrap.context.TraceContext; import com.navercorp.pinpoint.bootstrap.plugin.RequestRecorderFactory; +import com.navercorp.pinpoint.common.util.ArrayArgumentUtils; import com.navercorp.pinpoint.common.util.ArrayUtils; import reactor.netty.ConnectionObserver; import reactor.netty.http.server.HttpServerRequest; @@ -34,37 +35,27 @@ public HttpServerHandleHttpServerStateInterceptor(TraceContext traceContext, Met super(traceContext, descriptor, requestRecorderFactory); } - public boolean validate(Object[] args) { - if (ArrayUtils.getLength(args) < 2) { - return false; - } - - if (!(args[1] instanceof ConnectionObserver.State)) { - return false; - } - - return true; - } - public boolean isReceived(Object[] args) { - if (!validate(args)) { - return false; - } - ConnectionObserver.State state = (ConnectionObserver.State) args[1]; - if (state != HttpServerState.REQUEST_RECEIVED) { - return false; + final ConnectionObserver.State state = ArrayArgumentUtils.getArgument(args, 1, ConnectionObserver.State.class); + // The request was received + if (state != null && state == HttpServerState.REQUEST_RECEIVED) { + return true; } - return true; + return false; } - public boolean isDisconnecting(Object[] args) { - if (!validate(args)) { + public boolean isClosed(Object[] args) { + final ConnectionObserver.State state = ArrayArgumentUtils.getArgument(args, 1, ConnectionObserver.State.class); + if (state == null) { return false; } - ConnectionObserver.State state = (ConnectionObserver.State) args[1]; - if (state != HttpServerState.DISCONNECTING) { - return false; + + // ACQUIRED: Propagated when a connection has been reused / acquired (keep-alive or pooling) + // RELEASED: Propagated when a connection has been released but not fully closed (keep-alive or pooling) + // DISCONNECTING: Propagated when a connection is being fully closed + if (state == HttpServerState.DISCONNECTING || state == HttpServerState.ACQUIRED || state == HttpServerState.RELEASED) { + return true; } - return true; + return false; } } \ No newline at end of file diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleStateInterceptor.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleStateInterceptor.java index ba32858d7733..29acf3bc3ed1 100644 --- a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleStateInterceptor.java +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/interceptor/HttpServerHandleStateInterceptor.java @@ -20,6 +20,7 @@ import com.navercorp.pinpoint.bootstrap.context.TraceContext; import com.navercorp.pinpoint.bootstrap.plugin.RequestRecorderFactory; +import com.navercorp.pinpoint.common.util.ArrayArgumentUtils; import com.navercorp.pinpoint.common.util.ArrayUtils; import reactor.netty.ConnectionObserver; import reactor.netty.http.server.HttpServerRequest; @@ -33,41 +34,25 @@ public HttpServerHandleStateInterceptor(TraceContext traceContext, MethodDescrip super(traceContext, descriptor, requestRecorderFactory); } - public boolean validate(Object[] args) { - if (ArrayUtils.getLength(args) < 2) { - return false; - } - - if (!(args[1] instanceof ConnectionObserver.State)) { - return false; - } - - return true; - } - public boolean isReceived(Object[] args) { - if (!validate(args)) { - return false; - } - - ConnectionObserver.State state = (ConnectionObserver.State) args[1]; - if (state != ConnectionObserver.State.CONFIGURED) { - return false; + final ConnectionObserver.State state = ArrayArgumentUtils.getArgument(args, 1, ConnectionObserver.State.class); + if (state != null && state == ConnectionObserver.State.CONFIGURED) { + return true; } - - return true; + return false; } - public boolean isDisconnecting(Object[] args) { - if (!validate(args)) { + public boolean isClosed(Object[] args) { + final ConnectionObserver.State state = ArrayArgumentUtils.getArgument(args, 1, ConnectionObserver.State.class); + if (state == null) { return false; } - - ConnectionObserver.State state = (ConnectionObserver.State) args[1]; - if (state != ConnectionObserver.State.DISCONNECTING) { - return false; + // ACQUIRED: Propagated when a connection has been reused / acquired (keep-alive or pooling) + // RELEASED: Propagated when a connection has been released but not fully closed (keep-alive or pooling) + // DISCONNECTING: Propagated when a connection is being fully closed + if (state == ConnectionObserver.State.DISCONNECTING || state == ConnectionObserver.State.ACQUIRED || state == ConnectionObserver.State.RELEASED) { + return true; } - - return true; + return false; } } \ No newline at end of file