Skip to content

Commit

Permalink
Merge pull request sidekiq#2376 from mrsimo/rename-enqueued-at-create…
Browse files Browse the repository at this point in the history
…d-at

Use created_at for the time schedule jobs are scheduled
  • Loading branch information
mperham committed Jun 3, 2015
2 parents acf61a6 + 224cfe5 commit 4889ce3
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 18 deletions.
7 changes: 7 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
3.3.5
-----------

- Set a created_at attribute when jobs are created, and the enqueued_at only
when they go into the queue. Aims to help with latency, specially for
scheduled jobs. [#2373, mrsimo]

3.3.4
-----------

Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,16 @@ def enqueued_at
Time.at(@item['enqueued_at'] || 0).utc
end

def created_at
Time.at(@item['created_at'] || @item['enqueued_at'] || 0).utc
end

def queue
@queue
end

def latency
Time.now.to_f - @item['enqueued_at']
Time.now.to_f - (@item['enqueued_at'] || @item['created_at'])
end

##
Expand Down
7 changes: 5 additions & 2 deletions lib/sidekiq/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ def atomic_push(conn, payloads)
end)
else
q = payloads.first['queue']
to_push = payloads.map { |entry| Sidekiq.dump_json(entry) }
to_push = payloads.map do |entry|
entry['enqueued_at'.freeze] ||= Time.now.to_f
Sidekiq.dump_json(entry)
end
conn.sadd('queues'.freeze, q)
conn.lpush("queue:#{q}", to_push)
end
Expand All @@ -217,7 +220,7 @@ def normalize_item(item)
item['class'.freeze] = item['class'.freeze].to_s
item['queue'.freeze] = item['queue'.freeze].to_s
item['jid'.freeze] ||= SecureRandom.hex(12)
item['enqueued_at'.freeze] ||= Time.now.to_f
item['created_at'.freeze] ||= Time.now.to_f
item
end

Expand Down
31 changes: 16 additions & 15 deletions test/test_scheduled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,28 @@ def call(worker_class, message, queue, r)
end

it 'should empty the retry and scheduled queues up to the current time' do
created_time = Time.new(2013, 2, 3)
enqueued_time = Time.new(2013, 2, 4)

Time.stub(:now, enqueued_time) do
@retry.schedule (Time.now - 60).to_f, @error_1
@retry.schedule (Time.now - 50).to_f, @error_2
@retry.schedule (Time.now + 60).to_f, @error_3
@scheduled.schedule (Time.now - 60).to_f, @future_1
@scheduled.schedule (Time.now - 50).to_f, @future_2
@scheduled.schedule (Time.now + 60).to_f, @future_3
Time.stub(:now, created_time) do
@retry.schedule (enqueued_time - 60).to_f, @error_1.merge!('created_at' => created_time.to_f)
@retry.schedule (enqueued_time - 50).to_f, @error_2.merge!('created_at' => created_time.to_f)
@retry.schedule (enqueued_time + 60).to_f, @error_3.merge!('created_at' => created_time.to_f)
@scheduled.schedule (enqueued_time - 60).to_f, @future_1.merge!('created_at' => created_time.to_f)
@scheduled.schedule (enqueued_time - 50).to_f, @future_2.merge!('created_at' => created_time.to_f)
@scheduled.schedule (enqueued_time + 60).to_f, @future_3.merge!('created_at' => created_time.to_f)
end

Time.stub(:now, enqueued_time) do
@poller.poll

Sidekiq.redis do |conn|
assert_equal 1, conn.llen("queue:queue_1")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_1", 0, -1)[0])['enqueued_at']
assert_equal 1, conn.llen("queue:queue_2")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_2", 0, -1)[0])['enqueued_at']
assert_equal 1, conn.llen("queue:queue_4")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_4", 0, -1)[0])['enqueued_at']
assert_equal 1, conn.llen("queue:queue_5")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_5", 0, -1)[0])['enqueued_at']
%w(queue:queue_1 queue:queue_2 queue:queue_4 queue:queue_5).each do |queue_name|
assert_equal 1, conn.llen(queue_name)
job = Sidekiq.load_json(conn.lrange(queue_name, 0, -1)[0])
assert_equal enqueued_time.to_f, job['enqueued_at']
assert_equal created_time.to_f, job['created_at']
end
end

assert_equal 1, @retry.size
Expand Down
9 changes: 9 additions & 0 deletions test/test_scheduling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ def perform(x)
assert Sidekiq::Client.push_bulk('class' => ScheduledWorker, 'args' => [['mike'], ['mike']], 'at' => 600)
@redis.verify
end

it 'removes the enqueued_at field when scheduling' do
@redis.expect :zadd, true do |key, args|
job = Sidekiq.load_json(args.first.last)
job.key?('created_at') && !job.key?('enqueued_at')
end
assert ScheduledWorker.perform_in(1.month, 'mike')
@redis.verify
end
end

end

0 comments on commit 4889ce3

Please sign in to comment.