Skip to content

Commit

Permalink
Depend on Micrometer v1.13.0-SNAPSHOT
Browse files Browse the repository at this point in the history
Depend on Micrometer Tracing v1.3.0-SNAPSHOT
Depend on Micrometer Doc Generator v1.0.2
Depend on Context Propagation v1.1.0
  • Loading branch information
violetagg committed Apr 17, 2024
1 parent 78f6651 commit 883f078
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 56 deletions.
10 changes: 4 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ ext {
}

//Metrics
micrometerVersion = '1.10.0' //optional baseline
micrometerTracingVersion = '1.0.0' //optional baseline
micrometerDocsVersion = '1.0.0' //optional baseline
micrometerVersion = '1.13.0-SNAPSHOT' //optional baseline
micrometerTracingVersion = '1.3.0-SNAPSHOT' //optional baseline
micrometerDocsVersion = '1.0.2' //optional baseline

contextPropagationDefaultVersion = '1.0.0' //optional baseline
contextPropagationDefaultVersion = '1.1.0' //optional baseline
if (!project.hasProperty("forceContextPropagationVersion")) {
contextPropagationVersion = contextPropagationDefaultVersion
}
Expand Down Expand Up @@ -282,8 +282,6 @@ subprojects {
systemProperty("forceTransport", forceTransport)
}
systemProperty("nettyVersionMicro", VersionNumber.parse(nettyVersion.toString()).micro)
systemProperty("reactorCoreVersionMinor", VersionNumber.parse(reactorCoreVersion.toString()).minor)
systemProperty("contextPropagationVersionMicro", VersionNumber.parse(contextPropagationVersion.toString()).micro)
scanForTestClasses = false
include '**/*Tests.*'
include '**/*Test.*'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -73,7 +74,7 @@ public boolean isSharable() {
@Override
@SuppressWarnings({"FutureReturnValueIgnored", "try"})
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -73,7 +74,7 @@ public boolean isSharable() {
@Override
@SuppressWarnings({"FutureReturnValueIgnored", "try"})
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -68,7 +69,7 @@ public boolean isSharable() {
@Override
@SuppressWarnings({"FutureReturnValueIgnored", "try"})
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
//"FutureReturnValueIgnored" this is deliberate
ctx.connect(remoteAddress, localAddress, promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -70,7 +71,7 @@ static final class CustomChannelInboundHandler extends ChannelInboundHandlerAdap
@Override
@SuppressWarnings("try")
public void channelActive(ChannelHandlerContext ctx) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
ctx.fireChannelActive();
}
Expand Down
7 changes: 6 additions & 1 deletion reactor-netty-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ dependencies {
// JSR-305 annotations
testCompileOnly "com.google.code.findbugs:jsr305:$jsr305Version"

testImplementation "org.mockito:mockito-core:$mockitoVersion"
testImplementation ("org.mockito:mockito-core") {
// Workaround for https://github.com/micrometer-metrics/micrometer/issues/4968
version {
strictly "$mockitoVersion"
}
}
testImplementation "io.specto:hoverfly-java-junit5:$hoverflyJavaVersion"
testImplementation "org.apache.tomcat.embed:tomcat-embed-core:$tomcatVersion"
testImplementation "io.projectreactor:reactor-test:$testAddonVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,14 @@ void testContextPropagation(HttpClient client) {
@ParameterizedTest
@MethodSource("httpClientCombinations")
void testAutomaticContextPropagation(HttpClient client) {
String reactorCoreVersionMinor = System.getProperty("reactorCoreVersionMinor");
String contextPropagationVersionMicro = System.getProperty("contextPropagationVersionMicro");

boolean enableAutomaticContextPropagation =
reactorCoreVersionMinor != null && !reactorCoreVersionMinor.isEmpty() && Integer.parseInt(reactorCoreVersionMinor) >= 6 && // 3.6.x
contextPropagationVersionMicro != null && !contextPropagationVersionMicro.isEmpty() && Integer.parseInt(contextPropagationVersionMicro) >= 5; // 1.0.5 or above

HttpServer server = client.configuration().sslProvider() != null ?
baseServer.secure(spec -> spec.sslContext(serverCtx)).protocol(HttpProtocol.HTTP11, HttpProtocol.H2) :
baseServer.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);

disposableServer = server.bindNow();

try {
if (enableAutomaticContextPropagation) {
Hooks.enableAutomaticContextPropagation();
}
Hooks.enableAutomaticContextPropagation();

registry.registerThreadLocalAccessor(new TestThreadLocalAccessor());

Expand All @@ -151,17 +142,15 @@ void testAutomaticContextPropagation(HttpClient client) {
.uri("/")
.send(ByteBufMono.fromString(Mono.just("test")));

response(responseReceiver, enableAutomaticContextPropagation);
responseConnection(responseReceiver, enableAutomaticContextPropagation);
responseContent(responseReceiver, enableAutomaticContextPropagation);
responseSingle(responseReceiver, enableAutomaticContextPropagation);
response(responseReceiver);
responseConnection(responseReceiver);
responseContent(responseReceiver);
responseSingle(responseReceiver);
}
finally {
TestThreadLocalHolder.reset();
registry.removeThreadLocalAccessor(TestThreadLocalAccessor.KEY);
if (enableAutomaticContextPropagation) {
Hooks.disableAutomaticContextPropagation();
}
Hooks.disableAutomaticContextPropagation();
disposableServer.disposeNow();
}
}
Expand Down Expand Up @@ -206,63 +195,43 @@ static Object[] httpClientCombinations() {
};
}

static void response(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void response(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.response()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void responseConnection(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void responseConnection(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseConnection((res, conn) -> conn.inbound().receive().aggregate().asString())
.next()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void responseContent(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void responseContent(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseContent()
.aggregate()
.asString()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void responseSingle(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void responseSingle(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseSingle((res, bytes) -> bytes.asString())
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void sendRequest(HttpClient client, String expectation) {
Expand Down

0 comments on commit 883f078

Please sign in to comment.