Skip to content

Commit

Permalink
Ensure span.finish() is run only once with client call terminates (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravirajj authored and malafeev committed Apr 10, 2019
1 parent af3cc88 commit bf434b6
Show file tree
Hide file tree
Showing 9 changed files with 1,203 additions and 608 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ ClientTracingInterceptor clientInterceptor = new ClientTracingInterceptor
.withClientCloseDecorator(new ClientCloseDecorator() {
@Override
public void close(Span span, Status status, Metadata trailers) {
span.setTag("grpc.statusCode", status.getCode().value());
span.setTag("some_other_tag", "some_other_value");
}
})
...
Expand All @@ -297,13 +297,13 @@ ServerTracingInterceptor serverInterceptor = new ServerTracingInterceptor
@Override
public void interceptCall(Span span, ServerCall call, Metadata headers) {
span.setTag("some_tag", "some_value");
span.log("Example log");
span.log("Intercepting server call");
}
})
.withServerCloseDecorator(new ServerCloseDecorator() {
@Override
public void close(Span span, Status status, Metadata trailers) {
span.setTag("grpc.statusCode", status.getCode().value());
span.setTag("some_other_tag", "some_other_value");
}
})
...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,6 @@ public Channel intercept(Channel channel) {
return ClientInterceptors.intercept(channel, this);
}

private SpanContext getActiveSpanContext() {
if (activeSpanSource != null) {
Span activeSpan = activeSpanSource.getActiveSpan();
if (activeSpan != null) {
return activeSpan.context();
}
}
if (activeSpanContextSource != null) {
final SpanContext spanContext = activeSpanContextSource.getActiveSpanContext();
if (spanContext != null) {
return spanContext;
}
}
if (tracer.activeSpan() != null) {
return tracer.activeSpan().context();
}
return null;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Expand Down Expand Up @@ -169,6 +150,8 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {

volatile boolean finished = false;

@Override
public void start(Listener<RespT> responseListener, final Metadata headers) {
if (verbose) {
Expand Down Expand Up @@ -223,30 +206,26 @@ public void onMessage(RespT message) {

@Override
public void onClose(Status status, Metadata trailers) {
if (finished) {
delegate().onClose(status, trailers);
return;
}

if (verbose) {
if (status.getCode() == Status.Code.OK) {
span.log(ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.CLIENT_CALL_LISTENER_ON_CLOSE)
.put(Fields.MESSAGE, "Client call closed")
.build());
} else {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.ERROR)
.put(Fields.ERROR_KIND, status.getCode());
Throwable cause = status.getCause();
if (cause != null) {
builder.put(Fields.ERROR_OBJECT, cause);
}
String description = status.getDescription();
builder.put(Fields.MESSAGE, description != null ? description : "Client call failed");
span.log(builder.build());
span.log(ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.CLIENT_CALL_LISTENER_ON_CLOSE)
.put(Fields.MESSAGE, "Client call closed")
.build());
if (!status.isOk()) {
GrpcFields.logClientCallError(span, status.getDescription(), status.getCause());
}
}
GrpcTags.GRPC_STATUS.set(span, status);
if (clientCloseDecorator != null) {
clientCloseDecorator.close(span, status, trailers);
}
delegate().onClose(status, trailers);
finished = true;
span.finish();
}
};
Expand All @@ -263,23 +242,16 @@ public void request(int numMessages) {
}
}


@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
if (verbose) {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
if (cause != null) {
builder
.put(Fields.EVENT, GrpcFields.ERROR)
.put(Fields.ERROR_OBJECT, cause);
} else {
builder.put(Fields.EVENT, GrpcFields.CLIENT_CALL_CANCEL);
}
builder.put(Fields.MESSAGE, message != null ? message : "Client call canceled");
span.log(builder.build());
public void sendMessage(ReqT message) {
if (streaming || verbose) {
span.log(ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.CLIENT_CALL_SEND_MESSAGE)
.put(Fields.MESSAGE, "Client sent message")
.build());
}
try (Scope ignored = tracer.scopeManager().activate(span)) {
delegate().cancel(message, cause);
delegate().sendMessage(message);
}
}

Expand All @@ -297,15 +269,26 @@ public void halfClose() {
}

@Override
public void sendMessage(ReqT message) {
if (streaming || verbose) {
public void cancel(@Nullable String message, @Nullable Throwable cause) {
if (finished) {
delegate().cancel(message, cause);
return;
}

if (verbose) {
span.log(ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.CLIENT_CALL_SEND_MESSAGE)
.put(Fields.MESSAGE, "Client sent message")
.put(Fields.EVENT, GrpcFields.CLIENT_CALL_CANCEL)
.put(Fields.MESSAGE, "Client call canceled")
.build());
GrpcFields.logClientCallError(span, message, cause);
}
Status status = cause == null ? Status.UNKNOWN : Status.fromThrowable(cause);
GrpcTags.GRPC_STATUS.set(span, status.withDescription(message));
try (Scope ignored = tracer.scopeManager().activate(span)) {
delegate().sendMessage(message);
delegate().cancel(message, cause);
} finally {
finished = true;
span.finish();
}
}

Expand Down Expand Up @@ -333,6 +316,25 @@ public Attributes getAttributes() {
}
}

private SpanContext getActiveSpanContext() {
if (activeSpanSource != null) {
Span activeSpan = activeSpanSource.getActiveSpan();
if (activeSpan != null) {
return activeSpan.context();
}
}
if (activeSpanContextSource != null) {
final SpanContext spanContext = activeSpanContextSource.getActiveSpanContext();
if (spanContext != null) {
return spanContext;
}
}
if (tracer.activeSpan() != null) {
return tracer.activeSpan().context();
}
return null;
}

private Span createSpanFromParent(SpanContext parentSpanContext, String operationName) {
final Tracer.SpanBuilder spanBuilder;
if (parentSpanContext == null) {
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/opentracing/contrib/grpc/GrpcFields.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
*/
package io.opentracing.contrib.grpc;

import com.google.common.collect.ImmutableMap;
import io.opentracing.Span;
import io.opentracing.log.Fields;

final class GrpcFields {

static final String ERROR = "error";
Expand All @@ -35,4 +39,30 @@ final class GrpcFields {
static final String SERVER_CALL_LISTENER_ON_HALF_CLOSE = "server-call-listener-on-half-close";
static final String SERVER_CALL_LISTENER_ON_CANCEL = "server-call-listener-on-cancel";
static final String SERVER_CALL_LISTENER_ON_COMPLETE = "server-call-listener-on-complete";

static void logClientCallError(Span span, String message, Throwable cause) {
logCallError(span, message, cause, "Client");
}

static void logServerCallError(Span span, String message, Throwable cause) {
logCallError(span, message, cause, "Server");
}

private static void logCallError(Span span, String message, Throwable cause, String name) {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.ERROR);
String causeMessage = null;
if (cause != null) {
builder.put(Fields.ERROR_OBJECT, cause);
causeMessage = cause.getMessage();
}
if (message != null) {
builder.put(Fields.MESSAGE, message);
} else if (causeMessage != null) {
builder.put(Fields.MESSAGE, causeMessage);
} else {
builder.put(Fields.MESSAGE, name + " call failed");
}
span.log(builder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.opentracing.contrib.grpc;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.grpc.BindableService;
import io.grpc.Context;
Expand Down Expand Up @@ -189,22 +190,12 @@ public void sendMessage(RespT message) {
@Override
public void close(Status status, Metadata trailers) {
if (verbose) {
if (status.getCode() == Status.Code.OK) {
span.log(ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.SERVER_CALL_CLOSE)
.put(Fields.MESSAGE, "Server call closed")
.build());
} else {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.ERROR)
.put(Fields.ERROR_KIND, status.getCode());
Throwable cause = status.getCause();
if (cause != null) {
builder.put(Fields.ERROR_OBJECT, cause);
}
String description = status.getDescription();
builder.put(Fields.MESSAGE, description != null ? description : "Server call failed");
span.log(builder.build());
span.log(ImmutableMap.<String, Object>builder()
.put(Fields.EVENT, GrpcFields.SERVER_CALL_CLOSE)
.put(Fields.MESSAGE, "Server call closed")
.build());
if (!status.isOk()) {
GrpcFields.logServerCallError(span, status.getDescription(), status.getCause());
}
}
GrpcTags.GRPC_STATUS.set(span, status);
Expand Down Expand Up @@ -256,6 +247,7 @@ public void onCancel() {
.put(Fields.MESSAGE, "Server call cancelled")
.build());
}
GrpcTags.GRPC_STATUS.set(span, Status.CANCELLED);
try (Scope ignored = tracer.scopeManager().activate(span)) {
delegate().onCancel();
} finally {
Expand All @@ -271,6 +263,7 @@ public void onComplete() {
.put(Fields.MESSAGE, "Server call completed")
.build());
}
// Server span may complete with non-OK ServerCall.close(status).
try (Scope ignored = tracer.scopeManager().activate(span)) {
delegate().onComplete();
} finally {
Expand All @@ -288,7 +281,8 @@ public void onReady() {
}
}

private Span getSpanFromHeaders(Map<String, String> headers, String operationName) {
@VisibleForTesting
Span getSpanFromHeaders(Map<String, String> headers, String operationName) {
Map<String, Object> fields = null;
Tracer.SpanBuilder spanBuilder = tracer.buildSpan(operationName);
try {
Expand Down

0 comments on commit bf434b6

Please sign in to comment.