Skip to content

Commit

Permalink
#2524 refactoring async api
Browse files Browse the repository at this point in the history
 - extract AsyncState
  • Loading branch information
emeroad committed Jan 26, 2017
1 parent e478590 commit 261d962
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 94 deletions.
Expand Up @@ -21,7 +21,11 @@
* @author jaehong.kim
*/
@InterfaceAudience.LimitedPrivate("vert.x")
public interface AsyncTraceCloseable {
public interface AsyncState {

void close();
void setup();

boolean await();

void finish();
}
@@ -0,0 +1,27 @@
/*
* Copyright 2017 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.bootstrap.context;

import com.navercorp.pinpoint.common.annotations.InterfaceAudience;

/**
* @author Woonduk Kang(emeroad)
*/
@InterfaceAudience.LimitedPrivate("vert.x")
public interface AsyncStateSupport {
AsyncState getAsyncState();
}
Expand Up @@ -16,7 +16,8 @@
package com.navercorp.pinpoint.bootstrap.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncTraceIdAccessor;
import com.navercorp.pinpoint.bootstrap.context.AsyncTraceCloseable;
import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.AsyncStateSupport;
import com.navercorp.pinpoint.bootstrap.context.AsyncTraceId;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
Expand Down Expand Up @@ -133,7 +134,7 @@ public void after(Object target, Object[] args, Object result, Throwable throwab
if (isAsyncTraceDestination(trace)) {
deleteAsyncTrace(trace);
}
closeAsyncTraceId(asyncTraceId);
finishAsyncState(asyncTraceId);
}
}

Expand Down Expand Up @@ -210,13 +211,11 @@ private boolean isAsyncTraceDestination(final Trace trace) {
return scope != null && !scope.isActive();
}

private void closeAsyncTraceId(final AsyncTraceId asyncTraceId) {
if (asyncTraceId instanceof AsyncTraceCloseable) {
AsyncTraceCloseable closeable = (AsyncTraceCloseable) asyncTraceId;
closeable.close();
if (isDebug) {
logger.debug("Close AsyncTraceCloseable. asyncTraceId={}", asyncTraceId);
}
private void finishAsyncState(final AsyncTraceId asyncTraceId) {
if (asyncTraceId instanceof AsyncStateSupport) {
final AsyncStateSupport asyncStateSupport = (AsyncStateSupport) asyncTraceId;
AsyncState asyncState = asyncStateSupport.getAsyncState();
asyncState.finish();
}
}

Expand Down
Expand Up @@ -31,15 +31,14 @@ public class AsyncTrace implements Trace {

private int asyncId;
private short asyncSequence;
private AsyncTraceCloser closer;
private AsyncState asyncState;

public AsyncTrace(final Trace trace, final AsyncTraceCloser closer) {
if (closer == null) {
throw new IllegalArgumentException("closer must not be null.");
public AsyncTrace(final Trace trace, final AsyncState asyncState) {
if (asyncState == null) {
throw new IllegalArgumentException("asyncState must not be null.");
}

this.trace = trace;
this.closer = closer;
this.asyncState = asyncState;
this.entryPoint = true;
}

Expand All @@ -48,7 +47,7 @@ public AsyncTrace(final Trace trace, final int asyncId, final short asyncSequenc
this.asyncId = asyncId;
this.asyncSequence = asyncSequence;

this.closer = null;
this.asyncState = null;
this.entryPoint = false;

this.trace.getSpanRecorder().recordStartTime(startTime);
Expand Down Expand Up @@ -157,9 +156,10 @@ public AsyncTraceId getAsyncTraceId() {
@Override
public AsyncTraceId getAsyncTraceId(boolean closeable) {
final AsyncTraceId asyncTraceId = this.trace.getAsyncTraceId();
if (closeable && this.entryPoint && this.closer != null) {
this.closer.setup();
return new CloseableAsyncTraceId(asyncTraceId, this.closer);
final AsyncState asyncState = this.asyncState;
if (closeable && this.entryPoint && asyncState != null) {
asyncState.setup();
return new StatefulAsyncTraceId(asyncTraceId, asyncState);
}

return asyncTraceId;
Expand All @@ -176,24 +176,25 @@ public void close() {
}

private void closeOrFlush() {
if (this.closer == null) {
final AsyncState asyncState = this.asyncState;
if (asyncState == null) {
return;
}

if (this.closer.await()) {
if (asyncState.await()) {
// flush.
this.trace.flush();
if (isDebug) {
logger.debug("Flush trace={}, closer={}", this, this.closer);
logger.debug("Flush trace={}, asyncState={}", this, this.asyncState);
}
} else {
// close.
this.trace.close();
if (isDebug) {
logger.debug("Close trace={}. closer={}", this, this.closer);
logger.debug("Close trace={}. asyncState={}", this, this.asyncState);
}
}
this.closer = null;
this.asyncState = null;
}

@Override
Expand Down
Expand Up @@ -15,51 +15,53 @@
*/
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncTraceCloseable;
import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.common.annotations.InterfaceAudience;

/**
* @author jaehong.kim
*/
@InterfaceAudience.LimitedPrivate("vert.x")
public class AsyncTraceCloser implements AsyncTraceCloseable {
public class ListenableAsyncState implements AsyncState {

private final CompletionCallback completionCallback;
private final AsyncStateListener asyncStateListener;

private boolean closed = false;
private boolean setup = false;
private boolean await = false;
private boolean finish = false;

public AsyncTraceCloser(CompletionCallback completionCallback) {
if (completionCallback == null) {
throw new NullPointerException("completionCallback must not be null");
public ListenableAsyncState(AsyncStateListener asyncStateListener) {
if (asyncStateListener == null) {
throw new NullPointerException("asyncStateListener must not be null");
}
this.completionCallback = completionCallback;
this.asyncStateListener = asyncStateListener;
}

@Override
public void close() {
boolean fireCallback = false;
public void finish() {
boolean finished = false;
synchronized (this) {
if (this.await && !this.closed) {
fireCallback = true;
if (this.await && !this.finish) {
finished = true;
}
this.closed = true;
this.finish = true;
}
if (fireCallback) {
completionCallback.onComplete();
if (finished) {
this.asyncStateListener.finish();
}
}

@Override
public void setup() {
synchronized (this) {
this.setup = true;
}
}

@Override
public boolean await() {
synchronized (this) {
if (!this.setup || this.closed) {
if (!this.setup || this.finish) {
return false;
}
this.await = true;
Expand All @@ -68,17 +70,17 @@ public boolean await() {
}

@InterfaceAudience.LimitedPrivate("LocalTraceContext")
public interface CompletionCallback {
void onComplete();
public interface AsyncStateListener {
void finish();
}

@Override
public String toString() {
return "AsyncTraceCloser{" +
"completionCallback=" + completionCallback +
", closed=" + closed +
return "ListenableAsyncState{" +
"asyncStateListener=" + asyncStateListener +
", setup=" + setup +
", await=" + await +
", finish=" + finish +
'}';
}
}
Expand Up @@ -25,19 +25,20 @@
* @author Woonduk Kang(emeroad)
*/
@InterfaceAudience.LimitedPrivate("vert.x")
public class SpanCompletionCallback implements AsyncTraceCloser.CompletionCallback {
public class SpanAsyncStateListener implements ListenableAsyncState.AsyncStateListener {

private final AtomicIntegerFieldUpdater<SpanCompletionCallback> CLOSED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(SpanCompletionCallback.class, "closed");
private final AtomicIntegerFieldUpdater<SpanAsyncStateListener> CLOSED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(SpanAsyncStateListener.class, "closed");
private static final int OPEN = 0;
private static final int CLOSED = 1;

@SuppressWarnings("unused")
private volatile int closed = OPEN;

private final Span span;
private final Storage storage;

SpanCompletionCallback(Span span, Storage storage) {
SpanAsyncStateListener(Span span, Storage storage) {
if (span == null) {
throw new NullPointerException("span must not be null");
}
Expand All @@ -49,7 +50,7 @@ public class SpanCompletionCallback implements AsyncTraceCloser.CompletionCallba
}

@Override
public void onComplete() {
public void finish() {
if (CLOSED_UPDATER.compareAndSet(this, OPEN, CLOSED)) {
if (span.isTimeRecording()) {
span.markAfterTime();
Expand Down
Expand Up @@ -15,7 +15,8 @@
*/
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncTraceCloseable;
import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.AsyncStateSupport;
import com.navercorp.pinpoint.bootstrap.context.AsyncTraceId;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.common.annotations.InterfaceAudience;
Expand All @@ -24,20 +25,19 @@
* @author jaehong.kim
*/
@InterfaceAudience.LimitedPrivate("vert.x")
public class CloseableAsyncTraceId implements AsyncTraceId, AsyncTraceCloseable {
public class StatefulAsyncTraceId implements AsyncTraceId, AsyncStateSupport {
private final AsyncTraceId traceId;
private final AsyncTraceCloseable closeable;
private final AsyncState asyncState;

public CloseableAsyncTraceId(final AsyncTraceId traceId, final AsyncTraceCloseable closeable) {
public StatefulAsyncTraceId(final AsyncTraceId traceId, final AsyncState asyncState) {
if (traceId == null ) {
throw new IllegalArgumentException("traceId must not be null");
}
if (closeable == null) {
throw new NullPointerException("closeable must not be null");
if (asyncState == null) {
throw new NullPointerException("asyncState must not be null");
}

this.traceId = traceId;
this.closeable = closeable;
this.asyncState = asyncState;
}

@Override
Expand Down Expand Up @@ -106,10 +106,8 @@ public TraceId getParentTraceId() {
}

@Override
public void close() {
final AsyncTraceCloseable copy = this.closeable;
if (copy != null) {
copy.close();
}
public AsyncState getAsyncState() {
return asyncState;
}

}
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.AsyncTraceId;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
Expand Down Expand Up @@ -222,8 +223,8 @@ public Trace continueAsyncTraceObject(final TraceId traceId) {
final Storage storage = storageFactory.createStorage();
trace.setStorage(storage);

final SpanCompletionCallback callback = new SpanCompletionCallback(trace.getSpan(), storageFactory.createStorage());
final AsyncTraceCloser closer = new AsyncTraceCloser(callback);
final SpanAsyncStateListener callback = new SpanAsyncStateListener(trace.getSpan(), storageFactory.createStorage());
final ListenableAsyncState closer = new ListenableAsyncState(callback);
final AsyncTrace asyncTrace = new AsyncTrace(trace, closer);
bind(asyncTrace);
return asyncTrace;
Expand All @@ -240,8 +241,8 @@ public Trace newAsyncTraceObject() {
final Storage storage = storageFactory.createStorage();
trace.setStorage(storage);

final SpanCompletionCallback callback = new SpanCompletionCallback(trace.getSpan(), storageFactory.createStorage());
final AsyncTraceCloser closer = new AsyncTraceCloser(callback);
final SpanAsyncStateListener callback = new SpanAsyncStateListener(trace.getSpan(), storageFactory.createStorage());
final AsyncState closer = new ListenableAsyncState(callback);
final AsyncTrace asyncTrace = new AsyncTrace(trace, closer);
bind(asyncTrace);

Expand Down

0 comments on commit 261d962

Please sign in to comment.