diff --git a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/bo/serializer/stat/ApplicationStatHbaseOperationFactory.java b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/bo/serializer/stat/ApplicationStatHbaseOperationFactory.java index e497e6756671..9a79388f94a4 100644 --- a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/bo/serializer/stat/ApplicationStatHbaseOperationFactory.java +++ b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/bo/serializer/stat/ApplicationStatHbaseOperationFactory.java @@ -56,16 +56,16 @@ public ApplicationStatHbaseOperationFactory( this.rowKeyDistributor = Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor"); } - public List createPuts(String applicationId, List joinStatBoList, StatType statType, ApplicationStatSerializer applicationStatSerializer) { + public List createPuts(String applicationId, List joinStatBoList, StatType statType, ApplicationStatSerializer applicationStatSerializer) { if (CollectionUtils.isEmpty(joinStatBoList)) { return Collections.emptyList(); } - Map> timeslots = slotApplicationStatDataPoints(joinStatBoList); + Map> timeslots = slotApplicationStatDataPoints(joinStatBoList); List puts = new ArrayList<>(); - for (Map.Entry> timeslot : timeslots.entrySet()) { + for (Map.Entry> timeslot : timeslots.entrySet()) { long baseTimestamp = timeslot.getKey(); - List slottedApplicationStatDataPoints = timeslot.getValue(); + List slottedApplicationStatDataPoints = timeslot.getValue(); final ApplicationStatRowKeyComponent rowKeyComponent = new ApplicationStatRowKeyComponent(applicationId, statType, baseTimestamp); byte[] rowKey = this.rowKeyEncoder.encodeRowKey(rowKeyComponent); diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java b/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java index b3b71f99c5e5..ae836d623220 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java @@ -16,6 +16,7 @@ package com.navercorp.pinpoint.flink; import com.navercorp.pinpoint.collector.receiver.thrift.TCPReceiverBean; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; import com.navercorp.pinpoint.flink.cluster.FlinkServerRegister; import com.navercorp.pinpoint.flink.config.FlinkConfiguration; import com.navercorp.pinpoint.flink.dao.hbase.*; @@ -35,6 +36,8 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -55,16 +58,9 @@ public class Bootstrap { private final TcpDispatchHandler tcpDispatchHandler; private final TcpSourceFunction tcpSourceFunction; private final ApplicationCache applicationCache; - private final CpuLoadDao cpuLoadDao; - private final MemoryDao memoryDao; - private final TransactionDao transactionDao; - private final ActiveTraceDao activeTraceDao; - private final ResponseTimeDao responseTimeDao; - private final DataSourceDao dataSourceDao; - private final FileDescriptorDao fileDescriptorDao; - private final DirectBufferDao directBufferDao; - private final TotalThreadCountDao totalThreadCountDao; - private final LoadedClassDao loadedClassDao; + + private final List> applicationMetricDaoList; + private final TBaseFlatMapperInterceptor tBaseFlatMapperInterceptor; private final StatisticsDaoInterceptor statisticsDaoInterceptor; private final ApplicationStatBoWindowInterceptor applicationStatBoWindowInterceptor; @@ -79,24 +75,23 @@ private Bootstrap() { tcpSourceFunction = applicationContext.getBean("tcpSourceFunction", TcpSourceFunction.class); applicationCache = applicationContext.getBean("applicationCache", ApplicationCache.class); statisticsDao = applicationContext.getBean("statisticsDao", StatisticsDao.class); - cpuLoadDao = applicationContext.getBean("cpuLoadDao", CpuLoadDao.class); - memoryDao = applicationContext.getBean("memoryDao", MemoryDao.class); - transactionDao = applicationContext.getBean("transactionDao", TransactionDao.class); - activeTraceDao = applicationContext.getBean("activeTraceDao", ActiveTraceDao.class); - responseTimeDao = applicationContext.getBean("responseTimeDao", ResponseTimeDao.class); - dataSourceDao = applicationContext.getBean("dataSourceDao", DataSourceDao.class); - totalThreadCountDao = applicationContext.getBean("totalThreadCountDao", TotalThreadCountDao.class); - fileDescriptorDao = applicationContext.getBean("fileDescriptorDao", FileDescriptorDao.class); - directBufferDao = applicationContext.getBean("directBufferDao", DirectBufferDao.class); - loadedClassDao = applicationContext.getBean("loadedClassDao", LoadedClassDao.class); + + this.applicationMetricDaoList = getApplicationMetricDao(); + tBaseFlatMapperInterceptor = applicationContext.getBean("tBaseFlatMapperInterceptor", TBaseFlatMapperInterceptor.class); statisticsDaoInterceptor = applicationContext.getBean("statisticsDaoInterceptor", StatisticsDaoInterceptor.class); applicationStatBoWindowInterceptor = applicationContext.getBean("applicationStatBoWindowInterceptor", ApplicationStatBoWindowInterceptor.class); agentStatHandler = applicationContext.getBean("agentStatHandler", AgentStatHandler.class); } - public FileDescriptorDao getFileDescriptorDao() { - return fileDescriptorDao; + @SuppressWarnings("unchecked") + private List> getApplicationMetricDao() { + Map metricDaoMap = applicationContext.getBeansOfType(ApplicationMetricDao.class); + + metricDaoMap.forEach((beanName, applicationMetricDao) -> logger.info("ApplicationMetricDao BeanName:{}", beanName)); + + List values = new ArrayList<>(metricDaoMap.values()); + return (List>) (List) values; } public static Bootstrap getInstance(Map jobParameters) { @@ -122,38 +117,10 @@ public StatisticsDao getStatisticsDao() { return statisticsDao; } - public CpuLoadDao getCpuLoadDao() { - return cpuLoadDao; - } - - public MemoryDao getMemoryDao() { - return memoryDao; - } - - public TransactionDao getTransactionDao() { - return transactionDao; + public List> getApplicationMetricDaoList() { + return applicationMetricDaoList; } - public ActiveTraceDao getActiveTraceDao() { - return activeTraceDao; - } - - public ResponseTimeDao getResponseTimeDao() { - return responseTimeDao; - } - - public DataSourceDao getDataSourceDao() { - return dataSourceDao; - } - - public DirectBufferDao getDirectBufferDao() { - return directBufferDao; - } - - public TotalThreadCountDao getTotalThreadCountDao() { return totalThreadCountDao; } - - public LoadedClassDao getLoadedClassDao() { return loadedClassDao; } - public TBaseFlatMapper getTbaseFlatMapper() { return tbaseFlatMapper; } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ActiveTraceDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ActiveTraceDao.java index a4e49e9b4efc..93cb1635fb3c 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ActiveTraceDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ActiveTraceDao.java @@ -19,49 +19,22 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ActiveTraceSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinActiveTraceBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author minwoo.jung */ -public class ActiveTraceDao { - private final Logger logger = LogManager.getLogger(this.getClass()); - +public class ActiveTraceDao extends DefaultApplicationMetricDao { - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final ActiveTraceSerializer activeTraceSerializer; - private final TableNameProvider tableNameProvider; - - public ActiveTraceDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - ActiveTraceSerializer activeTraceSerializer, + public ActiveTraceDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.activeTraceSerializer = Objects.requireNonNull(activeTraceSerializer, "activeTraceSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_ACTIVE_TRACE_COUNT, JoinApplicationStatBo::getJoinActiveTraceBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinActiveTraceBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinActiveTraceBoList); - } - List activeTracePuts = applicationStatHbaseOperationFactory.createPuts(id, joinActiveTraceBoList, statType, activeTraceSerializer); - if (!activeTracePuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, activeTracePuts); - } - } } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ApplicationMetricDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ApplicationMetricDao.java new file mode 100644 index 000000000000..429734d2656f --- /dev/null +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ApplicationMetricDao.java @@ -0,0 +1,12 @@ +package com.navercorp.pinpoint.flink.dao.hbase; + +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; + +import java.util.List; + +public interface ApplicationMetricDao { + void insert(String id, long timestamp, List joinStatBoList); + + void insert(JoinApplicationStatBo joinApplicationStatBo); +} diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/CpuLoadDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/CpuLoadDao.java index d18c8738c2a8..6144fd56a0c4 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/CpuLoadDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/CpuLoadDao.java @@ -19,48 +19,22 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.CpuLoadSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinCpuLoadBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author minwoo.jung */ -public class CpuLoadDao { - private final Logger logger = LogManager.getLogger(this.getClass()); +public class CpuLoadDao extends DefaultApplicationMetricDao { - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final CpuLoadSerializer cpuLoadSerializer; - private final TableNameProvider tableNameProvider; - - public CpuLoadDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - CpuLoadSerializer cpuLoadSerializer, + public CpuLoadDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.cpuLoadSerializer = Objects.requireNonNull(cpuLoadSerializer, "cpuLoadSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_CPU_LOAD, JoinApplicationStatBo::getJoinCpuLoadBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinCpuLoadBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinCpuLoadBoList); - } - List cpuLoadPuts = applicationStatHbaseOperationFactory.createPuts(id, joinCpuLoadBoList, statType, cpuLoadSerializer); - if (!cpuLoadPuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, cpuLoadPuts); - } - } } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DataSourceDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DataSourceDao.java index 33e158fd3d3d..20b550202340 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DataSourceDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DataSourceDao.java @@ -19,48 +19,23 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.DataSourceSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinDataSourceListBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author minwoo.jung */ -public class DataSourceDao { - private final Logger logger = LogManager.getLogger(this.getClass()); - - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final DataSourceSerializer dataSourceSerializer; - private final TableNameProvider tableNameProvider; +public class DataSourceDao extends DefaultApplicationMetricDao { - public DataSourceDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - DataSourceSerializer dataSourceSerializer, + public DataSourceDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.dataSourceSerializer = Objects.requireNonNull(dataSourceSerializer, "dataSourceSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_DATA_SOURCE, JoinApplicationStatBo::getJoinDataSourceListBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinResponseTimeBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinResponseTimeBoList); - } - List responseTimePuts = applicationStatHbaseOperationFactory.createPuts(id, joinResponseTimeBoList, statType, dataSourceSerializer); - if (!responseTimePuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, responseTimePuts); - } - } + } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DefaultApplicationMetricDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DefaultApplicationMetricDao.java new file mode 100644 index 000000000000..574a97bdcddd --- /dev/null +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DefaultApplicationMetricDao.java @@ -0,0 +1,67 @@ +package com.navercorp.pinpoint.flink.dao.hbase; + +import com.navercorp.pinpoint.common.hbase.HbaseTable; +import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; +import com.navercorp.pinpoint.common.hbase.TableNameProvider; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; +import com.navercorp.pinpoint.common.server.util.DateTimeFormatUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +public class DefaultApplicationMetricDao implements ApplicationMetricDao { + protected final Logger logger = LogManager.getLogger(this.getClass()); + + private final StatType statType; + private final Function> appStatFunction; + private final ApplicationStatSerializer serializer; + + private final HbaseTable tableName; + private final HbaseTemplate2 hbaseTemplate2; + private final ApplicationStatHbaseOperationFactory operationFactory; + private final TableNameProvider tableNameProvider; + + public DefaultApplicationMetricDao(StatType statType, + Function> appStatFunction, + ApplicationStatSerializer serializer, + + HbaseTable tableName, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, + TableNameProvider tableNameProvider) { + this.statType = Objects.requireNonNull(statType, "statType"); + this.appStatFunction = Objects.requireNonNull(appStatFunction, "dataPointFunction"); + this.serializer = Objects.requireNonNull(serializer, "activeTraceSerializer"); + + this.tableName = Objects.requireNonNull(tableName, "tableName"); + this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); + this.operationFactory = Objects.requireNonNull(operationFactory, "applicationStatHbaseOperationFactory"); + this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + } + + public void insert(String id, long timestamp, List appStatBoList) { + if (logger.isDebugEnabled()) { + logger.debug("[insert] {} : ({})", DateTimeFormatUtils.format(timestamp), appStatBoList); + } + List activeTracePuts = operationFactory.createPuts(id, appStatBoList, statType, serializer); + if (!activeTracePuts.isEmpty()) { + TableName applicationStatAggreTableName = tableNameProvider.getTableName(tableName); + hbaseTemplate2.asyncPut(applicationStatAggreTableName, activeTracePuts); + } + } + + @Override + public void insert(JoinApplicationStatBo applicationStatBo) { + List statBo = appStatFunction.apply(applicationStatBo); + insert(applicationStatBo.getId(), applicationStatBo.getTimestamp(), statBo); + } +} diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DirectBufferDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DirectBufferDao.java index b2b0e1640b7b..c29d429622d3 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DirectBufferDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/DirectBufferDao.java @@ -19,49 +19,22 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.DirectBufferSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinDirectBufferBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; -import org.springframework.beans.factory.annotation.Qualifier; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author Roy Kim */ -public class DirectBufferDao { - private final Logger logger = LogManager.getLogger(this.getClass()); +public class DirectBufferDao extends DefaultApplicationMetricDao { - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final DirectBufferSerializer directBufferSerializer; - private final TableNameProvider tableNameProvider; - - public DirectBufferDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - DirectBufferSerializer directBufferSerializer, + public DirectBufferDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.directBufferSerializer = Objects.requireNonNull(directBufferSerializer, "directBufferSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_DIRECT_BUFFER, JoinApplicationStatBo::getJoinDirectBufferBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinDirectBufferBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinDirectBufferBoList); - } - List directBufferPuts = applicationStatHbaseOperationFactory.createPuts(id, joinDirectBufferBoList, statType, directBufferSerializer); - if (!directBufferPuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, directBufferPuts); - } - } } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/FileDescriptorDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/FileDescriptorDao.java index 547657dfc7e5..dfd23ab17aca 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/FileDescriptorDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/FileDescriptorDao.java @@ -19,48 +19,22 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.FileDescriptorSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinFileDescriptorBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author Roy Kim */ -public class FileDescriptorDao { - private final Logger logger = LogManager.getLogger(this.getClass()); +public class FileDescriptorDao extends DefaultApplicationMetricDao { - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final FileDescriptorSerializer fileDescriptorSerializer; - private final TableNameProvider tableNameProvider; - - public FileDescriptorDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - FileDescriptorSerializer fileDescriptorSerializer, + public FileDescriptorDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.fileDescriptorSerializer = Objects.requireNonNull(fileDescriptorSerializer, "fileDescriptorSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_FILE_DESCRIPTOR, JoinApplicationStatBo::getJoinFileDescriptorBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinFileDescriptorBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinFileDescriptorBoList); - } - List fileDescriptorPuts = applicationStatHbaseOperationFactory.createPuts(id, joinFileDescriptorBoList, statType, fileDescriptorSerializer); - if (!fileDescriptorPuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, fileDescriptorPuts); - } - } } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/LoadedClassDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/LoadedClassDao.java index 029bb0747de8..4556e1c1e4fb 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/LoadedClassDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/LoadedClassDao.java @@ -20,44 +20,19 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.LoadedClassSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinLoadedClassBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; -import java.util.Date; -import java.util.List; -import java.util.Objects; +public class LoadedClassDao extends DefaultApplicationMetricDao { -public class LoadedClassDao { - private final Logger logger = LogManager.getLogger(this.getClass()); - - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final LoadedClassSerializer loadedClassSerializer; - private final TableNameProvider tableNameProvider; - - public LoadedClassDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - LoadedClassSerializer loadedClassSerializer, + public LoadedClassDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.loadedClassSerializer = Objects.requireNonNull(loadedClassSerializer, "loadedClassSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_LOADED_CLASS, JoinApplicationStatBo::getJoinLoadedClassBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinLoadedClassBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinLoadedClassBoList); - } - List fileDescriptorPuts = applicationStatHbaseOperationFactory.createPuts(id, joinLoadedClassBoList, statType, loadedClassSerializer); - if (!fileDescriptorPuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, fileDescriptorPuts); - } - } } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/MemoryDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/MemoryDao.java index cc6496bb2002..6e91ef52f321 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/MemoryDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/MemoryDao.java @@ -19,48 +19,22 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.MemorySerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinMemoryBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author minwoo.jung */ -public class MemoryDao { - private final Logger logger = LogManager.getLogger(this.getClass()); +public class MemoryDao extends DefaultApplicationMetricDao { - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final MemorySerializer memorySerializer; - private final TableNameProvider tableNameProvider; - - public MemoryDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - MemorySerializer memorySerializer, + public MemoryDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.memorySerializer = Objects.requireNonNull(memorySerializer, "memorySerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_MEMORY_USED, JoinApplicationStatBo::getJoinMemoryBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinMemoryBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinMemoryBoList); - } - List memoryPuts = applicationStatHbaseOperationFactory.createPuts(id, joinMemoryBoList, statType, memorySerializer); - if (!memoryPuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - this.hbaseTemplate2.asyncPut(applicationStatAggreTableName, memoryPuts); - } - } } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ResponseTimeDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ResponseTimeDao.java index d5e020cc0015..4450bcc560a5 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ResponseTimeDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/ResponseTimeDao.java @@ -19,48 +19,23 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ResponseTimeSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinResponseTimeBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author minwoo.jung */ -public class ResponseTimeDao { - private final Logger logger = LogManager.getLogger(this.getClass()); +public class ResponseTimeDao extends DefaultApplicationMetricDao { - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final ResponseTimeSerializer responseTimeSerializer; - private final TableNameProvider tableNameProvider; - public ResponseTimeDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - ResponseTimeSerializer responseTimeSerializer, + public ResponseTimeDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.responseTimeSerializer = Objects.requireNonNull(responseTimeSerializer, "responseTimeSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_RESPONSE_TIME, JoinApplicationStatBo::getJoinResponseTimeBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinResponseTimeBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinResponseTimeBoList); - } - List responseTimePuts = applicationStatHbaseOperationFactory.createPuts(id, joinResponseTimeBoList, statType, responseTimeSerializer); - if (!responseTimePuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, responseTimePuts); - } - } } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/StatisticsDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/StatisticsDao.java index f2d10c29f170..599cddb206a0 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/StatisticsDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/StatisticsDao.java @@ -21,18 +21,12 @@ import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; import com.navercorp.pinpoint.flink.Bootstrap; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.util.CollectionUtil; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import org.apache.logging.log4j.Logger; /** * @author minwoo.jung @@ -41,32 +35,15 @@ public class StatisticsDao extends RichSinkFunction[] applicationMetricDaoList; private transient StatisticsDaoInterceptor statisticsDaoInterceptor; @Override public void open(Configuration parameters) throws Exception { ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); Bootstrap bootstrap = Bootstrap.getInstance(globalJobParameters.toMap()); - cpuLoadDao = bootstrap.getCpuLoadDao(); - memoryDao = bootstrap.getMemoryDao(); - transactionDao = bootstrap.getTransactionDao(); - activeTraceDao = bootstrap.getActiveTraceDao(); - responseTimeDao = bootstrap.getResponseTimeDao(); - dataSourceDao = bootstrap.getDataSourceDao(); - fileDescriptorDao = bootstrap.getFileDescriptorDao(); - directBufferDao = bootstrap.getDirectBufferDao(); - totalThreadCountDao = bootstrap.getTotalThreadCountDao(); - loadedClassDao = bootstrap.getLoadedClassDao(); + applicationMetricDaoList = bootstrap.getApplicationMetricDaoList().toArray(new ApplicationMetricDao[0]); statisticsDaoInterceptor = bootstrap.getStatisticsDaoInterceptor(); } @@ -90,42 +67,14 @@ public void invoke(Tuple3 statData, SinkFunction.Conte } } - private void insertJoinApplicationStatBo(JoinApplicationStatBo joinApplicationStatBo) { - List joinCpuLoadBoList = castJoinStatBoList(joinApplicationStatBo.getJoinCpuLoadBoList()); - List joinMemoryBoList = castJoinStatBoList(joinApplicationStatBo.getJoinMemoryBoList()); - List joinTransactionBoList = castJoinStatBoList(joinApplicationStatBo.getJoinTransactionBoList()); - List joinActiveTraceBoList = castJoinStatBoList(joinApplicationStatBo.getJoinActiveTraceBoList()); - List joinResponseTimeBoList = castJoinStatBoList(joinApplicationStatBo.getJoinResponseTimeBoList()); - List joinDataSourceBoList = castJoinStatBoList(joinApplicationStatBo.getJoinDataSourceListBoList()); - List joinFileDescriptorBoList = castJoinStatBoList(joinApplicationStatBo.getJoinFileDescriptorBoList()); - List joinDirectBufferBoList = castJoinStatBoList(joinApplicationStatBo.getJoinDirectBufferBoList()); - List joinTotalThreadCountBoList = castJoinStatBoList(joinApplicationStatBo.getJoinTotalThreadCountBoList()); - List joinLoadedClassBoList = castJoinStatBoList(joinApplicationStatBo.getJoinLoadedClassBoList()); - - if (joinApplicationStatBo.getStatType() == StatType.APP_STST_AGGRE) { + private void insertJoinApplicationStatBo(JoinApplicationStatBo appMetric) { + if (appMetric.getStatType() == StatType.APP_STST_AGGRE) { // logger.info("insert application aggre : " + new Date(joinApplicationStatBo.getTimestamp()) + " ("+ joinApplicationStatBo.getApplicationId() + " )"); } else { - final String id = joinApplicationStatBo.getId(); - final long timestamp = joinApplicationStatBo.getTimestamp(); - cpuLoadDao.insert(id, timestamp, joinCpuLoadBoList, StatType.APP_CPU_LOAD); - memoryDao.insert(id, timestamp, joinMemoryBoList, StatType.APP_MEMORY_USED); - transactionDao.insert(id, timestamp, joinTransactionBoList, StatType.APP_TRANSACTION_COUNT); - activeTraceDao.insert(id, timestamp, joinActiveTraceBoList, StatType.APP_ACTIVE_TRACE_COUNT); - responseTimeDao.insert(id, timestamp, joinResponseTimeBoList, StatType.APP_RESPONSE_TIME); - dataSourceDao.insert(id, timestamp, joinDataSourceBoList, StatType.APP_DATA_SOURCE); - fileDescriptorDao.insert(id, timestamp, joinFileDescriptorBoList, StatType.APP_FILE_DESCRIPTOR); - directBufferDao.insert(id, timestamp, joinDirectBufferBoList, StatType.APP_DIRECT_BUFFER); - totalThreadCountDao.insert(id, timestamp, joinTotalThreadCountBoList, StatType.APP_TOTAL_THREAD_COUNT); - loadedClassDao.insert(id, timestamp, joinLoadedClassBoList, StatType.APP_LOADED_CLASS); - } - } - - private List castJoinStatBoList(List JoinStatBoList) { - if (CollectionUtil.isNullOrEmpty(JoinStatBoList)) { - return new ArrayList<>(0); + for (ApplicationMetricDao dao : applicationMetricDaoList) { + dao.insert(appMetric); + } } - - return new ArrayList<>(JoinStatBoList); } private void insertJoinAgentStatBo(JoinAgentStatBo joinAgentStatBo) { diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TotalThreadCountDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TotalThreadCountDao.java index b99948842dee..7681f6247118 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TotalThreadCountDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TotalThreadCountDao.java @@ -20,44 +20,20 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.TotalThreadCountSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinTotalThreadCountBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; -import java.util.Date; -import java.util.List; -import java.util.Objects; +public class TotalThreadCountDao extends DefaultApplicationMetricDao { -public class TotalThreadCountDao { - private final Logger logger = LogManager.getLogger(this.getClass()); - - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final TotalThreadCountSerializer totalThreadCountSerializer; - private final TableNameProvider tableNameProvider; - - public TotalThreadCountDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - TotalThreadCountSerializer totalThreadCountSerializer, + public TotalThreadCountDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHBaseOperationFactory"); - this.totalThreadCountSerializer = Objects.requireNonNull(totalThreadCountSerializer, "totalThreadCountSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_TOTAL_THREAD_COUNT, JoinApplicationStatBo::getJoinTotalThreadCountBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinTotalThreadCountBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinTotalThreadCountBoList); - } - List fileDescriptorPuts = applicationStatHbaseOperationFactory.createPuts(id, joinTotalThreadCountBoList, statType, totalThreadCountSerializer); - if (!fileDescriptorPuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, fileDescriptorPuts); - } - } + } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TransactionDao.java b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TransactionDao.java index 9e8c426f3932..f542221ea930 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TransactionDao.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/dao/hbase/TransactionDao.java @@ -19,49 +19,22 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory; -import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.TransactionSerializer; -import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; +import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo; +import com.navercorp.pinpoint.common.server.bo.stat.join.JoinTransactionBo; import com.navercorp.pinpoint.common.server.bo.stat.join.StatType; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Date; -import java.util.List; -import java.util.Objects; - /** * @author minwoo.jung */ -public class TransactionDao { - private final Logger logger = LogManager.getLogger(this.getClass()); - +public class TransactionDao extends DefaultApplicationMetricDao { - private final HbaseTemplate2 hbaseTemplate2; - private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory; - private final TransactionSerializer transactionSerializer; - private final TableNameProvider tableNameProvider; - - public TransactionDao(HbaseTemplate2 hbaseTemplate2, - ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory, - TransactionSerializer transactionSerializer, + public TransactionDao(ApplicationStatSerializer serializer, + HbaseTemplate2 hbaseTemplate2, + ApplicationStatHbaseOperationFactory operationFactory, TableNameProvider tableNameProvider) { - this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2"); - this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory"); - this.transactionSerializer = Objects.requireNonNull(transactionSerializer, "transactionSerializer"); - this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + super(StatType.APP_TRANSACTION_COUNT, JoinApplicationStatBo::getJoinTransactionBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE, + hbaseTemplate2, operationFactory, tableNameProvider); } - public void insert(String id, long timestamp, List joinTransactionBoList, StatType statType) { - if (logger.isDebugEnabled()) { - logger.debug("[insert] {} : ({})", new Date(timestamp), joinTransactionBoList); - } - List transactionPuts = applicationStatHbaseOperationFactory.createPuts(id, joinTransactionBoList, statType, transactionSerializer); - if (!transactionPuts.isEmpty()) { - TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE); - hbaseTemplate2.asyncPut(applicationStatAggreTableName, transactionPuts); - } - } } diff --git a/flink/src/main/resources/applicationContext-flink.xml b/flink/src/main/resources/applicationContext-flink.xml index 1103012a23f0..929399925c41 100644 --- a/flink/src/main/resources/applicationContext-flink.xml +++ b/flink/src/main/resources/applicationContext-flink.xml @@ -76,72 +76,72 @@ - - - + + + - - - + + + - - - + + + - - - + + + - - - + + + - - - + + + - - - + + + - - - + + + - - - + + + - - - + + + @@ -178,7 +178,7 @@ - +