Message Deduplication #27

Closed
sonus21 opened this issue Jul 17, 2020 · 9 comments
Labels
enhancement New feature or request

Comments

@sonus21
Owner

sonus21 commented Jul 17, 2020

Is there a way to see whether a message object with id: 124 (having the property id) is already queued? Ideally, a queued message should have a unique tag that helps filter queued messages for uniqueness and returns a message status (I'm inspired by Android WorkManager). If this isn't already implemented in Rqueue, I'm thinking of supporting it in my own code by checking the status of objects in @RqueueListener and rescheduling the message from the listener if it needs to wait longer before executing. What do you think?

Originally posted by @chlegou in #25 (comment)

@sonus21
Owner Author

sonus21 commented Jul 17, 2020

is there a way to see if a message object with id: 124 (having the property id) is queued already or not?

No, there's currently no way to identify this. As you said, you can implement it easily by doing a lookup in the listener.
This isn't implemented because some users might want to use the id field to uniquely identify a message, while others might use one or more other fields. We also need to decide whether the new message should run at the oldest scheduled time or at the new time. Running at the latest time makes more sense, since the effect is to ignore the previous message. We also need to consider parallel consumption of messages with the same id.
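
To illustrate the "which fields identify a message" point, one option is to let the application supply the key itself. The sketch below is not part of Rqueue; `DedupKey` and `EmailJob` are hypothetical names used only for illustration, and the `:` separator assumes the field values never contain a colon.

```java
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical helper: builds a deduplication key from whichever fields the caller picks. */
final class DedupKey {
  private DedupKey() {}

  /** Applies each extractor to the message and joins the results with ':'. */
  @SafeVarargs
  static <T> String of(T message, Function<T, Object>... extractors) {
    return Arrays.stream(extractors)
        .map(extractor -> String.valueOf(extractor.apply(message)))
        .collect(Collectors.joining(":"));
  }
}

/** Hypothetical message type, used only for this example. */
final class EmailJob {
  final long id;
  final String userId;
  final String template;

  EmailJob(long id, String userId, String template) {
    this.id = id;
    this.userId = userId;
    this.template = template;
  }
}
```

One application could call `DedupKey.of(job, j -> j.id)` to deduplicate on the id alone, while another could call `DedupKey.of(job, j -> j.userId, j -> j.template)` to treat the combination of fields as the identity.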

and reschedule it from the listener if needed to wait for more before executing it

I wouldn't recommend scheduling messages from the listener, as it can cause duplicate messages to be queued, and then you need to work on idempotency. I would suggest using a pre-execution message processor instead. With it, you skip execution if you find there's a newer message for the given id; it would be as simple as tracking the enqueue time corresponding to each message id.

#25 (comment)

@sonus21
Owner Author

sonus21 commented Jul 17, 2020

Also we need to consider whether the new message should be run at the oldest scheduled time or at the new time, it makes more sense to run this as per the latest time,

Well, scheduling with a unique tag would discard old messages having the same tag, so we're always talking about a single message. In Android WorkManager there are two concepts: unique tags and normal tags. A message can have many identification tags, but a unique message has a unique tag and is the only message with that tag in the queue.

I would suggest you use pre execution message processor.

Could you provide an example? I didn't understand how to implement it.

I might go with the lookup in the listener, as it seems the easiest and most logical option: it's related to the app logic, not to the Rqueue library, and from the listener I can choose what code to execute.

Thanks for your time and help. They're really appreciated. :)

Originally posted by @chlegou in #25 (comment)

@sonus21
Owner Author

sonus21 commented Jul 17, 2020

Well, scheduling with a unique tag would discard old messages having the same tag, so we're always talking about a single message. In Android WorkManager there are two concepts: unique tags and normal tags. A message can have many identification tags, but a unique message has a unique tag and is the only message with that tag in the queue.

This looks promising, but what about the execution time? For example, message M1 with tag t was scheduled at 5:20 AM to run in 10 minutes, that is, at 5:30 AM. At 5:25 AM we need to update M1 with new data, but it will have the same tag t. As per the application logic, the code would just enqueue the message to run in 10 minutes, so the new execution time would be 5:35 AM, which should be considered the final time. Should we execute the new message at 5:30 AM or 5:35 AM?

could you provide an example of it? i didn't understand how to implement it.

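// Package names below are as of Rqueue 2.x; adjust them if your version differs.
import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Application-side lookup: the latest enqueue time recorded for a message id.
interface MessageRepository {
  Long getLatestEnqueueAt(String messageId);
}

class UniqueMessageProcessor implements MessageProcessor {
  @Autowired MessageRepository messageRepository;

  // Returning false skips execution of this message.
  @Override
  public boolean process(Object message, RqueueMessage rqueueMessage) {
    // SimpleMessage stands for your own message class.
    if (message instanceof SimpleMessage) {
      // here you can get the id using tags
      String messageId = ((SimpleMessage) message).getId();
      Long latestEnqueueTime = messageRepository.getLatestEnqueueAt(messageId);
      // A newer message exists for this id, so skip this stale one.
      // The repository must store times in the same unit as getQueuedTime().
      if (latestEnqueueTime != null && latestEnqueueTime > rqueueMessage.getQueuedTime()) {
        return false;
      }
    }
    return true;
  }
}

@Configuration
class RqueueConfiguration {

  @Bean
  public MessageProcessor preExecutorMessageProcessor() {
    return new UniqueMessageProcessor();
  }

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
    factory.setPreExecutionMessageProcessor(preExecutorMessageProcessor());
    return factory;
  }
}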
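
The example above leaves `MessageRepository` unimplemented. A minimal in-memory sketch could look like the following; `InMemoryMessageRepository` and `recordEnqueue` are hypothetical names, not Rqueue API, and the application is assumed to call `recordEnqueue` whenever it enqueues a message. With more than one consumer instance, the map would presumably have to live in a shared store instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

interface MessageRepository {
  Long getLatestEnqueueAt(String messageId);
}

/** In-memory sketch: remembers the most recent enqueue time seen for each message id. */
class InMemoryMessageRepository implements MessageRepository {
  private final ConcurrentMap<String, Long> latestEnqueueAt = new ConcurrentHashMap<>();

  /** Hypothetical hook: call this whenever a message with the given id is enqueued. */
  void recordEnqueue(String messageId, long enqueuedAt) {
    // Keep the larger timestamp so out-of-order calls cannot move the value backwards.
    latestEnqueueAt.merge(messageId, enqueuedAt, Long::max);
  }

  /** Returns the latest recorded enqueue time, or null if the id was never recorded. */
  @Override
  public Long getLatestEnqueueAt(String messageId) {
    return latestEnqueueAt.get(messageId);
  }
}
```

The timestamps passed to `recordEnqueue` need to use the same clock and unit as the value the processor compares them against.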

I'll add this to the wiki in a new "Message deduplication" section.

#25 (comment)

@sonus21
Owner Author

sonus21 commented Jul 17, 2020

Well, in our case I believe the most logical policy is: cancel the existing chain and REPLACE it with the new one.
This means that if we post a message M1 with unique tag M_USER_BOB_TAG at 05:20 that needs to run at 05:30 (after 10 minutes), and at 05:25 we decide to queue a new message M2 with the same unique tag M_USER_BOB_TAG, then, since a message is already queued, we remove it instantly (moving it at 05:25 to a trash queue, which is an internal preconfigured queue) and schedule message M2, which should run at 05:35.

With this being said, let's say we have a mechanism that allows us to fetch the details of a message with a specific unique tag, M_USER_BOB_TAG:

EXEC_TIME: 05:20 :: rqueueMessageSender.enqueueUniqueIn(uniqueTag, queue, messageObject, time, timeUnit); // enqueue unique message "M1" with the unique tag. No old message has this uniqueTag, so we just enqueue it.
EXEC_TIME: 05:23 :: messageRepository.getUniqueMessage(uniqueTag); // should return message "M1" with its details and an execution time of 05:30.
EXEC_TIME: 05:25 :: rqueueMessageSender.enqueueUniqueIn(uniqueTag, queue, messageObject, time, timeUnit); // enqueue a unique message with the same unique tag. An old message has this uniqueTag, so we cancel it / move it to the "trash-queue" instantly and enqueue the new message "M2".
EXEC_TIME: 05:28 :: messageRepository.getUniqueMessage(uniqueTag); // should return message "M2" with its details and an execution time of 05:35.
EXEC_TIME: 05:35 :: // the job is executed
EXEC_TIME: 05:40 :: messageRepository.getUniqueMessage(uniqueTag); // returns null, since no job is enqueued with this unique tag.
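
The timeline above can be replayed with a small in-memory model. This is only a sketch of the proposed REPLACE behaviour, not how Rqueue implements anything: `UniqueQueueSketch` and `ScheduledMessage` are hypothetical classes, the method names merely mirror the ones proposed in this comment, and the current time is passed in explicitly so the example stays deterministic.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical value type: a payload plus the time it is due to run. */
final class ScheduledMessage {
  final String payload;
  final LocalTime runAt;

  ScheduledMessage(String payload, LocalTime runAt) {
    this.payload = payload;
    this.runAt = runAt;
  }
}

/** In-memory model of "one pending message per unique tag, newest wins". */
final class UniqueQueueSketch {
  private final Map<String, ScheduledMessage> pending = new HashMap<>();
  private final List<ScheduledMessage> trash = new ArrayList<>();
  private final List<String> executed = new ArrayList<>();

  /** Schedules the payload and moves any pending message with the same tag to the trash. */
  void enqueueUniqueIn(String uniqueTag, String payload, LocalTime now, Duration delay) {
    ScheduledMessage previous =
        pending.put(uniqueTag, new ScheduledMessage(payload, now.plus(delay)));
    if (previous != null) {
      trash.add(previous);
    }
  }

  /** Returns the pending message for the tag, or null if none is queued. */
  ScheduledMessage getUniqueMessage(String uniqueTag) {
    return pending.get(uniqueTag);
  }

  /** Executes and removes every pending message whose run time is not after 'now'. */
  void runDue(LocalTime now) {
    Iterator<ScheduledMessage> it = pending.values().iterator();
    while (it.hasNext()) {
      ScheduledMessage message = it.next();
      if (!message.runAt.isAfter(now)) {
        executed.add(message.payload);
        it.remove();
      }
    }
  }

  List<String> executed() {
    return executed;
  }

  List<ScheduledMessage> trash() {
    return trash;
  }
}
```

Enqueueing "M1" at 05:20 and "M2" at 05:25 with a 10-minute delay leaves only "M2" pending with a run time of 05:35, and "M1" ends up in the trash list without ever being executed.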

Android WorkManager's unique work supports three conflict resolution policies:

  • Cancel the existing chain and REPLACE it with the new one.
  • KEEP the existing sequence and ignore your new request.
  • APPEND your new sequence to the existing one, running the new sequence's first task after the existing sequence's last task finishes. You cannot use APPEND with PeriodicWorkRequests.
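
To make the difference between the three policies concrete, here is a rough sketch that models the queue for one unique tag as an ordered list of payloads. `ConflictPolicy` and `ConflictResolver` are hypothetical names for illustration only; they are neither WorkManager nor Rqueue types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical policy names, mirroring the three options listed above. */
enum ConflictPolicy {
  REPLACE,
  KEEP,
  APPEND
}

final class ConflictResolver {
  private ConflictResolver() {}

  /** Returns what should be queued for a tag after 'incoming' arrives. */
  static List<String> resolve(List<String> existing, String incoming, ConflictPolicy policy) {
    if (existing.isEmpty()) {
      // No conflict: every policy simply enqueues the new message.
      return Collections.singletonList(incoming);
    }
    switch (policy) {
      case REPLACE:
        // Drop whatever was queued and keep only the new message.
        return Collections.singletonList(incoming);
      case KEEP:
        // Ignore the new request and leave the existing work untouched.
        return new ArrayList<>(existing);
      case APPEND: {
        // Run the new message after everything that is already queued.
        List<String> result = new ArrayList<>(existing);
        result.add(incoming);
        return result;
      }
      default:
        throw new IllegalArgumentException("Unknown policy: " + policy);
    }
  }
}
```

With "M1" already queued and "M2" arriving, REPLACE yields only "M2", KEEP yields only "M1", and APPEND yields "M1" followed by "M2".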

Since Rqueue is a server-side library that has direct access to the data, I believe the most logical approach is to replace old messages with the new one, because enqueueing a message for the same job comes after a fresh data check, which is not the case with Android WorkManager. (Let's see whether other users raise different use cases in the future that require the old queued unique jobs to be executed.)

I also recommend adding the ability to fetch queued message info through these two methods: messageRepository.getUniqueMessage(uniqueTag) and messageRepository.getMessage(tag). They'd be really helpful. :)

As for the preExecutorMessageProcessor, I will test it later. Thanks for providing this implementation. :)

Originally posted by @chlegou in #25 (comment)

@chlegou

chlegou commented Jul 17, 2020

it's great that you moved it to a new thread :)

sonus21 added a commit that referenced this issue Aug 23, 2020
* Provide a way to add HTTP proxy for external api calls #34
* Provide a way to fetch messages based on the id and queue name
* Provide api to delete any enqueued messages #33
sonus21 added a commit that referenced this issue Sep 16, 2020
* Provide a way to enqueue unique messages #27
* Provide a way to add HTTP proxy for external api calls #34
* Provide a way to fetch messages based on the id and queue name
* Provide api to delete any enqueued messages #33
* Enqueue a list of Objects #37
* Do not delete queues if mode is PRODUCER #24
* Add csp meta data #38
@sonus21
Owner Author

sonus21 commented Sep 16, 2020

Version 2.1.0-RELEASE has addressed this.

@sonus21 sonus21 closed this as completed Sep 16, 2020
@chlegou

chlegou commented Sep 17, 2020

Thanks, i will test it out.

@sonus21
Owner Author

sonus21 commented Apr 30, 2021

Hi @chlegou
It's been a long time; I hope you're doing well. How's Rqueue doing in production? I would love to get your feedback.

@chlegou

chlegou commented Apr 30, 2021 via email
