Skip to content

Commit

Permalink
[WFLY-15405] Add support for SmallRye Reactive Messaging AMQP Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
kabir committed Apr 4, 2023
1 parent a2185bf commit 6857ad1
Show file tree
Hide file tree
Showing 52 changed files with 2,133 additions and 130 deletions.
95 changes: 95 additions & 0 deletions boms/common-expansion/pom.xml
Expand Up @@ -74,6 +74,18 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-amqp</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-common</artifactId>
Expand Down Expand Up @@ -266,6 +278,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-amqp-client</artifactId>
<version>${version.io.smallrye.smallrye-mutiny-vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
Expand Down Expand Up @@ -333,6 +356,66 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka-api</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>${version.io.smallrye.smallrye-reactive-messaging}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<version>${version.io.vertx.vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand Down Expand Up @@ -401,6 +484,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${version.io.vertx.vertx}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
@@ -1,8 +1,8 @@
[[MicroProfile_Reactive_Messaging_SmallRye]]
= MicroProfile Reactive Messaging Subsystem Configuration
----
:smallrye-reactive-messaging-version: 3.6
:smallrye-reactive-messaging-tag: {smallrye-reactive-messaging-version}.0
:smallrye-reactive-messaging-version: 4.1.0
:smallrye-reactive-messaging-tag: {smallrye-reactive-messaging-version}
:eclipse-mp-reactive-messaging-api-version: 2.0
----

Expand All @@ -15,7 +15,7 @@ ifdef::env-github[]
endif::[]

Support for https://microprofile.io/project/eclipse/microprofile-reactive-messaging[MicroProfile Reactive Messaging] is
provided as a Tech Preview feature by the _microprofile-reactive-messaging-smallrye_ subsystem.
provided _microprofile-reactive-messaging-smallrye_ subsystem.

[[required-extension-microprofile-reactive-messaging-smallrye]]
== Required Extension
Expand Down Expand Up @@ -47,6 +47,8 @@ If you provision your own server and include the `microprofile-reactive-messagin

If you provision the `microprofile-reactive-messaging-kafka` Galleon layer it includes the modules to enable the Kafka connector functionality. The `microprofile-reactive-messaging-kafka` layer includes the `microprofile-reactive-messaging` layer which provides the core MicroProfile Reactive Messaging functionality.

Similarly, to enable the AMQP connector functionality, you need to provision the `microprofile-reactive-messaging-amqp` layer, which in turn includes the `microprofile-reactive-messaging` layer.

== Specification

WildFly's MicroProfile Reactive Messaging subsystem implements MicroProfile Reactive Messaging {eclipse-mp-reactive-messaging-api-version}, which adds support for asynchronous messaging support based on MicroProfile Reactive Streams Operators.
Expand Down Expand Up @@ -116,7 +118,7 @@ User's applications intended to return published values to users via e.g. Jakart
=== Connectors
MicroProfile Reactive Messaging is designed to be flexible enough to integrate with a wide variety of external messaging systems. This functionality is provided via 'connectors'.

The only included connector at the moment is the Kafka connector.
The only included connectors at the moment are the Kafka connector, and the AMQP connector.

Connectors are configured using MicroProfile Config. The property keys for the methods have some prefixes mandated by the MicroProfile Reactive Messaging Specification which lists these as:

Expand Down Expand Up @@ -181,7 +183,7 @@ Next we will briefly discuss each of these entries. Remember the `to` channel is

`mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer` tells the connector to use `IntegerDeserializer` to deserialize the values from the topic before calling the `receive()` method. You may implement your own deserializer by writing a class implementing `org.apache.kafka.common.serialization.Deserializer` and including it in the deployment.

In addition to the above, Apache Kafka, and SmallRye Reactive Messaging's Kafka connector understand a lot more properties. These can be found in the SmallRye Reactive Messaging Kafka connector https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/kafka/kafka.html[documentation], and in the Apache Kafka documentation for the https://kafka.apache.org/documentation/#producerconfigs[producers] and the https://kafka.apache.org/documentation/#consumerconfigs[consumers].
In addition to the above, Apache Kafka, and SmallRye Reactive Messaging's Kafka connector understand a lot more properties. These can be found in the SmallRye Reactive Messaging Kafka connector https://smallrye.io/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/kafka/kafka/[documentation], and in the Apache Kafka documentation for the https://kafka.apache.org/documentation/#producerconfigs[producers] and the https://kafka.apache.org/documentation/#consumerconfigs[consumers].

The prefixes discussed above are stripped off before passing the property to Kafka. The same happens for other configuration properties. See the Kafka documentation for more details about how to configure Kafka consumers and producers.

Expand Down Expand Up @@ -289,6 +291,72 @@ While we do expose the Kafka Clients jar in our BOMs, its usage is limited to
** `org.apache.kafka.common.serialization.Serializer`
** Implementatations of `org.apache.kafka.common.serialization.Deserializer` and `org.apache.kafka.common.serialization.Serializer` in the `org.apache.kafka.common.serialization` package

// New
==== AMQP Connector

An example of a minimal `microprofile-config.properties` file for AMQP for the example application shown previously:

```
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis

mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic

mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
```

Next we will briefly discuss each of these entries. Remember the `to` channel is on the `send()` method, and the `from` channel is on the `receive()` method.


'amqp-host=localhost' '`amqp-port=5672` points the connector to an AMQP broker running on `localhost:5672`. As before we could also have done these for an individual channel by for example specifying `mp.messaging.outgoing.to.host=localhost` instead. If the host is not specified, it defaults to `localhost.

`mp.messaging.outgoing.to.connector=smallrye-amqp` says that we want to use AMQP to back the `to` channel. Note that the value `smallrye-amqp` is SmallRye Reactive Messaging specific, and will only be understood if the AMQP connector is enabled.

`mp.messaging.outgoing.to.address=my-topic` says that we will send data to the AMQP topic (or queue) called `my-topic`.

`mp.messaging.incoming.from.connector=smallrye-amqp` says that we want to use Kafka to back the `from` channel. As above, the value `smallrye-amqp` is SmallRye Reactive Messaging specific.

`mp.messaging.incoming.from.topic=my-topic` says that we will read data from the AMQP topic (or queue) called `my-topic`.


The full set of properties understood by the SmallRye Reactive Messaging's Kafka connector can be found in the SmallRye Reactive Messaging AMQP connector https://smallrye.io/smallrye-reactive-messaging/{smallrye-reactive-messaging-version}/amqp/amqp/[documentation].

The prefixes discussed above are stripped off before passing the property to the AMQP connector.


===== Connecting to a secure AMQP broker
If connecting to a Kafka instance secured with SSL and SASL, the following example 'microprofile-config.properties' will help you get started. There are a few new properties. We are showing them on the connector level but they could equally well be defined on the channel level (i.e. with the `mp.messaging.outgoing.to-kafka.` and `mp.messaging.incoming.from-kafka.` prefixes from the previous examples rather than the connector-wide `mp.messaging.connector.smallrye-kafka` prefix).

[source]
----
# As seen above
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis
# New entries
amqp-use-ssl=true
amqp-wildfly.elytron.ssl.context=test
# Channel configuration would follow here, but is left out for brevity
----
//
//mp.messaging.outgoing.to.connector=smallrye-amqp
//mp.messaging.outgoing.to.address=my-topic
//
//mp.messaging.incoming.from.connector=smallrye-amqp
//mp.messaging.incoming.from.address=my-topic
Each of the new lines has the following meaning:

* `amqp-use-ssl=true` - specifies that we want to use a secure connection when connecting to the broker.
* `amqp-wildfly.elytron.ssl.context=test` - this is not needed if Kafka is secured with a CA signed certificate. If you are using self-signed certificates, you will need to specify a truststore in the Elytron subsystem, and create an `SSLContext` referencing that. The value of this property is used to look up the `SSLContext` in the Elytron subsystem under `/subsystem=elytron/client-ssl-context=*` in the WildFly management model. In this case the property value is `test`, so we look up the `SSLContext` defined by `/subsystem=elytron/client-ssl-context=test` and use that configure the truststore to use for the connection to Kafka.

Instead of configuring these properties on the connector level, we could also have defined them on the individual connectors. E.g.: `mp.messaging.incoming.from.wildfly.elytron.ssl.context=test` would choose the `test` SSLContext for the `from` incoming connector.

== Component Reference

Expand Down
55 changes: 55 additions & 0 deletions microprofile/galleon-common/pom.xml
Expand Up @@ -470,6 +470,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-amqp-client</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
Expand Down Expand Up @@ -514,6 +525,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
Expand Down Expand Up @@ -583,6 +605,17 @@
<artifactId>lra-coordinator-jar</artifactId>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand All @@ -605,6 +638,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down Expand Up @@ -742,6 +786,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-amqp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${full.maven.groupId}</groupId>
<artifactId>wildfly-microprofile-reactive-messaging-common</artifactId>
Expand Down
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ JBoss, Home of Professional Open Source.
~ Copyright 2020, Red Hat, Inc., and individual contributors
~ as indicated by the @author tags. See the copyright.txt file in the
~ distribution for a full listing of individual contributors.
~
~ This is free software; you can redistribute it and/or modify it
~ under the terms of the GNU Lesser General Public License as
~ published by the Free Software Foundation; either version 2.1 of
~ the License, or (at your option) any later version.
~
~ This software is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
~ Lesser General Public License for more details.
~
~ You should have received a copy of the GNU Lesser General Public
~ License along with this software; if not, write to the Free
~ Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
~ 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-->
<layer-spec xmlns="urn:jboss:galleon:layer-spec:1.0" name="microprofile-reactive-messaging-amqp">
<dependencies>
<layer name="microprofile-reactive-messaging"/>
</dependencies>

<packages>
<package name="io.smallrye.reactive.messaging.connector.amqp"/>
<package name="org.wildfly.reactive.messaging.amqp"/>
</packages>
</layer-spec>

0 comments on commit 6857ad1

Please sign in to comment.