Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client) add MQTT subscribe/unsubscribe APIs #154

Merged
merged 14 commits into from
Nov 24, 2020
Merged

feat(client) add MQTT subscribe/unsubscribe APIs #154

merged 14 commits into from
Nov 24, 2020

Conversation

lucasfernog
Copy link
Contributor

This PR introduces the integration with Hornet's MQTT interface. Here's an example of the usage:

use iota::{Client, Topic};

fn main() {
    let mut iota = Client::new() // Crate a client instance builder
        .node("http://0.0.0.0:14265") // Insert the node here
        .unwrap()
        .build()
        .unwrap();
    iota.subscriber()
        .topic(Topic::new("milestones/latest").unwrap())
        .topic(Topic::new("addresses/5d2684dfdbd52a2241e135c15f9c5ede2bca5599f810ddbc14c75948540345bf/outputs").unwrap())
        .subscribe(|event| println!("{:?}", event))
        .unwrap();
    loop {}
}

fn poll_mqtt(mqtt_topic_handlers: Arc<Mutex<TopicHandlerMap>>, client: &mut MqttClient) {
let receiver = client.start_consuming();
std::thread::spawn(move || {
while let Ok(message) = receiver.recv() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help check the while let loop will break when the all the topics are unsubscribed? (Like the mpsc Receiver will be dropped.) And it might be good to handle the corresponding Sender is disconnected.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why I point this out is that I saw this issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we should close the connection when all the topics are unsubscribed? for performance reasons I think we could expose a method to do it, because for instance the wallet will unsubscribe to all topics and then subscribe again frequently (on every sync).

Copy link
Collaborator

@bingyanglin bingyanglin Nov 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we will subscribe/unsubscribe frequently I think it's good to remain the connection. Just we might need to expose a method to let the user disconnect and let the Receiver channel to be dropped, so as to break the while let loop and let the spawned thread be shut down gracefully. Or do we already have a way to let the spawned thread gracefully shut down if the user does not want to receive mqtt messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just pushed the changes for it, yeah I had to drop the mqtt client instance in order to break from the while loop and shut down the thread. thanks for pointing that out, nice catch :P

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also added a broker_options field to the client builder with a flag for the automatic disconnection when the subscriptions are empty (defaults to true), is that ok @fijter ?

@bingyanglin
Copy link
Collaborator

LGTM!

@lucasfernog lucasfernog merged commit 2367d71 into dev Nov 24, 2020
@lucasfernog lucasfernog deleted the feat/mqtt branch November 24, 2020 12:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants