Skip to content

Commit

Permalink
[#8691] Refactoring Batch writer
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Mar 22, 2022
1 parent b6276c9 commit ef95dbb
Show file tree
Hide file tree
Showing 28 changed files with 63 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.Collections;
Expand All @@ -55,7 +54,7 @@ public class HbaseAgentUriStatDao implements AgentUriStatDao {

private final AgentUriStatSerializer agentUriStatSerializer;

public HbaseAgentUriStatDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseAgentUriStatDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
AgentUriStatSerializer agentUriStatSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class HbaseApplicationTraceIndexDao implements ApplicationTraceIndexDao {

private final ApplicationNameRowKeyEncoder rowKeyEncoder = new ApplicationNameRowKeyEncoder();

public HbaseApplicationTraceIndexDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseApplicationTraceIndexDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
@Qualifier("applicationTraceIndexDistributor") AbstractRowKeyDistributor rowKeyDistributor,
AcceptedTimeService acceptedTimeService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.navercorp.pinpoint.common.util.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -45,7 +44,7 @@ public class HBaseLoadedClassDao implements AgentStatDaoV2<LoadedClassBo> {

private final LoadedClassSerializer loadedClassSerializer;

public HBaseLoadedClassDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HBaseLoadedClassDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
LoadedClassSerializer loadedClassSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -46,8 +45,10 @@ public class HBaseTotalThreadCountDao implements AgentStatDaoV2<TotalThreadCount

private final TotalThreadCountSerializer totalThreadCountSerializer;

public HBaseTotalThreadCountDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, TotalThreadCountSerializer totalThreadCountSerializer) {
public HBaseTotalThreadCountDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
TotalThreadCountSerializer totalThreadCountSerializer) {
this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
this.agentStatHbaseOperationFactory = Objects.requireNonNull(agentStatHbaseOperationFactory, "agentStatHbaseOperationFactory");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -50,7 +49,7 @@ public class HbaseActiveTraceDao implements AgentStatDaoV2<ActiveTraceBo> {

private final ActiveTraceSerializer activeTraceSerializer;

public HbaseActiveTraceDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider,
public HbaseActiveTraceDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, ActiveTraceSerializer activeTraceSerializer) {
this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -50,7 +49,7 @@ public class HbaseCpuLoadDao implements AgentStatDaoV2<CpuLoadBo> {

private final CpuLoadSerializer cpuLoadSerializer;

public HbaseCpuLoadDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseCpuLoadDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider, AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
CpuLoadSerializer cpuLoadSerializer) {
this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.commons.collections4.map.MultiKeyMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
Expand All @@ -55,7 +54,7 @@ public class HbaseDataSourceListDao implements AgentStatDaoV2<DataSourceListBo>

private final DataSourceSerializer dataSourceSerializer;

public HbaseDataSourceListDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider,
public HbaseDataSourceListDao(HbaseOperations2 hbaseTemplate, TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, DataSourceSerializer dataSourceSerializer) {
this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -49,7 +48,7 @@ public class HbaseDeadlockThreadCountDao implements AgentStatDaoV2<DeadlockThrea

private final DeadlockThreadCountSerializer deadlockThreadCountSerializer;

public HbaseDeadlockThreadCountDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseDeadlockThreadCountDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
DeadlockThreadCountSerializer deadlockThreadCountSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -50,7 +49,7 @@ public class HbaseDirectBufferDao implements AgentStatDaoV2<DirectBufferBo> {

private final DirectBufferSerializer directBufferSerializer;

public HbaseDirectBufferDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseDirectBufferDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
DirectBufferSerializer directBufferSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -50,7 +49,7 @@ public class HbaseFileDescriptorDao implements AgentStatDaoV2<FileDescriptorBo>

private final FileDescriptorSerializer fileDescriptorSerializer;

public HbaseFileDescriptorDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseFileDescriptorDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
FileDescriptorSerializer fileDescriptorSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -50,7 +49,7 @@ public class HbaseJvmGcDao implements AgentStatDaoV2<JvmGcBo> {

private final JvmGcSerializer jvmGcSerializer;

public HbaseJvmGcDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseJvmGcDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
JvmGcSerializer jvmGcSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -50,7 +49,7 @@ public class HbaseJvmGcDetailedDao implements AgentStatDaoV2<JvmGcDetailedBo> {

private final JvmGcDetailedSerializer jvmGcDetailedSerializer;

public HbaseJvmGcDetailedDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseJvmGcDetailedDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory, JvmGcDetailedSerializer jvmGcDetailedSerializer) {
this.hbaseTemplate = hbaseTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -50,7 +49,7 @@ public class HbaseResponseTimeDao implements AgentStatDaoV2<ResponseTimeBo> {

private final ResponseTimeSerializer responseTimeSerializer;

public HbaseResponseTimeDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseResponseTimeDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
ResponseTimeSerializer responseTimeSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -49,7 +48,7 @@ public class HbaseTransactionDao implements AgentStatDaoV2<TransactionBo> {

private final TransactionSerializer transactionSerializer;

public HbaseTransactionDao(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate,
public HbaseTransactionDao(HbaseOperations2 hbaseTemplate,
TableNameProvider tableNameProvider,
AgentStatHbaseOperationFactory agentStatHbaseOperationFactory,
TransactionSerializer transactionSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,21 @@
</property>
</bean>

<bean id="hbaseThreadPool" class="com.navercorp.pinpoint.common.server.util.PinpointThreadPoolExecutorFactoryBean">
<bean id="abstractHbaseThreadPool" class="com.navercorp.pinpoint.common.server.util.PinpointThreadPoolExecutorFactoryBean" abstract="true">
<property name="corePoolSize" value="${hbase.client.thread.max}"/>
<property name="maxPoolSize" value="${hbase.client.thread.max}"/>
<property name="queueCapacity" value="${hbase.client.threadPool.queueSize}"/>
<property name="threadNamePrefix" value="Pinpoint-HConnectionExecutor-"/>
<property name="threadNamePrefix" value="HConnectionExecutor-"/>
<property name="daemon" value="true"/>
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
<property name="awaitTerminationSeconds" value="10"/>
<property name="preStartAllCoreThreads" value="true"/>
</bean>

<bean id="hbaseThreadPool" parent="abstractHbaseThreadPool">
<property name="threadNamePrefix" value="Pinpoint-HConnectionExecutor-"/>
</bean>

<bean id="connectionFactory" class="com.navercorp.pinpoint.common.hbase.ConnectionFactoryBean">
<constructor-arg type="org.apache.hadoop.conf.Configuration" ref="hbaseConfiguration"/>
<constructor-arg type="java.util.concurrent.ExecutorService" ref="hbaseThreadPool"/>
Expand All @@ -84,19 +88,8 @@
<property name="asyncOperation" ref="asyncOperation"/>
</bean>

<bean id="asyncPutHbaseTemplate" class="com.navercorp.pinpoint.common.hbase.AsyncPutHbaseTemplate2">
<constructor-arg ref="hbaseTemplate"/>
</bean>

<bean id="batchHbaseThreadPool" class="com.navercorp.pinpoint.common.server.util.PinpointThreadPoolExecutorFactoryBean">
<property name="corePoolSize" value="${hbase.client.thread.max}"/>
<property name="maxPoolSize" value="${hbase.client.thread.max}"/>
<property name="queueCapacity" value="${hbase.client.threadPool.queueSize}"/>
<bean id="batchHbaseThreadPool" parent="abstractHbaseThreadPool">
<property name="threadNamePrefix" value="Pinpoint-Batch-HConnectionExecutor-"/>
<property name="daemon" value="true"/>
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
<property name="awaitTerminationSeconds" value="10"/>
<property name="preStartAllCoreThreads" value="true"/>
</bean>

<bean id="batchConnectionFactory" class="com.navercorp.pinpoint.common.hbase.ConnectionFactoryBean">
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.navercorp.pinpoint.common.hbase.batch;

import com.navercorp.pinpoint.common.hbase.HBaseAsyncOperation;
import com.navercorp.pinpoint.common.hbase.HbaseTemplate2;
import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter;
import org.springframework.beans.factory.FactoryBean;
Expand All @@ -11,10 +12,15 @@ public class SimpleBatchWriterFactoryBean implements FactoryBean<SimpleBatchWrit

public SimpleBatchWriterFactoryBean(BufferedMutatorConfiguration configuration,
HbaseBatchWriter hbaseBatchWriter,
HBaseAsyncOperation asyncOperation,
HbaseTemplate2 HbaseTemplate2) {
if (configuration != null && configuration.isBatchWriter()) {
this.batchWriter = new SimpleBufferWriter(hbaseBatchWriter);
} else {
}
else if (asyncOperation.isAvailable()) {
this.batchWriter = new TableMultiplexerWriter(asyncOperation);
}
else {
this.batchWriter = new HbaseTemplateWriter(HbaseTemplate2);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.navercorp.pinpoint.common.hbase.batch;

import com.navercorp.pinpoint.common.hbase.HBaseAsyncOperation;
import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;

import java.util.Objects;

public class TableMultiplexerWriter implements SimpleBatchWriter {
private final HBaseAsyncOperation hbaseAsyncOperation;

public TableMultiplexerWriter(HBaseAsyncOperation hbaseAsyncOperation) {
this.hbaseAsyncOperation = Objects.requireNonNull(hbaseAsyncOperation, "hbaseAsyncOperation");

}

@Override
public boolean write(TableName tableName, Put mutation) {
return hbaseAsyncOperation.put(tableName, mutation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.annotation.Qualifier;

import java.util.Date;
import java.util.List;
Expand All @@ -45,7 +44,7 @@ public class ActiveTraceDao {
private final ActiveTraceSerializer activeTraceSerializer;
private final TableNameProvider tableNameProvider;

public ActiveTraceDao(@Qualifier("asyncPutHbaseTemplate") HbaseTemplate2 hbaseTemplate2,
public ActiveTraceDao(HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory,
ActiveTraceSerializer activeTraceSerializer,
TableNameProvider tableNameProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.annotation.Qualifier;

import java.util.Date;
import java.util.List;
Expand All @@ -44,7 +43,7 @@ public class CpuLoadDao {
private final CpuLoadSerializer cpuLoadSerializer;
private final TableNameProvider tableNameProvider;

public CpuLoadDao(@Qualifier("asyncPutHbaseTemplate") HbaseTemplate2 hbaseTemplate2,
public CpuLoadDao(HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory,
CpuLoadSerializer cpuLoadSerializer,
TableNameProvider tableNameProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.annotation.Qualifier;

import java.util.Date;
import java.util.List;
Expand All @@ -44,7 +43,7 @@ public class DataSourceDao {
private final DataSourceSerializer dataSourceSerializer;
private final TableNameProvider tableNameProvider;

public DataSourceDao(@Qualifier("asyncPutHbaseTemplate") HbaseTemplate2 hbaseTemplate2,
public DataSourceDao(HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory,
DataSourceSerializer dataSourceSerializer,
TableNameProvider tableNameProvider) {
Expand Down
Loading

0 comments on commit ef95dbb

Please sign in to comment.