diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java index 5d735879b8eb..183617fa4111 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java @@ -18,6 +18,7 @@ import com.navercorp.pinpoint.collector.dao.TraceDao; import com.navercorp.pinpoint.collector.util.CollectorUtils; +import com.navercorp.pinpoint.collector.util.DurabilityApplier; import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.hbase.async.HbasePutWriter; @@ -34,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Repository; import java.util.List; @@ -62,17 +64,24 @@ public class HbaseTraceDaoV2 implements TraceDao { private final RowKeyEncoder rowKeyEncoder; private final HbasePutWriter putWriter; + private final DurabilityApplier durabilityApplier; + public HbaseTraceDaoV2(@Qualifier("spanPutWriter") HbasePutWriter putWriter, TableNameProvider tableNameProvider, @Qualifier("traceRowKeyEncoderV2") RowKeyEncoder rowKeyEncoder, SpanSerializerV2 spanSerializer, - SpanChunkSerializerV2 spanChunkSerializer) { + SpanChunkSerializerV2 spanChunkSerializer, + @Value("${collector.span.durability:USE_DEFAULT}") + String spanDurability) { this.putWriter = Objects.requireNonNull(putWriter, "putWriter"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); this.rowKeyEncoder = Objects.requireNonNull(rowKeyEncoder, "rowKeyEncoder"); this.spanSerializer = Objects.requireNonNull(spanSerializer, "spanSerializer"); this.spanChunkSerializer = Objects.requireNonNull(spanChunkSerializer, "spanChunkSerializer"); + + this.durabilityApplier = new DurabilityApplier(spanDurability); + logger.info("Span(Trace Put) durability:{}", durabilityApplier); } @Override @@ -109,6 +118,8 @@ public CompletableFuture asyncInsert(final SpanBo spanBo) { final byte[] rowKey = this.rowKeyEncoder.encodeRowKey(transactionId); final Put put = new Put(rowKey, acceptedTime, true); + this.durabilityApplier.apply(put); + this.spanSerializer.serialize(spanBo, put, null); TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable()); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/util/DurabilityApplier.java b/collector/src/main/java/com/navercorp/pinpoint/collector/util/DurabilityApplier.java new file mode 100644 index 000000000000..751ef2efb492 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/util/DurabilityApplier.java @@ -0,0 +1,50 @@ +package com.navercorp.pinpoint.collector.util; + +import com.navercorp.pinpoint.common.util.StringUtils; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; + +import java.util.Objects; + +public class DurabilityApplier { + private final Durability durability; + + public DurabilityApplier(String durability) { + this(durability, Durability.USE_DEFAULT); + } + + public DurabilityApplier(String durability, Durability defaultDurability) { + this.durability = toDurability(durability, defaultDurability); + } + + public static Durability toDurability(String durability, Durability defaultDurability) { + Objects.requireNonNull(defaultDurability, "defaultDurability"); + + if (StringUtils.isEmpty(durability)) { + return defaultDurability; + } + try { + return Durability.valueOf(durability.toUpperCase()); + } catch (IllegalArgumentException e) { + return defaultDurability; + } + } + + public void apply(Mutation mutation) { + if (mutation == null) { + return; + } + mutation.setDurability(durability); + } + + public Durability getDurability() { + return durability; + } + + @Override + public String toString() { + return "DurabilityConfiguration{" + + "durability=" + durability + + '}'; + } +} diff --git a/collector/src/main/resources/hbase-root.properties b/collector/src/main/resources/hbase-root.properties index e8e3f1927e74..2fddeff4b0e2 100644 --- a/collector/src/main/resources/hbase-root.properties +++ b/collector/src/main/resources/hbase-root.properties @@ -27,6 +27,7 @@ hbase.client.properties.hbase.client.retries.number=4 hbase.client.put-writer.concurrency-limit=100000 hbase.client.span-put-writer.concurrency-limit=0 + # hbase async ================================================================= # enable hbase async operation. default: false hbase.client.async.enable=false @@ -38,4 +39,14 @@ hbase.client.compatibility-check=true collector.batchwrite.enable=false collector.batchwrite.timertick=100 collector.batchwrite.writebuffer.size=5012 -collector.batchwrite.writebuffer.heaplimit=100MB \ No newline at end of file +collector.batchwrite.writebuffer.heaplimit=100MB + + +# Set span wal durability. default:USE_DEFAULT +# https://hbase.apache.org/2.3/book.html#wal.durability +# USE_DEFAULT: default durability +# SKIP_WAL: skip write to WAL +# ASYNC_WAL: async write to WAL +# SYNC_WAL: sync write to WAL +# FSYNC_WAL: fsync write to WAL +collector.span.durability=USE_DEFAULT \ No newline at end of file diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/util/DurabilityApplierTest.java b/collector/src/test/java/com/navercorp/pinpoint/collector/util/DurabilityApplierTest.java new file mode 100644 index 000000000000..009b32b0ce06 --- /dev/null +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/util/DurabilityApplierTest.java @@ -0,0 +1,45 @@ +package com.navercorp.pinpoint.collector.util; + +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class DurabilityApplierTest { + + @Test + void null_to_default() { + DurabilityApplier config = new DurabilityApplier(null); + assertEquals(Durability.USE_DEFAULT, config.getDurability()); + } + + @Test + void skipwal() { + DurabilityApplier config = new DurabilityApplier("SKIP_WAL"); + assertEquals(Durability.SKIP_WAL, config.getDurability()); + } + + @Test + void skipwal_uppercase() { + DurabilityApplier configuration = new DurabilityApplier("skip_wal"); + assertEquals(Durability.SKIP_WAL, configuration.getDurability()); + } + + @Test + void skipwal_unknown() { + DurabilityApplier configuration = new DurabilityApplier("unknown"); + assertEquals(Durability.USE_DEFAULT, configuration.getDurability()); + } + + + @Test + void testPut() { + Put put = new Put(new byte[1]); + + DurabilityApplier applier = new DurabilityApplier(Durability.SKIP_WAL.toString()); + applier.apply(put); + + assertEquals(Durability.SKIP_WAL, put.getDurability()); + } +} \ No newline at end of file