Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement automatic reconnection #287

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

Implement automatic reconnection #287

wants to merge 1 commit into from

Conversation

empicano
Copy link
Collaborator

Hi there @frederikaalund @JonathanPlasse 😊

Frederik drafted how reconnection could look like a while ago already (thank you again, master of asyncio 🙏😄). I finally had some time to hack around with this! Some thoughts:

  • I'd like to avoid having two clients (one that reconnects and one that doesn't) because I suspect that this is going to be confusing to use and lead to more maintenance in the future. I'm thus trying to implement it as a reconnect=True parameter to the client and leave all method signatures the same.
  • Consequently, this means that all publish/subscribe/etc. calls have to be asynchronous, contrary to Frederik's original design. Instead of queuing the calls up, I'm thinking of blocking them and returning only when the connection returns and the call goes through.
  • This would eliminate the flip side that you mentioned as well, Frederik: "The user does not know if/when their events actually go through."

For now, I've implemented the reconnection background task and adapted the publish method to wait until the connection returns and the message could actually be published (both are probably still full of bugs). You can play around with it by shutting a local MQTT broker on and off (e.g. with ./scripts/develop) and by running:

import aiomqtt
import asyncio


async def main():
    client = aiomqtt.Client("localhost", 1883, reconnect=True, timeout=2)
    async with client:
        while True:
            await asyncio.sleep(1)
            print("Publishing message...")
            await client.publish("test", "Hello, World!")
            print("Message published.")


asyncio.run(main())

I'm still thinking about how to deal with existing subscriptions and last wills. We probably have to resend them when clean_session=True, otherwise they will cease to exist without notice to the user after a reconnection in the background.

Happy to hear what your thoughts are on this (or anyone else's who wants to chip in) 😊

@frederikaalund
Copy link
Member

Thank you for looking into this! 👍 Automatic reconnect is definitely on the wish list of many of our users. :)

I do like the idea to have "everything in one client" as opposed to my approach with a separate low-level and high-level client. At least the single-client approach is easier to grok for our users. 👍

I looked through the code, and it looks good. I'm (as always) a little bit concerned about adding extra internal state to our client (the new reconnect task). Increases the maintenance burden. In any case, for this feature I don't think we can avoid it if we want to use a single client. 😄

Aside: There is a lot of "reset state" going on (re-creating the futures). That's not your fault (or the fault of this PR) but my own fault (using futures in the first place). 😅 I strongly suggest that we look at alternatives for all the internal futures and background tasks. I'm thinking anyio (to no ones surprise I guess). :)

I'm still thinking about how to deal with existing subscriptions and last wills. We probably have to resend them when clean_session=True, otherwise they will cease to exist without notice to the user after a reconnection in the background.

That is a concern of mine as well! Indeed, we would have to save all the subscriptions inside the client and then "resubscribe" when the connection is back online. These subscriptions are even more state to manage. :)


Here is a suggestion that is a slight variation of the current approach:

  • Single public client (that now can also automatically reconnect)
    • Rename the existing Client to _InternalClient (or something like that)
    • Create a new Client class with the exact same API as the existing client (our users won't see a difference!)
    • Use streams and broadcast mechanisms in the new Client to implement the reconnect logic.

This way, we get the separation of high-level and low-level and the easier maintenance that follows. It also allows allows to replace the _InternalClient with a new implementation in the future (maybe written in rust, using anyio, or something else).

Consequently, this means that all publish/subscribe/etc. calls have to be asynchronous, contrary to Frederik's original design. Instead of queuing the calls up, I'm thinking of blocking them and returning only when the connection returns and the call goes through.

I see the benefits of this approach! 👍 I suggest that we do both 😄 That is, to have both async def publish() and def publish_nowait(). This mimicks how both asyncio and anyio handles the "do you want to wait or not" question. In turn, it let's the user get to decide whether they want to wait for the connection or not. :)

Again, thank you for looking into this issue and very well done on the draft implementation. 👍 Let me know what you think of my comments above and do say if you have any questions. 😄

@empicano
Copy link
Collaborator Author

Thanks for your thoughts on this! Your reviews are one of the main reasons this project is so fun for me 😉

You're right about the internal state getting slightly out of control. While implementing this draft I already got bitten resetting futures while they were awaited elsewhere (which raises CancelledError) 😅

Use streams and broadcast mechanisms in the new Client to implement the reconnect logic.

I'm not sure I understand this point. Do you mean using streams and broadcast for the _nowait variants like you did in your example, or actually a different way to implement the reconnection? Apart from that, I like the idea of bringing more structure into it with the low-level client class 🙂

I'm on board with the publish and publish_nowait design as well 👍 I can imagine how these can be implemented in the reconnection case, but I wonder how publish_nowait could work when the client shouldn't reconnect. (How) Do we fail in the case the client is disconnected? Given that we probably work through the queued messages in the background without awaiting the task anywhere, throwing an exception won't show to the user, or can we propagate it somehow?

@frederikaalund
Copy link
Member

Thanks for your thoughts on this! Your reviews are one of the main reasons this project is so fun for me 😉

Thank you for saying that. 😄 I don't have that much time these days but I do try to find it anyhow to at least do these reviews. :) It's a bit easier here during Easter.

I'm not sure I understand this point. Do you mean using streams and broadcast for the _nowait variants like you did in your example, or actually a different way to implement the reconnection? Apart from that, I like the idea of bringing more structure into it with the low-level client class 🙂

I'm was leaning towards doing "streams and broadcasts" to do the reconnection itself (like in my sample code). E.g., keep the current Client class more or less as is. Mostly, I'm worried about whether we (not just you and the code in this PR but all of us maintainers) can maintain the solution going forward. It's difficult enough as it is with all the internal futures and "partial reset state". Specifically, I'm concerned whether some task may await a future (e.g., _connected or _disconnected) while another task accidentally resets it (assigns a new value to _connected leaving the task to hang forever). I have similar worries for our _lock that we manually acquire and release. Do you get me? 😄


In any case, let's be pragmatic and review our options here:

  1. Continue with the current implementation in this PR (single client with reconnect=True flag, futures, background tasks, state resets).
    • Pro: We already have most of the implementation (thanks to this PR! 🙏)
    • Con: Client is more difficult to maintain now that the logic is more involved.
  2. Address the maintenance concerns first as another PR and build on that. E.g., split the client into a low-level and high-level implementation, and/or, use anyio instead of all the futures/asyncio.Tasks.
    • Pro: It's easier to maintain this project going forward and it's easier to add new features without accidentally breaking stuff.
    • Con: It takes a lot of time to write this. It'll delay the reconnection feature.

So in the larger perspective (time and resources being essential) I do actually lean towards option (1). Option 1 provides value here and now at the cost of future maintenance. 😄

If you agree, I think the next steps is to write out some test cases for this (to mitigate the maintenance cost). With tests in place, the pro-con calculation becomes easy since the maintenance cost goes towards zero. 😉

Again, thank you for all the time that you put into this PR (and the aiomqtt project in general). 👍 Let me know if you have any comments or questions. :)

@empicano
Copy link
Collaborator Author

Thanks for elaborating, I think I understand what you mean now 😊 I'll play around with streams and broadcasts to understand them a little better and see how much work the second option would bring! You convinced me that that's the better option 😄

I'll report back once I have more, could take a bit, though, as I'm busy the next few days 😋

@spacemanspiff2007
Copy link
Contributor

spacemanspiff2007 commented Apr 25, 2024

I just stumbled across this so sorry if I am late to the party. From my experience it might be better to split the client into a low level and high level because as mentioned it makes maintenance much easier, however I have no strong opinion on that.

While looking through the PR I am not sure I understood everything correctly:
It seems like publish will never return until the client is connected again possibly locking up the program. This is imho very unexpected.
Here I have a small program that only publishes and I just discard all messages on disconnect. I even made publish non-blocking and even non async through the use of a Queue.
On the other hand _messages still raises an error when the connection is lost. I think it would be good if the behavior of these two methods is aligned.
In case of a disconnect there will be a retry every two seconds. It would be nice if there is the possibility to provide something (e.g. generator) that backs up gracefully (e.g. 2, 4, 8 ... 300) secs delay.
Additionally it would be nice to have the possibility to specify an additional message that gets sent on graceful disconnect on the last will topic because last will is only used on abnormal disconnect.

I think he usage of the client is hard because there are multiple places where the disconnect error can occur. Typically I have a publish and a messages worker task and both can be the cause for a reconnect which I have to sync back to the task where I create the client and do the connect. Do you know by chance a good solution for that? I've come up with a quite complex solution, a stateful connection manager which connects and then creates the tasks. If one task throws an MqttError I cancel both tasks and try to trigger a reconnect.
Example from one application:
connection handler
publish task
messages task

Edit:
I think one of the reasons why it's currently hard is that the client object can not be reused.
On reconnect it's a new client and the method for processing messages has also be entered again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants