Skip to content

Commit

Permalink
Opentelemetry Redis Instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
luneo7 committed Mar 25, 2024
1 parent e7f2a1e commit 227eee4
Show file tree
Hide file tree
Showing 14 changed files with 736 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,10 @@ void setupVertx(InstrumentationRecorder recorder, BeanContainerBuildItem beanCon
|| capabilities.isPresent(Capability.REACTIVE_MYSQL_CLIENT)
|| capabilities.isPresent(Capability.REACTIVE_ORACLE_CLIENT)
|| capabilities.isPresent(Capability.REACTIVE_PG_CLIENT);
boolean redisClientAvailable = capabilities.isPresent(Capability.REDIS_CLIENT);
recorder.setupVertxTracer(beanContainerBuildItem.getValue(),
sqlClientAvailable,
redisClientAvailable,
ConfigProvider.getConfig()
.getConfigValue(QUARKUS_OTEL_SEMCONV_STABILITY_OPT_IN)
.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ public interface InstrumentRuntimeConfig {
@WithDefault("true")
boolean vertxSqlClient();

/**
* Enables instrumentation for Vert.x Redis Client.
*/
@WithDefault("true")
boolean vertxRedisClient();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.OpenTelemetryVertxMetricsFactory;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.OpenTelemetryVertxTracer;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.OpenTelemetryVertxTracingFactory;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.RedisClientInstrumenterVertxTracer;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.SqlClientInstrumenterVertxTracer;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
Expand Down Expand Up @@ -42,9 +43,9 @@ public Consumer<VertxOptions> getVertxTracingOptions() {

/* RUNTIME INIT */
public void setupVertxTracer(BeanContainer beanContainer, boolean sqlClientAvailable,
final String semconvStability) {
boolean redisClientAvailable, final String semconvStability) {
OpenTelemetry openTelemetry = beanContainer.beanInstance(OpenTelemetry.class);
List<InstrumenterVertxTracer<?, ?>> tracers = new ArrayList<>(3);
List<InstrumenterVertxTracer<?, ?>> tracers = new ArrayList<>(4);
if (config.getValue().instrument().vertxHttp()) {
tracers.add(new HttpInstrumenterVertxTracer(openTelemetry, getSemconvStabilityOptin(semconvStability)));
}
Expand All @@ -54,6 +55,9 @@ public void setupVertxTracer(BeanContainer beanContainer, boolean sqlClientAvail
if (sqlClientAvailable && config.getValue().instrument().vertxSqlClient()) {
tracers.add(new SqlClientInstrumenterVertxTracer(openTelemetry));
}
if (redisClientAvailable && config.getValue().instrument().vertxRedisClient()) {
tracers.add(new RedisClientInstrumenterVertxTracer(openTelemetry));
}
OpenTelemetryVertxTracer openTelemetryVertxTracer = new OpenTelemetryVertxTracer(tracers);
FACTORY.getVertxTracerDelegator().setDelegate(openTelemetryVertxTracer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx;

import static io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig.INSTRUMENTATION_NAME;

import java.util.Map;
import java.util.function.BiConsumer;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.internal.AttributesExtractorUtil;
import io.opentelemetry.semconv.SemanticAttributes;
import io.vertx.core.Context;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.tracing.TracingPolicy;

public class RedisClientInstrumenterVertxTracer implements
InstrumenterVertxTracer<RedisClientInstrumenterVertxTracer.CommandTrace, Object> {
private final Instrumenter<CommandTrace, Object> redisClientInstrumenter;

public RedisClientInstrumenterVertxTracer(final OpenTelemetry openTelemetry) {
InstrumenterBuilder<CommandTrace, Object> clientInstrumenterBuilder = Instrumenter.builder(
openTelemetry,
INSTRUMENTATION_NAME,
DbClientSpanNameExtractor.create(RedisClientAttributesGetter.INSTANCE));

this.redisClientInstrumenter = clientInstrumenterBuilder
.addAttributesExtractor(DbClientAttributesExtractor.create(RedisClientAttributesGetter.INSTANCE))
.addAttributesExtractor(RedisClientAttributesExtractor.INSTANCE)
.buildInstrumenter(SpanKindExtractor.alwaysClient());
}

@Override
public <R> boolean canHandle(R request, TagExtractor<R> tagExtractor) {
if (request instanceof CommandTrace) {
return true;
}

return "redis".equals(tagExtractor.extract(request).get("db.type"));
}

@Override
@SuppressWarnings("unchecked")
public <R> OpenTelemetryVertxTracer.SpanOperation sendRequest(
final Context context,
final SpanKind kind,
final TracingPolicy policy,
final R request,
final String operation,
final BiConsumer<String, String> headers,
final TagExtractor<R> tagExtractor) {
R commandTrace = (R) CommandTrace.commandTrace(tagExtractor.extract(request));
return InstrumenterVertxTracer.super.sendRequest(context, kind, policy, commandTrace, operation, headers, tagExtractor);
}

@Override
public <R> void receiveResponse(
final Context context,
final R response,
final OpenTelemetryVertxTracer.SpanOperation spanOperation,
final Throwable failure,
final TagExtractor<R> tagExtractor) {

InstrumenterVertxTracer.super.receiveResponse(context, response, spanOperation, failure, tagExtractor);
}

@Override
public Instrumenter<CommandTrace, Object> getReceiveRequestInstrumenter() {
return null;
}

@Override
public Instrumenter<CommandTrace, Object> getSendResponseInstrumenter() {
return null;
}

@Override
public Instrumenter<CommandTrace, Object> getSendRequestInstrumenter() {
return redisClientInstrumenter;
}

@Override
public Instrumenter<CommandTrace, Object> getReceiveResponseInstrumenter() {
return redisClientInstrumenter;
}

// From io.vertx.redis.client.impl.CommandReporter
static class CommandTrace {
private final Map<String, String> attributes;

CommandTrace(final Map<String, String> attributes) {
this.attributes = attributes;
}

static CommandTrace commandTrace(final Map<String, String> attributes) {
return new CommandTrace(attributes);
}

public String operation() {
return attributes.get("db.statement");
}

public String user() {
return attributes.get("db.user");
}

public String peerAddress() {
return attributes.get("peer.address");
}

public long dbIndex() {
return Long.parseLong(attributes.get("db.instance"));
}
}

enum RedisClientAttributesGetter implements DbClientAttributesGetter<CommandTrace> {
INSTANCE;

@Override
public String getStatement(final CommandTrace commandTrace) {
return null;
}

@Override
public String getOperation(final CommandTrace commandTrace) {
return commandTrace.operation();
}

@Override
public String getSystem(final CommandTrace commandTrace) {
return SemanticAttributes.DbSystemValues.REDIS;
}

@Override
public String getUser(final CommandTrace commandTrace) {
return commandTrace.user();
}

@Override
public String getName(final CommandTrace commandTrace) {
return null;
}

@Override
public String getConnectionString(final CommandTrace commandTrace) {
return commandTrace.peerAddress();
}
}

enum RedisClientAttributesExtractor implements AttributesExtractor<CommandTrace, Object> {
INSTANCE;

@Override
public void onStart(AttributesBuilder attributes, io.opentelemetry.context.Context parentContext,
CommandTrace request) {
AttributesExtractorUtil.internalSet(attributes, SemanticAttributes.DB_REDIS_DATABASE_INDEX, request.dbIndex());
}

@Override
public void onEnd(AttributesBuilder attributes,
io.opentelemetry.context.Context context,
CommandTrace request,
Object response,
Throwable error) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public <R> boolean canHandle(final R request, final TagExtractor<R> tagExtractor
return true;
}

return tagExtractor.extract(request).containsKey("db.statement");
return "sql".equals(tagExtractor.extract(request).get("db.type"));
}

@Override
Expand Down Expand Up @@ -87,7 +87,7 @@ public Instrumenter<QueryTrace, QueryTrace> getReceiveResponseInstrumenter() {
return sqlClientInstrumenter;
}

// From io.vertx.sqlclient.impl.tracing.QueryTracer
// From io.vertx.sqlclient.impl.tracing.QueryReporter
static class QueryTrace {
private final Map<String, String> attributes;

Expand Down

0 comments on commit 227eee4

Please sign in to comment.