Skip to content

Commit

Permalink
[#2584] Apply AsynContext to vert.x plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jul 6, 2017
1 parent e26cb37 commit 5f20342
Show file tree
Hide file tree
Showing 13 changed files with 284 additions and 81 deletions.
Expand Up @@ -15,7 +15,7 @@
*/
package com.navercorp.pinpoint.plugin.vertx;

import com.navercorp.pinpoint.bootstrap.async.AsyncTraceIdAccessor;
import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor;
import com.navercorp.pinpoint.bootstrap.instrument.InstrumentClass;
import com.navercorp.pinpoint.bootstrap.instrument.InstrumentException;
import com.navercorp.pinpoint.bootstrap.instrument.InstrumentMethod;
Expand Down Expand Up @@ -114,7 +114,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
return null;
}

target.addField(AsyncTraceIdAccessor.class.getName());
target.addField(AsyncContextAccessor.class.getName());
final InstrumentMethod handleMethod = target.getDeclaredMethod("handle", "java.lang.Object");
if (handleMethod != null) {
handleMethod.addInterceptor("com.navercorp.pinpoint.plugin.vertx.interceptor.HandlerInterceptor");
Expand Down Expand Up @@ -183,7 +183,7 @@ private void addHttpServerRequestImpl() {
@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(AsyncTraceIdAccessor.class.getName());
target.addField(AsyncContextAccessor.class.getName());

final InstrumentMethod handleExceptionMethod = target.getDeclaredMethod("handleException", "java.lang.Throwable");
if (handleExceptionMethod != null) {
Expand All @@ -206,7 +206,7 @@ private void addHttpServerResponseImpl() {
@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(AsyncTraceIdAccessor.class.getName());
target.addField(AsyncContextAccessor.class.getName());

final InstrumentMethod endMethod = target.getDeclaredMethod("end");
if (endMethod != null) {
Expand Down Expand Up @@ -293,7 +293,7 @@ private void addHttpClientRequestImpl() {
@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(AsyncTraceIdAccessor.class.getName());
target.addField(AsyncContextAccessor.class.getName());

// for HttpClientResponseImpl.
for (InstrumentMethod method : target.getDeclaredMethods(MethodFilters.name("doHandleResponse"))) {
Expand Down Expand Up @@ -360,7 +360,7 @@ private void addHttpClientResponseImpl() {
@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(AsyncTraceIdAccessor.class.getName());
target.addField(AsyncContextAccessor.class.getName());

final InstrumentMethod handleEndMethod = target.getDeclaredMethod("handleEnd", "io.vertx.core.buffer.Buffer", "io.vertx.core.MultiMap");
if (handleEndMethod != null) {
Expand Down
@@ -0,0 +1,207 @@
/*
* Copyright 2017 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.vertx.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.AsyncStateSupport;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.scope.TraceScope;
import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor;
import com.navercorp.pinpoint.bootstrap.logging.PLogger;
import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory;
import com.navercorp.pinpoint.common.util.Assert;

/**
* @author jaehong.kim
*/
public abstract class AsyncContextSpanEventEndPointInterceptor implements AroundInterceptor {

protected final PLogger logger = PLoggerFactory.getLogger(getClass());
protected final boolean isDebug = logger.isDebugEnabled();
protected static final String ASYNC_TRACE_SCOPE = AsyncContext.ASYNC_TRACE_SCOPE;

protected final MethodDescriptor methodDescriptor;
protected final TraceContext traceContext;

public AsyncContextSpanEventEndPointInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
this.traceContext = Assert.requireNonNull(traceContext, "traceContext must not be null");
this.methodDescriptor = Assert.requireNonNull(methodDescriptor, "methodDescriptor must not be null");

}

@Override
public void before(Object target, Object[] args) {
if (isDebug) {
logger.beforeInterceptor(target, args);
}

final AsyncContext asyncContext = getAsyncContext(target);
if (asyncContext == null) {
logger.debug("AsyncContext not found");
return;
}

final Trace trace = getAsyncTrace(asyncContext);
if (trace == null) {
return;
}

if (isDebug) {
logger.debug("Asynchronous invocation. asyncTraceId={}, trace={}", asyncContext, trace);
}
// entry scope.
entryAsyncTraceScope(trace);

try {
// trace event for default & async.
final SpanEventRecorder recorder = trace.traceBlockBegin();
doInBeforeTrace(recorder, asyncContext, target, args);
} catch (Throwable th) {
if (logger.isWarnEnabled()) {
logger.warn("BEFORE. Caused:{}", th.getMessage(), th);
}
}
}

protected abstract void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args);

@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
if (isDebug) {
logger.afterInterceptor(target, args, result, throwable);
}

final AsyncContext asyncContext = getAsyncContext(target);
if (asyncContext == null) {
logger.debug("Not found asynchronous invocation metadata");
return;
}
if (isDebug) {
logger.debug("Asynchronous invocation. asyncContext={}", asyncContext);
}

final Trace trace = asyncContext.continueAsyncTraceObject();
if (trace == null) {
return;
}
if (isDebug) {
logger.debug("Asynchronous invocation. asyncTraceId={}, trace={}", asyncContext, trace);
}

// leave scope.
if (!leaveAsyncTraceScope(trace)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to leave scope of async trace {}.", trace);
}
// delete unstable trace.
deleteAsyncTrace(trace);
return;
}

try {
final SpanEventRecorder recorder = trace.currentSpanEventRecorder();
doInAfterTrace(recorder, target, args, result, throwable);
} catch (Throwable th) {
if (logger.isWarnEnabled()) {
logger.warn("AFTER error. Caused:{}", th.getMessage(), th);
}
} finally {
trace.traceBlockEnd();
if (isAsyncTraceDestination(trace)) {
if(isDebug) {
logger.debug("Arrived at async trace destination. asyncTraceId={}", asyncContext);
}
deleteAsyncTrace(trace);
finishAsyncState(asyncContext);
}
}
}

protected abstract void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable);

protected AsyncContext getAsyncContext(Object target) {
return AsyncContextAccessorUtils.getAsyncContext(target);
}

private Trace getAsyncTrace(AsyncContext asyncContext) {
final Trace trace = asyncContext.continueAsyncTraceObject();
if (trace == null) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to continue async trace. 'result is null'");
}
return null;
}
if (isDebug) {
logger.debug("getAsyncTrace() trace {}, asyncContext={}", trace, asyncContext);
}

return trace;
}

private void deleteAsyncTrace(final Trace trace) {
if (isDebug) {
logger.debug("Delete async trace {}.", trace);
}
traceContext.removeTraceObject();
trace.close();
}

private void entryAsyncTraceScope(final Trace trace) {
final TraceScope scope = trace.getScope(ASYNC_TRACE_SCOPE);
if (scope != null) {
scope.tryEnter();
}
}

private boolean leaveAsyncTraceScope(final Trace trace) {
final TraceScope scope = trace.getScope(ASYNC_TRACE_SCOPE);
if (scope != null) {
if (scope.canLeave()) {
scope.leave();
} else {
return false;
}
}
return true;
}

private boolean isAsyncTraceDestination(final Trace trace) {
if (!trace.isAsync()) {
return false;
}

final TraceScope scope = trace.getScope(ASYNC_TRACE_SCOPE);
return scope != null && !scope.isActive();
}

private void finishAsyncState(final AsyncContext asyncContext) {
if (asyncContext instanceof AsyncStateSupport) {
final AsyncStateSupport asyncStateSupport = (AsyncStateSupport) asyncContext;
AsyncState asyncState = asyncStateSupport.getAsyncState();
asyncState.finish();
if (isDebug) {
logger.debug("finished asyncState. asyncTraceId={}", asyncContext);
}
}
}

}
Expand Up @@ -15,8 +15,8 @@
*/
package com.navercorp.pinpoint.plugin.vertx.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncTraceIdAccessor;
import com.navercorp.pinpoint.bootstrap.context.AsyncTraceId;
import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
Expand Down Expand Up @@ -57,25 +57,24 @@ public void before(Object target, Object[] args) {
return;
}

final AsyncTraceIdAccessorHandlers handlers = getAsyncTraceIdAccessorHandlers(args);
final AsyncContextAccessorHandlers handlers = getAsyncContextAccessorHandlers(args);
if (handlers.blockingCodeHandler != null || handlers.resultHandler != null) {
// make asynchronous trace-id
final AsyncTraceId asyncTraceId = trace.getAsyncTraceId();
recorder.recordNextAsyncId(asyncTraceId.getAsyncId());
final AsyncContext asyncContext = recorder.newAsyncContext();

if (handlers.blockingCodeHandler != null) {
// blockingCodeHandler
handlers.blockingCodeHandler._$PINPOINT$_setAsyncTraceId(asyncTraceId);
handlers.blockingCodeHandler._$PINPOINT$_setAsyncContext(asyncContext);
if (isDebug) {
logger.debug("Set asyncTraceId metadata for ContextImpl.executeBlocking blockingCodeHandler. asyncTraceId={}", asyncTraceId);
logger.debug("Set asyncTraceId metadata for ContextImpl.executeBlocking blockingCodeHandler. asyncContext={}", asyncContext);
}
}

if (handlers.resultHandler != null) {
// resultHandler.
handlers.resultHandler._$PINPOINT$_setAsyncTraceId(asyncTraceId);
handlers.resultHandler._$PINPOINT$_setAsyncContext(asyncContext);
if (isDebug) {
logger.debug("Set asyncTraceId metadata for ContextImpl.executeBlocking resultHandler. asyncTraceId={}", asyncTraceId);
logger.debug("Set asyncTraceId metadata for ContextImpl.executeBlocking resultHandler. asyncContext={}", asyncContext);
}
}
}
Expand All @@ -91,23 +90,23 @@ private boolean validate(final Object[] args) {
return true;
}

private AsyncTraceIdAccessorHandlers getAsyncTraceIdAccessorHandlers(final Object[] args) {
final AsyncTraceIdAccessorHandlers handlers = new AsyncTraceIdAccessorHandlers();
private AsyncContextAccessorHandlers getAsyncContextAccessorHandlers(final Object[] args) {
final AsyncContextAccessorHandlers handlers = new AsyncContextAccessorHandlers();
if (args.length == 2) {
// Action<T> action, Handler<AsyncResult<T>> resultHandler
if (args[1] instanceof AsyncTraceIdAccessor) {
handlers.resultHandler = (AsyncTraceIdAccessor) args[1];
if (args[1] instanceof AsyncContextAccessor) {
handlers.resultHandler = (AsyncContextAccessor) args[1];
return handlers;
}
} else if (args.length == 3) {
// Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler
// Handler<Future<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler
if (args[0] instanceof AsyncTraceIdAccessor) {
handlers.blockingCodeHandler = (AsyncTraceIdAccessor) args[0];
if (args[0] instanceof AsyncContextAccessor) {
handlers.blockingCodeHandler = (AsyncContextAccessor) args[0];
}

if (args[2] instanceof AsyncTraceIdAccessor) {
handlers.resultHandler = (AsyncTraceIdAccessor) args[2];
if (args[2] instanceof AsyncContextAccessor) {
handlers.resultHandler = (AsyncContextAccessor) args[2];
}
}

Expand Down Expand Up @@ -136,8 +135,8 @@ public void after(Object target, Object[] args, Object result, Throwable throwab
}
}

private class AsyncTraceIdAccessorHandlers {
private AsyncTraceIdAccessor blockingCodeHandler;
private AsyncTraceIdAccessor resultHandler;
private class AsyncContextAccessorHandlers {
private AsyncContextAccessor blockingCodeHandler;
private AsyncContextAccessor resultHandler;
}
}
Expand Up @@ -15,8 +15,8 @@
*/
package com.navercorp.pinpoint.plugin.vertx.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncTraceIdAccessor;
import com.navercorp.pinpoint.bootstrap.context.AsyncTraceId;
import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
Expand Down Expand Up @@ -57,11 +57,11 @@ public void before(Object target, Object[] args) {

if (validate(args)) {
// make asynchronous trace-id
final AsyncTraceId asyncTraceId = trace.getAsyncTraceId();
recorder.recordNextAsyncId(asyncTraceId.getAsyncId());
((AsyncTraceIdAccessor) args[0])._$PINPOINT$_setAsyncTraceId(asyncTraceId);
final AsyncContext asyncContext = recorder.newAsyncContext();

((AsyncContextAccessor) args[0])._$PINPOINT$_setAsyncContext(asyncContext);
if (isDebug) {
logger.debug("Set asyncTraceId metadata {}", asyncTraceId);
logger.debug("Set asyncContext {}", asyncContext);
}
}
}
Expand All @@ -74,9 +74,9 @@ private boolean validate(final Object[] args) {
return false;
}

if (!(args[0] instanceof AsyncTraceIdAccessor)) {
if (!(args[0] instanceof AsyncContextAccessor)) {
if (isDebug) {
logger.debug("Invalid args[0] object. Need metadata accessor({}).", AsyncTraceIdAccessor.class.getName());
logger.debug("Invalid args[0] object. Need metadata accessor({}).", AsyncContextAccessor.class.getName());
}
return false;
}
Expand Down

0 comments on commit 5f20342

Please sign in to comment.