This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

After update to latest version SimpleMessageListenerContainer is warning #373

Closed
rotilho opened this issue Sep 12, 2018 · 12 comments
Labels: component: sqs (SQS integration related issue), type: bug (A general bug)


@rotilho

rotilho commented Sep 12, 2018

After updating from 1.2.3 to 2.0.0, I frequently see the following warning:

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@792e3eb0[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 1]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@6f49eae9
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:317)
	at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@6f49eae9 rejected from java.util.concurrent.ThreadPoolExecutor@792e3eb0[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 1]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:314)
	... 7 common frames omitted

@j-gurda

j-gurda commented Sep 27, 2018

It looks like the number of messages received from AWS in a single batch receive operation (10 by default, see AbstractMessageListenerContainer.DEFAULT_MAX_NUMBER_OF_MESSAGES) is larger than the size of the taskExecutor in SimpleMessageListenerContainer (2+1 by default, see SimpleMessageListenerContainer.createDefaultTaskExecutor). Most likely this is a bug introduced sometime between 1.2.3 and 2.0.0 (I don't know where exactly, but 1.2.3 works fine even with the same defaults).

What I'd suggest is to create the SimpleMessageListenerContainer "manually" (by providing a SimpleMessageListenerContainerFactory bean) and call setMaxNumberOfMessages with a value of X. In that case, when the task executor is initialized, it sees the non-null value X and creates X+1 receiver threads.
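
The mismatch described above can be reproduced outside Spring. The following is a minimal sketch using only java.util.concurrent, not the library's own code: it assumes the container's executor behaves like a fixed pool with no task queue (a SynchronousQueue) and the default AbortPolicy, which is what the stack trace in the report suggests. The class and method names (RejectionDemo, countRejections) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    /** Submits batchSize blocking tasks to a queue-less pool and counts the rejected ones. */
    static int countRejections(int poolSize, int batchSize) {
        // A SynchronousQueue holds no tasks, so every submission needs a free thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < batchSize; i++) {
                try {
                    // Each task blocks until released, standing in for a slow message handler.
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++; // the default AbortPolicy throws when no thread is free
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // A batch of 10 against a pool of 3, the sizes seen in the report.
        System.out.println("rejected = " + countRejections(3, 10)); // prints "rejected = 7"
    }
}
```

With a pool of X+1 threads and a batch of X, the same method returns 0, which is the effect the suggested workaround aims for.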

@spring-projects-issues added the "status: waiting-for-triage" (An issue we've not yet triaged) label on Oct 1, 2018
@hithwen

hithwen commented Oct 30, 2018

I have the same issue after updating from version 1 to version 2.

@CCBow-501

I have the same issue with version 2.0.1, even with a SimpleMessageListenerContainerFactory bean with the following configuration:

        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setMaxNumberOfMessages(5);
        factory.setDestinationResolver(destinationResolver);
        factory.setWaitTimeOut(20);
        return factory;

@j-gurda

j-gurda commented Nov 7, 2018

Here's my code; it does not generate errors at runtime. As far as I remember, the point is that the size of the queueContainerTaskEecutor defined here is larger than the maxNumberOfMessages set on the SimpleMessageListenerContainerFactory.

package io.xxxxxxxx.web.infrastructure.aws;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ClassUtils;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.client.builder.ExecutorFactory;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;

@Configuration
@EnableSqs
class SqsConfiguration {

    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(SimpleMessageListenerContainer.class) + "-";

    @Bean
    public ClientConfiguration sqsClientConfiguration() {
        return new ClientConfiguration()
                .withConnectionTimeout(30000)
                .withRequestTimeout(30000)
                .withClientExecutionTimeout(30000);
    }

    @Bean
    public ExecutorFactory sqsExecutorFactory() {
        return new ExecutorFactory() {
            @Override
            public ExecutorService newExecutor() {
                return new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            }
        };
    }

    @Bean
    public AmazonSQSAsync amazonSqs(ClientConfiguration sqsClientConfiguration, ExecutorFactory sqsExecutorFactory,
            EndpointConfiguration sqsEndpointConfiguration, AWSCredentialsProvider credentialsProvider) {
        return AmazonSQSAsyncClientBuilder.standard()
                .withClientConfiguration(sqsClientConfiguration)
                .withExecutorFactory(sqsExecutorFactory)
                .withEndpointConfiguration(sqsEndpointConfiguration)
                .withCredentials(credentialsProvider)
                .build();
    }

    @Bean
    public AsyncTaskExecutor queueContainerTaskEecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix(DEFAULT_THREAD_NAME_PREFIX);
        threadPoolTaskExecutor.setCorePoolSize(2);
        threadPoolTaskExecutor.setMaxPoolSize(2);
        // No use of a thread pool executor queue to avoid retaining messages too long in memory
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor queueContainerTaskEecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setAutoStartup(true);
        factory.setMaxNumberOfMessages(1);
        factory.setWaitTimeOut(20);
        factory.setTaskExecutor(queueContainerTaskEecutor);
        return factory;
    }

}

@spencergibb
Contributor

Is a change required here or a documentation update?

@spencergibb added the "status: waiting-for-feedback" (We need additional information before we can continue) label and removed the "status: waiting-for-triage" (An issue we've not yet triaged) label on Jan 29, 2019
@rotilho
Author

rotilho commented Jan 29, 2019

It is. As @j-gurda mentioned, SimpleMessageListenerContainerFactory is allocating more threads than the AsyncTaskExecutor supports. I had to create the SimpleMessageListenerContainerFactory manually with one thread fewer than the AsyncTaskExecutor to work around the problem.

@spring-projects-issues added the "status: feedback-provided" (Feedback has been provided) label and removed the "status: waiting-for-feedback" (We need additional information before we can continue) label on Jan 29, 2019
@spencergibb added the "type: bug" (A general bug) label and removed the "status: feedback-provided" (Feedback has been provided) label on Jan 29, 2019
@j-gurda

j-gurda commented Jan 30, 2019

@rotilho actually, the number of messages received in a single batch operation (10 by default, see AbstractMessageListenerContainer.DEFAULT_MAX_NUMBER_OF_MESSAGES) is larger than the size of the taskExecutor that processes these messages (2+1 by default, see SimpleMessageListenerContainer.createDefaultTaskExecutor). So what we could do is:

  1. Decrease the default receive batch size (to 1 or 2, I believe): we don't make use of batch receive, but we keep the thread count low
  2. Increase the size of the taskExecutor (to 11, I believe): we receive as many messages as AWS allows, but we keep many threads running

These would be the defaults, which could be tuned with specific "manual" configuration, so I'd suggest going with fewer resources allocated, i.e. receive 1 message at a time from AWS and have 2 threads handle receiving and processing of messages.
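
To compare the two options, here is a rough model in plain java.util.concurrent rather than the library's implementation. It assumes one pool thread is occupied by the poller, which then hands each message of a batch to the same queue-less pool. BatchSizingSketch and rejectedHandlers are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BatchSizingSketch {

    /** Runs a poller on the pool that dispatches batchSize handlers to the same pool. */
    static int rejectedHandlers(int poolSize, int batchSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The poller itself occupies one pool thread while it dispatches the batch.
            Future<Integer> poller = pool.submit(() -> {
                int rejected = 0;
                for (int i = 0; i < batchSize; i++) {
                    try {
                        pool.execute(() -> awaitQuietly(release)); // one handler per message
                    } catch (RejectedExecutionException e) {
                        rejected++;
                    }
                }
                return rejected;
            });
            return poller.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let the accepted handlers finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("current defaults (pool 3, batch 10): " + rejectedHandlers(3, 10));
        System.out.println("option 1 (pool 3, batch 2): " + rejectedHandlers(3, 2));
        System.out.println("option 2 (pool 11, batch 10): " + rejectedHandlers(11, 10));
    }
}
```

Under this model the current defaults reject 8 of 10 handlers, while both proposed options reject none; the difference between the options is only how many threads stay allocated.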

@MaximeFrancoeur

MaximeFrancoeur commented Feb 18, 2019

So if I call setMaxNumberOfMessages with 10, this will create a larger thread pool:

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQS);
        factory.setMaxNumberOfMessages(10);
        factory.setAutoStartup(true);
        factory.setWaitTimeOut(20);

        return factory;
    }

Adding some explanation of how this works: Spring-cloud-aws will set the number of threads to the number of messages if the number of messages has been explicitly set; otherwise it will set it to 2, even if the default number of messages is 10.
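
The sizing rule described here can be restated as a small helper. This is only an illustration of the behaviour as reported in this thread, not a copy of SimpleMessageListenerContainer.createDefaultTaskExecutor: the class PoolSizeRule and its methods are hypothetical, a single queue is assumed, and the "+1" follows the "2+1" and "X+1" figures quoted in earlier comments.

```java
class PoolSizeRule {

    // Values quoted in this thread.
    static final int DEFAULT_WORKER_THREADS = 2;
    static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;

    /**
     * Pool size as described in the thread.
     * configuredMaxMessages is null when setMaxNumberOfMessages was never called.
     */
    static int maxPoolSize(Integer configuredMaxMessages) {
        int workers = configuredMaxMessages != null
                ? configuredMaxMessages
                : DEFAULT_WORKER_THREADS; // falls back to 2, not to the batch default of 10
        return workers + 1; // assumption: one extra thread for the poller
    }

    /** True when a full batch can be handed to worker threads without overflow. */
    static boolean batchFits(Integer configuredMaxMessages) {
        int batch = configuredMaxMessages != null
                ? configuredMaxMessages
                : DEFAULT_MAX_NUMBER_OF_MESSAGES;
        return batch <= maxPoolSize(configuredMaxMessages) - 1;
    }

    public static void main(String[] args) {
        System.out.println("default: pool=" + maxPoolSize(null) + ", fits=" + batchFits(null));
        System.out.println("explicit 10: pool=" + maxPoolSize(10) + ", fits=" + batchFits(10));
    }
}
```

In this simplified model the mismatch only appears when the value is left unset, because the batch size falls back to 10 while the worker count falls back to 2.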

From Stack Overflow: https://stackoverflow.com/questions/51373082/springboot-sqslistener-not-working-with-exception-taskrejectedexception?rq=1

@michaelwiles

Surely a change is required to bring this into line? Currently the default config causes these exceptions if the queue has more than 2 messages on it.

DEFAULT_MAX_NUMBER_OF_MESSAGES = 10, but the default number of worker threads is 2, so you are always going to get this exception if your queue has more than 10 messages, and then you're going to get a 10-second pause for no genuine reason.

Another option is to change the rejection policy from AbortPolicy (java.util.concurrent.ThreadPoolExecutor.AbortPolicy) to CallerRunsPolicy, which makes a lot of sense to me, as it will mean you won't get this message and it will only poll again once all messages have been processed (though there may be other reasons why this won't work).
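
The effect of swapping the rejection policy can be sketched with a plain ThreadPoolExecutor. This is not a patch for the container, only a demonstration of ThreadPoolExecutor.CallerRunsPolicy under the same assumed pool shape (fixed size, no queue). CallerRunsDemo and tasksRunByCaller are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsDemo {

    /** Submits batchSize tasks and returns how many ran on the submitting thread. */
    static int tasksRunByCaller(int poolSize, int batchSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        AtomicInteger ranOnCaller = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < batchSize; i++) {
                pool.execute(() -> {
                    if (Thread.currentThread() == caller) {
                        // Overflow task: executed inline instead of being rejected.
                        ranOnCaller.incrementAndGet();
                    } else {
                        awaitQuietly(release); // keep the pool thread busy
                    }
                });
            }
        } finally {
            release.countDown(); // let the pool threads finish
            pool.shutdown();
        }
        return ranOnCaller.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // No RejectedExecutionException is thrown; 7 of the 10 tasks run on the caller.
        System.out.println("ran on caller = " + tasksRunByCaller(3, 10));
    }
}
```

The trade-off is that the submitting thread is busy while it runs the overflow tasks, so in a polling loop the next receive would be delayed until those tasks complete.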

@internetstaff

It doesn't make sense that the default configuration seems doomed to failure. It leaves a poor first impression. :)

@maciejwalkowiak added the "component: sqs" (SQS integration related issue) label on May 29, 2020
@maciejwalkowiak self-assigned this on Jun 6, 2020
@maciejwalkowiak
Contributor

I'll double check, but I think this issue has already been fixed.

@maciejwalkowiak
Contributor

Fixed in 9dcdd53
