Skip to content

Commit

Permalink
Stream reactive (#2038)
Browse files Browse the repository at this point in the history
Added native support for Reactive Spring Cloud Stream.
  • Loading branch information
marcingrzejszczak committed Oct 26, 2021
1 parent e1ef593 commit 90417f1
Show file tree
Hide file tree
Showing 15 changed files with 601 additions and 35 deletions.
12 changes: 12 additions & 0 deletions docs/.asciidoctorconfig.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + Initial AsciiDoc editor configuration file - V1.0 +
// ++++++++++++++++++++++++++++++++++++++++++++++++++++++
//
// Did not found any configuration files, so create this at project root level.
// If you do not like those files to be generated - you can turn it off inside Asciidoctor Editor preferences.
//
// You can define editor specific parts here.
// For example: with next line you could set imagesdir attribute to subfolder "images" relative to the folder where this config file is located.
// :imagesdir: {asciidoctorconfigdir}/images
//
// For more information please take a look at https://github.com/de-jcup/eclipse-asciidoctor-editor/wiki/Asciidoctor-configfiles
35 changes: 29 additions & 6 deletions docs/src/main/asciidoc/integrations.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,39 @@ include::{common_tests_path}/src/main/java/org/springframework/cloud/sleuth/inst

This feature is available for all tracer implementations.

Spring Cloud Sleuth can instrument Spring Cloud Function.
Spring Cloud Sleuth can instrument Spring Cloud Function. Since Spring Cloud Stream uses Spring Cloud Function
you will get the messaging instrumentation out of the box.

The way to achieve it is to provide a `Function` or `Consumer` or `Supplier` that takes in a `Message` as a parameter e.g. `Function<Message<String>, Message<Integer>>`.
If the type is not `Message` then instrumentation will not take place.
Out of the box instrumentation will not take place when dealing with Reactor based streams - e.g. `Function<Flux<Message<String>>, Flux<Message<Integer>>>`.
If the type **is not** `Message` then instrumentation **will not** take place.

For a reactive `Consumer<Flux<Message<?>>>` remember to manually close the span and clear the context before you call `.subscribe()`. Example:

[source,java,indent=0]
----
@Bean
Consumer<Flux<Message<String>>> channel(Tracer tracer) {
// For the reactive consumer remember to call "subscribe()" at the end, otherwise
// you'll get the "Dispatcher has no subscribers" error
return i -> i
.doOnNext(s -> log.info("HELLO"))
// You must finish the span yourself and clear the tracing context like presented below.
// Otherwise you will be missing out the span that wraps the function execution.
.doOnNext(s -> {
tracer.currentSpan().end();
tracer.withSpan(null);
})
.subscribe();
}
}
----

Since Spring Cloud Stream reuses Spring Cloud Function, you'll get the instrumentation out of the box.
You can disable Spring Cloud Stream integration by setting the value of `spring.sleuth.function.enabled` to `false`.

You can disable this behavior by setting the value of `spring.sleuth.function.enabled` to `false`.
If you want to fully control the life cycle of spans within the reactive messaging context of Spring Cloud Stream
remember to disable the Spring Cloud Stream integration and leverage the `MessagingSleuthOperators` utility
class that allows you to manipulate the input and output messages in order to continue the tracing context and to execute custom code within the tracing context.

In order to work with reactive Stream functions you can leverage the `MessagingSleuthOperators` utility class that allows you to manipulate the input and output messages in order to continue the tracing context and to execute custom code within the tracing context.

[source,java,indent=0]
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ public interface Tracer extends BaggageManager {
*/
TraceContext.Builder traceContextBuilder();

/**
* Returns the {@link CurrentTraceContext}. Can be {@code null} so that we don't break
* backward compatibility.
* @return current trace context
*/
@Nullable
CurrentTraceContext currentTraceContext();

/**
* Allows to customize the current span in scope.
* @return current span customizer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;

import org.springframework.cloud.sleuth.BaggageInScope;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.ScopedSpan;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.SpanCustomizer;
Expand Down Expand Up @@ -99,4 +100,9 @@ public BaggageInScope createBaggage(String name, String value) {
return new NoOpBaggageInScope();
}

@Override
public CurrentTraceContext currentTraceContext() {
return new NoOpCurrentTraceContext();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,10 @@ static Span.Builder toBuilder(Tracer tracer, TraceContextOrSamplingFlags context
return new BraveSpanBuilder(tracer, context);
}

@Override
public String toString() {
return "{" + " delegate='" + this.delegate + "'" + ", parentContext='" + this.parentContext + "'"
+ ", startTimestamp='" + this.startTimestamp + "'" + "}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import brave.propagation.TraceContextOrSamplingFlags;

import org.springframework.cloud.sleuth.BaggageInScope;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.ScopedSpan;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.SpanCustomizer;
Expand All @@ -40,9 +41,18 @@ public class BraveTracer implements Tracer {

private final BraveBaggageManager braveBaggageManager;

private final CurrentTraceContext currentTraceContext;

public BraveTracer(brave.Tracer tracer, BraveBaggageManager braveBaggageManager) {
this.tracer = tracer;
this.braveBaggageManager = braveBaggageManager;
this.currentTraceContext = null;
}

public BraveTracer(brave.Tracer tracer, CurrentTraceContext context, BraveBaggageManager braveBaggageManager) {
this.tracer = tracer;
this.braveBaggageManager = braveBaggageManager;
this.currentTraceContext = context;
}

@Override
Expand Down Expand Up @@ -122,6 +132,11 @@ public BaggageInScope createBaggage(String name, String value) {
return this.braveBaggageManager.createBaggage(name).set(value);
}

@Override
public CurrentTraceContext currentTraceContext() {
return this.currentTraceContext;
}

}

class BraveSpanInScope implements Tracer.SpanInScope {
Expand Down
4 changes: 4 additions & 0 deletions spring-cloud-sleuth-instrumentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<version>3.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<properties>
<!-- <kotlin.compiler.incremental>true</kotlin.compiler.incremental> -->
</properties>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ public String getKey() {
}
},

/**
* Name of the Spring Cloud Function function name.
*/
FUNCTION_NAME {
@Override
public String getKey() {
return "function.name";
}
},

/**
* User provided keys via customization options.
*/
Expand Down
Loading

0 comments on commit 90417f1

Please sign in to comment.