Fix dependency registration for Messaging annotation parsers #8654

Closed
benmaes opened this issue Jun 22, 2023 · 5 comments · Fixed by #8657
Comments

@benmaes

benmaes commented Jun 22, 2023

Expected Behavior

I'm building an application which transfers/transforms messages between TCP and MQTT.
I would like to monitor the connections, to be able to configure some alerts when connections stay down for a certain amount of time/consecutive negative checks.

Current Behavior

I implemented multiple @Scheduled methods with a fixed delay of x seconds to check the status of both the TCP and MQTT connections. For TCP, I check whether there are open connections by calling getOpenConnectionIds() on either AbstractClientConnectionFactory or AbstractServerConnectionFactory, which works fine and does the job.

I'm trying to do something similar for checking the connection (NOT subscription) with the Mqtt-broker but without success. The most obvious way would be to call the isConnected() method on IMqttAsyncClient in either Mqttv5PahoMessageDrivenChannelAdapter or Mqttv5PahoMessageHandler, but since this client has private access, I'm not able to check the connection.

Context

Is there any way of achieving this with the existing codebase? If not, would it be possible to:

  • provide a getter which returns if the client is connected in both Mqttv5PahoMessageDrivenChannelAdapter and Mqttv5PahoMessageHandler
	public boolean isConnected() {
		return mqttClient.isConnected();
	}

  • publish internal application events similar to the TcpConnectionOpenEvent once the Mqtt client is connected successfully to the broker?
@benmaes benmaes added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Jun 22, 2023
@artembilan
Member

Starting with version 6.0, we have a ClientManager abstraction which provides a getClient() method.
The client it returns can be used for the requested isConnected() check.
We also have a number of MqttIntegrationEvent implementations for your consideration:
https://docs.spring.io/spring-integration/docs/current/reference/html/mqtt.html#mqtt-events
https://docs.spring.io/spring-integration/docs/current/reference/html/mqtt.html#mqtt-shared-client

Let us know if that works for you!
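
For the "consecutive negative checks" part of the request, a minimal sketch of the counting logic might look like the following. It is plain Java with no Spring or Paho types; `ConnectionMonitor` and its threshold parameter are hypothetical names for illustration. In a real application the supplier would presumably wrap a null-safe `clientManager.getClient().isConnected()` call, and `check()` would be invoked from a `@Scheduled` method.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: counts consecutive failed connection checks.
class ConnectionMonitor {

    private final BooleanSupplier connected;
    private final int threshold;
    private int consecutiveFailures;

    ConnectionMonitor(BooleanSupplier connected, int threshold) {
        this.connected = connected;
        this.threshold = threshold;
    }

    /** Runs one check; returns true while the failure count is at or above the threshold. */
    synchronized boolean check() {
        boolean up;
        try {
            up = connected.getAsBoolean();
        } catch (RuntimeException e) {
            // Treat a failing probe (for example, a client that is not created yet) as "down".
            up = false;
        }
        if (up) {
            consecutiveFailures = 0;
            return false;
        }
        consecutiveFailures++;
        return consecutiveFailures >= threshold;
    }

    synchronized int getConsecutiveFailures() {
        return consecutiveFailures;
    }

    public static void main(String[] args) {
        // Scripted connection states stand in for a real broker connection.
        boolean[] states = {true, false, false, false, true};
        int[] index = {0};
        ConnectionMonitor monitor = new ConnectionMonitor(() -> states[index[0]++], 3);
        for (int i = 0; i < states.length; i++) {
            System.out.println("check " + (i + 1) + " alert=" + monitor.check());
        }
    }
}
```

With the scripted states above, only the fourth check (the third consecutive failure) reports an alert, and the counter resets once the connection is seen as up again.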

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Jun 22, 2023
@benmaes
Author

benmaes commented Jun 23, 2023

I'm able to check the connection using the ClientManager, but I see a strange side-effect when publishing to Mqtt.
On application startup, the doStart methods are called in this order: Mqttv5PahoMessageHandler -> Mqttv5ClientManager -> Mqttv5PahoMessageDrivenChannelAdapter.
This means only the adapters are fully functional. For the handlers, the client set in the mqttClient property is null, because the doStart method of the ClientManager has not been executed yet and so no client has been created.
This causes a NullPointerException when trying to publish a message (Mqttv5PahoMessageHandler line 265 -> mqttClient.isConnected() check).

Is there a way to manage the order in which the doStart methods are triggered? That way I can make sure the ClientManagers are started first. Or am I doing something wrong here?

@artembilan
Member

Well, according to my debugging, the startup order is not what you describe:

2023-06-23 12:29:30,161 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting beans in phase -2147483648
2023-06-23 12:29:30,161 TRACE [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting bean '_org.springframework.integration.errorLogger' of type [org.springframework.integration.config.ConsumerEndpointFactoryBean]
2023-06-23 12:29:30,161 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Successfully started bean '_org.springframework.integration.errorLogger'
2023-06-23 12:29:30,161 TRACE [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting bean 'mqttv5ClientManager' of type [org.springframework.integration.mqtt.core.Mqttv5ClientManager]
2023-06-23 12:29:30,528 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Successfully started bean 'mqttv5ClientManager'
2023-06-23 12:29:30,529 TRACE [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting bean 'mqttOutFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [org.springframework.integration.config.ConsumerEndpointFactoryBean]
2023-06-23 12:29:30,529 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Successfully started bean 'mqttOutFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2023-06-23 12:29:30,529 TRACE [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting bean 'mqttInFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [org.springframework.integration.config.ConsumerEndpointFactoryBean]
2023-06-23 12:29:30,529 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Successfully started bean 'mqttInFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2023-06-23 12:29:30,529 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting beans in phase 0
2023-06-23 12:29:30,529 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting beans in phase 1073741823
2023-06-23 12:29:30,529 TRACE [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Starting bean 'mqttInFlow.mqtt:inbound-channel-adapter#0' of type [org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter]
2023-06-23 12:29:30,539 DEBUG [main] [org.springframework.context.support.DefaultLifecycleProcessor] - Successfully started bean 'mqttInFlow.mqtt:inbound-channel-adapter#0'

My config is just like this:

		@Bean
		public Mqttv5ClientManager mqttv5ClientManager() {
			return new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5");
		}

		@Bean
		public IntegrationFlow mqttOutFlow(Mqttv5ClientManager mqttv5ClientManager) {
			return f -> f.handle(new Mqttv5PahoMessageHandler(mqttv5ClientManager));
		}

		@Bean
		public IntegrationFlow mqttInFlow(Mqttv5ClientManager mqttv5ClientManager) {
			return IntegrationFlow.from(new Mqttv5PahoMessageDrivenChannelAdapter(mqttv5ClientManager, TOPIC_NAME))
					.channel(c -> c.queue("fromMqttChannel"))
					.get();
		}

Since the Mqttv5PahoMessageHandler depends on the Mqttv5ClientManager, that dependency is started first, and then the event-driven consumer.
As a result, there are no beans left to start in phase 0, which is the default for ClientManager.
Only after that is the Mqttv5PahoMessageDrivenChannelAdapter started, in its later phase.
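
To illustrate the ordering described here, the following is a simplified model in plain Java, not the actual DefaultLifecycleProcessor code. It assumes beans are started in ascending phase order and that a bean's registered dependencies are started just before the bean itself. The class `PhaseOrderSketch` and the bean names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of phase-ordered startup with dependency-first start.
class PhaseOrderSketch {

    private final Map<String, Integer> phases = new LinkedHashMap<>();
    private final Map<String, Set<String>> dependencies = new LinkedHashMap<>();

    void register(String bean, int phase) {
        phases.put(bean, phase);
    }

    // Records that 'bean' needs 'dependency' to be started first.
    void registerDependency(String bean, String dependency) {
        dependencies.computeIfAbsent(bean, k -> new LinkedHashSet<>()).add(dependency);
    }

    // Returns the bean names in the order they would be started.
    List<String> startOrder() {
        List<String> byPhase = new ArrayList<>(phases.keySet());
        byPhase.sort(Comparator.comparingInt(phases::get)); // stable, ascending phase
        List<String> started = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String bean : byPhase) {
            start(bean, visited, started);
        }
        return started;
    }

    private void start(String bean, Set<String> visited, List<String> started) {
        if (!visited.add(bean)) {
            return; // already started (also guards against dependency cycles)
        }
        for (String dependency : dependencies.getOrDefault(bean, Set.of())) {
            start(dependency, visited, started);
        }
        started.add(bean);
    }

    public static void main(String[] args) {
        PhaseOrderSketch sketch = new PhaseOrderSketch();
        sketch.register("handlerEndpoint", Integer.MIN_VALUE);
        sketch.register("clientManager", 0);
        sketch.register("inboundAdapter", Integer.MAX_VALUE / 2);

        // No dependency registered: pure phase order, handler endpoint first.
        System.out.println(sketch.startOrder());

        // Dependency registered: the client manager is pulled ahead of the endpoint.
        sketch.registerDependency("handlerEndpoint", "clientManager");
        System.out.println(sketch.startOrder());
    }
}
```

In this model, the first call yields `[handlerEndpoint, clientManager, inboundAdapter]` and the second yields `[clientManager, handlerEndpoint, inboundAdapter]`, which mirrors the difference between a missing and a registered dependency.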

It would be great to see your configuration to determine why the dependency lifecycle is not triggered in time.
However, as a workaround you can override the phase for your Mqttv5ClientManager. See its JavaDocs:

	/**
	 * Set the phase of component autostart in {@link SmartLifecycle}.
	 * If the custom one is required, note that for the correct behavior it should be less than phase of
	 * {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
	 * @see #getPhase
	 */
	public void setPhase(int phase) {

The EventDrivenConsumer for the Mqttv5PahoMessageHandler has a default phase of Integer.MIN_VALUE, so you may consider using that value for the Mqttv5ClientManager as well.

You may also need to revise your publishing logic so that it does not publish too early: let the whole application start before running your own logic.

@benmaes
Author

benmaes commented Jun 26, 2023

Thanks for the answer. The workaround (setting the phase of the ClientManager to Integer.MIN_VALUE) works: the ClientManagers are now started first, before the MessageHandlers.

These are the logs WITHOUT the workaround:


2023-06-26 09:23:39,808 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor , - Starting beans in phase -2147483648 
2023-06-26 09:23:39,808 [main] TRACE o.s.c.s.DefaultLifecycleProcessor , - Starting bean 'myMqttv5PahoMessageHandler.serviceActivator' of type [org.springframework.integration.config.ConsumerEndpointFactoryBean] 
2023-06-26 09:23:39,808 [main] INFO  o.s.i.endpoint.EventDrivenConsumer , - Adding {message-handler:myMqttv5PahoMessageHandler.serviceActivator} as a subscriber to the 'myMqttOutboundChannel' channel 
2023-06-26 09:23:39,808 [main] INFO  o.s.i.channel.DirectChannel , - Channel 'application.myMqttOutboundChannel' has 1 subscriber(s). 
...
2023-06-26 09:23:39,815 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor , - Starting beans in phase 0 
2023-06-26 09:23:39,816 [main] TRACE o.s.c.s.DefaultLifecycleProcessor , - Starting bean 'myClientManager' of type [org.springframework.integration.mqtt.core.Mqttv5ClientManager] 
2023-06-26 09:23:40,415 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor , - Successfully started bean 'myClientManager' 

And here is the config:


    @Bean
    public ClientManager<IMqttAsyncClient, MqttConnectionOptions> myClientManager() {
        return new Mqttv5ClientManager(mqttConnectionOptions, "myClientId");
    }

    @Bean
    @ServiceActivator(inputChannel = "myMqttOutboundChannel")
    public Mqttv5PahoMessageHandler myMqttv5PahoMessageHandler(ClientManager<IMqttAsyncClient, MqttConnectionOptions> myClientManager) {
        Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(myClientManager);

        messageHandler.setConverter(new StringMessageConverter());
        messageHandler.setDefaultQos(mqttProducerProperties.getQos());

        return messageHandler;
    }

    @Bean
    public MessageChannel myMqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public TcpInboundGatewaySpec tcpInboundFlow(flowProperties flowProperties) {
        return getTcpInboundGatewaySpec(flowProperties.getTcpConsumer().getPort());
    }

    private TcpInboundGatewaySpec getTcpInboundGatewaySpec(Integer port) {
        return Tcp.inboundGateway(Tcp.nioServer(port)
                        .id("nioServerId")
                        .deserializer(TcpCodecs.stxetx())
                        .serializer(TcpCodecs.stxetx())
                        .mapper(tcpMessageMapper)
                        .backlog(30))
                .errorChannel("errorChannel")
                .id("tcpInboundGatewayId");
    }


    @Bean
    public IntegrationFlow myFlow(TcpInboundGatewaySpec tcpInboundFlow) {
        return IntegrationFlow.from(tcpInboundFlow)
                .handle(Message.class, (message, headers) -> {
                    String input = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);

                    log.info(
                            "Incoming message from TCP",
                            StructuredArgumentsUtil.tcpMessageReceived()
                    );

                    myMqttOutboundChannel().send(MessageBuilder
                            .withPayload(getParsedBody(input))
                            .setHeader(MqttHeaders.TOPIC, topic)
                            .setHeader(MqttHeaders.MESSAGE_EXPIRY_INTERVAL, mqttBrokerProperties.getPublishMessageExpiration())
                            .setHeader(Headers.CONTENT_TYPE, Headers.CONTENT_TYPE_JSON)
                            .build());

                    log.info(
                            "Sent message to MQTT",
                            StructuredArgumentsUtil.mqttMessagePublished()
                    );

                    return Tcp.ACK;
                })
                .get();
    }

@artembilan
Member

OK, I see. The @ServiceActivator mechanism doesn't register a dependency between the ConsumerEndpointFactoryBean created on the fly and the @Bean for the Mqttv5PahoMessageHandler.
Therefore the ClientManager that the handler depends on is not started together with that consumer endpoint.

So, I'm treating this as a bug and will fix it soon.

I'll repurpose your issue as the respective bug since, essentially, your original one was just a question, which we don't handle in issues.

@artembilan artembilan added this to the 6.2.0-M1 milestone Jun 26, 2023
@artembilan artembilan changed the title Provide possibility to check Mqtt connection status with broker ~ checking Tcp connection status Fix dependency registration for Messaging annotation parsers Jun 26, 2023
artembilan added a commit to artembilan/spring-integration that referenced this issue Jun 26, 2023
Fixes spring-projects#8654

Spring Framework has an ability to start dependant beans automatically
when we start the current one.

The `AbstractMethodAnnotationPostProcessor` is missing a bean dependency
registration causing errors in target applications when Messaging Annotations configuration is used.

* Add `registerDependentBean()` into an `AbstractMethodAnnotationPostProcessor` when we generate
and register a `ConsumerEndpointFactoryBean`
* Change one of the `ClientManagerBackToBackTests` configuration to rely on a `@ServiceActivator`
for `Mqttv5PahoMessageHandler` bean to ensure that change in the `AbstractMethodAnnotationPostProcessor`
has a proper effect

**Cherry-pick to `6.1.x` & `6.0.x`**
garyrussell pushed a commit that referenced this issue Jun 26, 2023
garyrussell pushed a commit that referenced this issue Jun 26, 2023
rainboyan pushed a commit to rainboyan/spring-integration that referenced this issue Nov 24, 2023
…ng-projects#8657)
