Skip to content

Commit

Permalink
[#11150] Add WAL durability setting to collector
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jun 14, 2024
1 parent b4a3742 commit ea6b21f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.navercorp.pinpoint.collector.dao.TraceDao;
import com.navercorp.pinpoint.collector.util.CollectorUtils;
import com.navercorp.pinpoint.collector.util.DurabilityApplier;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.hbase.async.HbasePutWriter;
Expand All @@ -34,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand Down Expand Up @@ -62,17 +64,24 @@ public class HbaseTraceDaoV2 implements TraceDao {
private final RowKeyEncoder<TransactionId> rowKeyEncoder;
private final HbasePutWriter putWriter;

private final DurabilityApplier durabilityApplier;

public HbaseTraceDaoV2(@Qualifier("spanPutWriter")
HbasePutWriter putWriter,
TableNameProvider tableNameProvider,
@Qualifier("traceRowKeyEncoderV2") RowKeyEncoder<TransactionId> rowKeyEncoder,
SpanSerializerV2 spanSerializer,
SpanChunkSerializerV2 spanChunkSerializer) {
SpanChunkSerializerV2 spanChunkSerializer,
@Value("${collector.span.durability:USE_DEFAULT}")
String spanDurability) {
this.putWriter = Objects.requireNonNull(putWriter, "putWriter");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
this.rowKeyEncoder = Objects.requireNonNull(rowKeyEncoder, "rowKeyEncoder");
this.spanSerializer = Objects.requireNonNull(spanSerializer, "spanSerializer");
this.spanChunkSerializer = Objects.requireNonNull(spanChunkSerializer, "spanChunkSerializer");

this.durabilityApplier = new DurabilityApplier(spanDurability);
logger.info("Span(Trace Put) durability:{}", durabilityApplier);
}

@Override
Expand Down Expand Up @@ -109,6 +118,8 @@ public CompletableFuture<Void> asyncInsert(final SpanBo spanBo) {
final byte[] rowKey = this.rowKeyEncoder.encodeRowKey(transactionId);
final Put put = new Put(rowKey, acceptedTime, true);

this.durabilityApplier.apply(put);

this.spanSerializer.serialize(spanBo, put, null);

TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.navercorp.pinpoint.collector.util;

import com.navercorp.pinpoint.common.util.StringUtils;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;

import java.util.Objects;

public class DurabilityApplier {
private final Durability durability;

public DurabilityApplier(String durability) {
this(durability, Durability.USE_DEFAULT);
}

public DurabilityApplier(String durability, Durability defaultDurability) {
this.durability = toDurability(durability, defaultDurability);
}

public static Durability toDurability(String durability, Durability defaultDurability) {
Objects.requireNonNull(defaultDurability, "defaultDurability");

if (StringUtils.isEmpty(durability)) {
return defaultDurability;
}
try {
return Durability.valueOf(durability.toUpperCase());
} catch (IllegalArgumentException e) {
return defaultDurability;
}
}

public void apply(Mutation mutation) {
if (mutation == null) {
return;
}
mutation.setDurability(durability);
}

public Durability getDurability() {
return durability;
}

@Override
public String toString() {
return "DurabilityConfiguration{" +
"durability=" + durability +
'}';
}
}
13 changes: 12 additions & 1 deletion collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ hbase.client.properties.hbase.client.retries.number=4
hbase.client.put-writer.concurrency-limit=100000
hbase.client.span-put-writer.concurrency-limit=0


# hbase async =================================================================
# enable hbase async operation. default: false
hbase.client.async.enable=false
Expand All @@ -38,4 +39,14 @@ hbase.client.compatibility-check=true
collector.batchwrite.enable=false
collector.batchwrite.timertick=100
collector.batchwrite.writebuffer.size=5012
collector.batchwrite.writebuffer.heaplimit=100MB
collector.batchwrite.writebuffer.heaplimit=100MB


# Set span wal durability. default:USE_DEFAULT
# https://hbase.apache.org/2.3/book.html#wal.durability
# USE_DEFAULT: default durability
# SKIP_WAL: skip write to WAL
# ASYNC_WAL: async write to WAL
# SYNC_WAL: sync write to WAL
# FSYNC_WAL: fsync write to WAL
collector.span.durability=USE_DEFAULT
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.navercorp.pinpoint.collector.util;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class DurabilityApplierTest {

@Test
void null_to_default() {
DurabilityApplier config = new DurabilityApplier(null);
assertEquals(Durability.USE_DEFAULT, config.getDurability());
}

@Test
void skipwal() {
DurabilityApplier config = new DurabilityApplier("SKIP_WAL");
assertEquals(Durability.SKIP_WAL, config.getDurability());
}

@Test
void skipwal_uppercase() {
DurabilityApplier configuration = new DurabilityApplier("skip_wal");
assertEquals(Durability.SKIP_WAL, configuration.getDurability());
}

@Test
void skipwal_unknown() {
DurabilityApplier configuration = new DurabilityApplier("unknown");
assertEquals(Durability.USE_DEFAULT, configuration.getDurability());
}


@Test
void testPut() {
Put put = new Put(new byte[1]);

DurabilityApplier applier = new DurabilityApplier(Durability.SKIP_WAL.toString());
applier.apply(put);

assertEquals(Durability.SKIP_WAL, put.getDurability());
}
}

0 comments on commit ea6b21f

Please sign in to comment.