Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Skip message for processing it later #36916

Closed
secit-pl opened this issue May 23, 2020 · 3 comments
Closed

[Messenger] Skip message for processing it later #36916

secit-pl opened this issue May 23, 2020 · 3 comments

Comments

@secit-pl
Copy link

secit-pl commented May 23, 2020

Description
In some cases we don't want to handle the message right now, but we'd delay its execution for a X seconds and then check once again is it right time to process the message (if not, once again we delay it).

We have UnrecoverableMessageHandlingException to prevent retrying.
A good thing might to be have sth like DelayedMessageHandlingException with argument which defines when at earliest the message should be processed once again.

Example
Let's say we have a message which tells that we should import the data to database from specified file (ImportMessage). We don't want to process many imports at the same time, but we have multiple consumers runned.

In current case if two messages ImportMessage are possible to process in specified moment of time both will be executed in the same time and processed parallel. To prevent this we can make internal lock (for example using semaphore) to prevent second import from execution and delay all other imports for example for 30 seconds to process once again by throwing proposed DelayedMessageHandlingException.

@secit-pl secit-pl changed the title Notifier - skip message for processing it later [Messenger] Skip message for processing it later May 23, 2020
@Tobion
Copy link
Member

Tobion commented May 23, 2020

This can be achieved in a custom middleware or in the message handler directly. Just re-dispatch the message with delay to the bus.
I don't think this needs to be added to the core.

@secit-pl
Copy link
Author

@Tobion could you please explain a bit more how to "re-dispatch the message with delay to the bus"? The current doctrine transport implementation is executing this query every second (as I understood this transport is working):

SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 1

0="2020-05-24T04:11:46+00:00"
1="2020-05-24T05:11:46+00:00"
2="default"

and handle the found message.

I can't fid any simple way how to achieve the expected result without:
a) creating a new message in message handler sleeping (with sleep()) for a while before dispatching the message - this will block one consumer until the operation ends.

b) the same above but without sleep - this will spam the messages queue with many "nop" messages til the real expected execution time

@secit-pl
Copy link
Author

@Tobion I have read once again the whole documentation of the Messenger component and just noticed this https://symfony.com/doc/current/messenger.html#envelopes-stamps . With your explanation this is exactly what I'm looking for.

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants