diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 9a6dc84e2..e65d87e88 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -3,28 +3,89 @@ module Sidekiq class Stats + def initialize + fetch_stats! + end + def processed - stat 'processed' + stat :processed end def failed - stat 'failed' + stat :failed end def scheduled_size - stat 'scheduled_size' + stat :scheduled_size end def retry_size - stat 'retry_size' + stat :retry_size end def dead_size - stat 'dead_size' + stat :dead_size + end + + def enqueued + stat :enqueued + end + + def processes_size + stat :processes_size + end + + def workers_size + stat :workers_size + end + + def default_queue_latency + stat :default_queue_latency end def fetch_stats! - stats + pipe1_res = Sidekiq.redis do |conn| + conn.pipelined do + conn.get('stat:processed') + conn.get('stat:failed') + conn.zcard('schedule') + conn.zcard('retry') + conn.zcard('dead') + conn.scard('processes') + + conn.lrange("queue:default", -1, -1) + conn.smembers('processes') + conn.smembers('queues'.freeze) + end + end + + pipe2_res = Sidekiq.redis do |conn| + conn.pipelined do + pipe1_res[7].each {|key| conn.hget(key, 'busy') } + pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") } + end + end + + workers_size = pipe2_res.pop(pipe1_res[7].size).map(&:to_i).inject(0, &:+) + enqueued = pipe2_res.pop(pipe1_res[8].size).map(&:to_i).inject(0, &:+) + + default_queue_latency = if (entry = pipe1_res[6].first) + Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'] + else + 0 + end + @stats = { + processed: pipe1_res[0].to_i, + failed: pipe1_res[1].to_i, + scheduled_size: pipe1_res[2], + retry_size: pipe1_res[3], + dead_size: pipe1_res[4], + processes_size: pipe1_res[5], + + default_queue_latency: default_queue_latency, + workers_size: workers_size, + enqueued: enqueued + } end def reset(*stats) @@ -41,77 +102,33 @@ def reset(*stats) end end - def queues - Sidekiq.redis do |conn| - queues = conn.smembers('queues'.freeze) - - lengths = conn.pipelined do - queues.each do |queue| - conn.llen("queue:#{queue}") - end - end - - i = 0 - array_of_arrays = queues.inject({}) do |memo, queue| - memo[queue] = lengths[i] - i += 1 - memo - end.sort_by { |_, size| size } - - Hash[array_of_arrays.reverse] - end - end - - def enqueued - queues.values.inject(&:+) || 0 - end - private - def stats(*only) - all = %w(processed failed queues enqueued scheduled_size retry_size dead_size) - metrics = only.any? ? only : all - - if @all_stats - @all_stats.slice(*metrics) - else - if metrics == all - @all_stats = load_stats(*all) - else - load_stats(*metrics) - end - end + def stat(s) + @stats[s] end - def stat(s) - stats(s)[s] - end - - def load_stats(*metrics) - read_pipelined = %w(processed failed scheduled_size retry_size dead_size) & metrics - - loaded_stats = {} - if read_pipelined.any? - results = Sidekiq.redis do |conn| - conn.pipelined do - read_pipelined.each do |key| - case key - when 'processed' then conn.get('stat:processed') - when 'failed' then conn.get('stat:failed') - when 'scheduled_size' then conn.zcard('schedule') - when 'retry_size' then conn.zcard('retry') - when 'dead_size' then conn.zcard('dead') - end + class Queues + def lengths + Sidekiq.redis do |conn| + queues = conn.smembers('queues'.freeze) + + lengths = conn.pipelined do + queues.each do |queue| + conn.llen("queue:#{queue}") end end - end - read_pipelined.zip(results).each {|metric, v| loaded_stats[metric] = v.to_i } - end - (metrics - read_pipelined).each do |metric| - loaded_stats[metric] = public_send(metric) + i = 0 + array_of_arrays = queues.inject({}) do |memo, queue| + memo[queue] = lengths[i] + i += 1 + memo + end.sort_by { |_, size| size } + + Hash[array_of_arrays.reverse] + end end - loaded_stats end class History diff --git a/lib/sidekiq/web.rb b/lib/sidekiq/web.rb index 74533eca0..c7d10f78b 100644 --- a/lib/sidekiq/web.rb +++ b/lib/sidekiq/web.rb @@ -210,33 +210,31 @@ def custom_tabs get '/stats' do sidekiq_stats = Sidekiq::Stats.new - sidekiq_stats.fetch_stats! - queue = Sidekiq::Queue.new redis_stats = redis_info.select { |k, v| REDIS_KEYS.include? k } content_type :json Sidekiq.dump_json( sidekiq: { - processed: sidekiq_stats.processed, - failed: sidekiq_stats.failed, - busy: workers.size, - processes: processes.size, - enqueued: sidekiq_stats.enqueued, - scheduled: sidekiq_stats.scheduled_size, - retries: sidekiq_stats.retry_size, - dead: sidekiq_stats.dead_size, - default_latency: queue.latency + processed: sidekiq_stats.processed, + failed: sidekiq_stats.failed, + busy: sidekiq_stats.workers_size, + processes: sidekiq_stats.processes_size, + enqueued: sidekiq_stats.enqueued, + scheduled: sidekiq_stats.scheduled_size, + retries: sidekiq_stats.retry_size, + dead: sidekiq_stats.dead_size, + default_latency: sidekiq_stats.default_queue_latency }, redis: redis_stats ) end get '/stats/queues' do - stats = Sidekiq::Stats.new + queue_stats = Sidekiq::Stats::Queues.new content_type :json Sidekiq.dump_json( - stats.queues + queue_stats.lengths ) end diff --git a/test/test_api.rb b/test/test_api.rb index fa58d5ddf..349245175 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -77,8 +77,8 @@ class TestApi < Sidekiq::Test describe "queues" do it "is initially empty" do - s = Sidekiq::Stats.new - assert_equal 0, s.queues.size + s = Sidekiq::Stats::Queues.new + assert_equal 0, s.lengths.size end it "returns a hash of queue and size in order" do @@ -90,9 +90,9 @@ class TestApi < Sidekiq::Test conn.sadd 'queues', 'bar' end - s = Sidekiq::Stats.new - assert_equal ({ "foo" => 1, "bar" => 3 }), s.queues - assert_equal "bar", s.queues.first.first + s = Sidekiq::Stats::Queues.new + assert_equal ({ "foo" => 1, "bar" => 3 }), s.lengths + assert_equal "bar", s.lengths.first.first end end