DefaultMessageListenerContainer should interrupt worker threads when not returning on shutdown [SPR-16536] #21079

Closed
spring-issuemaster opened this issue Feb 26, 2018 · 17 comments

Maxim Popov opened SPR-16536 and commented

[configuration]
The problem is reproduced when the JMS prefetch size is set to 0.

connectionFactory.getPrefetchPolicy().setQueuePrefetch(0);

[description]
The @DirtiesContext(classMode=DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) annotation triggers the org.springframework.context.support.DefaultLifecycleProcessor.LifecycleGroup.stop() method, which tries to shut down the org.springframework.jms.config.JmsListenerEndpointRegistry bean.

When JmsListenerEndpointRegistry stops, it calls stop() on DefaultMessageListenerContainer, which is marked as stopped via the 'running' flag:

this.running = false;

But ActiveMQMessageConsumer is blocked waiting for a message from the broker when the prefetch size is 0:

if (info.getPrefetchSize() == 0) {
    md = dequeue(-1); // We let the broker let us know when we timeout.
} else {
    md = dequeue(timeout);
}

So DefaultMessageListenerContainer can't react to the 'running' flag and continues to wait for a message.

As a result, DefaultLifecycleProcessor.LifecycleGroup.stop() waits 30 seconds for a normal stop, without success:

latch.await(this.timeout, TimeUnit.MILLISECONDS);
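
For illustration, here is a minimal standalone sketch of the setup described above, assuming an embedded broker at vm://localhost and a queue named "foo" (both illustrative, not taken from the original report); with queue prefetch 0, the final stop()/shutdown() can hang in exactly the way described.

import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class Prefetch0ShutdownRepro {

	public static void main(String[] args) throws Exception {
		// Prefetch 0: the consumer pulls messages from the broker on demand.
		ActiveMQConnectionFactory connectionFactory =
				new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
		connectionFactory.getPrefetchPolicy().setQueuePrefetch(0);

		DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.setDestinationName("foo");
		container.setMessageListener((MessageListener) (Message m) -> System.out.println(m));
		container.afterPropertiesSet();
		container.start();

		Thread.sleep(5_000);

		// stop() only flips the 'running' flag; with prefetch 0 the consumer may still be
		// blocked in ActiveMQMessageConsumer.dequeue(-1), so shutdown can hang here.
		container.stop();
		container.shutdown();
	}
}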

Affects: 4.3.14

Issue Links:

  • #21128 Application Server Fail to Stop Application and server while stopping transactions with spring
  • #21148 ThreadPoolTaskExecutor should cancel all remaining Future handles on shutdown

Referenced from: commits c6bd0c9, 59f1263, 95aad9c

spring-issuemaster commented Feb 26, 2018

Gary Russell commented

You have to interrupt the dequeue() - here's one way...

@SpringBootApplication
public class Int4415Application {

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext ctx = SpringApplication.run(Int4415Application.class, args);
		ctx.getBean("exec", ThreadPoolTaskExecutor.class).shutdown();
		ctx.close();
	}

	@Bean
	public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
		return args -> {
			template.convertAndSend("foo", "bar");
			Thread.sleep(10_000);
		};
	}

	@JmsListener(destination = "foo")
	public void in(Message in) {
		System.out.println(in);
	}

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
			DefaultJmsListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		factory.setTaskExecutor(exec());
		return factory;
	}

	@Bean
	public TaskExecutor exec() {
		return new ThreadPoolTaskExecutor();
	}

}

spring-issuemaster commented Feb 27, 2018

Maxim Popov commented

The solution with the ThreadPoolTaskExecutor shutdown works!
Thanks

spring-issuemaster commented Feb 27, 2018

Juergen Hoeller commented

Gary Russell, you're primarily triggering that ThreadPoolTaskExecutor.shutdown() call in order to interrupt all of the managed threads there? I suppose a manual Thread.interrupt() call on the affected Thread handles would also work? I'm not sure where we could even do that, just trying to understand the issue a bit better...

Also, does this mean that ActiveMQ completely ignores a local JMS receiveTimeout when its prefetch size is 0? That seems rather odd and is arguably unexpected behavior for a JMS provider.

If there's no good way to automatically interrupt the consumers on shutdown, we'll have to turn this into a documentation task for that specific ActiveMQ configuration.

spring-issuemaster commented Feb 27, 2018

Juergen Hoeller commented

Assuming that a Thread.interrupt() call is all it takes: We could store the Thread handle for a current receive attempt in each AsyncMessageListenerInvoker instance... and interrupt that Thread handle for every active invoker on shutdown before we wait for the invokers to deactivate themselves. This might not make much difference if receive timeouts are tight anyway but for longer timeouts or for this ActiveMQ case it would be an additional signal for the consumers to stop.

Gary Russell, does this sound feasible to you? I've locally implemented this and it seems to work so far. I haven't tried it against a full ActiveMQ setup yet though.
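
A rough sketch of that idea, purely for illustration (the class and method names below are hypothetical, not Spring's actual AsyncMessageListenerInvoker code): each invoker registers its thread around the receive attempt so that shutdown can interrupt any that are still blocked.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class InvokerInterruptSketch {

	// Threads currently blocked in a receive attempt.
	private final Set<Thread> receivingThreads = ConcurrentHashMap.newKeySet();

	/** Would be called by each invoker around its MessageConsumer.receive(timeout) attempt. */
	public void aroundReceiveAttempt(Runnable receiveCall) {
		Thread current = Thread.currentThread();
		receivingThreads.add(current);
		try {
			receiveCall.run();
		}
		finally {
			receivingThreads.remove(current);
			Thread.interrupted(); // clear a late interrupt so it doesn't leak into listener code
		}
	}

	/** Would be called from shutdown after waiting for the invokers to deactivate themselves. */
	public void interruptActiveReceives() {
		for (Thread thread : receivingThreads) {
			thread.interrupt(); // wakes the Object.wait() inside a blocked receive
		}
	}
}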

spring-issuemaster commented Feb 27, 2018

Gary Russell commented

Juergen Hoeller Yes, the shutdown in the workaround was just to interrupt the threads.

I agree it's an ActiveMQ problem; it seems weird that they just ignore the timeout, and the comment is misleading too:

    public Message receive(long timeout) throws JMSException {
        checkClosed();
        checkMessageListener();
        if (timeout == 0) {
            return this.receive();
        }

        sendPullCommand(timeout);
        while (timeout > 0) {

            MessageDispatch md;
            if (info.getPrefetchSize() == 0) {
                md = dequeue(-1); // We let the broker let us know when we timeout.
            } else {
                md = dequeue(timeout);
            }
        ...

I am not sure that we need to do anything, except maybe document it; perhaps Maxim Popov might want to raise an issue there?

spring-issuemaster commented Feb 27, 2018

Gary Russell commented

Oh; I see sendPullCommand(timeout);

Maybe that's broken?

spring-issuemaster commented Feb 27, 2018

Gary Russell commented

No; it's definitely a bug, since they block in the dequeue() if the queue is empty; the pull command might time out, but they wait on a monitor object (wait()) indefinitely here: MessageDispatch md = unconsumedMessages.dequeue(timeout);
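
As a plain-Java illustration of that point: an untimed Object.wait() (which is what dequeue(-1) boils down to while the queue stays empty) returns only on a notify or an interrupt, so a thread parked like this never gets a chance to re-check a stop flag on its own.

public class UntimedWaitDemo {

	public static void main(String[] args) throws Exception {
		final Object mutex = new Object();

		Thread waiter = new Thread(() -> {
			synchronized (mutex) {
				try {
					mutex.wait(); // like dequeue(-1): blocks until notify or interrupt
				}
				catch (InterruptedException ex) {
					System.out.println("woken by interrupt");
				}
			}
		});
		waiter.start();

		Thread.sleep(1_000);
		waiter.interrupt(); // without this (or a notify), the waiter would block forever
		waiter.join();
	}
}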

spring-issuemaster commented Feb 27, 2018

Juergen Hoeller commented

I wonder if there could be a negative effect from interrupting a MessageConsumer.receive call on shutdown. All that really happens is that Object.wait calls within the consumer return at that point, and I guess that never hurts on shutdown... since we're going to reject any such late-received messages anyway (unless acceptMessagesWhileStopping is set to true, in which case we shouldn't do the consumer interruption and should just let the receive calls return normally as before).

spring-issuemaster commented Feb 27, 2018

Juergen Hoeller commented

I've fine-tuned the interruption algorithm so that it only kicks in after one round of waiting. In other words, once DefaultMessageListenerContainer.doShutdown has waited for the receive timeout period, it'll turn more aggressive and interrupt remaining invoker threads before waiting for another receive timeout period. This should be sensible since it lets current receive attempts return regularly, only starting to interrupt when invokers are then unexpectedly still active (as long as acceptMessagesWhileStopping remains at its default of false, which allows us to assume that it's only the receive attempts themselves that are stuck). This also makes the timing configurable: if desirable for any reason, the shutdown interruption step can be deferred by setting a longer receive timeout.
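
In outline, the described shutdown sequence looks roughly like this (an illustrative sketch only; the helper methods are hypothetical stand-ins for the container's invoker bookkeeping, not Spring APIs):

import java.util.concurrent.TimeUnit;

public class ShutdownSequenceSketch {

	private final long receiveTimeout = 1000;                  // the container's configured receive timeout
	private final boolean acceptMessagesWhileStopping = false;

	public void shutdown() throws InterruptedException {
		// First round: let in-flight receive attempts return on their own.
		boolean allInvokersStopped = waitForInvokers(receiveTimeout);

		if (!allInvokersStopped && !acceptMessagesWhileStopping) {
			// Invokers are unexpectedly still active after one receive timeout period:
			// assume they are stuck in receive, interrupt them, then wait one more period.
			interruptActiveReceives();
			waitForInvokers(receiveTimeout);
		}
	}

	private boolean waitForInvokers(long timeoutMillis) throws InterruptedException {
		TimeUnit.MILLISECONDS.sleep(timeoutMillis); // placeholder for latch-based waiting
		return false;
	}

	private void interruptActiveReceives() {
		// placeholder: interrupt every thread currently blocked in a receive attempt
	}
}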

spring-issuemaster commented Feb 27, 2018

Juergen Hoeller commented

I've pushed my revision to master. Please give the upcoming 5.0.5.BUILD-SNAPSHOT a try against your ActiveMQ setup... I'll backport this to 4.3.15 once we know that it actually helps.

spring-issuemaster commented Feb 28, 2018

Juergen Hoeller commented

I've backported this to the 4.3.x branch in the meantime, so you could also give the upcoming 4.3.15.BUILD-SNAPSHOT a try.

spring-issuemaster commented Feb 28, 2018

Gary Russell commented

Hi Juergen Hoeller - this still doesn't work in my test case.

The problem is that shutDown() is not called until the JmsListenerEndpointRegistry is destroy()ed, which is after stop() has given up after 30 seconds:

2018-02-28 12:41:30.634  INFO 1100 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647
2018-02-28 12:42:00.634  WARN 1100 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Failed to shut down 1 bean with phase value 2147483647 within timeout of 30000: [org.springframework.jms.config.internalJmsListenerEndpointRegistry]
2018-02-28 12:42:00.635  INFO 1100 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2018-02-28 12:42:00.635  INFO 1100 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'exec'
2018-02-28 12:42:00.635 DEBUG 1100 --- [           main] o.s.j.l.DefaultMessageListenerContainer  : Shutting down JMS listener container
2018-02-28 12:42:00.635 DEBUG 1100 --- [           main] o.s.j.l.DefaultMessageListenerContainer  : Waiting for shutdown of message listener invokers

In any case, I wonder if it's safe to do the interrupt in the container; don't we run the risk of interrupting the thread while it's in user code? (Which would be a behavior change, even if the right thing to do).

@SpringBootApplication
public class Int4415Application {

	public static void main(String[] args) throws Exception {
		new DefaultMessageListenerContainer();
		ConfigurableApplicationContext ctx = SpringApplication.run(Int4415Application.class, args);
//		ctx.getBean("exec", ThreadPoolTaskExecutor.class).shutdown();
		ctx.close();
	}

	@Bean
	public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
		return args -> {
			template.convertAndSend("foo", "bar");
			Thread.sleep(10_000);
		};
	}

	@JmsListener(destination = "foo")
	public void in(Message in) {
		System.out.println(in);
	}

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
			DefaultJmsListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
//		factory.setTaskExecutor(exec());
		return factory;
	}

	@Bean
	public TaskExecutor exec() {
		return new ThreadPoolTaskExecutor();
	}

}

spring-issuemaster commented Feb 28, 2018

Juergen Hoeller commented

Hmm, all we're really doing there is to switch the running flag and call stop on the shared Connection... So I assume ActiveMQ's implementation of Connection.stop() hangs while it's waiting for one of its MessageConsumer.receive calls to return? That's unfortunate since there's nothing we can do about it at that level. It would really have to be addressed within ActiveMQ then.
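
For context, that stop path amounts to roughly the following (a simplified sketch, not the container's actual code); note that per the JMS javadoc, Connection.stop() itself blocks until any receive in progress has returned, which is where a consumer parked in an untimed wait can hold things up.

import javax.jms.Connection;
import javax.jms.JMSException;

public class StopPathSketch {

	private volatile boolean running = true;

	/** Simplified stop path: flip the flag, then stop the shared Connection. */
	public void stop(Connection sharedConnection) throws JMSException {
		this.running = false;
		// JMS: stop() blocks until receives in progress have completed, so it can
		// hang if the provider leaves a consumer in an untimed Object.wait().
		sharedConnection.stop();
	}
}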

spring-issuemaster commented Feb 28, 2018

Gary Russell commented

Juergen Hoeller It looks like we might have a race condition in the container; if I set a breakpoint in stopSharedConnection() and resume it immediately, the container stops cleanly.

I looked at the ActiveMQ code and stop() does wake the receive(), but perhaps we loop around again somehow?

2018-02-28 13:28:56.369  INFO 9764 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647
2018-02-28 13:28:56.391 TRACE 9764 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Consumer [ActiveMQMessageConsumer { value=ID:gollum.local-50369-1519842526165-4:1:1:1, started=true }] of session [ActiveMQSession {id=ID:gollum.local-50369-1519842526165-4:1:1,started=true} java.lang.Object@31af4909] did not receive a message
2018-02-28 13:28:57.574  INFO 9764 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2018-02-28 13:28:57.574  INFO 9764 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'exec'
2018-02-28 13:28:57.574 DEBUG 9764 --- [           main] o.s.j.l.DefaultMessageListenerContainer  : Shutting down JMS listener container
2018-02-28 13:28:57.574 DEBUG 9764 --- [           main] o.s.j.l.DefaultMessageListenerContainer  : Waiting for shutdown of message listener invokers
2018-02-28 13:28:57.574 DEBUG 9764 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Lowered scheduled invoker count: 0
2018-02-28 13:28:57.579  INFO 9764 --- [           main] o.a.activemq.broker.TransportConnector   : Connector vm://localhost stopped
2018-02-28 13:28:57.580  INFO 9764 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.14.5 (localhost, ID:gollum.local-50369-1519842526165-0:1) is shutting down
2018-02-28 13:28:57.585  INFO 9764 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.14.5 (localhost, ID:gollum.local-50369-1519842526165-0:1) uptime 11.485 seconds
2018-02-28 13:28:57.586  INFO 9764 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.14.5 (localhost, ID:gollum.local-50369-1519842526165-0:1) is shutdown

spring-issuemaster commented Feb 28, 2018

Gary Russell commented

No, the race is in ActiveMQ

while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
    if (timeout == -1) {
        mutex.wait();
    } else {
        mutex.wait(timeout);
        break;
    }
}

The thread is on the mutex.wait() and running is false. With timeout == -1 there is no break, so even after stop()'s notifyAll() the loop re-checks (list.isEmpty() || !running), finds !running still true, and goes straight back into mutex.wait().

Their stop() method looks like this:

public void stop() {
    synchronized (mutex) {
        running = false;
        mutex.notifyAll();
    }
}

I think putting the breakpoint in stopSharedConnection() simply allows the container thread to go idle and therefore doesn't hit their race.

I am not inclined to debug their code further.

spring-issuemaster commented Mar 2, 2018

Juergen Hoeller commented

OK, since this is a race condition within ActiveMQ that we can't properly work around, I've reverted the interrupt call in 4.3.x, just keeping the wait iteration count in the log statements.

In general, it still seems to make sense to interrupt the consumers once a first round of waiting hasn't made them come back (and if we'd reject any received messages at that point anyway). I'm willing to keep that logic in 5.0.x, not least since an external ThreadPoolExecutor shutdown would interrupt any remaining worker threads as well... and if there is no external Executor, DefaultMessageListenerContainer internally manages those threads and should eventually shut them down the same way. Even in user code, interruption just affects blocking monitor waits and I/O operations in any case, and we really want those to stop at that point.
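
For reference, this matches how an externally managed ThreadPoolTaskExecutor behaves: with the default waitForTasksToCompleteOnShutdown=false, its shutdown() path uses ExecutorService.shutdownNow(), which interrupts active worker threads. A minimal illustration (the sleeping task just stands in for a blocked receive):

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ExecutorShutdownDemo {

	public static void main(String[] args) throws Exception {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.initialize();

		exec.execute(() -> {
			try {
				Thread.sleep(60_000); // stands in for a blocked receive
			}
			catch (InterruptedException ex) {
				System.out.println("worker interrupted on shutdown");
			}
		});

		Thread.sleep(1_000);
		// Default waitForTasksToCompleteOnShutdown=false: shutdown() uses shutdownNow(),
		// which interrupts the sleeping worker above.
		exec.shutdown();
	}
}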

spring-issuemaster commented Mar 16, 2018

Juergen Hoeller commented

I'm repurposing this ticket for the worker thread interruption step in DefaultMessageListenerContainer, even if it doesn't fully address the ActiveMQ case.

Any remaining race conditions should get reported to the ActiveMQ project. If we get specific feedback from them, we may do a further round of fine-tuning on our end.
