Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#8372] Replace HTableMultiplexer with BufferedMutator #8373

Merged
merged 1 commit into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* @author Woonduk Kang(emeroad)
*/
public interface TraceDao {
boolean insert(SpanBo span);
void insert(SpanBo span);

boolean insertSpanChunk(SpanChunkBo spanChunk);
void insertSpanChunk(SpanChunkBo spanChunk);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import com.navercorp.pinpoint.collector.dao.TraceDao;
import com.navercorp.pinpoint.collector.util.CollectorUtils;
import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.profiler.util.TransactionId;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
Expand Down Expand Up @@ -49,7 +49,7 @@ public class HbaseTraceDaoV2 implements TraceDao {

private static final HbaseColumnFamily.Trace descriptor = HbaseColumnFamily.TRACE_V2_SPAN;

private final HbaseOperations2 hbaseTemplate;
private final SimpleBatchWriter writer;
private final TableNameProvider tableNameProvider;

private final SpanSerializerV2 spanSerializer;
Expand All @@ -58,20 +58,20 @@ public class HbaseTraceDaoV2 implements TraceDao {

private final RowKeyEncoder<TransactionId> rowKeyEncoder;

public HbaseTraceDaoV2(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseTraceDaoV2(SimpleBatchWriter writer,
TableNameProvider tableNameProvider,
@Qualifier("traceRowKeyEncoderV2") RowKeyEncoder<TransactionId> rowKeyEncoder,
SpanSerializerV2 spanSerializer,
SpanChunkSerializerV2 spanChunkSerializer) {
this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
this.writer = Objects.requireNonNull(writer, "writer");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
this.rowKeyEncoder = Objects.requireNonNull(rowKeyEncoder, "rowKeyEncoder");
this.spanSerializer = Objects.requireNonNull(spanSerializer, "spanSerializer");
this.spanChunkSerializer = Objects.requireNonNull(spanChunkSerializer, "spanChunkSerializer");
}

@Override
public boolean insert(final SpanBo spanBo) {
public void insert(final SpanBo spanBo) {
Objects.requireNonNull(spanBo, "spanBo");
if (logger.isDebugEnabled()) {
logger.debug("insert trace: {}", spanBo);
Expand All @@ -91,12 +91,11 @@ public boolean insert(final SpanBo spanBo) {
this.spanSerializer.serialize(spanBo, put, null);

TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable());

return hbaseTemplate.asyncPut(traceTableName, put);
writer.write(traceTableName, put);
}

@Override
public boolean insertSpanChunk(SpanChunkBo spanChunkBo) {
public void insertSpanChunk(SpanChunkBo spanChunkBo) {
Objects.requireNonNull(spanChunkBo, "spanChunkBo");

TransactionId transactionId = spanChunkBo.getTransactionId();
Expand All @@ -107,16 +106,14 @@ public boolean insertSpanChunk(SpanChunkBo spanChunkBo) {

final List<SpanEventBo> spanEventBoList = spanChunkBo.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return true;
return;
}

this.spanChunkSerializer.serialize(spanChunkBo, put, null);

if (!put.isEmpty()) {
TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable());
return hbaseTemplate.asyncPut(traceTableName, put);
writer.write(traceTableName, put);
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.navercorp.pinpoint.collector.monitor;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.navercorp.pinpoint.common.hbase.batch.BufferedMutatorWriter;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Dropwizard {@link MetricSet} exposing the success/rejected operation counters
 * of a {@link BufferedMutatorWriter} as gauges under the
 * {@code hbase.bufferedmutator.ops} metric namespace.
 */
public class BufferedMutatorMetrics implements MetricSet {
    private static final String HBASE_ASYNC_OPS = "hbase.bufferedmutator.ops";

    private static final String COUNT = HBASE_ASYNC_OPS + ".success.count";
    private static final String REJECTED_COUNT = HBASE_ASYNC_OPS + ".rejected.count";

    private final BufferedMutatorWriter writer;

    /**
     * @param writer counter source backing the gauges; must not be {@code null}
     */
    public BufferedMutatorMetrics(BufferedMutatorWriter writer) {
        this.writer = Objects.requireNonNull(writer, "writer");
    }

    /**
     * Builds the gauge map on each call; each gauge reads the writer's
     * counters live at report time.
     */
    @Override
    public Map<String, Metric> getMetrics() {
        final Map<String, Metric> metricMap = new HashMap<>(3);
        // Gauge<Long> is a single-method interface, so method references suffice
        // in place of anonymous classes.
        metricMap.put(COUNT, (Gauge<Long>) writer::getSuccessCount);
        metricMap.put(REJECTED_COUNT, (Gauge<Long>) writer::getErrorCount);
        return metricMap;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@
<constructor-arg ref="hbaseTemplate"/>
</bean>

<bean id="bufferedMutatorConfiguration" class="com.navercorp.pinpoint.common.hbase.batch.BufferedMutatorConfiguration"/>
<bean id="hbaseBatchWriter" class="com.navercorp.pinpoint.common.hbase.batch.BufferedMutatorWriter">
<constructor-arg ref="connectionFactory"/>
</bean>

<bean id="hbaseBatchPutWriter" class="com.navercorp.pinpoint.common.hbase.batch.DelegateSimpleBatchWriter"/>

<bean id="hbaseAdminFactory" class="com.navercorp.pinpoint.common.hbase.HbaseAdminFactory">
<constructor-arg ref="connectionFactory"/>
</bean>
Expand Down
7 changes: 6 additions & 1 deletion collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ hbase.client.async.flush.period.ms=100
# the max number of the retry attempts to insert queue before dropping the request. default:10000
hbase.client.async.max.retries.in.queue=10000

hbase.client.compatibility-check=true
hbase.client.compatibility-check=true

collector.batchwrite.enable=true
collector.batchwrite.timertick=100
collector.batchwrite.writebuffer.size=5012
collector.batchwrite.writebuffer.heaplimit=100MB
3 changes: 3 additions & 0 deletions commons-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<sniffer.artifactid>java18</sniffer.artifactid>

<log4j2.version>${log4j2-jdk8.version}</log4j2.version>
<spring.version>${spring5.version}</spring.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -71,6 +72,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
Expand All @@ -81,6 +83,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.navercorp.pinpoint.common.hbase;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;


/**
 * Minimal write abstraction for submitting a single HBase {@link Put}
 * to a table, decoupling DAOs from the concrete write mechanism.
 */
public interface SimpleBatchWriter {

    /**
     * Submits one {@link Put} for the given table.
     * NOTE(review): implementations presumably buffer/batch the mutation
     * (e.g. via a BufferedMutator) rather than flushing synchronously —
     * confirm against the implementing class.
     *
     * @param tableName destination HBase table
     * @param mutation  the put to write
     */
    void write(TableName tableName, Put mutation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.navercorp.pinpoint.common.hbase.batch;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.unit.DataSize;

/**
 * Configuration holder for the BufferedMutator-based batch writer.
 * Values are injected by Spring from {@code collector.batchwrite.*}
 * properties; the field initializers act as fallbacks when the class is
 * instantiated outside a Spring context.
 */
public class BufferedMutatorConfiguration {

    // Enables/disables the batch writer. Field name kept for compatibility
    // with existing property/bean wiring.
    @Value("${collector.batchwrite.enable:true}")
    private boolean batchWriter = true;

    // Period (ms) of the write buffer's periodic flush timer.
    @Value("${collector.batchwrite.timertick:100}")
    private long writeBufferPeriodicFlushTimerTickMs = 100;

    // NOTE(review): the @Value default is 5012 but the original field
    // initializer was 1024 * 5 (= 5120); 5012 looks like a transposition of
    // 5120 — TODO confirm the intended value. The initializer is aligned to
    // the annotation default here so both code paths agree.
    @Value("${collector.batchwrite.writebuffer.size:5012}")
    private long writeBufferSize = 5012;

    // Upper bound on buffered heap usage, to prevent OOM.
    @Value("${collector.batchwrite.writebuffer.heaplimit:100MB}")
    private DataSize writeBufferHeapLimit = DataSize.ofMegabytes(100);

    public boolean isBatchWriter() {
        return batchWriter;
    }

    public long getWriteBufferPeriodicFlushTimerTickMs() {
        return writeBufferPeriodicFlushTimerTickMs;
    }

    public long getWriteBufferSize() {
        return writeBufferSize;
    }

    /**
     * @return the heap limit in bytes
     */
    public long getWriteBufferHeapLimit() {
        return writeBufferHeapLimit.toBytes();
    }

    @Override
    public String toString() {
        return "BufferedMutatorConfiguration{" +
                "batchWriter=" + batchWriter +
                ", writeBufferPeriodicFlushTimerTickMs=" + writeBufferPeriodicFlushTimerTickMs +
                ", writeBufferSize=" + writeBufferSize +
                ", writeBufferHeapLimit=" + writeBufferHeapLimit +
                '}';
    }
}
Loading