diff --git a/pom.xml b/pom.xml
index b23a16229f..5d04270676 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-mongodb-parent
- 5.0.0-SNAPSHOT
+ 5.0.0-GH-5082-SNAPSHOT
pom
Spring Data MongoDB
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index fc88571622..930d1412e5 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -15,7 +15,7 @@
org.springframework.data
spring-data-mongodb-parent
- 5.0.0-SNAPSHOT
+ 5.0.0-GH-5082-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index 37730f7d40..4b08f38814 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -13,7 +13,7 @@
org.springframework.data
spring-data-mongodb-parent
- 5.0.0-SNAPSHOT
+ 5.0.0-GH-5082-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/observability/MongoObservationCommandListener.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/observability/MongoObservationCommandListener.java
index fcd4778042..314da09eaa 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/observability/MongoObservationCommandListener.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/observability/MongoObservationCommandListener.java
@@ -17,6 +17,7 @@
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
+import io.micrometer.observation.ObservationView;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import java.util.function.BiConsumer;
@@ -191,7 +192,13 @@ private void stopObservation(@Nullable RequestContext requestContext,
log.debug(
"Restoring parent observation [" + observation + "] for Mongo instrumentation and put it in Mongo context");
}
- requestContext.put(ObservationThreadLocalAccessor.KEY, observation.getContext().getParentObservation());
+ ObservationView parentObservation = observation.getContext().getParentObservation();
+
+ if (parentObservation == null) {
+ requestContext.delete(ObservationThreadLocalAccessor.KEY);
+ } else {
+ requestContext.put(ObservationThreadLocalAccessor.KEY, parentObservation);
+ }
}
}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/observability/MongoObservationCommandListenerTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/observability/MongoObservationCommandListenerTests.java
index dadb98ce2b..48504b4e6b 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/observability/MongoObservationCommandListenerTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/observability/MongoObservationCommandListenerTests.java
@@ -26,10 +26,12 @@
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
+import reactor.core.publisher.BaseSubscriber;
import org.assertj.core.api.Assertions;
import org.bson.BsonDocument;
import org.bson.BsonString;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -43,6 +45,7 @@
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
+import com.mongodb.reactivestreams.client.ReactiveContextProvider;
/**
* Series of test cases exercising {@link MongoObservationCommandListener}.
@@ -70,57 +73,74 @@ void setup() {
this.listener = new MongoObservationCommandListener(observationRegistry);
}
+ @AfterEach
+ void tearDown() {
+ Observation currentObservation = observationRegistry.getCurrentObservation();
+ if (currentObservation != null) {
+ currentObservation.stop();
+ observationRegistry.setCurrentObservationScope(null);
+ }
+ }
+
@Test
void commandStartedShouldNotInstrumentWhenAdminDatabase() {
- // when
listener.commandStarted(new CommandStartedEvent(null, 0, 0, null, "admin", "", null));
- // then
assertThat(meterRegistry).hasNoMetrics();
}
@Test
void commandStartedShouldNotInstrumentWhenNoRequestContext() {
- // when
listener.commandStarted(new CommandStartedEvent(null, 0, 0, null, "some name", "", null));
- // then
assertThat(meterRegistry).hasNoMetrics();
}
@Test
void commandStartedShouldNotInstrumentWhenNoParentSampleInRequestContext() {
- // when
listener.commandStarted(new CommandStartedEvent(new MapRequestContext(), 0, 0, null, "some name", "", null));
- // then
assertThat(meterRegistry).hasMeterWithName("spring.data.mongodb.command.active");
}
@Test // GH-4994
void commandStartedShouldAlwaysIncludeCollection() {
- // when
listener.commandStarted(new CommandStartedEvent(new MapRequestContext(), 0, 0, null, "some name", "hello", null));
- // then
// although command 'hello' is collection-less, metric must have tag "db.mongodb.collection"
assertThat(meterRegistry).hasMeterWithNameAndTags(
"spring.data.mongodb.command.active",
Tags.of("db.mongodb.collection", "none"));
}
+ @Test // GH-5082
+ void reactiveContextCompletesNormally() {
+
+ ReactiveContextProvider rcp = (ReactiveContextProvider) ContextProviderFactory.create(observationRegistry);
+ RequestContext context = rcp.getContext(new BaseSubscriber<>() {});
+
+ listener.commandStarted(new CommandStartedEvent(context, 0, 0, //
+ new ConnectionDescription( //
+ new ServerId( //
+ new ClusterId("description"), //
+ new ServerAddress("localhost", 1234))),
+ "database", "insert", //
+ new BsonDocument("collection", new BsonString("user"))));
+ listener.commandSucceeded(new CommandSucceededEvent(context, 0, 0, null, "insert", null, null, 0));
+
+ assertThatTimerRegisteredWithTags();
+ }
+
@Test
void successfullyCompletedCommandShouldCreateTimerWhenParentSampleInRequestContext() {
- // given
Observation parent = Observation.start("name", observationRegistry);
RequestContext traceRequestContext = getContext();
- // when
listener.commandStarted(new CommandStartedEvent(traceRequestContext, 0, 0, //
new ConnectionDescription( //
new ServerId( //
@@ -129,18 +149,15 @@ void successfullyCompletedCommandShouldCreateTimerWhenParentSampleInRequestConte
new BsonDocument("collection", new BsonString("user"))));
listener.commandSucceeded(new CommandSucceededEvent(traceRequestContext, 0, 0, null, "insert", null, null, 0));
- // then
assertThatTimerRegisteredWithTags();
}
@Test
void successfullyCompletedCommandWithCollectionHavingCommandNameShouldCreateTimerWhenParentSampleInRequestContext() {
- // given
Observation parent = Observation.start("name", observationRegistry);
RequestContext traceRequestContext = getContext();
- // when
listener.commandStarted(new CommandStartedEvent(traceRequestContext, 0, 0, //
new ConnectionDescription( //
new ServerId( //
@@ -150,18 +167,15 @@ void successfullyCompletedCommandWithCollectionHavingCommandNameShouldCreateTime
new BsonDocument("aggregate", new BsonString("user"))));
listener.commandSucceeded(new CommandSucceededEvent(traceRequestContext, 0, 0, null, "aggregate", null, null, 0));
- // then
assertThatTimerRegisteredWithTags();
}
@Test
void successfullyCompletedCommandWithoutClusterInformationShouldCreateTimerWhenParentSampleInRequestContext() {
- // given
Observation parent = Observation.start("name", observationRegistry);
RequestContext traceRequestContext = getContext();
- // when
listener.commandStarted(new CommandStartedEvent(traceRequestContext, 0, 0, null, "database", "insert",
new BsonDocument("collection", new BsonString("user"))));
listener.commandSucceeded(new CommandSucceededEvent(traceRequestContext, 0, 0, null, "insert", null, null, 0));
@@ -176,11 +190,9 @@ void successfullyCompletedCommandWithoutClusterInformationShouldCreateTimerWhenP
@Test
void commandWithErrorShouldCreateTimerWhenParentSampleInRequestContext() {
- // given
Observation parent = Observation.start("name", observationRegistry);
RequestContext traceRequestContext = getContext();
- // when
listener.commandStarted(new CommandStartedEvent(traceRequestContext, 0, 0, //
new ConnectionDescription( //
new ServerId( //
@@ -191,20 +203,17 @@ void commandWithErrorShouldCreateTimerWhenParentSampleInRequestContext() {
listener.commandFailed( //
new CommandFailedEvent(traceRequestContext, 0, 0, null, "db", "insert", 0, new IllegalAccessException()));
- // then
assertThatTimerRegisteredWithTags();
}
@Test // GH-4481
void completionShouldIgnoreIncompatibleObservationContext() {
- // given
RequestContext traceRequestContext = getContext();
Observation observation = mock(Observation.class);
traceRequestContext.put(ObservationThreadLocalAccessor.KEY, observation);
- // when
listener.commandSucceeded(new CommandSucceededEvent(traceRequestContext, 0, 0, null, "insert", null, null, 0));
verify(observation).getContext();
@@ -214,13 +223,11 @@ void completionShouldIgnoreIncompatibleObservationContext() {
@Test // GH-4481
void failureShouldIgnoreIncompatibleObservationContext() {
- // given
RequestContext traceRequestContext = getContext();
Observation observation = mock(Observation.class);
traceRequestContext.put(ObservationThreadLocalAccessor.KEY, observation);
- // when
listener.commandFailed(new CommandFailedEvent(traceRequestContext, 0, 0, null, "db", "insert", 0, null));
verify(observation).getContext();
@@ -230,7 +237,6 @@ void failureShouldIgnoreIncompatibleObservationContext() {
@Test // GH-4321
void shouldUseObservationConvention() {
- // given
MongoHandlerObservationConvention customObservationConvention = new MongoHandlerObservationConvention() {
@Override
public boolean supportsContext(Observation.Context context) {
@@ -245,22 +251,18 @@ public String getName() {
this.listener = new MongoObservationCommandListener(observationRegistry, mock(ConnectionString.class),
customObservationConvention);
- // when
listener.commandStarted(new CommandStartedEvent(new MapRequestContext(), 0, 0, null, "some name", "", null));
- // then
assertThat(meterRegistry).hasMeterWithName("custom.name.active");
}
@Test // GH-5064
void completionRestoresParentObservation() {
- // given
Observation parent = Observation.start("name", observationRegistry);
observationRegistry.setCurrentObservationScope(parent.openScope());
RequestContext traceRequestContext = getContext();
- // when
listener.commandStarted(new CommandStartedEvent(traceRequestContext, 0, 0, null, "database", "insert",
new BsonDocument("collection", new BsonString("user"))));
@@ -275,12 +277,10 @@ void completionRestoresParentObservation() {
@Test // GH-5064
void failureRestoresParentObservation() {
- // given
Observation parent = Observation.start("name", observationRegistry);
observationRegistry.setCurrentObservationScope(parent.openScope());
RequestContext traceRequestContext = getContext();
- // when
listener.commandStarted(new CommandStartedEvent(traceRequestContext, 0, 0, null, "database", "insert",
new BsonDocument("collection", new BsonString("user"))));