Skip to content

Commit

Permalink
add TSpanEvent.asyncSequence
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Jun 12, 2015
1 parent 911cd04 commit d742861
Show file tree
Hide file tree
Showing 18 changed files with 271 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public interface AsyncTraceId extends TraceId {
long getSpanStartTime();

TraceId getParentTraceId();

short nextAsyncSequence();
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public interface RecordableTrace {

void recordNextAsyncId(int asyncId);

void recordAsyncSequence(short sequence);

boolean isAsync();

long getTraceStartTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,8 @@ public AsyncTraceId getAsyncTraceId() {
@Override
public void close() {
}

@Override
public void recordAsyncSequence(short sequence) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void addNestedSpanEvent(Put put, TSpan span) {
long acceptedTime0 = acceptedTimeService.getAcceptedTime();
for (TSpanEvent spanEvent : spanEventBoList) {
SpanEventBo spanEventBo = new SpanEventBo(span, spanEvent);
byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId());
byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId(), spanEventBo.getAsyncSequence());
byte[] value = spanEventBo.writeValue();
put.addColumn(TRACES_CF_TERMINALSPAN, rowId, acceptedTime0, value);
}
Expand All @@ -128,7 +128,7 @@ public void insertSpanChunk(TSpanChunk spanChunk) {
SpanEventBo spanEventBo = new SpanEventBo(spanChunk, spanEvent);

byte[] value = spanEventBo.writeValue();
byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId());
byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId(), spanEventBo.getAsyncSequence());

put.addColumn(TRACES_CF_TERMINALSPAN, rowId, acceptedTime, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public class SpanEventBo implements Span {

private int asyncId = -1;
private int nextAsyncId = -1;

private short asyncSequence = -1;

public SpanEventBo() {
}

Expand Down Expand Up @@ -135,6 +136,10 @@ public SpanEventBo(TSpan tSpan, TSpanEvent tSpanEvent) {
if(tSpanEvent.isSetNextAsyncId()) {
this.nextAsyncId = tSpanEvent.getNextAsyncId();
}

if(tSpanEvent.isSetAsyncSequence()) {
this.asyncSequence = tSpanEvent.getAsyncSequence();
}
}

public SpanEventBo(TSpanChunk spanChunk, TSpanEvent spanEvent) {
Expand Down Expand Up @@ -195,6 +200,10 @@ public SpanEventBo(TSpanChunk spanChunk, TSpanEvent spanEvent) {
if(spanEvent.isSetNextAsyncId()) {
this.nextAsyncId = spanEvent.getNextAsyncId();
}

if(spanEvent.isSetAsyncSequence()) {
this.asyncSequence = spanEvent.getAsyncSequence();
}
}


Expand Down Expand Up @@ -397,6 +406,14 @@ public int getNextAsyncId() {
public void setNextAsyncId(int nextAsyncId) {
this.nextAsyncId = nextAsyncId;
}

public short getAsyncSequence() {
return asyncSequence;
}

public void setAsyncSequence(short asyncSequence) {
this.asyncSequence = asyncSequence;
}

public byte[] writeValue() {
final Buffer buffer = new AutomaticBuffer(512);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,13 @@ public static byte[] add(final byte[] preFix, final short postfix) {
writeShort(postfix, buf, preFix.length);
return buf;
}

public static byte[] add(final byte[] preFix, final int postfix) {
byte[] buf = new byte[preFix.length + INT_BYTE_LENGTH];
System.arraycopy(preFix, 0, buf, 0, preFix.length);
writeInt(postfix, buf, preFix.length);
return buf;
}

public static byte[] add(final int preFix, final short postFix) {
byte[] buf = new byte[INT_BYTE_LENGTH + SHORT_BYTE_LENGTH];
Expand All @@ -313,11 +320,16 @@ public static byte[] add(final long preFix, final short postFix) {
return buf;
}

public static byte[] add(final long preFix, final short postFix, final int append) {
byte[] buf = new byte[LONG_BYTE_LENGTH + SHORT_BYTE_LENGTH + INT_BYTE_LENGTH];
writeLong(preFix, buf, 0);
writeShort(postFix, buf, LONG_BYTE_LENGTH);
writeInt(append, buf, LONG_BYTE_LENGTH + SHORT_BYTE_LENGTH);
public static byte[] add(final long preFix, final short postFix, final int intArg, final short shortArg) {
byte[] buf = new byte[LONG_BYTE_LENGTH + SHORT_BYTE_LENGTH + INT_BYTE_LENGTH + SHORT_BYTE_LENGTH];
int offset = 0;
writeLong(preFix, buf, offset);
offset += LONG_BYTE_LENGTH;
writeShort(postFix, buf, offset);
offset += SHORT_BYTE_LENGTH;
writeInt(intArg, buf, offset);
offset += INT_BYTE_LENGTH;
writeShort(shortArg, buf, offset);
return buf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ public class AsyncTrace implements Trace {

private final Trace trace;
private int asyncId;

private short asyncSequence;

public AsyncTrace(final Trace trace, final int asyncId) {
public AsyncTrace(final Trace trace, final int asyncId, final short asyncSequence) {
this.trace = trace;
this.asyncId = asyncId;
this.asyncSequence = asyncSequence;
traceBlockBegin(BEGIN_STACKID);
}

Expand Down Expand Up @@ -184,12 +185,14 @@ public void recordNextAsyncId(int asyncId) {
public void traceBlockBegin() {
trace.traceBlockBegin();
trace.recordAsyncId(asyncId);
trace.recordAsyncSequence(asyncSequence);
}

@Override
public void traceBlockBegin(int stackId) {
trace.traceBlockBegin(stackId);
trace.recordAsyncId(asyncId);
trace.recordAsyncSequence(asyncSequence);
}

@Override
Expand Down Expand Up @@ -227,4 +230,9 @@ public void close() {
traceBlockEnd(BEGIN_STACKID);
trace.close();
}

@Override
public void recordAsyncSequence(short sequence) {
trace.recordAsyncSequence(sequence);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class DefaultAsyncTraceId implements AsyncTraceId {
private final TraceId traceId;
private final int asyncId;
private final long startTime;
private short asyncSequence = 0;

public DefaultAsyncTraceId(final TraceId traceId, final int asyncId, final long startTime) {
this.traceId = traceId;
Expand All @@ -19,6 +20,11 @@ public int getAsyncId() {
return asyncId;
}

public synchronized short nextAsyncSequence() {
this.asyncSequence += 1;
return this.asyncSequence;
}

@Override
public TraceId getNextTraceId() {
return traceId.getNextTraceId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,4 +601,14 @@ public boolean isRootStack() {
public AsyncTraceId getAsyncTraceId() {
return new DefaultAsyncTraceId(traceId, traceContext.getAsyncId(), getTraceStartTime());
}

@Override
public void recordAsyncSequence(short asyncSequence) {
StackFrame currentStackFrame = this.currentStackFrame;
if(currentStackFrame instanceof SpanEventStackFrame) {
((SpanEventStackFrame) currentStackFrame).setAsyncSequence(asyncSequence);
} else {
throw new PinpointException("not SpanEventStackFrame");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,9 @@ public AsyncTraceId getAsyncTraceId() {
@Override
public void close() {
}

@Override
public void recordAsyncSequence(short sequence) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,7 @@ public AsyncTraceId getAsyncTraceId() {
throw new UnsupportedOperationException();
}

@Override
public void recordAsyncSequence(short sequence) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,8 @@ public void setAsyncId(int asyncId) {
public void setNextAsyncId(int asyncId) {
this.spanEvent.setNextAsyncId(asyncId);
}

public void setAsyncSequence(short asyncSequence) {
this.spanEvent.setAsyncSequence(asyncSequence);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public Trace continueAsyncTraceObject(AsyncTraceId traceId, int asyncId, long st
trace.setStorage(new AsyncStorage(storage));
trace.setSampling(true);

final AsyncTrace asyncTrace = new AsyncTrace(trace, asyncId);
final AsyncTrace asyncTrace = new AsyncTrace(trace, asyncId, traceId.nextAsyncSequence());
threadLocal.set(asyncTrace);

return asyncTrace;
Expand Down
Loading

0 comments on commit d742861

Please sign in to comment.