Skip to content

Commit

Permalink
[#9595] Clean up async context end-point patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Feb 20, 2023
1 parent 22bc533 commit 3ba290b
Show file tree
Hide file tree
Showing 17 changed files with 94 additions and 773 deletions.
Expand Up @@ -35,7 +35,6 @@
public abstract class AsyncContextSpanEventEndPointInterceptor implements AroundInterceptor {
protected final PLogger logger = PLoggerFactory.getLogger(getClass());
protected final boolean isDebug = logger.isDebugEnabled();
protected static final String ASYNC_TRACE_SCOPE = AsyncContext.ASYNC_TRACE_SCOPE;

protected final MethodDescriptor methodDescriptor;
protected final TraceContext traceContext;
Expand All @@ -53,7 +52,6 @@ public void before(Object target, Object[] args) {

final AsyncContext asyncContext = getAsyncContext(target, args);
if (asyncContext == null) {
logger.debug("AsyncContext not found");
return;
}

Expand All @@ -63,47 +61,46 @@ public void before(Object target, Object[] args) {
}

if (isDebug) {
logger.debug("Asynchronous invocation. asyncTraceId={}, trace={}", asyncContext, trace);
logger.debug("Asynchronous invocation. asyncContext={}, trace={}", asyncContext, trace);
}

// entry scope.
ScopeUtils.entryAsyncTraceScope(trace);

try {
// trace event for default & async.
final SpanEventRecorder recorder = trace.traceBlockBegin();
doInBeforeTrace(recorder, asyncContext, target, args);
prepareBefore(asyncContext, trace, recorder, target, args);
doInBeforeTrace(recorder, target, args);
} catch (Throwable th) {
if (logger.isWarnEnabled()) {
logger.warn("BEFORE. Caused:{}", th.getMessage(), th);
}
}
}

protected abstract void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args);
protected void prepareBefore(AsyncContext asyncContext, Trace trace, SpanEventRecorder recorder, Object target, Object[] args) {
}

protected abstract void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args);

@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
if (isDebug) {
logger.afterInterceptor(target, args, result, throwable);
}

final AsyncContext asyncContext = getAsyncContext(target, args);
final AsyncContext asyncContext = getAsyncContext(target, args, result, throwable);
if (asyncContext == null) {
if (isDebug) {
logger.debug("Not found asynchronous invocation metadata");
}
return;
}
if (isDebug) {
logger.debug("Asynchronous invocation. asyncContext={}", asyncContext);
}

final Trace trace = asyncContext.currentAsyncTraceObject();
if (trace == null) {
return;
}
if (isDebug) {
logger.debug("Asynchronous invocation. asyncTraceId={}, trace={}", asyncContext, trace);
logger.debug("Asynchronous invocation. asyncContext={}, trace={}", asyncContext, trace);
}

// leave scope.
Expand All @@ -118,6 +115,7 @@ public void after(Object target, Object[] args, Object result, Throwable throwab

try {
final SpanEventRecorder recorder = trace.currentSpanEventRecorder();
prepareAfter(asyncContext, trace, recorder, target, args, result, throwable);
doInAfterTrace(recorder, target, args, result, throwable);
} catch (Throwable th) {
if (logger.isWarnEnabled()) {
Expand All @@ -126,40 +124,35 @@ public void after(Object target, Object[] args, Object result, Throwable throwab
} finally {
trace.traceBlockEnd();
if (ScopeUtils.isAsyncTraceEndScope(trace)) {
if (isDebug) {
logger.debug("Arrived at async trace destination. asyncTraceId={}", asyncContext);
}
deleteAsyncTrace(trace);
}
finishAsyncState(asyncContext);
}
}

protected void prepareAfter(AsyncContext asyncContext, Trace trace, SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
}

protected abstract void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable);

protected AsyncContext getAsyncContext(Object target, Object[] args) {
return AsyncContextAccessorUtils.getAsyncContext(target);
}

protected AsyncContext getAsyncContext(Object target, Object[] args, Object result, Throwable throwable) {
return AsyncContextAccessorUtils.getAsyncContext(target);
}

private Trace getAsyncTrace(AsyncContext asyncContext) {
final Trace trace = asyncContext.continueAsyncTraceObject();
if (trace == null) {
if (isDebug) {
logger.debug("Failed to continue async trace. 'result is null'");
}
return null;
}
if (isDebug) {
logger.debug("getAsyncTrace() trace {}, asyncContext={}", trace, asyncContext);
}

return trace;
}

private void deleteAsyncTrace(final Trace trace) {
if (isDebug) {
logger.debug("Delete async trace {}.", trace);
}
traceContext.removeTraceObject();
trace.close();
}
Expand Down

This file was deleted.

Expand Up @@ -21,6 +21,7 @@
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventEndPointInterceptor;
import com.navercorp.pinpoint.common.trace.AnnotationKey;
import com.navercorp.pinpoint.plugin.akka.http.AkkaHttpConstants;
import scala.Option;
Expand All @@ -35,11 +36,11 @@ public RequestContextImplCompleteInterceptor(TraceContext traceContext, MethodDe
}

@Override
protected void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) {
public void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args) {
}

@Override
protected void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
try {
if (result instanceof Future && ((Future) result).isCompleted()) {
Option value = ((Future) result).value();
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.navercorp.pinpoint.plugin.akka.http.interceptor;

import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventEndPointInterceptor;
import com.navercorp.pinpoint.common.util.ArrayArgumentUtils;
import com.navercorp.pinpoint.plugin.akka.http.AkkaHttpConstants;

Expand All @@ -30,17 +30,16 @@ public RequestContextImplFailInterceptor(TraceContext traceContext, MethodDescri
}

@Override
protected void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) {
public void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args) {
Throwable th = ArrayArgumentUtils.getArgument(args, 0, Throwable.class);
if (th != null) {
recorder.recordException(th);
}
}

@Override
protected void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
recorder.recordApi(methodDescriptor);
recorder.recordServiceType(AkkaHttpConstants.AKKA_HTTP_SERVER_INTERNAL);
}

}
Expand Up @@ -16,26 +16,24 @@

package com.navercorp.pinpoint.plugin.akka.http.interceptor;

import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventEndPointInterceptor;
import com.navercorp.pinpoint.plugin.akka.http.AkkaHttpConstants;

public class RequestContextImplRedirectInterceptor extends AsyncContextSpanEventEndPointInterceptor {


public RequestContextImplRedirectInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
super(traceContext, methodDescriptor);
}

@Override
protected void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) {

public void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args) {
}

@Override
protected void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
recorder.recordApi(methodDescriptor);
recorder.recordServiceType(AkkaHttpConstants.AKKA_HTTP_SERVER_INTERNAL);
if (throwable != null) {
Expand Down
Expand Up @@ -52,7 +52,7 @@ public void setUp() throws Exception {

@Test
public void doInBeforeTrace() {
interceptor.doInBeforeTrace(recorder, null, null, new Object[]{marshallable});
interceptor.doInBeforeTrace(recorder, null, new Object[]{marshallable});
}

@Test
Expand Down

0 comments on commit 3ba290b

Please sign in to comment.