Skip to content

Commit

Permalink
[#9254] Add Snapshot manager & Testcase
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 18, 2022
1 parent c751d9d commit b755608
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.navercorp.pinpoint.profiler.context.grpc;

import com.navercorp.pinpoint.bootstrap.pair.NameValuePair;
import com.navercorp.pinpoint.common.trace.UriStatHistogramBucket;
import com.navercorp.pinpoint.grpc.trace.PAgentUriStat;
import com.navercorp.pinpoint.grpc.trace.PEachUriStat;
Expand All @@ -25,9 +24,9 @@
import com.navercorp.pinpoint.profiler.monitor.metric.MetricType;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.AgentUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.EachUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.URIKey;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.UriStatHistogram;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

Expand All @@ -52,10 +51,10 @@ private PAgentUriStat createPAgentUriStat(AgentUriStatData agentUriStatData) {
PAgentUriStat.Builder builder = PAgentUriStat.newBuilder();
builder.setBucketVersion(layout.getBucketVersion());

Set<Map.Entry<NameValuePair<String, Long>, EachUriStatData>> allUriStatData = agentUriStatData.getAllUriStatData();
Set<Map.Entry<URIKey, EachUriStatData>> allUriStatData = agentUriStatData.getAllUriStatData();

for (Map.Entry<NameValuePair<String, Long>, EachUriStatData> eachUriStatData : allUriStatData) {
builder.setTimestamp(eachUriStatData.getKey().getValue());
for (Map.Entry<URIKey, EachUriStatData> eachUriStatData : allUriStatData) {
builder.setTimestamp(eachUriStatData.getKey().getTimestamp());
PEachUriStat pEachUriStat = createPEachUriStat(eachUriStatData.getValue());
builder.addEachUriStat(pEachUriStat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,46 +77,42 @@ protected void pollTimeout(long timeout) {
executorListener.executePollTimeout();
}

private static class ExecutorListener implements AsyncQueueingExecutorListener<UriStatInfo> {
static class ExecutorListener implements AsyncQueueingExecutorListener<UriStatInfo> {

private static final int DEFAULT_COLLECT_INTERVAL = 30000; // 30s

private static final int SNAPSHOT_LIMIT = 4;
private final int uriStatDataLimitSize;

private final Clock clock;
private final Queue<AgentUriStatData> snapshotQueue;

private final Snapshot<AgentUriStatData> snapshotManager;

private AgentUriStatData currentAgentUriStatData;

public ExecutorListener(int uriStatDataLimitSize) {
this(uriStatDataLimitSize, DEFAULT_COLLECT_INTERVAL);
}

public ExecutorListener(int uriStatDataLimitSize, int collectInterval) {
Assert.isTrue(uriStatDataLimitSize > 0, "uriStatDataLimitSize must be ' > 0'");
this.uriStatDataLimitSize = uriStatDataLimitSize;

Assert.isTrue(collectInterval > 0, "collectInterval must be ' > 0'");
this.clock = Clock.tick(collectInterval);

this.snapshotQueue = new ConcurrentLinkedQueue<>();
this.snapshotManager = new Snapshot<>(value -> new AgentUriStatData(value, uriStatDataLimitSize));
}

@Override
public void execute(Collection<UriStatInfo> messageList) {
final long currentBaseTimestamp = clock.millis();
checkAndFlushOldData(currentBaseTimestamp);

AgentUriStatData agentUriStatData = getCurrent(currentBaseTimestamp);
AgentUriStatData agentUriStatData = snapshotManager.getCurrent(currentBaseTimestamp);

Object[] dataList = messageList.toArray();
for (int i = 0; i < CollectionUtils.nullSafeSize(messageList); i++) {
try {
agentUriStatData.add((UriStatInfo) dataList[i]);
} catch (Throwable th) {
LOGGER.warn("Unexpected Error. Cause:{}", th.getMessage(), th);
}
addUriData(agentUriStatData, (UriStatInfo) dataList[i]);
}
}

Expand All @@ -125,10 +121,13 @@ public void execute(UriStatInfo message) {
long currentBaseTimestamp = clock.millis();
checkAndFlushOldData(currentBaseTimestamp);

AgentUriStatData agentUriStatData = getCurrent(currentBaseTimestamp);
final boolean overflow = agentUriStatData.add(message);
if (overflow) {
TLogger.info("Too many URI pattern. sample-message:{}, capacity:{}, counter:{} ", message, agentUriStatData.getCapacity(), TLogger.getCounter());
AgentUriStatData agentUriStatData = snapshotManager.getCurrent(currentBaseTimestamp);
addUriData(agentUriStatData, message);
}

private void addUriData(AgentUriStatData agentUriStatData, UriStatInfo uriStatInfo) {
if (!agentUriStatData.add(uriStatInfo)) {
TLogger.info("Too many URI pattern. sample-message:{}, capacity:{}, counter:{} ", uriStatInfo, agentUriStatData.getCapacity(), TLogger.getCounter());
}
}

Expand All @@ -139,34 +138,14 @@ public void executePollTimeout() {


private boolean checkAndFlushOldData(long currentBaseTimestamp) {
final AgentUriStatData snapshot = getSnapshot(currentBaseTimestamp);
final AgentUriStatData snapshot = snapshotManager.takeSnapshot(currentBaseTimestamp, AgentUriStatData::getBaseTimestamp);
if (snapshot != null) {
addCompletedData(snapshot);
return true;
}
return false;
}

private AgentUriStatData getSnapshot(long currentBaseTimestamp) {
if (currentAgentUriStatData == null) {
return null;
}

if (currentBaseTimestamp > currentAgentUriStatData.getBaseTimestamp()) {
AgentUriStatData snapshot = currentAgentUriStatData;
currentAgentUriStatData = null;
return snapshot;
}
return null;
}

private AgentUriStatData getCurrent(long currentBaseTimestamp) {
if (currentAgentUriStatData == null) {
currentAgentUriStatData = new AgentUriStatData(currentBaseTimestamp, uriStatDataLimitSize);
}
return currentAgentUriStatData;
}


private void addCompletedData(AgentUriStatData agentUriStatData) {
// Thread safety : single consumer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.navercorp.pinpoint.profiler.context.storage;

import java.util.Objects;
import java.util.function.LongFunction;
import java.util.function.ToLongFunction;

/**
* @author Woonduk Kang(emeroad)
*/
public class Snapshot<T> {

private T currentImage;
private final LongFunction<T> instanceFactory;

public Snapshot(LongFunction<T> instanceFactory) {
this.instanceFactory = Objects.requireNonNull(instanceFactory, "instanceFactory");
}

public T takeSnapshot(long currentBaseTimestamp, ToLongFunction<T> currentImageTimestamp) {
if (currentImage == null) {
return null;
}

if (currentBaseTimestamp > currentImageTimestamp.applyAsLong(currentImage)) {
T snapshot = this.currentImage;
this.currentImage = null;
return snapshot;
}
return null;
}

public T getCurrent(long currentBaseTimestamp) {
if (currentImage == null) {
currentImage = instanceFactory.apply(currentBaseTimestamp);
}
return currentImage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@

package com.navercorp.pinpoint.profiler.monitor.metric.uri;

import com.navercorp.pinpoint.bootstrap.pair.NameValuePair;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.context.storage.AsyncQueueingUriStatStorage;
import com.navercorp.pinpoint.profiler.monitor.metric.MetricType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,11 +27,10 @@
* @author Taejin Koo
*/
public class AgentUriStatData implements MetricType {
private static final Logger LOGGER = LogManager.getLogger(AsyncQueueingUriStatStorage.class);
private final int capacity;
private final long baseTimestamp;

private final Map<NameValuePair<String, Long>, EachUriStatData> eachUriStatDataMap = new HashMap<>();
private final Map<URIKey, EachUriStatData> eachUriStatDataMap = new HashMap<>();

public AgentUriStatData(long baseTimestamp, int capacity) {
Assert.isTrue(capacity > 0, "capacity must be ` > 0`");
Expand All @@ -53,26 +48,29 @@ public long getBaseTimestamp() {
}

public boolean add(UriStatInfo uriStatInfo) {
String uri = uriStatInfo.getUri();

if (eachUriStatDataMap.size() > this.capacity) {
LOGGER.warn("Number of URIs already collected met the capacity of " + this.capacity + ". Ignoring URI " + uri);
if (eachUriStatDataMap.size() >= this.capacity) {
return false;
}

NameValuePair key = new NameValuePair(uri, uriStatInfo.getEndTime());
URIKey key = newURIKey(uriStatInfo);

EachUriStatData eachUriStatData = eachUriStatDataMap.get(key);
if (eachUriStatData == null) {
eachUriStatData = new EachUriStatData(uri);
eachUriStatData = new EachUriStatData(key.getUri());
eachUriStatDataMap.put(key, eachUriStatData);
}

eachUriStatData.add(uriStatInfo);
return true;
}

public Set<Map.Entry<NameValuePair<String, Long>, EachUriStatData>> getAllUriStatData() {
private URIKey newURIKey(UriStatInfo uriStatInfo) {
String uri = uriStatInfo.getUri();
long endTime = uriStatInfo.getEndTime();
return new URIKey(uri, endTime);
}

public Set<Map.Entry<URIKey, EachUriStatData>> getAllUriStatData() {
return eachUriStatDataMap.entrySet();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.navercorp.pinpoint.profiler.monitor.metric.uri;

import java.util.Objects;

/**
* @author Woonduk Kang(emeroad)
*/
public class URIKey {
private final String uri;
private final long timestamp;

public URIKey(String uri, long timestamp) {
this.uri = uri;
this.timestamp = timestamp;
}

public String getUri() {
return uri;
}

public long getTimestamp() {
return timestamp;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

URIKey uriKey = (URIKey) o;

if (timestamp != uriKey.timestamp) return false;
return Objects.equals(uri, uriKey.uri);
}

@Override
public int hashCode() {
int result = uri != null ? uri.hashCode() : 0;
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
return result;
}

@Override
public String toString() {
return "URIKey{" +
"uri='" + uri + '\'' +
", timestamp=" + timestamp +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package com.navercorp.pinpoint.profiler.context.storage;

import com.google.common.util.concurrent.Uninterruptibles;
import com.navercorp.pinpoint.bootstrap.pair.NameValuePair;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.AgentUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.EachUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.URIKey;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.UriStatHistogram;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.Map;
import java.util.Random;
import java.util.Set;
Expand All @@ -41,8 +41,13 @@ public class AsyncQueueingUriStatStorageTest {
@Test
public void storageTest() {
int collectInterval = 100;
int storeCount = RANDOM.nextInt(5) + 1;
storageTest(collectInterval, 1);
storageTest(collectInterval, 2);
storageTest(collectInterval, 3);
storageTest(collectInterval, 4);
}

private void storageTest(int collectInterval, int storeCount) {
AsyncQueueingUriStatStorage storage = null;
try {
storage = new AsyncQueueingUriStatStorage(5012, 3, "Test-Executor", collectInterval);
Expand All @@ -64,10 +69,13 @@ public void storageTest() {
AgentUriStatData poll = storage.poll();
Assertions.assertNotNull(poll);

Set<Map.Entry<NameValuePair<String, Long>, EachUriStatData>> allUriStatData = poll.getAllUriStatData();
for (Map.Entry<NameValuePair<String, Long>, EachUriStatData> eachUriStatData : allUriStatData) {
storeCount -= eachUriStatData.getValue().getTotalHistogram().getCount();
}
Set<Map.Entry<URIKey, EachUriStatData>> allUriStatData = poll.getAllUriStatData();
storeCount -= allUriStatData
.stream()
.map(Map.Entry::getValue)
.map(EachUriStatData::getTotalHistogram)
.mapToLong(UriStatHistogram::getCount)
.sum();

Assertions.assertEquals(0, storeCount);
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.navercorp.pinpoint.profiler.monitor.metric.uri;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;

/**
* @author Woonduk Kang(emeroad)
*/
public class AgentUriStatDataTest {

@Test
public void add_capacity() {
long baseTimestamp = System.currentTimeMillis();
AgentUriStatData metric = new AgentUriStatData(baseTimestamp, 1);

UriStatInfo test = new UriStatInfo("test", true, 10, baseTimestamp);

Assertions.assertTrue(metric.add(test));
Assertions.assertFalse(metric.add(test));
}

@Test
public void add_pattern() {
long baseTimestamp = System.currentTimeMillis();
AgentUriStatData metric = new AgentUriStatData(baseTimestamp, 10);

UriStatInfo pattern1 = new UriStatInfo("pattern1", true, 10, baseTimestamp);
metric.add(pattern1);
UriStatInfo pattern1_1 = new UriStatInfo("pattern1", true, 10, baseTimestamp);
metric.add(pattern1_1);

UriStatInfo pattern2 = new UriStatInfo("pattern2", true, 10, baseTimestamp);
metric.add(pattern2);

Assertions.assertEquals(2, metric.getAllUriStatData().size());

Optional<EachUriStatData> findPattern1 = metric.getAllUriStatData()
.stream()
.map(Map.Entry::getValue)
.filter(eachUriStatData -> eachUriStatData.getUri().equals("pattern1"))
.findFirst();
Assertions.assertEquals(2, findPattern1.get().getTotalHistogram().getCount());

}
}

0 comments on commit b755608

Please sign in to comment.