Skip to content

Commit

Permalink
Rejigger raw_push for Pro's atomic batch fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Aug 31, 2014
1 parent d8a50c6 commit ac69c7e
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions lib/sidekiq/client.rb
Expand Up @@ -176,23 +176,26 @@ def enqueue_in(interval, klass, *args)
private

def raw_push(payloads)
pushed = false
@redis_pool.with do |conn|
if payloads.first['at']
pushed = conn.zadd('schedule', payloads.map do |hash|
at = hash.delete('at').to_s
[at, Sidekiq.dump_json(hash)]
end)
else
q = payloads.first['queue']
to_push = payloads.map { |entry| Sidekiq.dump_json(entry) }
_, pushed = conn.multi do
conn.sadd('queues', q)
conn.lpush("queue:#{q}", to_push)
end
conn.multi do
atomic_push(conn, payloads)
end
end
pushed
true
end

def atomic_push(conn, payloads)
if payloads.first['at']
conn.zadd('schedule', payloads.map do |hash|
at = hash.delete('at').to_s
[at, Sidekiq.dump_json(hash)]
end)
else
q = payloads.first['queue']
to_push = payloads.map { |entry| Sidekiq.dump_json(entry) }
conn.sadd('queues', q)
conn.lpush("queue:#{q}", to_push)
end
end

def process_single(worker_class, item)
Expand Down

0 comments on commit ac69c7e

Please sign in to comment.