Skip to content

Commit

Permalink
GH-3685: Share MQTT connection across components
Browse files Browse the repository at this point in the history
Fixes #3685

Introduce some initial design.
Add a new interface `ClientManager` which will manage clients and
connections.
Use this manager in v3 topic adapter and message handler.
Add a new interface `ClientManager` which will manage clients and
connections.
Add different implementations for v3 and v5 MQTT clients.
Use this manager in v3/v5 topic adapters and message handlers.
Add a couple of unit/integration tests to cover client manager usage.
Several small code improvements after the code review:
* Improve client manager usage via providing several mutual exclusive
constructors, whether the users provides `url` or `connectionOptions`
or `clientFactory` for v3.
* Move the logger to `AbstractMqttClientManager`
* Do not inject TaskScheduler in constructor for v3 client manager
but use lazy init via `BeanFactory` and `IntegrationContextUtils`
* Other smaller code readability improvements

Add new tests with reconnect cases.
Other code improvements after the code review:
* Adjust javadocs according to standards
* Remove `setClientManager` and use exclusive ctors
* Make automatic reconnects using the v3 client instead of manually
using task scheduler

Some fixes and improvements after another code review iteration:
* Rearrange the code according to the code style guides
* Move client instance to `AbstractClientManager` with `isRunning`
method
* Fix abstract adapter/handler fields visibility and `final`ize them
where we can
* Send application event if automatic reconnect is not enabled for the
client manager

Other fixes and improvements after code review:
* Changes around fields, methods, ctors visibility
* Removed contradictory ctors
* Reduce amount of unnecessary `getClientManager() != null` checks
in logic and make it as similar as possible for client manager and the
old approach
* Use auto-reconnect where possible
* Remove manual reconnect trigger and rely on events instead to know
where to subscribe
* Do not close the connection in adapter to be able to use reconnect
logic without lose of subscriptions
* Make `ClientManager` extend `MqttComponent` so that it knows about
connection options as part of its contract
* Remove not relevant auto test cases (relying on connection close or
manual reconnect)
* Other code style smaller changes

Other fixes and improvements after code review:
* Get manual `reconnect` invocation back for v3/v5 adapters and client
managers (see bug GH-3822 for a reasoning)
* Remove unnecessary getters/setter for a listener and use adapter
class as listener instead
* Optimize MessageListener: remove redundant inner class and use a
single method reference instead of N instances per each subscribe
* Javadocs improvements

* Add Javadocs to abstract client manager
* Extract common callback add/rm logic to abstract adapter class
* Small code cleanups/fixes related to code style & simplicity, ctor
inits and unnecessary methods; eliminate unnecessary logs noise
* Remove `@LongRunningTest` for `ClientManagerBackToBackTests` as test
run time is ~6-7 secs

* Remove client factory as dependency for v3 client manager and use
plain connection properties and client persistence instead
* Add missed javadocs
* Other code style & cleanup improvements
* More code cleanup
* More Javadocs
  • Loading branch information
oxcafedead authored and artembilan committed Aug 11, 2022
1 parent 9996777 commit 5f12729
Show file tree
Hide file tree
Showing 16 changed files with 1,364 additions and 416 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
builder.addPropertyReference("outputChannel", channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "manual-acks");

return builder.getBeanDefinition();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.util.Assert;

/**
* Abstract class for MQTT client managers which can be a base for any common v3/v5 client manager implementation.
* Contains some basic utility and implementation-agnostic fields and methods.
*
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
*
* @author Artem Vozhdayenko
* @author Artem Bilan
*
* @since 6.0
*/
public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T, C>, ApplicationEventPublisherAware {

protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

private static final int DEFAULT_MANAGER_PHASE = 0;

private final Set<ConnectCallback> connectCallbacks = Collections.synchronizedSet(new HashSet<>());

private final String clientId;

private int phase = DEFAULT_MANAGER_PHASE;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;

private String url;

private String beanName;

private volatile T client;

protected AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
}

public void setManualAcks(boolean manualAcks) {
this.manualAcks = manualAcks;
}

protected String getUrl() {
return this.url;
}

protected void setUrl(String url) {
this.url = url;
}

protected String getClientId() {
return this.clientId;
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

protected synchronized void setClient(T client) {
this.client = client;
}

protected Set<ConnectCallback> getCallbacks() {
return this.connectCallbacks;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
}

@Override
public T getClient() {
return this.client;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null");
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
public void setBeanName(String name) {
this.beanName = name;
}

@Override
public String getBeanName() {
return this.beanName;
}

/**
* The phase of component autostart in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* The default phase is {@link #DEFAULT_MANAGER_PHASE}.
* @return {@link SmartLifecycle} autostart phase
* @see #setPhase
*/
@Override
public int getPhase() {
return this.phase;
}

@Override
public void addCallback(ConnectCallback connectCallback) {
this.connectCallbacks.add(connectCallback);
}

@Override
public boolean removeCallback(ConnectCallback connectCallback) {
return this.connectCallbacks.remove(connectCallback);
}

public synchronized boolean isRunning() {
return this.client != null;
}

/**
* Set the phase of component autostart in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* @see #getPhase
*/
public void setPhase(int phase) {
this.phase = phase;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.springframework.context.SmartLifecycle;

/**
* A utility abstraction over MQTT client which can be used in any MQTT-related component
* without need to handle generic client callbacks, reconnects etc.
* Using this manager in multiple MQTT integrations will preserve a single connection.
*
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
*
* @author Artem Vozhdayenko
* @author Artem Bilan
*
* @since 6.0
*/
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {

/**
* Return the managed client.
* @return the managed client.
*/
T getClient();

/**
* If manual acknowledge has to be used; false by default.
* @return true if manual acknowledge has to be used.
*/
boolean isManualAcks();

/**
* Register a callback for the {@code connectComplete} event from the client.
* @param connectCallback a {@link ConnectCallback} to register.
*/
void addCallback(ConnectCallback connectCallback);

/**
* Remove the callback from registration.
* @param connectCallback a {@link ConnectCallback} to unregister.
* @return true if callback was removed.
*/
boolean removeCallback(ConnectCallback connectCallback);

/**
* A contract for a custom callback on {@code connectComplete} event from the client.
*
* @see org.eclipse.paho.mqttv5.client.MqttCallback#connectComplete
* @see org.eclipse.paho.client.mqttv3.MqttCallbackExtended#connectComplete
*/
@FunctionalInterface
interface ConnectCallback {

/**
* Called when the connection to the server is completed successfully.
* @param isReconnect if true, the connection was the result of automatic reconnect.
*/
void connectComplete(boolean isReconnect);

}

}

0 comments on commit 5f12729

Please sign in to comment.