-
Notifications
You must be signed in to change notification settings - Fork 770
/
ContextPropagationOperator.java
377 lines (322 loc) · 12.8 KB
/
ContextPropagationOperator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.instrumentation.reactor.v3_1;
import static java.lang.invoke.MethodType.methodType;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
/** Based on Spring Sleuth's Reactor instrumentation. */
public final class ContextPropagationOperator {
// Used to report a failure to install the scheduler hook (see registerScheduleHook).
private static final Logger logger = Logger.getLogger(ContextPropagationOperator.class.getName());
// Placeholder element handed to the scalar subscription created by ScalarPropagatingMono and
// ScalarPropagatingFlux; the flatMap lambdas that receive it ignore its value.
private static final Object VALUE = new Object();
// Handle for Mono#contextWrite, or Mono#subscriberContext when the former cannot be resolved;
// null if neither method is available.
@Nullable
private static final MethodHandle MONO_CONTEXT_WRITE_METHOD = getContextWriteMethod(Mono.class);
// Flux counterpart of MONO_CONTEXT_WRITE_METHOD, resolved the same way.
@Nullable
private static final MethodHandle FLUX_CONTEXT_WRITE_METHOD = getContextWriteMethod(Flux.class);
// Handle for the static Schedulers#onScheduleHook(String, Function); null if it cannot be resolved.
@Nullable private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod();
/**
 * Resolves the method used to write into Reactor's context on the given publisher type.
 *
 * <p>{@code contextWrite} is tried first and {@code subscriberContext} second; both are looked up
 * as public virtual methods taking a {@link Function} and returning {@code type}.
 *
 * @param type publisher class to inspect (callers in this file pass {@code Mono} or {@code Flux})
 * @return handle for the first method that resolves, or {@code null} when neither does
 */
@Nullable
private static MethodHandle getContextWriteMethod(Class<?> type) {
  MethodHandles.Lookup publicLookup = MethodHandles.publicLookup();
  String[] candidateNames = {"contextWrite", "subscriberContext"};
  for (String candidateName : candidateNames) {
    try {
      return publicLookup.findVirtual(type, candidateName, methodType(type, Function.class));
    } catch (NoSuchMethodException | IllegalAccessException ignored) {
      // not available under this name - fall through to the next candidate
    }
  }
  return null;
}
/**
 * Resolves the public static {@code Schedulers.onScheduleHook(String, Function)} method.
 *
 * @return handle for the method, or {@code null} when it cannot be found or accessed
 */
@Nullable
private static MethodHandle getSchedulersHookMethod() {
  try {
    return MethodHandles.publicLookup()
        .findStatic(
            Schedulers.class,
            "onScheduleHook",
            methodType(void.class, String.class, Function.class));
  } catch (NoSuchMethodException | IllegalAccessException ignored) {
    // the hook method is not available; callers treat null as "unsupported"
    return null;
  }
}
/** Returns a new {@link ContextPropagationOperator} built from a builder with default settings. */
public static ContextPropagationOperator create() {
  ContextPropagationOperatorBuilder defaultBuilder = builder();
  return defaultBuilder.build();
}
/** Returns a new {@link ContextPropagationOperatorBuilder}. */
public static ContextPropagationOperatorBuilder builder() {
  ContextPropagationOperatorBuilder freshBuilder = new ContextPropagationOperatorBuilder();
  return freshBuilder;
}
// Strategy built in the constructor; it is registered with and unregistered from
// AsyncOperationEndStrategies and is also handed to the operator lifter.
private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;
// Key under which the trace context is stored in Reactor's context. It is a dedicated object
// instance, and toString() is overridden only to give it a readable label.
private static final Object TRACE_CONTEXT_KEY =
new Object() {
@Override
public String toString() {
return "otel-trace-context";
}
};
// Monitor held while registering or resetting the hooks, so the hooks and the enabled flag change
// together.
private static final Object lock = new Object();
// True between registerOnEachOperator() and resetOnEachOperator(). It is static, so it is shared
// by all instances; runWithContext reads it without taking the lock.
private static volatile boolean enabled = false;
/**
 * Puts a trace {@link io.opentelemetry.context.Context} into a Reactor {@link
 * reactor.util.context.Context}.
 *
 * @param context Reactor context that should carry the trace context.
 * @param traceContext Trace context to store.
 * @return the Reactor context returned by {@code put}, holding the trace context.
 */
public static reactor.util.context.Context storeOpenTelemetryContext(
    reactor.util.context.Context context, Context traceContext) {
  reactor.util.context.Context withTraceContext = context.put(TRACE_CONTEXT_KEY, traceContext);
  return withTraceContext;
}
/**
 * Reads the trace {@link io.opentelemetry.context.Context} out of a Reactor {@link
 * reactor.util.context.Context}.
 *
 * @param context Reactor context to read from.
 * @param defaultTraceContext Value returned when the Reactor context holds no trace context.
 * @return the stored trace context, or {@code defaultTraceContext} if there is none.
 */
public static Context getOpenTelemetryContext(
    reactor.util.context.Context context, Context defaultTraceContext) {
  Context storedOrDefault = context.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext);
  return storedOrDefault;
}
/**
 * Reads the trace {@link Context} out of a Reactor {@link reactor.util.context.ContextView}.
 *
 * @param contextView Reactor context view to read from.
 * @param defaultTraceContext Value returned when the view holds no trace context.
 * @return the stored trace context, or {@code defaultTraceContext} if there is none.
 */
@NoMuzzle
public static Context getOpenTelemetryContextFromContextView(
    reactor.util.context.ContextView contextView, Context defaultTraceContext) {
  Context storedOrDefault = contextView.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext);
  return storedOrDefault;
}
/**
 * Creates the operator and builds the async end strategy it will later register.
 *
 * @param captureExperimentalSpanAttributes forwarded unchanged to the strategy builder
 */
ContextPropagationOperator(boolean captureExperimentalSpanAttributes) {
  ReactorAsyncOperationEndStrategy strategy =
      ReactorAsyncOperationEndStrategy.builder()
          .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
          .build();
  this.asyncOperationEndStrategy = strategy;
}
/**
 * Installs a hook on every operator so that the {@link Context} is propagated to downstream
 * callbacks, keeping the spans in that {@link Context} available for the whole lifetime of a
 * reactive stream. Typically invoked once from a static initializer block of the application.
 *
 * <p>Also registers the async operation end strategy and the scheduler hook. Calling this while
 * the hooks are already registered does nothing.
 */
public void registerOnEachOperator() {
  synchronized (lock) {
    if (!enabled) {
      String operatorHookKey = TracingSubscriber.class.getName();
      Hooks.onEachOperator(operatorHookKey, tracingLift(asyncOperationEndStrategy));

      AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);

      String scheduleHookKey = RunnableWrapper.class.getName();
      registerScheduleHook(scheduleHookKey, RunnableWrapper::new);

      enabled = true;
    }
  }
}
/**
 * Installs {@code function} as a scheduler hook under {@code key} when the hook method was
 * resolved; otherwise does nothing. A failure of the invocation is logged, not propagated.
 *
 * @param key name the hook is registered under
 * @param function decorator applied to scheduled runnables
 */
private static void registerScheduleHook(String key, Function<Runnable, Runnable> function) {
  if (SCHEDULERS_HOOK_METHOD != null) {
    try {
      SCHEDULERS_HOOK_METHOD.invoke(key, function);
    } catch (Throwable failure) {
      logger.log(Level.WARNING, "Failed to install scheduler hook", failure);
    }
  }
}
/**
 * Unregisters the hooks registered by {@link #registerOnEachOperator()}.
 *
 * <p>Removes the per-operator hook, unregisters the async operation end strategy and removes the
 * scheduler hook that was installed under the {@code RunnableWrapper} key. Does nothing when the
 * hooks are not currently registered.
 */
public void resetOnEachOperator() {
  synchronized (lock) {
    if (!enabled) {
      return;
    }
    Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
    AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
    // registerOnEachOperator() also installs a scheduler hook; remove it too so that scheduled
    // tasks are no longer wrapped once the instrumentation has been reset
    resetScheduleHook(RunnableWrapper.class.getName());
    enabled = false;
  }
}

/**
 * Removes the scheduler hook registered under {@code key}, if one could have been installed.
 *
 * <p>{@code Schedulers.resetOnScheduleHook(String)} is resolved reflectively, in the same way the
 * install method is, so this class does not link directly against a method that may be missing
 * from the Reactor version in use. A failure of the invocation is logged, not propagated.
 *
 * @param key name the hook was registered under
 */
private static void resetScheduleHook(String key) {
  if (SCHEDULERS_HOOK_METHOD == null) {
    // the hook could not be installed in the first place, so there is nothing to remove
    return;
  }
  MethodHandle resetMethod;
  try {
    resetMethod =
        MethodHandles.publicLookup()
            .findStatic(Schedulers.class, "resetOnScheduleHook", methodType(void.class, String.class));
  } catch (NoSuchMethodException | IllegalAccessException ignored) {
    // reset method is not available; leave the hook in place as before
    return;
  }
  try {
    resetMethod.invoke(key);
  } catch (Throwable throwable) {
    logger.log(Level.WARNING, "Failed to remove scheduler hook", throwable);
  }
}
/**
 * Builds the operator-lifting function that is installed as the per-operator hook.
 *
 * @param asyncOperationEndStrategy strategy handed to the {@link Lifter}, which keeps a reference
 *     to it
 * @return function lifting every publisher accepted by {@link #shouldInstrument(Scannable)}
 */
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
    ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
  Lifter<T> lifter = new Lifter<>(asyncOperationEndStrategy);
  return Operators.lift(ContextPropagationOperator::shouldInstrument, lifter);
}
/**
 * Forces the given Mono to run in the scope of {@code tracingContext}.
 *
 * <p>Returns {@code publisher} unchanged when the hooks are not registered or no context-write
 * method was resolved for {@code Mono}.
 */
@SuppressWarnings("unchecked")
public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
  boolean canPropagate = enabled && MONO_CONTEXT_WRITE_METHOD != null;
  if (!canPropagate) {
    return publisher;
  }

  // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
  // (created for this publisher) and with current() span that refers to span created here
  // without the hack, publisher runs in the onAssembly stage, before traceContext is made current
  try {
    Mono<T> propagating = ScalarPropagatingMono.create(publisher);
    StoreOpenTelemetryContext contextWriter = new StoreOpenTelemetryContext(tracingContext);
    return (Mono<T>) MONO_CONTEXT_WRITE_METHOD.invoke(propagating, contextWriter);
  } catch (Throwable failure) {
    // rethrowing without any wrapping to avoid any change to the underlying application behavior
    throw sneakyThrow(failure);
  }
}
/**
 * Forces the given Flux to run in the scope of {@code tracingContext}.
 *
 * <p>Returns {@code publisher} unchanged when the hooks are not registered or no context-write
 * method was resolved for {@code Flux}.
 */
@SuppressWarnings("unchecked")
public static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
  boolean canPropagate = enabled && FLUX_CONTEXT_WRITE_METHOD != null;
  if (!canPropagate) {
    return publisher;
  }

  // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
  // (created for this publisher) and with current() span that refers to span created here
  // without the hack, publisher runs in the onAssembly stage, before traceContext is made current
  try {
    Flux<T> propagating = ScalarPropagatingFlux.create(publisher);
    StoreOpenTelemetryContext contextWriter = new StoreOpenTelemetryContext(tracingContext);
    return (Flux<T>) FLUX_CONTEXT_WRITE_METHOD.invoke(propagating, contextWriter);
  } catch (Throwable failure) {
    // rethrowing without any wrapping to avoid any change to the underlying application behavior
    throw sneakyThrow(failure);
  }
}
/**
 * Rethrows {@code t} as-is, without wrapping it and without requiring the caller to declare it.
 *
 * <p>The unchecked cast to the inferred type {@code T} is what lets an arbitrary {@link Throwable}
 * be thrown from a method that declares no checked exceptions. This method never returns
 * normally; the return type exists only so call sites can be written as {@code throw
 * sneakyThrow(t)}.
 *
 * @param t throwable to rethrow unchanged
 */
@SuppressWarnings({"TypeParameterUnusedInFormals", "unchecked"})
private static <T extends Throwable> T sneakyThrow(Throwable t) throws T {
throw (T) t;
}
/**
 * Context-write function that stores a fixed trace context in whatever Reactor context it is
 * applied to.
 */
private static class StoreOpenTelemetryContext
    implements Function<reactor.util.context.Context, reactor.util.context.Context> {

  private final Context otelContext;

  private StoreOpenTelemetryContext(Context tracingContext) {
    otelContext = tracingContext;
  }

  /** Returns the result of putting the captured trace context into {@code context}. */
  @Override
  public reactor.util.context.Context apply(reactor.util.context.Context context) {
    return context.put(TRACE_CONTEXT_KEY, otelContext);
  }
}
/**
 * Decides whether a publisher gets lifted.
 *
 * @return {@code false} for scalar publishers, {@code true} for everything else
 */
private static boolean shouldInstrument(Scannable publisher) {
  // skip if Flux/Mono #just, #empty, #error
  if (publisher instanceof Fuseable.ScalarCallable) {
    return false;
  }
  return true;
}
private static class Lifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
/** Holds reference to strategy to prevent it from being collected. */
@SuppressWarnings({"FieldCanBeLocal", "UnusedVariable"})
private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;
public Lifter(ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
this.asyncOperationEndStrategy = asyncOperationEndStrategy;
}
@Override
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
return new TracingSubscriber<>(sub, sub.currentContext());
}
}
/**
 * Hands {@code actual} a scalar subscription for {@code value}. When the subscriber's Reactor
 * context holds a trace context that is not the current one, that trace context is made current
 * for the duration of the {@code onSubscribe} call.
 *
 * @param actual subscriber to call {@code onSubscribe} on
 * @param value element wrapped in the scalar subscription
 */
static void subscribeInActiveSpan(CoreSubscriber<? super Object> actual, Object value) {
  Context storedContext =
      ContextPropagationOperator.getOpenTelemetryContext(actual.currentContext(), null);

  boolean needsScope = storedContext != null && storedContext != Context.current();
  if (needsScope) {
    try (Scope ignored = storedContext.makeCurrent()) {
      actual.onSubscribe(Operators.scalarSubscription(actual, value));
    }
    return;
  }

  actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
/**
 * Mono that, on subscription, passes a placeholder value to the subscriber through {@link
 * #subscribeInActiveSpan}. {@link #create(Mono)} chains the real source behind it with {@code
 * flatMap}.
 */
static class ScalarPropagatingMono extends Mono<Object> implements Scannable {

  private final Mono<?> source;

  private ScalarPropagatingMono(Mono<?> source) {
    this.source = source;
  }

  /** Returns a Mono that subscribes to {@code source} from this class's placeholder emission. */
  static <T> Mono<T> create(Mono<T> source) {
    Mono<Object> trigger = new ScalarPropagatingMono(source);
    return trigger.flatMap(unused -> source);
  }

  @Override
  public void subscribe(CoreSubscriber<? super Object> actual) {
    subscribeInActiveSpan(actual, VALUE);
  }

  /** Reports the wrapped source as {@code PARENT}; every other attribute yields {@code null}. */
  @Override
  @Nullable
  // Interface method doesn't have type parameter so we can't add it either.
  @SuppressWarnings("rawtypes")
  public Object scanUnsafe(Attr attr) {
    return attr == Attr.PARENT ? source : null;
  }
}
/**
 * Flux that, on subscription, passes a placeholder value to the subscriber through {@link
 * #subscribeInActiveSpan}. {@link #create(Flux)} chains the real source behind it with {@code
 * flatMap}.
 */
static class ScalarPropagatingFlux extends Flux<Object> implements Scannable {

  private final Flux<?> source;

  private ScalarPropagatingFlux(Flux<?> source) {
    this.source = source;
  }

  /** Returns a Flux that subscribes to {@code source} from this class's placeholder emission. */
  static <T> Flux<T> create(Flux<T> source) {
    Flux<Object> trigger = new ScalarPropagatingFlux(source);
    return trigger.flatMap(unused -> source);
  }

  @Override
  public void subscribe(CoreSubscriber<? super Object> actual) {
    subscribeInActiveSpan(actual, VALUE);
  }

  /** Reports the wrapped source as {@code PARENT}; every other attribute yields {@code null}. */
  @Override
  @Nullable
  // Interface method doesn't have type parameter so we can't add it either.
  @SuppressWarnings("rawtypes")
  public Object scanUnsafe(Scannable.Attr attr) {
    return attr == Scannable.Attr.PARENT ? source : null;
  }
}
/**
 * Runnable decorator that captures the current {@link Context} when it is constructed and makes
 * that context current while the wrapped task runs.
 */
private static class RunnableWrapper implements Runnable {

  private final Runnable delegate;
  private final Context context;

  RunnableWrapper(Runnable delegate) {
    this.delegate = delegate;
    this.context = Context.current();
  }

  @Override
  public void run() {
    Scope scope = context.makeCurrent();
    try (Scope ignored = scope) {
      delegate.run();
    }
  }
}
}