Upgrade Smallrye Reactive Messaging to 3.8.0 #18972

Merged: 2 commits, Jul 30, 2021
2 changes: 1 addition & 1 deletion bom/application/pom.xml
@@ -54,7 +54,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>2.6.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>2.12.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>3.7.1</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>3.8.0</smallrye-reactive-messaging.version>
<jakarta.activation.version>1.2.1</jakarta.activation.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
<jakarta.el-impl.version>3.0.3</jakarta.el-impl.version>
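For context, the bumped property feeds dependencyManagement entries elsewhere in the BOM. A minimal sketch of such a consumer (the artifact coordinates are an assumption, not taken from this diff):

<!-- Hypothetical consumer of the bumped property; artifactId is an assumption -->
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-kafka</artifactId>
    <version>${smallrye-reactive-messaging.version}</version>
</dependency>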
297 changes: 28 additions & 269 deletions docs/src/main/asciidoc/kafka.adoc

Large diffs are not rendered by default.

155 changes: 155 additions & 0 deletions docs/src/main/asciidoc/smallrye-kafka-incoming.adoc
@@ -0,0 +1,155 @@
.Incoming Attributes of the 'smallrye-kafka' connector
[cols="25, 30, 15, 20",options="header"]
|===
|Attribute (_alias_) | Description | Mandatory | Default

| *bootstrap.servers*

_(kafka.bootstrap.servers)_ | A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.

Type: _string_ | false | `localhost:9092`

| *topic* | The consumed / populated Kafka topic. If neither this property nor the `topics` property is set, the channel name is used

Type: _string_ | false |

| *health-enabled* | Whether health reporting is enabled (default) or disabled

Type: _boolean_ | false | `true`

| *health-readiness-enabled* | Whether readiness health reporting is enabled (default) or disabled

Type: _boolean_ | false | `true`

| *health-readiness-topic-verification* | _deprecated_ - Whether the readiness check should verify that topics exist on the broker. Defaults to `false`. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.

Type: _boolean_ | false |

| *health-readiness-timeout* | _deprecated_ - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.

Type: _long_ | false |

| *health-topic-verification-enabled* | Whether the startup and readiness check should verify that topics exist on the broker. Defaults to `false`. Enabling it requires an admin client connection.

Type: _boolean_ | false | `false`

| *health-topic-verification-timeout* | During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.

Type: _long_ | false | `2000`

| *tracing-enabled* | Whether tracing is enabled (default) or disabled

Type: _boolean_ | false | `true`

| *cloud-events* | Enables (default) or disables Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an _outgoing_ channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata.

Type: _boolean_ | false | `true`

| *topics* | A comma-separated list of topics to be consumed. Cannot be used with the `topic` or `pattern` properties

Type: _string_ | false |

| *pattern* | Indicates that the `topic` property is a regular expression. Must be used with the `topic` property. Cannot be used with the `topics` property

Type: _boolean_ | false | `false`

| *key.deserializer* | The deserializer classname used to deserialize the record's key

Type: _string_ | false | `org.apache.kafka.common.serialization.StringDeserializer`

| *value.deserializer* | The deserializer classname used to deserialize the record's value

Type: _string_ | true |

| *fetch.min.bytes* | The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.

Type: _int_ | false | `1`

| *group.id* | A unique string that identifies the consumer group the application belongs to.

If not set, defaults to the application name as set by the `quarkus.application.name` configuration property.

If that is not set either, a unique, generated id is used.

It is recommended to always define a `group.id`, the automatic generation is only a convenient feature for development.
You can explicitly ask for automatically generated unique id by setting this property to `${quarkus.uuid}`.

Type: _string_ | false |
A contributor commented:

So originally, we had

| *group.id* | A unique string that identifies the consumer group the application belongs to.

If not set, defaults to the application name as set by the `quarkus.application.name` configuration property.

If that is not set either, a unique, generated id is used.
It is recommended to always define a `group.id`, the automatic generation is only a convenient feature for development.

Type: _string_ | false |

What we have now is a little hard to understand (the mention of automatic generation without previous explanation is strange, and the sentence about quarkus.uuid interrupts the reading flow IMHO).

I'd add the quarkus.uuid stuff at the end, resulting in something like:

| *group.id* | A unique string that identifies the consumer group the application belongs to.

If not set, defaults to the application name as set by the `quarkus.application.name` configuration property.

If that is not set either, a unique, generated id is used.

It is recommended to always define a `group.id`, the automatic generation is only a convenient feature for development.
You can explicitly ask for automatically generated unique id by setting this property to `${quarkus.uuid}`.

Type: _string_ | false |
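
For illustration, a hypothetical one-liner applying the `${quarkus.uuid}` option discussed above (the channel name `prices` is made up):

mp.messaging.incoming.prices.group.id=${quarkus.uuid}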


| *enable.auto.commit* | If enabled, the consumer's offset is periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended NOT to enable this setting and to let Reactive Messaging handle the commit.

Type: _boolean_ | false | `false`

| *retry* | Whether or not the connection to the broker is re-attempted in case of failure

Type: _boolean_ | false | `true`

| *retry-attempts* | The maximum number of reconnections before failing. `-1` means infinite retries

Type: _int_ | false | `-1`

| *retry-max-wait* | The max delay (in seconds) between 2 reconnects

Type: _int_ | false | `30`

| *broadcast* | Whether the Kafka records should be dispatched to multiple consumers

Type: _boolean_ | false | `false`

| *auto.offset.reset* | What to do when there is no initial offset in Kafka. Accepted values are `earliest`, `latest` and `none`

Type: _string_ | false | `latest`

| *failure-strategy* | Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be `fail` (default), `ignore`, or `dead-letter-queue`

Type: _string_ | false | `fail`

| *commit-strategy* | Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be `latest`, `ignore` or `throttled`. If `enable.auto.commit` is true, the default is `ignore`; otherwise it is `throttled`

Type: _string_ | false |

| *throttled.unprocessed-record-max-age.ms* | While using the `throttled` commit-strategy, specifies the maximum age (in milliseconds) an unprocessed message can reach before the connector is marked as unhealthy.

Type: _int_ | false | `60000`

| *dead-letter-queue.topic* | When the `failure-strategy` is set to `dead-letter-queue`, indicates the topic to which the record is sent. Default is `dead-letter-topic-$channel`

Type: _string_ | false |

| *dead-letter-queue.key.serializer* | When the `failure-strategy` is set to `dead-letter-queue`, indicates the key serializer to use. If not set, the serializer associated with the key deserializer is used

Type: _string_ | false |

| *dead-letter-queue.value.serializer* | When the `failure-strategy` is set to `dead-letter-queue`, indicates the value serializer to use. If not set, the serializer associated with the value deserializer is used

Type: _string_ | false |

| *partitions* | The number of partitions to be consumed concurrently. The connector creates the specified number of Kafka consumers. It should match the number of partitions of the targeted topic

Type: _int_ | false | `1`

| *consumer-rebalance-listener.name* | The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set, this rebalance listener is applied to the consumer.

Type: _string_ | false |

| *key-deserialization-failure-handler* | The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler`. If set, deserialization failures happening when deserializing keys are delegated to this handler, which may provide a fallback value.

Type: _string_ | false |

| *value-deserialization-failure-handler* | The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler`. If set, deserialization failures happening when deserializing values are delegated to this handler, which may provide a fallback value.

Type: _string_ | false |

| *graceful-shutdown* | Whether or not a graceful shutdown should be attempted when the application terminates.

Type: _boolean_ | false | `true`

| *poll-timeout* | The polling timeout in milliseconds. When polling records, the poll will wait at most that duration before returning records. Default is 1000ms

Type: _int_ | false | `1000`

| *pause-if-no-requests* | Whether polling must be paused when the application does not request items, and resumed when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but it will not retrieve any records when paused.

Type: _boolean_ | false | `true`

|===
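
For reference, the attributes above map to `mp.messaging.incoming.<channel>.<attribute>` keys in `application.properties`. A minimal sketch, assuming a hypothetical channel named `prices` (channel name and deserializer choice are illustrative):

# Hypothetical incoming channel; attribute names come from the table above
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
mp.messaging.incoming.prices.group.id=price-consumers
# value.deserializer is the only mandatory incoming attribute
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
mp.messaging.incoming.prices.failure-strategy=dead-letter-queue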
136 changes: 136 additions & 0 deletions docs/src/main/asciidoc/smallrye-kafka-outgoing.adoc
@@ -0,0 +1,136 @@
.Outgoing Attributes of the 'smallrye-kafka' connector
[cols="25, 30, 15, 20",options="header"]
|===
|Attribute (_alias_) | Description | Mandatory | Default

| *acks* | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all

Type: _string_ | false | `1`

| *bootstrap.servers*

_(kafka.bootstrap.servers)_ | A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.

Type: _string_ | false | `localhost:9092`

| *buffer.memory* | The total bytes of memory the producer can use to buffer records waiting to be sent to the server.

Type: _long_ | false | `33554432`

| *close-timeout* | The amount of time (in milliseconds) to wait for a graceful shutdown of the Kafka producer

Type: _int_ | false | `10000`

| *cloud-events* | Enables (default) or disables Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an _outgoing_ channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata.

Type: _boolean_ | false | `true`

| *cloud-events-data-content-type*

_(cloud-events-default-data-content-type)_ | Configure the default `datacontenttype` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `datacontenttype` attribute itself

Type: _string_ | false |

| *cloud-events-data-schema*

_(cloud-events-default-data-schema)_ | Configure the default `dataschema` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `dataschema` attribute itself

Type: _string_ | false |

| *cloud-events-insert-timestamp*

_(cloud-events-default-timestamp)_ | Whether or not the connector should automatically insert the `time` attribute into the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `time` attribute itself

Type: _boolean_ | false | `true`

| *cloud-events-mode* | The Cloud Event mode (`structured` or `binary` (default)). Indicates how the Cloud Events are written in the outgoing record

Type: _string_ | false | `binary`

| *cloud-events-source*

_(cloud-events-default-source)_ | Configure the default `source` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `source` attribute itself

Type: _string_ | false |

| *cloud-events-subject*

_(cloud-events-default-subject)_ | Configure the default `subject` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `subject` attribute itself

Type: _string_ | false |

| *cloud-events-type*

_(cloud-events-default-type)_ | Configure the default `type` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `type` attribute itself

Type: _string_ | false |

| *health-enabled* | Whether health reporting is enabled (default) or disabled

Type: _boolean_ | false | `true`

| *health-readiness-enabled* | Whether readiness health reporting is enabled (default) or disabled

Type: _boolean_ | false | `true`

| *health-readiness-timeout* | _deprecated_ - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.

Type: _long_ | false |

| *health-readiness-topic-verification* | _deprecated_ - Whether the readiness check should verify that topics exist on the broker. Defaults to `false`. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.

Type: _boolean_ | false |

| *health-topic-verification-enabled* | Whether the startup and readiness check should verify that topics exist on the broker. Defaults to `false`. Enabling it requires an admin client connection.

Type: _boolean_ | false | `false`

| *health-topic-verification-timeout* | During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.

Type: _long_ | false | `2000`

| *key* | A key to use when writing the record

Type: _string_ | false |

| *key.serializer* | The serializer classname used to serialize the record's key

Type: _string_ | false | `org.apache.kafka.common.serialization.StringSerializer`

| *max-inflight-messages* | The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to `0` to remove the limit

Type: _long_ | false | `1024`

| *merge* | Whether the connector should allow multiple upstreams

Type: _boolean_ | false | `false`

| *partition* | The target partition id. -1 to let the client determine the partition

Type: _int_ | false | `-1`

| *propagate-record-key* | Propagate incoming record key to the outgoing record

Type: _boolean_ | false | `false`

| *retries* | If set to a positive number, the connector will try to resend any record that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by `delivery.timeout.ms`.

Type: _long_ | false | `2147483647`

| *topic* | The consumed / populated Kafka topic. If neither this property nor the `topics` property is set, the channel name is used

Type: _string_ | false |

| *tracing-enabled* | Whether tracing is enabled (default) or disabled

Type: _boolean_ | false | `true`

| *value.serializer* | The serializer classname used to serialize the payload

Type: _string_ | true |

| *waitForWriteCompletion* | Whether the client waits for Kafka to acknowledge the written record before acknowledging the message

Type: _boolean_ | false | `true`

|===
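
Similarly, the outgoing attributes map to `mp.messaging.outgoing.<channel>.<attribute>` keys. A minimal sketch, assuming a hypothetical channel named `prices-out`:

# Hypothetical outgoing channel; attribute names come from the table above
mp.messaging.outgoing.prices-out.connector=smallrye-kafka
mp.messaging.outgoing.prices-out.topic=prices
# value.serializer is the only mandatory outgoing attribute
mp.messaging.outgoing.prices-out.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer
mp.messaging.outgoing.prices-out.acks=all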
19 changes: 19 additions & 0 deletions …/io/quarkus/kubernetes/spi/KubernetesHealthStartupPathBuildItem.java
@@ -0,0 +1,19 @@
package io.quarkus.kubernetes.spi;

import io.quarkus.builder.item.SimpleBuildItem;

public final class KubernetesHealthStartupPathBuildItem extends SimpleBuildItem {

private final String path;

public KubernetesHealthStartupPathBuildItem(String path) {
this.path = path;
}

/**
* @return the path
*/
public String getPath() {
return path;
}
}
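
As a `SimpleBuildItem`, this item is produced by a deployment build step and consumed when generating Kubernetes probes. A minimal sketch of a producer, assuming a hypothetical processor class and path value (neither appears in this diff):

import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.kubernetes.spi.KubernetesHealthStartupPathBuildItem;

// Hypothetical processor, for illustration only
class HealthStartupPathProcessor {

    // Publishes the startup probe path; the concrete value is an
    // assumption based on Quarkus' default health endpoints
    @BuildStep
    KubernetesHealthStartupPathBuildItem startupPath() {
        return new KubernetesHealthStartupPathBuildItem("/q/health/started");
    }
}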
…/HealthOpenAPIFilter.java
@@ -36,11 +36,13 @@ public class HealthOpenAPIFilter implements OASFilter {
private final String rootPath;
private final String livenessPath;
private final String readinessPath;
private final String startupPath;

public HealthOpenAPIFilter(String rootPath, String livenessPath, String readinessPath) {
public HealthOpenAPIFilter(String rootPath, String livenessPath, String readinessPath, String startupPath) {
this.rootPath = rootPath;
this.livenessPath = livenessPath;
this.readinessPath = readinessPath;
this.startupPath = startupPath;
}

@Override
@@ -64,6 +66,9 @@ public void filterOpenAPI(OpenAPI openAPI) {

// Readiness
paths.addPathItem(readinessPath, createReadinessPathItem());

// Startup
paths.addPathItem(startupPath, createStartupPathItem());
}

private PathItem createHealthPathItem() {
@@ -93,12 +98,21 @@ private PathItem createReadinessPathItem() {
return pathItem;
}

private PathItem createStartupPathItem() {
PathItem pathItem = new PathItemImpl();
pathItem.setDescription("MicroProfile Health - Startup Endpoint");
pathItem.setSummary(
"Startup checks are used to tell when the application has started");
pathItem.setGET(createStartupOperation());
return pathItem;
}

private Operation createHealthOperation() {
Operation operation = new OperationImpl();
operation.setDescription("Check the health of the application");
operation.setOperationId("microprofile_health_root");
operation.setTags(MICROPROFILE_HEALTH_TAG);
operation.setSummary("An aggregated view of the Liveness and Readiness of this application");
operation.setSummary("An aggregated view of the Liveness, Readiness and Startup of this application");
operation.setResponses(createAPIResponses());
return operation;
}
@@ -123,6 +137,16 @@ private Operation createReadinessOperation() {
return operation;
}

private Operation createStartupOperation() {
Operation operation = new OperationImpl();
operation.setDescription("Check the startup of the application");
operation.setOperationId("microprofile_health_startup");
operation.setTags(MICROPROFILE_HEALTH_TAG);
operation.setSummary("The Startup check of this application");
operation.setResponses(createAPIResponses());
return operation;
}

private APIResponses createAPIResponses() {
APIResponses responses = new APIResponsesImpl();
responses.addAPIResponse("200", createAPIResponse("OK"));
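For context, a sketch of how the extended filter could be instantiated with its new fourth argument; the concrete paths are assumptions based on Quarkus' default health endpoints, not taken from this diff:

// Hypothetical wiring, for illustration only
HealthOpenAPIFilter filter = new HealthOpenAPIFilter(
        "/q/health",          // root (aggregated) health path
        "/q/health/live",     // liveness path
        "/q/health/ready",    // readiness path
        "/q/health/started"); // startup path introduced by this change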