From 5590669023121733b6bd5a078d7311e5f4cabfdc Mon Sep 17 00:00:00 2001 From: Chris TenHarmsel Date: Sat, 4 Oct 2014 08:36:19 -0500 Subject: [PATCH] Added method to Util module to clean up process records that don't have a matching heartbeat record, indicating that they're no longer alive. Added call to this new util method in the Schedule::Poller.poll_interval method because the number of live processes is used as a multiplier for the default wait interval. Since the value for poll_interval is memoized this call to 'cleanup_dead_process_records' should only be called once at startup. --- lib/sidekiq/scheduled.rb | 1 + lib/sidekiq/util.rb | 16 +++++++++++ test/test_util.rb | 58 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+) create mode 100644 test/test_util.rb diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 971470661..39e9b4351 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -75,6 +75,7 @@ def poll(first_time=false) # We only do this if poll_interval is unset (the default). def poll_interval Sidekiq.options[:poll_interval] ||= begin + cleanup_dead_process_records pcount = Sidekiq.redis {|c| c.scard('processes') } || 1 pcount * 15 end diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index 1b3d3f944..6bd9bb799 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -44,5 +44,21 @@ def fire_event(event) end end + # Cleans up dead processes recorded in Redis. + def cleanup_dead_process_records + Sidekiq.redis do |conn| + procs = conn.smembers('processes').sort + heartbeats = conn.pipelined do + procs.each do |key| + conn.hget(key, 'beat') + end + end + + heartbeats.each_with_index do |beat, i| + conn.srem('processes', procs[i]) if beat.nil? + end + end + end + end end diff --git a/test/test_util.rb b/test/test_util.rb new file mode 100644 index 000000000..2339525a7 --- /dev/null +++ b/test/test_util.rb @@ -0,0 +1,58 @@ +require 'helper' +require 'sidekiq/util' + +class TestUtil < Sidekiq::Test + class UtilClass + include Sidekiq::Util + end + + describe 'util' do + before do + @orig_redis = Sidekiq.redis_pool + Sidekiq.redis = REDIS + Sidekiq.redis { |conn| conn.flushdb } + end + + after do + Sidekiq.redis = @orig_redis + end + + # In real code that manages the hash sets for process keys + # sets their expiration time to 60 seconds, so processes + # who don't have a set under their name are considered 'dead' + # because they haven't reported in + describe '#cleanup_dead_process_records' do + before do + # Set up some live and dead processes + @live_members = ['localhost-123', 'localhost-125'] + @dead_members = ['localhost-124'] + + Sidekiq.redis do |conn| + conn.sadd('processes', @live_members + @dead_members) + # Add Heartbeats for the live processes + @live_members.each do |m| + conn.hset(m, 'beat', Time.now.to_f) + end + end + + @util = UtilClass.new + end + + after do + Sidekiq.redis do |conn| + conn.srem('processes', @live_members + @dead_members) + @live_members.each do |m| + conn.hdel(m, 'beat') + end + end + end + + it "should remove dead process records" do + assert_equal 3, Sidekiq.redis{ |r| r.scard('processes') } + @util.cleanup_dead_process_records + still_alive = Sidekiq.redis{|r| r.smembers('processes')} + assert_equal still_alive.sort, @live_members.sort + end + end + end +end