Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka-streams: cleans up after deprecation removal #1404

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 6 additions & 63 deletions instrumentation/kafka-streams/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# Brave Kafka Streams instrumentation [EXPERIMENTAL]

Add decorators for Kafka Streams to enable tracing.
* `TracingKafkaClientSupplier` a client supplier which traces poll and send operations.
* `TracingProcessorSupplier` completes a span on `process`
* `TracingTransformerSupplier` completes a span on `transform`
* `KafkaStreamsTracing` completes a span on `process` or `processValues`

This does not trace all operations by default. See [RATIONALE.md] for why.

Expand All @@ -19,12 +17,10 @@ kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);
```

[KIP-820](https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API)
introduces new processor APIs to the Kafka Streams DSL.
The following sections show how to instrument applications with the latest and previous APIs.
introduces new processor APIs to the Kafka Streams DSL. You must use version >= v3.4.0
to instrument applications.

## Kafka Streams >= v3.4.0

To trace a processor in your application use the `TracingV2ProcessorSupplier`, provided by instrumentation API:
To trace a processor in your application use `kafkaStreamsTracing.process` like so:

```java
builder.stream(inputTopic)
Expand All @@ -33,68 +29,15 @@ builder.stream(inputTopic)
customProcessor));
```

or the `TracingV2FixedKeyProcessorSupplier`, provided by instrumentation API:
or use `kafkaStreamsTracing.processValues` like so:

```java
builder.stream(inputTopic)
.processValues(kafkaStreamsTracing.processValues(
"process",
"processValues",
customProcessor));
```

## Kafka Streams < v3.4.0

To trace a processor in your application use `TracingProcessorSupplier`, provided by instrumentation API:

```java
builder.stream(inputTopic)
.processor(kafkaStreamsTracing.processor(
"process",
customProcessor));
```

To trace a transformer, use `TracingTransformerSupplier`, `TracingValueTransformerSupplier`, or `TracingValueTransformerWithValueSupplier` provided by instrumentation API:

```java
builder.stream(inputTopic)
.transform(kafkaStreamsTracing.transformer(
"transformer-1",
customTransformer))
.to(outputTopic);
```

```java
builder.stream(inputTopic)
.transformValue(kafkaStreamsTracing.valueTransformer(
"transform-value",
customTransformer))
.to(outputTopic);
```

```java
builder.stream(inputTopic)
.transformValueWithKey(kafkaStreamsTracing.valueTransformerWithKey(
"transform-value-with-key",
customTransformer))
.to(outputTopic);
```

Additional transformers have been introduced to cover most common Kafka Streams DSL operations (e.g. `map`, `mapValues`, `foreach`, `peek`, `filter`).

```java
builder.stream(inputTopic)
.transform(kafkaStreamsTracing.map("map", mapper))
.to(outputTopic);
```

For flat operations like flatMap, the `flatTransform` method can be used:

```java
builder.stream(inputTopic)
.flatTransform(kafkaStreamsTracing.flatMap("flat-map", mapper))
.to(outputTopic);
```

For more details, [see here](https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java).

To create a Kafka Streams with Tracing Client Supplier enabled, pass your topology and configuration like this:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

/** Use this class to decorate Kafka Stream Topologies and enable Tracing. */
public final class KafkaStreamsTracing {
Expand Down Expand Up @@ -85,7 +87,7 @@ public static Builder newBuilder(MessagingTracing messagingTracing) {
* Provides a {@link KafkaClientSupplier} with tracing enabled, hence Producer and Consumer
* operations will be traced.
* <p>
* This is mean to be used in scenarios {@link KafkaStreams} creation is not controlled by the
* This is meant to be used in scenarios {@link KafkaStreams} creation is not controlled by the
* user but framework (e.g. Spring Kafka Streams) creates it, and {@link KafkaClientSupplier} is
* accepted.
*/
Expand Down Expand Up @@ -113,7 +115,7 @@ public KafkaStreams kafkaStreams(Topology topology, Properties streamsConfig) {
}

/**
* Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}
* Create a tracing-decorated {@link ProcessorSupplier}
*
* <p>Simple example using Kafka Streams DSL:
* <pre>{@code
Expand All @@ -124,13 +126,13 @@ public KafkaStreams kafkaStreams(Topology topology, Properties streamsConfig) {
*
* @see TracingKafkaClientSupplier
*/
public <KIn, VIn, KOut, VOut> org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> process(String spanName,
org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
return new TracingV2ProcessorSupplier<>(this, spanName, processorSupplier);
public <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> process(String spanName,
ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
return new TracingProcessorSupplier<>(this, spanName, processorSupplier);
}

/**
* Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier}
* Create a tracing-decorated {@link FixedKeyProcessorSupplier}
*
* <p>Simple example using Kafka Streams DSL:
* <pre>{@code
Expand All @@ -141,34 +143,22 @@ public <KIn, VIn, KOut, VOut> org.apache.kafka.streams.processor.api.ProcessorSu
*
* @see TracingKafkaClientSupplier
*/
public <KIn, VIn, VOut> org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn, VIn, VOut> processValues(String spanName,
org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
return new TracingV2FixedKeyProcessorSupplier<>(this, spanName, processorSupplier);
public <KIn, VIn, VOut> FixedKeyProcessorSupplier<KIn, VIn, VOut> processValues(String spanName,
FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
return new TracingFixedKeyProcessorSupplier<>(this, spanName, processorSupplier);
}

static void addTags(ProcessorContext processorContext, SpanCustomizer result) {
result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processorContext.applicationId());
result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processorContext.taskId().toString());
}

static void addTags(org.apache.kafka.streams.processor.api.ProcessingContext processingContext, SpanCustomizer result) {
result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processingContext.applicationId());
static void addTags(ProcessingContext processingContext, SpanCustomizer result) {
result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG,
processingContext.applicationId());
result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processingContext.taskId().toString());
}

Span nextSpan(ProcessorContext context) {
TraceContextOrSamplingFlags extracted = extractor.extract(context.headers());
// Clear any propagation keys present in the headers
if (!extracted.equals(emptyExtraction)) {
clearHeaders(context.headers());
}
Span result = tracer.nextSpan(extracted);
if (!result.isNoop()) {
addTags(context, result);
}
return result;
}

Span nextSpan(ProcessingContext context, Headers headers) {
TraceContextOrSamplingFlags extracted = extractor.extract(headers);
// Clear any propagation keys present in the headers
Expand All @@ -183,7 +173,7 @@ Span nextSpan(ProcessingContext context, Headers headers) {
}

// We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3
// multi, or visa versa.
// multi, or vice versa.
void clearHeaders(Headers headers) {
// Headers::remove creates and consumes an iterator each time. This does one loop instead.
for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@

import static brave.internal.Throwables.propagateIfFatal;

/*
* Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes
* and those that implement the new kafka streams API introduced in version 3.4.0
*/
class TracingV2FixedKeyProcessor<KIn, VIn, VOut> implements FixedKeyProcessor<KIn, VIn, VOut> {
class TracingFixedKeyProcessor<KIn, VIn, VOut> implements FixedKeyProcessor<KIn, VIn, VOut> {
final KafkaStreamsTracing kafkaStreamsTracing;
final Tracer tracer;
final String spanName;
final FixedKeyProcessor<KIn, VIn, VOut> delegateProcessor;

FixedKeyProcessorContext processorContext;

TracingV2FixedKeyProcessor(KafkaStreamsTracing kafkaStreamsTracing,
String spanName, FixedKeyProcessor<KIn, VIn, VOut> delegateProcessor) {
TracingFixedKeyProcessor(KafkaStreamsTracing kafkaStreamsTracing,
String spanName, FixedKeyProcessor<KIn, VIn, VOut> delegateProcessor) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.tracer = kafkaStreamsTracing.tracer;
this.spanName = spanName;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,25 +16,23 @@
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;

/*
* Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes
* and those that implement the new kafka streams API introduced in version 3.4.0
*/
class TracingV2FixedKeyProcessorSupplier<KIn, VIn, VOut> implements FixedKeyProcessorSupplier<KIn, VIn, VOut> {
class TracingFixedKeyProcessorSupplier<KIn, VIn, VOut>
implements FixedKeyProcessorSupplier<KIn, VIn, VOut> {
final KafkaStreamsTracing kafkaStreamsTracing;
final String spanName;
final FixedKeyProcessorSupplier<KIn, VIn, VOut> delegateProcessorSupplier;

TracingV2FixedKeyProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing,
String spanName,
FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
TracingFixedKeyProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing,
String spanName,
FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.spanName = spanName;
this.delegateProcessorSupplier = processorSupplier;
}

/** This wraps process method to enable tracing. */
@Override public FixedKeyProcessor<KIn, VIn, VOut> get() {
return new TracingV2FixedKeyProcessor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get());
return new TracingFixedKeyProcessor<>(kafkaStreamsTracing, spanName,
delegateProcessorSupplier.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@

import static brave.internal.Throwables.propagateIfFatal;

/*
* Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes
* and those that implement the new kafka streams API introduced in version 3.4.0
*/
class TracingV2Processor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
class TracingProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
final KafkaStreamsTracing kafkaStreamsTracing;
final Tracer tracer;
final String spanName;
final Processor<KIn, VIn, KOut, VOut> delegateProcessor;

ProcessorContext processorContext;

TracingV2Processor(KafkaStreamsTracing kafkaStreamsTracing,
String spanName, Processor<KIn, VIn, KOut, VOut> delegateProcessor) {
TracingProcessor(KafkaStreamsTracing kafkaStreamsTracing,
String spanName, Processor<KIn, VIn, KOut, VOut> delegateProcessor) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.tracer = kafkaStreamsTracing.tracer;
this.spanName = spanName;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,25 +16,22 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

/*
* Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes
* and those that implement the new kafka streams API introduced in version 3.4.0
*/
class TracingV2ProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
class TracingProcessorSupplier<KIn, VIn, KOut, VOut>
implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
final KafkaStreamsTracing kafkaStreamsTracing;
final String spanName;
final ProcessorSupplier<KIn, VIn, KOut, VOut> delegateProcessorSupplier;

TracingV2ProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing,
String spanName,
ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
TracingProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing,
String spanName,
ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.spanName = spanName;
this.delegateProcessorSupplier = processorSupplier;
}

/** This wraps process method to enable tracing. */
@Override public Processor<KIn, VIn, KOut, VOut> get() {
return new TracingV2Processor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get());
return new TracingProcessor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -25,7 +25,7 @@ public class KafkaHeadersTest {
record.headers().add("b3", new byte[] {'1'});

assertThat(KafkaHeaders.lastStringHeader(record.headers(), "b3"))
.isEqualTo("1");
.isEqualTo("1");
}

@Test void lastStringHeader_null() {
Expand All @@ -36,14 +36,14 @@ public class KafkaHeadersTest {
KafkaHeaders.replaceHeader(record.headers(), "b3", "1");

assertThat(record.headers().lastHeader("b3").value())
.containsExactly('1');
.containsExactly('1');
}

@Test void replaceHeader_replace() {
record.headers().add("b3", new byte[0]);
KafkaHeaders.replaceHeader(record.headers(), "b3", "1");

assertThat(record.headers().lastHeader("b3").value())
.containsExactly('1');
.containsExactly('1');
}
}
Loading