Skip to content

Commit

Permalink
[http] Fix refresh time check and calculation (#16288)
Browse files Browse the repository at this point in the history
Signed-off-by: Jan N. Klug <github@klug.nrw>
  • Loading branch information
J-N-K committed Jan 17, 2024
1 parent 41743e8 commit 06b8c73
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,6 @@ public void initialize() {
}
rateLimitedHttpClient.setDelay(config.delay);

int urlHandlerCount = urlHandlers.size();
if (urlHandlerCount * config.delay > config.refresh * 1000) {
// this should prevent the rate limit queue from filling up
config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
logger.warn(
"{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
urlHandlerCount, thing.getUID(), config.delay, config.refresh);
}

// remove empty headers
config.headers.removeIf(String::isBlank);

Expand Down Expand Up @@ -236,6 +227,17 @@ public void initialize() {
// create channels
thing.getChannels().forEach(this::createChannel);

int urlHandlerCount = urlHandlers.size();
if (urlHandlerCount * config.delay > config.refresh * 1000) {
// this should prevent the rate limit queue from filling up
config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
logger.warn(
"{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
urlHandlerCount, thing.getUID(), config.delay, config.refresh);
}

urlHandlers.values().forEach(urlHandler -> urlHandler.start(scheduler, config.refresh));

updateStatus(ThingStatus.UNKNOWN);
}

Expand Down Expand Up @@ -330,9 +332,10 @@ private void createChannel(Channel channel) {
// we need a key consisting of stateContent and URL, only if both are equal, we can use the same cache
String key = channelConfig.stateContent + "$" + stateUrl;
channelUrls.put(channelUID, key);
Objects.requireNonNull(urlHandlers.computeIfAbsent(key,
k -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, stateUrl, config,
channelConfig.stateContent, config.contentType, this)))
Objects.requireNonNull(
urlHandlers.computeIfAbsent(key,
k -> new RefreshingUrlCache(rateLimitedHttpClient, stateUrl, config,
channelConfig.stateContent, config.contentType, this)))
.addConsumer(itemValueConverter::process);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ public class RefreshingUrlCache {
private final @Nullable String httpContentType;
private final HttpStatusListener httpStatusListener;

private final ScheduledFuture<?> future;
private @Nullable ScheduledFuture<?> future;
private @Nullable ChannelHandlerContent lastContent;

public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url,
HttpThingConfig thingConfig, String httpContent, @Nullable String httpContentType,
HttpStatusListener httpStatusListener) {
public RefreshingUrlCache(RateLimitedHttpClient httpClient, String url, HttpThingConfig thingConfig,
String httpContent, @Nullable String httpContentType, HttpStatusListener httpStatusListener) {
this.httpClient = httpClient;
this.url = url;
this.strictErrorHandling = thingConfig.strictErrorHandling;
Expand All @@ -76,9 +75,25 @@ public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClie
this.httpContentType = httpContentType;
this.httpStatusListener = httpStatusListener;
fallbackEncoding = thingConfig.encoding;
}

future = executor.scheduleWithFixedDelay(this::refresh, 1, thingConfig.refresh, TimeUnit.SECONDS);
logger.trace("Started refresh task for URL '{}' with interval {}s", url, thingConfig.refresh);
public void start(ScheduledExecutorService executor, int refreshTime) {
if (future != null) {
logger.warn("Starting refresh task requested but it is already started. This is bug.");
return;
}
future = executor.scheduleWithFixedDelay(this::refresh, 1, refreshTime, TimeUnit.SECONDS);
logger.trace("Started refresh task for URL '{}' with interval {}s", url, refreshTime);
}

public void stop() {
// clearing all listeners to prevent further updates
consumers.clear();
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
logger.trace("Stopped refresh task for URL '{}'", url);
}
}

private void refresh() {
Expand Down Expand Up @@ -132,13 +147,6 @@ private void refresh(boolean isRetry) {
}
}

public void stop() {
// clearing all listeners to prevent further updates
consumers.clear();
future.cancel(false);
logger.trace("Stopped refresh task for URL '{}'", url);
}

public void addConsumer(Consumer<@Nullable ChannelHandlerContent> consumer) {
consumers.add(consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testWithLimitAndPriority() {
assertThat((int) msBetween, allOf(greaterThanOrEqualTo(1000), lessThan(1100)));
}

private List<Response> doLimitTest(int setDelay, List<Boolean> config) {
private void doLimitTest(int setDelay, List<Boolean> config) {
stubFor(get(urlEqualTo(TEST_LOCATION)).willReturn(aResponse().withBody(TEST_CONTENT)));

RateLimitedHttpClient rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler);
Expand Down Expand Up @@ -129,8 +129,6 @@ private List<Response> doLimitTest(int setDelay, List<Boolean> config) {
// wait until we got all results
waitForAssert(() -> assertEquals(config.size(), responses.size()));
rateLimitedHttpClient.shutdown();

return responses;
}

private static class Response {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ public void testDateIsFormattedInURL() {
* @return the cache object
*/
private RefreshingUrlCache getUrlCache(String content) {
RefreshingUrlCache urlCache = new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, thingConfig,
content, null, statusListener);
RefreshingUrlCache urlCache = new RefreshingUrlCache(rateLimitedHttpClient, url, thingConfig, content, null,
statusListener);
urlCache.addConsumer(contentWrappers::add);

urlCache.start(scheduler, thingConfig.refresh);
return urlCache;
}
}

0 comments on commit 06b8c73

Please sign in to comment.