Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1052: Memory leak in RestRequestCache #1167

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 9 additions & 4 deletions bot/src/main/java/org/openjdk/skara/bot/BotRunner.java
Expand Up @@ -36,6 +36,7 @@
import java.util.stream.Collectors;

import com.sun.net.httpserver.*;
import org.openjdk.skara.network.RestRequest;

class BotRunnerError extends RuntimeException {
BotRunnerError(String msg) {
Expand Down Expand Up @@ -325,10 +326,14 @@ public void run(Duration timeout) {
}

isReady = true;
executor.scheduleAtFixedRate(this::itemWatchdog, 0,
config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);

var schedulingInterval = config.scheduledExecutionPeriod().toMillis();
executor.scheduleAtFixedRate(this::itemWatchdog, 0, schedulingInterval, TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(this::checkPeriodicItems, 0, schedulingInterval, TimeUnit.MILLISECONDS);

var cacheEvictionInterval = config.cacheEvictionInterval().toMillis();
executor.scheduleAtFixedRate(RestRequest::evictOldCacheData, cacheEvictionInterval,
cacheEvictionInterval, TimeUnit.MILLISECONDS);

try {
executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
Expand Down
Expand Up @@ -319,6 +319,20 @@ Duration scheduledExecutionPeriod() {
}
}

/**
* The amount of time to wait between runs of the RestResponseCache evictions.
* @return
*/
Duration cacheEvictionInterval() {
if (!config.contains("runner") || !config.get("runner").contains("cache_eviction_interval")) {
var defaultValue = Duration.ofMinutes(5);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you done any testing to see if a default interval of 5 minutes is the right frequency for cache evictions? Naively, it seems like it might be too often. Is there a performance impact?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Kevin, I think we could probably keep at least 60 minutes of cache, if not even more. Or I might have missed something 🤷😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point. In my initial patch I used the scheduling frequency (which is what the watchdog uses) instead of this new configuration value, which is 10 seconds, and it worked fine on staging. I thought that was a bit excessive so I increased to a default of 5 mins, which matches the lowest "maxAge" in the cache.

While we could certainly live with a longer interval here, 24h would still solve our problems, I don't think we have much to gain from increasing it longer than my suggested 5 mins. The nature of the bots is that they run pretty much non stop, using a lot of cpu. I can't see how running this little method will have any noticeable impact on performance, even if the cache has thousands or even millions of valid entries. It shouldn't take more than milliseconds on one of 8-32 threads (depending on bot runner config).

If we start seeing problems, this value can always be changed in the future, or configured for any specific bot runner configuration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems OK to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also just to clarify as I realized there was a misunderstanding in offline discussion. This patch is not changing the functional behavior of the cache in any way. The entries that are being removed are just those older than "maxAge", which were already disqualified in the send method.

I have also verified the behavior in staging over night and the metrics show that we have the same amount of cache hits/min with the patch as before.

log.info("No cache eviction interval defined, using default value " + defaultValue);
return defaultValue;
} else {
return Duration.parse(config.get("runner").get("cache_eviction_interval").asString());
}
}

/**
* Number of WorkItems to execute in parallel.
* @return
Expand Down
Expand Up @@ -479,4 +479,8 @@ public QueryBuilder delete(String endpoint) {
public QueryBuilder delete() {
return delete(null);
}

public static void evictOldCacheData() {
RestRequestCache.INSTANCE.evictOldData();
}
}
Expand Up @@ -22,6 +22,9 @@
*/
package org.openjdk.skara.network;

import org.openjdk.skara.metrics.Counter;
import org.openjdk.skara.metrics.Gauge;

import javax.net.ssl.SSLSession;
import java.io.IOException;
import java.net.URI;
Expand All @@ -35,6 +38,9 @@
enum RestRequestCache {
INSTANCE;

private final static Gauge cachedEntriesGauge = Gauge.name("skara_response_cache_size").register();
private final static Counter cacheHitsCounter = Counter.name("skara_response_cache_hits").register();

private static class RequestContext {
private final String authId;
private final HttpRequest unauthenticatedRequest;
Expand Down Expand Up @@ -68,12 +74,21 @@ public Duration age() {
}
}

private final Map<RequestContext, HttpResponse<String>> cachedResponses = new ConcurrentHashMap<>();
private class CacheEntry {
private final HttpResponse<String> response;
private final Instant callTime;

public CacheEntry(HttpResponse<String> response, Instant callTime) {
this.response = response;
this.callTime = callTime;
}
}

private final Map<RequestContext, CacheEntry> cachedResponses = new ConcurrentHashMap<>();
private final HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();
private final Logger log = Logger.getLogger("org.openjdk.skara.network");
private final ConcurrentHashMap<String, Lock> authLocks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Instant> lastUpdates = new ConcurrentHashMap<>();
private final Map<RequestContext, Instant> cachedUpdated = new ConcurrentHashMap<>();

private static class LockWithTimeout implements AutoCloseable {
private final Lock lock;
Expand Down Expand Up @@ -176,9 +191,8 @@ HttpResponse<String> send(String authId, HttpRequest.Builder requestBuilder) thr
if (unauthenticatedRequest.method().equals("GET")) {
var cached = cachedResponses.get(requestContext);
if (cached != null) {
var created = cachedUpdated.get(requestContext);
if (Instant.now().minus(maxAllowedAge(requestContext)).isBefore(created)) {
var tag = cached.headers().firstValue("ETag");
if (Instant.now().minus(maxAllowedAge(requestContext)).isBefore(cached.callTime)) {
var tag = cached.response.headers().firstValue("ETag");
tag.ifPresent(value -> requestBuilder.header("If-None-Match", value));
} else {
log.finer("Expired response cache for " + requestContext.unauthenticatedRequest.uri() + " (" + requestContext.authId + ")");
Expand All @@ -190,12 +204,13 @@ HttpResponse<String> send(String authId, HttpRequest.Builder requestBuilder) thr
// Perform requests using a certain account serially
response = client.send(finalRequest, HttpResponse.BodyHandlers.ofString());
}
if (response.statusCode() == 304) {
if (cached != null && response.statusCode() == 304) {
cacheHitsCounter.inc();
log.finer("Using cached response for " + finalRequest + " (" + authId + ")");
return new CachedHttpResponse<>(cached, response);
return new CachedHttpResponse<>(cached.response, response);
} else {
cachedResponses.put(requestContext, response);
cachedUpdated.put(requestContext, Instant.now());
cachedResponses.put(requestContext, new CacheEntry(response, Instant.now()));
cachedEntriesGauge.set(cachedResponses.size());
log.finer("Updating response cache for " + finalRequest + " (" + authId + ")");
return response;
}
Expand All @@ -220,12 +235,30 @@ HttpResponse<String> send(String authId, HttpRequest.Builder requestBuilder) thr
} finally {
// Invalidate any related GET caches
var postUriString = unauthenticatedRequest.uri().toString();
for (var cachedResponse : cachedResponses.keySet()) {
if (cachedResponse.unauthenticatedRequest.uri().toString().startsWith(postUriString)) {
cachedUpdated.put(cachedResponse, Instant.now().minus(Duration.ofDays(1)));
var iterator = cachedResponses.entrySet().iterator();
while (iterator.hasNext()) {
var entry = iterator.next();
if (entry.getKey().unauthenticatedRequest.uri().toString().startsWith(postUriString)) {
iterator.remove();
cachedEntriesGauge.set(cachedResponses.size());
}
}
}
}
}

/**
* This method should be run from time to time to keep the cache from growing indefinitely.
*/
public void evictOldData() {
var now = Instant.now();
var iterator = cachedResponses.entrySet().iterator();
while (iterator.hasNext()) {
var entry = iterator.next();
if (entry.getValue().callTime.isBefore(now.minus(maxAllowedAge(entry.getKey())))) {
iterator.remove();
cachedEntriesGauge.set(cachedResponses.size());
}
}
}
}