diff --git a/Changes.md b/Changes.md index 0b10795a8..db2bf616c 100644 --- a/Changes.md +++ b/Changes.md @@ -2,6 +2,7 @@ ----------- - Auto-prune jobs older than one hour from the Workers page [#1508] +- Add Sidekiq::Workers#prune which can perform the auto-pruning. - Fix issue where a job could be lost when an exception occurs updating Redis stats before the job executes [#1511] diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 460b1b69b..1f8a8ac8c 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -424,9 +424,10 @@ def each(&block) Sidekiq.redis do |conn| workers = conn.smembers("workers") workers.each do |w| - msg, time = conn.mget("worker:#{w}", "worker:#{w}:started") - next unless msg - block.call(w, Sidekiq.load_json(msg), time) + json = conn.get("worker:#{w}") + next unless json + msg = Sidekiq.load_json(json) + block.call(w, msg, Time.at(msg['run_at']).to_s) end end end @@ -436,6 +437,36 @@ def size conn.scard("workers") end.to_i end + + # Prune old worker entries from the Busy set. Worker entries + # can be orphaned if Sidekiq hard crashes while processing jobs. + # Default is to delete worker entries older than one hour. + # + # Returns the number of records removed. 
# Prune old worker entries from the Busy set. Worker entries
# can be orphaned if Sidekiq hard crashes while processing jobs.
#
# An entry is removed from the 'workers' set when either
#   * its "worker:<id>" payload key is missing, or
#   * the payload's 'run_at' timestamp is more than +older_than+
#     seconds in the past.
#
# older_than - maximum age of an entry, in seconds (default: one hour).
#
# Returns the number of records removed.
def prune(older_than=60*60)
  Sidekiq.redis do |conn|
    # Compute the cutoff once so every entry is judged against the same instant.
    cutoff = Time.now - older_than

    stale = conn.smembers('workers').select do |w|
      json = conn.get("worker:#{w}")
      # No payload at all: the set member is orphaned.
      next true unless json

      Time.at(Sidekiq.load_json(json)['run_at']) < cutoff
    end

    # Skip the SREM round trip when there is nothing to remove.
    conn.srem('workers', stale) unless stale.empty?
    stale.size
  end
end
- if to_rem.size > 0 - Sidekiq.redis { |conn| conn.srem('workers', to_rem) } + Sidekiq::Workers.new.tap do |w| + w.prune end - workers end end diff --git a/test/test_api.rb b/test/test_api.rb index c08e30315..e8ec16ce8 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -362,6 +362,22 @@ class ApiWorker assert_equal 'default', y['queue'] assert_equal Time.now.year, DateTime.parse(z).year end + + s = '12346' + data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) }) + Sidekiq.redis do |c| + c.multi do + c.sadd('workers', s) + c.set("worker:#{s}", data) + c.set("worker:#{s}:started", Time.now.to_s) + c.sadd('workers', '123457') + end + end + + assert_equal 3, w.size + count = w.prune + assert_equal 1, w.size + assert_equal 2, count end it 'can reschedule jobs' do diff --git a/web/views/_workers.erb b/web/views/_workers.erb index d58966903..2e35d6b3e 100644 --- a/web/views/_workers.erb +++ b/web/views/_workers.erb @@ -16,7 +16,7 @@
<%= display_args(msg['payload']['args']) %>
- <%= relative_time(run_at) %> + <%= relative_time(run_at.is_a?(String) ? DateTime.parse(run_at) : run_at) %> <% end %>