Skip to content

Commit

Permalink
[#2584] Refactor AsyncContext
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jul 6, 2017
1 parent 51eb377 commit e26cb37
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 62 deletions.
Expand Up @@ -49,6 +49,8 @@ public interface SpanEventRecorder extends FrameAttachment {

AsyncContext newAsyncContext();

AsyncContext newAsyncContext(boolean stateful);

@Deprecated
void recordNextAsyncId(int asyncId);

Expand Down
Expand Up @@ -27,7 +27,12 @@
public interface AsyncContextFactory {
AsyncContext newAsyncContext(TraceRoot traceRoot);

AsyncContext newAsyncContext(TraceRoot traceRoot, AsyncState asyncState);


@Deprecated
AsyncTraceId newAsyncTraceId(TraceRoot traceRoot);

@Deprecated
AsyncTraceId newAsyncTraceId(TraceRoot traceRoot, AsyncState asyncState);
}
Expand Up @@ -61,6 +61,18 @@ public AsyncContext newAsyncContext(TraceRoot traceRoot) {
return new DefaultAsyncContext(traceFactory, traceRoot, asyncId, this.asyncMethodApiId);
}

@Override
public AsyncContext newAsyncContext(TraceRoot traceRoot, AsyncState asyncState) {
Assert.requireNonNull(traceRoot, "traceRoot must not be null");
Assert.requireNonNull(asyncState, "asyncState must not be null");

final TraceFactory traceFactory = traceFactoryProvider.get();
final int asyncId = asyncIdGenerator.nextAsyncId();
return new StatefulAsyncContext(traceFactory, traceRoot, asyncId, asyncMethodApiId, asyncState);
}


@Deprecated
@Override
public AsyncTraceId newAsyncTraceId(TraceRoot traceRoot) {
Assert.requireNonNull(traceRoot, "traceRoot must not be null");
Expand All @@ -69,6 +81,7 @@ public AsyncTraceId newAsyncTraceId(TraceRoot traceRoot) {
return new DefaultAsyncTraceId(traceRoot, asyncId);
}

@Deprecated
@Override
public AsyncTraceId newAsyncTraceId(TraceRoot traceRoot, AsyncState asyncState) {
Assert.requireNonNull(traceRoot, "traceRoot must not be null");
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.SpanRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.sampler.Sampler;
Expand All @@ -27,6 +28,7 @@
import com.navercorp.pinpoint.profiler.context.id.TraceRootFactory;
import com.navercorp.pinpoint.profiler.context.id.ListenableAsyncState;
import com.navercorp.pinpoint.profiler.context.recorder.RecorderFactory;
import com.navercorp.pinpoint.profiler.context.recorder.WrappedSpanEventRecorder;
import com.navercorp.pinpoint.profiler.context.storage.AsyncStorage;
import com.navercorp.pinpoint.profiler.context.storage.Storage;
import com.navercorp.pinpoint.profiler.context.storage.StorageFactory;
Expand Down Expand Up @@ -80,7 +82,10 @@ public Trace continueTraceObject(final TraceId traceId) {
final Storage storage = storageFactory.createStorage(traceRoot);
final CallStack callStack = callStackFactory.newCallStack(traceRoot);

final Trace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, true, recorderFactory);
final boolean samplingEnable = true;
final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), samplingEnable);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder();
final Trace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, samplingEnable, spanRecorder, wrappedSpanEventRecorder);
return trace;
}

Expand All @@ -102,7 +107,10 @@ public Trace newTraceObject() {
final Storage storage = storageFactory.createStorage(traceRoot);
final CallStack callStack = callStackFactory.newCallStack(traceRoot);

final Trace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, true, recorderFactory);
final TraceId traceId = traceRoot.getTraceId();
final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), sampling);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder();
final Trace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, sampling, spanRecorder, wrappedSpanEventRecorder);

return trace;
} else {
Expand All @@ -121,8 +129,13 @@ public Trace continueAsyncTraceObject(TraceRoot traceRoot, int asyncId, short as

final Storage asyncStorage = new AsyncStorage(storage);
final CallStack callStack = callStackFactory.newCallStack(traceRoot);

final boolean samplingEnable = true;
final TraceId traceId = traceRoot.getTraceId();
final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), samplingEnable);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder();
// TODO AtomicIdGenerator.UNTRACKED_ID
final Trace trace = new DefaultTrace(span, callStack, asyncStorage, asyncContextFactory, true, recorderFactory);
final DefaultTrace trace = new DefaultTrace(span, callStack, asyncStorage, asyncContextFactory, samplingEnable, spanRecorder, wrappedSpanEventRecorder);

final Trace asyncTrace = new AsyncTrace(asyncContextFactory, traceRoot, trace, asyncId, asyncSequence);

Expand All @@ -139,11 +152,15 @@ public Trace continueAsyncTraceObject(final TraceId traceId) {
final Storage storage = storageFactory.createStorage(traceRoot);
final CallStack callStack = callStackFactory.newCallStack(traceRoot);

final DefaultTrace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, true, recorderFactory);

final SpanAsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory.createStorage(traceRoot));
final ListenableAsyncState stateListener = new ListenableAsyncState(asyncStateListener);
final AsyncTrace asyncTrace = new AsyncTrace(asyncContextFactory, traceRoot, trace, stateListener);
final AsyncState asyncState = new ListenableAsyncState(asyncStateListener);

final boolean sampling = true;
final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), sampling);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(asyncState);
final Trace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, sampling, spanRecorder, wrappedSpanEventRecorder);

final AsyncTrace asyncTrace = new AsyncTrace(asyncContextFactory, traceRoot, trace, asyncState);

return asyncTrace;
}
Expand All @@ -162,11 +179,17 @@ public Trace newAsyncTraceObject() {
final Storage storage = storageFactory.createStorage(traceRoot);
final CallStack callStack = callStackFactory.newCallStack(traceRoot);

final DefaultTrace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, true, recorderFactory);

final SpanAsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory.createStorage(traceRoot));
final AsyncState closer = new ListenableAsyncState(asyncStateListener);
final AsyncTrace asyncTrace = new AsyncTrace(asyncContextFactory, traceRoot, trace, closer);
final AsyncState asyncState = new ListenableAsyncState(asyncStateListener);


final TraceId traceId = traceRoot.getTraceId();
final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), sampling);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(asyncState);

final Trace trace = new DefaultTrace(span, callStack, storage, asyncContextFactory, sampling, spanRecorder, wrappedSpanEventRecorder);

final AsyncTrace asyncTrace = new AsyncTrace(asyncContextFactory, traceRoot, trace, asyncState);

return asyncTrace;
} else {
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.navercorp.pinpoint.bootstrap.context.scope.TraceScope;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;
import com.navercorp.pinpoint.profiler.context.recorder.RecorderFactory;
import com.navercorp.pinpoint.profiler.context.recorder.WrappedSpanEventRecorder;
import com.navercorp.pinpoint.profiler.context.scope.DefaultTraceScopePool;
import org.slf4j.Logger;
Expand All @@ -41,14 +40,13 @@ public final class DefaultTrace implements Trace {

private final boolean sampling;

private final TraceRoot traceRoot;
private final CallStack callStack;

private final Storage storage;

private final Span span;
private final SpanRecorder spanRecorder;
private final WrappedSpanEventRecorder spanEventRecorder;
private final WrappedSpanEventRecorder wrappedSpanEventRecorder;

private final AsyncContextFactory asyncContextFactory;

Expand All @@ -58,28 +56,28 @@ public final class DefaultTrace implements Trace {
private final DefaultTraceScopePool scopePool = new DefaultTraceScopePool();


public DefaultTrace(Span span, CallStack callStack, Storage storage, AsyncContextFactory asyncContextFactory, boolean sampling, RecorderFactory recorderFactory) {
public DefaultTrace(Span span, CallStack callStack, Storage storage, AsyncContextFactory asyncContextFactory, boolean sampling,
SpanRecorder spanRecorder, WrappedSpanEventRecorder wrappedSpanEventRecorder) {

this.span = Assert.requireNonNull(span, "span must not be null");
this.callStack = Assert.requireNonNull(callStack, "callStack must not be null");
this.storage = Assert.requireNonNull(storage, "storage must not be null");
this.sampling = Assert.requireNonNull(sampling, "sampling must not be null");
this.asyncContextFactory = Assert.requireNonNull(asyncContextFactory, "asyncContextFactory must not be null");

this.traceRoot = span.getTraceRoot();
final TraceId traceId = traceRoot.getTraceId();

Assert.requireNonNull(recorderFactory, "recorderFactory must not be null");
this.spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), sampling);
this.spanEventRecorder = recorderFactory.newWrappedSpanEventRecorder();
this.spanRecorder = Assert.requireNonNull(spanRecorder, "spanRecorder must not be null");
this.wrappedSpanEventRecorder = Assert.requireNonNull(wrappedSpanEventRecorder, "wrappedSpanEventRecorder must not be null");

setCurrentThread();
}

private SpanEventRecorder wrappedSpanEventRecorder(SpanEvent spanEvent) {
final WrappedSpanEventRecorder spanEventRecorder = this.spanEventRecorder;
spanEventRecorder.setWrapped(spanEvent);
return spanEventRecorder;
private TraceRoot getTraceRoot0() {
return this.span.getTraceRoot();
}

private SpanEventRecorder wrappedSpanEventRecorder(WrappedSpanEventRecorder wrappedSpanEventRecorder, SpanEvent spanEvent) {
wrappedSpanEventRecorder.setWrapped(spanEvent);
return wrappedSpanEventRecorder;
}

@Override
Expand All @@ -90,7 +88,7 @@ public SpanEventRecorder traceBlockBegin() {
@Override
public SpanEventRecorder traceBlockBegin(final int stackId) {
// Set properties for the case when stackFrame is not used as part of Span.
final SpanEvent spanEvent = new SpanEvent(traceRoot);
final SpanEvent spanEvent = new SpanEvent(getTraceRoot0());
spanEvent.markStartTime();
spanEvent.setStackId(stackId);

Expand All @@ -103,7 +101,7 @@ public SpanEventRecorder traceBlockBegin(final int stackId) {
callStack.push(spanEvent);
}

return wrappedSpanEventRecorder(spanEvent);
return wrappedSpanEventRecorder(this.wrappedSpanEventRecorder, spanEvent);
}

@Override
Expand Down Expand Up @@ -181,12 +179,12 @@ public void flush() {
*/
@Override
public TraceId getTraceId() {
return this.traceRoot.getTraceId();
return getTraceRoot0().getTraceId();
}

@Override
public long getId() {
return this.traceRoot.getLocalTransactionId();
return getTraceRoot0().getLocalTransactionId();
}

@Override
Expand Down Expand Up @@ -242,7 +240,7 @@ public AsyncTraceId getAsyncTraceId() {
@Override
public AsyncTraceId getAsyncTraceId(boolean closeable) {
// ignored closeable.
return asyncContextFactory.newAsyncTraceId(traceRoot);
return asyncContextFactory.newAsyncTraceId(getTraceRoot0());
}

@Override
Expand All @@ -259,10 +257,10 @@ public SpanEventRecorder currentSpanEventRecorder() {
logger.warn("[DefaultTrace] Corrupted call stack found.", exception);
}
// make dummy.
spanEvent = new SpanEvent(traceRoot);
spanEvent = new SpanEvent(getTraceRoot0());
}

return wrappedSpanEventRecorder(spanEvent);
return wrappedSpanEventRecorder(this.wrappedSpanEventRecorder, spanEvent);
}

@Override
Expand All @@ -289,7 +287,7 @@ public TraceScope addScope(String name) {
public String toString() {
return "DefaultTrace{" +
"sampling=" + sampling +
", traceRoot=" + traceRoot +
", traceRoot=" + getTraceRoot0() +
'}';
}
}
Expand Up @@ -28,7 +28,7 @@
@InterfaceAudience.LimitedPrivate("vert.x")
public class SpanAsyncStateListener implements ListenableAsyncState.AsyncStateListener {

private final AtomicIntegerFieldUpdater<SpanAsyncStateListener> CLOSED_UPDATER
private static final AtomicIntegerFieldUpdater<SpanAsyncStateListener> CLOSED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(SpanAsyncStateListener.class, "closed");
private static final int OPEN = 0;
private static final int CLOSED = 1;
Expand Down
@@ -0,0 +1,49 @@
/*
* Copyright 2017 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.AsyncStateSupport;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;

/**
* @author Woonduk Kang(emeroad)
*/
public class StatefulAsyncContext extends DefaultAsyncContext implements AsyncStateSupport {


private final AsyncState asyncState;


public StatefulAsyncContext(TraceFactory traceFactory, TraceRoot traceRoot, int asyncId, int asyncMethodApiId, AsyncState asyncState) {
super(traceFactory, traceRoot, asyncId, asyncMethodApiId);
this.asyncState = Assert.requireNonNull(asyncState, "asyncState must not be null");
}

@Override
public AsyncState getAsyncState() {
return asyncState;
}

@Override
public String toString() {
return "StatefulAsyncContext{" +
"asyncState=" + asyncState +
"} " + super.toString();
}
}
Expand Up @@ -18,6 +18,7 @@

import com.google.inject.Inject;
import com.google.inject.Provider;
import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.SpanRecorder;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.context.AsyncContextFactory;
Expand Down Expand Up @@ -49,6 +50,14 @@ public SpanRecorder newSpanRecorder(Span span, boolean isRoot, boolean sampling)
@Override
public WrappedSpanEventRecorder newWrappedSpanEventRecorder() {
final AsyncContextFactory asyncContextFactory = asyncContextFactoryProvider.get();
return new WrappedSpanEventRecorder(asyncContextFactory, stringMetaDataService, sqlMetaDataService);
return new WrappedSpanEventRecorder(asyncContextFactory, stringMetaDataService, sqlMetaDataService, null);
}

@Override
public WrappedSpanEventRecorder newWrappedSpanEventRecorder(AsyncState asyncState) {
Assert.requireNonNull(asyncState, "asyncState must not be null");

final AsyncContextFactory asyncContextFactory = asyncContextFactoryProvider.get();
return new WrappedSpanEventRecorder(asyncContextFactory, stringMetaDataService, sqlMetaDataService, asyncState);
}
}
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.context.recorder;

import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.SpanRecorder;
import com.navercorp.pinpoint.profiler.context.Span;

Expand All @@ -27,4 +28,6 @@ public interface RecorderFactory {
SpanRecorder newSpanRecorder(final Span span, final boolean isRoot, final boolean sampling);

WrappedSpanEventRecorder newWrappedSpanEventRecorder();

WrappedSpanEventRecorder newWrappedSpanEventRecorder(AsyncState asyncState);
}

0 comments on commit e26cb37

Please sign in to comment.