Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Ratpack server context propagation and enable its concurrency test #2910

Merged
merged 2 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public static class SetExecuteRunnableStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could drop newTask and just assign to task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

task = newTask;
ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
Expand Down Expand Up @@ -118,8 +118,8 @@ public static class SetSubmitRunnableStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
task = newTask;
ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
Expand Down Expand Up @@ -148,8 +148,8 @@ public static class SetCallableStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Callable task) {
Callable newTask = CallableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
Callable newTask = CallableWrapper.wrapIfNeeded(task);
task = newTask;
ContextStore<Callable, State> contextStore =
InstrumentationContext.get(Callable.class, State.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import ratpack.handling.Context;
import ratpack.handling.Handler;

Expand All @@ -23,9 +24,14 @@ public void handle(Context ctx) {
ctx.getDirectChannelAccess().getChannel().attr(AttributeKeys.SERVER_SPAN);
io.opentelemetry.context.Context serverSpanContext = spanAttribute.get();

// Relying on executor instrumentation to assume the netty span is in context as the parent.
// Must use context from channel, as executor instrumentation is not accurate - Ratpack
// internally queues events and then drains them in batches, causing executor instrumentation to
// attach the same context to a batch of events from different requests.
Comment on lines +27 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

io.opentelemetry.context.Context parentContext =
serverSpanContext != null ? serverSpanContext : Java8BytecodeBridge.currentContext();

io.opentelemetry.context.Context ratpackContext =
tracer().startSpan("ratpack.handler", SpanKind.INTERNAL);
tracer().startSpan(parentContext, "ratpack.handler", SpanKind.INTERNAL);
ctx.getExecution().add(ratpackContext);

ctx.getResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package server

import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS

import io.opentelemetry.api.trace.Span
import ratpack.exec.Promise
import ratpack.groovy.test.embed.GroovyEmbeddedApp
import ratpack.test.embed.EmbeddedApp
Expand Down Expand Up @@ -40,6 +42,18 @@ class RatpackAsyncHttpServerTest extends RatpackHttpServerTest {
}
}
}
prefix(INDEXED_CHILD.rawPath()) {
all {
Promise.sync {
INDEXED_CHILD
} then {
controller(INDEXED_CHILD) {
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
context.response.status(INDEXED_CHILD.status).send()
}
}
}
}
prefix(QUERY_PARAM.rawPath()) {
all {
Promise.sync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package server

import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS

import io.opentelemetry.api.trace.Span
import ratpack.exec.Promise
import ratpack.groovy.test.embed.GroovyEmbeddedApp
import ratpack.test.embed.EmbeddedApp
Expand Down Expand Up @@ -40,6 +42,18 @@ class RatpackForkedHttpServerTest extends RatpackHttpServerTest {
}
}
}
prefix(INDEXED_CHILD.rawPath()) {
all {
Promise.sync {
INDEXED_CHILD
}.fork().then {
controller(INDEXED_CHILD) {
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
context.response.status(INDEXED_CHILD.status).send()
}
}
}
}
prefix(QUERY_PARAM.rawPath()) {
all {
Promise.sync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ package server
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS

import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
Expand Down Expand Up @@ -43,6 +45,14 @@ class RatpackHttpServerTest extends HttpServerTest<EmbeddedApp> implements Agent
}
}
}
prefix(INDEXED_CHILD.rawPath()) {
all {
controller(INDEXED_CHILD) {
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
context.response.status(INDEXED_CHILD.status).send()
}
}
}
prefix(QUERY_PARAM.rawPath()) {
all {
controller(QUERY_PARAM) {
Expand Down Expand Up @@ -108,6 +118,11 @@ class RatpackHttpServerTest extends HttpServerTest<EmbeddedApp> implements Agent
true
}

@Override
boolean testConcurrency() {
true
}

@Override
void handlerSpan(TraceAssert trace, int index, Object parent, String method = "GET", ServerEndpoint endpoint = SUCCESS) {
trace.span(index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ protected Boolean computeValue(Class<?> taskClass) {
return false;
}

if (taskClass.getName().startsWith("ratpack.exec.internal.")) {
// Context is passed through Netty channels in Ratpack as executor instrumentation is
// not suitable. As the context that would be propagated via executor would be
// incorrect, skip the propagation. Not checking for concrete class names as this covers
// anonymous classes from ratpack.exec.internal.DefaultExecution and
// ratpack.exec.internal.DefaultExecController.
Comment on lines +96 to +100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return false;
}

return true;
}
};
Expand Down