Skip to content

Commit

Permalink
Use Context more in DatabaseClientTracer (#1836)
Browse files Browse the repository at this point in the history
  • Loading branch information
trask committed Dec 7, 2020
1 parent 1ca562c commit 4cbfb36
Show file tree
Hide file tree
Showing 40 changed files with 398 additions and 354 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
Expand All @@ -28,12 +27,17 @@ public DatabaseClientTracer() {
tracer = OpenTelemetry.getGlobalTracer(getInstrumentationName(), getVersion());
}

public Span startSpan(CONNECTION connection, QUERY query) {
public boolean shouldStartSpan(Context parentContext) {
return parentContext.get(CONTEXT_CLIENT_SPAN_KEY) == null;
}

public Context startSpan(Context parentContext, CONNECTION connection, QUERY query) {
String normalizedQuery = normalizeQuery(query);

Span span =
tracer
.spanBuilder(spanName(connection, query, normalizedQuery))
.setParent(parentContext)
.setSpanKind(CLIENT)
.setAttribute(SemanticAttributes.DB_SYSTEM, dbSystem(connection))
.startSpan();
Expand All @@ -44,20 +48,7 @@ public Span startSpan(CONNECTION connection, QUERY query) {
}
onStatement(span, normalizedQuery);

return span;
}

/**
* Creates new scoped context with the given span.
*
* <p>Attaches new context to the request to avoid creating duplicate client spans.
*/
@Override
public Scope startScope(Span span) {
// TODO we could do this in one go, but TracingContextUtils.CONTEXT_SPAN_KEY is private
Context clientSpanContext = Context.current().with(CONTEXT_CLIENT_SPAN_KEY, span);
Context newContext = clientSpanContext.with(span);
return newContext.makeCurrent();
return parentContext.with(span).with(CONTEXT_CLIENT_SPAN_KEY, span);
}

@Override
Expand All @@ -70,13 +61,12 @@ public Span getClientSpan() {
return context.get(CONTEXT_CLIENT_SPAN_KEY);
}

@Override
public void end(Span span) {
span.end();
public void end(Context context) {
Span.fromContext(context).end();
}

@Override
public void endExceptionally(Span span, Throwable throwable) {
public void endExceptionally(Context context, Throwable throwable) {
Span span = Span.fromContext(context);
onError(span, throwable);
end(span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.datastax.driver.core.Session;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import io.opentelemetry.javaagent.instrumentation.api.db.DbSystem;
Expand Down Expand Up @@ -53,9 +54,10 @@ protected InetSocketAddress peerAddress(Session session) {
return null;
}

public void end(Span span, ExecutionInfo executionInfo) {
public void end(Context context, ExecutionInfo executionInfo) {
Span span = Span.fromContext(context);
Host host = executionInfo.getQueriedHost();
NetPeerUtils.INSTANCE.setNetPeer(span, host.getSocketAddress());
end(span);
end(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.Map;

Expand Down Expand Up @@ -58,97 +58,97 @@ public Session apply(Session session) {

@Override
public ResultSet execute(String query) {
Span span = tracer().startSpan(session, query);
Context context = tracer().startSpan(Context.current(), session, query);
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}

@Override
public ResultSet execute(String query, Object... values) {
Span span = tracer().startSpan(session, query);
Context context = tracer().startSpan(Context.current(), session, query);
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query, values);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}

@Override
public ResultSet execute(String query, Map<String, Object> values) {
Span span = tracer().startSpan(session, query);
Context context = tracer().startSpan(Context.current(), session, query);
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query, values);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}

@Override
public ResultSet execute(Statement statement) {
Span span = tracer().startSpan(session, getQuery(statement));
Context context = tracer().startSpan(Context.current(), session, getQuery(statement));
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(statement);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}

@Override
public ResultSetFuture executeAsync(String query) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}

@Override
public ResultSetFuture executeAsync(String query, Object... values) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query, values);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}

@Override
public ResultSetFuture executeAsync(String query, Map<String, Object> values) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query, values);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}

@Override
public ResultSetFuture executeAsync(Statement statement) {
String query = getQuery(statement);
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(statement);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}
Expand Down Expand Up @@ -209,18 +209,18 @@ private static String getQuery(Statement statement) {
return query == null ? "" : query;
}

private void addCallbackToEndSpan(ResultSetFuture future, final Span span) {
private void addCallbackToEndSpan(ResultSetFuture future, Context context) {
Futures.addCallback(
future,
new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
tracer().end(span, result.getExecutionInfo());
tracer().end(context, result.getExecutionInfo());
}

@Override
public void onFailure(Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
}
},
directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import io.opentelemetry.javaagent.instrumentation.api.db.DbSystem;
Expand Down Expand Up @@ -48,11 +49,12 @@ protected InetSocketAddress peerAddress(CqlSession cqlSession) {
return null;
}

public void onResponse(Span span, ExecutionInfo executionInfo) {
public void onResponse(Context context, ExecutionInfo executionInfo) {
Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
SocketAddress socketAddress = coordinator.getEndPoint().resolve();
if (socketAddress instanceof InetSocketAddress) {
Span span = Span.fromContext(context);
NetPeerUtils.INSTANCE.setNetPeer(span, ((InetSocketAddress) socketAddress));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -174,17 +174,17 @@ public <REQUEST extends Request, RESULT> RESULT execute(
@NonNull
public ResultSet execute(@NonNull String query) {

Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
try {
ResultSet resultSet = session.execute(query);
tracer().onResponse(span, resultSet.getExecutionInfo());
tracer().onResponse(context, resultSet.getExecutionInfo());
return resultSet;
} catch (RuntimeException e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
throw e;
} finally {
tracer().end(span);
tracer().end(context);
}
}
}
Expand All @@ -194,17 +194,17 @@ public ResultSet execute(@NonNull String query) {
public ResultSet execute(@NonNull Statement<?> statement) {
String query = getQuery(statement);

Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
try {
ResultSet resultSet = session.execute(statement);
tracer().onResponse(span, resultSet.getExecutionInfo());
tracer().onResponse(context, resultSet.getExecutionInfo());
return resultSet;
} catch (RuntimeException e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
throw e;
} finally {
tracer().end(span);
tracer().end(context);
}
}
}
Expand All @@ -214,16 +214,16 @@ public ResultSet execute(@NonNull Statement<?> statement) {
public CompletionStage<AsyncResultSet> executeAsync(@NonNull Statement<?> statement) {
String query = getQuery(statement);

Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
CompletionStage<AsyncResultSet> stage = session.executeAsync(statement);
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().onResponse(span, asyncResultSet.getExecutionInfo());
tracer().end(span);
tracer().onResponse(context, asyncResultSet.getExecutionInfo());
tracer().end(context);
}
});
}
Expand All @@ -232,16 +232,16 @@ public CompletionStage<AsyncResultSet> executeAsync(@NonNull Statement<?> statem
@Override
@NonNull
public CompletionStage<AsyncResultSet> executeAsync(@NonNull String query) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
CompletionStage<AsyncResultSet> stage = session.executeAsync(query);
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().onResponse(span, asyncResultSet.getExecutionInfo());
tracer().end(span);
tracer().onResponse(context, asyncResultSet.getExecutionInfo());
tracer().end(context);
}
});
}
Expand Down
Loading

0 comments on commit 4cbfb36

Please sign in to comment.