Skip to content

Commit

Permalink
[#4558] Apply span v2 compression
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Apr 29, 2019
1 parent e614683 commit cb7750e
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 50 deletions.
Expand Up @@ -7,6 +7,8 @@
*/
public interface BasicSpan {

int getVersion();

String getAgentId();
void setAgentId(String agentId);

Expand Down
Expand Up @@ -73,56 +73,65 @@ public class SpanBo implements Event, BasicSpan {
private byte loggingTransactionInfo; //optional




public SpanBo() {
}

@Override
public int getVersion() {
return version & 0xFF;
}


public byte getRawVersion() {
return version;
}

public void setVersion(int version) {
checkVersion(version);
// check range
this.version = (byte) (version & 0xFF);
}

static void checkVersion(int version) {
if (version < 0 || version > 255) {
throw new IllegalArgumentException("out of range (0~255)");
}
// check range
this.version = (byte) (version & 0xFF);
}

@Override
public TransactionId getTransactionId() {
return this.transactionId;
}

public void setTransactionId(TransactionId transactionId) {
this.transactionId = transactionId;
}


@Override
public String getAgentId() {
return agentId;
}

@Override
public void setAgentId(String agentId) {
this.agentId = agentId;
}

@Override
public String getApplicationId() {
return applicationId;
}

@Override
public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}

@Override
public long getAgentStartTime() {
return agentStartTime;
}

@Override
public void setAgentStartTime(long agentStartTime) {
this.agentStartTime = agentStartTime;
}
Expand Down Expand Up @@ -153,11 +162,12 @@ public void setRpc(String rpc) {
this.rpc = rpc;
}


@Override
public long getSpanId() {
return spanId;
}

@Override
public void setSpanId(long spanId) {
this.spanId = spanId;
}
Expand Down
Expand Up @@ -36,14 +36,18 @@ public class SpanChunkBo implements BasicSpan {
public SpanChunkBo() {
}

public byte getVersion() {
return version;
@Override
public int getVersion() {
return version & 0xFF;
}

public void setVersion(byte version) {
this.version = version;
public void setVersion(int version) {
SpanBo.checkVersion(version);
// check range
this.version = (byte) (version & 0xFF);
}

@Override
public String getAgentId() {
return agentId;
}
Expand All @@ -52,6 +56,7 @@ public void setAgentId(String agentId) {
this.agentId = agentId;
}

@Override
public String getApplicationId() {
return applicationId;
}
Expand All @@ -60,6 +65,7 @@ public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}

@Override
public long getAgentStartTime() {
return agentStartTime;
}
Expand All @@ -77,10 +83,12 @@ public void setTransactionId(TransactionId transactionId) {
this.transactionId = transactionId;
}

@Override
public long getSpanId() {
return spanId;
}

@Override
public void setSpanId(long spanId) {
this.spanId = spanId;
}
Expand Down
Expand Up @@ -45,6 +45,7 @@
import com.navercorp.pinpoint.grpc.trace.PSpan;
import com.navercorp.pinpoint.grpc.trace.PSpanChunk;
import com.navercorp.pinpoint.grpc.trace.PSpanEvent;
import com.navercorp.pinpoint.io.SpanVersion;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -68,7 +69,6 @@ public class GrpcSpanFactory {

private static final AnnotationFactory<PAnnotation> annotationFactory = new AnnotationFactory<>(new GrpcAnnotationHandler());

private static final int TRACEFORMAT_V2 = 1;
public GrpcSpanFactory() {
}

Expand Down Expand Up @@ -98,14 +98,15 @@ public SpanBo buildSpanBo(PSpan pSpan, AgentHeaderFactory.Header header) {
}

private void checkVersion(int version) {
if (version != TRACEFORMAT_V2) {
if (version != SpanVersion.TRACE_V2) {
throw new IllegalStateException("unsupported version:" + version);
}
}

// for test
SpanBo newSpanBo(PSpan pSpan, AgentHeaderFactory.Header header) {
final SpanBo spanBo = new SpanBo();
spanBo.setVersion(pSpan.getVersion());
spanBo.setAgentId(header.getAgentId());
spanBo.setApplicationId(header.getApplicationName());
spanBo.setAgentStartTime(header.getAgentStartTime());
Expand Down Expand Up @@ -232,6 +233,7 @@ public SpanChunkBo buildSpanChunkBo(PSpanChunk pSpanChunk, AgentHeaderFactory.He
// for test
SpanChunkBo newSpanChunkBo(PSpanChunk pSpanChunk, AgentHeaderFactory.Header header) {
final SpanChunkBo spanChunkBo = new SpanChunkBo();
spanChunkBo.setVersion(pSpanChunk.getVersion());
spanChunkBo.setAgentId(header.getAgentId());
spanChunkBo.setApplicationId(header.getApplicationName());
spanChunkBo.setAgentStartTime(header.getAgentStartTime());
Expand All @@ -241,6 +243,7 @@ SpanChunkBo newSpanChunkBo(PSpanChunk pSpanChunk, AgentHeaderFactory.Header head
TransactionId transactionId = newTransactionId(pSpanChunk.getTransactionId(), spanChunkBo);
spanChunkBo.setTransactionId(transactionId);

spanChunkBo.setKeyTime(pSpanChunk.getKeyTime());

spanChunkBo.setSpanId(pSpanChunk.getSpanId());
spanChunkBo.setEndPoint(pSpanChunk.getEndPoint());
Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v2.bitfield.SpanEventQualifierBitField;
import com.navercorp.pinpoint.common.util.TransactionId;
import com.navercorp.pinpoint.common.server.bo.AnnotationTranscoder;
import com.navercorp.pinpoint.io.SpanVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -81,10 +82,12 @@ private SpanBo readSpan(Buffer qualifier, Buffer columnValue, SpanDecodingContex

private void readSpanChunkValue(Buffer buffer, SpanChunkBo spanChunk, SpanEventBo firstSpanEvent, SpanDecodingContext decodingContext) {
final byte version = buffer.readByte();
if (version != 0) {
throw new IllegalStateException("unknown version :" + version);
}

spanChunk.setVersion(version);
if (version == SpanVersion.TRACE_V2) {
final long keyTime = buffer.readVLong();
spanChunk.setKeyTime(keyTime);
}

List<SpanEventBo> spanEventBoList = readSpanEvent(buffer, firstSpanEvent, decodingContext);
spanChunk.addSpanEventBoList(spanEventBoList);
Expand All @@ -93,9 +96,7 @@ private void readSpanChunkValue(Buffer buffer, SpanChunkBo spanChunk, SpanEventB
public void readSpanValue(Buffer buffer, SpanBo span, SpanEventBo firstSpanEvent, SpanDecodingContext decodingContext) {

final byte version = buffer.readByte();
if (version != 0) {
throw new IllegalStateException("unknown version :" + version);
}

span.setVersion(version);

final SpanBitFiled bitFiled = new SpanBitFiled(buffer.readByte());
Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v2.bitfield.SpanEventBitField;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v2.bitfield.SpanEventQualifierBitField;
import com.navercorp.pinpoint.common.server.bo.AnnotationTranscoder;
import com.navercorp.pinpoint.io.SpanVersion;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -94,9 +95,12 @@ public ByteBuffer encodeSpanChunkColumnValue(SpanEncodingContext<SpanChunkBo> en

final Buffer buffer = new AutomaticBuffer(256);

final byte version = spanChunkBo.getVersion();
final byte version = (byte) spanChunkBo.getVersion();
buffer.putByte(version);

if (version == SpanVersion.TRACE_V2) {
long keyTime = spanChunkBo.getKeyTime();
buffer.putVLong(keyTime);
}

final List<SpanEventBo> spanEventBoList = spanChunkBo.getSpanEventBoList();
writeSpanEventList(buffer, spanEventBoList, encodingContext);
Expand Down
Expand Up @@ -17,15 +17,15 @@
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.config.ProfilerConfig;
import com.navercorp.pinpoint.thrift.dto.TraceConstants;
import com.navercorp.pinpoint.io.SpanVersion;

/**
* @author Woonduk Kang(emeroad)
*/
public enum TraceDataFormatVersion {

V1(TraceConstants.TRACE_V1),
V2(TraceConstants.TRACE_V2);
V1(SpanVersion.TRACE_V1),
V2(SpanVersion.TRACE_V2);

private static final String THRIFT_TRACE_VERSION_KEY = "profiler.transport.thrift.trace.dataformat.version";

Expand Down
Expand Up @@ -45,68 +45,68 @@ public SpanProcessorProtoV2() {
}

@Override
public void preProcess(Span span, PSpan.Builder tSpan) {
tSpan.setVersion(V2);
public void preProcess(Span span, PSpan.Builder pSpan) {
pSpan.setVersion(V2);

final List<SpanEvent> spanEventList = span.getSpanEventList();
Collections.sort(spanEventList, SEQUENCE_COMPARATOR);
}

@Override
public void preProcess(SpanChunk spanChunk, PSpanChunk.Builder tSpanChunk) {
tSpanChunk.setVersion(V2);
public void preProcess(SpanChunk spanChunk, PSpanChunk.Builder pSpanChunk) {
pSpanChunk.setVersion(V2);

final List<SpanEvent> spanEventList = spanChunk.getSpanEventList();
Collections.sort(spanEventList, SEQUENCE_COMPARATOR);
}

@Override
public void postProcess(SpanChunk span, PSpanChunk.Builder tSpan) {
final List<SpanEvent> spanEventList = span.getSpanEventList();
final List<PSpanEvent.Builder> tSpanEventList = tSpan.getSpanEventBuilderList();
public void postProcess(SpanChunk spanChunk, PSpanChunk.Builder pSpanChunk) {
final List<SpanEvent> spanEventList = spanChunk.getSpanEventList();
final List<PSpanEvent.Builder> tSpanEventList = pSpanChunk.getSpanEventBuilderList();
long keyTime = getKeyTime(spanEventList);
tSpan.setKeyTime(keyTime);
pSpanChunk.setKeyTime(keyTime);
postProcess(keyTime, spanEventList, tSpanEventList);
}

@Override
public void postProcess(Span span, PSpan.Builder tSpan) {
public void postProcess(Span span, PSpan.Builder pSpan) {
final List<SpanEvent> spanEventList = span.getSpanEventList();
final List<PSpanEvent.Builder> tSpanEventList = tSpan.getSpanEventBuilderList();
long keyTime = getKeyTime(spanEventList);
final List<PSpanEvent.Builder> tSpanEventList = pSpan.getSpanEventBuilderList();
long keyTime = span.getStartTime();
postProcess(keyTime, spanEventList, tSpanEventList);
}

private void postProcess(long keyTime, List<SpanEvent> spanEventList, List<PSpanEvent.Builder> tSpanEventList) {
if (!(CollectionUtils.nullSafeSize(spanEventList) == CollectionUtils.nullSafeSize(tSpanEventList))) {
private void postProcess(long keyTime, List<SpanEvent> spanEventList, List<PSpanEvent.Builder> pSpanEventList) {
if (!(CollectionUtils.nullSafeSize(spanEventList) == CollectionUtils.nullSafeSize(pSpanEventList))) {
throw new IllegalStateException("list size not same");
}

final Iterator<PSpanEvent.Builder> tSpanEventIterator = tSpanEventList.iterator();
final Iterator<PSpanEvent.Builder> pSpanEventIterator = pSpanEventList.iterator();

int prevDepth = 0;
boolean first = true;
for (SpanEvent spanEvent : spanEventList) {
final PSpanEvent.Builder tSpanEvent = tSpanEventIterator.next();
final PSpanEvent.Builder pSpanEvent = pSpanEventIterator.next();

final long startTime = spanEvent.getStartTime();
final long startElapsedTime = startTime - keyTime;
tSpanEvent.setStartElapsed((int) startElapsedTime);
pSpanEvent.setStartElapsed((int) startElapsedTime);
keyTime = startTime;

if (first) {
first = false;
int depth = spanEvent.getDepth();
prevDepth = depth;
tSpanEvent.setDepth(depth);
pSpanEvent.setDepth(depth);
} else {
int currentDepth = spanEvent.getDepth();

if (currentDepth == prevDepth) {
// skip
tSpanEvent.setDepth(0);
pSpanEvent.setDepth(0);
} else {
tSpanEvent.setDepth(currentDepth);
pSpanEvent.setDepth(currentDepth);
}
prevDepth = currentDepth;
}
Expand Down
Expand Up @@ -46,6 +46,7 @@
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;
import com.navercorp.pinpoint.profiler.context.id.TransactionIdEncoder;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.io.SpanVersion;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -97,7 +98,7 @@ public GeneratedMessageV3 toMessage(Object message) {
PSpan buildPSpan(Span span) {
final PSpan.Builder pSpan = PSpan.newBuilder();

// tSpan.setVersion(span.getVersion());
pSpan.setVersion(SpanVersion.TRACE_V2);

// pSpanBuilder.applicationName(applicationName);
// pSpanBuilder.setAgentId(agentId);
Expand Down Expand Up @@ -189,6 +190,7 @@ private List<PSpanEvent> buildPSpanEventList(List<SpanEvent> spanEventList) {
@VisibleForTesting
PSpanChunk buildPSpanChunk(SpanChunk spanChunk) {
final PSpanChunk.Builder pSpanChunk = PSpanChunk.newBuilder();
pSpanChunk.setVersion(SpanVersion.TRACE_V2);

// tSpanChunk.setApplicationName(applicationName);
// tSpanChunk.setAgentId(agentId);
Expand Down
Expand Up @@ -94,7 +94,7 @@ public void postProcess() {
PSpan pSpan = builder.build();

List<PSpanEvent> spanEventList = pSpan.getSpanEventList();
long keyStartTime = span.getSpanEventList().get(0).getStartTime();
long keyStartTime = span.getStartTime();
Iterator<SpanEvent> spanEventIterator = span.getSpanEventList().iterator();
for (PSpanEvent pSpanEvent : spanEventList) {
SpanEvent next = spanEventIterator.next();
Expand Down

0 comments on commit cb7750e

Please sign in to comment.