Shoryuken does not respect empty_queue_fetch_delay in batch mode #568

Closed
snoopie opened this issue Jun 17, 2019 · 4 comments · Fixed by #569


@snoopie
Contributor

snoopie commented Jun 17, 2019

I am implementing a batch processor like this:

class DummyBatchProcessor
  include Shoryuken::Worker

  shoryuken_options body_parser: Oj,
                    auto_delete: true,
                    batch: true,
                    queue: "myqueue"
...

and setting empty queue fetch delay at application startup like this:

      Shoryuken.groups.clear
      Shoryuken.add_group("default",
        configuration.concurrency,
        delay: configuration.empty_queue_fetch_delay)

What is strange is that delay works with normal "one-message" processors, but not when I set it to batch mode.

Is it some kind of a bug? If so, where should I look for the code which needs to be fixed?

Thanks!

📌 #569

@phstc
Collaborator

phstc commented Jun 17, 2019

Hi @snoopie

delay with batch should work just fine. If you can debug it, these are the places to look:

  • here the poller is initiated, and the delay should be configured for the group
  • here is where it tries to load the group delay or fall back to the default one

@snoopie
Contributor Author

snoopie commented Jun 17, 2019

Hey @phstc, it seems like it never reaches the code which should pause the queue:
Here return exits the method before it reaches @polling_strategy.messages_found, because @fetcher.fetch(queue, BATCH_LIMIT) returns an empty array.

Does it make sense?

@phstc
Collaborator

phstc commented Jun 17, 2019

Hi @snoopie

Thanks for investigating this. It does make sense.

This 👇 is probably the fix I should have implemented in #376.

def dispatch_batch(queue)
  batch = @fetcher.fetch(queue, BATCH_LIMIT)
  @polling_strategy.messages_found(queue.name, batch.size)
  assign(queue.name, patch_batch!(batch)) if batch.any?
end
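
To see why the position of the messages_found call matters, here is a minimal, self-contained sketch that does not load Shoryuken at all. FakeFetcher, RecordingStrategy, FakeQueue and Dispatcher are hypothetical stand-ins invented for illustration, and the early-return method only approximates the behaviour described in this thread rather than quoting the original source. It assumes the polling strategy can only react to an empty queue if it is told about a batch of size zero.

```ruby
# Hypothetical stand-ins for illustration; these are NOT Shoryuken's classes.
BATCH_LIMIT = 10

# Returns the batch it was built with, ignoring queue and limit.
class FakeFetcher
  def initialize(batch)
    @batch = batch
  end

  def fetch(_queue, _limit)
    @batch
  end
end

# Records every messages_found call so we can check whether the
# strategy was ever told that a queue came back empty.
class RecordingStrategy
  attr_reader :calls

  def initialize
    @calls = []
  end

  def messages_found(queue_name, count)
    @calls << [queue_name, count]
  end
end

FakeQueue = Struct.new(:name)

class Dispatcher
  attr_reader :assigned

  def initialize(fetcher, strategy)
    @fetcher = fetcher
    @polling_strategy = strategy
    @assigned = []
  end

  # Approximation of the reported behaviour: an empty batch returns
  # before the strategy hears about it.
  def dispatch_batch_with_early_return(queue)
    batch = @fetcher.fetch(queue, BATCH_LIMIT)
    return if batch.empty?

    @polling_strategy.messages_found(queue.name, batch.size)
    @assigned << [queue.name, batch]
  end

  # Same shape as the proposed fix: always report the batch size,
  # then hand off work only when there is some.
  def dispatch_batch_always_reporting(queue)
    batch = @fetcher.fetch(queue, BATCH_LIMIT)
    @polling_strategy.messages_found(queue.name, batch.size)
    @assigned << [queue.name, batch] if batch.any?
  end
end

queue = FakeQueue.new("myqueue")

early = RecordingStrategy.new
Dispatcher.new(FakeFetcher.new([]), early).dispatch_batch_with_early_return(queue)

always = RecordingStrategy.new
Dispatcher.new(FakeFetcher.new([]), always).dispatch_batch_always_reporting(queue)

p early.calls   # => []
p always.calls  # => [["myqueue", 0]]
```

With the early return, the strategy never sees a zero-sized batch, so it has nothing to base a pause on. Reporting first and assigning conditionally keeps the empty-queue signal intact.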

Can you PR a fix? ❤️ 🙏

@snoopie
Contributor Author

snoopie commented Jun 18, 2019

@phstc Sure thing!
