Skip to content
Permalink
Browse files
1052: Memory leak in RestRequestCache
Reviewed-by: kcr, ehelin
  • Loading branch information
erikj79 committed May 26, 2021
1 parent d5e31d2 commit ca8d1c3b817029fd083c2ff5aef0f827b96a734f
@@ -36,6 +36,7 @@
import java.util.stream.Collectors;

import com.sun.net.httpserver.*;
import org.openjdk.skara.network.RestRequest;

class BotRunnerError extends RuntimeException {
BotRunnerError(String msg) {
@@ -325,10 +326,14 @@ public void run(Duration timeout) {
}

isReady = true;
executor.scheduleAtFixedRate(this::itemWatchdog, 0,
config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);

var schedulingInterval = config.scheduledExecutionPeriod().toMillis();
executor.scheduleAtFixedRate(this::itemWatchdog, 0, schedulingInterval, TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(this::checkPeriodicItems, 0, schedulingInterval, TimeUnit.MILLISECONDS);

var cacheEvictionInterval = config.cacheEvictionInterval().toMillis();
executor.scheduleAtFixedRate(RestRequest::evictOldCacheData, cacheEvictionInterval,
cacheEvictionInterval, TimeUnit.MILLISECONDS);

try {
executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
@@ -319,6 +319,20 @@ Duration scheduledExecutionPeriod() {
}
}

/**
* The amount of time to wait between runs of the RestResponseCache evictions.
* @return
*/
Duration cacheEvictionInterval() {
if (!config.contains("runner") || !config.get("runner").contains("cache_eviction_interval")) {
var defaultValue = Duration.ofMinutes(5);
log.info("No cache eviction interval defined, using default value " + defaultValue);
return defaultValue;
} else {
return Duration.parse(config.get("runner").get("cache_eviction_interval").asString());
}
}

/**
* Number of WorkItems to execute in parallel.
* @return
@@ -479,4 +479,8 @@ public QueryBuilder delete(String endpoint) {
public QueryBuilder delete() {
return delete(null);
}

public static void evictOldCacheData() {
RestRequestCache.INSTANCE.evictOldData();
}
}
@@ -74,12 +74,21 @@ public Duration age() {
}
}

private final Map<RequestContext, HttpResponse<String>> cachedResponses = new ConcurrentHashMap<>();
private class CacheEntry {
private final HttpResponse<String> response;
private final Instant callTime;

public CacheEntry(HttpResponse<String> response, Instant callTime) {
this.response = response;
this.callTime = callTime;
}
}

private final Map<RequestContext, CacheEntry> cachedResponses = new ConcurrentHashMap<>();
private final HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();
private final Logger log = Logger.getLogger("org.openjdk.skara.network");
private final ConcurrentHashMap<String, Lock> authLocks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Instant> lastUpdates = new ConcurrentHashMap<>();
private final Map<RequestContext, Instant> cachedUpdated = new ConcurrentHashMap<>();

private static class LockWithTimeout implements AutoCloseable {
private final Lock lock;
@@ -182,9 +191,8 @@ private Duration maxAllowedAge(RequestContext requestContext) {
if (unauthenticatedRequest.method().equals("GET")) {
var cached = cachedResponses.get(requestContext);
if (cached != null) {
var created = cachedUpdated.get(requestContext);
if (Instant.now().minus(maxAllowedAge(requestContext)).isBefore(created)) {
var tag = cached.headers().firstValue("ETag");
if (Instant.now().minus(maxAllowedAge(requestContext)).isBefore(cached.callTime)) {
var tag = cached.response.headers().firstValue("ETag");
tag.ifPresent(value -> requestBuilder.header("If-None-Match", value));
} else {
log.finer("Expired response cache for " + requestContext.unauthenticatedRequest.uri() + " (" + requestContext.authId + ")");
@@ -196,13 +204,12 @@ private Duration maxAllowedAge(RequestContext requestContext) {
// Perform requests using a certain account serially
response = client.send(finalRequest, HttpResponse.BodyHandlers.ofString());
}
if (response.statusCode() == 304) {
if (cached != null && response.statusCode() == 304) {
cacheHitsCounter.inc();
log.finer("Using cached response for " + finalRequest + " (" + authId + ")");
return new CachedHttpResponse<>(cached, response);
return new CachedHttpResponse<>(cached.response, response);
} else {
cachedResponses.put(requestContext, response);
cachedUpdated.put(requestContext, Instant.now());
cachedResponses.put(requestContext, new CacheEntry(response, Instant.now()));
cachedEntriesGauge.set(cachedResponses.size());
log.finer("Updating response cache for " + finalRequest + " (" + authId + ")");
return response;
@@ -228,12 +235,30 @@ private Duration maxAllowedAge(RequestContext requestContext) {
} finally {
// Invalidate any related GET caches
var postUriString = unauthenticatedRequest.uri().toString();
for (var cachedResponse : cachedResponses.keySet()) {
if (cachedResponse.unauthenticatedRequest.uri().toString().startsWith(postUriString)) {
cachedUpdated.put(cachedResponse, Instant.now().minus(Duration.ofDays(1)));
var iterator = cachedResponses.entrySet().iterator();
while (iterator.hasNext()) {
var entry = iterator.next();
if (entry.getKey().unauthenticatedRequest.uri().toString().startsWith(postUriString)) {
iterator.remove();
cachedEntriesGauge.set(cachedResponses.size());
}
}
}
}
}

/**
* This method should be run from time to time to keep the cache from growing indefinitely.
*/
public void evictOldData() {
var now = Instant.now();
var iterator = cachedResponses.entrySet().iterator();
while (iterator.hasNext()) {
var entry = iterator.next();
if (entry.getValue().callTime.isBefore(now.minus(maxAllowedAge(entry.getKey())))) {
iterator.remove();
cachedEntriesGauge.set(cachedResponses.size());
}
}
}
}

1 comment on commit ca8d1c3

@openjdk-notifier

This comment has been minimized.

Copy link

@openjdk-notifier openjdk-notifier bot commented on ca8d1c3 May 26, 2021

Please sign in to comment.