Mqtt V5 improve automaticReconnect #3822

Closed
kroutal opened this issue Jun 7, 2022 · 5 comments

Comments

@kroutal

kroutal commented Jun 7, 2022

Expected Behavior
Mqtt V5 automaticReconnect should work when first connection fails.
This is related to: eclipse/paho.mqtt.java#930

Current Behavior
When the application starts while the server is down, or something else goes wrong during the connection, Mqttv5PahoMessageHandler/Mqttv5PahoMessageDrivenChannelAdapter never makes a reconnection attempt, even though automaticReconnect is set to true.

Context

The only way to reconnect is to add an MqttConnectionFailedEvent listener and reimplement the reconnection mechanism using timers.
If this is the intended behavior, then it should be documented.
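A minimal sketch of that timer-based workaround, written in plain Java so it stays self-contained. `Connectable`, `RetryingReconnector` and `onConnectionFailed()` are hypothetical names: `Connectable` stands in for the channel adapter (for example, restarting it), and `onConnectionFailed()` is what an MqttConnectionFailedEvent listener would call. In a Spring application the listener would more likely be an `@EventListener` method and the delay would go through a `TaskScheduler`; that wiring is assumed here, not shown.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a channel adapter that can (re)connect.
interface Connectable {
    /** Attempts a connection; returns true on success. */
    boolean tryConnect();
}

// Hypothetical helper: retries a failed connection on a fixed delay until it succeeds.
class RetryingReconnector {

    private final Connectable target;
    private final ScheduledExecutorService scheduler;
    private final long delayMillis;
    private final AtomicBoolean retrying = new AtomicBoolean();

    RetryingReconnector(Connectable target, ScheduledExecutorService scheduler, long delayMillis) {
        this.target = target;
        this.scheduler = scheduler;
        this.delayMillis = delayMillis;
    }

    // Called from the connection-failed listener (e.g. on MqttConnectionFailedEvent).
    void onConnectionFailed() {
        // Start at most one retry chain, even if several failure events arrive.
        if (this.retrying.compareAndSet(false, true)) {
            scheduleAttempt();
        }
    }

    private void scheduleAttempt() {
        this.scheduler.schedule(() -> {
            if (this.target.tryConnect()) {
                this.retrying.set(false); // connected: stop retrying
            }
            else {
                scheduleAttempt(); // still down: try again after the same delay
            }
        }, this.delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

The `AtomicBoolean` guard is a design choice: since the same event is also published when an established connection is lost, it keeps a burst of failure events from starting several parallel retry chains.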

@kroutal kroutal added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Jun 7, 2022
@artembilan
Member

If such a behavior is not available in the Paho client, then I'm not sure what you would expect us to do here in our channel adapters.
And indeed, the MqttConnectionFailedEvent you've mentioned should be a good workaround for the time being.
This is the current logic in the Mqttv5PahoMessageHandler:

	protected void doStart() {
		try {
			this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
		}
		catch (MqttException ex) {
			ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
			if (applicationEventPublisher != null) {
				applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
			}
			logger.error(ex, "MQTT client failed to connect. Will retry if 'ConnectionOptions.isAutomaticReconnect()'.");
		}
	}

So, if we fail to start/connect, we emit that MqttConnectionFailedEvent.
And I believe it is explained well in the docs:

MqttConnectionFailedEvent - published by both adapters if we fail to connect or a connection is subsequently lost.

Does it make sense?

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Jun 7, 2022
@kroutal
Author

kroutal commented Jun 7, 2022

Thanks for your answer.

I was thinking of something similar to what is done in the MQTT V3 channel adapters, where a scheduler is started when connection fails on startup:

Concerning the documentation, I was thinking of adding a disclaimer in this part of the documentation: https://docs.spring.io/spring-integration/docs/current/reference/html/mqtt.html#mqtt-v5.

  It is recommended to have MqttConnectionOptions#setAutomaticReconnect(boolean) set to true to let an internal IMqttAsyncClient instance handle reconnects. Otherwise, only a manual restart of these channel adapters can handle reconnects, e.g. via MqttConnectionFailedEvent handling on disconnection.

@artembilan
Member

Yeah... I see your point now, given that open issue in the Paho client: if it fails to connect initially, it does not retry.
So, you are probably right and we need to retry ourselves.
Perhaps in a similar way to what we do with the v3 channel adapters.
And do that independently of the automaticReconnect option.

Let us know if you are willing to contribute the fix: https://github.com/spring-projects/spring-integration/blob/main/CONTRIBUTING.adoc !

@artembilan artembilan added backport 5.5.x and removed status: waiting-for-reporter Needs a feedback from the reporter labels Jun 7, 2022
@artembilan artembilan added this to the 6.0.0-M4 milestone Jun 7, 2022
@kroutal
Author

kroutal commented Jun 9, 2022

I had a look at the Paho client implementation. Its reconnect() method launches the reconnect scheduler, so calling it once after the failed initial connect gives the expected behaviour.
So an easier workaround could be:

	protected void doStart() {
		try {
			this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
		}
		catch (MqttException ex) {
			if (this.connectionOptions.isAutomaticReconnect()) {
				try {
					// A failed first connect() does not schedule Paho's reconnect task; reconnect() does.
					this.mqttClient.reconnect();
				}
				catch (MqttException reconnectEx) {
					logger.error(reconnectEx, "MQTT client failed to schedule a reconnect.");
				}
			}
			else {
				ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
				if (applicationEventPublisher != null) {
					applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
				}
				logger.error(ex, "MQTT client failed to connect. Will retry if 'ConnectionOptions.isAutomaticReconnect()'.");
			}
		}
	}

I'm willing to contribute

@artembilan
Member

Thanks, @kroutal !

Indeed, the Paho client does not move the connection to the proper state on a first attempt.
I see that they start ClientComms.ConnectBG, which has logic like:

		if (wasConnected && callback != null) {
			// Let the user know client has disconnected either normally or abnormally
			callback.connectionLost(reason, message);
		}

which is what eventually calls the mentioned reconnectTimer.schedule(new ReconnectTask(), reconnectDelay); background task.
But the problem is that this wasConnected is not true on a first attempt:

wasConnected = (isConnected() || isDisconnecting());

It was not connected before and it is really not disconnecting at the moment.
The state is just CONNECTING 🤷
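To make that first-attempt gap concrete, here is a deliberately simplified plain-Java model of the check quoted above. It is not Paho code: `ClientState`, `SimplifiedComms`, `beginConnect()` and `shutdownConnection()` are hypothetical names, and only the `wasConnected = (isConnected() || isDisconnecting())` rule is taken from the snippet in this comment.

```java
// Hypothetical, simplified connection states (the real client tracks more than these).
enum ClientState { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING }

class SimplifiedComms {

    private ClientState state = ClientState.DISCONNECTED;
    private int reconnectsScheduled;

    void beginConnect() {
        this.state = ClientState.CONNECTING;
    }

    void connected() {
        this.state = ClientState.CONNECTED;
    }

    // Called when a connection attempt, or an established connection, fails.
    void shutdownConnection() {
        // Same rule as the quoted logic: only a client that was connected (or disconnecting)
        // reports connectionLost, which is what leads to a reconnect being scheduled.
        boolean wasConnected = this.state == ClientState.CONNECTED
                || this.state == ClientState.DISCONNECTING;
        this.state = ClientState.DISCONNECTED;
        if (wasConnected) {
            // Stands in for reconnectTimer.schedule(new ReconnectTask(), reconnectDelay).
            this.reconnectsScheduled++;
        }
    }

    int reconnectsScheduled() {
        return this.reconnectsScheduled;
    }
}
```

A failure while still `CONNECTING` (broker down at startup) schedules nothing, while the same failure after a successful connect schedules one reconnect.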

So, according to your investigation, it is indeed correct to call the mentioned reconnect() from our doStart().
I think we need to check this.connectionOptions.isAutomaticReconnect() first, and emit the event only when automatic reconnect is disabled or this reconnect() call fails.
If it is automatic, it seems that the task is going to be scheduled and no errors are thrown back to the end user.
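The flow discussed here (call `reconnect()` when automatic reconnect is enabled; publish the failure event only if it is disabled or `reconnect()` itself fails) can be sketched in plain Java. `ReconnectableClient`, `ClientException` and `StartupConnector` are hypothetical stand-ins for the Paho client, `MqttException` and the adapter's `doStart()`; the assumption that `reconnect()` arms background retries comes from the investigation in this thread, not from this code.

```java
import java.util.function.Consumer;

// Hypothetical checked exception standing in for Paho's MqttException.
class ClientException extends Exception {
    ClientException(String message) {
        super(message);
    }
}

// Hypothetical minimal client contract; the real Paho client has many more methods.
interface ReconnectableClient {
    void connect() throws ClientException;

    // Assumed to arm the client's background reconnect task.
    void reconnect() throws ClientException;
}

class StartupConnector {

    private final ReconnectableClient client;
    private final boolean automaticReconnect;
    private final Consumer<Exception> failureEventPublisher; // e.g. publishes MqttConnectionFailedEvent

    StartupConnector(ReconnectableClient client, boolean automaticReconnect,
            Consumer<Exception> failureEventPublisher) {
        this.client = client;
        this.automaticReconnect = automaticReconnect;
        this.failureEventPublisher = failureEventPublisher;
    }

    void start() {
        try {
            this.client.connect();
        }
        catch (ClientException connectFailure) {
            if (!this.automaticReconnect) {
                // No client-side retries configured: tell the application right away.
                this.failureEventPublisher.accept(connectFailure);
                return;
            }
            try {
                // A failed first connect() does not arm the reconnect task, so arm it explicitly.
                this.client.reconnect();
            }
            catch (ClientException reconnectFailure) {
                // Even the reconnect could not be started: only now report the failure.
                this.failureEventPublisher.accept(reconnectFailure);
            }
        }
    }
}
```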

So, it would be really great if you could contribute the fix, and we will continue discussing the details in the code.

kroutal pushed a commit to kroutal/spring-integration that referenced this issue Jun 9, 2022
kroutal pushed a commit to kroutal/spring-integration that referenced this issue Jun 11, 2022
kroutal pushed a commit to kroutal/spring-integration that referenced this issue Jun 14, 2022
kroutal pushed a commit to kroutal/spring-integration that referenced this issue Jun 14, 2022
kroutal pushed a commit to kroutal/spring-integration that referenced this issue Jun 17, 2022
kroutal pushed a commit to kroutal/spring-integration that referenced this issue Jun 17, 2022
artembilan pushed a commit that referenced this issue Jun 17, 2022
Fixes: #3822

* Apply spring-framework code style on modified class
* Remove unwanted formatting
* Take pull request comments into account
* Code and JavaDocs clean up
* Improve `Mqttv5BackToBackAutomaticReconnectTests` removing non-related code
* Improve `mqtt.adoc` for this new manual reconnection feature

**Cherry-pick to `5.5.x`**
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Aug 9, 2022
Fixes spring-projects#3685

Other fixes and improvements after code review:
* Get manual `reconnect` invocation back for v3/v5 adapters and client
managers (see bug GH-3822 for a reasoning)
* Remove unnecessary getters/setter for a listener and use adapter
class as listener instead
* Optimize MessageListener: remove redundant inner class and use a
single method reference instead of N instances per each subscribe
* Javadocs improvements
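The `MessageListener` optimization mentioned in this commit boils down to creating one listener via a method reference and passing that same instance to every `subscribe` call, instead of allocating an inner-class instance per subscription. A plain-Java sketch with hypothetical types (`TopicListener`, `FakeClient`, `InboundAdapter`; not the Paho or Spring Integration API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback type, shaped like a per-subscription message listener.
@FunctionalInterface
interface TopicListener {
    void messageArrived(String topic, String payload);
}

// Hypothetical client that remembers which listener handles each topic.
class FakeClient {

    final Map<String, TopicListener> subscriptions = new LinkedHashMap<>();

    void subscribe(String topic, TopicListener listener) {
        this.subscriptions.put(topic, listener);
    }

    void deliver(String topic, String payload) {
        TopicListener listener = this.subscriptions.get(topic);
        if (listener != null) {
            listener.messageArrived(topic, payload);
        }
    }
}

class InboundAdapter {

    final List<String> received = new ArrayList<>();

    // Created once and shared by every subscription.
    private final TopicListener listener = this::messageArrived;

    void subscribeAll(FakeClient client, String... topics) {
        for (String topic : topics) {
            client.subscribe(topic, this.listener);
        }
    }

    private void messageArrived(String topic, String payload) {
        this.received.add(topic + ":" + payload);
    }
}
```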
artembilan pushed a commit that referenced this issue Aug 11, 2022
Fixes #3685

Introduce some initial design.
Add a new interface `ClientManager` which will manage clients and
connections.
Use this manager in v3 topic adapter and message handler.
Add a new interface `ClientManager` which will manage clients and
connections.
Add different implementations for v3 and v5 MQTT clients.
Use this manager in v3/v5 topic adapters and message handlers.
Add a couple of unit/integration tests to cover client manager usage.
Several small code improvements after the code review:
* Improve client manager usage via providing several mutually exclusive
constructors, depending on whether the user provides `url` or `connectionOptions`
or `clientFactory` for v3.
* Move the logger to `AbstractMqttClientManager`
* Do not inject TaskScheduler in constructor for v3 client manager
but use lazy init via `BeanFactory` and `IntegrationContextUtils`
* Other smaller code readability improvements

Add new tests with reconnect cases.
Other code improvements after the code review:
* Adjust javadocs according to standards
* Remove `setClientManager` and use exclusive ctors
* Make automatic reconnects using the v3 client instead of manually
using task scheduler

Some fixes and improvements after another code review iteration:
* Rearrange the code according to the code style guides
* Move client instance to `AbstractClientManager` with `isRunning`
method
* Fix abstract adapter/handler fields visibility and `final`ize them
where we can
* Send application event if automatic reconnect is not enabled for the
client manager

Other fixes and improvements after code review:
* Changes around fields, methods, ctors visibility
* Removed contradictory ctors
* Reduce amount of unnecessary `getClientManager() != null` checks
in logic and make it as similar as possible for client manager and the
old approach
* Use auto-reconnect where possible
* Remove manual reconnect trigger and rely on events instead to know
where to subscribe
* Do not close the connection in adapter to be able to use reconnect
logic without losing subscriptions
* Make `ClientManager` extend `MqttComponent` so that it knows about
connection options as part of its contract
* Remove not relevant auto test cases (relying on connection close or
manual reconnect)
* Other code style smaller changes

Other fixes and improvements after code review:
* Get manual `reconnect` invocation back for v3/v5 adapters and client
managers (see bug GH-3822 for a reasoning)
* Remove unnecessary getters/setter for a listener and use adapter
class as listener instead
* Optimize MessageListener: remove redundant inner class and use a
single method reference instead of N instances per each subscribe
* Javadocs improvements

* Add Javadocs to abstract client manager
* Extract common callback add/rm logic to abstract adapter class
* Small code cleanups/fixes related to code style & simplicity, ctor
inits and unnecessary methods; eliminate unnecessary logs noise
* Remove `@LongRunningTest` for `ClientManagerBackToBackTests` as test
run time is ~6-7 secs

* Remove client factory as dependency for v3 client manager and use
plain connection properties and client persistence instead
* Add missed javadocs
* Other code style & cleanup improvements
* More code cleanup
* More Javadocs
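One way to picture the resulting design, where a shared client manager owns the connection and adapters "rely on events instead to know where to subscribe", is the plain-Java sketch below. `SharedConnection`, `ConnectListener` and `TopicAdapter` are hypothetical names, not the actual `ClientManager` API introduced by this commit, and `onConnected()` merely simulates the underlying client finishing a (re)connect with a clean session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback: notified every time the shared client (re)connects.
interface ConnectListener {
    void connectComplete(boolean isReconnect);
}

// Hypothetical stand-in for a client manager: one connection shared by several adapters.
class SharedConnection {

    private final List<ConnectListener> listeners = new CopyOnWriteArrayList<>();
    private final List<String> activeSubscriptions = new ArrayList<>();
    private boolean everConnected;

    void addListener(ConnectListener listener) {
        this.listeners.add(listener);
    }

    void removeListener(ConnectListener listener) {
        this.listeners.remove(listener);
    }

    void subscribe(String topic) {
        this.activeSubscriptions.add(topic);
    }

    List<String> activeSubscriptions() {
        return this.activeSubscriptions;
    }

    // Simulates the client completing a (re)connect; a clean session drops all subscriptions.
    void onConnected() {
        this.activeSubscriptions.clear();
        boolean isReconnect = this.everConnected;
        this.everConnected = true;
        for (ConnectListener listener : this.listeners) {
            listener.connectComplete(isReconnect);
        }
    }
}

// The adapter itself is the listener, so it never has to close or reopen the connection.
class TopicAdapter implements ConnectListener {

    private final SharedConnection connection;
    private final String topic;

    TopicAdapter(SharedConnection connection, String topic) {
        this.connection = connection;
        this.topic = topic;
        connection.addListener(this);
    }

    @Override
    public void connectComplete(boolean isReconnect) {
        // Subscribe on every (re)connect, not just once at start.
        this.connection.subscribe(this.topic);
    }
}
```

Because each adapter re-subscribes in `connectComplete`, a reconnect performed by the shared client restores its subscriptions without any adapter-level restart.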