Skip to content

Commit

Permalink
#1819 trace format v2
Browse files Browse the repository at this point in the history
  - refactoring
  • Loading branch information
emeroad committed Jul 27, 2016
1 parent bb0d1ee commit 6db7cd1
Show file tree
Hide file tree
Showing 42 changed files with 1,056 additions and 608 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.navercorp.pinpoint.collector.dao;

import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;

/**
* @author Woonduk Kang(emeroad)
*/
public interface TraceDao {
void insert(TSpan span);
void insert(SpanBo span);

void insertSpanChunk(TSpanChunk spanChunk);
void insertSpanChunk(SpanChunkBo spanChunk);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.TraceDao;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,7 +29,7 @@ public DualWriteHbaseTraceDao(TraceDao master, TraceDao slave) {
}

@Override
public void insert(TSpan span) {
public void insert(SpanBo span) {
Throwable masterException = null;
try {
master.insert(span);
Expand All @@ -45,15 +45,15 @@ public void insert(TSpan span) {
}

@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
public void insertSpanChunk(SpanChunkBo spanChunkBo) {
Throwable masterException = null;
try {
master.insertSpanChunk(spanChunk);
master.insertSpanChunk(spanChunkBo);
} catch (Throwable e) {
masterException = e;
}
try {
slave.insertSpanChunk(spanChunk);
slave.insertSpanChunk(spanChunkBo);
} catch (Throwable e) {
logger.warn("slave insertSpanChunk(TSpanChunk) Error:{}", e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.navercorp.pinpoint.common.buffer.AutomaticBuffer;
import com.navercorp.pinpoint.common.buffer.Buffer;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.util.SpanUtils;
import com.navercorp.pinpoint.common.server.util.SpanUtils;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.TracesDao;
import com.navercorp.pinpoint.collector.dao.hbase.filter.SpanEventFilter;

import com.navercorp.pinpoint.common.server.bo.BasicSpan;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;
import com.navercorp.pinpoint.common.server.bo.filter.SpanEventFilter;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.AnnotationSerializer;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.SpanEventEncodingContext;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.SpanEventSerializer;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.SpanSerializer;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanEventBo;
import static com.navercorp.pinpoint.common.hbase.HBaseTables.*;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.util.SpanUtils;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.thrift.dto.TSpanEvent;
import com.navercorp.pinpoint.common.server.util.SpanUtils;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
Expand Down Expand Up @@ -72,24 +73,22 @@ public class HbaseTraceDao implements TracesDao {
private AbstractRowKeyDistributor rowKeyDistributor;

@Override
public void insert(final TSpan span) {
if (span == null) {
public void insert(final SpanBo spanBo) {
if (spanBo == null) {
throw new NullPointerException("span must not be null");
}

final SpanBo spanBo = new SpanBo(span);

long acceptedTime = acceptedTimeService.getAcceptedTime();
spanBo.setCollectorAcceptTime(acceptedTime);
long acceptedTime = spanBo.getCollectorAcceptTime();

final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(span));
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanBo));
final Put put = new Put(rowKey, acceptedTime);

this.spanSerializer.serialize(spanBo, put, null);
this.annotationSerializer.serialize(spanBo, put, null);


addNestedSpanEvent(put, span);
addNestedSpanEvent(put, spanBo);

boolean success = hbaseTemplate.asyncPut(TRACES, put);
if (!success) {
Expand All @@ -101,34 +100,32 @@ private byte[] getDistributeRowKey(byte[] transactionId) {
return rowKeyDistributor.getDistributedKey(transactionId);
}

private void addNestedSpanEvent(Put put, TSpan span) {
final List<TSpanEvent> spanEventBoList = span.getSpanEventList();
private void addNestedSpanEvent(Put put, SpanBo span) {
final List<SpanEventBo> spanEventBoList = span.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}


for (TSpanEvent spanEvent : spanEventBoList) {
final SpanEventBo spanEventBo = new SpanEventBo(span, spanEvent);
addColumn(put, spanEventBo);
for (SpanEventBo spanEvent : spanEventBoList) {
addColumn(put, span, spanEvent);
}
}

@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunk));
public void insertSpanChunk(SpanChunkBo spanChunkBo) {
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunkBo));
final long acceptedTime = acceptedTimeService.getAcceptedTime();
final Put put = new Put(rowKey, acceptedTime);

final List<TSpanEvent> spanEventBoList = spanChunk.getSpanEventList();
final List<SpanEventBo> spanEventBoList = spanChunkBo.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}


for (TSpanEvent spanEvent : spanEventBoList) {
final SpanEventBo spanEventBo = new SpanEventBo(spanChunk, spanEvent);
addColumn(put, spanEventBo);
for (SpanEventBo spanEventBo : spanEventBoList) {
addColumn(put, spanChunkBo, spanEventBo);
}

if (!put.isEmpty()) {
Expand All @@ -139,11 +136,12 @@ public void insertSpanChunk(TSpanChunk spanChunk) {
}
}

private void addColumn(Put put, SpanEventBo spanEventBo) {
private void addColumn(Put put, BasicSpan basicSpan, SpanEventBo spanEventBo) {
if (!spanEventFilter.filter(spanEventBo)) {
return;
}
this.spanEventSerializer.serialize(spanEventBo, put, null);
SpanEventEncodingContext spanEventEncodingContext = new SpanEventEncodingContext(basicSpan.getSpanId(), spanEventBo);
this.spanEventSerializer.serialize(spanEventEncodingContext, put, null);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.TraceDao;
import com.navercorp.pinpoint.collector.dao.hbase.filter.SpanEventFilter;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;
import com.navercorp.pinpoint.common.server.bo.SpanEventBo;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v2.SpanChunkSerializerV2;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v2.SpanSerializerV2;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.util.SpanUtils;
import com.navercorp.pinpoint.common.util.TransactionId;
import com.navercorp.pinpoint.common.util.TransactionIdUtils;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.thrift.dto.TSpanEvent;
import com.navercorp.pinpoint.common.server.util.SpanUtils;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
Expand All @@ -24,8 +17,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static com.navercorp.pinpoint.common.hbase.HBaseTables.TRACE_V2;
Expand All @@ -41,11 +32,6 @@ public class HbaseTraceDaoV2 implements TraceDao {
@Autowired
private HbaseOperations2 hbaseTemplate;

@Autowired
private AcceptedTimeService acceptedTimeService;

@Autowired
private SpanEventFilter spanEventFilter;

@Autowired
private SpanSerializerV2 spanSerializer;
Expand All @@ -59,19 +45,15 @@ public class HbaseTraceDaoV2 implements TraceDao {


@Override
public void insert(final TSpan span) {
if (span == null) {
throw new NullPointerException("span must not be null");
public void insert(final SpanBo spanBo) {
if (spanBo == null) {
throw new NullPointerException("spanBo must not be null");
}

final SpanBo spanBo = new SpanBo(span);
List<SpanEventBo> spanEventBoList = buildSpanEventList(span);
spanBo.addSpanEventBoList(spanEventBoList);

long acceptedTime = acceptedTimeService.getAcceptedTime();
spanBo.setCollectorAcceptTime(acceptedTime);
long acceptedTime = spanBo.getCollectorAcceptTime();

final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(span));
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanBo));
final Put put = new Put(rowKey, acceptedTime);

this.spanSerializer.serialize(spanBo, put, null);
Expand All @@ -84,41 +66,7 @@ public void insert(final TSpan span) {

}

private List<SpanEventBo> buildSpanEventList(TSpan span) {
final List<TSpanEvent> spanEventList = span.getSpanEventList();
if (CollectionUtils.isEmpty(spanEventList)) {
return Collections.emptyList();
}

List<SpanEventBo> spanEventBoList = new ArrayList<>(spanEventList.size());
for (TSpanEvent spanEvent : spanEventList) {
final SpanEventBo spanEventBo = new SpanEventBo(span, spanEvent);
if (!spanEventFilter.filter(spanEventBo)) {
continue;
}
spanEventBoList.add(spanEventBo);
}


return spanEventBoList;
}

private List<SpanEventBo> buildSpanEventBoList(TSpanChunk tSpanChunk) {
List<TSpanEvent> spanEventList = tSpanChunk.getSpanEventList();
if (CollectionUtils.isEmpty(spanEventList)) {
return new ArrayList<>();
}
List<SpanEventBo> spanEventBoList = new ArrayList<>(spanEventList.size());
for (TSpanEvent tSpanEvent : spanEventList) {
SpanEventBo spanEventBo = new SpanEventBo(tSpanChunk, tSpanEvent);
if (!spanEventFilter.filter(spanEventBo)) {
continue;
}
spanEventBoList.add(spanEventBo);
}

return spanEventBoList;
}

private byte[] getDistributeRowKey(byte[] transactionId) {
byte[] distributedKey = rowKeyDistributor.getDistributedKey(transactionId);
Expand All @@ -128,14 +76,14 @@ private byte[] getDistributeRowKey(byte[] transactionId) {


@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
SpanChunkBo spanChunkBo = buildSpanChunkBo(spanChunk);
public void insertSpanChunk(SpanChunkBo spanChunkBo) {

final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunk));
final long acceptedTime = acceptedTimeService.getAcceptedTime();
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunkBo));

final long acceptedTime = spanChunkBo.getCollectorAcceptTime();
final Put put = new Put(rowKey, acceptedTime);

final List<TSpanEvent> spanEventBoList = spanChunk.getSpanEventList();
final List<SpanEventBo> spanEventBoList = spanChunkBo.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}
Expand All @@ -150,28 +98,6 @@ public void insertSpanChunk(TSpanChunk spanChunk) {
}
}

public SpanChunkBo buildSpanChunkBo(TSpanChunk tSpanChunk) {
SpanChunkBo spanChunkBo = new SpanChunkBo();
spanChunkBo.setAgentId(tSpanChunk.getAgentId());
spanChunkBo.setApplicationId(tSpanChunk.getApplicationName());
spanChunkBo.setAgentStartTime(tSpanChunk.getAgentStartTime());

final TransactionId transactionId = TransactionIdUtils.parseTransactionId(tSpanChunk.getTransactionId());
final String traceAgentId = transactionId.getAgentId();
if (traceAgentId == null) {
spanChunkBo.setTraceAgentId(spanChunkBo.getAgentId());
} else {
spanChunkBo.setTraceAgentId(traceAgentId);
}
spanChunkBo.setTraceAgentStartTime(transactionId.getAgentStartTime());
spanChunkBo.setTraceTransactionSequence(transactionId.getTransactionSequence());

spanChunkBo.setSpanId(tSpanChunk.getSpanId());

List<SpanEventBo> spanEventBoList = buildSpanEventBoList(tSpanChunk);
spanChunkBo.addSpanEventBoList(spanEventBoList);
return spanChunkBo;
}



Expand Down
Loading

0 comments on commit 6db7cd1

Please sign in to comment.