From ef95dbb9d1017d6a4eacfadc34230c28e35ec380 Mon Sep 17 00:00:00 2001 From: emeroad Date: Tue, 22 Mar 2022 18:08:51 +0900 Subject: [PATCH] [#8691] Refactoring Batch writer --- .../dao/hbase/HbaseAgentUriStatDao.java | 3 +-- .../hbase/HbaseApplicationTraceIndexDao.java | 2 +- .../dao/hbase/stat/HBaseLoadedClassDao.java | 3 +-- .../hbase/stat/HBaseTotalThreadCountDao.java | 7 +++--- .../dao/hbase/stat/HbaseActiveTraceDao.java | 3 +-- .../dao/hbase/stat/HbaseCpuLoadDao.java | 3 +-- .../hbase/stat/HbaseDataSourceListDao.java | 3 +-- .../stat/HbaseDeadlockThreadCountDao.java | 3 +-- .../dao/hbase/stat/HbaseDirectBufferDao.java | 3 +-- .../hbase/stat/HbaseFileDescriptorDao.java | 3 +-- .../dao/hbase/stat/HbaseJvmGcDao.java | 3 +-- .../dao/hbase/stat/HbaseJvmGcDetailedDao.java | 3 +-- .../dao/hbase/stat/HbaseResponseTimeDao.java | 3 +-- .../dao/hbase/stat/HbaseTransactionDao.java | 3 +-- .../applicationContext-collector-hbase.xml | 21 ++++++------------ .../batch/SimpleBatchWriterFactoryBean.java | 8 ++++++- .../hbase/batch/TableMultiplexerWriter.java | 22 +++++++++++++++++++ .../flink/dao/hbase/ActiveTraceDao.java | 3 +-- .../pinpoint/flink/dao/hbase/CpuLoadDao.java | 3 +-- .../flink/dao/hbase/DataSourceDao.java | 3 +-- .../flink/dao/hbase/DirectBufferDao.java | 2 +- .../flink/dao/hbase/FileDescriptorDao.java | 3 +-- .../flink/dao/hbase/LoadedClassDao.java | 3 +-- .../pinpoint/flink/dao/hbase/MemoryDao.java | 3 +-- .../flink/dao/hbase/ResponseTimeDao.java | 3 +-- .../flink/dao/hbase/TotalThreadCountDao.java | 3 +-- .../flink/dao/hbase/TransactionDao.java | 3 +-- .../resources/applicationContext-hbase.xml | 5 ----- 28 files changed, 63 insertions(+), 67 deletions(-) create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/TableMultiplexerWriter.java diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseAgentUriStatDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseAgentUriStatDao.java index 7660fdef4e6d..257ab1f88e85 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseAgentUriStatDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseAgentUriStatDao.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.Collections; @@ -55,7 +54,7 @@ public class HbaseAgentUriStatDao implements AgentUriStatDao { private final AgentUriStatSerializer agentUriStatSerializer; - public HbaseAgentUriStatDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseAgentUriStatDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, AgentUriStatSerializer agentUriStatSerializer) { diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationTraceIndexDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationTraceIndexDao.java index cdbe4e8526cb..2356751d7718 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationTraceIndexDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationTraceIndexDao.java @@ -68,7 +68,7 @@ public class HbaseApplicationTraceIndexDao implements ApplicationTraceIndexDao { private final ApplicationNameRowKeyEncoder rowKeyEncoder = new ApplicationNameRowKeyEncoder(); - public HbaseApplicationTraceIndexDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseApplicationTraceIndexDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, @Qualifier("applicationTraceIndexDistributor") AbstractRowKeyDistributor rowKeyDistributor, AcceptedTimeService acceptedTimeService, diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseLoadedClassDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseLoadedClassDao.java index 113e3e9c9863..c48eedd02c87 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseLoadedClassDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseLoadedClassDao.java @@ -29,7 +29,6 @@ import com.navercorp.pinpoint.common.util.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -45,7 +44,7 @@ public class HBaseLoadedClassDao implements AgentStatDaoV2 { private final LoadedClassSerializer loadedClassSerializer; - public HBaseLoadedClassDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HBaseLoadedClassDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, LoadedClassSerializer loadedClassSerializer) { diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseTotalThreadCountDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseTotalThreadCountDao.java index 25bee2a77138..f86faa8ad440 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseTotalThreadCountDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HBaseTotalThreadCountDao.java @@ -29,7 +29,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -46,8 +45,10 @@ public class HBaseTotalThreadCountDao implements AgentStatDaoV2 { private final ActiveTraceSerializer activeTraceSerializer; - public HbaseActiveTraceDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, + public HbaseActiveTraceDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, ActiveTraceSerializer activeTraceSerializer) { this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseCpuLoadDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseCpuLoadDao.java index 3d63c1b4ecff..06477f76dea2 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseCpuLoadDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseCpuLoadDao.java @@ -30,7 +30,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -50,7 +49,7 @@ public class HbaseCpuLoadDao implements AgentStatDaoV2 { private final CpuLoadSerializer cpuLoadSerializer; - public HbaseCpuLoadDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseCpuLoadDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, CpuLoadSerializer cpuLoadSerializer) { this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDataSourceListDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDataSourceListDao.java index 6338ea6b5ee7..1bccd95f3006 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDataSourceListDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDataSourceListDao.java @@ -33,7 +33,6 @@ import org.apache.commons.collections4.map.MultiKeyMap; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.ArrayList; @@ -55,7 +54,7 @@ public class HbaseDataSourceListDao implements AgentStatDaoV2 private final DataSourceSerializer dataSourceSerializer; - public HbaseDataSourceListDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, + public HbaseDataSourceListDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, DataSourceSerializer dataSourceSerializer) { this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDeadlockThreadCountDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDeadlockThreadCountDao.java index 9c1cdc0e4f90..aa884ef2a8ba 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDeadlockThreadCountDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseDeadlockThreadCountDao.java @@ -30,7 +30,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -49,7 +48,7 @@ public class HbaseDeadlockThreadCountDao implements AgentStatDaoV2 { private final DirectBufferSerializer directBufferSerializer; - public HbaseDirectBufferDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseDirectBufferDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, DirectBufferSerializer directBufferSerializer) { diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseFileDescriptorDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseFileDescriptorDao.java index 1b8a3c641dbe..8284d9df75ee 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseFileDescriptorDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseFileDescriptorDao.java @@ -30,7 +30,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -50,7 +49,7 @@ public class HbaseFileDescriptorDao implements AgentStatDaoV2 private final FileDescriptorSerializer fileDescriptorSerializer; - public HbaseFileDescriptorDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseFileDescriptorDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, FileDescriptorSerializer fileDescriptorSerializer) { diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDao.java index 085632036b96..78a2ad45000c 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDao.java @@ -30,7 +30,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -50,7 +49,7 @@ public class HbaseJvmGcDao implements AgentStatDaoV2 { private final JvmGcSerializer jvmGcSerializer; - public HbaseJvmGcDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseJvmGcDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, JvmGcSerializer jvmGcSerializer) { diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDetailedDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDetailedDao.java index 1b5c7ae9c8fb..f3c16e0f00a7 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDetailedDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseJvmGcDetailedDao.java @@ -30,7 +30,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -50,7 +49,7 @@ public class HbaseJvmGcDetailedDao implements AgentStatDaoV2 { private final JvmGcDetailedSerializer jvmGcDetailedSerializer; - public HbaseJvmGcDetailedDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseJvmGcDetailedDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, JvmGcDetailedSerializer jvmGcDetailedSerializer) { this.hbaseTemplate = hbaseTemplate; diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseResponseTimeDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseResponseTimeDao.java index 40ea1c799238..4cc97eb8ac56 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseResponseTimeDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseResponseTimeDao.java @@ -30,7 +30,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -50,7 +49,7 @@ public class HbaseResponseTimeDao implements AgentStatDaoV2 { private final ResponseTimeSerializer responseTimeSerializer; - public HbaseResponseTimeDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseResponseTimeDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, ResponseTimeSerializer responseTimeSerializer) { diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseTransactionDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseTransactionDao.java index c6a0166138f0..b1ff3d093e31 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseTransactionDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/stat/HbaseTransactionDao.java @@ -30,7 +30,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.List; @@ -49,7 +48,7 @@ public class HbaseTransactionDao implements AgentStatDaoV2 { private final TransactionSerializer transactionSerializer; - public HbaseTransactionDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseTransactionDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, TransactionSerializer transactionSerializer) { diff --git a/collector/src/main/resources/applicationContext-collector-hbase.xml b/collector/src/main/resources/applicationContext-collector-hbase.xml index eccca6d4cbe7..a98ce0b7a76d 100644 --- a/collector/src/main/resources/applicationContext-collector-hbase.xml +++ b/collector/src/main/resources/applicationContext-collector-hbase.xml @@ -47,17 +47,21 @@ - + - + + + + + @@ -84,19 +88,8 @@ - - - - - - - - + - - - - diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java index 5e8d31a254a4..31387d814717 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java @@ -1,5 +1,6 @@ package com.navercorp.pinpoint.common.hbase.batch; +import com.navercorp.pinpoint.common.hbase.HBaseAsyncOperation; import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter; import org.springframework.beans.factory.FactoryBean; @@ -11,10 +12,15 @@ public class SimpleBatchWriterFactoryBean implements FactoryBean - - - - -