Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce ContextAware interface, store tracking on the context #1492

Merged
merged 10 commits into from
Jan 28, 2019
38 changes: 38 additions & 0 deletions reactor-core/src/main/java/reactor/core/ContextAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core;

import reactor.util.context.Context;

/**
* A common interface for the {@link Context} aware abstractions.
*
* @see reactor.core.scheduler.Scheduler.ContextRunnable
* @see CoreSubscriber
*
* @author Sergei Egorov
*/
public interface ContextAware {

/**
* Request a {@link Context} from dependent components which can include downstream
* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
*
* @return a resolved context or {@link Context#empty()}
*/
Context currentContext();
}
7 changes: 2 additions & 5 deletions reactor-core/src/main/java/reactor/core/CoreSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@
*
* @since 3.1.0
*/
public interface CoreSubscriber<T> extends Subscriber<T> {
public interface CoreSubscriber<T> extends Subscriber<T>, ContextAware {

/**
* Request a {@link Context} from dependent components which can include downstream
* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
*
* @return a resolved context or {@link Context#empty()}
* {@inheritDoc}
*/
default Context currentContext(){
return Context.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.ContextRunnable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand Down Expand Up @@ -89,10 +90,10 @@ final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
final static int TERMINATED_WITH_ERROR = 2;
final static int TERMINATED_WITH_CANCEL = 3;

final int batchSize;
final long timespan;
final Scheduler.Worker timer;
final Runnable flushTask;
final int batchSize;
final long timespan;
final Scheduler.Worker timer;
final ContextRunnable flushTask;

protected Subscription subscription;

Expand Down Expand Up @@ -130,19 +131,27 @@ final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
this.ctx = actual.currentContext();
this.timespan = timespan;
this.timer = timer;
this.flushTask = () -> {
if (terminated == NOT_TERMINATED) {
int index;
for(;;){
index = this.index;
if(index == 0){
return;
}
if(INDEX.compareAndSet(this, index, 0)){
break;
this.flushTask = new ContextRunnable() {
@Override
public void run() {
if (terminated == NOT_TERMINATED) {
int index;
for(;;){
index = BufferTimeoutSubscriber.this.index;
if(index == 0){
return;
}
if(INDEX.compareAndSet(BufferTimeoutSubscriber.this, index, 0)){
break;
}
}
flushCallback(null);
}
flushCallback(null);
}

@Override
public Context currentContext() {
return ctx;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Object scanUnsafe(Attr key) {
}

static final class CancelSubscriber<T>
implements InnerOperator<T, T>, Runnable {
implements InnerOperator<T, T>, Scheduler.ContextRunnable {

final CoreSubscriber<? super T> actual;
final Scheduler scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.util.context.Context;

/**
* @author Simon Baslé
Expand Down Expand Up @@ -106,7 +107,17 @@ public void onNext(final T t) {
//we can also delay onError/onComplete when an onNext
//is "in flight"
DELAYED.incrementAndGet(this);
w.schedule(() -> delayedNext(t), delay, timeUnit);
w.schedule(new Scheduler.ContextRunnable() {
@Override
public void run() {
delayedNext(t);
}

@Override
public Context currentContext() {
return DelaySubscriber.this.currentContext();
}
}, delay, timeUnit);
}

private void delayedNext(T t) {
Expand Down Expand Up @@ -169,13 +180,18 @@ public Object scanUnsafe(Attr key) {
return InnerOperator.super.scanUnsafe(key);
}

final class OnError implements Runnable {
final class OnError implements Scheduler.ContextRunnable {
private final Throwable t;

OnError(Throwable t) {
this.t = t;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public void run() {
try {
Expand All @@ -186,7 +202,13 @@ public void run() {
}
}

final class OnComplete implements Runnable {
final class OnComplete implements Scheduler.ContextRunnable {

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Periodically emits an ever increasing long value either via a ScheduledExecutorService
Expand Down Expand Up @@ -83,13 +84,19 @@ public Object scanUnsafe(Attr key) {
return null;
}

static final class IntervalRunnable implements Runnable, Subscription,
static final class IntervalRunnable implements Scheduler.ContextRunnable, Subscription,
InnerProducer<Long> {
final CoreSubscriber<? super Long> actual;

final Worker worker;

volatile long requested;

@Override
public Context currentContext() {
return actual.currentContext();
}

static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class, "requested");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Object scanUnsafe(Attr key) {
}

static final class BackpressureBufferTimeoutSubscriber<T> extends ArrayDeque<Object>
implements InnerOperator<T, T>, Runnable {
implements InnerOperator<T, T>, Scheduler.ContextRunnable {

final CoreSubscriber<? super T> actual;
final Context ctx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.ContextRunnable;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;

Expand Down Expand Up @@ -115,7 +116,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

static final class PublishOnSubscriber<T>
implements QueueSubscription<T>, Runnable, InnerOperator<T, T> {
implements QueueSubscription<T>, ContextRunnable, InnerOperator<T, T> {

final CoreSubscriber<? super T> actual;

Expand Down Expand Up @@ -584,7 +585,7 @@ public int size() {
}

static final class PublishOnConditionalSubscriber<T>
implements QueueSubscription<T>, Runnable, InnerOperator<T, T> {
implements QueueSubscription<T>, ContextRunnable, InnerOperator<T, T> {

final ConditionalSubscriber<? super T> actual;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* @author Simon Baslé
Expand Down Expand Up @@ -75,7 +76,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
synchronized (this) {
conn = connection;
if (conn == null || conn.terminated) {
conn = new RefConnection(this);
conn = new RefConnection(this, actual.currentContext());
connection = conn;
}

Expand Down Expand Up @@ -152,21 +153,29 @@ void timeout(RefConnection rc) {
}
}

static final class RefConnection implements Runnable, Consumer<Disposable> {
static final class RefConnection implements Scheduler.ContextRunnable, Consumer<Disposable> {

final FluxRefCountGrace<?> parent;
final Context context;

Disposable timer;
long subscriberCount;
boolean connected;
boolean terminated;

volatile Disposable sourceDisconnector;

@Override
public Context currentContext() {
return context;
}

static final AtomicReferenceFieldUpdater<RefConnection, Disposable> SOURCE_DISCONNECTOR =
AtomicReferenceFieldUpdater.newUpdater(RefConnection.class, Disposable.class, "sourceDisconnector");

RefConnection(FluxRefCountGrace<?> parent) {
RefConnection(FluxRefCountGrace<?> parent, Context context) {
this.parent = parent;
this.context = context;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.ContextRunnable;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Subscribes to the source Publisher asynchronously through a scheduler function or
Expand Down Expand Up @@ -77,7 +79,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

static final class SubscribeOnSubscriber<T>
implements InnerOperator<T, T>, Runnable {
implements InnerOperator<T, T>, ContextRunnable {

final CoreSubscriber<? super T> actual;

Expand Down Expand Up @@ -132,7 +134,17 @@ void requestUpstream(final long n, final Subscription s) {
}
else {
try {
worker.schedule(() -> s.request(n));
worker.schedule(new ContextRunnable() {
@Override
public void run() {
s.request(n);
}

@Override
public Context currentContext() {
return actual.currentContext();
}
});
}
catch (RejectedExecutionException ree) {
if(!worker.isDisposed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Executes a Callable and emits its value on the given Scheduler.
Expand Down Expand Up @@ -71,7 +72,7 @@ public Object scanUnsafe(Attr key) {
}

static final class CallableSubscribeOnSubscription<T>
implements QueueSubscription<T>, InnerProducer<T>, Runnable {
implements QueueSubscription<T>, InnerProducer<T>, Scheduler.ContextRunnable {

final CoreSubscriber<? super T> actual;

Expand All @@ -80,6 +81,12 @@ static final class CallableSubscribeOnSubscription<T>
final Scheduler scheduler;

volatile int state;

@Override
public Context currentContext() {
return actual.currentContext();
}

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<CallableSubscribeOnSubscription> STATE =
AtomicIntegerFieldUpdater.newUpdater(CallableSubscribeOnSubscription.class,
Expand Down Expand Up @@ -270,7 +277,17 @@ public void request(long n) {
if (s == NO_REQUEST_HAS_VALUE) {
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
try {
Disposable f = scheduler.schedule(this::emitValue);
Disposable f = scheduler.schedule(new Scheduler.ContextRunnable() {
simonbasle marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void run() {
CallableSubscribeOnSubscription.this.emitValue();
}

@Override
public Context currentContext() {
return actual.currentContext();
}
});
setRequestFuture(f);
}
catch (RejectedExecutionException ree) {
Expand Down
Loading