An RxJava API for handling MQTT messages with an implementation using the Paho Java MQTT library.
The main interface is ObservableMqttClient
, which can be used to connect, publish and subscribe to a broker in a reactive way.
In previous versions, the same MqttMessage
class was used for both publishing and subscribing. However, some of the fields in a message received by a subscriber are not needed by a publisher - specifically, the message id. Message identifier is set by the underlying MQTT server, not the publishing client.
Additionally, the topic that the message was received on is useful to a subscriber but redundant for a publisher.
Hence MqttMessage
is now split into sub-interfaces: PublishMessage
and SubscribeMessage
. Only SubscribeMessage
requires a message id or topic.
In the 1.0.x branch, all of the ObservableMqttClient
methods returned an Observable<T>
. This was not ideal as some did not return any data (e.g. Observable<Void>
) or returned just a single value (e.g Observable<PublishToken>
).
With the release of RxJava 2.0 there are new reactive types that can be used to better model these methods. In the 1.1.0 release, the ObservableMqttClient
API has changed to make use of more appropriate reactive types from RxJava 2.
- Methods that previously returned
Observable<Void>
now returnCompletable
. This new type is described in the What's Different in 2.0 document for RxJava. This is better from an idiomatic reactive programming perspective. In fact, this change is essential because RxJava 2 no longer supports null. Methods affected are connect(), disconnect(), unsubscribe() and close(). If you were handling onNext() for some reason, move this code to onComplete(). - The publish() method that returned a 'one-shot'
Observable<PublishToken>
now usesSingle<PublishToken>
. If you were handling both onComplete() and onNext(), merge this code together as aSingle<T>
does both at the same time. - The subscribe() method now returns a
Flowable<MqttMessage>
rather than anObservable<MqttMessage>
.Flowable<T>
is a more suitable choice for supporting backpressure. You shouldn't need to change your code unless you want to support backpressure, in which case you should pass aFlowableSubscriber<MqttMessage>
toFlowable.subscribe(FlowableSubscriber<MqttMessage>)
(see below for more on backpressure).
The subscribe() methods of ObservableMqttClient
return a Flowable<MqttMessage>
using the BackpressureStrategy
provided (the default will be BackpressureStrategy.BUFFER
). Behind the scenes, this is done using the Flowable.create(FlowableEmitter<T>, BackpressureStrategy)
factory method described in the RxJava 2 documentation.
If you subscribe to this Flowable<MqttMessage>
with a FlowableSubscriber<MqttMessage>
the BackpressureStrategy
will be applied if you are unable to request messages fast enough. A FlowableSubscriber<T>
signals that it is ready to receive data by calling request() on the Subscription
to the Publisher<T>
. This is described in the RxJava documentation.
Note that the other subscribe(xxx) methods of Flowable<T>
do not apply backpressure.
The default Paho implementation of ObservableMqttClient
can be obtained like this:
final IMqttAsyncClient paho = new MqttAsyncClient(...);
final ObservableMqttClient client = PahoObservableMqttClient.builder(paho)
... // Customise
.build();
The builder allows you to override the default BackpressureStrategy
if desired (see above for a description of how backpressure works).
Asynchronously connect to the broker using an RxJava Completable
.
client.connect().subscribe(() -> {
// do something on completion
}, e -> {
// do something on error
});
Asynchronously publish a message to the broker using an RxJava Single
.
final PublishMessage msg = PublishMessage.create(...);
client.publish("mytopic", msg).subscribe(t -> {
// do something on success
}, e -> {
// do something on error
});
Asynchronously subscribe to a topic (or topics) using an RxJava Flowable<T>
. For each message received, the subscriber is called with the message. The QOS level desired can be passed along with the topic.
client.subscribe("mytopic", 1).subscribe(msg -> {
// do something with message
final byte[] body = msg.getPayload();
}, e -> {
// do something on error
});
Asynchronously unsubscribe from a topic (or topics) using an RxJava Completable
.
client.unsubscribe("mytopic").subscribe(() -> {
// do something on disconnect completion
}, e -> {
// do something on error
});
Asynchronously disconnect from the broker using an RxJava Completable
.
client.disconnect().subscribe(() -> {
// do something on disconnect completion
}, e -> {
// do something on error
});
Asynchronously close the client and relase all resources using an RxJava Completable
.
client.close().subscribe(() -> {
// do something on close completion
}, e -> {
// do something on error
});
The binaries for each release should be available in Maven Central.
<dependency>
<groupId>net.eusashead.mqtt</groupId>
<artifactId>rxmqtt</artifactId>
<version>x.y.z.RELEASE</version>
</dependency>