Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[http] Fix refresh time check and calculation #16288

Merged
merged 3 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,6 @@ public void initialize() {
}
rateLimitedHttpClient.setDelay(config.delay);

int urlHandlerCount = urlHandlers.size();
if (urlHandlerCount * config.delay > config.refresh * 1000) {
// this should prevent the rate limit queue from filling up
config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
logger.warn(
"{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
urlHandlerCount, thing.getUID(), config.delay, config.refresh);
}

// remove empty headers
config.headers.removeIf(String::isBlank);

Expand Down Expand Up @@ -236,6 +227,17 @@ public void initialize() {
// create channels
thing.getChannels().forEach(this::createChannel);

int urlHandlerCount = urlHandlers.size();
if (urlHandlerCount * config.delay > config.refresh * 1000) {
// this should prevent the rate limit queue from filling up
config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
logger.warn(
"{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
urlHandlerCount, thing.getUID(), config.delay, config.refresh);
}

urlHandlers.values().forEach(urlHandler -> urlHandler.start(scheduler, config.refresh));

updateStatus(ThingStatus.UNKNOWN);
}

Expand Down Expand Up @@ -330,9 +332,10 @@ private void createChannel(Channel channel) {
// we need a key consisting of stateContent and URL, only if both are equal, we can use the same cache
String key = channelConfig.stateContent + "$" + stateUrl;
channelUrls.put(channelUID, key);
Objects.requireNonNull(urlHandlers.computeIfAbsent(key,
k -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, stateUrl, config,
channelConfig.stateContent, config.contentType, this)))
Objects.requireNonNull(
urlHandlers.computeIfAbsent(key,
k -> new RefreshingUrlCache(rateLimitedHttpClient, stateUrl, config,
channelConfig.stateContent, config.contentType, this)))
.addConsumer(itemValueConverter::process);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ public class RefreshingUrlCache {
private final @Nullable String httpContentType;
private final HttpStatusListener httpStatusListener;

private final ScheduledFuture<?> future;
private @Nullable ScheduledFuture<?> future;
private @Nullable ChannelHandlerContent lastContent;

public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url,
HttpThingConfig thingConfig, String httpContent, @Nullable String httpContentType,
HttpStatusListener httpStatusListener) {
public RefreshingUrlCache(RateLimitedHttpClient httpClient, String url, HttpThingConfig thingConfig,
String httpContent, @Nullable String httpContentType, HttpStatusListener httpStatusListener) {
this.httpClient = httpClient;
this.url = url;
this.strictErrorHandling = thingConfig.strictErrorHandling;
Expand All @@ -76,9 +75,25 @@ public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClie
this.httpContentType = httpContentType;
this.httpStatusListener = httpStatusListener;
fallbackEncoding = thingConfig.encoding;
}

future = executor.scheduleWithFixedDelay(this::refresh, 1, thingConfig.refresh, TimeUnit.SECONDS);
logger.trace("Started refresh task for URL '{}' with interval {}s", url, thingConfig.refresh);
public void start(ScheduledExecutorService executor, int refreshTime) {
if (future != null) {
logger.warn("Starting refresh task requested but it is already started. This is bug.");
return;
}
future = executor.scheduleWithFixedDelay(this::refresh, 1, refreshTime, TimeUnit.SECONDS);
logger.trace("Started refresh task for URL '{}' with interval {}s", url, refreshTime);
}

public void stop() {
// clearing all listeners to prevent further updates
consumers.clear();
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
logger.trace("Stopped refresh task for URL '{}'", url);
}
}

private void refresh() {
Expand Down Expand Up @@ -132,13 +147,6 @@ private void refresh(boolean isRetry) {
}
}

public void stop() {
// clearing all listeners to prevent further updates
consumers.clear();
future.cancel(false);
logger.trace("Stopped refresh task for URL '{}'", url);
}

public void addConsumer(Consumer<@Nullable ChannelHandlerContent> consumer) {
consumers.add(consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testWithLimitAndPriority() {
assertThat((int) msBetween, allOf(greaterThanOrEqualTo(1000), lessThan(1100)));
}

private List<Response> doLimitTest(int setDelay, List<Boolean> config) {
private void doLimitTest(int setDelay, List<Boolean> config) {
stubFor(get(urlEqualTo(TEST_LOCATION)).willReturn(aResponse().withBody(TEST_CONTENT)));

RateLimitedHttpClient rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler);
Expand Down Expand Up @@ -129,8 +129,6 @@ private List<Response> doLimitTest(int setDelay, List<Boolean> config) {
// wait until we got all results
waitForAssert(() -> assertEquals(config.size(), responses.size()));
rateLimitedHttpClient.shutdown();

return responses;
}

private static class Response {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ public void testDateIsFormattedInURL() {
* @return the cache object
*/
private RefreshingUrlCache getUrlCache(String content) {
RefreshingUrlCache urlCache = new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, thingConfig,
content, null, statusListener);
RefreshingUrlCache urlCache = new RefreshingUrlCache(rateLimitedHttpClient, url, thingConfig, content, null,
statusListener);
urlCache.addConsumer(contentWrappers::add);

urlCache.start(scheduler, thingConfig.refresh);
return urlCache;
}
}