How to guarantee that one message consumed exactly once? #52

Closed
vking34 opened this issue Dec 4, 2020 · 10 comments

vking34 commented Dec 4, 2020

Describe the bug

I have 3 job queues. Jobs in the first queue create jobs in the second queue.
Even though I set numRetries to 0, the second RqueueListener seems to consume one message twice.
How can I configure Rqueue to guarantee that each message is consumed exactly once?
Thank you.

How to Reproduce

    @RqueueListener(value = FLASH_BID_BOT_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-bot-queue", concurrency = "3-5")
    public void executeFlashBidBot(Bot bot) {
    ...
    }

    @RqueueListener(value = FLASH_BID_AFTER_MIN_PRICE_BOT_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-after-min-price-bot-queue", concurrency = "1-3")
    public void makeInstantBidAfterMinPrice(LateBot lateBot) {
    ...
    }

    @RqueueListener(value = FLASH_BID_INSTANT_BID_PADDING_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-instant-bid-padding-queue", concurrency = "1-3")
    private void padInstantBid(PaddingInstantBid instantBid) {
    ...
    }

Screenshots

[Screenshot: e3]

Library Dependencies

  • Spring Boot:
  • Spring Data Redis:

sonus21 (Owner) commented Dec 4, 2020

This looks very strange to me, unless you have enqueued multiple similar instances of a bid to the queue. There should not be any duplicate consumption, and the time interval is also quite small (~20 ms).

Can you check whether, by any chance, you're enqueueing multiple messages for the same bid from different places?

How frequently are you observing this issue?
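
If the same bid can be enqueued from more than one place, one defensive option is to make the handler itself idempotent. The following is only a minimal sketch under assumptions: `IdempotentConsumerSketch`, `consume` and the message ID parameter are hypothetical names, not part of Rqueue, and the in-memory set only protects a single JVM.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper (not an Rqueue class): runs a handler at most once per message ID.
final class IdempotentConsumerSketch<T> {
    // IDs that were already handled. Assumption: one JVM only; several workers
    // would need a shared store instead of process memory.
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> handler;

    IdempotentConsumerSketch(Consumer<T> handler) {
        this.handler = handler;
    }

    // Returns true if the handler ran, false if this ID was seen before.
    boolean consume(String messageId, T payload) {
        if (!seenIds.add(messageId)) {
            return false; // duplicate delivery: skip the handler
        }
        handler.accept(payload);
        return true;
    }
}
```

Note that in this sketch the ID stays recorded even if the handler throws, so a failed message would not be processed again; whether that is acceptable depends on the job.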

sonus21 added the "more information required" label on Dec 6, 2020

vking34 (Author) commented Dec 7, 2020

@sonus21 My apologies, this was my misunderstanding. There were still jobs (messages) left in the queue from the previous execution.
One more issue: I tried to delete all messages in a queue (a namespace) in Redis:

rqueueMessageSender.deleteAllMessages(FLASH_BID_BOT_QUEUE);

But when I inspected Redis, the messages had not been removed.
How exactly can I delete all messages in a specific queue?
Thank you.

sonus21 (Owner) commented Dec 7, 2020

That's the correct way to delete all messages. Did this call return true or false? False means something went wrong.
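
To act on the return value for every queue rather than a single one, the calls can be wrapped in a small loop. This is a sketch, not Rqueue code: `QueueCleanupSketch` and `deleteAllAndReportFailures` are hypothetical names, and the `Predicate<String>` stands in for a method reference such as `rqueueMessageSender::deleteAllMessages`, assuming that method takes a queue name and returns a boolean as it does in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: deletes all messages queue by queue and reports failures.
final class QueueCleanupSketch {
    // deleteAll is assumed to behave like rqueueMessageSender::deleteAllMessages,
    // i.e. it returns false when the deletion did not succeed.
    static List<String> deleteAllAndReportFailures(List<String> queueNames,
                                                   Predicate<String> deleteAll) {
        List<String> failed = new ArrayList<>();
        for (String queueName : queueNames) {
            if (!deleteAll.test(queueName)) {
                failed.add(queueName); // remember every queue that returned false
            }
        }
        return failed;
    }
}
```

An empty result means every call returned true; any names in the list are the queues worth investigating.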

vking34 (Author) commented Dec 16, 2020

Redis DB before deleting:
[Screenshot: g1]

I am trying to delete the messages at the start of execution.

@Component
@ConditionalOnProperty(
        value = "cz-auction-bot.scheduler",
        havingValue = "true"
)
@Slf4j
public class BotScheduler {
    @Autowired private ScheduledExecutorService executorService;
    @Autowired private RqueueMessageSender rqueueMessageSender;

    @PostConstruct
    private void loadAuctions() {
        log.warn("[ SCHEDULER ]");
        // clean up
        botService.deleteAllBots();

        try {
            clearOldJobs();
        }
        catch (QueueDoesNotExist e) {
            // wait for registering queue
            executorService.schedule(this::loadAuctions, 2000, TimeUnit.MILLISECONDS);
            return;
        }

    }

    private void clearOldJobs(){
        boolean deleted0 = rqueueMessageSender.deleteAllMessages(FLASH_BID_BOT_QUEUE);
        rqueueMessageSender.deleteAllMessages(FLASH_BID_AFTER_MIN_PRICE_BOT_QUEUE);
        rqueueMessageSender.deleteAllMessages(FLASH_BID_INSTANT_BID_PADDING_QUEUE);
        rqueueMessageSender.deleteAllMessages(FLASH_BID_LAST_BID_GUARANTEE_QUEUE);
        rqueueMessageSender.deleteAllMessages(FLASH_BID_LAST_BID_QUEUE);

        rqueueMessageSender.deleteAllMessages(NORMAL_BID_BOT_QUEUE);
        rqueueMessageSender.deleteAllMessages(NORMAL_BID_INSTANT_BID_PADDING_QUEUE);
        rqueueMessageSender.deleteAllMessages(NORMAL_BID_LAST_BID_GUARANTEE_QUEUE);
        rqueueMessageSender.deleteAllMessages(NORMAL_BID_LAST_BID_QUEUE);
        log.warn("Deleting old jobs: " + deleted0);
    }

}

Register the job queue:

@Component
@Slf4j
public class FlashBidBotWorker {

    @RqueueListener(value = FLASH_BID_BOT_QUEUE, numRetries = "0", concurrency = "5-9", active = "${cz-auction-bot.worker}")
    public void executeFlashBidBot(Bot bot) {
       ...
    }
   ...
}

Rqueue Configuration:

@Component
public class RqueueConfiguration {

    @Bean
    public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
        SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();

        RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration(System.getenv("REDIS_HOST"), Integer.parseInt(System.getenv("REDIS_PORT")));
        redisConfig.setDatabase(Integer.parseInt(System.getenv("REDIS_JOB_DB")));
        LettuceConnectionFactory redisConnectionFactory = new LettuceConnectionFactory(redisConfig);
        redisConnectionFactory.afterPropertiesSet();
        factory.setRedisConnectionFactory(redisConnectionFactory);

        return factory;
    }
}

But after execution, the messages in the Redis DB were not deleted.
[Screenshot: g1]

Thank you.

sonus21 (Owner) commented Dec 16, 2020

How do you know whether the messages were deleted? You should check either the dashboard or the Redis keys to find out.

One small improvement to your code: you don't need a ScheduledExecutorService to do something once Rqueue has started. You can use an application event listener instead and listen for

public class RqueueBootstrapEvent extends ApplicationEvent {

vking34 (Author) commented Dec 16, 2020

Thank you for the suggestion.

I use RDM (a GUI for Redis) to inspect the keys in the DB. Even though deleteAllMessages() returns true, the keys are still there.

rqueueMessageSender.deleteAllMessages(FLASH_BID_BOT_QUEUE);

Did I configure Rqueue incorrectly?

sonus21 (Owner) commented Dec 16, 2020

IIUC, you're looking at the wrong Redis keys. You should check __rq::queue::flash-bid-bot-queue and __rq::d-queue::flash-bid-bot-queue. These keys should either be absent or empty, although if you have a background job it might have added new entries. Entries can also be added automatically after a failed execution if you have enabled retries.
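
The key names quoted above follow a visible pattern, which can be spelled out as a tiny helper for building the names to look up in a Redis GUI. This is only a sketch: `RqueueKeyNamesSketch` and its methods are hypothetical, and the `__rq::<type>::<queue name>` layout is inferred solely from the two keys mentioned in this comment, so other versions or configurations may differ.

```java
// Hypothetical helper: builds the Redis key names quoted in this thread.
final class RqueueKeyNamesSketch {
    // Prefix as it appears in the keys mentioned above (assumed, not read from Rqueue).
    private static final String PREFIX = "__rq::";

    // Key of the simple queue, e.g. __rq::queue::flash-bid-bot-queue
    static String simpleQueueKey(String queueName) {
        return PREFIX + "queue::" + queueName;
    }

    // Key of the d-queue, e.g. __rq::d-queue::flash-bid-bot-queue
    static String scheduledQueueKey(String queueName) {
        return PREFIX + "d-queue::" + queueName;
    }
}
```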

sonus21 (Owner) commented Dec 16, 2020

BTW, m-mdata entries are not deleted. This will be fixed in the next release.

vking34 (Author) commented Dec 16, 2020

What are d-queue, queue and m-mdata used for?
How can I delete all namespaces?

sonus21 (Owner) commented Dec 16, 2020

d-queue is a ZSET used for scheduled messages.
queue is a LIST used for simple messages.
m-mdata is used for message metadata.
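
As an in-memory analogy for the two structures described above, a deque can play the role of the LIST and a map sorted by due time can play the role of the ZSET. This is a sketch under assumptions, not Rqueue's implementation: `QueueStructuresSketch` and its methods are hypothetical, and the step that moves due messages onto the simple queue is assumed here for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a simple queue (LIST) and a scheduled queue (ZSET).
final class QueueStructuresSketch {
    // LIST analogue: simple messages are taken in arrival order.
    private final Deque<String> simple = new ArrayDeque<>();
    // ZSET analogue: scheduled messages ordered by due time, which acts as the score.
    private final TreeMap<Long, List<String>> scheduled = new TreeMap<>();

    void enqueue(String message) {
        simple.addLast(message);
    }

    void enqueueAt(String message, long dueAtMillis) {
        scheduled.computeIfAbsent(dueAtMillis, k -> new ArrayList<>()).add(message);
    }

    // Moves every scheduled message with due time <= now onto the simple queue
    // (assumed behavior for this sketch) and returns how many were moved.
    int promoteDue(long nowMillis) {
        Map<Long, List<String>> due = scheduled.headMap(nowMillis, true);
        int moved = 0;
        for (List<String> batch : due.values()) {
            simple.addAll(batch);
            moved += batch.size();
        }
        due.clear(); // headMap is a view, so this also removes the entries from scheduled
        return moved;
    }

    // Returns the next simple message, or null when the queue is empty.
    String poll() {
        return simple.pollFirst();
    }
}
```

Ordering by score is what lets a scheduled queue answer "which messages are due by now" with a single range lookup, whereas a list only offers its head and tail.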

sonus21 closed this as completed on Dec 16, 2020