Skip to content

Commit

Permalink
Speedup iterating over WorkSet (#5559)
Browse files Browse the repository at this point in the history
  • Loading branch information
fatkodima committed Oct 3, 2022
1 parent d424e45 commit 7037533
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions lib/sidekiq/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1103,24 +1103,31 @@ class WorkSet

def each(&block)
results = []
procs = nil
all_works = nil

Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a
procs.sort.each do |key|
valid, workers = conn.pipelined { |pipeline|
pipeline.exists?(key)
procs = conn.sscan_each("processes").to_a.sort

all_works = conn.pipelined do |pipeline|
procs.each do |key|
pipeline.hgetall("#{key}:work")
}
next unless valid
workers.each_pair do |tid, json|
hsh = Sidekiq.load_json(json)
p = hsh["payload"]
# avoid breaking API, this is a side effect of the JSON optimization in #4316
hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
results << [key, tid, hsh]
end
end
end

procs.zip(all_works).each do |key, workers|
workers.each_pair do |tid, json|
next if json.empty?

hsh = Sidekiq.load_json(json)
p = hsh["payload"]
# avoid breaking API, this is a side effect of the JSON optimization in #4316
hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
results << [key, tid, hsh]
end
end

results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block)
end

Expand Down

0 comments on commit 7037533

Please sign in to comment.