diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/stat/v2/HbaseAgentStatDaoOperationsV2.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/stat/v2/HbaseAgentStatDaoOperationsV2.java index 333daed9f9fc..490eb9682bd0 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/stat/v2/HbaseAgentStatDaoOperationsV2.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/stat/v2/HbaseAgentStatDaoOperationsV2.java @@ -86,7 +86,7 @@ boolean agentStatExists(AgentStatType agentStatTy logger.debug("checking for stat data existence : agentId={}, {}", agentId, range); } - int resultLimit = 1; + int resultLimit = 20; Scan scan = this.createScan(agentStatType, agentId, range, resultLimit); List> result = hbaseOperations2.findParallel(HBaseTables.AGENT_STAT_VER2, scan, this.operationFactory.getRowKeyDistributor(), resultLimit, mapper, AGENT_STAT_VER2_NUM_PARTITIONS); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/AbstractAgentStatSampler.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/AbstractAgentStatSampler.java new file mode 100644 index 000000000000..0e19f9366d76 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/AbstractAgentStatSampler.java @@ -0,0 +1,151 @@ +package com.navercorp.pinpoint.web.mapper.stat; + +import com.navercorp.pinpoint.common.server.bo.stat.AgentStatDataPoint; +import com.navercorp.pinpoint.web.util.TimeWindow; +import com.navercorp.pinpoint.web.vo.stat.SampledAgentStatDataPoint; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * @author HyunGil Jeong + */ +public abstract class AbstractAgentStatSampler implements AgentStatSampler { + + private static final int INITIAL_TIME_WINDOW_INDEX = -1; + private static final Comparator REVERSE_TIMESTAMP_COMPARATOR = new Comparator() { + @Override + public int compare(Long o1, Long o2) { + return o2.compareTo(o1); + } + }; + + @Override + public final List sampleDataPoints(TimeWindow timeWindow, List dataPoints) { + Map> dataPointPartitions = partitionDataPoints(dataPoints); + + Map> sampledPointProjection = mapProjection(timeWindow, dataPointPartitions); + + List sampledDataPoints = new ArrayList<>(sampledPointProjection.size()); + for (Map sampledPointCandidates : sampledPointProjection.values()) { + sampledDataPoints.add(reduceSampledPoints(sampledPointCandidates)); + } + return sampledDataPoints; + } + + /** + * Returns a map of data points partitioned by the start timestamp of the agent. This is mainly to distinguish + * between different agent life cycles, and prevent stats from being mixed up when there are multiple agents with + * the same agent id. + * + * @param dataPoints a list of data points to partition + * @return a map of data points partitioned by agent start timestamps + */ + private Map> partitionDataPoints(List dataPoints) { + Map> dataPointPartitions = new HashMap<>(); + for (T jvmGcBo : dataPoints) { + long startTimestamp = jvmGcBo.getStartTimestamp(); + List dataPointPartition = dataPointPartitions.get(startTimestamp); + if (dataPointPartition == null) { + dataPointPartition = new ArrayList<>(dataPoints.size()); + dataPointPartitions.put(startTimestamp, dataPointPartition); + } + dataPointPartition.add(jvmGcBo); + } + return dataPointPartitions; + } + + /** + * Collapses multiple projections of sampled data points into a single map with key sorted by timeslot timestamp, + * and values being sampled data points of different agent life cycles mapped by their agent start timestamp. + * + * @param timeWindow the TimeWindow used to create the timeslot map + * @param dataPointPartitions a map of data points partitioned by their agent start timestamp + * @return a map of timeslots with sampled data points mapped by their agent start timestamp as values + */ + private Map> mapProjection(TimeWindow timeWindow, Map> dataPointPartitions) { + Map> sampledPointProjection = new TreeMap<>(); + for (Map.Entry> dataPointPartitionEntry : dataPointPartitions.entrySet()) { + Long startTimestamp = dataPointPartitionEntry.getKey(); + List dataPointPartition = dataPointPartitionEntry.getValue(); + Map sampledDataPointPartition = sampleDataPointPartition(timeWindow, dataPointPartition); + + for (Map.Entry e : sampledDataPointPartition.entrySet()) { + Long timeslotTimestamp = e.getKey(); + S sampledDataPoint = e.getValue(); + Map timeslotSampleEntry = sampledPointProjection.get(timeslotTimestamp); + if (timeslotSampleEntry == null) { + timeslotSampleEntry = new TreeMap<>(REVERSE_TIMESTAMP_COMPARATOR); + sampledPointProjection.put(timeslotTimestamp, timeslotSampleEntry); + } + timeslotSampleEntry.put(startTimestamp, sampledDataPoint); + } + } + return sampledPointProjection; + } + + /** + * Returns a map of timeslot timestamps with sampled data points as value. + * + * @param timeWindow the TimeWindow used to create the timeslot map + * @param dataPoints a list of data points to sample + * @return a map of timeslots with sampled data points as values + */ + private Map sampleDataPointPartition(TimeWindow timeWindow, List dataPoints) { + Map sampledDataPoints = new HashMap<>((int) timeWindow.getWindowRangeCount()); + T previous; + List currentBatch = new ArrayList<>(); + int currentTimeWindowIndex = INITIAL_TIME_WINDOW_INDEX; + long currentTimeslotTimestamp = 0; + for (T dataPoint : dataPoints) { + long timestamp = dataPoint.getTimestamp(); + int timeWindowIndex = timeWindow.getWindowIndex(timestamp); + if (currentTimeWindowIndex == INITIAL_TIME_WINDOW_INDEX || currentTimeWindowIndex == timeWindowIndex) { + currentBatch.add(dataPoint); + } else if (timeWindowIndex < currentTimeWindowIndex) { + previous = dataPoint; + // currentBatch shouldn't be empty at this point + S sampledBatch = sampleDataPoints(currentTimeslotTimestamp, currentBatch, previous); + sampledDataPoints.put(currentTimeslotTimestamp, sampledBatch); + currentBatch = new ArrayList<>(currentBatch.size()); + currentBatch.add(dataPoint); + } else { + // Results should be sorted in a descending order of their actual timestamp values + // as they are stored using reverse timestamp. + throw new IllegalStateException("Out of order AgentStatDataPoint"); + } + currentTimeslotTimestamp = timeWindow.refineTimestamp(timestamp); + currentTimeWindowIndex = timeWindowIndex; + } + if (!currentBatch.isEmpty()) { + S sampledBatch = sampleDataPoints(currentTimeslotTimestamp, currentBatch, null); + sampledDataPoints.put(currentTimeslotTimestamp, sampledBatch); + } + return sampledDataPoints; + } + + /** + * Returns the sampled data point of the most recently started agent out of multiple candidates. + * + * @param sampledPointCandidates a map of sampled data points to reduce mapped by their agent start timestamp + * @return sampled data point of the most recently started agent + */ + protected S reduceSampledPoints(Map sampledPointCandidates) { + long latestStartTimestamp = -1L; + S sampledPointToUse = null; + for (Map.Entry e : sampledPointCandidates.entrySet()) { + long startTimestamp = e.getKey(); + if (startTimestamp > latestStartTimestamp) { + latestStartTimestamp = startTimestamp; + sampledPointToUse = e.getValue(); + } + } + return sampledPointToUse; + } + + protected abstract S sampleDataPoints(long timestamp, List dataPoints, T previousDataPoint); +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/ActiveTraceSampler.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/ActiveTraceSampler.java index 673cbbabfb0a..306d1474062d 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/ActiveTraceSampler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/ActiveTraceSampler.java @@ -36,7 +36,7 @@ * @author HyunGil Jeong */ @Component -public class ActiveTraceSampler implements AgentStatSampler { +public class ActiveTraceSampler extends AbstractAgentStatSampler { public static final DownSampler INTEGER_DOWN_SAMPLER = DownSamplers.getIntegerDownSampler(ActiveTraceBo.UNCOLLECTED_ACTIVE_TRACE_COUNT); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/AgentStatSampler.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/AgentStatSampler.java index fcb2650c8244..5f31e46bc798 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/AgentStatSampler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/AgentStatSampler.java @@ -1,6 +1,7 @@ package com.navercorp.pinpoint.web.mapper.stat; import com.navercorp.pinpoint.common.server.bo.stat.AgentStatDataPoint; +import com.navercorp.pinpoint.web.util.TimeWindow; import com.navercorp.pinpoint.web.vo.stat.SampledAgentStatDataPoint; import java.util.List; @@ -10,5 +11,5 @@ */ public interface AgentStatSampler { - S sampleDataPoints(long timestamp, List dataPoints, T previousDataPoint); + List sampleDataPoints(TimeWindow timeWindow, List dataPoints); } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/CpuLoadSampler.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/CpuLoadSampler.java index 4d5afdf7d2b7..58c1d11b40cb 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/CpuLoadSampler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/CpuLoadSampler.java @@ -31,7 +31,7 @@ * @author HyunGil Jeong */ @Component -public class CpuLoadSampler implements AgentStatSampler { +public class CpuLoadSampler extends AbstractAgentStatSampler { private static final int NUM_DECIMAL_PLACES = 1; public static final DownSampler DOUBLE_DOWN_SAMPLER = DownSamplers.getDoubleDownSampler(CpuLoadBo.UNCOLLECTED_VALUE, NUM_DECIMAL_PLACES); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcDetailedSampler.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcDetailedSampler.java index 414793d36a53..1125520bbe84 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcDetailedSampler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcDetailedSampler.java @@ -31,7 +31,7 @@ * @author HyunGil Jeong */ @Component -public class JvmGcDetailedSampler implements AgentStatSampler { +public class JvmGcDetailedSampler extends AbstractAgentStatSampler { private static final int NUM_DECIMAL_PLACES = 1; public static final DownSampler LONG_DOWN_SAMPLER = DownSamplers.getLongDownSampler(JvmGcDetailedBo.UNCOLLECTED_VALUE); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcSampler.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcSampler.java index 68162d82abc0..f75c9c45784f 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcSampler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/JvmGcSampler.java @@ -32,7 +32,7 @@ * @author HyunGil Jeong */ @Component -public class JvmGcSampler implements AgentStatSampler { +public class JvmGcSampler extends AbstractAgentStatSampler { public static final DownSampler LONG_DOWN_SAMPLER = DownSamplers.getLongDownSampler(JvmGcBo.UNCOLLECTED_VALUE); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractor.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractor.java index 34ee8517dbf5..de8eab93a397 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractor.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractor.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import java.util.ArrayList; +import java.util.Collections; import java.util.List; @@ -32,12 +33,9 @@ */ public class SampledAgentStatResultExtractor implements ResultsExtractor> { - private static final int INITIAL_TIME_WINDOW_INDEX = -1; - private final TimeWindow timeWindow; private final AgentStatMapper rowMapper; private final AgentStatSampler sampler; - private final List sampledDataPoints; public SampledAgentStatResultExtractor(TimeWindow timeWindow, AgentStatMapper rowMapper, AgentStatSampler sampler) { if (timeWindow.getWindowRangeCount() > Integer.MAX_VALUE) { @@ -46,48 +44,22 @@ public SampledAgentStatResultExtractor(TimeWindow timeWindow, AgentStatMapper this.timeWindow = timeWindow; this.rowMapper = rowMapper; this.sampler = sampler; - this.sampledDataPoints = new ArrayList<>((int) timeWindow.getWindowRangeCount()); } @Override public List extractData(ResultScanner results) throws Exception { int rowNum = 0; - // Sample straight away, tossing out already sampled data points so they can be garbage collected - // as soon as possible. - // This is mainly important when querying over a long period of time where sampling after all the data has been - // deserialized would consume too much memory. - T previous; - List currentBatch = new ArrayList<>(); - int currentTimeWindowIndex = INITIAL_TIME_WINDOW_INDEX; - long currentTimeslotTimestamp = 0; + List aggregatedDataPoints = new ArrayList<>(); for (Result result : results) { - List dataPoints = this.rowMapper.mapRow(result, rowNum++); - for (T dataPoint : dataPoints) { - long timestamp = dataPoint.getTimestamp(); - int timeWindowIndex = this.timeWindow.getWindowIndex(timestamp); - if (currentTimeWindowIndex == INITIAL_TIME_WINDOW_INDEX || currentTimeWindowIndex == timeWindowIndex) { - currentBatch.add(dataPoint); - } else if (timeWindowIndex < currentTimeWindowIndex) { - previous = dataPoint; - // currentBatch shouldn't be empty at this point - S sampledBatch = sampler.sampleDataPoints(currentTimeslotTimestamp, currentBatch, previous); - this.sampledDataPoints.add(sampledBatch); - currentBatch = new ArrayList<>(); - currentBatch.add(dataPoint); - } else { - // Results should be sorted in a descending order of their actual timestamp values - // as they are stored using reverse timestamp. - throw new IllegalStateException("Out of order AgentStatDataPoint"); - } - currentTimeslotTimestamp = this.timeWindow.refineTimestamp(timestamp); - currentTimeWindowIndex = timeWindowIndex; - } + aggregatedDataPoints.addAll(this.rowMapper.mapRow(result, rowNum++)); } - if (!currentBatch.isEmpty()) { - S sampledBatch = sampler.sampleDataPoints(currentTimeslotTimestamp, currentBatch, null); - sampledDataPoints.add(sampledBatch); + List sampledDataPoints; + if (aggregatedDataPoints.isEmpty()) { + sampledDataPoints = Collections.emptyList(); + } else { + sampledDataPoints = sampler.sampleDataPoints(timeWindow, aggregatedDataPoints); } - return this.sampledDataPoints; + return sampledDataPoints; } } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/TransactionSampler.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/TransactionSampler.java index a8a420ec92c6..a14320f3326b 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/TransactionSampler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/stat/TransactionSampler.java @@ -32,7 +32,7 @@ * @author HyunGil Jeong */ @Component -public class TransactionSampler implements AgentStatSampler { +public class TransactionSampler extends AbstractAgentStatSampler { private static final double UNCOLLECTED_TPS = -1D; private static final int NUM_DECIMAL_PLACES = 1; diff --git a/web/src/test/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractorTest.java b/web/src/test/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractorTest.java index e0e05bf02606..e438b0c52796 100644 --- a/web/src/test/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractorTest.java +++ b/web/src/test/java/com/navercorp/pinpoint/web/mapper/stat/SampledAgentStatResultExtractorTest.java @@ -175,7 +175,7 @@ private List createDataPoints(long finalTimestamp, long return dataPoints; } - private static class TestAgentStatSampler implements AgentStatSampler { + private static class TestAgentStatSampler extends AbstractAgentStatSampler { @Override public TestSampledAgentStatDataPoint sampleDataPoints(long timestamp, List dataPoints, TestAgentStatDataPoint previousDataPoint) {