Skip to content

Commit

Permalink
[WFLY-15405] Add support for SmallRye Reactive Messaging AMQP Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
kabir committed Dec 11, 2023
1 parent a138b93 commit f859204
Show file tree
Hide file tree
Showing 53 changed files with 1,941 additions and 255 deletions.
66 changes: 66 additions & 0 deletions boms/common-expansion/pom.xml
Expand Up @@ -177,6 +177,18 @@
<artifactId>wildfly-microprofile-reactive-messaging</artifactId>
<version>${full.maven.version}</version>
</dependency>
<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-amqp</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-common</artifactId>
Expand Down Expand Up @@ -882,6 +894,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-amqp-client</artifactId>
<version>${version.io.smallrye.smallrye-mutiny-vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
Expand Down Expand Up @@ -954,7 +977,26 @@
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
Expand Down Expand Up @@ -989,6 +1031,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<version>${version.io.vertx.vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand Down Expand Up @@ -1057,6 +1111,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${version.io.vertx.vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
@@ -1,8 +1,8 @@
[[MicroProfile_Reactive_Messaging_SmallRye]]
= MicroProfile Reactive Messaging Subsystem Configuration
----
:smallrye-reactive-messaging-version: 3.6
:smallrye-reactive-messaging-tag: {smallrye-reactive-messaging-version}.0
:smallrye-reactive-messaging-version: 4.5.0
:smallrye-reactive-messaging-tag: {smallrye-reactive-messaging-version}
:eclipse-mp-reactive-messaging-api-version: 2.0
----

Expand All @@ -15,7 +15,7 @@ ifdef::env-github[]
endif::[]

Support for https://microprofile.io/project/eclipse/microprofile-reactive-messaging[MicroProfile Reactive Messaging] is
provided as a Tech Preview feature by the _microprofile-reactive-messaging-smallrye_ subsystem.
provided by the _microprofile-reactive-messaging-smallrye_ subsystem.

[[required-extension-microprofile-reactive-messaging-smallrye]]
== Required Extension
Expand Down Expand Up @@ -47,6 +47,8 @@ If you provision your own server and include the `microprofile-reactive-messagin

If you provision the `microprofile-reactive-messaging-kafka` Galleon layer it includes the modules to enable the Kafka connector functionality. The `microprofile-reactive-messaging-kafka` layer includes the `microprofile-reactive-messaging` layer which provides the core MicroProfile Reactive Messaging functionality.

Similarly, to enable the AMQP connector functionality, you need to provision the `microprofile-reactive-messaging-amqp` layer, which in turn includes the `microprofile-reactive-messaging` layer.

== Specification

WildFly's MicroProfile Reactive Messaging subsystem implements MicroProfile Reactive Messaging {eclipse-mp-reactive-messaging-api-version}, which adds support for asynchronous messaging support based on MicroProfile Reactive Streams Operators.
Expand Down Expand Up @@ -116,7 +118,7 @@ User's applications intended to return published values to users via e.g. Jakart
=== Connectors
MicroProfile Reactive Messaging is designed to be flexible enough to integrate with a wide variety of external messaging systems. This functionality is provided via 'connectors'.

The only included connector at the moment is the Kafka connector.
The only included connectors at the moment are the Kafka connector, and the AMQP connector.

Connectors are configured using MicroProfile Config. The property keys for the methods have some prefixes mandated by the MicroProfile Reactive Messaging Specification which lists these as:

Expand Down Expand Up @@ -181,7 +183,7 @@ Next we will briefly discuss each of these entries. Remember the `to` channel is

`mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer` tells the connector to use `IntegerDeserializer` to deserialize the values from the topic before calling the `receive()` method. You may implement your own deserializer by writing a class implementing `org.apache.kafka.common.serialization.Deserializer` and including it in the deployment.

In addition to the above, Apache Kafka, and SmallRye Reactive Messaging's Kafka connector understand a lot more properties. These can be found in the SmallRye Reactive Messaging Kafka connector https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/kafka/kafka.html[documentation], and in the Apache Kafka documentation for the https://kafka.apache.org/documentation/#producerconfigs[producers] and the https://kafka.apache.org/documentation/#consumerconfigs[consumers].
In addition to the above, Apache Kafka, and SmallRye Reactive Messaging's Kafka connector understand a lot more properties. These can be found in the SmallRye Reactive Messaging Kafka connector https://smallrye.io/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/kafka/kafka/[documentation], and in the Apache Kafka documentation for the https://kafka.apache.org/documentation/#producerconfigs[producers] and the https://kafka.apache.org/documentation/#consumerconfigs[consumers].

The prefixes discussed above are stripped off before passing the property to Kafka. The same happens for other configuration properties. See the Kafka documentation for more details about how to configure Kafka consumers and producers.

Expand Down Expand Up @@ -289,6 +291,72 @@ While we do expose the Kafka Clients jar in our BOMs, its usage is limited to
** `org.apache.kafka.common.serialization.Serializer`
** Implementatations of `org.apache.kafka.common.serialization.Deserializer` and `org.apache.kafka.common.serialization.Serializer` in the `org.apache.kafka.common.serialization` package

// New
==== AMQP Connector

An example of a minimal `microprofile-config.properties` file for AMQP for the example application shown previously:

```
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis

mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic

mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
```

Next we will briefly discuss each of these entries. Remember the `to` channel is on the `send()` method, and the `from` channel is on the `receive()` method.


'amqp-host=localhost' '`amqp-port=5672` points the connector to an AMQP broker running on `localhost:5672`. As before we could also have done these for an individual channel by for example specifying `mp.messaging.outgoing.to.host=localhost` instead. If the host is not specified, it defaults to `localhost.

`mp.messaging.outgoing.to.connector=smallrye-amqp` says that we want to use AMQP to back the `to` channel. Note that the value `smallrye-amqp` is SmallRye Reactive Messaging specific, and will only be understood if the AMQP connector is enabled.

`mp.messaging.outgoing.to.address=my-topic` says that we will send data via `to` channel to the AMQP queue on address called `my-topic`.

`mp.messaging.incoming.from.connector=smallrye-amqp` says that we want to use AMQP to back the `from` channel. As above, the value `smallrye-amqp` is SmallRye Reactive Messaging specific.

`mp.messaging.incoming.from.address=my-topic` says says that the channel named `from` will read data from the AMQP topic (or queue) on address called `my-topic`.


The full set of properties understood by the SmallRye Reactive Messaging's AMQP connector can be found in the SmallRye Reactive Messaging AMQP connector https://smallrye.io/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/amqp/amqp/[documentation].

The prefixes discussed above are stripped off before passing the property to the AMQP connector.


===== Connecting to a secure AMQP broker
If connecting to a Kafka instance secured with SSL and SASL, the following example 'microprofile-config.properties' will help you get started. There are a few new properties. We are showing them on the connector level but they could equally well be defined on the channel level (i.e. with the `mp.messaging.outgoing.to-amqp.` and `mp.messaging.incoming.from-amqp.` prefixes from the previous examples rather than the connector-wide `mp.messaging.connector.smallrye-kafka` prefix).

[source]
----
# As seen above
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis
# New entries
amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=test
# Channel configuration would follow here, but is left out for brevity
----
//
//mp.messaging.outgoing.to.connector=smallrye-amqp
//mp.messaging.outgoing.to.address=my-topic
//
//mp.messaging.incoming.from.connector=smallrye-amqp
//mp.messaging.incoming.from.address=my-queue
Each of the new lines has the following meaning:

* `amqp-use-ssl=true` - specifies that we want to use a secure connection when connecting to the broker.
* `mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=test` - this is not needed if AMQ broker is secured with a CA signed certificate. If you are using self-signed certificates, you will need to specify a truststore in the Elytron subsystem, and create an `SSLContext` referencing that. The value of this property is used to look up the `SSLContext` in the Elytron subsystem under `/subsystem=elytron/client-ssl-context=*` in the WildFly management model. In this case the property value is `test`, so we look up the `SSLContext` defined by `/subsystem=elytron/client-ssl-context=test` and use that configure the truststore to use for the connection to AMQ broker.

Instead of configuring these properties on the connector level, we could also have defined them on the individual channels. E.g.: `mp.messaging.incoming.from.wildfly.elytron.ssl.context=test` would choose the `test` SSLContext for the `from` incoming channel.

== Component Reference

Expand Down
55 changes: 55 additions & 0 deletions galleon-pack/galleon-shared/pom.xml
Expand Up @@ -363,6 +363,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-amqp-client</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
Expand Down Expand Up @@ -407,6 +418,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
Expand Down Expand Up @@ -451,6 +473,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand All @@ -473,6 +506,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down Expand Up @@ -760,6 +804,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-amqp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-common</artifactId>
Expand Down
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright The WildFly Authors
~ SPDX-License-Identifier: Apache-2.0
-->
<layer-spec xmlns="urn:jboss:galleon:layer-spec:2.0" name="microprofile-reactive-messaging-amqp">
<props>
<prop name="org.wildfly.rule.add-on-depends-on" value="all-dependencies"/>
<prop name="org.wildfly.rule.add-on" value="reactive-messaging,amqp"/>
<prop name="org.wildfly.rule.properties-file-match-mp-amqp-property" value="[/META-INF/microprofile-config.properties,/WEB-INF/classes/META-INF/microprofile-config.properties],mp.messaging.connector.smallrye-amqp.*"/>
<prop name="org.wildfly.rule.properties-file-match-mp-amqp-outgoing" value="[/META-INF/microprofile-config.properties,/WEB-INF/classes/META-INF/microprofile-config.properties],mp.messaging.outgoing.*.connector,smallrye-amqp"/>
<prop name="org.wildfly.rule.properties-file-match-mp-amqp-incoming" value="[/META-INF/microprofile-config.properties,/WEB-INF/classes/META-INF/microprofile-config.properties],mp.messaging.incoming.*.connector,smallrye-amqp"/>
<prop name="org.wildfly.rule.add-on-description" value="Support for the MicroProfile Reactive Messaging AMQP connector."/>
</props>
<dependencies>
<layer name="microprofile-reactive-messaging"/>
</dependencies>

<packages>
<package name="io.smallrye.reactive.messaging.connector.amqp"/>
<package name="org.wildfly.reactive.messaging.amqp"/>
</packages>
</layer-spec>
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright The WildFly Authors
~ SPDX-License-Identifier: Apache-2.0
-->
<module xmlns="urn:jboss:module:1.9" name="io.smallrye.reactive.messaging.connector.amqp">
<properties>
<property name="jboss.api" value="private"/>
</properties>

<resources>
<artifact name="${io.smallrye.reactive:smallrye-reactive-messaging-amqp}"/>
</resources>
<dependencies>
<module name="io.netty.netty-handler"/>
<module name="io.opentelemetry.api"/>
<module name="io.opentelemetry.context"/>
<module name="io.reactivex.rxjava2.rxjava"/>
<module name="io.smallrye.common.annotation"/>
<module name="io.smallrye.config" services="import"/>
<module name="io.smallrye.reactive.converters.api"/>
<module name="io.smallrye.reactive.messaging"/>
<module name="io.smallrye.reactive.mutiny"/>
<module name="io.smallrye.reactive.mutiny.reactive-streams-operators"/>
<module name="io.smallrye.reactive.mutiny.vertx-core"/>
<module name="io.smallrye.reactive.mutiny.vertx-amqp-client"/>
<module name="io.smallrye.reactive.mutiny.zero"/>
<module name="io.smallrye.reactive.mutiny.zero-flow-adapters"/>
<module name="io.vertx.client.amqp"/>
<module name="io.vertx.core"/>
<module name="io.vertx.proton"/>
<module name="javax.annotation.api"/>
<module name="javax.enterprise.api" />
<module name="javax.inject.api" />
<module name="org.apache.qpid.proton"/>
<module name="org.eclipse.microprofile.config.api"/>
<module name="org.eclipse.microprofile.reactive-messaging.api"/>
<module name="org.eclipse.microprofile.reactive-streams-operators.api"/>
<module name="org.eclipse.microprofile.reactive-streams-operators.core" services="import"/>
<module name="org.jboss.logging" />
<module name="org.jboss.weld.api"/>
<module name="org.jboss.weld.core"/>
<module name="org.jboss.weld.spi"/>
<module name="org.wildfly.reactive.messaging.amqp"/>
<module name="org.reactivestreams"/>
<module name="org.slf4j"/>
</dependencies>
</module>
Expand Up @@ -3,7 +3,6 @@
~ Copyright The WildFly Authors
~ SPDX-License-Identifier: Apache-2.0
-->

<module xmlns="urn:jboss:module:1.9" name="io.smallrye.reactive.messaging.connector">
<properties>
<property name="jboss.api" value="private"/>
Expand Down

0 comments on commit f859204

Please sign in to comment.