Skip to content

Commit

Permalink
#2345 Handle cases where multiple agents with the same id simultaneously send data
Browse files Browse the repository at this point in the history

Picks the sampled data point of the most recently started agent if there are overlapping stat data from multiple agents in a single timeslot.
  • Loading branch information
Xylus authored and minwoo-jung committed Dec 29, 2016
1 parent 698042e commit 4e4b675
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 45 deletions.
Expand Up @@ -86,7 +86,7 @@ <T extends AgentStatDataPoint> boolean agentStatExists(AgentStatType agentStatTy
logger.debug("checking for stat data existence : agentId={}, {}", agentId, range);
}

int resultLimit = 1;
int resultLimit = 20;
Scan scan = this.createScan(agentStatType, agentId, range, resultLimit);

List<List<T>> result = hbaseOperations2.findParallel(HBaseTables.AGENT_STAT_VER2, scan, this.operationFactory.getRowKeyDistributor(), resultLimit, mapper, AGENT_STAT_VER2_NUM_PARTITIONS);
Expand Down
@@ -0,0 +1,151 @@
package com.navercorp.pinpoint.web.mapper.stat;

import com.navercorp.pinpoint.common.server.bo.stat.AgentStatDataPoint;
import com.navercorp.pinpoint.web.util.TimeWindow;
import com.navercorp.pinpoint.web.vo.stat.SampledAgentStatDataPoint;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Base {@link AgentStatSampler} that samples raw agent stat data points into one sampled data point per timeslot.
 * <p>
 * Data points are first partitioned by their agent start timestamp (one partition per agent life cycle), each
 * partition is sampled per timeslot independently, and when several life cycles produced a sample for the same
 * timeslot, {@link #reduceSampledPoints(Map)} decides which one is kept.
 * <p>
 * Subclasses implement {@link #sampleDataPoints(long, List, AgentStatDataPoint)} to turn the data points of a single
 * timeslot into a sampled data point.
 *
 * @author HyunGil Jeong
 */
public abstract class AbstractAgentStatSampler<T extends AgentStatDataPoint, S extends SampledAgentStatDataPoint> implements AgentStatSampler<T, S> {

    // Marker meaning "no data point has been processed yet" while walking a partition.
    private static final int INITIAL_TIME_WINDOW_INDEX = -1;

    // Orders agent start timestamps from the most recent to the oldest.
    private static final Comparator<Long> REVERSE_TIMESTAMP_COMPARATOR = new Comparator<Long>() {
        @Override
        public int compare(Long o1, Long o2) {
            return o2.compareTo(o1);
        }
    };

    /**
     * Samples the given data points into a list with one sampled data point per timeslot, ordered by ascending
     * timeslot timestamp.
     *
     * @param timeWindow the TimeWindow used to assign data points to timeslots
     * @param dataPoints the data points to sample; within a single agent life cycle they must be in descending
     *                   timestamp order
     * @return the sampled data points, one per timeslot that contained data
     * @throws IllegalStateException if data points of one agent life cycle are not in descending timeslot order
     */
    @Override
    public final List<S> sampleDataPoints(TimeWindow timeWindow, List<T> dataPoints) {
        Map<Long, List<T>> dataPointPartitions = partitionDataPoints(dataPoints);

        Map<Long, Map<Long, S>> sampledPointProjection = mapProjection(timeWindow, dataPointPartitions);

        List<S> sampledDataPoints = new ArrayList<>(sampledPointProjection.size());
        for (Map<Long, S> sampledPointCandidates : sampledPointProjection.values()) {
            sampledDataPoints.add(reduceSampledPoints(sampledPointCandidates));
        }
        return sampledDataPoints;
    }

    /**
     * Returns a map of data points partitioned by the start timestamp of the agent. This is mainly to distinguish
     * between different agent life cycles, and prevent stats from being mixed up when there are multiple agents with
     * the same agent id. The relative order of the data points is preserved within each partition.
     *
     * @param dataPoints a list of data points to partition
     * @return a map of data points partitioned by agent start timestamps
     */
    private Map<Long, List<T>> partitionDataPoints(List<T> dataPoints) {
        Map<Long, List<T>> dataPointPartitions = new HashMap<>();
        for (T dataPoint : dataPoints) {
            long startTimestamp = dataPoint.getStartTimestamp();
            List<T> dataPointPartition = dataPointPartitions.get(startTimestamp);
            if (dataPointPartition == null) {
                // Let the list grow on demand: presizing every partition to the total number of data points
                // would reserve (number of partitions x total size) slots when an agent was restarted many times.
                dataPointPartition = new ArrayList<>();
                dataPointPartitions.put(startTimestamp, dataPointPartition);
            }
            dataPointPartition.add(dataPoint);
        }
        return dataPointPartitions;
    }

    /**
     * Collapses multiple projections of sampled data points into a single map with key sorted by timeslot timestamp,
     * and values being sampled data points of different agent life cycles mapped by their agent start timestamp.
     *
     * @param timeWindow the TimeWindow used to create the timeslot map
     * @param dataPointPartitions a map of data points partitioned by their agent start timestamp
     * @return a map of timeslots with sampled data points mapped by their agent start timestamp as values
     */
    private Map<Long, Map<Long, S>> mapProjection(TimeWindow timeWindow, Map<Long, List<T>> dataPointPartitions) {
        // TreeMap with natural ordering: timeslots come out in ascending timestamp order.
        Map<Long, Map<Long, S>> sampledPointProjection = new TreeMap<>();
        for (Map.Entry<Long, List<T>> dataPointPartitionEntry : dataPointPartitions.entrySet()) {
            Long startTimestamp = dataPointPartitionEntry.getKey();
            List<T> dataPointPartition = dataPointPartitionEntry.getValue();
            Map<Long, S> sampledDataPointPartition = sampleDataPointPartition(timeWindow, dataPointPartition);

            for (Map.Entry<Long, S> e : sampledDataPointPartition.entrySet()) {
                Long timeslotTimestamp = e.getKey();
                S sampledDataPoint = e.getValue();
                Map<Long, S> timeslotSampleEntry = sampledPointProjection.get(timeslotTimestamp);
                if (timeslotSampleEntry == null) {
                    // Candidates of a timeslot are kept with the most recently started agent first.
                    timeslotSampleEntry = new TreeMap<>(REVERSE_TIMESTAMP_COMPARATOR);
                    sampledPointProjection.put(timeslotTimestamp, timeslotSampleEntry);
                }
                timeslotSampleEntry.put(startTimestamp, sampledDataPoint);
            }
        }
        return sampledPointProjection;
    }

    /**
     * Returns a map of timeslot timestamps with sampled data points as value.
     *
     * @param timeWindow the TimeWindow used to create the timeslot map
     * @param dataPoints a list of data points to sample
     * @return a map of timeslots with sampled data points as values
     * @throws IllegalStateException if a data point belongs to a later timeslot than the one before it
     */
    private Map<Long, S> sampleDataPointPartition(TimeWindow timeWindow, List<T> dataPoints) {
        Map<Long, S> sampledDataPoints = new HashMap<>((int) timeWindow.getWindowRangeCount());
        List<T> currentBatch = new ArrayList<>();
        int currentTimeWindowIndex = INITIAL_TIME_WINDOW_INDEX;
        long currentTimeslotTimestamp = 0;
        for (T dataPoint : dataPoints) {
            long timestamp = dataPoint.getTimestamp();
            int timeWindowIndex = timeWindow.getWindowIndex(timestamp);
            if (currentTimeWindowIndex == INITIAL_TIME_WINDOW_INDEX || currentTimeWindowIndex == timeWindowIndex) {
                // Still inside the same timeslot (or this is the very first data point).
                currentBatch.add(dataPoint);
            } else if (timeWindowIndex < currentTimeWindowIndex) {
                // Moved on to an earlier timeslot: flush the finished batch, handing over the data point that
                // opened the new timeslot as the "previous" data point.
                // currentBatch shouldn't be empty at this point
                S sampledBatch = sampleDataPoints(currentTimeslotTimestamp, currentBatch, dataPoint);
                sampledDataPoints.put(currentTimeslotTimestamp, sampledBatch);
                currentBatch = new ArrayList<>(currentBatch.size());
                currentBatch.add(dataPoint);
            } else {
                // Results should be sorted in a descending order of their actual timestamp values
                // as they are stored using reverse timestamp.
                throw new IllegalStateException("Out of order AgentStatDataPoint");
            }
            currentTimeslotTimestamp = timeWindow.refineTimestamp(timestamp);
            currentTimeWindowIndex = timeWindowIndex;
        }
        if (!currentBatch.isEmpty()) {
            // The last batch has no following data point to hand over.
            S sampledBatch = sampleDataPoints(currentTimeslotTimestamp, currentBatch, null);
            sampledDataPoints.put(currentTimeslotTimestamp, sampledBatch);
        }
        return sampledDataPoints;
    }

    /**
     * Returns the sampled data point of the most recently started agent out of multiple candidates.
     * <p>
     * All entries are inspected, so the result does not depend on the iteration order of the given map.
     *
     * @param sampledPointCandidates a map of sampled data points to reduce mapped by their agent start timestamp
     * @return sampled data point of the most recently started agent, or {@code null} if there are no candidates
     */
    protected S reduceSampledPoints(Map<Long, S> sampledPointCandidates) {
        // Track "seen anything yet" explicitly instead of relying on a sentinel timestamp, so that any start
        // timestamp value (including non-positive ones) can be selected.
        boolean candidateFound = false;
        long latestStartTimestamp = 0L;
        S sampledPointToUse = null;
        for (Map.Entry<Long, S> e : sampledPointCandidates.entrySet()) {
            long startTimestamp = e.getKey();
            if (!candidateFound || startTimestamp > latestStartTimestamp) {
                candidateFound = true;
                latestStartTimestamp = startTimestamp;
                sampledPointToUse = e.getValue();
            }
        }
        return sampledPointToUse;
    }

    /**
     * Samples the data points of a single timeslot (all from the same agent life cycle) into one sampled data point.
     *
     * @param timestamp the timeslot timestamp the batch belongs to
     * @param dataPoints the non-empty batch of data points that fall into the timeslot
     * @param previousDataPoint the data point that follows the batch in iteration order (the first data point of the
     *                          next processed timeslot), or {@code null} for the last batch of a partition
     * @return the sampled data point representing the timeslot
     */
    protected abstract S sampleDataPoints(long timestamp, List<T> dataPoints, T previousDataPoint);
}
Expand Up @@ -36,7 +36,7 @@
* @author HyunGil Jeong
*/
@Component
public class ActiveTraceSampler implements AgentStatSampler<ActiveTraceBo, SampledActiveTrace> {
public class ActiveTraceSampler extends AbstractAgentStatSampler<ActiveTraceBo, SampledActiveTrace> {

public static final DownSampler<Integer> INTEGER_DOWN_SAMPLER = DownSamplers.getIntegerDownSampler(ActiveTraceBo.UNCOLLECTED_ACTIVE_TRACE_COUNT);

Expand Down
@@ -1,6 +1,7 @@
package com.navercorp.pinpoint.web.mapper.stat;

import com.navercorp.pinpoint.common.server.bo.stat.AgentStatDataPoint;
import com.navercorp.pinpoint.web.util.TimeWindow;
import com.navercorp.pinpoint.web.vo.stat.SampledAgentStatDataPoint;

import java.util.List;
Expand All @@ -10,5 +11,5 @@
*/
public interface AgentStatSampler<T extends AgentStatDataPoint, S extends SampledAgentStatDataPoint> {

S sampleDataPoints(long timestamp, List<T> dataPoints, T previousDataPoint);
List<S> sampleDataPoints(TimeWindow timeWindow, List<T> dataPoints);
}
Expand Up @@ -31,7 +31,7 @@
* @author HyunGil Jeong
*/
@Component
public class CpuLoadSampler implements AgentStatSampler<CpuLoadBo, SampledCpuLoad> {
public class CpuLoadSampler extends AbstractAgentStatSampler<CpuLoadBo, SampledCpuLoad> {

private static final int NUM_DECIMAL_PLACES = 1;
public static final DownSampler<Double> DOUBLE_DOWN_SAMPLER = DownSamplers.getDoubleDownSampler(CpuLoadBo.UNCOLLECTED_VALUE, NUM_DECIMAL_PLACES);
Expand Down
Expand Up @@ -31,7 +31,7 @@
* @author HyunGil Jeong
*/
@Component
public class JvmGcDetailedSampler implements AgentStatSampler<JvmGcDetailedBo, SampledJvmGcDetailed> {
public class JvmGcDetailedSampler extends AbstractAgentStatSampler<JvmGcDetailedBo, SampledJvmGcDetailed> {

private static final int NUM_DECIMAL_PLACES = 1;
public static final DownSampler<Long> LONG_DOWN_SAMPLER = DownSamplers.getLongDownSampler(JvmGcDetailedBo.UNCOLLECTED_VALUE);
Expand Down
Expand Up @@ -32,7 +32,7 @@
* @author HyunGil Jeong
*/
@Component
public class JvmGcSampler implements AgentStatSampler<JvmGcBo, SampledJvmGc> {
public class JvmGcSampler extends AbstractAgentStatSampler<JvmGcBo, SampledJvmGc> {

public static final DownSampler<Long> LONG_DOWN_SAMPLER = DownSamplers.getLongDownSampler(JvmGcBo.UNCOLLECTED_VALUE);

Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;


Expand All @@ -32,12 +33,9 @@
*/
public class SampledAgentStatResultExtractor<T extends AgentStatDataPoint, S extends SampledAgentStatDataPoint> implements ResultsExtractor<List<S>> {

private static final int INITIAL_TIME_WINDOW_INDEX = -1;

private final TimeWindow timeWindow;
private final AgentStatMapper<T> rowMapper;
private final AgentStatSampler<T, S> sampler;
private final List<S> sampledDataPoints;

public SampledAgentStatResultExtractor(TimeWindow timeWindow, AgentStatMapper<T> rowMapper, AgentStatSampler<T, S> sampler) {
if (timeWindow.getWindowRangeCount() > Integer.MAX_VALUE) {
Expand All @@ -46,48 +44,22 @@ public SampledAgentStatResultExtractor(TimeWindow timeWindow, AgentStatMapper<T>
this.timeWindow = timeWindow;
this.rowMapper = rowMapper;
this.sampler = sampler;
this.sampledDataPoints = new ArrayList<>((int) timeWindow.getWindowRangeCount());
}

@Override
public List<S> extractData(ResultScanner results) throws Exception {
int rowNum = 0;
// Sample straight away, tossing out already sampled data points so they can be garbage collected
// as soon as possible.
// This is mainly important when querying over a long period of time where sampling after all the data has been
// deserialized would consume too much memory.
T previous;
List<T> currentBatch = new ArrayList<>();
int currentTimeWindowIndex = INITIAL_TIME_WINDOW_INDEX;
long currentTimeslotTimestamp = 0;
List<T> aggregatedDataPoints = new ArrayList<>();
for (Result result : results) {
List<T> dataPoints = this.rowMapper.mapRow(result, rowNum++);
for (T dataPoint : dataPoints) {
long timestamp = dataPoint.getTimestamp();
int timeWindowIndex = this.timeWindow.getWindowIndex(timestamp);
if (currentTimeWindowIndex == INITIAL_TIME_WINDOW_INDEX || currentTimeWindowIndex == timeWindowIndex) {
currentBatch.add(dataPoint);
} else if (timeWindowIndex < currentTimeWindowIndex) {
previous = dataPoint;
// currentBatch shouldn't be empty at this point
S sampledBatch = sampler.sampleDataPoints(currentTimeslotTimestamp, currentBatch, previous);
this.sampledDataPoints.add(sampledBatch);
currentBatch = new ArrayList<>();
currentBatch.add(dataPoint);
} else {
// Results should be sorted in a descending order of their actual timestamp values
// as they are stored using reverse timestamp.
throw new IllegalStateException("Out of order AgentStatDataPoint");
}
currentTimeslotTimestamp = this.timeWindow.refineTimestamp(timestamp);
currentTimeWindowIndex = timeWindowIndex;
}
aggregatedDataPoints.addAll(this.rowMapper.mapRow(result, rowNum++));
}
if (!currentBatch.isEmpty()) {
S sampledBatch = sampler.sampleDataPoints(currentTimeslotTimestamp, currentBatch, null);
sampledDataPoints.add(sampledBatch);
List<S> sampledDataPoints;
if (aggregatedDataPoints.isEmpty()) {
sampledDataPoints = Collections.emptyList();
} else {
sampledDataPoints = sampler.sampleDataPoints(timeWindow, aggregatedDataPoints);
}
return this.sampledDataPoints;
return sampledDataPoints;
}

}
Expand Up @@ -32,7 +32,7 @@
* @author HyunGil Jeong
*/
@Component
public class TransactionSampler implements AgentStatSampler<TransactionBo, SampledTransaction> {
public class TransactionSampler extends AbstractAgentStatSampler<TransactionBo, SampledTransaction> {

private static final double UNCOLLECTED_TPS = -1D;
private static final int NUM_DECIMAL_PLACES = 1;
Expand Down
Expand Up @@ -175,7 +175,7 @@ private List<TestAgentStatDataPoint> createDataPoints(long finalTimestamp, long
return dataPoints;
}

private static class TestAgentStatSampler implements AgentStatSampler<TestAgentStatDataPoint, TestSampledAgentStatDataPoint> {
private static class TestAgentStatSampler extends AbstractAgentStatSampler<TestAgentStatDataPoint, TestSampledAgentStatDataPoint> {

@Override
public TestSampledAgentStatDataPoint sampleDataPoints(long timestamp, List<TestAgentStatDataPoint> dataPoints, TestAgentStatDataPoint previousDataPoint) {
Expand Down

0 comments on commit 4e4b675

Please sign in to comment.