Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace trac(k)ing abstract with more generic context tracking #1495

Merged
merged 2 commits into from
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.util.annotation.NonNull;
import reactor.util.context.Context;

/**
* A set of callbacks to track the {@link Context} propagation inside a reactive execution
* chain.
*
* @author Sergei Egorov
* @since 3.2.6
*/
public interface ContextTracker {

/**
* A callback to be triggered on the subscription.
* Implementors should return a new, enriched {@link Context} (see {@link Context#put(Object, Object)}),
* or the same context if the tracking is not desired.
* <p>
* Example implementation:
* <pre>{@code
* if (!shouldTrack()) return context;
* Marker parent = context.getOrDefault(Marker.class, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snippet still references Marker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an example, Marker is something provided by the implementation

* Marker marker = new Marker(parent);
* return context.put(Marker.class, marker);
* }</pre>
*
* @param context the {@link Context} of {@link reactor.core.CoreSubscriber}
* @see reactor.core.CoreSubscriber
* @return modified or the same {@link Context}
*/
@NonNull
Context onSubscribe(Context context);

/**
* {@link ContextTracker} implementors receive this callback before
* the {@link reactor.core.scheduler.Scheduler}s execute scheduled {@link Runnable}s.
* <p>
* Consider implementing it if your tracking relies on {@link ThreadLocal}s.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe flesh out this sentence: the method would extract key-value pairs from Context and put them in appropriate ThreadLocal variables, then return a Disposable that clears said threadlocals. Maybe ThreadLocal is not the right example either (MDC ?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most tracers use ThreadLocal to store the spans (as well as MDC's implementation), hence the mention :)

But I like the idea of guiding how to implement it, will add 👍

*
* @param context the current {@link Context}
* @return {@link Disposable} to be disposed after the scheduled {@link Runnable}
*/
default Disposable onContextPassing(Context context) {
return Disposables.disposed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import reactor.core.scheduler.Scheduler;
import reactor.util.context.Context;

class TrackingExecutorServiceDecorator
class ContextTrackingExecutorServiceDecorator
implements BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> {

private final Collection<Tracker> trackers;
private final Collection<ContextTracker> trackers;

TrackingExecutorServiceDecorator(Collection<Tracker> trackers) {
ContextTrackingExecutorServiceDecorator(Collection<ContextTracker> trackers) {
this.trackers = trackers;
}

Expand All @@ -46,9 +46,8 @@ protected Runnable wrap(Runnable runnable) {
return runnable;
}
Context context = ((ContextAware) runnable).currentContext();
Tracker.Marker marker = context.getOrDefault(Tracker.Marker.class, null);

if (marker == null) {
if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
return runnable;
}

Expand All @@ -63,8 +62,8 @@ public Context currentContext() {
public void run() {
Disposable.Composite composite = Disposables.composite();

for (Tracker tracker : trackers) {
Disposable disposable = tracker.onScopePassing(marker);
for (ContextTracker tracker : trackers) {
Disposable disposable = tracker.onContextPassing(context);
composite.add(disposable);
}

Expand All @@ -84,27 +83,22 @@ protected <V> Callable<V> wrap(Callable<V> callable) {
return callable;
}
Context context = ((ContextAware) callable).currentContext();
Tracker.Marker marker = context.getOrDefault(Tracker.Marker.class, null);

if (marker == null) {
if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
return callable;
}

return new ContextAwareCallable<>(marker, callable, context);
return new ContextAwareCallable<>(callable, context);
}
};
}

private class ContextAwareCallable<V> implements Callable<V>, ContextAware {

private final Tracker.Marker marker;
private final Callable<V> callable;
private final Context context;

public ContextAwareCallable(Tracker.Marker marker,
Callable<V> callable,
Context context) {
this.marker = marker;
public ContextAwareCallable(Callable<V> callable, Context context) {
this.callable = callable;
this.context = context;
}
Expand All @@ -113,8 +107,8 @@ public ContextAwareCallable(Tracker.Marker marker,
public V call() throws Exception {
Disposable.Composite composite = Disposables.composite();

for (Tracker tracker : trackers) {
Disposable disposable = tracker.onScopePassing(marker);
for (ContextTracker contextTracker : trackers) {
Disposable disposable = contextTracker.onContextPassing(context);
composite.add(disposable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,38 @@
import reactor.core.publisher.FluxContextStart.ContextStartSubscriber;
import reactor.util.context.Context;

class TrackingPublisher implements CorePublisher<Object> {
class ContextTrackingPublisher implements CorePublisher<Object> {

private final Publisher<Object> source;

final Collection<Tracker> trackers;
final Collection<ContextTracker> trackers;

TrackingPublisher(Publisher<Object> source, Collection<Tracker> trackers) {
ContextTrackingPublisher(Publisher<Object> source, Collection<ContextTracker> trackers) {
this.source = source;
this.trackers = trackers;
}

@Override
public void subscribe(CoreSubscriber<? super Object> subscriber) {
if (trackers.stream().noneMatch(Tracker::shouldCreateMarker)) {
if (source instanceof CorePublisher) {
((CorePublisher<Object>) source).subscribe(subscriber);
}
else {
source.subscribe(subscriber);
}
return;
}
Context originalContext = subscriber.currentContext();
Context context = originalContext;

Context context = subscriber.currentContext();
Tracker.Marker parent = context.getOrDefault(Tracker.Marker.class, null);

Tracker.Marker current = new Tracker.Marker(parent);
context = context.put(Tracker.Marker.class, current);
for (ContextTracker tracker : trackers) {
context = tracker.onSubscribe(context);
}

for (Tracker tracker : trackers) {
tracker.onMarkerCreated(current);
if (context != originalContext) {
if (!context.hasKey(Hooks.KEY_CONTEXT_TRACKING)) {
context = context.put(Hooks.KEY_CONTEXT_TRACKING, true);
}
subscriber = new ContextStartSubscriber<>(subscriber, context);
}

if (source instanceof CorePublisher) {
((CorePublisher<Object>) source).subscribe(new ContextStartSubscriber<>(subscriber, context));
((CorePublisher<Object>) source).subscribe(subscriber);
}
else {
source.subscribe(new ContextStartSubscriber<>(subscriber, context));
source.subscribe(subscriber);
}
}

Expand Down
40 changes: 20 additions & 20 deletions reactor-core/src/main/java/reactor/core/publisher/Hooks.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,41 +469,41 @@ public static void resetOnNextError() {
}
}

public static void addTracker(String key, Tracker tracker) {
public static void addContextTracker(String key, ContextTracker contextTracker) {
synchronized (log) {
boolean wasEmpty = trackers.isEmpty();
trackers.put(key, tracker);
boolean wasEmpty = contextTrackers.isEmpty();
contextTrackers.put(key, contextTracker);
if (wasEmpty) {
installTracing();
installContextTracing();
}
}
}

public static boolean removeTracker(String key) {
public static boolean removeContextTracker(String key) {
synchronized (log) {
boolean removed = trackers.remove(key) != null;
boolean removed = contextTrackers.remove(key) != null;
if (removed) {
if (trackers.isEmpty()) {
uninstallTracing();
if (contextTrackers.isEmpty()) {
uninstallContextTracing();
}
}
return removed;
}
}

static void installTracing() {
static void installContextTracing() {
synchronized (log) {
Collection<Tracker> trackersView = Collections.unmodifiableCollection(trackers.values());
Collection<ContextTracker> trackersView = Collections.unmodifiableCollection(contextTrackers.values());

onLastOperator(KEY_TRACKING, source -> new TrackingPublisher(source, trackersView));
Schedulers.addExecutorServiceDecorator(KEY_TRACKING, new TrackingExecutorServiceDecorator(trackersView));
onLastOperator(KEY_CONTEXT_TRACKING, source -> new ContextTrackingPublisher(source, trackersView));
Schedulers.addExecutorServiceDecorator(KEY_CONTEXT_TRACKING, new ContextTrackingExecutorServiceDecorator(trackersView));
}
}

static void uninstallTracing() {
static void uninstallContextTracing() {
synchronized (log) {
Schedulers.removeExecutorServiceDecorator(KEY_TRACKING);
resetOnLastOperator(KEY_TRACKING);
Schedulers.removeExecutorServiceDecorator(KEY_CONTEXT_TRACKING);
resetOnLastOperator(KEY_CONTEXT_TRACKING);
}
}

Expand Down Expand Up @@ -585,7 +585,7 @@ static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<?
private static final LinkedHashMap<String, Function<? super Publisher<Object>, ? extends Publisher<Object>>> onLastOperatorHooks;
private static final LinkedHashMap<String, BiFunction<? super Throwable, Object, ? extends Throwable>> onOperatorErrorHooks;

private static final LinkedHashMap<String, Tracker> trackers;
private static final LinkedHashMap<String, ContextTracker> contextTrackers;

//Immutable views on hook trackers, for testing purpose
static final Map<String, Function<? super Publisher<Object>, ? extends Publisher<Object>>> getOnEachOperatorHooks() {
Expand All @@ -598,8 +598,8 @@ static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<?
return Collections.unmodifiableMap(onOperatorErrorHooks);
}

static final Map<String, Tracker> getTrackers() {
return Collections.unmodifiableMap(trackers);
static final Map<String, ContextTracker> getContextTrackers() {
return Collections.unmodifiableMap(contextTrackers);
}

static final Logger log = Loggers.getLogger(Hooks.class);
Expand Down Expand Up @@ -633,7 +633,7 @@ static final Map<String, Tracker> getTrackers() {
*/
static final String KEY_ON_REJECTED_EXECUTION = "reactor.onRejectedExecution.local";

static final String KEY_TRACKING = "reactor.tracking";
static final String KEY_CONTEXT_TRACKING = "reactor.context.tracking";

static boolean GLOBAL_TRACE =
Boolean.parseBoolean(System.getProperty("reactor.trace.operatorStacktrace",
Expand All @@ -643,7 +643,7 @@ static final Map<String, Tracker> getTrackers() {
onEachOperatorHooks = new LinkedHashMap<>(1);
onLastOperatorHooks = new LinkedHashMap<>(1);
onOperatorErrorHooks = new LinkedHashMap<>(1);
trackers = new LinkedHashMap<>(1);
contextTrackers = new LinkedHashMap<>(1);
}

Hooks() {
Expand Down
43 changes: 0 additions & 43 deletions reactor-core/src/main/java/reactor/core/publisher/Tracker.java

This file was deleted.

Loading