Skip to content

Commit

Permalink
Fit all stats into 2 redis pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelga committed Jan 17, 2015
1 parent 2207226 commit 20c616e
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 87 deletions.
155 changes: 86 additions & 69 deletions lib/sidekiq/api.rb
Expand Up @@ -3,28 +3,89 @@

module Sidekiq
class Stats
def initialize
fetch_stats!
end

def processed
stat 'processed'
stat :processed
end

def failed
stat 'failed'
stat :failed
end

def scheduled_size
stat 'scheduled_size'
stat :scheduled_size
end

def retry_size
stat 'retry_size'
stat :retry_size
end

def dead_size
stat 'dead_size'
stat :dead_size
end

def enqueued
stat :enqueued
end

def processes_size
stat :processes_size
end

def workers_size
stat :workers_size
end

def default_queue_latency
stat :default_queue_latency
end

def fetch_stats!
stats
pipe1_res = Sidekiq.redis do |conn|
conn.pipelined do
conn.get('stat:processed')
conn.get('stat:failed')
conn.zcard('schedule')
conn.zcard('retry')
conn.zcard('dead')
conn.scard('processes')

conn.lrange("queue:default", -1, -1)
conn.smembers('processes')
conn.smembers('queues'.freeze)
end
end

pipe2_res = Sidekiq.redis do |conn|
conn.pipelined do
pipe1_res[7].each {|key| conn.hget(key, 'busy') }
pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") }
end
end

workers_size = pipe2_res.pop(pipe1_res[7].size).map(&:to_i).inject(0, &:+)
enqueued = pipe2_res.pop(pipe1_res[8].size).map(&:to_i).inject(0, &:+)

default_queue_latency = if (entry = pipe1_res[6].first)
Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at']
else
0
end
@stats = {
processed: pipe1_res[0].to_i,
failed: pipe1_res[1].to_i,
scheduled_size: pipe1_res[2],
retry_size: pipe1_res[3],
dead_size: pipe1_res[4],
processes_size: pipe1_res[5],

default_queue_latency: default_queue_latency,
workers_size: workers_size,
enqueued: enqueued
}
end

def reset(*stats)
Expand All @@ -41,77 +102,33 @@ def reset(*stats)
end
end

def queues
Sidekiq.redis do |conn|
queues = conn.smembers('queues'.freeze)

lengths = conn.pipelined do
queues.each do |queue|
conn.llen("queue:#{queue}")
end
end

i = 0
array_of_arrays = queues.inject({}) do |memo, queue|
memo[queue] = lengths[i]
i += 1
memo
end.sort_by { |_, size| size }

Hash[array_of_arrays.reverse]
end
end

def enqueued
queues.values.inject(&:+) || 0
end

private

def stats(*only)
all = %w(processed failed queues enqueued scheduled_size retry_size dead_size)
metrics = only.any? ? only : all

if @all_stats
@all_stats.slice(*metrics)
else
if metrics == all
@all_stats = load_stats(*all)
else
load_stats(*metrics)
end
end
def stat(s)
@stats[s]
end

def stat(s)
stats(s)[s]
end

def load_stats(*metrics)
read_pipelined = %w(processed failed scheduled_size retry_size dead_size) & metrics

loaded_stats = {}
if read_pipelined.any?
results = Sidekiq.redis do |conn|
conn.pipelined do
read_pipelined.each do |key|
case key
when 'processed' then conn.get('stat:processed')
when 'failed' then conn.get('stat:failed')
when 'scheduled_size' then conn.zcard('schedule')
when 'retry_size' then conn.zcard('retry')
when 'dead_size' then conn.zcard('dead')
end
class Queues
def lengths
Sidekiq.redis do |conn|
queues = conn.smembers('queues'.freeze)

lengths = conn.pipelined do
queues.each do |queue|
conn.llen("queue:#{queue}")
end
end
end
read_pipelined.zip(results).each {|metric, v| loaded_stats[metric] = v.to_i }
end

(metrics - read_pipelined).each do |metric|
loaded_stats[metric] = public_send(metric)
i = 0
array_of_arrays = queues.inject({}) do |memo, queue|
memo[queue] = lengths[i]
i += 1
memo
end.sort_by { |_, size| size }

Hash[array_of_arrays.reverse]
end
end
loaded_stats
end

class History
Expand Down
24 changes: 11 additions & 13 deletions lib/sidekiq/web.rb
Expand Up @@ -210,33 +210,31 @@ def custom_tabs

get '/stats' do
sidekiq_stats = Sidekiq::Stats.new
sidekiq_stats.fetch_stats!
queue = Sidekiq::Queue.new
redis_stats = redis_info.select { |k, v| REDIS_KEYS.include? k }

content_type :json
Sidekiq.dump_json(
sidekiq: {
processed: sidekiq_stats.processed,
failed: sidekiq_stats.failed,
busy: workers.size,
processes: processes.size,
enqueued: sidekiq_stats.enqueued,
scheduled: sidekiq_stats.scheduled_size,
retries: sidekiq_stats.retry_size,
dead: sidekiq_stats.dead_size,
default_latency: queue.latency
processed: sidekiq_stats.processed,
failed: sidekiq_stats.failed,
busy: sidekiq_stats.workers_size,
processes: sidekiq_stats.processes_size,
enqueued: sidekiq_stats.enqueued,
scheduled: sidekiq_stats.scheduled_size,
retries: sidekiq_stats.retry_size,
dead: sidekiq_stats.dead_size,
default_latency: sidekiq_stats.default_queue_latency
},
redis: redis_stats
)
end

get '/stats/queues' do
stats = Sidekiq::Stats.new
queue_stats = Sidekiq::Stats::Queues.new

content_type :json
Sidekiq.dump_json(
stats.queues
queue_stats.lengths
)
end

Expand Down
10 changes: 5 additions & 5 deletions test/test_api.rb
Expand Up @@ -77,8 +77,8 @@ class TestApi < Sidekiq::Test

describe "queues" do
it "is initially empty" do
s = Sidekiq::Stats.new
assert_equal 0, s.queues.size
s = Sidekiq::Stats::Queues.new
assert_equal 0, s.lengths.size
end

it "returns a hash of queue and size in order" do
Expand All @@ -90,9 +90,9 @@ class TestApi < Sidekiq::Test
conn.sadd 'queues', 'bar'
end

s = Sidekiq::Stats.new
assert_equal ({ "foo" => 1, "bar" => 3 }), s.queues
assert_equal "bar", s.queues.first.first
s = Sidekiq::Stats::Queues.new
assert_equal ({ "foo" => 1, "bar" => 3 }), s.lengths
assert_equal "bar", s.lengths.first.first
end
end

Expand Down

0 comments on commit 20c616e

Please sign in to comment.