Skip to content

Add tracing support using Micrometer #1695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions driver-core/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ dependencies {

optionalImplementation(libs.snappy.java)
optionalImplementation(libs.zstd.jni)
optionalImplementation(libs.micrometer)

testImplementation(project(path = ":bson", configuration = "testArtifacts"))
testImplementation(libs.reflections)
31 changes: 31 additions & 0 deletions driver-core/src/main/com/mongodb/MongoClientSettings.java
Original file line number Diff line number Diff line change
@@ -30,9 +30,11 @@
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.event.CommandListener;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.DnsClient;
import com.mongodb.spi.dns.InetAddressResolver;
import com.mongodb.tracing.Tracer;
import org.bson.UuidRepresentation;
import org.bson.codecs.BsonCodecProvider;
import org.bson.codecs.BsonValueCodecProvider;
@@ -118,6 +120,7 @@ public final class MongoClientSettings {
private final InetAddressResolver inetAddressResolver;
@Nullable
private final Long timeoutMS;
private final TracingManager tracingManager;

/**
* Gets the default codec registry. It includes the following providers:
@@ -238,6 +241,7 @@ public static final class Builder {
private ContextProvider contextProvider;
private DnsClient dnsClient;
private InetAddressResolver inetAddressResolver;
private TracingManager tracingManager;

private Builder() {
}
@@ -275,6 +279,7 @@ private Builder(final MongoClientSettings settings) {
if (settings.heartbeatSocketTimeoutSetExplicitly) {
heartbeatSocketTimeoutMS = settings.heartbeatSocketSettings.getReadTimeout(MILLISECONDS);
}
tracingManager = settings.tracingManager;
}

/**
@@ -723,6 +728,20 @@ Builder heartbeatSocketTimeoutMS(final int heartbeatSocketTimeoutMS) {
return this;
}

/**
* Sets the tracer to use for creating Spans for operations and commands.
*
* @param tracer the tracer
* @see com.mongodb.tracing.MicrometerTracer
* @return this
* @since 5.5
*/
@Alpha(Reason.CLIENT)
public Builder tracer(final Tracer tracer) {
this.tracingManager = new TracingManager(tracer);
return this;
}

/**
* Build an instance of {@code MongoClientSettings}.
*
@@ -1040,6 +1059,17 @@ public ContextProvider getContextProvider() {
return contextProvider;
}

/**
* Get the tracer to create Spans for operations and commands.
*
* @return this
* @since 5.5
*/
@Alpha(Reason.CLIENT)
public TracingManager getTracingManager() {
return tracingManager;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -1156,5 +1186,6 @@ private MongoClientSettings(final Builder builder) {
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
contextProvider = builder.contextProvider;
timeoutMS = builder.timeoutMS;
tracingManager = builder.tracingManager;
}
}
Original file line number Diff line number Diff line change
@@ -186,6 +186,10 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
}
}

BsonDocument getCommand() {
return command;
}

/**
* Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE`.
Original file line number Diff line number Diff line change
@@ -51,6 +51,9 @@
import com.mongodb.internal.logging.StructuredLogger;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.internal.tracing.Span;
import com.mongodb.internal.tracing.TraceContext;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryReader;
import org.bson.BsonDocument;
@@ -75,8 +78,8 @@
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
import static com.mongodb.internal.connection.CommandHelper.HELLO;
@@ -374,13 +377,24 @@ public boolean isClosed() {
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final OperationContext operationContext) {
Supplier<T> sendAndReceiveInternal = () -> sendAndReceiveInternal(
message, decoder, operationContext);

Span tracingSpan = createTracingSpan(message, operationContext);

try {
return sendAndReceiveInternal.get();
} catch (MongoCommandException e) {
if (tracingSpan != null) {
tracingSpan.error(e);
}

if (reauthenticationIsTriggered(e)) {
return reauthenticateAndRetry(sendAndReceiveInternal, operationContext);
}
throw e;
} finally {
if (tracingSpan != null) {
tracingSpan.end();
}
}
}

@@ -391,6 +405,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<

AsyncSupplier<T> sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal(
message, decoder, operationContext, c);

beginAsync().<T>thenSupply(c -> {
sendAndReceiveAsyncInternal.getAsync(c);
}).onErrorIf(e -> reauthenticationIsTriggered(e), (t, c) -> {
@@ -872,6 +887,42 @@ public ByteBuf getBuffer(final int size) {
return stream.getBuffer(size);
}

@Nullable
private Span createTracingSpan(final CommandMessage message, final OperationContext operationContext) {
TracingManager tracingManager = operationContext.getTracingManager();
Span span;
if (tracingManager.isEnabled()) {
BsonDocument command = message.getCommand();
TraceContext parentContext = null;
long cursorId = -1;
if (command.containsKey("getMore")) {
cursorId = command.getInt64("getMore").longValue();
parentContext = tracingManager.getCursorParentContext(cursorId);
} else {
parentContext = tracingManager.getParentContext(operationContext.getId());
}

span = tracingManager.addSpan("Command " + command.getFirstKey(), parentContext);
span.tag("db.system", "mongodb");
span.tag("db.namespace", message.getNamespace().getFullName());
span.tag("db.query.summary", command.getFirstKey());
span.tag("db.query.opcode", String.valueOf(message.getOpCode()));
span.tag("db.query.text", command.toString());
if (cursorId != -1) {
span.tag("db.mongodb.cursor_id", String.valueOf(cursorId));
}
span.tag("server.address", serverId.getAddress().getHost());
span.tag("server.port", String.valueOf(serverId.getAddress().getPort()));
span.tag("server.type", message.getSettings().getServerType().name());

span.tag("db.mongodb.server_connection_id", this.description.getConnectionId().toString());
} else {
span = null;
}

return span;
}

private class MessageHeaderCallback implements SingleResultCallback<ByteBuf> {
private final OperationContext operationContext;
private final SingleResultCallback<ResponseBuffers> callback;
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import com.mongodb.selector.ServerSelector;

@@ -49,10 +50,11 @@ public class OperationContext {
private final TimeoutContext timeoutContext;
@Nullable
private final ServerApi serverApi;
private final TracingManager tracingManager;

public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
@Nullable final ServerApi serverApi) {
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi);
@Nullable final ServerApi serverApi, final TracingManager tracingManager) {
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, tracingManager);
}

public static OperationContext simpleOperationContext(
@@ -61,29 +63,35 @@ public static OperationContext simpleOperationContext(
IgnorableRequestContext.INSTANCE,
NoOpSessionContext.INSTANCE,
new TimeoutContext(timeoutSettings),
serverApi);
serverApi,
TracingManager.NO_OP);
}

public static OperationContext simpleOperationContext(final TimeoutContext timeoutContext) {
return new OperationContext(
IgnorableRequestContext.INSTANCE,
NoOpSessionContext.INSTANCE,
timeoutContext,
null);
null,
TracingManager.NO_OP);
}

public OperationContext withSessionContext(final SessionContext sessionContext) {
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi);
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, tracingManager);
}

public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) {
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi);
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, tracingManager);
}

public long getId() {
return id;
}

public TracingManager getTracingManager() {
return tracingManager;
}

public SessionContext getSessionContext() {
return sessionContext;
}
@@ -107,27 +115,31 @@ public OperationContext(final long id,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
final ServerDeprioritization serverDeprioritization,
@Nullable final ServerApi serverApi) {
@Nullable final ServerApi serverApi,
final TracingManager tracingManager) {
this.id = id;
this.serverDeprioritization = serverDeprioritization;
this.requestContext = requestContext;
this.sessionContext = sessionContext;
this.timeoutContext = timeoutContext;
this.serverApi = serverApi;
this.tracingManager = tracingManager;
}

@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
@Nullable final ServerApi serverApi) {
@Nullable final ServerApi serverApi,
final TracingManager tracingManager) {
this.id = id;
this.serverDeprioritization = new ServerDeprioritization();
this.requestContext = requestContext;
this.sessionContext = sessionContext;
this.timeoutContext = timeoutContext;
this.serverApi = serverApi;
this.tracingManager = tracingManager;
}


Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
@Nullable
private List<T> nextBatch;
private boolean resetTimeoutWhenClosing;
private final long cursorId;

CommandBatchCursor(
final TimeoutMode timeoutMode,
@@ -95,10 +96,13 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
operationContext = connectionSource.getOperationContext();
this.timeoutMode = timeoutMode;

ServerCursor serverCursor = commandCursorResult.getServerCursor();
this.cursorId = serverCursor != null ? serverCursor.getId() : -1;

operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);

Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null;
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, serverCursor);
resetTimeoutWhenClosing = true;
}

@@ -169,6 +173,7 @@ public void remove() {

@Override
public void close() {
operationContext.getTracingManager().removeCursorParentContext(cursorId);
resourceManager.close();
}

Original file line number Diff line number Diff line change
@@ -30,6 +30,8 @@
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.tracing.Span;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
@@ -290,21 +292,35 @@ public BatchCursor<T> execute(final ReadBinding binding) {
if (invalidTimeoutModeException != null) {
throw invalidTimeoutModeException;
}
OperationContext operationContext = binding.getOperationContext();

RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext());
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
// Adds a Tracing Span for 'find' operation
TracingManager tracingManager = operationContext.getTracingManager();
Span tracingSpan = tracingManager.addSpan("find", operationContext.getId());

RetryState retryState = initialRetryState(retryReads, operationContext.getTimeoutContext());
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, operationContext, () ->
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getOperationContext()));
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), operationContext));
try {
return createReadCommandAndExecute(retryState, binding.getOperationContext(), source, namespace.getDatabaseName(),
return createReadCommandAndExecute(retryState, operationContext, source, namespace.getDatabaseName(),
getCommandCreator(), CommandResultDocumentCodec.create(decoder, FIRST_BATCH),
transformer(), connection);
} catch (MongoCommandException e) {
throw new MongoQueryException(e.getResponse(), e.getServerAddress());
}
})
);
return read.get();
try {
return read.get();
} catch (MongoQueryException e) {
tracingSpan.error(e);
throw e;
} finally {
tracingSpan.end();
// Clean up the tracing span after the operation is complete
tracingManager.cleanContexts(operationContext.getId());
}
}

@Override
@@ -469,9 +485,17 @@ private TimeoutMode getTimeoutMode() {
}

private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
return (result, source, connection) ->
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(source.getOperationContext()), decoder,
comment, source, connection);
return (result, source, connection) -> {
OperationContext operationContext = source.getOperationContext();

// register cursor id with the operation context, so 'getMore' commands can be folded under the 'find' operation
long cursorId = result.getDocument("cursor").getInt64("id").longValue();
TracingManager tracingManager = operationContext.getTracingManager();
tracingManager.addCursorParentContext(cursorId, operationContext.getId());

return new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(operationContext), decoder,
comment, source, connection);
};
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
Loading
Oops, something went wrong.