Skip to content

Commit

Permalink
Add rescheduling of timers (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
querdenker2k committed May 9, 2023
1 parent 9d69d80 commit 603eb1b
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ private void invokeRuleInternal(JRuleExecutionContext context, JRuleEvent event)
private void invokeDelayed(JRuleExecutionContext context, JRuleEvent event,
BiConsumer<JRuleExecutionContext, JRuleEvent> ruleInvoker) {
if (context.getDelayed() != null) {
JRuleTimerHandler.get().createTimer(null, context.getDelayed(), () -> ruleInvoker.accept(context, event),
JRuleTimerHandler.get().createTimer(null, context.getDelayed(), t -> ruleInvoker.accept(context, event),
context);
} else {
ruleInvoker.accept(context, event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -68,7 +69,7 @@ private JRuleTimerHandler() {
}

public synchronized JRuleTimer createOrReplaceTimer(@Nullable final String timerName, Duration delay,
Runnable function, @Nullable JRuleExecutionContext context) {
Consumer<JRuleTimer> function, @Nullable JRuleExecutionContext context) {
final String newTimerName = Optional.ofNullable(timerName).orElse(UUID.randomUUID().toString());
cancelTimer(newTimerName);
return createTimer(newTimerName, delay, function, context);
Expand All @@ -88,7 +89,7 @@ public synchronized boolean isTimerRunning(String timerName) {
.noneMatch(timer -> timer.futures.stream().anyMatch(CompletableFuture::isDone));
}

public JRuleTimer createTimer(@Nullable final String timerName, Duration delay, Runnable function,
public JRuleTimer createTimer(@Nullable final String timerName, Duration delay, Consumer<JRuleTimer> function,
@Nullable JRuleExecutionContext context) {
final String newTimerName = Optional.ofNullable(timerName).orElse(UUID.randomUUID().toString());
Optional<JRuleTimer> any = timers.stream().filter(timer -> Objects.equals(timer.name, newTimerName)).findAny();
Expand All @@ -98,7 +99,8 @@ public JRuleTimer createTimer(@Nullable final String timerName, Duration delay,
}

CompletableFuture<?> future = delayedExecution(delay);
JRuleTimer timer = new JRuleTimer(newTimerName, future, context != null ? context : getCurrentContext(), delay);
JRuleTimer timer = new JRuleTimer(newTimerName, function, future,
context != null ? context : getCurrentContext(), delay);
timers.add(timer);

JRuleLog.info(logger, timer.getLogName(), "Start timer '{}' with delay: {}", newTimerName, delay);
Expand All @@ -107,14 +109,14 @@ public JRuleTimer createTimer(@Nullable final String timerName, Duration delay,
}

public synchronized JRuleTimer createOrReplaceRepeatingTimer(@Nullable final String timerName, Duration delay,
int numberOfRepeats, Runnable function, @Nullable JRuleExecutionContext context) {
int numberOfRepeats, Consumer<JRuleTimer> function, @Nullable JRuleExecutionContext context) {
final String newTimerName = Optional.ofNullable(timerName).orElse(UUID.randomUUID().toString());
cancelTimer(newTimerName);
return createRepeatingTimer(newTimerName, delay, numberOfRepeats, function, context);
}

public synchronized JRuleTimer createRepeatingTimer(@Nullable String timerName, Duration delay, int numberOfRepeats,
Runnable function, @Nullable JRuleExecutionContext context) {
Consumer<JRuleTimer> function, @Nullable JRuleExecutionContext context) {
final String newTimerName = Optional.ofNullable(timerName).orElse(UUID.randomUUID().toString());
Optional<JRuleTimer> any = timers.stream().filter(timer -> timer.name.equals(newTimerName)).findAny();
if (any.isPresent()) {
Expand All @@ -123,8 +125,8 @@ public synchronized JRuleTimer createRepeatingTimer(@Nullable String timerName,

List<CompletableFuture<?>> newTimers = Stream.iterate(0, i -> i + 1).limit(numberOfRepeats)
.map(i -> delayedExecution(delay.multipliedBy(i).plus(delay))).collect(Collectors.toList());
JRuleTimer timer = new JRuleTimer(newTimerName, newTimers, context != null ? context : getCurrentContext(),
delay);
JRuleTimer timer = new JRuleTimer(newTimerName, function, newTimers,
context != null ? context : getCurrentContext(), delay);
timers.add(timer);
logger.trace("added repeating timers '{}': {}", newTimerName, newTimers.size());
getTimers(newTimerName);
Expand All @@ -141,7 +143,7 @@ public boolean getTimedLock(String lockName, Duration duration) {
CompletableFuture<?> future = delayedExecution(duration);
future.thenRun(() -> removeTimer(LOCK_PREFIX + lockName)).thenRun(() -> JRuleLog.info(logger,
getCurrentContext().getLogName(), String.format("Timer '%s' completed! Releasing lock", lockName)));
timers.add(new JRuleTimer(LOCK_PREFIX + lockName, future, getCurrentContext(), duration));
timers.add(new JRuleTimer(LOCK_PREFIX + lockName, null, future, getCurrentContext(), duration));
return true;
}

Expand All @@ -156,7 +158,7 @@ private synchronized List<JRuleTimer> getTimers(String timerName) {
return list;
}

private void invokeTimerInternal(JRuleTimer timer, Runnable runnable) {
private void invokeTimerInternal(JRuleTimer timer, Consumer<JRuleTimer> runnable) {
try {
JRule.JRULE_EXECUTION_CONTEXT.set(new JRuleLocalTimerExecutionContext(timer.context, timer.name));
JRuleLog.debug(logger, timer.context.getMethod().getName(), "Invoking timer from context: {}",
Expand All @@ -167,7 +169,7 @@ private void invokeTimerInternal(JRuleTimer timer, Runnable runnable) {
MDC.put(JRuleEngine.MDC_KEY_RULE, timer.context.getMethod().getName());
MDC.put(JRuleEngine.MDC_KEY_TIMER, timer.name);
Arrays.stream(timer.context.getLoggingTags()).forEach(s -> MDC.put(s, s));
runnable.run();
runnable.accept(timer);
} catch (IllegalArgumentException | SecurityException e) {
JRuleLog.error(logger, timer.context.getMethod().getName(), "Error {}", ExceptionUtils.getStackTrace(e));
} finally {
Expand Down Expand Up @@ -203,17 +205,21 @@ public final class JRuleTimer {
private final List<CompletableFuture<?>> futures;

private final JRuleExecutionContext context;
private Consumer<JRuleTimer> function;

public JRuleTimer(String name, CompletableFuture<?> future, JRuleExecutionContext context, Duration delay) {
public JRuleTimer(String name, Consumer<JRuleTimer> function, CompletableFuture<?> future,
JRuleExecutionContext context, Duration delay) {
this.name = name;
this.function = function;
this.futures = List.of(future);
this.context = context;
this.delay = delay;
}

public JRuleTimer(String name, List<CompletableFuture<?>> futures, JRuleExecutionContext context,
Duration delay) {
public JRuleTimer(String name, Consumer<JRuleTimer> function, List<CompletableFuture<?>> futures,
JRuleExecutionContext context, Duration delay) {
this.name = name;
this.function = function;
this.futures = futures;
this.context = context;
this.delay = delay;
Expand All @@ -234,37 +240,41 @@ public boolean isRunning() {
}

public JRuleTimerHandler.JRuleTimer createTimerAfter(@Nullable String timerName, Duration delay,
Runnable function) {
Consumer<JRuleTimer> function) {
return JRuleTimerHandler.this.createTimer(timerName, delay.plus(this.delay), function, context);
}

public JRuleTimerHandler.JRuleTimer createTimerAfter(Duration delay, Runnable function) {
public JRuleTimerHandler.JRuleTimer createTimerAfter(Duration delay, Consumer<JRuleTimer> function) {
return JRuleTimerHandler.this.createTimer(null, delay.plus(this.delay), function, context);
}

public JRuleTimerHandler.JRuleTimer createOrReplaceTimerAfter(@Nullable String timerName, Duration delay,
Runnable function) {
Consumer<JRuleTimer> function) {
return JRuleTimerHandler.this.createOrReplaceTimer(timerName, delay.plus(this.delay), function, context);
}

public JRuleTimerHandler.JRuleTimer createOrReplaceRepeatingTimerAfter(@Nullable String timerName,
Duration delay, int numberOfRepeats, Runnable function) {
Duration delay, int numberOfRepeats, Consumer<JRuleTimer> function) {
return JRuleTimerHandler.this.createOrReplaceRepeatingTimer(timerName, delay.plus(this.delay),
numberOfRepeats, function, context);
}

public JRuleTimerHandler.JRuleTimer createRepeatingTimerAfter(@Nullable String timerName, Duration delay,
int numberOfRepeats, Runnable function) {
int numberOfRepeats, Consumer<JRuleTimer> function) {
return JRuleTimerHandler.this.createRepeatingTimer(timerName, delay.plus(this.delay), numberOfRepeats,
function, context);
}

public JRuleTimerHandler.JRuleTimer createRepeatingTimerAfter(Duration delay, int numberOfRepeats,
Runnable function) {
Consumer<JRuleTimer> function) {
return JRuleTimerHandler.this.createRepeatingTimer(null, delay.plus(this.delay), numberOfRepeats, function,
context);
}

public JRuleTimerHandler.JRuleTimer rescheduleTimer(Duration delay) {
return JRuleTimerHandler.this.createOrReplaceTimer(this.name, delay, this.function, context);
}

public boolean isDone() {
return futures.stream().allMatch(CompletableFuture::isDone);
}
Expand Down
15 changes: 9 additions & 6 deletions src/main/java/org/openhab/automation/jrule/rules/JRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Map;
import java.util.function.Consumer;

import org.eclipse.jdt.annotation.Nullable;
import org.openhab.automation.jrule.exception.JRuleRuntimeException;
Expand Down Expand Up @@ -192,7 +193,7 @@ protected String sendHttpDeleteRequest(String url, Map<String, String> headers,
* @return A handle for the timer.
*/
protected JRuleTimerHandler.JRuleTimer createOrReplaceTimer(@Nullable String timerName, Duration delay,
Runnable function) {
Consumer<JRuleTimerHandler.JRuleTimer> function) {
return JRuleTimerHandler.get().createOrReplaceTimer(timerName, delay, function, null);
}

Expand All @@ -204,7 +205,8 @@ protected JRuleTimerHandler.JRuleTimer createOrReplaceTimer(@Nullable String tim
* @param function Code to execute.
* @return A handle for the timer.
*/
protected JRuleTimerHandler.JRuleTimer createTimer(@Nullable String timerName, Duration delay, Runnable function) {
protected JRuleTimerHandler.JRuleTimer createTimer(@Nullable String timerName, Duration delay,
Consumer<JRuleTimerHandler.JRuleTimer> function) {
return JRuleTimerHandler.get().createTimer(timerName, delay, function, null);
}

Expand All @@ -215,7 +217,8 @@ protected JRuleTimerHandler.JRuleTimer createTimer(@Nullable String timerName, D
* @param function Code to execute.
* @return A handle for the timer.
*/
protected JRuleTimerHandler.JRuleTimer createTimer(Duration delay, Runnable function) {
protected JRuleTimerHandler.JRuleTimer createTimer(Duration delay,
Consumer<JRuleTimerHandler.JRuleTimer> function) {
return JRuleTimerHandler.get().createTimer(null, delay, function, null);
}

Expand All @@ -229,7 +232,7 @@ protected JRuleTimerHandler.JRuleTimer createTimer(Duration delay, Runnable func
* @return A handle for the timer.
*/
protected JRuleTimerHandler.JRuleTimer createOrReplaceRepeatingTimer(@Nullable String timerName, Duration delay,
int numberOfRepeats, Runnable function) {
int numberOfRepeats, Consumer<JRuleTimerHandler.JRuleTimer> function) {
return JRuleTimerHandler.get().createOrReplaceRepeatingTimer(timerName, delay, numberOfRepeats, function, null);
}

Expand All @@ -243,7 +246,7 @@ protected JRuleTimerHandler.JRuleTimer createOrReplaceRepeatingTimer(@Nullable S
* @return A handle for the timer.
*/
protected JRuleTimerHandler.JRuleTimer createRepeatingTimer(@Nullable String timerName, Duration delay,
int numberOfRepeats, Runnable function) {
int numberOfRepeats, Consumer<JRuleTimerHandler.JRuleTimer> function) {
return JRuleTimerHandler.get().createRepeatingTimer(timerName, delay, numberOfRepeats, function, null);
}

Expand All @@ -256,7 +259,7 @@ protected JRuleTimerHandler.JRuleTimer createRepeatingTimer(@Nullable String tim
* @return A handle for the timer.
*/
protected JRuleTimerHandler.JRuleTimer createRepeatingTimer(Duration delay, int numberOfRepeats,
Runnable function) {
Consumer<JRuleTimerHandler.JRuleTimer> function) {
return JRuleTimerHandler.get().createRepeatingTimer(null, delay, numberOfRepeats, function, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ public void testTimer() throws ItemNotFoundException, InterruptedException {
assertFalse(eventPublisher.hasCommandEvent(JRuleTimerTestRules.TARGET_ITEM, "canceled timer"));
}

@Test
public void testTimerReschedule() throws ItemNotFoundException, InterruptedException {
JRuleTimerTestRules rule = initRule(JRuleTimerTestRules.class);
// Set item state in ItemRegistry
registerItem(new StringItem(JRuleTimerTestRules.TARGET_ITEM), UnDefType.UNDEF);
registerItem(new StringItem(JRuleTimerTestRules.TRIGGER_ITEM), UnDefType.UNDEF);

JRuleItemRegistry.get(JRuleTimerTestRules.TARGET_ITEM, TargetItem.class);
fireEvents(false, List.of(itemChangeEvent(JRuleTimerTestRules.TRIGGER_ITEM, "nothing", "reschedule")));
verify(rule, times(1)).testRescheduleTimers();
Thread.sleep(1500); // Wait for timer inside rule to execute
assertTrue(eventPublisher.hasCommandEvent(JRuleTimerTestRules.TARGET_ITEM, "command"));
assertTrue(eventPublisher.countCommandEvent(JRuleTimerTestRules.TARGET_ITEM, "timedCommand") >= 2);
}

@Test
public void testRepeatingTimer() throws ItemNotFoundException, InterruptedException {
JRuleTimerTestRules rule = initRule(JRuleTimerTestRules.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,50 @@ public void testTimers() {
stringItem.sendCommand("command");
cancelTimer("NON_EXISTING_TIMER");
createOrReplaceTimer("CREATE_OR_REPLACE_TIMER", Duration.ofSeconds(1),
() -> logInfo("Replaced timer completed"));
createTimer(null, Duration.ofSeconds(1), () -> {
t -> logInfo("Replaced timer completed"));
createTimer(null, Duration.ofSeconds(1), t -> {
logInfo("log something");
stringItem.sendCommand("unique timer");
});

JRuleTimerHandler.JRuleTimer timer = createTimer("TimerName", Duration.ofMillis(500), () -> {
JRuleTimerHandler.JRuleTimer timer = createTimer("TimerName", Duration.ofMillis(500), t -> {
stringItem.sendCommand("timedCommand");
createTimer("NestedTimer", Duration.ofMillis(500), () -> {
createTimer("NestedTimer", Duration.ofMillis(500), t2 -> {
stringItem.sendCommand("nestedTimedCommand");
});
});
stringItem.sendCommand("timer2 running " + timer.isRunning());
stringItem.sendCommand("timer2 done " + timer.isDone());

final AtomicInteger chainedCounter = new AtomicInteger(0);
createTimer("inner-1", Duration.ofSeconds(1), () -> {
createTimer("inner-1", Duration.ofSeconds(1), t -> {
logInfo("calling inner-1");
chainedCounter.incrementAndGet();
}).createTimerAfter("inner-2", Duration.ofMillis(500), () -> {
}).createTimerAfter("inner-2", Duration.ofMillis(500), t -> {
logInfo("calling inner-2");
chainedCounter.incrementAndGet();
}).createTimerAfter("inner-3", Duration.ofMillis(200), () -> {
}).createTimerAfter("inner-3", Duration.ofMillis(200), t -> {
logInfo("calling inner-3");
stringItem.sendCommand("after the other one " + chainedCounter.incrementAndGet());
});

JRuleTimerHandler.JRuleTimer canceledTimer = createTimer(null, Duration.ofSeconds(1),
() -> stringItem.sendCommand("canceled timer"));
t -> stringItem.sendCommand("canceled timer"));
canceledTimer.cancel();
}

@JRuleName("Reschedule Timer")
@JRuleWhenItemChange(item = TRIGGER_ITEM, to = "reschedule")
public void testRescheduleTimers() {
JRuleStringItem stringItem = JRuleStringItem.forName(TARGET_ITEM);
stringItem.sendCommand("command");

createTimer("TimerName", Duration.ofMillis(500), t -> {
stringItem.sendCommand("timedCommand");
t.rescheduleTimer(Duration.ofMillis(500));
});
}

@JRuleName("Repeating Timers")
@JRuleWhenItemChange(item = TRIGGER_ITEM, to = "timers-repeating")
public void testRepeatingTimers() {
Expand All @@ -88,17 +100,17 @@ public void testRepeatingTimers() {

// with name
final AtomicInteger repeatingWithNameCounter = new AtomicInteger(0);
createRepeatingTimer("repeating-with-name", Duration.ofMillis(500), 2, () -> repeatingWithNameItem
createRepeatingTimer("repeating-with-name", Duration.ofMillis(500), 2, t -> repeatingWithNameItem
.sendCommand("repeating-with-name-" + repeatingWithNameCounter.incrementAndGet()));
final AtomicInteger repeatingWithNameReplacedCounter = new AtomicInteger(0);
createOrReplaceRepeatingTimer("repeating-with-name-replaced", Duration.ofMillis(200), 5,
() -> repeatingWithNameReplacedItem.sendCommand(
t -> repeatingWithNameReplacedItem.sendCommand(
"repeating-with-name-replaced-" + repeatingWithNameReplacedCounter.incrementAndGet()));

// without name
final AtomicInteger repeatingCounter = new AtomicInteger(0);
createRepeatingTimer(Duration.ofMillis(100), 10,
() -> repeatingItem.sendCommand("repeating-" + repeatingCounter.incrementAndGet()));
t -> repeatingItem.sendCommand("repeating-" + repeatingCounter.incrementAndGet()));
}

@JRuleName("Repeating Timers")
Expand All @@ -111,7 +123,7 @@ public void testRepeatingTimersComplex() {

final String TIMER_NAME = "repeating-with-name-replaced";
createOrReplaceRepeatingTimer(TIMER_NAME, Duration.ofMillis(200), 5,
() -> repeatingWithNameReplacedItem.sendCommand(TIMER_NAME + "-" + counter.incrementAndGet()));
t -> repeatingWithNameReplacedItem.sendCommand(TIMER_NAME + "-" + counter.incrementAndGet()));
}

@JRuleName("Rule name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.io.IOException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openhab.automation.jrule.rules.user.TestRulesHttpActions;

Expand All @@ -26,6 +27,11 @@
*/
// Please do not reuse items for testing, create new ones, otherwise every change will hurt sometimes
public class ITJRuleHttpActions extends JRuleITBase {
@BeforeEach
public void init() {
WireMock.reset();
}

@Test
public void sendHttpGet() throws IOException {
WireMock.stubFor(WireMock.get(WireMock.urlEqualTo(TestRulesHttpActions.HTTP_GET_SOMETHING))
Expand Down
Loading

0 comments on commit 603eb1b

Please sign in to comment.