Skip to content

Commit

Permalink
Replace trac(k)ing abstract with more generic context tracking (#1495)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Jan 28, 2019
1 parent d11fa50 commit acc2b0d
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 128 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.util.annotation.NonNull;
import reactor.util.context.Context;

/**
* A set of callbacks to track the {@link Context} propagation inside a reactive execution
* chain.
*
* @author Sergei Egorov
* @since 3.2.6
*/
public interface ContextTracker {

/**
* A callback to be triggered on the subscription.
* Implementors should return a new, enriched {@link Context} (see {@link Context#put(Object, Object)}),
* or the same context if the tracking is not desired.
* <p>
* Example implementation:
* <pre>{@code
* if (!shouldTrack()) return context;
* Marker parent = context.getOrDefault(Marker.class, null);
* Marker marker = new Marker(parent);
* return context.put(Marker.class, marker);
* }</pre>
*
* @param context the {@link Context} of {@link reactor.core.CoreSubscriber}
* @see reactor.core.CoreSubscriber
* @return modified or the same {@link Context}
*/
@NonNull
Context onSubscribe(Context context);

/**
* {@link ContextTracker} implementors receive this callback before
* the {@link reactor.core.scheduler.Scheduler}s execute scheduled {@link Runnable}s.
* <p>
* The method would extract tracking information from {@link Context} populated in {@link #onSubscribe(Context)}
* and put it in appropriate {@link ThreadLocal} variables,
* then return a {@link Disposable} that clears said {@link ThreadLocal}s.
*
* @param context the current {@link Context}
* @return {@link Disposable} to be disposed after the scheduled {@link Runnable}
*/
default Disposable onContextPassing(Context context) {
return Disposables.disposed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import reactor.core.scheduler.Scheduler;
import reactor.util.context.Context;

class TrackingExecutorServiceDecorator
class ContextTrackingExecutorServiceDecorator
implements BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> {

private final Collection<Tracker> trackers;
private final Collection<ContextTracker> trackers;

TrackingExecutorServiceDecorator(Collection<Tracker> trackers) {
ContextTrackingExecutorServiceDecorator(Collection<ContextTracker> trackers) {
this.trackers = trackers;
}

Expand All @@ -46,9 +46,8 @@ protected Runnable wrap(Runnable runnable) {
return runnable;
}
Context context = ((ContextAware) runnable).currentContext();
Tracker.Marker marker = context.getOrDefault(Tracker.Marker.class, null);

if (marker == null) {
if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
return runnable;
}

Expand All @@ -63,8 +62,8 @@ public Context currentContext() {
public void run() {
Disposable.Composite composite = Disposables.composite();

for (Tracker tracker : trackers) {
Disposable disposable = tracker.onScopePassing(marker);
for (ContextTracker tracker : trackers) {
Disposable disposable = tracker.onContextPassing(context);
composite.add(disposable);
}

Expand All @@ -84,27 +83,22 @@ protected <V> Callable<V> wrap(Callable<V> callable) {
return callable;
}
Context context = ((ContextAware) callable).currentContext();
Tracker.Marker marker = context.getOrDefault(Tracker.Marker.class, null);

if (marker == null) {
if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
return callable;
}

return new ContextAwareCallable<>(marker, callable, context);
return new ContextAwareCallable<>(callable, context);
}
};
}

private class ContextAwareCallable<V> implements Callable<V>, ContextAware {

private final Tracker.Marker marker;
private final Callable<V> callable;
private final Context context;

public ContextAwareCallable(Tracker.Marker marker,
Callable<V> callable,
Context context) {
this.marker = marker;
public ContextAwareCallable(Callable<V> callable, Context context) {
this.callable = callable;
this.context = context;
}
Expand All @@ -113,8 +107,8 @@ public ContextAwareCallable(Tracker.Marker marker,
public V call() throws Exception {
Disposable.Composite composite = Disposables.composite();

for (Tracker tracker : trackers) {
Disposable disposable = tracker.onScopePassing(marker);
for (ContextTracker contextTracker : trackers) {
Disposable disposable = contextTracker.onContextPassing(context);
composite.add(disposable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,38 @@
import reactor.core.publisher.FluxContextStart.ContextStartSubscriber;
import reactor.util.context.Context;

class TrackingPublisher implements CorePublisher<Object> {
class ContextTrackingPublisher implements CorePublisher<Object> {

private final Publisher<Object> source;

final Collection<Tracker> trackers;
final Collection<ContextTracker> trackers;

TrackingPublisher(Publisher<Object> source, Collection<Tracker> trackers) {
ContextTrackingPublisher(Publisher<Object> source, Collection<ContextTracker> trackers) {
this.source = source;
this.trackers = trackers;
}

@Override
public void subscribe(CoreSubscriber<? super Object> subscriber) {
if (trackers.stream().noneMatch(Tracker::shouldCreateMarker)) {
if (source instanceof CorePublisher) {
((CorePublisher<Object>) source).subscribe(subscriber);
}
else {
source.subscribe(subscriber);
}
return;
}
Context originalContext = subscriber.currentContext();
Context context = originalContext;

Context context = subscriber.currentContext();
Tracker.Marker parent = context.getOrDefault(Tracker.Marker.class, null);

Tracker.Marker current = new Tracker.Marker(parent);
context = context.put(Tracker.Marker.class, current);
for (ContextTracker tracker : trackers) {
context = tracker.onSubscribe(context);
}

for (Tracker tracker : trackers) {
tracker.onMarkerCreated(current);
if (context != originalContext) {
if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
context = context.put(Hooks.KEY_CONTEXT_TRACKING, true);
}
subscriber = new ContextStartSubscriber<>(subscriber, context);
}

if (source instanceof CorePublisher) {
((CorePublisher<Object>) source).subscribe(new ContextStartSubscriber<>(subscriber, context));
((CorePublisher<Object>) source).subscribe(subscriber);
}
else {
source.subscribe(new ContextStartSubscriber<>(subscriber, context));
source.subscribe(subscriber);
}
}

Expand Down
40 changes: 20 additions & 20 deletions reactor-core/src/main/java/reactor/core/publisher/Hooks.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,41 +469,41 @@ public static void resetOnNextError() {
}
}

public static void addTracker(String key, Tracker tracker) {
public static void addContextTracker(String key, ContextTracker contextTracker) {
synchronized (log) {
boolean wasEmpty = trackers.isEmpty();
trackers.put(key, tracker);
boolean wasEmpty = contextTrackers.isEmpty();
contextTrackers.put(key, contextTracker);
if (wasEmpty) {
installTracing();
installContextTracing();
}
}
}

public static boolean removeTracker(String key) {
public static boolean removeContextTracker(String key) {
synchronized (log) {
boolean removed = trackers.remove(key) != null;
boolean removed = contextTrackers.remove(key) != null;
if (removed) {
if (trackers.isEmpty()) {
uninstallTracing();
if (contextTrackers.isEmpty()) {
uninstallContextTracing();
}
}
return removed;
}
}

static void installTracing() {
static void installContextTracing() {
synchronized (log) {
Collection<Tracker> trackersView = Collections.unmodifiableCollection(trackers.values());
Collection<ContextTracker> trackersView = Collections.unmodifiableCollection(contextTrackers.values());

onLastOperator(KEY_TRACKING, source -> new TrackingPublisher(source, trackersView));
Schedulers.addExecutorServiceDecorator(KEY_TRACKING, new TrackingExecutorServiceDecorator(trackersView));
onLastOperator(KEY_CONTEXT_TRACKING, source -> new ContextTrackingPublisher(source, trackersView));
Schedulers.addExecutorServiceDecorator(KEY_CONTEXT_TRACKING, new ContextTrackingExecutorServiceDecorator(trackersView));
}
}

static void uninstallTracing() {
static void uninstallContextTracing() {
synchronized (log) {
Schedulers.removeExecutorServiceDecorator(KEY_TRACKING);
resetOnLastOperator(KEY_TRACKING);
Schedulers.removeExecutorServiceDecorator(KEY_CONTEXT_TRACKING);
resetOnLastOperator(KEY_CONTEXT_TRACKING);
}
}

Expand Down Expand Up @@ -585,7 +585,7 @@ static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<?
private static final LinkedHashMap<String, Function<? super Publisher<Object>, ? extends Publisher<Object>>> onLastOperatorHooks;
private static final LinkedHashMap<String, BiFunction<? super Throwable, Object, ? extends Throwable>> onOperatorErrorHooks;

private static final LinkedHashMap<String, Tracker> trackers;
private static final LinkedHashMap<String, ContextTracker> contextTrackers;

//Immutable views on hook trackers, for testing purpose
static final Map<String, Function<? super Publisher<Object>, ? extends Publisher<Object>>> getOnEachOperatorHooks() {
Expand All @@ -598,8 +598,8 @@ static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<?
return Collections.unmodifiableMap(onOperatorErrorHooks);
}

static final Map<String, Tracker> getTrackers() {
return Collections.unmodifiableMap(trackers);
static final Map<String, ContextTracker> getContextTrackers() {
return Collections.unmodifiableMap(contextTrackers);
}

static final Logger log = Loggers.getLogger(Hooks.class);
Expand Down Expand Up @@ -633,7 +633,7 @@ static final Map<String, Tracker> getTrackers() {
*/
static final String KEY_ON_REJECTED_EXECUTION = "reactor.onRejectedExecution.local";

static final String KEY_TRACKING = "reactor.tracking";
static final String KEY_CONTEXT_TRACKING = "reactor.context.tracking";

static boolean GLOBAL_TRACE =
Boolean.parseBoolean(System.getProperty("reactor.trace.operatorStacktrace",
Expand All @@ -643,7 +643,7 @@ static final Map<String, Tracker> getTrackers() {
onEachOperatorHooks = new LinkedHashMap<>(1);
onLastOperatorHooks = new LinkedHashMap<>(1);
onOperatorErrorHooks = new LinkedHashMap<>(1);
trackers = new LinkedHashMap<>(1);
contextTrackers = new LinkedHashMap<>(1);
}

Hooks() {
Expand Down
43 changes: 0 additions & 43 deletions reactor-core/src/main/java/reactor/core/publisher/Tracker.java

This file was deleted.

Loading

0 comments on commit acc2b0d

Please sign in to comment.