Skip to content

Commit

Permalink
Implement disposing of tasks scheduled on VirtualTimeScheduler (#3097)
Browse files Browse the repository at this point in the history
Enables unit tests to track the execution state of tasks when using virtual time.

Fixes #3088.
  • Loading branch information
mdindoffer committed Jul 1, 2022
1 parent baf49c9 commit 7803a34
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -399,8 +400,12 @@ final void drain() {
queue.poll();

// Only execute if not unsubscribed
if (!current.scheduler.shutdown) {
current.run.run();
if (!current.worker.shutdown) {
try {
current.run.run();
} finally {
current.set(true);
}
}
}
nanoTime = targetNanoTime;
Expand All @@ -413,30 +418,45 @@ final void drain() {
}
}

static final class TimedRunnable implements Comparable<TimedRunnable> {
static final class TimedRunnable extends AtomicBoolean
implements Comparable<TimedRunnable>, Disposable {

final long time;
final Runnable run;
final VirtualTimeWorker scheduler;
final long count; // for differentiating tasks at same time
final VirtualTimeScheduler scheduler;
final VirtualTimeWorker worker;
final long time;
final Runnable run;
final long count; // for differentiating tasks at same time

TimedRunnable(VirtualTimeWorker scheduler, long time, Runnable run, long count) {
TimedRunnable(VirtualTimeScheduler scheduler,
VirtualTimeWorker worker,
long time,
Runnable run,
long count) {
this.scheduler = scheduler;
this.worker = worker;
this.time = time;
this.run = run;
this.scheduler = scheduler;
this.count = count;
}

@Override
public int compareTo(TimedRunnable o) {
if (time == o.time) {
return compare(count, o.count);
return Long.compare(count, o.count);
}
return compare(time, o.time);
return Long.compare(time, o.time);
}

static int compare(long a, long b){
return a < b ? -1 : (a > b ? 1 : 0);
@Override
public boolean isDisposed() {
return super.get();
}

@Override
public void dispose() {
scheduler.queue.remove(this);
scheduler.drain();
set(true);
}
}

Expand Down Expand Up @@ -478,36 +498,26 @@ final class VirtualTimeWorker implements Worker {

@Override
public Disposable schedule(Runnable run) {
if (shutdown) {
throw Exceptions.failWithRejected();
}
final TimedRunnable timedTask = new TimedRunnable(this,
0,
run,
COUNTER.getAndIncrement(VirtualTimeScheduler.this));
queue.add(timedTask);
drain();
return () -> {
queue.remove(timedTask);
drain();
};
return doScheduleAtTime(run,0);
}

@Override
public Disposable schedule(Runnable run, long delayTime, TimeUnit unit) {
return doScheduleAtTime(run,nanoTime + unit.toNanos(delayTime));
}

private Disposable doScheduleAtTime(Runnable run, long time) {
if (shutdown) {
throw Exceptions.failWithRejected();
}
final TimedRunnable timedTask = new TimedRunnable(this,
nanoTime + unit.toNanos(delayTime),
TimedRunnable timedTask = new TimedRunnable(VirtualTimeScheduler.this,
this,
time,
run,
COUNTER.getAndIncrement(VirtualTimeScheduler.this));
queue.add(timedTask);
drain();
return () -> {
queue.remove(timedTask);
drain();
};
return timedTask;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,9 @@
import java.util.function.Supplier;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -424,6 +426,41 @@ public Scheduler newSingle(ThreadFactory threadFactory) {
assertThat(postResetNewScheduler).as("new from restoredgt").isNotInstanceOf(VirtualTimeScheduler.class);
}

@Test
void scheduledTaskShouldBeDisposedAfterExecution() {
VirtualTimeScheduler vtScheduler = VirtualTimeScheduler.create();

Disposable immediateDisposable = vtScheduler.schedule(() -> {
});
assertThat(immediateDisposable.isDisposed()).isTrue();

Duration scheduleDelayDuration = Duration.ofSeconds(10);
Disposable scheduledDisposable = vtScheduler.schedule(() -> {
}, scheduleDelayDuration.getSeconds(), TimeUnit.SECONDS);
assertThat(scheduledDisposable.isDisposed()).isFalse();

vtScheduler.advanceTimeBy(scheduleDelayDuration);
assertThat(scheduledDisposable.isDisposed()).isTrue();
}

@Test
void scheduledTaskShouldNotBeExecutedIfDisposed() {
VirtualTimeScheduler vtScheduler = VirtualTimeScheduler.create();

Duration scheduleDelayDuration = Duration.ofSeconds(10);
Disposable scheduledDisposable = vtScheduler.schedule(() -> Assertions.fail(
"This task should not be executed, because it was disposed of"
+ " beforehand."),
scheduleDelayDuration.getSeconds(),
TimeUnit.SECONDS);

scheduledDisposable.dispose();
assertThat(scheduledDisposable.isDisposed()).isTrue();

vtScheduler.advanceTimeBy(scheduleDelayDuration);
assertThat(scheduledDisposable.isDisposed()).isTrue();
}

@SuppressWarnings("unchecked")
private static Scheduler uncache(Scheduler potentialCached) {
if (potentialCached instanceof Supplier) {
Expand Down

0 comments on commit 7803a34

Please sign in to comment.