Skip to content

Commit

Permalink
[#noissue] Refactor Flink MetricDao
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Apr 5, 2022
1 parent fe0556d commit d0c7825
Show file tree
Hide file tree
Showing 16 changed files with 234 additions and 497 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ public ApplicationStatHbaseOperationFactory(
this.rowKeyDistributor = Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor");
}

public List<Put> createPuts(String applicationId, List<JoinStatBo> joinStatBoList, StatType statType, ApplicationStatSerializer applicationStatSerializer) {
public <T extends JoinStatBo> List<Put> createPuts(String applicationId, List<T> joinStatBoList, StatType statType, ApplicationStatSerializer<T> applicationStatSerializer) {
if (CollectionUtils.isEmpty(joinStatBoList)) {
return Collections.emptyList();
}

Map<Long, List<JoinStatBo>> timeslots = slotApplicationStatDataPoints(joinStatBoList);
Map<Long, List<T>> timeslots = slotApplicationStatDataPoints(joinStatBoList);
List<Put> puts = new ArrayList<>();
for (Map.Entry<Long, List<JoinStatBo>> timeslot : timeslots.entrySet()) {
for (Map.Entry<Long, List<T>> timeslot : timeslots.entrySet()) {
long baseTimestamp = timeslot.getKey();
List<JoinStatBo> slottedApplicationStatDataPoints = timeslot.getValue();
List<T> slottedApplicationStatDataPoints = timeslot.getValue();

final ApplicationStatRowKeyComponent rowKeyComponent = new ApplicationStatRowKeyComponent(applicationId, statType, baseTimestamp);
byte[] rowKey = this.rowKeyEncoder.encodeRowKey(rowKeyComponent);
Expand Down
71 changes: 19 additions & 52 deletions flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.navercorp.pinpoint.flink;

import com.navercorp.pinpoint.collector.receiver.thrift.TCPReceiverBean;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
import com.navercorp.pinpoint.flink.cluster.FlinkServerRegister;
import com.navercorp.pinpoint.flink.config.FlinkConfiguration;
import com.navercorp.pinpoint.flink.dao.hbase.*;
Expand All @@ -35,6 +36,8 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -55,16 +58,9 @@ public class Bootstrap {
private final TcpDispatchHandler tcpDispatchHandler;
private final TcpSourceFunction tcpSourceFunction;
private final ApplicationCache applicationCache;
private final CpuLoadDao cpuLoadDao;
private final MemoryDao memoryDao;
private final TransactionDao transactionDao;
private final ActiveTraceDao activeTraceDao;
private final ResponseTimeDao responseTimeDao;
private final DataSourceDao dataSourceDao;
private final FileDescriptorDao fileDescriptorDao;
private final DirectBufferDao directBufferDao;
private final TotalThreadCountDao totalThreadCountDao;
private final LoadedClassDao loadedClassDao;

private final List<ApplicationMetricDao<JoinStatBo>> applicationMetricDaoList;

private final TBaseFlatMapperInterceptor tBaseFlatMapperInterceptor;
private final StatisticsDaoInterceptor statisticsDaoInterceptor;
private final ApplicationStatBoWindowInterceptor applicationStatBoWindowInterceptor;
Expand All @@ -79,24 +75,23 @@ private Bootstrap() {
tcpSourceFunction = applicationContext.getBean("tcpSourceFunction", TcpSourceFunction.class);
applicationCache = applicationContext.getBean("applicationCache", ApplicationCache.class);
statisticsDao = applicationContext.getBean("statisticsDao", StatisticsDao.class);
cpuLoadDao = applicationContext.getBean("cpuLoadDao", CpuLoadDao.class);
memoryDao = applicationContext.getBean("memoryDao", MemoryDao.class);
transactionDao = applicationContext.getBean("transactionDao", TransactionDao.class);
activeTraceDao = applicationContext.getBean("activeTraceDao", ActiveTraceDao.class);
responseTimeDao = applicationContext.getBean("responseTimeDao", ResponseTimeDao.class);
dataSourceDao = applicationContext.getBean("dataSourceDao", DataSourceDao.class);
totalThreadCountDao = applicationContext.getBean("totalThreadCountDao", TotalThreadCountDao.class);
fileDescriptorDao = applicationContext.getBean("fileDescriptorDao", FileDescriptorDao.class);
directBufferDao = applicationContext.getBean("directBufferDao", DirectBufferDao.class);
loadedClassDao = applicationContext.getBean("loadedClassDao", LoadedClassDao.class);

this.applicationMetricDaoList = getApplicationMetricDao();

tBaseFlatMapperInterceptor = applicationContext.getBean("tBaseFlatMapperInterceptor", TBaseFlatMapperInterceptor.class);
statisticsDaoInterceptor = applicationContext.getBean("statisticsDaoInterceptor", StatisticsDaoInterceptor.class);
applicationStatBoWindowInterceptor = applicationContext.getBean("applicationStatBoWindowInterceptor", ApplicationStatBoWindowInterceptor.class);
agentStatHandler = applicationContext.getBean("agentStatHandler", AgentStatHandler.class);
}

public FileDescriptorDao getFileDescriptorDao() {
return fileDescriptorDao;
@SuppressWarnings("unchecked")
private List<ApplicationMetricDao<JoinStatBo>> getApplicationMetricDao() {
Map<String, ApplicationMetricDao> metricDaoMap = applicationContext.getBeansOfType(ApplicationMetricDao.class);

metricDaoMap.forEach((beanName, applicationMetricDao) -> logger.info("ApplicationMetricDao BeanName:{}", beanName));

List<ApplicationMetricDao> values = new ArrayList<>(metricDaoMap.values());
return (List<ApplicationMetricDao<JoinStatBo>>) (List<?>) values;
}

public static Bootstrap getInstance(Map<String, String> jobParameters) {
Expand All @@ -122,38 +117,10 @@ public StatisticsDao getStatisticsDao() {
return statisticsDao;
}

public CpuLoadDao getCpuLoadDao() {
return cpuLoadDao;
}

public MemoryDao getMemoryDao() {
return memoryDao;
}

public TransactionDao getTransactionDao() {
return transactionDao;
public List<ApplicationMetricDao<JoinStatBo>> getApplicationMetricDaoList() {
return applicationMetricDaoList;
}

public ActiveTraceDao getActiveTraceDao() {
return activeTraceDao;
}

public ResponseTimeDao getResponseTimeDao() {
return responseTimeDao;
}

public DataSourceDao getDataSourceDao() {
return dataSourceDao;
}

public DirectBufferDao getDirectBufferDao() {
return directBufferDao;
}

public TotalThreadCountDao getTotalThreadCountDao() { return totalThreadCountDao; }

public LoadedClassDao getLoadedClassDao() { return loadedClassDao; }

public TBaseFlatMapper getTbaseFlatMapper() {
return tbaseFlatMapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,22 @@
import com.navercorp.pinpoint.common.hbase.HbaseTemplate2;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ActiveTraceSerializer;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinActiveTraceBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.StatType;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
* @author minwoo.jung
*/
public class ActiveTraceDao {
private final Logger logger = LogManager.getLogger(this.getClass());

public class ActiveTraceDao extends DefaultApplicationMetricDao<JoinActiveTraceBo> {

private final HbaseTemplate2 hbaseTemplate2;
private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory;
private final ActiveTraceSerializer activeTraceSerializer;
private final TableNameProvider tableNameProvider;

public ActiveTraceDao(HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory,
ActiveTraceSerializer activeTraceSerializer,
public ActiveTraceDao(ApplicationStatSerializer<JoinActiveTraceBo> serializer,
HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory operationFactory,
TableNameProvider tableNameProvider) {
this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2");
this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory");
this.activeTraceSerializer = Objects.requireNonNull(activeTraceSerializer, "activeTraceSerializer");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
super(StatType.APP_ACTIVE_TRACE_COUNT, JoinApplicationStatBo::getJoinActiveTraceBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE,
hbaseTemplate2, operationFactory, tableNameProvider);
}

public void insert(String id, long timestamp, List<JoinStatBo> joinActiveTraceBoList, StatType statType) {
if (logger.isDebugEnabled()) {
logger.debug("[insert] {} : ({})", new Date(timestamp), joinActiveTraceBoList);
}
List<Put> activeTracePuts = applicationStatHbaseOperationFactory.createPuts(id, joinActiveTraceBoList, statType, activeTraceSerializer);
if (!activeTracePuts.isEmpty()) {
TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE);
hbaseTemplate2.asyncPut(applicationStatAggreTableName, activeTracePuts);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.navercorp.pinpoint.flink.dao.hbase;

import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;

import java.util.List;

public interface ApplicationMetricDao<T extends JoinStatBo> {
void insert(String id, long timestamp, List<T> joinStatBoList);

void insert(JoinApplicationStatBo joinApplicationStatBo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,22 @@
import com.navercorp.pinpoint.common.hbase.HbaseTemplate2;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.CpuLoadSerializer;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinCpuLoadBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.StatType;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
* @author minwoo.jung
*/
public class CpuLoadDao {
private final Logger logger = LogManager.getLogger(this.getClass());
public class CpuLoadDao extends DefaultApplicationMetricDao<JoinCpuLoadBo> {

private final HbaseTemplate2 hbaseTemplate2;
private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory;
private final CpuLoadSerializer cpuLoadSerializer;
private final TableNameProvider tableNameProvider;

public CpuLoadDao(HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory,
CpuLoadSerializer cpuLoadSerializer,
public CpuLoadDao(ApplicationStatSerializer<JoinCpuLoadBo> serializer,
HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory operationFactory,
TableNameProvider tableNameProvider) {
this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2");
this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory");
this.cpuLoadSerializer = Objects.requireNonNull(cpuLoadSerializer, "cpuLoadSerializer");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
super(StatType.APP_CPU_LOAD, JoinApplicationStatBo::getJoinCpuLoadBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE,
hbaseTemplate2, operationFactory, tableNameProvider);
}

public void insert(String id, long timestamp, List<JoinStatBo> joinCpuLoadBoList, StatType statType) {
if (logger.isDebugEnabled()) {
logger.debug("[insert] {} : ({})", new Date(timestamp), joinCpuLoadBoList);
}
List<Put> cpuLoadPuts = applicationStatHbaseOperationFactory.createPuts(id, joinCpuLoadBoList, statType, cpuLoadSerializer);
if (!cpuLoadPuts.isEmpty()) {
TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE);
hbaseTemplate2.asyncPut(applicationStatAggreTableName, cpuLoadPuts);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,23 @@
import com.navercorp.pinpoint.common.hbase.HbaseTemplate2;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.ApplicationStatHbaseOperationFactory;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.DataSourceSerializer;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
import com.navercorp.pinpoint.common.server.bo.serializer.stat.join.ApplicationStatSerializer;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinDataSourceListBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.StatType;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
* @author minwoo.jung
*/
public class DataSourceDao {
private final Logger logger = LogManager.getLogger(this.getClass());

private final HbaseTemplate2 hbaseTemplate2;
private final ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory;
private final DataSourceSerializer dataSourceSerializer;
private final TableNameProvider tableNameProvider;
public class DataSourceDao extends DefaultApplicationMetricDao<JoinDataSourceListBo> {

public DataSourceDao(HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory applicationStatHbaseOperationFactory,
DataSourceSerializer dataSourceSerializer,
public DataSourceDao(ApplicationStatSerializer<JoinDataSourceListBo> serializer,
HbaseTemplate2 hbaseTemplate2,
ApplicationStatHbaseOperationFactory operationFactory,
TableNameProvider tableNameProvider) {
this.hbaseTemplate2 = Objects.requireNonNull(hbaseTemplate2, "hbaseTemplate2");
this.applicationStatHbaseOperationFactory = Objects.requireNonNull(applicationStatHbaseOperationFactory, "applicationStatHbaseOperationFactory");
this.dataSourceSerializer = Objects.requireNonNull(dataSourceSerializer, "dataSourceSerializer");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
super(StatType.APP_DATA_SOURCE, JoinApplicationStatBo::getJoinDataSourceListBoList, serializer, HbaseTable.APPLICATION_STAT_AGGRE,
hbaseTemplate2, operationFactory, tableNameProvider);
}

public void insert(String id, long timestamp, List<JoinStatBo> joinResponseTimeBoList, StatType statType) {
if (logger.isDebugEnabled()) {
logger.debug("[insert] {} : ({})", new Date(timestamp), joinResponseTimeBoList);
}
List<Put> responseTimePuts = applicationStatHbaseOperationFactory.createPuts(id, joinResponseTimeBoList, statType, dataSourceSerializer);
if (!responseTimePuts.isEmpty()) {
TableName applicationStatAggreTableName = tableNameProvider.getTableName(HbaseTable.APPLICATION_STAT_AGGRE);
hbaseTemplate2.asyncPut(applicationStatAggreTableName, responseTimePuts);
}
}

}
Loading

0 comments on commit d0c7825

Please sign in to comment.