Skip to content

Commit

Permalink
introduce ContextAware interface, store tracking in the context (#1492
Browse files Browse the repository at this point in the history
)

`ContextRunnable` is a new abstraction for the context passing.

It implements newly introduced `ContextAware` interface and
forces the implementors to provide a context (either current or empty).

old `Runnable`-based methods are marked as `@Deprecated` to
enforce the usage of the `ContextRunnable`.
  • Loading branch information
bsideup committed Jan 28, 2019
1 parent d53b85a commit d11fa50
Show file tree
Hide file tree
Showing 39 changed files with 552 additions and 107 deletions.
38 changes: 38 additions & 0 deletions reactor-core/src/main/java/reactor/core/ContextAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core;

import reactor.util.context.Context;

/**
* A common interface for the {@link Context} aware abstractions.
*
* @see reactor.core.scheduler.Scheduler.ContextRunnable
* @see CoreSubscriber
*
* @author Sergei Egorov
*/
public interface ContextAware {

/**
* Request a {@link Context} from dependent components which can include downstream
* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
*
* @return a resolved context or {@link Context#empty()}
*/
Context currentContext();
}
7 changes: 2 additions & 5 deletions reactor-core/src/main/java/reactor/core/CoreSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@
*
* @since 3.1.0
*/
public interface CoreSubscriber<T> extends Subscriber<T> {
public interface CoreSubscriber<T> extends Subscriber<T>, ContextAware {

/**
* Request a {@link Context} from dependent components which can include downstream
* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
*
* @return a resolved context or {@link Context#empty()}
* {@inheritDoc}
*/
default Context currentContext(){
return Context.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.ContextRunnable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand Down Expand Up @@ -89,10 +90,10 @@ final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
final static int TERMINATED_WITH_ERROR = 2;
final static int TERMINATED_WITH_CANCEL = 3;

final int batchSize;
final long timespan;
final Scheduler.Worker timer;
final Runnable flushTask;
final int batchSize;
final long timespan;
final Scheduler.Worker timer;
final ContextRunnable flushTask;

protected Subscription subscription;

Expand Down Expand Up @@ -130,19 +131,27 @@ final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
this.ctx = actual.currentContext();
this.timespan = timespan;
this.timer = timer;
this.flushTask = () -> {
if (terminated == NOT_TERMINATED) {
int index;
for(;;){
index = this.index;
if(index == 0){
return;
}
if(INDEX.compareAndSet(this, index, 0)){
break;
this.flushTask = new ContextRunnable() {
@Override
public void run() {
if (terminated == NOT_TERMINATED) {
int index;
for(;;){
index = BufferTimeoutSubscriber.this.index;
if(index == 0){
return;
}
if(INDEX.compareAndSet(BufferTimeoutSubscriber.this, index, 0)){
break;
}
}
flushCallback(null);
}
flushCallback(null);
}

@Override
public Context currentContext() {
return ctx;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Object scanUnsafe(Attr key) {
}

static final class CancelSubscriber<T>
implements InnerOperator<T, T>, Runnable {
implements InnerOperator<T, T>, Scheduler.ContextRunnable {

final CoreSubscriber<? super T> actual;
final Scheduler scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.util.context.Context;

/**
* @author Simon Baslé
Expand Down Expand Up @@ -106,7 +107,17 @@ public void onNext(final T t) {
//we can also delay onError/onComplete when an onNext
//is "in flight"
DELAYED.incrementAndGet(this);
w.schedule(() -> delayedNext(t), delay, timeUnit);
w.schedule(new Scheduler.ContextRunnable() {
@Override
public void run() {
delayedNext(t);
}

@Override
public Context currentContext() {
return DelaySubscriber.this.currentContext();
}
}, delay, timeUnit);
}

private void delayedNext(T t) {
Expand Down Expand Up @@ -169,13 +180,18 @@ public Object scanUnsafe(Attr key) {
return InnerOperator.super.scanUnsafe(key);
}

final class OnError implements Runnable {
final class OnError implements Scheduler.ContextRunnable {
private final Throwable t;

OnError(Throwable t) {
this.t = t;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public void run() {
try {
Expand All @@ -186,7 +202,13 @@ public void run() {
}
}

final class OnComplete implements Runnable {
final class OnComplete implements Scheduler.ContextRunnable {

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Periodically emits an ever increasing long value either via a ScheduledExecutorService
Expand Down Expand Up @@ -83,13 +84,19 @@ public Object scanUnsafe(Attr key) {
return null;
}

static final class IntervalRunnable implements Runnable, Subscription,
static final class IntervalRunnable implements Scheduler.ContextRunnable, Subscription,
InnerProducer<Long> {
final CoreSubscriber<? super Long> actual;

final Worker worker;

volatile long requested;

@Override
public Context currentContext() {
return actual.currentContext();
}

static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class, "requested");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Object scanUnsafe(Attr key) {
}

static final class BackpressureBufferTimeoutSubscriber<T> extends ArrayDeque<Object>
implements InnerOperator<T, T>, Runnable {
implements InnerOperator<T, T>, Scheduler.ContextRunnable {

final CoreSubscriber<? super T> actual;
final Context ctx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.ContextRunnable;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;

Expand Down Expand Up @@ -115,7 +116,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

static final class PublishOnSubscriber<T>
implements QueueSubscription<T>, Runnable, InnerOperator<T, T> {
implements QueueSubscription<T>, ContextRunnable, InnerOperator<T, T> {

final CoreSubscriber<? super T> actual;

Expand Down Expand Up @@ -584,7 +585,7 @@ public int size() {
}

static final class PublishOnConditionalSubscriber<T>
implements QueueSubscription<T>, Runnable, InnerOperator<T, T> {
implements QueueSubscription<T>, ContextRunnable, InnerOperator<T, T> {

final ConditionalSubscriber<? super T> actual;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* @author Simon Baslé
Expand Down Expand Up @@ -75,7 +76,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
synchronized (this) {
conn = connection;
if (conn == null || conn.terminated) {
conn = new RefConnection(this);
conn = new RefConnection(this, actual.currentContext());
connection = conn;
}

Expand Down Expand Up @@ -152,21 +153,29 @@ void timeout(RefConnection rc) {
}
}

static final class RefConnection implements Runnable, Consumer<Disposable> {
static final class RefConnection implements Scheduler.ContextRunnable, Consumer<Disposable> {

final FluxRefCountGrace<?> parent;
final Context context;

Disposable timer;
long subscriberCount;
boolean connected;
boolean terminated;

volatile Disposable sourceDisconnector;

@Override
public Context currentContext() {
return context;
}

static final AtomicReferenceFieldUpdater<RefConnection, Disposable> SOURCE_DISCONNECTOR =
AtomicReferenceFieldUpdater.newUpdater(RefConnection.class, Disposable.class, "sourceDisconnector");

RefConnection(FluxRefCountGrace<?> parent) {
RefConnection(FluxRefCountGrace<?> parent, Context context) {
this.parent = parent;
this.context = context;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.ContextRunnable;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Subscribes to the source Publisher asynchronously through a scheduler function or
Expand Down Expand Up @@ -77,7 +79,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

static final class SubscribeOnSubscriber<T>
implements InnerOperator<T, T>, Runnable {
implements InnerOperator<T, T>, ContextRunnable {

final CoreSubscriber<? super T> actual;

Expand Down Expand Up @@ -132,7 +134,17 @@ void requestUpstream(final long n, final Subscription s) {
}
else {
try {
worker.schedule(() -> s.request(n));
worker.schedule(new ContextRunnable() {
@Override
public void run() {
s.request(n);
}

@Override
public Context currentContext() {
return actual.currentContext();
}
});
}
catch (RejectedExecutionException ree) {
if(!worker.isDisposed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Executes a Callable and emits its value on the given Scheduler.
Expand Down Expand Up @@ -71,7 +72,7 @@ public Object scanUnsafe(Attr key) {
}

static final class CallableSubscribeOnSubscription<T>
implements QueueSubscription<T>, InnerProducer<T>, Runnable {
implements QueueSubscription<T>, InnerProducer<T>, Scheduler.ContextRunnable {

final CoreSubscriber<? super T> actual;

Expand All @@ -80,6 +81,12 @@ static final class CallableSubscribeOnSubscription<T>
final Scheduler scheduler;

volatile int state;

@Override
public Context currentContext() {
return actual.currentContext();
}

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<CallableSubscribeOnSubscription> STATE =
AtomicIntegerFieldUpdater.newUpdater(CallableSubscribeOnSubscription.class,
Expand Down Expand Up @@ -270,7 +277,17 @@ public void request(long n) {
if (s == NO_REQUEST_HAS_VALUE) {
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
try {
Disposable f = scheduler.schedule(this::emitValue);
Disposable f = scheduler.schedule(new Scheduler.ContextRunnable() {
@Override
public void run() {
CallableSubscribeOnSubscription.this.emitValue();
}

@Override
public Context currentContext() {
return actual.currentContext();
}
});
setRequestFuture(f);
}
catch (RejectedExecutionException ree) {
Expand Down
Loading

0 comments on commit d11fa50

Please sign in to comment.