Skip to content

Commit

Permalink
- refactoring SpanBo
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 4, 2016
1 parent b9dbcbb commit 6cda79f
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 65 deletions.
Expand Up @@ -18,33 +18,27 @@

import com.navercorp.pinpoint.collector.dao.TracesDao;
import com.navercorp.pinpoint.collector.dao.hbase.filter.SpanEventFilter;
import com.navercorp.pinpoint.common.server.bo.serializer.AnnotationSerializer;
import com.navercorp.pinpoint.common.server.bo.serializer.SpanEventSerializer;
import com.navercorp.pinpoint.common.server.bo.serializer.SpanSerializer;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.bo.AnnotationBo;
import com.navercorp.pinpoint.common.server.bo.AnnotationBoList;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanEventBo;
import com.navercorp.pinpoint.common.buffer.AutomaticBuffer;
import com.navercorp.pinpoint.common.buffer.Buffer;
import static com.navercorp.pinpoint.common.hbase.HBaseTables.*;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.util.BytesUtils;
import com.navercorp.pinpoint.common.util.SpanUtils;
import com.navercorp.pinpoint.thrift.dto.TAnnotation;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.thrift.dto.TSpanEvent;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -67,6 +61,12 @@ public class HbaseTraceDao implements TracesDao {
@Autowired
private SpanSerializer spanSerializer;

@Autowired
private SpanEventSerializer spanEventSerializer;

@Autowired
private AnnotationSerializer annotationSerializer;

@Autowired
@Qualifier("traceDistributor")
private AbstractRowKeyDistributor rowKeyDistributor;
Expand All @@ -83,16 +83,8 @@ public void insert(final TSpan span) {
final Put put = new Put(rowKey);

this.spanSerializer.serialize(spanBo, put, null);
this.annotationSerializer.serialize(spanBo, put, null);

// TODO if we can identify whether the columnName is duplicated or not,
// we can also know whether the span id is duplicated or not.
final byte[] spanId = Bytes.toBytes(spanBo.getSpanId());

List<TAnnotation> annotations = span.getAnnotations();
if (CollectionUtils.isNotEmpty(annotations)) {
byte[] bytes = writeAnnotation(annotations);
put.addColumn(TRACES_CF_ANNOTATION, spanId, bytes);
}

addNestedSpanEvent(put, span);

Expand Down Expand Up @@ -147,24 +139,8 @@ private void addColumn(Put put, SpanEventBo spanEventBo) {
if (!spanEventFilter.filter(spanEventBo)) {
return;
}

byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId(), spanEventBo.getAsyncSequence());
byte[] value = spanEventBo.writeValue();
final long acceptedTime = acceptedTimeService.getAcceptedTime();

put.addColumn(TRACES_CF_TERMINALSPAN, rowId, acceptedTime, value);
this.spanEventSerializer.serialize(spanEventBo, put, null);
}

private byte[] writeAnnotation(List<TAnnotation> annotations) {
List<AnnotationBo> boList = new ArrayList<>(annotations.size());
for (TAnnotation ano : annotations) {
AnnotationBo annotationBo = new AnnotationBo(ano);
boList.add(annotationBo);
}

Buffer buffer = new AutomaticBuffer(64);
AnnotationBoList annotationBoList = new AnnotationBoList(boList);
annotationBoList.writeValue(buffer);
return buffer.getBuffer();
}
}
Expand Up @@ -63,6 +63,10 @@ public int getVersion() {
return version & 0xFF;
}

public byte getRawVersion() {
return version;
}

public void setVersion(int version) {
if (version < 0 || version > 255) {
throw new IllegalArgumentException("out of range (0~255) " + version);
Expand All @@ -85,6 +89,10 @@ public int getValueType() {
return valueType;
}

public byte getRawValueType() {
return valueType;
}

public void setValueType(byte valueType) {
this.valueType = valueType;
}
Expand All @@ -105,6 +113,7 @@ public void setValue(Object value) {
this.value = value;
}

@Deprecated
public void writeValue(Buffer buffer) {
// long timestamp; // required 8
// long duration; // optional 8
Expand Down
Expand Up @@ -26,15 +26,16 @@
* @author emeroad
*/
public class AnnotationBoList {

private List<AnnotationBo> annotationBoList;

public AnnotationBoList() {
this.annotationBoList = new ArrayList<AnnotationBo>();
this.annotationBoList = new ArrayList<>();
}


public AnnotationBoList(int annotationBoListSize) {
this.annotationBoList = new ArrayList<AnnotationBo>(annotationBoListSize);
this.annotationBoList = new ArrayList<>(annotationBoListSize);
}

public AnnotationBoList(List<AnnotationBo> annotationBoList) {
Expand All @@ -53,7 +54,8 @@ public void addAnnotationBo(AnnotationBo annotationBo) {
this.annotationBoList.add(annotationBo);
}

public void writeValue(Buffer writer){
@Deprecated
public void writeValue(Buffer writer) {

int size = this.annotationBoList.size();
writer.putVar(size);
Expand All @@ -67,7 +69,7 @@ public void readValue(Buffer reader) {
if (size == 0) {
return;
}
this.annotationBoList = new ArrayList<AnnotationBo>(size);
this.annotationBoList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
AnnotationBo bo = new AnnotationBo();
bo.readValue(reader);
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.common.server.bo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.navercorp.pinpoint.common.buffer.AutomaticBuffer;
Expand Down Expand Up @@ -128,7 +129,7 @@ public SpanBo(TSpan span) {
this.exceptionMessage = exceptionInfo.getStringValue();
}

setAnnotationList(span.getAnnotations());
this.annotationBoList = buildAnnotationList(span.getAnnotations());
}

public SpanBo(String traceAgentId, long traceAgentStartTime, long traceTransactionSequence, long startTime, int elapsed, long spanId) {
Expand Down Expand Up @@ -290,15 +291,16 @@ public List<AnnotationBo> getAnnotationBoList() {
return annotationBoList;
}

public void setAnnotationList(List<TAnnotation> anoList) {
private List<AnnotationBo> buildAnnotationList(List<TAnnotation> anoList) {
if (anoList == null) {
return;
return Collections.emptyList();
}
List<AnnotationBo> boList = new ArrayList<AnnotationBo>(anoList.size());
List<AnnotationBo> boList = new ArrayList<>(anoList.size());
for (TAnnotation ano : anoList) {
boList.add(new AnnotationBo(ano));
final AnnotationBo annotationBo = new AnnotationBo(ano);
boList.add(annotationBo);
}
this.annotationBoList = boList;
return boList;
}

public void setAnnotationBoList(List<AnnotationBo> anoList) {
Expand All @@ -310,7 +312,7 @@ public void setAnnotationBoList(List<AnnotationBo> anoList) {

public void addSpanEvent(SpanEventBo spanEventBo) {
if (spanEventBoList == null) {
spanEventBoList = new ArrayList<SpanEventBo>();
spanEventBoList = new ArrayList<>();
}
spanEventBoList.add(spanEventBo);
}
Expand Down
Expand Up @@ -128,15 +128,15 @@ public SpanEventBo(TSpan tSpan, TSpanEvent tSpanEvent) {
this.exceptionMessage = exceptionInfo.getStringValue();
}

if(tSpanEvent.isSetAsyncId()) {
if (tSpanEvent.isSetAsyncId()) {
this.asyncId = tSpanEvent.getAsyncId();
}

if(tSpanEvent.isSetNextAsyncId()) {
if (tSpanEvent.isSetNextAsyncId()) {
this.nextAsyncId = tSpanEvent.getNextAsyncId();
}

if(tSpanEvent.isSetAsyncSequence()) {
if (tSpanEvent.isSetAsyncSequence()) {
this.asyncSequence = tSpanEvent.getAsyncSequence();
}
}
Expand Down Expand Up @@ -196,11 +196,11 @@ public SpanEventBo(TSpanChunk spanChunk, TSpanEvent spanEvent) {
this.asyncId = spanEvent.getAsyncId();
}

if(spanEvent.isSetNextAsyncId()) {
if (spanEvent.isSetNextAsyncId()) {
this.nextAsyncId = spanEvent.getNextAsyncId();
}

if(spanEvent.isSetAsyncSequence()) {
if (spanEvent.isSetAsyncSequence()) {
this.asyncSequence = spanEvent.getAsyncSequence();
}
}
Expand All @@ -223,6 +223,14 @@ public void setAgentId(String agentId) {
this.agentId = agentId;
}

public String getApplicationId() {
return applicationId;
}

public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}

public long getAgentStartTime() {
return this.agentStartTime;
}
Expand Down Expand Up @@ -414,14 +422,12 @@ public void setAsyncSequence(short asyncSequence) {
this.asyncSequence = asyncSequence;
}

@Deprecated
public byte[] writeValue() {
final Buffer buffer = new AutomaticBuffer(512);

buffer.put(version);

// buffer.put(mostTraceID);
// buffer.put(leastTraceID);

buffer.putPrefixedString(agentId);
buffer.putPrefixedString(applicationId);
buffer.putVar(agentStartTime);
Expand Down Expand Up @@ -496,8 +502,8 @@ public int readValue(byte[] bytes, int offset, int length) {
}

this.annotationBoList = readAnnotation(buffer);
if(buffer.getOffset() < endOffset) {
nextAsyncId = buffer.readSVarInt();
if (buffer.getOffset() < endOffset) {
nextAsyncId = buffer.readSVarInt();
}

return buffer.getOffset();
Expand Down
@@ -0,0 +1,70 @@
package com.navercorp.pinpoint.common.server.bo.serializer;

import com.navercorp.pinpoint.common.buffer.AutomaticBuffer;
import com.navercorp.pinpoint.common.buffer.Buffer;
import com.navercorp.pinpoint.common.server.bo.AnnotationBo;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;

import static com.navercorp.pinpoint.common.hbase.HBaseTables.TRACES_CF_ANNOTATION;

/**
* @author Woonduk Kang(emeroad)
*/
@Component
public class AnnotationSerializer implements HbaseSerializer<SpanBo, Put> {


@Override
public void serialize(SpanBo spanBo, Put put, SerializationContext context) {

// TODO if we can identify whether the columnName is duplicated or not,
// we can also know whether the span id is duplicated or not.
final byte[] spanId = Bytes.toBytes(spanBo.getSpanId());

final List<AnnotationBo> annotations = spanBo.getAnnotationBoList();
if (CollectionUtils.isNotEmpty(annotations)) {
byte[] bytes = writeAnnotationList(annotations);
put.addColumn(TRACES_CF_ANNOTATION, spanId, bytes);
}

}

private byte[] writeAnnotationList(List<AnnotationBo> annotationList) {
final Buffer buffer = new AutomaticBuffer(64);
return writeAnnotationList(annotationList, buffer);
}

// for test
public byte[] writeAnnotationList(List<AnnotationBo> annotationList, Buffer buffer) {

if (annotationList == null) {
annotationList = Collections.emptyList();
}
final int size = annotationList.size();

buffer.putVar(size);
for (AnnotationBo annotationBo : annotationList) {
writeAnnotation(annotationBo, buffer);
}

return buffer.getBuffer();
}

// for test
public void writeAnnotation(AnnotationBo annotationBo, Buffer puffer) {
// int key; // required 4
// int valueTypeCode; // required 4
// ByteBuffer value; // optional 4 + buf.length
puffer.put(annotationBo.getRawVersion());
puffer.putSVar(annotationBo.getKey());
puffer.put(annotationBo.getRawValueType());
puffer.putPrefixedBytes(annotationBo.getByteValue());
}
}

0 comments on commit 6cda79f

Please sign in to comment.