Skip to content

Commit

Permalink
fix #1107 Avoid interrupting WorkerTask Future if cancelled race
Browse files Browse the repository at this point in the history
This commit prevents the undesirable interruption of the WorkerTask's
thread when a cancellation happens within the run() method (eg. a
MonoNext is cancelling upon first onNext downstream).

This would previously lead to a race condition between the cancellation
and the setFuture, which would interrupt if called AFTER the dispose.
  • Loading branch information
simonbasle committed Mar 5, 2018
1 parent 74b94a4 commit a0b752a
Show file tree
Hide file tree
Showing 2 changed files with 390 additions and 17 deletions.
55 changes: 38 additions & 17 deletions reactor-core/src/main/java/reactor/core/scheduler/WorkerTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package reactor.core.scheduler;

import reactor.core.Disposable;
import reactor.util.annotation.Nullable;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import reactor.core.Disposable;
import reactor.util.annotation.Nullable;

/**
* A runnable task for {@link Scheduler} Workers that are time-capable (implementing a
* relevant schedule(delay) and schedulePeriodically(period) methods).
Expand All @@ -38,12 +38,25 @@ final class WorkerTask implements Runnable, Disposable, Callable<Void> {

final Runnable task;

/** marker that the Worker was disposed and the parent got notified */
static final Composite DISPOSED = new EmptyCompositeDisposable();
/** marker that the Worker has completed, for the PARENT field */
static final Composite DONE = new EmptyCompositeDisposable();


static final Future<Void> FINISHED = new FutureTask<>(() -> null);
static final Future<Void> CANCELLED = new FutureTask<>(() -> null);
/** marker that the Worker has completed, for the FUTURE field */
static final Future<Void> FINISHED = new FutureTask<>(() -> null);
/**
* marker that the Worker was cancelled from the same thread (ie. within call()/run()),
* which means setFuture might race: we avoid interrupting the Future in this case.
*/
static final Future<Void> SYNC_CANCELLED = new FutureTask<>(() -> null);
/**
* marker that the Worker was cancelled from another thread, making it safe to
* interrupt the Future task.
*/
//see https://github.com/reactor/reactor-core/issues/1107
static final Future<Void> ASYNC_CANCELLED = new FutureTask<>(() -> null);

volatile Future<?> future;
static final AtomicReferenceFieldUpdater<WorkerTask, Future> FUTURE =
Expand All @@ -53,7 +66,9 @@ final class WorkerTask implements Runnable, Disposable, Callable<Void> {
static final AtomicReferenceFieldUpdater<WorkerTask, Composite> PARENT =
AtomicReferenceFieldUpdater.newUpdater(WorkerTask.class, Composite.class, "parent");

Thread thread;
volatile Thread thread;
static final AtomicReferenceFieldUpdater<WorkerTask, Thread> THREAD =
AtomicReferenceFieldUpdater.newUpdater(WorkerTask.class, Thread.class, "thread");

WorkerTask(Runnable task, Composite parent) {
this.task = task;
Expand All @@ -63,7 +78,7 @@ final class WorkerTask implements Runnable, Disposable, Callable<Void> {
@Override
@Nullable
public Void call() {
thread = Thread.currentThread();
THREAD.lazySet(this, Thread.currentThread());
try {
try {
task.run();
Expand All @@ -73,16 +88,17 @@ public Void call() {
}
}
finally {
thread = null;
THREAD.lazySet(this, null);
Composite o = parent;
if (o != DISPOSED && o != null && PARENT.compareAndSet(this, o, DONE)) {
//note: the o != null check must happen after the compareAndSet for it to always mark task as DONE
if (o != DISPOSED && PARENT.compareAndSet(this, o, DONE) && o != null) {
o.remove(this);
}

Future f;
for (;;) {
f = future;
if (f == CANCELLED || FUTURE.compareAndSet(this, f, FINISHED)) {
if (f == SYNC_CANCELLED || f == ASYNC_CANCELLED || FUTURE.compareAndSet(this, f, FINISHED)) {
break;
}
}
Expand All @@ -101,8 +117,12 @@ void setFuture(Future<?> f) {
if (o == FINISHED) {
return;
}
if (o == CANCELLED) {
f.cancel(thread != Thread.currentThread());
if (o == SYNC_CANCELLED) {
f.cancel(false);
return;
}
if (o == ASYNC_CANCELLED) {
f.cancel(true);
return;
}
if (FUTURE.compareAndSet(this, o, f)) {
Expand All @@ -113,20 +133,21 @@ void setFuture(Future<?> f) {

@Override
public boolean isDisposed() {
Future<?> a = future;
return FINISHED == a || CANCELLED == a;
Composite o = PARENT.get(this);
return o == DISPOSED || o == DONE;
}

@Override
public void dispose() {
for (;;) {
Future f = future;
if (f == FINISHED || f == CANCELLED) {
if (f == FINISHED || f == SYNC_CANCELLED || f == ASYNC_CANCELLED) {
break;
}
if (FUTURE.compareAndSet(this, f, CANCELLED)) {
boolean async = thread != Thread.currentThread();
if (FUTURE.compareAndSet(this, f, async ? ASYNC_CANCELLED : SYNC_CANCELLED)) {
if (f != null) {
f.cancel(thread != Thread.currentThread());
f.cancel(async);
}
break;
}
Expand Down

0 comments on commit a0b752a

Please sign in to comment.