diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/observation/SimpleObservation.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/observation/SimpleObservation.java new file mode 100644 index 00000000..8ead4fc9 --- /dev/null +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/observation/SimpleObservation.java @@ -0,0 +1,401 @@ +/* + * Copyright 2022 VMware, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.observation; + +import io.micrometer.common.KeyValue; +import io.micrometer.common.lang.Nullable; +import io.micrometer.common.util.StringUtils; +import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Default implementation of {@link Observation}. + * + * @author Jonatan Ivanov + * @author Tommy Ludwig + * @author Marcin Grzejszczak + * @since 1.10.0 + */ +public class SimpleObservation implements Observation { + + private final ObservationRegistry registry; + + private final Context context; + + @Nullable + @SuppressWarnings("rawtypes") + private ObservationConvention convention; + + @SuppressWarnings("rawtypes") + private final Deque handlers; + + private final Collection filters; + + public final Map> enclosingScopes = new ConcurrentHashMap<>(); + + SimpleObservation(@Nullable String name, ObservationRegistry registry, Context context) { + this.registry = registry; + this.context = context; + this.context.setName(name); + this.convention = getConventionFromConfig(registry, context); + this.handlers = getHandlersFromConfig(registry, context); + this.filters = registry.observationConfig().getObservationFilters(); + setParentFromCurrentObservation(); + } + + SimpleObservation(ObservationConvention convention, ObservationRegistry registry, + Context context) { + this.registry = registry; + this.context = context; + // name is set later in start() + this.handlers = getHandlersFromConfig(registry, context); + this.filters = registry.observationConfig().getObservationFilters(); + if (convention.supportsContext(context)) { + this.convention = convention; + } + else { + throw new IllegalStateException( + "Convention [" + convention + "] doesn't support context [" + context + "]"); + } + setParentFromCurrentObservation(); + } + + private void setParentFromCurrentObservation() { + Observation currentObservation = this.registry.getCurrentObservation(); + if (currentObservation != null) { + this.context.setParentObservation(currentObservation); + } + } + + @Nullable + private static ObservationConvention getConventionFromConfig(ObservationRegistry registry, Context context) { + for (ObservationConvention convention : registry.observationConfig().getObservationConventions()) { + if (convention.supportsContext(context)) { + return convention; + } + } + return null; + } + + private static Deque getHandlersFromConfig(ObservationRegistry registry, Context context) { + Collection> handlers = registry.observationConfig().getObservationHandlers(); + Deque deque = new ArrayDeque<>(handlers.size()); + for (ObservationHandler handler : handlers) { + if (handler.supportsContext(context)) { + deque.add(handler); + } + } + return deque; + } + + @Override + public Observation contextualName(@Nullable String contextualName) { + this.context.setContextualName(contextualName); + return this; + } + + @Override + public Observation parentObservation(@Nullable Observation parentObservation) { + this.context.setParentObservation(parentObservation); + return this; + } + + @Override + public Observation lowCardinalityKeyValue(KeyValue keyValue) { + this.context.addLowCardinalityKeyValue(keyValue); + return this; + } + + @Override + public Observation highCardinalityKeyValue(KeyValue keyValue) { + this.context.addHighCardinalityKeyValue(keyValue); + return this; + } + + @Override + public Observation observationConvention(ObservationConvention convention) { + if (convention.supportsContext(context)) { + this.convention = convention; + } + return this; + } + + @Override + public Observation error(Throwable error) { + this.context.setError(error); + notifyOnError(); + return this; + } + + @Override + public Observation event(Event event) { + notifyOnEvent(event); + return this; + } + + @Override + public Observation start() { + if (this.convention != null) { + this.context.addLowCardinalityKeyValues(convention.getLowCardinalityKeyValues(context)); + this.context.addHighCardinalityKeyValues(convention.getHighCardinalityKeyValues(context)); + + String newName = convention.getName(); + if (StringUtils.isNotBlank(newName)) { + this.context.setName(newName); + } + } + + notifyOnObservationStarted(); + return this; + } + + @Override + public Context getContext() { + return this.context; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void stop() { + if (this.convention != null) { + this.context.addLowCardinalityKeyValues(convention.getLowCardinalityKeyValues(context)); + this.context.addHighCardinalityKeyValues(convention.getHighCardinalityKeyValues(context)); + + String newContextualName = convention.getContextualName(context); + if (StringUtils.isNotBlank(newContextualName)) { + this.context.setContextualName(newContextualName); + } + } + + Observation.Context modifiedContext = this.context; + for (ObservationFilter filter : this.filters) { + modifiedContext = filter.map(modifiedContext); + } + + notifyOnObservationStopped(modifiedContext); + } + + @Override + public Scope openScope() { + Deque scopes = enclosingScopes.get(Thread.currentThread()); + if (scopes == null) { + scopes = new ArrayDeque<>(); + enclosingScopes.put(Thread.currentThread(), scopes); + } + Scope currentScope = registry.getCurrentObservationScope(); + if (currentScope != null) { + scopes.addFirst(currentScope); + } + Scope scope = new SimpleScope(this.registry, this); + notifyOnScopeOpened(); + return scope; + } + + @Nullable + @Override + public Scope getEnclosingScope() { + Deque scopes = enclosingScopes.get(Thread.currentThread()); + if (scopes != null && !scopes.isEmpty()) { + return scopes.pop(); + } + return null; + } + + @Override + public String toString() { + return "{" + "name=" + this.context.getName() + "(" + this.context.getContextualName() + ")" + ", error=" + + this.context.getError() + ", context=" + this.context + '}'; + } + + @SuppressWarnings("unchecked") + private void notifyOnObservationStarted() { + for (ObservationHandler handler : this.handlers) { + handler.onStart(this.context); + } + } + + @SuppressWarnings("unchecked") + private void notifyOnError() { + for (ObservationHandler handler : this.handlers) { + handler.onError(this.context); + } + } + + @SuppressWarnings("unchecked") + private void notifyOnEvent(Event event) { + for (ObservationHandler handler : this.handlers) { + handler.onEvent(event, this.context); + } + } + + @SuppressWarnings("unchecked") + private void notifyOnScopeOpened() { + for (ObservationHandler handler : this.handlers) { + handler.onScopeOpened(this.context); + } + } + + @SuppressWarnings("unchecked") + private void notifyOnScopeClosed() { + // We're closing from end till the beginning - e.g. we opened scope with handlers + // with ids 1,2,3 and we need to close the scope in order 3,2,1 + Iterator iterator = this.handlers.descendingIterator(); + while (iterator.hasNext()) { + ObservationHandler handler = iterator.next(); + handler.onScopeClosed(this.context); + } + } + + @SuppressWarnings("unchecked") + private void notifyOnScopeMakeCurrent() { + for (ObservationHandler handler : this.handlers) { + handler.onScopeOpened(this.context); + } + } + + @SuppressWarnings("unchecked") + private void notifyOnScopeReset() { + for (ObservationHandler handler : this.handlers) { + handler.onScopeReset(this.context); + } + } + + @SuppressWarnings("unchecked") + private void notifyOnObservationStopped(Observation.Context context) { + // We're closing from end till the beginning - e.g. we started with handlers with + // ids 1,2,3 and we need to call close on 3,2,1 + this.handlers.descendingIterator().forEachRemaining(handler -> handler.onStop(context)); + } + + static class SimpleScope implements Scope { + + private final ObservationRegistry registry; + + private final SimpleObservation currentObservation; + + @Nullable + private final Observation.Scope previousObservationScope; + + SimpleScope(ObservationRegistry registry, SimpleObservation current) { + this.registry = registry; + this.currentObservation = current; + this.previousObservationScope = registry.getCurrentObservationScope(); + this.registry.setCurrentObservationScope(this); + } + + @Override + public Observation getCurrentObservation() { + return this.currentObservation; + } + + @Override + public void close() { + Deque enclosingScopes = this.currentObservation.enclosingScopes.get(Thread.currentThread()); + // If we're closing a scope then we have to remove an enclosing scope from the + // deque + if (enclosingScopes != null && !enclosingScopes.isEmpty()) { + enclosingScopes.removeFirst(); + } + this.registry.setCurrentObservationScope(previousObservationScope); + this.currentObservation.notifyOnScopeClosed(); + } + + @Override + public void reset() { + SimpleScope scope = this; + Deque scopes = scope.currentObservation.enclosingScopes.get(Thread.currentThread()); + scopes.addFirst(this); + NullScope nullScope = new NullScope(registry, scope.currentObservation); + do { + // We don't want to remove any enclosing scopes when resetting + // we just want to remove any scopes if they are present (that's why we're + // not calling scope#close) + scope.currentObservation.notifyOnScopeReset(); + scope = (SimpleScope) scope.previousObservationScope; + } + while (scope != null); + this.registry.setCurrentObservationScope(nullScope); + } + + /** + * This method is called e.g. via + * {@link ObservationThreadLocalAccessor#restore(Observation)}. In that case, + * we're calling {@link ObservationThreadLocalAccessor#reset()} first, and we're + * closing all the scopes, HOWEVER those are called on the Observation scope that + * was present there in thread local at the time of calling the method, NOT on the + * scope that we want to make current (that one can contain some leftovers from + * previous scope openings like creation of e.g. Brave scope in the TracingContext + * that is there inside the Observation's Context. + * + * When we want to go back to the enclosing scope and want to make that scope + * current we need to be sure that there are no remaining scoped objects inside + * Observation's context. This is why BEFORE rebuilding the scope structure we + * need to notify the handlers to clear them first (again this is a separate scope + * to the one that was cleared by the reset method) via calling + * {@link ObservationHandler#onScopeReset(Context)}. + */ + @Override + public void makeCurrent() { + SimpleScope scope = this; + do { + // We don't want to remove any enclosing scopes when resetting + // we just want to remove any scopes if they are present (that's why we're + // not calling scope#close) + scope.currentObservation.notifyOnScopeReset(); + scope = (SimpleScope) scope.previousObservationScope; + } + while (scope != null); + + this.currentObservation.notifyOnScopeReset(); // wywalic + Deque scopes = new ArrayDeque<>(); + scope = this; + do { + scopes.addFirst(scope); + scope = (SimpleScope) scope.previousObservationScope; + } + while (scope != null); + for (SimpleScope simpleScope : scopes) { + simpleScope.currentObservation.notifyOnScopeMakeCurrent(); + } + this.registry.setCurrentObservationScope(this); + } + + static class NullScope extends SimpleScope { + + NullScope(ObservationRegistry registry, SimpleObservation current) { + super(registry, current); + } + + @Override + public void reset() { + + } + + @Override + public void makeCurrent() { + + } + + @Override + public void close() { + + } + } + } + +} diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/observation/contextpropagation/ObservationThreadLocalAccessor.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/observation/contextpropagation/ObservationThreadLocalAccessor.java new file mode 100644 index 00000000..1e0dcf92 --- /dev/null +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/observation/contextpropagation/ObservationThreadLocalAccessor.java @@ -0,0 +1,73 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.observation.contextpropagation; + +import io.micrometer.context.ThreadLocalAccessor; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + +/** + * A {@link ThreadLocalAccessor} to put and restore current {@link Observation}. + * + * @author Marcin Grzejszczak + * @since 1.10.0 + */ +public class ObservationThreadLocalAccessor implements ThreadLocalAccessor { + + /** + * Key under which Micrometer Observation is being registered. + */ + public static final String KEY = "micrometer.observation"; + + private static final ObservationRegistry observationRegistry = ObservationRegistry.create(); + + @Override + public Object key() { + return KEY; + } + + @Override + public Observation getValue() { + return observationRegistry.getCurrentObservation(); + } + + @Override + public void setValue(Observation value) { + // Iterate over all handlers and open a new scope. The created scope will put + // itself to TL. + value.openScope(); + } + + @Override + public void reset() { + Observation.Scope scope = observationRegistry.getCurrentObservationScope(); + if (scope != null) { + scope.reset(); + } + } + + + // value - obs2 + @Override + public void restore(Observation value) { + Observation.Scope enclosingScope = value.getEnclosingScope(); + if (enclosingScope != null) { + enclosingScope.makeCurrent(); + } +// setValue(value); + } + +} diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/NestedScopesTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/NestedScopesTests.java index 5089a94e..66a7ed4d 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/NestedScopesTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/NestedScopesTests.java @@ -118,6 +118,13 @@ void nestedScopesShouldBeMaintainedWithContextPropagationApi() { thenCurrentObservationIsNull(); + // OBS1 SCOPE1 -> enclosing (); + // OBS1 SCOPE2 -> enclosing (obs1 scope1) + // OBS2 SCOPE1 -> enclosing (obs1 scope2) + // OBS2 SCOPE2 -> enclosing (obs1 scope2, obs2 scope1) + // OBS1 SCOPE3 -> enclosing (obs1 scope2, obs2 scope2) + // OBS1 SCOPE4 -> enclosing (obs1 scope2, obs2 scope2, obs1 scope3) + try (ContextSnapshot.Scope obs1Scope1 = snapshot1.setThreadLocals()) { Span spanObs1Scope1 = tracer.currentSpan(); try (ContextSnapshot.Scope obs1Scope2 = snapshot1.setThreadLocals()) { diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java index afac4f9a..4e57d614 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java @@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; -import reactor.util.context.Context; import static org.assertj.core.api.BDDAssertions.then; @@ -84,19 +83,29 @@ void should_open_and_close_scopes_with_reactor() { Span span2 = tracer.currentSpan(); logger.info("SPAN 2 [" + tracer.currentSpan() + "]"); - Mono.just(1).flatMap(integer -> { - return Mono.just(2).doOnNext(integer1 -> { - Span spanWEmpty = tracer.currentSpan(); - logger.info("\n\n[2] SPAN IN EMPTY [" + spanWEmpty + "]"); - logger.info("[2] SIZE [" + CorrelationFlushScopeArrayReader.size() + "]"); - then(spanWEmpty).isNull(); - }).contextWrite(context -> Context.empty()); - }).doOnNext(integer -> { + // enclosing scope 1 + + // 3 obs scope + Mono.just(1).doOnNext(integer -> { Span spanWOnNext = tracer.currentSpan(); logger.info("\n\n[1] SPAN IN ON NEXT [" + spanWOnNext + "]"); logger.info("[1] SIZE [" + CorrelationFlushScopeArrayReader.size() + "]"); then(spanWOnNext).isEqualTo(span2); - }).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, obs2)).block(); + }) + .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, obs2)) + .block(); + + // enclosing scope 1 + + // OBSERVATION + // obs2 scope + // obs2 scope + // obs1 scope + + // BRAVE + // span scope 2 (NOOP) + // span scope 2 + // span scope 1 logger.info("\n\nSPAN OUTSIDE REACTOR [" + tracer.currentSpan() + "]"); logger.info("SIZE AFTER [" + CorrelationFlushScopeArrayReader.size() + "]"); @@ -106,7 +115,7 @@ void should_open_and_close_scopes_with_reactor() { obs2.stop(); logger.info("SIZE OUTSIDE CLOSE 2 [" + CorrelationFlushScopeArrayReader.size() + "]"); logger.info("SPAN AFTER CLOSE 2 [" + tracer.currentSpan() + "]"); - then(tracer.currentSpan()).isEqualTo(span1); + then(tracer.currentSpan()).as("Scopes should be restored to previous so current span should be Span 1 which is <%s>. Span 2 is <%s>", span1, span2).isEqualTo(span1); scope.close(); obs1.stop();