Skip to content

Commit

Permalink
add spanevent filter #1396
Browse files Browse the repository at this point in the history
 - prevent OOM
 - filter too many trace
  • Loading branch information
emeroad committed Jan 5, 2016
1 parent 31f7933 commit 0200004
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.TracesDao;
import com.navercorp.pinpoint.collector.dao.hbase.filter.SpanEventFilter;
import com.navercorp.pinpoint.collector.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.bo.AnnotationBo;
import com.navercorp.pinpoint.common.bo.AnnotationBoList;
Expand Down Expand Up @@ -61,6 +62,9 @@ public class HbaseTraceDao implements TracesDao {
@Autowired
private AcceptedTimeService acceptedTimeService;

@Autowired
private SpanEventFilter spanEventFilter;

@Autowired
@Qualifier("traceDistributor")
private AbstractRowKeyDistributor rowKeyDistributor;
Expand Down Expand Up @@ -101,39 +105,52 @@ private byte[] getDistributeRowKey(byte[] transactionId) {
}

private void addNestedSpanEvent(Put put, TSpan span) {
List<TSpanEvent> spanEventBoList = span.getSpanEventList();
final List<TSpanEvent> spanEventBoList = span.getSpanEventList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}

long acceptedTime0 = acceptedTimeService.getAcceptedTime();

for (TSpanEvent spanEvent : spanEventBoList) {
SpanEventBo spanEventBo = new SpanEventBo(span, spanEvent);
byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId(), spanEventBo.getAsyncSequence());
byte[] value = spanEventBo.writeValue();
put.addColumn(TRACES_CF_TERMINALSPAN, rowId, acceptedTime0, value);
final SpanEventBo spanEventBo = new SpanEventBo(span, spanEvent);
addColumn(put, spanEventBo);
}
}



@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunk));
Put put = new Put(rowKey);
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunk));
final Put put = new Put(rowKey);

final List<TSpanEvent> spanEventBoList = spanChunk.getSpanEventList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}


long acceptedTime = acceptedTimeService.getAcceptedTime();
List<TSpanEvent> spanEventBoList = spanChunk.getSpanEventList();
for (TSpanEvent spanEvent : spanEventBoList) {
SpanEventBo spanEventBo = new SpanEventBo(spanChunk, spanEvent);
final SpanEventBo spanEventBo = new SpanEventBo(spanChunk, spanEvent);
addColumn(put, spanEventBo);
}

if (!put.isEmpty()) {
hbaseTemplate.put(TRACES, put);
}

byte[] value = spanEventBo.writeValue();
byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId(), spanEventBo.getAsyncSequence());
}

put.addColumn(TRACES_CF_TERMINALSPAN, rowId, acceptedTime, value);
private void addColumn(Put put, SpanEventBo spanEventBo) {
if (!spanEventFilter.filter(spanEventBo)) {
return;
}
hbaseTemplate.put(TRACES, put);

byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId(), spanEventBo.getAsyncSequence());
byte[] value = spanEventBo.writeValue();
final long acceptedTime = acceptedTimeService.getAcceptedTime();

put.addColumn(TRACES_CF_TERMINALSPAN, rowId, acceptedTime, value);
}

private byte[] writeAnnotation(List<TAnnotation> annotations) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* *
* * Copyright 2016 NAVER Corp.
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.navercorp.pinpoint.collector.dao.hbase.filter;

import com.navercorp.pinpoint.common.bo.SpanEventBo;
import com.navercorp.pinpoint.common.util.BytesUtils;


/**
* @author Woonduk Kang(emeroad)
*/
public class SequenceSpanEventFilter implements SpanEventFilter {

public static final int MAX_SEQUENCE = Short.MAX_VALUE;
public static final int DEFAULT_SEQUENCE_LIMIT = 1024*10;

private final int sequenceLimit;

public SequenceSpanEventFilter() {
this(DEFAULT_SEQUENCE_LIMIT);
}

public SequenceSpanEventFilter(int sequenceLimit) {
if (sequenceLimit > MAX_SEQUENCE) {
throw new IllegalArgumentException(sequenceLimit + " > MAX_SEQUENCE");
}
this.sequenceLimit = sequenceLimit;
}

@Override
public boolean filter(SpanEventBo spanEventBo) {
if (spanEventBo == null) {
return REJECT;
}
final int sequence = spanEventBo.getSequence();
if (sequence > sequenceLimit) {
return REJECT;
}
return ACCEPT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* *
* * Copyright 2016 NAVER Corp.
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.navercorp.pinpoint.collector.dao.hbase.filter;

import com.navercorp.pinpoint.common.bo.SpanEventBo;

/**
* @author Woonduk Kang(emeroad)
*/
public interface SpanEventFilter {
boolean ACCEPT = true;
boolean REJECT = false;

boolean filter(SpanEventBo spanEventBo);
}
4 changes: 4 additions & 0 deletions collector/src/main/resources/applicationContext-collector.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,8 @@

<bean id="hbaseSqlMetaDataPastVersionDao" class="com.navercorp.pinpoint.collector.dao.hbase.HbaseSqlMetaDataPastVersionDao"/>
<bean id="hbaseSqlMetaDataDao" class="com.navercorp.pinpoint.collector.dao.hbase.HbaseSqlMetaDataDao"/>

<bean id="sequenceSpanEventFilter" class="com.navercorp.pinpoint.collector.dao.hbase.filter.SequenceSpanEventFilter">
<constructor-arg index="0" value="${collector.spanEvent.sequence.limit:10000}"/>
</bean>
</beans>
2 changes: 2 additions & 0 deletions collector/src/main/resources/pinpoint-collector.properties
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ cluster.listen.port=
#collector.admin.password=
#collector.admin.api.rest.active=
#collector.admin.api.jmx.active=

collector.spanEvent.sequence.limit=10000
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* *
* * Copyright 2016 NAVER Corp.
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.navercorp.pinpoint.collector.dao.hbase.filter;

import com.navercorp.pinpoint.common.bo.SpanEventBo;
import org.junit.Assert;
import org.junit.Test;

/**
* @author Woonduk Kang(emeroad)
*/
public class SequenceSpanEventFilterTest {

@Test
public void testFilter_accept() throws Exception {
SpanEventFilter filter = new SequenceSpanEventFilter(100);

final SpanEventBo spanEventBo = new SpanEventBo();
spanEventBo.setSequence((short)11);

Assert.assertEquals(filter.filter(spanEventBo), SpanEventFilter.ACCEPT);

}


@Test
public void testFilter_reject() throws Exception {
SpanEventFilter filter = new SequenceSpanEventFilter(10);

final SpanEventBo spanEventBo = new SpanEventBo();
spanEventBo.setSequence((short)11);

Assert.assertEquals(filter.filter(spanEventBo), SpanEventFilter.REJECT);

}

@Test
public void testFilter_max() throws Exception {
new SequenceSpanEventFilter(Short.MAX_VALUE);

try {
new SequenceSpanEventFilter(Short.MAX_VALUE + 1);
Assert.fail();
} catch (Exception e) {
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ public static int writeVar32(int value, final byte[] buf, int offset) {
}
}

public static int shortToUnsignedShort(short value) {
return value & 0xffff;
}

public static byte[] intToSVar32(int value) {
return intToVar32(intToZigZag(value));
}
Expand Down Expand Up @@ -532,7 +536,7 @@ public static byte[] add(final long preFix, final short postFix, final int intAr
writeShort(shortArg, buf, offset);
return buf;
}


public static byte[] toBytes(final String value) {
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,13 @@ public void testCheckBound() {
}

}

@Test
public void testShortToUnsignedShort() {
Assert.assertEquals(BytesUtils.shortToUnsignedShort((short)0), 0);
Assert.assertEquals(BytesUtils.shortToUnsignedShort(Short.MAX_VALUE), 32767);
final short maxOver = (short) (Short.MAX_VALUE + 1);
Assert.assertEquals(BytesUtils.shortToUnsignedShort(maxOver), 32768);
Assert.assertEquals(BytesUtils.shortToUnsignedShort((short)-1), 65535);
}
}

0 comments on commit 0200004

Please sign in to comment.