Skip to content

Loading…

Failing spec demonstrating a bug with recurring jobs of diff priority #62

Open
wants to merge 2 commits into from

2 participants

@myronmarston
Moz member

This is a bug we ran into due to the changes in seomoz/qless-core@bea9915. It's very subtle, so an example will demonstrate best:

  • Recurring jobs high_pri and low_pri are enqueued with the same recurrence schedule, but different priorities.
  • Previously, queue.pop would always pop a high_pri job before a low_pri job, as it should.
  • With the change in that commit, queue.pop only spawns 1 job from the recurring queue to the work queue. Given that the recurring queue is a sorted set scored by a timestamp indicating "when to spawn a job next", there's no guarantee about which of the two jobs will be the first one returned by the limitedzrangebyscore command used in pop. Thus, the low_pri job can be moved to the work queue first, and be popped first.

I'm not sure what the fix should be for this but I came up with a failing spec.

Let me know if you have any questions, @dlecocq.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 30 additions and 0 deletions.
  1. +30 −0 spec/integration/qless_spec.rb
View
30 spec/integration/qless_spec.rb
@@ -6,6 +6,8 @@
# Spec stuff
require 'spec_helper'
+require 'timecop'
+
module Qless
describe Client, :integration do
let(:queue) { client.queues['foo'] }
@@ -55,6 +57,34 @@ module Qless
client.deregister_workers(queue.worker_name)
expect(client.workers.counts).to eq({})
end
+
+ it 'pops a high pri job before a low pri job when they recur at the same moment' do
+ # some arbitrary time
+ time = Time.iso8601('2014-08-01T00:00:00Z')
+
+ Timecop.freeze(time) do
+ interval = 10
+
+ enqueue_with_priority = lambda do |priority, jid|
+ queue.recur(Qless::Job, {}, interval,
+ :jid => jid, :offset => (interval / 2),
+ :priority => priority)
+ end
+
+ enqueue_with_priority[10, "low_pri"]
+ enqueue_with_priority[1000, "high_pri"]
+ enqueue_with_priority[100, "med_pri"]
+ end
+
+ Timecop.freeze(time + 10) do
+ # Note: you can make this spec pass by uncommenting this line
+ # q.peek(3)
+
+ queue.pop.jid.should include('high_pri')
+ queue.pop.jid.should include('med_pri')
+ queue.pop.jid.should include('low_pri')
+ end
+ end
end
it 'exposes tracking jobs' do
Something went wrong with that request. Please try again.