cosmetic doc improvements part 3 #2877

Merged

...

@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)
----

|[[assignmentCommitOption]]<<assignmentCommitOption,`assignmentCommitOption`>>
|LATEST_ONLY_NO_TX
|Whether or not to commit the initial position on assignment; by default, the initial offset will only be committed if the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest` and it won't run in a transaction even if there is a transaction manager present.
See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options.

|[[asyncAcks]]<<asyncAcks,`asyncAcks`>>
|false
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor].

Using transactions enables Exactly Once Semantics (EOS).

This means that, for a `read+++->+++process+++->+++write` sequence, it is guaranteed that the **sequence** is completed exactly once.
(The read and process have at least once semantics).

Spring for Apache Kafka version 3.0 and later only supports `EOSMode.V2`:

The `SimpleKafkaHeaderMapper` maps raw headers as `byte[]`, with configuration options for conversion to `String` values.

The `DefaultKafkaHeaderMapper` maps the key to the `MessageHeaders` header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
A +++"+++`special`+++"+++ header (with a key of `spring_json_header_types`) contains a JSON map of `<key>:<type>`.
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.

On the inbound side, all Kafka `Header` instances are mapped to `MessageHeaders`.
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
<3> Uses a default Jackson `ObjectMapper` and maps headers according to the provided patterns.
<4> Uses the provided Jackson `ObjectMapper` and maps headers according to the provided patterns.

Patterns are rather simple and can contain a leading wildcard (`+++*+++`), a trailing wildcard, or both (for example, `+++*+++.cat.+++*+++`).
You can negate patterns with a leading `!`.
The first pattern that matches a header name (whether positive or negative) wins.
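The matching rules above (leading or trailing wildcard, `!` negation, first match wins) can be sketched in plain Java. This is a simplified stand-in for illustration only, not the framework's actual implementation; the class and method names are made up:

```java
import java.util.List;

public class HeaderPatternMatcher {

    // Returns true if the header should be mapped: the first matching
    // pattern (positive or negative) wins; a leading '!' negates.
    static boolean shouldMap(String header, List<String> patterns) {
        for (String pattern : patterns) {
            boolean negated = pattern.startsWith("!");
            String p = negated ? pattern.substring(1) : pattern;
            if (matches(header, p)) {
                return !negated;
            }
        }
        return false; // no pattern matched
    }

    // Supports a leading wildcard, a trailing wildcard, or both.
    static boolean matches(String header, String pattern) {
        if (pattern.equals("*")) {
            return true; // match-all pattern
        }
        boolean leading = pattern.startsWith("*");
        boolean trailing = pattern.endsWith("*");
        String core = pattern.substring(leading ? 1 : 0,
                pattern.length() - (trailing ? 1 : 0));
        if (leading && trailing) {
            return header.contains(core);
        }
        if (leading) {
            return header.endsWith(core);
        }
        if (trailing) {
            return header.startsWith(core);
        }
        return header.equals(core);
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("!id", "!timestamp", "*");
        System.out.println(shouldMap("myHeader", patterns)); // true
        System.out.println(shouldMap("id", patterns));       // false
    }
}
```

With the patterns `!id`, `!timestamp`, `*`, the negative patterns win for `id` and `timestamp`, and every other header falls through to the match-all wildcard.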

When you provide your own patterns, we recommend including `!id` and `!timestamp`, since these headers are read-only on the inbound side.
IMPORTANT: By default, the mapper deserializes only classes in `java.lang` and `java.util`.
You can trust other (or all) packages by adding trusted packages with the `addTrustedPackages` method.
If you receive messages from untrusted sources, you may wish to add only those packages you trust.
To trust all packages, you can use `mapper.addTrustedPackages("+++*+++")`.

NOTE: Mapping `String` header values in a raw form is useful when communicating with systems that are not aware of the mapper's JSON format.

By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConverter`.
With the batch converter, the converted headers are available in the `KafkaHeaders.BATCH_CONVERTED_HEADERS` as a `List<Map<String, Object>>` where the map in a position of the list corresponds to the data position in the payload.

If there is no converter (either because Jackson is not present or it is explicitly set to `null`), the headers from the consumer record are provided unconverted in the `KafkaHeaders.NATIVE_HEADERS` header.
This header is a `Headers` object (or a `List<Headers>` in the case of the batch converter), where the position in the list corresponds to the data position in the payload.

IMPORTANT: Certain types are not suitable for JSON serialization, and a simple `toString()` serialization might be preferred for these types.
The `DefaultKafkaHeaderMapper` has a method called `addToStringClasses()` that lets you supply the names of classes that should be treated this way for outbound mapping.
During inbound mapping, they are mapped as `String`.
By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way.

NOTE: Starting with version 2.3, handling of String-valued headers is simplified.
Such headers are no longer JSON encoded, by default (i.e. they do not have enclosing `"+++...+++"` added).
The type is still added to the JSON_TYPES header so the receiving system can convert back to a String (from `byte[]`).
The mapper can handle (decode) headers produced by older versions (it checks for a leading `+++"+++`); in this way an application using 2.3 can consume records from older versions.

IMPORTANT: To be compatible with earlier versions, set `encodeStrings` to `true`, if records produced by a version using 2.3 might be consumed by applications using earlier versions.
When all applications are using 2.3 or higher, you can leave the property at its default value of `false`.
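The version-compatibility check described above can be illustrated with a small stand-alone sketch. This is deliberately simplified (the real mapper works on `byte[]` header values and full JSON conversion; the class name here is illustrative):

```java
import java.nio.charset.StandardCharsets;

public class StringHeaderDecoder {

    // Pre-2.3 producers JSON-encode String header values, so the raw
    // bytes arrive with enclosing quotes; 2.3+ producers (with the
    // default encodeStrings=false) send the String raw.
    static String decode(byte[] rawValue) {
        String value = new String(rawValue, StandardCharsets.UTF_8);
        if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
            return value.substring(1, value.length() - 1); // legacy JSON-encoded
        }
        return value; // already a plain String
    }

    public static void main(String[] args) {
        byte[] legacy = "\"someValue\"".getBytes(StandardCharsets.UTF_8);
        byte[] modern = "someValue".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(legacy)); // someValue
        System.out.println(decode(modern)); // someValue
    }
}
```

Either way the consumer recovers the same `String`, which is why a 2.3 application can read headers produced by older versions.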
Apache Kafka provides a mechanism to add interceptors to producers and consumers.
These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won't work for wiring in dependent Spring Beans.
However, you can manually wire in those dependencies using the interceptor `config()` method.
The following Spring Boot application shows how to do this by overriding Spring Boot's default factories to add some dependent bean into the configuration properties.
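The full example is collapsed in this diff view. The following is a minimal sketch of the underlying idea only, with Kafka's `ProducerInterceptor` interface and Spring's factory machinery replaced by plain-Java stand-ins; `SomeBean`, the `"some.bean"` config key, and `MyProducerInterceptor` are illustrative names, not the framework's API:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a Spring-managed dependency.
class SomeBean {
    String greet() {
        return "hello";
    }
}

// Stand-in for a Kafka-managed interceptor: Kafka instantiates it
// reflectively and then calls configure(configs), so the only way to
// hand it a Spring bean is to place the bean into the config map.
class MyProducerInterceptor {
    private SomeBean someBean;

    void configure(Map<String, ?> configs) {
        this.someBean = (SomeBean) configs.get("some.bean");
    }

    String intercept(String record) {
        return someBean.greet() + ": " + record;
    }
}

public class InterceptorWiring {
    public static void main(String[] args) {
        // Equivalent of overriding the Boot factory's configuration
        // properties to add the dependent bean.
        Map<String, Object> producerConfigs = new HashMap<>();
        producerConfigs.put("some.bean", new SomeBean());

        MyProducerInterceptor interceptor = new MyProducerInterceptor();
        interceptor.configure(producerConfigs); // Kafka invokes this
        System.out.println(interceptor.intercept("payload"));
    }
}
```

The key point is that the dependency travels through the Kafka configuration map rather than through Spring injection, because Kafka, not Spring, constructs the interceptor.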

[[monitoring-listener-performance]]
== Monitoring Listener Performance

Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer`+++s+++ for the listener, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the `ContainerProperty`+++'+++s `micrometerEnabled` to `false`.

Two timers are maintained - one for successful calls to the listener and one for failures.

The timers are named `spring.kafka.listener` and have the following tags:
* `result` : `success` or `failure`
* `exception` : `none` or `ListenerExecutionFailedException`

You can add additional tags using the `ContainerProperties`+++'+++s `micrometerTags` property.

Starting with versions 2.9.8, 3.0.6, you can provide a function in `ContainerProperties`+++'+++s `micrometerTagsProvider`; the function receives the `ConsumerRecord<?, ?>` and returns tags which can be based on that record, and merged with any static tags in `micrometerTags`.

NOTE: With the concurrent container, timers are created for each thread and the `name` tag is suffixed with `-n` where n is `0` to `concurrency-1`.

[[monitoring-kafkatemplate-performance]]
== Monitoring KafkaTemplate Performance

Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s+++ for send operations, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the template's `micrometerEnabled` property to `false`.

Two timers are maintained - one for successful send operations and one for failures.
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
----

A similar listener is provided for the `StreamsBuilderFactoryBean` - see xref:streams.adoc#streams-micrometer[KafkaStreams Micrometer Support].
The default implementations add the `bean.name` tag for template observations and the `listener.id` tag for containers.

You can either subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention` or provide completely new implementations.

See xref:appendix/micrometer.adoc#observation-gen[Micrometer Observation Documentation] for details of the default observations that are recorded.

Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records.
To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively.
:page-section-summary-toc: 1

Since version 2.7, you can pause and resume the consumption of specific partitions assigned to a consumer by using the `pausePartition(TopicPartition topicPartition)` and `resumePartition(TopicPartition topicPartition)` methods in the listener containers.
The pausing and resuming take place respectively before and after the `poll()` similar to the `pause()` and `resume()` methods.
The `isPartitionPauseRequested()` method returns true if pause for that partition has been requested.
The `isPartitionPaused()` method returns true if that partition has effectively been paused.
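The distinction between a requested pause and an effective pause can be modeled outside the framework. This is a conceptual stand-in only (the real containers drive this from the consumer thread around `poll()`, and partitions are `TopicPartition` objects, simplified here to strings):

```java
import java.util.HashSet;
import java.util.Set;

// Conceptual model: a pause or resume is *requested* at any time, but
// only becomes *effective* around the next poll().
public class PartitionPauseModel {
    private final Set<String> pauseRequested = new HashSet<>();
    private final Set<String> effectivelyPaused = new HashSet<>();

    void pausePartition(String topicPartition) {
        pauseRequested.add(topicPartition);
    }

    void resumePartition(String topicPartition) {
        pauseRequested.remove(topicPartition);
    }

    boolean isPartitionPauseRequested(String topicPartition) {
        return pauseRequested.contains(topicPartition);
    }

    boolean isPartitionPaused(String topicPartition) {
        return effectivelyPaused.contains(topicPartition);
    }

    // Requested pauses and resumes are applied around the poll:
    // the effective set is synchronized with the requested set.
    void poll() {
        effectivelyPaused.addAll(pauseRequested);
        effectivelyPaused.retainAll(pauseRequested);
    }

    public static void main(String[] args) {
        PartitionPauseModel container = new PartitionPauseModel();
        container.pausePartition("topic-0");
        System.out.println(container.isPartitionPauseRequested("topic-0")); // true
        System.out.println(container.isPartitionPaused("topic-0"));         // false
        container.poll();
        System.out.println(container.isPartitionPaused("topic-0"));         // true
    }
}
```

This is why the two query methods can disagree briefly: between the request and the next poll, the pause is requested but not yet effective.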

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
return kafkaTemplate;
}
----

As seen in xref:kafka/sending-messages.adoc#kafka-template[Using `KafkaTemplate`], a `ProducerFactory` is used to create the producer.

When not using xref:kafka/transactions.adoc[Transactions], by default, the `DefaultKafkaProducerFactory` creates a singleton producer used by all clients, as recommended in the `KafkaProducer` JavaDocs.
However, if you call `flush()` on the template, this can cause delays for other threads using the same producer.
Starting with version 2.3, the `DefaultKafkaProducerFactory` has a new property `producerPerThread`.
When set to `true`, the factory will create (and cache) a separate producer for each thread, to avoid this issue.
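The `producerPerThread` idea can be sketched with a `ThreadLocal` cache. This is a simplified stand-in for what `DefaultKafkaProducerFactory` does, with a placeholder `Producer` class instead of a real `KafkaProducer`:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadProducerCache {

    // Placeholder for a real KafkaProducer.
    static class Producer {
        final int id;
        Producer(int id) { this.id = id; }
    }

    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Each thread gets (and keeps) its own producer, so a flush() on
    // one thread's producer cannot stall sends on other threads.
    private static final ThreadLocal<Producer> CACHE =
            ThreadLocal.withInitial(() -> new Producer(COUNTER.incrementAndGet()));

    static Producer createProducer() {
        return CACHE.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Producer first = createProducer();
        Producer second = createProducer();
        System.out.println(first == second); // true: same thread, cached producer

        final Producer[] other = new Producer[1];
        Thread t = new Thread(() -> other[0] = createProducer());
        t.start();
        t.join();
        System.out.println(first == other[0]); // false: separate producer per thread
    }
}
```

Repeated calls on one thread return the cached instance, while a different thread gets its own, which is the trade-off `producerPerThread` makes: no cross-thread `flush()` interference at the cost of one producer per thread.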
NOTE: If you use an xref:kafka/serdes.adoc#error-handling-deserializer[`ErrorHandlingDeserializer`] with this aggregating template, the framework will not automatically detect `DeserializationException`+++s+++.
Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
It is recommended that applications call the `ReplyingKafkaTemplate.checkDeserialization()` utility method to determine whether a deserialization exception occurred.
See its JavaDocs for more information.
The `replyErrorChecker` is also not called for this aggregating template; you should perform the checks on each element of the reply.
