[#6821] Improves server map performance
jaehong-kim committed Feb 5, 2021
1 parent ddbfa72 commit e419421
Showing 42 changed files with 1,290 additions and 281 deletions.
CollectorConfiguration.java
@@ -30,48 +30,35 @@

/**
* @author emeroad
* @author jaehong.kim
*/
public class CollectorConfiguration {

private final Logger logger = LoggerFactory.getLogger(getClass());

@Value("${collector.agentEventWorker.threadSize:32}")
private int agentEventWorkerThreadSize;

@Value("${collector.agentEventWorker.queueSize:5120}")
private int agentEventWorkerQueueSize;

// @Value("#{'${collector.l4.ip:}'.split(',')}")
// private List<String> l4IpList = Collections.emptyList();
@Value("${collector.l4.ip:}")
private String[] l4IpList = new String[0];

@Value("${collector.metric.jmx:false}")
private boolean metricJmxEnable;

@Value("${collector.metric.jmx.domain:pinpoint.collector.metrics}")
private String metricJmxDomainName;

@Value("${cluster.enable}")
private boolean clusterEnable;

@Value("${cluster.zookeeper.address:}")
private String clusterAddress;

@Value("${cluster.zookeeper.sessiontimeout:-1}")
private int clusterSessionTimeout;


@Value("${cluster.listen.ip:}")
private String clusterListenIp;


@Value("${cluster.listen.port:-1}")
private int clusterListenPort;


@Value("${collector.stat.uri:false}")
private boolean uriStatEnable;
@Value("${collector.statistics.agent-state.enable:false}")
private boolean statisticsAgentStateEnable;

public int getAgentEventWorkerThreadSize() {
return this.agentEventWorkerThreadSize;
@@ -162,14 +149,21 @@ public void setUriStatEnable(boolean uriStatEnable) {
this.uriStatEnable = uriStatEnable;
}

public void setStatisticsAgentStateEnable(boolean statisticsAgentStateEnable) {
this.statisticsAgentStateEnable = statisticsAgentStateEnable;
}

public boolean isStatisticsAgentStateEnable() {
return statisticsAgentStateEnable;
}

@PostConstruct
public void log() {
logger.info("{}", this);
AnnotationVisitor<Value> visitor = new AnnotationVisitor<>(Value.class);
visitor.visit(this, new LoggingEvent(logger));
}


@Override
public String toString() {
final StringBuilder sb = new StringBuilder("CollectorConfiguration{");
@@ -184,8 +178,8 @@ public String toString() {
sb.append(", clusterListenIp='").append(clusterListenIp).append('\'');
sb.append(", clusterListenPort=").append(clusterListenPort);
sb.append(", uriStatEnable=").append(uriStatEnable);
sb.append(", statisticsAgentStateEnable=").append(statisticsAgentStateEnable);
sb.append('}');
return sb.toString();
}

}
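
The diff above adds a statisticsAgentStateEnable option alongside the existing uriStatEnable flag; per its @Value annotation it is bound to collector.statistics.agent-state.enable and defaults to false. A minimal sketch of toggling it programmatically, using only the setter from this diff (in a real deployment the property would be set in the collector's configuration source instead):

CollectorConfiguration config = new CollectorConfiguration();
config.setStatisticsAgentStateEnable(true); // the @Value default is false
// equivalent to setting collector.statistics.agent-state.enable=true
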
AgentInfoDao.java
@@ -20,7 +20,10 @@

/**
* @author emeroad
* @author jaehong.kim
*/
public interface AgentInfoDao {
void insert(AgentInfoBo agentInfo);

AgentInfoBo getAgentInfo(String agentId, long timestamp);
}
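
The interface also gains a read path: getAgentInfo(agentId, timestamp) returns the agent registration in effect at the given time. A one-line usage sketch (the agent id is illustrative, not from the commit):

AgentInfoBo agentInfo = agentInfoDao.getAgentInfo("agent-01", System.currentTimeMillis());
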
MapResponseTimeDao.java
@@ -20,7 +20,8 @@

/**
* @author emeroad
* @author jaehong.kim
*/
public interface MapResponseTimeDao extends CachedStatisticsDao {
void received(String applicationName, ServiceType serviceType, String agentId, int elapsed, boolean isError);
void received(String applicationName, ServiceType serviceType, String agentId, int elapsed, boolean isError, boolean isPing);
}
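
received() now carries an isPing flag so agent pings can be recorded in their own slot instead of being mixed into response-time statistics. A hedged usage sketch (the application name, service type constant, and agent id are illustrative, not from the commit):

// elapsed 0, not an error, isPing = true: count a ping rather than a user transaction
mapResponseTimeDao.received("shop-api", ServiceType.STAND_ALONE, "agent-01", 0, false, true);
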
HbaseAgentInfoDao.java
@@ -21,13 +21,15 @@
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.HbaseTableConstants;
import com.navercorp.pinpoint.common.hbase.ResultsExtractor;
import com.navercorp.pinpoint.common.hbase.TableDescriptor;
import com.navercorp.pinpoint.common.server.bo.AgentInfoBo;
import com.navercorp.pinpoint.common.server.util.RowKeyUtils;
import com.navercorp.pinpoint.common.util.TimeUtils;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,19 +39,23 @@

/**
* @author emeroad
* @author jaehong.kim
*/
@Repository
public class HbaseAgentInfoDao implements AgentInfoDao {
private static final int SCANNER_CACHING = 1;

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final HbaseOperations2 hbaseTemplate;
private final ResultsExtractor<AgentInfoBo> agentInfoResultsExtractor;

private final TableDescriptor<HbaseColumnFamily.AgentInfo> descriptor;

public HbaseAgentInfoDao(HbaseOperations2 hbaseTemplate, TableDescriptor<HbaseColumnFamily.AgentInfo> descriptor) {
public HbaseAgentInfoDao(HbaseOperations2 hbaseTemplate, TableDescriptor<HbaseColumnFamily.AgentInfo> descriptor, ResultsExtractor<AgentInfoBo> agentInfoResultsExtractor) {
this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
this.descriptor = Objects.requireNonNull(descriptor, "descriptor");
this.agentInfoResultsExtractor = Objects.requireNonNull(agentInfoResultsExtractor, "agentInfoResultsExtractor");
}

@Override
@@ -86,4 +92,28 @@ public void insert(AgentInfoBo agentInfo) {
final TableName agentInfoTableName = descriptor.getTableName();
hbaseTemplate.put(agentInfoTableName, put);
}

public AgentInfoBo getAgentInfo(final String agentId, final long timestamp) {
Objects.requireNonNull(agentId, "agentId");

final Scan scan = createScan(agentId, timestamp);
final TableName agentInfoTableName = descriptor.getTableName();
return this.hbaseTemplate.find(agentInfoTableName, scan, agentInfoResultsExtractor);
}

private Scan createScan(String agentId, long currentTime) {
final Scan scan = new Scan();
final byte[] agentIdBytes = Bytes.toBytes(agentId);
final long startTime = TimeUtils.reverseTimeMillis(currentTime);
final byte[] startKeyBytes = RowKeyUtils.concatFixedByteAndLong(agentIdBytes, HbaseTableConstants.AGENT_NAME_MAX_LEN, startTime);
final byte[] endKeyBytes = RowKeyUtils.concatFixedByteAndLong(agentIdBytes, HbaseTableConstants.AGENT_NAME_MAX_LEN, Long.MAX_VALUE);

scan.withStartRow(startKeyBytes);
scan.withStopRow(endKeyBytes);
scan.addFamily(descriptor.getColumnFamilyName());
scan.setMaxVersions(1);
scan.setCaching(SCANNER_CACHING);

return scan;
}
}
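
The new getAgentInfo() relies on reverse-timestamp row keys: TimeUtils.reverseTimeMillis() maps a newer timestamp to a smaller long, so the most recent record at or before currentTime sorts first in HBase's ascending row order and the scan can stop after a single row (hence SCANNER_CACHING = 1). A self-contained sketch of the ordering trick, assuming reverseTimeMillis(t) is Long.MAX_VALUE - t (the conventional definition; the real helper may differ):

public class ReverseKeyOrdering {
    public static void main(String[] args) {
        long older = 1_000L;
        long newer = 2_000L;
        // Subtracting from Long.MAX_VALUE flips the sort order: the newer
        // timestamp yields the smaller key and is encountered first by an
        // ascending HBase scan.
        long olderKey = Long.MAX_VALUE - older;
        long newerKey = Long.MAX_VALUE - newer;
        System.out.println(newerKey < olderKey); // true
    }
}
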
HbaseMapResponseTimeDao.java
@@ -45,32 +45,24 @@

/**
* Save response time data of WAS
*
* @author netspider
* @author emeroad
* @author jaehong.kim
* @author HyunGil Jeong
*/
@Repository
public class HbaseMapResponseTimeDao extends MonitoredCachedStatisticsDao implements MapResponseTimeDao {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final HbaseOperations2 hbaseTemplate;

private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;

private final AcceptedTimeService acceptedTimeService;

private final TimeSlot timeSlot;

private final BulkIncrementer bulkIncrementer;

private final boolean useBulk;

private final TableDescriptor<HbaseColumnFamily.SelfStatMap> descriptor;


@Autowired
public HbaseMapResponseTimeDao(HbaseOperations2 hbaseTemplate,
TableDescriptor<HbaseColumnFamily.SelfStatMap> descriptor,
@@ -95,7 +87,7 @@ public HbaseMapResponseTimeDao(HbaseOperations2 hbaseTemplate,
}

@Override
public void received(String applicationName, ServiceType applicationServiceType, String agentId, int elapsed, boolean isError) {
public void received(String applicationName, ServiceType applicationServiceType, String agentId, int elapsed, boolean isError, boolean isPing) {
Objects.requireNonNull(applicationName, "applicationName");
Objects.requireNonNull(agentId, "agentId");

@@ -108,7 +100,7 @@ public void received(String applicationName, ServiceType applicationServiceType,
final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);
final RowKey selfRowKey = new CallRowKey(applicationName, applicationServiceType.getCode(), rowTimeSlot);

final short slotNumber = ApplicationMapStatisticsUtils.getSlotNumber(applicationServiceType, elapsed, isError);
final short slotNumber = ApplicationMapStatisticsUtils.getSlotNumber(applicationServiceType, elapsed, isError, isPing);
final ColumnName selfColumnName = new ResponseColumnName(agentId, slotNumber);
if (useBulk) {
TableName mapStatisticsSelfTableName = descriptor.getTableName();
@@ -132,7 +124,6 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) {
hbaseTemplate.incrementColumnValue(mapStatisticsSelfTableName, rowKey, descriptor.getColumnFamilyName(), columnName, increment);
}


@Override
public void flushAll() {
if (!useBulk) {
@@ -154,5 +145,4 @@ public void flushAll() {
private byte[] getDistributedKey(byte[] rowKey) {
return rowKeyDistributorByHashPrefix.getDistributedKey(rowKey);
}

}
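
With useBulk enabled, received() funnels each event into bulkIncrementer and flushAll() later issues the accumulated counts in batches, trading per-event HBase RPCs for periodic bulk writes; that batching is where the server-map performance gain comes from. A minimal sketch of the in-memory counter-batching idea (this is not Pinpoint's BulkIncrementer, and a production version must also handle increments racing with the drain):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

class CounterBuffer<K> {
    private final ConcurrentHashMap<K, LongAdder> counters = new ConcurrentHashMap<>();

    // Hot path: no I/O, just an in-memory add.
    void increment(K key) {
        counters.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    // Flush path: drain current counts and emit one increment per key.
    Map<K, Long> drain() {
        Map<K, Long> snapshot = new HashMap<>();
        for (K key : counters.keySet()) {
            LongAdder adder = counters.remove(key);
            if (adder != null) {
                snapshot.put(key, adder.sum());
            }
        }
        return snapshot;
    }
}
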
AgentInfoMapper.java (new file)
@@ -0,0 +1,88 @@
/*
* Copyright 2021 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.dao.hbase.mapper;

import com.navercorp.pinpoint.common.PinpointConstants;
import com.navercorp.pinpoint.common.buffer.Buffer;
import com.navercorp.pinpoint.common.buffer.FixedBuffer;
import com.navercorp.pinpoint.common.hbase.HbaseTableConstants;
import com.navercorp.pinpoint.common.hbase.RowMapper;
import com.navercorp.pinpoint.common.server.bo.AgentInfoBo;
import com.navercorp.pinpoint.common.server.bo.JvmInfoBo;
import com.navercorp.pinpoint.common.server.bo.ServerMetaDataBo;
import com.navercorp.pinpoint.common.util.BytesUtils;
import com.navercorp.pinpoint.common.util.TimeUtils;
import org.apache.hadoop.hbase.client.Result;
import org.springframework.stereotype.Component;

import static com.navercorp.pinpoint.common.hbase.HbaseColumnFamily.AGENTINFO_INFO;


/**
* @author HyunGil Jeong
*/
@Component
public class AgentInfoMapper implements RowMapper<AgentInfoBo> {

@Override
public AgentInfoBo mapRow(Result result, int rowNum) throws Exception {
byte[] rowKey = result.getRow();
String agentId = BytesUtils.safeTrim(BytesUtils.toString(rowKey, 0, PinpointConstants.AGENT_NAME_MAX_LEN));
long reverseStartTime = BytesUtils.bytesToLong(rowKey, HbaseTableConstants.AGENT_NAME_MAX_LEN);
long startTime = TimeUtils.recoveryTimeMillis(reverseStartTime);

byte[] serializedAgentInfo = result.getValue(AGENTINFO_INFO.getName(), AGENTINFO_INFO.QUALIFIER_IDENTIFIER);
byte[] serializedServerMetaData = result.getValue(AGENTINFO_INFO.getName(), AGENTINFO_INFO.QUALIFIER_SERVER_META_DATA);
byte[] serializedJvmInfo = result.getValue(AGENTINFO_INFO.getName(), AGENTINFO_INFO.QUALIFIER_JVM);

final AgentInfoBo.Builder agentInfoBoBuilder = createBuilderFromValue(serializedAgentInfo);
agentInfoBoBuilder.setAgentId(agentId);
agentInfoBoBuilder.setStartTime(startTime);

if (serializedServerMetaData != null) {
agentInfoBoBuilder.setServerMetaData(new ServerMetaDataBo.Builder(serializedServerMetaData).build());
}
if (serializedJvmInfo != null) {
agentInfoBoBuilder.setJvmInfo(new JvmInfoBo(serializedJvmInfo));
}
return agentInfoBoBuilder.build();
}

private AgentInfoBo.Builder createBuilderFromValue(byte[] serializedAgentInfo) {
final Buffer buffer = new FixedBuffer(serializedAgentInfo);
final AgentInfoBo.Builder builder = new AgentInfoBo.Builder();
builder.setHostName(buffer.readPrefixedString());
builder.setIp(buffer.readPrefixedString());
builder.setPorts(buffer.readPrefixedString());
builder.setApplicationName(buffer.readPrefixedString());
builder.setServiceTypeCode(buffer.readShort());
builder.setPid(buffer.readInt());
builder.setAgentVersion(buffer.readPrefixedString());
builder.setStartTime(buffer.readLong());
builder.setEndTimeStamp(buffer.readLong());
builder.setEndStatus(buffer.readInt());
// FIXME - 2015.09 v1.5.0 added vmVersion (check for compatibility)
if (buffer.hasRemaining()) {
builder.setVmVersion(buffer.readPrefixedString());
}
// FIXME - 2018.06 v1.8.0 added container (check for compatibility)
if (buffer.hasRemaining()) {
builder.isContainer(buffer.readBoolean());
}
return builder;
}
}
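
The two hasRemaining() checks at the bottom are the serialization format's compatibility mechanism: fields appended in v1.5.0 (vmVersion) and v1.8.0 (container) are read only when bytes remain, so records written by older agents still parse. The same trick with a plain java.nio.ByteBuffer, using an invented two-field layout for the demo:

import java.nio.ByteBuffer;

public class OptionalTrailingFieldDemo {
    public static void main(String[] args) {
        ByteBuffer v1Record = ByteBuffer.wrap(new byte[]{0, 0, 0, 7});     // pid only
        ByteBuffer v2Record = ByteBuffer.wrap(new byte[]{0, 0, 0, 7, 1});  // pid + container flag
        System.out.println(read(v1Record)); // pid=7 container=false
        System.out.println(read(v2Record)); // pid=7 container=true
    }

    static String read(ByteBuffer buffer) {
        int pid = buffer.getInt();
        // Older writers never emitted the flag; read it only if bytes remain.
        boolean container = buffer.hasRemaining() && buffer.get() != 0;
        return "pid=" + pid + " container=" + container;
    }
}
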
AgentInfoResultsExtractor.java (new file)
@@ -0,0 +1,44 @@
/*
* Copyright 2021 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.dao.hbase.mapper;

import com.navercorp.pinpoint.common.hbase.ResultsExtractor;
import com.navercorp.pinpoint.common.hbase.RowMapper;
import com.navercorp.pinpoint.common.server.bo.AgentInfoBo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author HyunGil Jeong
*/
@Component
public class AgentInfoResultsExtractor implements ResultsExtractor<AgentInfoBo> {

@Autowired
private RowMapper<AgentInfoBo> agentInfoMapper;

@Override
public AgentInfoBo extractData(ResultScanner results) throws Exception {
int found = 0;
for (Result result : results) {
return agentInfoMapper.mapRow(result, found++);
}
return null;
}
}
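
Paired with the reverse-timestamp scan in HbaseAgentInfoDao, the extractor needs only the first row: the for loop maps and returns the first Result, falling through to null on an empty scan. It behaves like this more explicit sketch (not the committed code):

Result first = results.next(); // null when the scan matched nothing
return (first == null) ? null : agentInfoMapper.mapRow(first, 0);
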