Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WFLY-15405] AMQP Connector for reactive messaging #16281

Merged
merged 3 commits into from Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 66 additions & 0 deletions boms/common-expansion/pom.xml
Expand Up @@ -177,6 +177,18 @@
<artifactId>wildfly-microprofile-reactive-messaging</artifactId>
<version>${full.maven.version}</version>
</dependency>
<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-amqp</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-common</artifactId>
Expand Down Expand Up @@ -882,6 +894,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-amqp-client</artifactId>
<version>${version.io.smallrye.smallrye-mutiny-vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
Expand Down Expand Up @@ -954,7 +977,26 @@
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
Expand Down Expand Up @@ -989,6 +1031,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<version>${version.io.vertx.vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand Down Expand Up @@ -1079,6 +1133,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${version.io.vertx.vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
@@ -1,8 +1,8 @@
[[MicroProfile_Reactive_Messaging_SmallRye]]
= MicroProfile Reactive Messaging Subsystem Configuration
----
:smallrye-reactive-messaging-version: 3.6
:smallrye-reactive-messaging-tag: {smallrye-reactive-messaging-version}.0
:smallrye-reactive-messaging-version: 4.5.0
:smallrye-reactive-messaging-tag: {smallrye-reactive-messaging-version}
:eclipse-mp-reactive-messaging-api-version: 2.0
----

Expand All @@ -15,7 +15,7 @@ ifdef::env-github[]
endif::[]

Support for https://microprofile.io/project/eclipse/microprofile-reactive-messaging[MicroProfile Reactive Messaging] is
provided as a Tech Preview feature by the _microprofile-reactive-messaging-smallrye_ subsystem.
provided by the _microprofile-reactive-messaging-smallrye_ subsystem.

[[required-extension-microprofile-reactive-messaging-smallrye]]
== Required Extension
Expand Down Expand Up @@ -47,6 +47,8 @@ If you provision your own server and include the `microprofile-reactive-messagin

If you provision the `microprofile-reactive-messaging-kafka` Galleon layer it includes the modules to enable the Kafka connector functionality. The `microprofile-reactive-messaging-kafka` layer includes the `microprofile-reactive-messaging` layer which provides the core MicroProfile Reactive Messaging functionality.

Similarly, to enable the AMQP connector functionality, you need to provision the `microprofile-reactive-messaging-amqp` layer, which in turn includes the `microprofile-reactive-messaging` layer.

== Specification

WildFly's MicroProfile Reactive Messaging subsystem implements MicroProfile Reactive Messaging {eclipse-mp-reactive-messaging-api-version}, which adds support for asynchronous messaging support based on MicroProfile Reactive Streams Operators.
Expand Down Expand Up @@ -116,7 +118,7 @@ User's applications intended to return published values to users via e.g. Jakart
=== Connectors
MicroProfile Reactive Messaging is designed to be flexible enough to integrate with a wide variety of external messaging systems. This functionality is provided via 'connectors'.

The only included connector at the moment is the Kafka connector.
The only included connectors at the moment are the Kafka connector, and the AMQP connector.

Connectors are configured using MicroProfile Config. The property keys for the methods have some prefixes mandated by the MicroProfile Reactive Messaging Specification which lists these as:

Expand Down Expand Up @@ -181,7 +183,7 @@ Next we will briefly discuss each of these entries. Remember the `to` channel is

`mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer` tells the connector to use `IntegerDeserializer` to deserialize the values from the topic before calling the `receive()` method. You may implement your own deserializer by writing a class implementing `org.apache.kafka.common.serialization.Deserializer` and including it in the deployment.

In addition to the above, Apache Kafka, and SmallRye Reactive Messaging's Kafka connector understand a lot more properties. These can be found in the SmallRye Reactive Messaging Kafka connector https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/kafka/kafka.html[documentation], and in the Apache Kafka documentation for the https://kafka.apache.org/documentation/#producerconfigs[producers] and the https://kafka.apache.org/documentation/#consumerconfigs[consumers].
In addition to the above, Apache Kafka, and SmallRye Reactive Messaging's Kafka connector understand a lot more properties. These can be found in the SmallRye Reactive Messaging Kafka connector https://smallrye.io/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/kafka/kafka/[documentation], and in the Apache Kafka documentation for the https://kafka.apache.org/documentation/#producerconfigs[producers] and the https://kafka.apache.org/documentation/#consumerconfigs[consumers].

The prefixes discussed above are stripped off before passing the property to Kafka. The same happens for other configuration properties. See the Kafka documentation for more details about how to configure Kafka consumers and producers.

Expand Down Expand Up @@ -289,6 +291,72 @@ While we do expose the Kafka Clients jar in our BOMs, its usage is limited to
** `org.apache.kafka.common.serialization.Serializer`
** Implementatations of `org.apache.kafka.common.serialization.Deserializer` and `org.apache.kafka.common.serialization.Serializer` in the `org.apache.kafka.common.serialization` package

// New
==== AMQP Connector

An example of a minimal `microprofile-config.properties` file for AMQP for the example application shown previously:

```
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis

mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic

mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
```

Next we will briefly discuss each of these entries. Remember the `to` channel is on the `send()` method, and the `from` channel is on the `receive()` method.


'amqp-host=localhost' '`amqp-port=5672` points the connector to an AMQP broker running on `localhost:5672`. As before we could also have done these for an individual channel by for example specifying `mp.messaging.outgoing.to.host=localhost` instead. If the host is not specified, it defaults to `localhost.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kabir This can be done in a follow up fix, but whatever turns on italics here needs to be fixed.

Note -- please don't make any docs fixes I mention here on this PR unless you have to change something that justifies new CI. They can come later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bstansberry I've never seen the source be italic before, but it turns out it was due to some unclosed quotes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #17491


`mp.messaging.outgoing.to.connector=smallrye-amqp` says that we want to use AMQP to back the `to` channel. Note that the value `smallrye-amqp` is SmallRye Reactive Messaging specific, and will only be understood if the AMQP connector is enabled.

`mp.messaging.outgoing.to.address=my-topic` says that we will send data via `to` channel to the AMQP queue on address called `my-topic`.

`mp.messaging.incoming.from.connector=smallrye-amqp` says that we want to use AMQP to back the `from` channel. As above, the value `smallrye-amqp` is SmallRye Reactive Messaging specific.

`mp.messaging.incoming.from.address=my-topic` says says that the channel named `from` will read data from the AMQP topic (or queue) on address called `my-topic`.


The full set of properties understood by the SmallRye Reactive Messaging's AMQP connector can be found in the SmallRye Reactive Messaging AMQP connector https://smallrye.io/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/amqp/amqp/[documentation].

The prefixes discussed above are stripped off before passing the property to the AMQP connector.


===== Connecting to a secure AMQP broker
If connecting to a Kafka instance secured with SSL and SASL, the following example 'microprofile-config.properties' will help you get started. There are a few new properties. We are showing them on the connector level but they could equally well be defined on the channel level (i.e. with the `mp.messaging.outgoing.to-amqp.` and `mp.messaging.incoming.from-amqp.` prefixes from the previous examples rather than the connector-wide `mp.messaging.connector.smallrye-kafka` prefix).

[source]
----
# As seen above
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis

# New entries
amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=test

# Channel configuration would follow here, but is left out for brevity
----
//
//mp.messaging.outgoing.to.connector=smallrye-amqp
//mp.messaging.outgoing.to.address=my-topic
//
//mp.messaging.incoming.from.connector=smallrye-amqp
//mp.messaging.incoming.from.address=my-queue
Each of the new lines has the following meaning:

* `amqp-use-ssl=true` - specifies that we want to use a secure connection when connecting to the broker.
* `mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=test` - this is not needed if AMQ broker is secured with a CA signed certificate. If you are using self-signed certificates, you will need to specify a truststore in the Elytron subsystem, and create an `SSLContext` referencing that. The value of this property is used to look up the `SSLContext` in the Elytron subsystem under `/subsystem=elytron/client-ssl-context=*` in the WildFly management model. In this case the property value is `test`, so we look up the `SSLContext` defined by `/subsystem=elytron/client-ssl-context=test` and use that configure the truststore to use for the connection to AMQ broker.

Instead of configuring these properties on the connector level, we could also have defined them on the individual channels. E.g.: `mp.messaging.incoming.from.wildfly.elytron.ssl.context=test` would choose the `test` SSLContext for the `from` incoming channel.

== Component Reference

Expand Down
55 changes: 55 additions & 0 deletions galleon-pack/galleon-shared/pom.xml
Expand Up @@ -365,6 +365,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-amqp-client</artifactId>
<exclusions>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future fix:

'exclusions' should not appear here unless there is a good reason they don't appear in the bom.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICR those boms did not exist when I worked on this code 1.5 years ago :-D

I'll leave it as-is, and have opened https://issues.redhat.com/browse/WFLY-18852 to clean up this pom since I see other exclusions than mine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a start in #17491

<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
Expand Down Expand Up @@ -409,6 +420,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
Expand Down Expand Up @@ -453,6 +475,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand Down Expand Up @@ -483,6 +516,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down Expand Up @@ -764,6 +808,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-amqp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-common</artifactId>
Expand Down
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright The WildFly Authors
~ SPDX-License-Identifier: Apache-2.0
-->
<layer-spec xmlns="urn:jboss:galleon:layer-spec:2.0" name="microprofile-reactive-messaging-amqp">
<props>
<prop name="org.wildfly.rule.add-on-depends-on" value="all-dependencies"/>
<prop name="org.wildfly.rule.add-on" value="reactive-messaging,amqp"/>
<prop name="org.wildfly.rule.properties-file-match-mp-amqp-property" value="[/META-INF/microprofile-config.properties,/WEB-INF/classes/META-INF/microprofile-config.properties],mp.messaging.connector.smallrye-amqp.*"/>
<prop name="org.wildfly.rule.properties-file-match-mp-amqp-outgoing" value="[/META-INF/microprofile-config.properties,/WEB-INF/classes/META-INF/microprofile-config.properties],mp.messaging.outgoing.*.connector,smallrye-amqp"/>
<prop name="org.wildfly.rule.properties-file-match-mp-amqp-incoming" value="[/META-INF/microprofile-config.properties,/WEB-INF/classes/META-INF/microprofile-config.properties],mp.messaging.incoming.*.connector,smallrye-amqp"/>
<prop name="org.wildfly.rule.add-on-description" value="Support for the MicroProfile Reactive Messaging AMQP connector."/>
</props>
<dependencies>
<layer name="microprofile-reactive-messaging"/>
</dependencies>

<packages>
<package name="io.smallrye.reactive.messaging.connector.amqp"/>
<package name="org.wildfly.reactive.messaging.amqp"/>
</packages>
</layer-spec>
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright The WildFly Authors
~ SPDX-License-Identifier: Apache-2.0
-->
<module xmlns="urn:jboss:module:1.9" name="io.smallrye.reactive.messaging.connector.amqp">
<properties>
<property name="jboss.api" value="private"/>
</properties>

<resources>
<artifact name="${io.smallrye.reactive:smallrye-reactive-messaging-amqp}"/>
</resources>
<dependencies>
<module name="io.netty.netty-handler"/>
<module name="io.opentelemetry.api"/>
<module name="io.opentelemetry.context"/>
<module name="io.reactivex.rxjava2.rxjava"/>
<module name="io.smallrye.common.annotation"/>
<module name="io.smallrye.config" services="import"/>
<module name="io.smallrye.reactive.converters.api"/>
<module name="io.smallrye.reactive.messaging"/>
<module name="io.smallrye.reactive.mutiny"/>
<module name="io.smallrye.reactive.mutiny.reactive-streams-operators"/>
<module name="io.smallrye.reactive.mutiny.vertx-core"/>
<module name="io.smallrye.reactive.mutiny.vertx-amqp-client"/>
<module name="io.smallrye.reactive.mutiny.zero"/>
<module name="io.smallrye.reactive.mutiny.zero-flow-adapters"/>
<module name="io.vertx.client.amqp"/>
<module name="io.vertx.core"/>
<module name="io.vertx.proton"/>
<module name="javax.annotation.api"/>
<module name="javax.enterprise.api" />
<module name="javax.inject.api" />
<module name="org.apache.qpid.proton"/>
<module name="org.eclipse.microprofile.config.api"/>
<module name="org.eclipse.microprofile.reactive-messaging.api"/>
<module name="org.eclipse.microprofile.reactive-streams-operators.api"/>
<module name="org.eclipse.microprofile.reactive-streams-operators.core" services="import"/>
<module name="org.jboss.logging" />
<module name="org.jboss.weld.api"/>
<module name="org.jboss.weld.core"/>
<module name="org.jboss.weld.spi"/>
<module name="org.wildfly.reactive.messaging.amqp"/>
<module name="org.reactivestreams"/>
<module name="org.slf4j"/>
</dependencies>
</module>
Expand Up @@ -3,7 +3,6 @@
~ Copyright The WildFly Authors
~ SPDX-License-Identifier: Apache-2.0
-->

<module xmlns="urn:jboss:module:1.9" name="io.smallrye.reactive.messaging.connector">
<properties>
<property name="jboss.api" value="private"/>
Expand Down