Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic tracking support #1481

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions reactor-core/src/main/java/reactor/core/ContextAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core;

import reactor.util.context.Context;

/**
* A common interface for the {@link Context} aware abstractions.
*
* @see reactor.core.scheduler.Scheduler.ContextRunnable
* @see CoreSubscriber
*
* @author Sergei Egorov
*/
public interface ContextAware {

/**
* Request a {@link Context} from dependent components which can include downstream
* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
*
* @return a resolved context or {@link Context#empty()}
*/
Context currentContext();
}
7 changes: 2 additions & 5 deletions reactor-core/src/main/java/reactor/core/CoreSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@
*
* @since 3.1.0
*/
public interface CoreSubscriber<T> extends Subscriber<T> {
public interface CoreSubscriber<T> extends Subscriber<T>, ContextAware {

/**
* Request a {@link Context} from dependent components which can include downstream
* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
*
* @return a resolved context or {@link Context#empty()}
* {@inheritDoc}
*/
default Context currentContext(){
return Context.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.util.annotation.NonNull;
import reactor.util.context.Context;

/**
* A set of callbacks to track the {@link Context} propagation inside a reactive execution
* chain.
*
* @author Sergei Egorov
* @since 3.2.6
*/
public interface ContextTracker {

/**
* A callback to be triggered on the subscription.
* Implementors should return a new, enriched {@link Context} (see {@link Context#put(Object, Object)}),
* or the same context if the tracking is not desired.
* <p>
* Example implementation:
* <pre>{@code
* if (!shouldTrack()) return context;
* Marker parent = context.getOrDefault(Marker.class, null);
* Marker marker = new Marker(parent);
* return context.put(Marker.class, marker);
* }</pre>
*
* @param context the {@link Context} of {@link reactor.core.CoreSubscriber}
* @see reactor.core.CoreSubscriber
* @return modified or the same {@link Context}
*/
@NonNull
Context onSubscribe(Context context);

/**
* {@link ContextTracker} implementors receive this callback before
* the {@link reactor.core.scheduler.Scheduler}s execute scheduled {@link Runnable}s.
* <p>
* The method would extract tracking information from {@link Context} populated in {@link #onSubscribe(Context)}
* and put it in appropriate {@link ThreadLocal} variables,
* then return a {@link Disposable} that clears said {@link ThreadLocal}s.
*
* @param context the current {@link Context}
* @return {@link Disposable} to be disposed after the scheduled {@link Runnable}
*/
default Disposable onContextPassing(Context context) {
return Disposables.disposed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;

import reactor.core.ContextAware;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.scheduler.Scheduler;
import reactor.util.context.Context;

class ContextTrackingExecutorServiceDecorator
implements BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> {

private final Collection<ContextTracker> trackers;

ContextTrackingExecutorServiceDecorator(Collection<ContextTracker> trackers) {
this.trackers = trackers;
}

@Override
public ScheduledExecutorService apply(Scheduler scheduler, ScheduledExecutorService service) {
return new TaskWrappingScheduledExecutorService(service) {

@Override
protected Runnable wrap(Runnable runnable) {
if (!(runnable instanceof ContextAware)) {
return runnable;
}
Context context = ((ContextAware) runnable).currentContext();

if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
return runnable;
}

return new Scheduler.ContextRunnable() {

@Override
public Context currentContext() {
return context;
}

@Override
public void run() {
Disposable.Composite composite = Disposables.composite();

for (ContextTracker tracker : trackers) {
Disposable disposable = tracker.onContextPassing(context);
composite.add(disposable);
}

try {
runnable.run();
}
finally {
composite.dispose();
}
}
};
}

@Override
protected <V> Callable<V> wrap(Callable<V> callable) {
if (!(callable instanceof ContextAware)) {
return callable;
}
Context context = ((ContextAware) callable).currentContext();

if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
return callable;
}

return new ContextAwareCallable<>(callable, context);
}
};
}

private class ContextAwareCallable<V> implements Callable<V>, ContextAware {

private final Callable<V> callable;
private final Context context;

public ContextAwareCallable(Callable<V> callable, Context context) {
this.callable = callable;
this.context = context;
}

@Override
public V call() throws Exception {
Disposable.Composite composite = Disposables.composite();

for (ContextTracker contextTracker : trackers) {
Disposable disposable = contextTracker.onContextPassing(context);
composite.add(disposable);
}

try {
return callable.call();
}
finally {
composite.dispose();
}
}

@Override
public Context currentContext() {
return context;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Collection;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.FluxContextStart.ContextStartSubscriber;
import reactor.util.context.Context;

class ContextTrackingPublisher implements CorePublisher<Object> {

private final Publisher<Object> source;

final Collection<ContextTracker> trackers;

ContextTrackingPublisher(Publisher<Object> source, Collection<ContextTracker> trackers) {
this.source = source;
this.trackers = trackers;
}

@Override
public void subscribe(CoreSubscriber<? super Object> subscriber) {
Context originalContext = subscriber.currentContext();
Context context = originalContext;

for (ContextTracker tracker : trackers) {
context = tracker.onSubscribe(context);
}

if (context != originalContext) {
if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
context = context.put(Hooks.KEY_CONTEXT_TRACKING, true);
}
subscriber = new ContextStartSubscriber<>(subscriber, context);
}

if (source instanceof CorePublisher) {
((CorePublisher<Object>) source).subscribe(subscriber);
}
else {
source.subscribe(subscriber);
}
}

@Override
public void subscribe(Subscriber<? super Object> s) {
source.subscribe(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.ContextRunnable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand Down Expand Up @@ -89,10 +90,10 @@ final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
final static int TERMINATED_WITH_ERROR = 2;
final static int TERMINATED_WITH_CANCEL = 3;

final int batchSize;
final long timespan;
final Scheduler.Worker timer;
final Runnable flushTask;
final int batchSize;
final long timespan;
final Scheduler.Worker timer;
final ContextRunnable flushTask;

protected Subscription subscription;

Expand Down Expand Up @@ -130,19 +131,27 @@ final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
this.ctx = actual.currentContext();
this.timespan = timespan;
this.timer = timer;
this.flushTask = () -> {
if (terminated == NOT_TERMINATED) {
int index;
for(;;){
index = this.index;
if(index == 0){
return;
}
if(INDEX.compareAndSet(this, index, 0)){
break;
this.flushTask = new ContextRunnable() {
@Override
public void run() {
if (terminated == NOT_TERMINATED) {
int index;
for(;;){
index = BufferTimeoutSubscriber.this.index;
if(index == 0){
return;
}
if(INDEX.compareAndSet(BufferTimeoutSubscriber.this, index, 0)){
break;
}
}
flushCallback(null);
}
flushCallback(null);
}

@Override
public Context currentContext() {
return ctx;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Object scanUnsafe(Attr key) {
}

static final class CancelSubscriber<T>
implements InnerOperator<T, T>, Runnable {
implements InnerOperator<T, T>, Scheduler.ContextRunnable {

final CoreSubscriber<? super T> actual;
final Scheduler scheduler;
Expand Down
Loading