
How to handle duplicated messages? #111

Closed
sharshenov opened this issue Apr 5, 2015 · 9 comments

Comments

@sharshenov
Contributor

As the RabbitMQ documentation says:

In the event of network failure (or a node crashing), messages can be duplicated, and consumers must be prepared to handle them. If possible, the simplest way to handle this is to ensure that your consumers handle messages in an idempotent way rather than explicitly deal with deduplication.
If a message is delivered to a consumer and then requeued (because it was not acknowledged before the consumer connection dropped, for example) then RabbitMQ will set the redelivered flag on it when it is delivered again (whether to the same consumer or a different one). This is a hint that a consumer may have seen this message before (although that's not guaranteed, the message may have made it out of the broker but not into a consumer before the connection dropped). Conversely if the redelivered flag is not set then it is guaranteed that the message has not been seen before. Therefore if a consumer finds it more expensive to deduplicate messages or process them in an idempotent manner, it can do this only for messages with the redelivered flag set.

There are some scenarios where consumers cannot handle messages in an idempotent way (email delivery, file operations, etc.). The redelivered flag is not a silver bullet, I think. To perform deduplication, the consumer should keep a list of recent messages and reject duplicates. All workers of the consumer should have access to this list. I think the best way is to use Redis for this purpose.

Ideally, there should be an option for Sneakers to check messages for uniqueness before processing them.

class MyWorker
  include Sneakers::Worker

  from_queue :email, unique: { timeout: 30.minutes }

  def work(msg)
    MyJobProcessor.new(msg).perform

    ack!
  end

end

What do you think?

@jondot
Owner

jondot commented Apr 8, 2015

I think you're right. Redis with HyperLogLog should be great in this case. I have a lock/counting service I've been meaning to open-source for ages now (half a year); it would be ideal for this, and you would access it via a RESTful interface. It would also not be Sneakers-specific, so you could use it out-of-band with other processes in your architecture.
Your requirement might give me the extra urgency to prioritize open-sourcing it (right now only the documentation is left).
What do you think?

@sharshenov
Contributor Author

I'm not familiar with the HyperLogLog algorithm yet, but after some googling I agree that it can help a lot.
If I understand you correctly, a message deduplication system based on your project would require an HTTP server running somewhere. Maybe a direct Redis connection would be a better way?

@michaelklishin
Collaborator

Note that HyperLogLog (and bloom filters) are approximations. Any strategy you choose will require having IDs on all messages: is this the case for Sneakers today?

@sharshenov
Contributor Author

Another thought:

The only thing we need is to ensure that a message is unique for some (n.minutes.ago)..Time.now window. So we can create a new record in Redis with the key Digest::SHA1.hexdigest([worker_class_name, msg].join) and value foo, expiring at Time.now + n.minutes. A job preprocessor can then check an incoming message for uniqueness with a single request to Redis. If the message is unique, create a new record; if not, reject the message.
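A minimal sketch of the scheme above, assuming the redis gem's client interface (the method and key names here are illustrative, not part of Sneakers). `SET` with `nx:` and `ex:` is a single atomic Redis command, so two workers racing on the same message cannot both win:

```ruby
require "digest/sha1"

# Build the dedup key proposed above: one key per (worker, message) pair.
def unique_key(worker_class_name, msg)
  "sneakers:unique:#{Digest::SHA1.hexdigest([worker_class_name, msg].join)}"
end

# Returns true only the first time this (worker, msg) pair is seen within
# the TTL window. `store` is anything that responds to
# set(key, value, nx:, ex:) like the redis gem client (Redis.new in
# production).
def first_seen?(store, worker_class_name, msg, ttl: 30 * 60)
  store.set(unique_key(worker_class_name, msg), "1", nx: true, ex: ttl)
end

# Minimal in-memory stand-in for Redis so the sketch runs standalone
# (no expiry handling; real Redis drops the key after ttl seconds).
class FakeStore
  def initialize
    @h = {}
  end

  def set(key, value, nx:, ex:)
    return false if nx && @h.key?(key)
    @h[key] = value
    true
  end
end
```

With a real connection you would pass `Redis.new` instead of `FakeStore.new`; the worker rejects the message whenever `first_seen?` returns false.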

@justinmills
Contributor

Could you use the correlationId property each message can carry to uniquely identify it? It would also cover the case where msg is identical between two distinct messages.

If you were to extend Sneakers with some functionality supplied by Redis, I could also see adding support for running workers in some sort of isolation, similar to Resque's lonely_job (https://github.com/wallace/resque-lonely_job). In fact some of its ideas, such as the "key", might be useful even for the duplicate-message case. The default could be to use correlationId, but you might want to override it to pick a specific value to "unique" upon, be it the msg itself or some subset of it.

I find both of these use cases come up quite a lot, but more and more I'm leaning towards solving them outside of the queuing system, as each of these techniques winds up causing bottlenecks and other performance-related issues at the queue level.
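The correlationId-as-key idea can be sketched as a small, standalone helper (the class name and fallback behavior are illustrative, not part of Sneakers): prefer the correlation_id property when the publisher set one, otherwise fall back to a digest of the payload.

```ruby
require "digest/sha1"

# Hypothetical helper: derive a deduplication key for a message.
# `metadata` stands in for the AMQP message properties a consumer
# receives alongside the payload.
class DedupKeyBuilder
  def self.key_for(msg, metadata)
    # Use the publisher-supplied correlation_id when present;
    # otherwise hash the payload itself.
    metadata[:correlation_id] || Digest::SHA1.hexdigest(msg.to_s)
  end
end
```

In a Sneakers worker this could be called from a handler that has access to the message properties, feeding the resulting key into whatever Redis-backed store does the actual uniqueness check.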

@sharshenov
Contributor Author

I totally agree that this functionality should be implemented somewhere outside of Sneakers.
But the duplication check should be executed before the worker performs the job. Maybe here?

Oh, here is another feature request: callbacks. It would be great to have the ability to run some code before and after a job is processed. Is this feature a case for Sneakers?
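The requested before/after callbacks are not a Sneakers feature today, but one possible sketch is a module prepended onto a worker class, so hooks wrap work without touching the worker body (all names here are hypothetical):

```ruby
# Hypothetical callback layer: prepend this module onto a worker and
# any before_work / after_work methods it defines run around work.
module WorkCallbacks
  def work(msg)
    run_hook(:before_work, msg)
    result = super          # the worker's own work method
    run_hook(:after_work, msg)
    result
  end

  private

  # Invoke the hook only if the worker defines it (private hooks allowed).
  def run_hook(name, msg)
    send(name, msg) if respond_to?(name, true)
  end
end

# Plain class standing in for a Sneakers worker, to show the call order.
class LoggingWorker
  prepend WorkCallbacks

  attr_reader :events

  def initialize
    @events = []
  end

  def before_work(msg)
    @events << [:before, msg]
  end

  def work(msg)
    @events << [:work, msg]
    :ok
  end

  def after_work(msg)
    @events << [:after, msg]
  end
end
```

Because the module is prepended, its work runs first and super dispatches to the worker's own implementation, so a duplication check could live in before_work without the worker knowing about it.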

@jondot
Owner

jondot commented Apr 9, 2015

Yup, @michaelklishin is correct - HLL is an approximation. It suits cases where you don't want to perform a redundant job because you don't want to "pay" for it, and performing a redundant job about 0.18% of the time can be tolerated.
Otherwise exact IDs should always be tracked.

Callbacks and lonely_job are interesting - for a long while now I've been wanting to provide generic base worker types such as:

RedisWorker
CountingWorker
etc.

A CallbackWorker may be due in this model. What do you think?

@sharshenov
Contributor Author

That would be great.

@sharshenov
Contributor Author

Here is my temporary workaround with the forever-alone gem:

class MyWorker
  include Sneakers::Worker

  from_queue :email

  def work(msg)
    begin
      ForeverAlone.new(msg, 30.minutes).ensure
      MyJobProcessor.new(msg).perform
      ack!
    rescue ForeverAlone::MessageIsNotUnique => ex
      reject!
    end
  end

end
