Skip to content

Commit

Permalink
[#8372] Replace HTableMultiplexer with BufferedMutator
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Nov 15, 2021
1 parent 15577df commit cc9a773
Show file tree
Hide file tree
Showing 14 changed files with 502 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* @author Woonduk Kang(emeroad)
*/
public interface TraceDao {
boolean insert(SpanBo span);
void insert(SpanBo span);

boolean insertSpanChunk(SpanChunkBo spanChunk);
void insertSpanChunk(SpanChunkBo spanChunk);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import com.navercorp.pinpoint.collector.dao.TraceDao;
import com.navercorp.pinpoint.collector.util.CollectorUtils;
import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.profiler.util.TransactionId;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
Expand Down Expand Up @@ -49,7 +49,7 @@ public class HbaseTraceDaoV2 implements TraceDao {

private static final HbaseColumnFamily.Trace descriptor = HbaseColumnFamily.TRACE_V2_SPAN;

private final HbaseOperations2 hbaseTemplate;
private final SimpleBatchWriter writer;
private final TableNameProvider tableNameProvider;

private final SpanSerializerV2 spanSerializer;
Expand All @@ -58,20 +58,20 @@ public class HbaseTraceDaoV2 implements TraceDao {

private final RowKeyEncoder<TransactionId> rowKeyEncoder;

public HbaseTraceDaoV2(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseTraceDaoV2(SimpleBatchWriter writer,
TableNameProvider tableNameProvider,
@Qualifier("traceRowKeyEncoderV2") RowKeyEncoder<TransactionId> rowKeyEncoder,
SpanSerializerV2 spanSerializer,
SpanChunkSerializerV2 spanChunkSerializer) {
this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
this.writer = Objects.requireNonNull(writer, "writer");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
this.rowKeyEncoder = Objects.requireNonNull(rowKeyEncoder, "rowKeyEncoder");
this.spanSerializer = Objects.requireNonNull(spanSerializer, "spanSerializer");
this.spanChunkSerializer = Objects.requireNonNull(spanChunkSerializer, "spanChunkSerializer");
}

@Override
public boolean insert(final SpanBo spanBo) {
public void insert(final SpanBo spanBo) {
Objects.requireNonNull(spanBo, "spanBo");
if (logger.isDebugEnabled()) {
logger.debug("insert trace: {}", spanBo);
Expand All @@ -91,12 +91,11 @@ public boolean insert(final SpanBo spanBo) {
this.spanSerializer.serialize(spanBo, put, null);

TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable());

return hbaseTemplate.asyncPut(traceTableName, put);
writer.write(traceTableName, put);
}

@Override
public boolean insertSpanChunk(SpanChunkBo spanChunkBo) {
public void insertSpanChunk(SpanChunkBo spanChunkBo) {
Objects.requireNonNull(spanChunkBo, "spanChunkBo");

TransactionId transactionId = spanChunkBo.getTransactionId();
Expand All @@ -107,16 +106,14 @@ public boolean insertSpanChunk(SpanChunkBo spanChunkBo) {

final List<SpanEventBo> spanEventBoList = spanChunkBo.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return true;
return;
}

this.spanChunkSerializer.serialize(spanChunkBo, put, null);

if (!put.isEmpty()) {
TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable());
return hbaseTemplate.asyncPut(traceTableName, put);
writer.write(traceTableName, put);
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.navercorp.pinpoint.collector.monitor;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.navercorp.pinpoint.common.hbase.batch.BufferedMutatorWriter;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class BufferedMutatorMetrics implements MetricSet {
private static final String HBASE_ASYNC_OPS = "hbase.bufferedmutator.ops";

private static final String COUNT = HBASE_ASYNC_OPS + ".success.count";
private static final String REJECTED_COUNT = HBASE_ASYNC_OPS + ".rejected.count";

private final BufferedMutatorWriter writer;

public BufferedMutatorMetrics(BufferedMutatorWriter writer) {
this.writer = Objects.requireNonNull(writer, "writer");
}

@Override
public Map<String, Metric> getMetrics() {

final Map<String, Metric> gauges = new HashMap<>(3);
gauges.put(COUNT, new Gauge<Long>() {
@Override
public Long getValue() {
return writer.getSuccessCount();
}
});
gauges.put(REJECTED_COUNT, new Gauge<Long>() {
@Override
public Long getValue() {
return writer.getErrorCount();
}
});

return gauges;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@
<constructor-arg ref="hbaseTemplate"/>
</bean>

<bean id="bufferedMutatorConfiguration" class="com.navercorp.pinpoint.common.hbase.batch.BufferedMutatorConfiguration"/>
<bean id="hbaseBatchWriter" class="com.navercorp.pinpoint.common.hbase.batch.BufferedMutatorWriter">
<constructor-arg ref="connectionFactory"/>
</bean>

<bean id="hbaseBatchPutWriter" class="com.navercorp.pinpoint.common.hbase.batch.DelegateSimpleBatchWriter"/>

<bean id="hbaseAdminFactory" class="com.navercorp.pinpoint.common.hbase.HbaseAdminFactory">
<constructor-arg ref="connectionFactory"/>
</bean>
Expand Down
7 changes: 6 additions & 1 deletion collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ hbase.client.async.flush.period.ms=100
# the max number of the retry attempts to insert queue before dropping the request. default:10000
hbase.client.async.max.retries.in.queue=10000

hbase.client.compatibility-check=true
hbase.client.compatibility-check=true

collector.batchwrite.enable=true
collector.batchwrite.timertick=100
collector.batchwrite.writebuffer.size=5012
collector.batchwrite.writebuffer.heaplimit=100MB
3 changes: 3 additions & 0 deletions commons-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<sniffer.artifactid>java18</sniffer.artifactid>

<log4j2.version>${log4j2-jdk8.version}</log4j2.version>
<spring.version>${spring5.version}</spring.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -71,6 +72,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
Expand All @@ -81,6 +83,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.navercorp.pinpoint.common.hbase;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;


public interface SimpleBatchWriter {

void write(TableName tableName, Put mutation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.navercorp.pinpoint.common.hbase.batch;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.unit.DataSize;

public class BufferedMutatorConfiguration {

@Value("${collector.batchwrite.enable:true}")
private boolean batchWriter = true;

@Value("${collector.batchwrite.timertick:100}")
private long writeBufferPeriodicFlushTimerTickMs = 100;

@Value("${collector.batchwrite.writebuffer.size:5012}")
private long writeBufferSize = 1024 * 5;

// for OOM prevent
@Value("${collector.batchwrite.writebuffer.heaplimit:100MB}")
private DataSize writeBufferHeapLimit = DataSize.ofMegabytes(100);

public boolean isBatchWriter() {
return batchWriter;
}

public long getWriteBufferPeriodicFlushTimerTickMs() {
return writeBufferPeriodicFlushTimerTickMs;
}

public long getWriteBufferSize() {
return writeBufferSize;
}

public long getWriteBufferHeapLimit() {
return writeBufferHeapLimit.toBytes();
}

@Override
public String toString() {
return "BufferedMutatorConfiguration{" +
"batchWriter=" + batchWriter +
", writeBufferPeriodicFlushTimerTickMs=" + writeBufferPeriodicFlushTimerTickMs +
", writeBufferSize=" + writeBufferSize +
", writeBufferHeapLimit=" + writeBufferHeapLimit +
'}';
}
}
Loading

0 comments on commit cc9a773

Please sign in to comment.