Skip to content

Commit

Permalink
Add Sidekiq::Workers#prune API to remove orphaned records
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Feb 28, 2014
1 parent 08d277c commit 404069a
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 33 deletions.
1 change: 1 addition & 0 deletions Changes.md
Expand Up @@ -2,6 +2,7 @@
-----------

- Auto-prune jobs older than one hour from the Workers page [#1508]
- Add Sidekiq::Workers#prune which can perform the auto-pruning.
- Fix issue where a job could be lost when an exception occurs updating
Redis stats before the job executes [#1511]

Expand Down
37 changes: 34 additions & 3 deletions lib/sidekiq/api.rb
Expand Up @@ -424,9 +424,10 @@ def each(&block)
Sidekiq.redis do |conn|
workers = conn.smembers("workers")
workers.each do |w|
msg, time = conn.mget("worker:#{w}", "worker:#{w}:started")
next unless msg
block.call(w, Sidekiq.load_json(msg), time)
json = conn.get("worker:#{w}")
next unless json
msg = Sidekiq.load_json(json)
block.call(w, msg, Time.at(msg['run_at']).to_s)
end
end
end
Expand All @@ -436,6 +437,36 @@ def size
conn.scard("workers")
end.to_i
end

# Prune stale worker entries from the Busy ('workers') set. Worker
# entries can be orphaned if Sidekiq hard crashes while processing jobs.
#
# An entry is considered stale when its "worker:<id>" record is missing,
# or when that record's 'run_at' timestamp is more than +older_than+
# seconds in the past. Only the membership in the 'workers' set is
# removed here; any remaining "worker:<id>" key is left untouched.
#
# older_than - maximum age of an entry in seconds (default: one hour).
#
# Returns the number of records removed.
def prune(older_than=60*60)
  # Compute the cutoff once so every entry is judged against the same instant.
  cutoff = Time.now - older_than
  stale = []

  Sidekiq.redis do |conn|
    stale = conn.smembers('workers').select do |w|
      json = conn.get("worker:#{w}")
      # No record at all means the set entry is orphaned.
      next true unless json

      Time.at(Sidekiq.load_json(json)['run_at']) < cutoff
    end

    # Remove everything in one SREM, reusing the connection already checked out.
    conn.srem('workers', stale) unless stale.empty?
  end

  stale.size
end
end

end
31 changes: 2 additions & 29 deletions lib/sidekiq/web_helpers.rb
Expand Up @@ -47,38 +47,11 @@ def workers_size
end
end

MAX_JOB_DURATION = 60*60

def workers
@workers ||= begin
to_rem = []
workers = Sidekiq.redis do |conn|
conn.smembers('workers').map do |w|
msg = conn.get("worker:#{w}")
if !msg
to_rem << w
nil
else
m = Sidekiq.load_json(msg)
run_at = Time.at(m['run_at'])
# prune jobs older than one hour
if run_at < (Time.now - MAX_JOB_DURATION)
to_rem << w
nil
else
[w, m, run_at]
end
end
end.compact.sort { |x| x[1] ? -1 : 1 }
end

# Detect and clear out any orphaned worker records.
# These can be left in Redis if Sidekiq crashes hard
# while processing jobs.
if to_rem.size > 0
Sidekiq.redis { |conn| conn.srem('workers', to_rem) }
Sidekiq::Workers.new.tap do |w|
w.prune
end
workers
end
end

Expand Down
16 changes: 16 additions & 0 deletions test/test_api.rb
Expand Up @@ -362,6 +362,22 @@ class ApiWorker
assert_equal 'default', y['queue']
assert_equal Time.now.year, DateTime.parse(z).year
end

s = '12346'
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) })
Sidekiq.redis do |c|
c.multi do
c.sadd('workers', s)
c.set("worker:#{s}", data)
c.set("worker:#{s}:started", Time.now.to_s)
c.sadd('workers', '123457')
end
end

assert_equal 3, w.size
count = w.prune
assert_equal 1, w.size
assert_equal 2, count
end

it 'can reschedule jobs' do
Expand Down
2 changes: 1 addition & 1 deletion web/views/_workers.erb
Expand Up @@ -16,7 +16,7 @@
<td>
<div class="args"><%= display_args(msg['payload']['args']) %></div>
</td>
<td><%= relative_time(run_at) %></td>
<td><%= relative_time(run_at.is_a?(String) ? DateTime.parse(run_at) : run_at) %></td>
</tr>
<% end %>
</table>

0 comments on commit 404069a

Please sign in to comment.