Skip to content

Commit

Permalink
fix #451 Merge TimedScheduler features into Scheduler
Browse files Browse the repository at this point in the history
This commit merges the functionality of SingleTimedScheduler into its
SingleTimeScheduler sibling, now that both expose an API for delayed
submit:

 - moving the methods into Scheduler as default methods (will return
 a dedicated RejectedDisposable by default)
 - using Scheduler as input everywhere a TimedScheduler was used
 - add a `isTimeCapable()` method that can be used to ensure a Scheduler
 can schedule in the future
 - deprecating `TimedScheduler` and `TimedWorker`

The 3 task implementations of SingleTimedScheduler and SingleScheduler
have all been replaced by an extracted ScheduledRunnable.

Added tests in AbstractSchedulerTests, conditioned on isTimeCapable.

[Behavior Change]
As demonstrated in SchedulersTest, the single scheduler Worker won't
execute an immediate task anymore if it is disposed right after
`submit(task)`.
  • Loading branch information
simonbasle committed Mar 9, 2017
1 parent 819c71b commit 6f3383d
Show file tree
Hide file tree
Showing 41 changed files with 1,937 additions and 1,430 deletions.
214 changes: 121 additions & 93 deletions src/main/java/reactor/core/publisher/Flux.java

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions src/main/java/reactor/core/publisher/FluxBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.reactivestreams.Subscription;
import reactor.core.Cancellation;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.TimedScheduler;

/**
* @author Stephane Maldini
Expand All @@ -35,12 +34,12 @@ abstract class FluxBatch<T, V> extends FluxSource<T, V> {

final int batchSize;
final long timespan;
final TimedScheduler timer;
final Scheduler timer;

public FluxBatch(Publisher<T> source,
int batchSize,
long timespan,
final TimedScheduler timer) {
final Scheduler timer) {
super(source);
if (timespan <= 0) {
throw new IllegalArgumentException("Timeout period must be strictly " + "positive");
Expand Down Expand Up @@ -79,7 +78,7 @@ public synchronized Throwable fillInStackTrace() {
final boolean first;
final int batchSize;
final long timespan;
final TimedScheduler.TimedWorker timer;
final Scheduler.Worker timer;
final Runnable flushTask;

volatile int terminated =
Expand All @@ -106,7 +105,7 @@ public BatchAction(Subscriber<? super V> actual,
int batchSize,
boolean first,
long timespan,
final TimedScheduler.TimedWorker timer) {
final Scheduler.Worker timer) {

super(actual);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.scheduler.TimedScheduler;
import reactor.core.scheduler.Scheduler;

/**
* @author Stephane Maldini
Expand All @@ -35,7 +35,7 @@ final class FluxBufferTimeOrSize<T, C extends Collection<? super T>> extends Flu
public FluxBufferTimeOrSize(Publisher<T> source,
int maxSize,
long timespan,
TimedScheduler timer,
Scheduler timer,
Supplier<C> bufferSupplier) {
super(source, maxSize, timespan, timer);
this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
Expand All @@ -58,7 +58,7 @@ final static class BufferAction<T, C extends Collection<? super T>> extends Batc
public BufferAction(Subscriber<? super C> actual,
int maxSize,
long timespan,
TimedScheduler.TimedWorker timer,
Scheduler.Worker timer,
Supplier<C> bufferSupplier) {
super(actual, maxSize, false, timespan, timer);
this.bufferSupplier = bufferSupplier;
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/reactor/core/publisher/FluxElapsed.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
import reactor.core.scheduler.TimedScheduler;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

Expand All @@ -31,9 +31,9 @@
*/
final class FluxElapsed<T> extends FluxSource<T, Tuple2<Long, T>> implements Fuseable {

final TimedScheduler scheduler;
final Scheduler scheduler;

FluxElapsed(Publisher<T> source, TimedScheduler scheduler) {
FluxElapsed(Publisher<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
Expand All @@ -47,15 +47,14 @@ static final class ElapsedSubscriber<T>
implements Subscriber<T>, QueueSubscription<Tuple2<Long, T>> {

final Subscriber<? super Tuple2<Long, T>> actual;
final TimedScheduler scheduler;
final Scheduler scheduler;

Subscription s;
QueueSubscription<T> qs;

long lastTime;

ElapsedSubscriber(Subscriber<? super Tuple2<Long, T>> actual,
TimedScheduler scheduler) {
ElapsedSubscriber(Subscriber<? super Tuple2<Long, T>> actual, Scheduler scheduler) {
this.actual = actual;
this.scheduler = scheduler;
}
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/reactor/core/publisher/FluxInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.scheduler.TimedScheduler;
import reactor.core.scheduler.TimedScheduler.TimedWorker;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.Worker;

/**
* Periodically emits an ever increasing long value either via a ScheduledExecutorService
Expand All @@ -32,7 +32,7 @@
*/
final class FluxInterval extends Flux<Long> {

final TimedScheduler timedScheduler;
final Scheduler timedScheduler;

final long initialDelay;

Expand All @@ -44,7 +44,7 @@ public FluxInterval(
long initialDelay,
long period,
TimeUnit unit,
TimedScheduler timedScheduler) {
Scheduler timedScheduler) {
if (period < 0L) {
throw new IllegalArgumentException("period >= 0 required but it was " + period);
}
Expand All @@ -57,7 +57,7 @@ public FluxInterval(
@Override
public void subscribe(Subscriber<? super Long> s) {

TimedWorker w = timedScheduler.createWorker();
Worker w = timedScheduler.createWorker();

IntervalRunnable r = new IntervalRunnable(s, w);

Expand All @@ -69,7 +69,7 @@ public void subscribe(Subscriber<? super Long> s) {
static final class IntervalRunnable implements Runnable, Subscription {
final Subscriber<? super Long> s;

final TimedWorker worker;
final Worker worker;

volatile long requested;
static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
Expand All @@ -79,7 +79,7 @@ static final class IntervalRunnable implements Runnable, Subscription {

volatile boolean cancelled;

public IntervalRunnable(Subscriber<? super Long> s, TimedWorker worker) {
public IntervalRunnable(Subscriber<? super Long> s, Worker worker) {
this.s = s;
this.worker = worker;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/reactor/core/publisher/FluxReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import reactor.core.Producer;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.scheduler.TimedScheduler;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.QueueSupplier;

/**
Expand All @@ -47,7 +47,7 @@ final class FluxReplay<T> extends ConnectableFlux<T>
final Publisher<T> source;
final int history;
final long ttl;
final TimedScheduler scheduler;
final Scheduler scheduler;

volatile ReplaySubscriber<T> connection;
@SuppressWarnings("rawtypes")
Expand All @@ -59,7 +59,7 @@ final class FluxReplay<T> extends ConnectableFlux<T>
FluxReplay(Publisher<T> source,
int history,
long ttl,
TimedScheduler scheduler) {
Scheduler scheduler) {
this.source = Objects.requireNonNull(source, "source");
this.history = history;
if(history < 0){
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/reactor/core/publisher/FluxWindowTimeOrSize.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.reactivestreams.Subscription;
import reactor.core.Loopback;
import reactor.core.Producer;
import reactor.core.scheduler.TimedScheduler;
import reactor.core.scheduler.Scheduler;

/**
* WindowAction is forwarding events on a steam until {@code backlog} is reached, after that streams collected events
Expand All @@ -30,7 +30,7 @@
*/
final class FluxWindowTimeOrSize<T> extends FluxBatch<T, Flux<T>> {

public FluxWindowTimeOrSize(Publisher<T> source, int backlog, long timespan, TimedScheduler timer) {
public FluxWindowTimeOrSize(Publisher<T> source, int backlog, long timespan, Scheduler timer) {
super(source, backlog, timespan, timer);
}

Expand All @@ -42,11 +42,11 @@ public void subscribe(Subscriber<? super Flux<T>> subscriber) {
final static class Window<T> extends Flux<T> implements Subscriber<T>, Subscription, Producer {

final protected UnicastProcessor<T> processor;
final protected TimedScheduler timer;
final protected Scheduler timer;

protected int count = 0;

public Window(TimedScheduler timer) {
public Window(Scheduler timer) {
this.processor = UnicastProcessor.create();
this.timer = timer;
}
Expand Down Expand Up @@ -95,14 +95,14 @@ public Object downstream() {

final static class WindowAction<T> extends BatchAction<T, Flux<T>> implements Loopback {

private final TimedScheduler timer;
private final Scheduler timer;

private Window<T> currentWindow;

public WindowAction(Subscriber<? super Flux<T>> actual,
int backlog,
long timespan,
TimedScheduler timer) {
Scheduler timer) {

super(actual, backlog, true, timespan, timer.createWorker());
this.timer = timer;
Expand Down

0 comments on commit 6f3383d

Please sign in to comment.