API

Scott Watermasysk edited this page Nov 21, 2016 · 36 revisions

Sidekiq has a public API allowing access to real-time information about workers, queues and jobs. See sidekiq/api for RDoc. Require the API to get access to all of the functionality described below.

require 'sidekiq/api'

The Web UI uses the API exclusively, so anything you can do in the UI can be scripted with the API.

Queue

Get all queues

Sidekiq::Queue.all

Get a queue

Sidekiq::Queue.new # the "default" queue
Sidekiq::Queue.new("mailer")

Get the number of jobs within a queue:

Sidekiq::Queue.new.size # => 4

Delete all jobs in a queue by removing the queue:

Sidekiq::Queue.new.clear

Delete the job with a jid of 'abcdef1234567890' from the mailer queue:

queue = Sidekiq::Queue.new("mailer")
queue.each do |job|
  job.klass # => 'MyWorker'
  job.args # => [1, 2, 3]
  job.delete if job.jid == 'abcdef1234567890'
end
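The loop above can be wrapped in a small helper. This is only a sketch: `delete_job_by_jid` and the `FakeJob` stand-in are hypothetical names, not part of Sidekiq. The helper relies only on `each`, `jid` and `delete` as used above, so a real `Sidekiq::Queue` should be usable in place of the array.

```ruby
# Hypothetical stand-in for a queued job so the sketch runs without Redis.
# A real Sidekiq::Queue yields job objects that respond to jid and delete.
FakeJob = Struct.new(:jid, :klass, :deleted) do
  def delete
    self.deleted = true
  end
end

# Deletes every job whose jid matches and returns how many were deleted.
# This is a linear scan, so it gets slower as the queue grows.
def delete_job_by_jid(queue, jid)
  deleted = 0
  queue.each do |job|
    next unless job.jid == jid

    job.delete
    deleted += 1
  end
  deleted
end

queue = [
  FakeJob.new('abcdef1234567890', 'MyWorker', false),
  FakeJob.new('0123456789abcdef', 'MyWorker', false)
]
removed = delete_job_by_jid(queue, 'abcdef1234567890')
```

With a live installation, the call would look like `delete_job_by_jid(Sidekiq::Queue.new("mailer"), 'abcdef1234567890')`.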

Calculate the latency (in seconds) of a queue (now - when the oldest job was enqueued):

Sidekiq::Queue.new.latency # => 14.5
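The definition above is plain arithmetic, sketched here without Sidekiq. `latency_seconds` is a hypothetical helper, and treating an empty queue as zero latency is an assumption of this sketch.

```ruby
# Hypothetical helper mirroring the definition above: the age, in seconds,
# of the oldest job in the queue. This sketch treats an empty queue
# (no oldest job) as zero latency.
def latency_seconds(oldest_enqueued_at, now = Time.now.to_f)
  return 0.0 if oldest_enqueued_at.nil?

  now - oldest_enqueued_at
end

# enqueued_at is an epoch timestamp with fractional seconds,
# like the 'enqueued_at' value in a job payload.
oldest = 1_427_811_033.25
example = latency_seconds(oldest, 1_427_811_047.75) # => 14.5
```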

Find a job by JID (WARNING: this scans the queue job by job, so it is very inefficient if your queue is big!):

Sidekiq::Queue.new.find_job(somejid)

Named Queues

Scheduled

The scheduled sorted set holds all scheduled jobs in chronological order. There's much more in the code; see sidekiq/api for more detail.

ss = Sidekiq::ScheduledSet.new
ss.size
ss.clear

You can enumerate the scheduled jobs within Sidekiq and search or filter them. This example selects all jobs of a certain type and deletes them from the scheduled set.

ss = Sidekiq::ScheduledSet.new
jobs = ss.select { |job| job.klass == 'SomeWorker' }
jobs.each(&:delete)
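Before deleting anything, it can help to see what the set contains. The sketch below counts jobs per worker class. `jobs_per_class` and `ScheduledStub` are hypothetical names; the helper only assumes the set is enumerable and that each entry responds to `klass`, as in the example above.

```ruby
# Hypothetical stand-in for a scheduled job; real entries respond to klass.
ScheduledStub = Struct.new(:klass)

# Counts jobs per worker class name in any enumerable set of jobs.
def jobs_per_class(set)
  counts = Hash.new(0)
  set.each { |job| counts[job.klass] += 1 }
  counts
end

set = [
  ScheduledStub.new('SomeWorker'),
  ScheduledStub.new('OtherWorker'),
  ScheduledStub.new('SomeWorker')
]
counts = jobs_per_class(set)
# => { "SomeWorker" => 2, "OtherWorker" => 1 }
```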

Retries

When a job raises an error, Sidekiq places it in the RetrySet for automatic retry later. Jobs are sorted based on when they will next retry.

rs = Sidekiq::RetrySet.new
rs.size
rs.clear

You can enumerate the retries within Sidekiq and search or filter them. This example selects the delayed-extension jobs that call User.setup_new_subscriber and deletes them from the retry set.

query = Sidekiq::RetrySet.new
query.select do |job|
  job.klass == 'Sidekiq::Extensions::DelayedClass' &&
    # For Sidekiq::Extensions (e.g., Foo.delay.bar(*args)),
    # the context is serialized to YAML, and must
    # be deserialized to get to the original args
    ((klass, method, args) = YAML.load(job.args[0])) &&
    klass == User &&
    method == :setup_new_subscriber
end.map(&:delete)

Dead

Like RetrySet and ScheduledSet, the DeadSet holds all jobs considered dead by Sidekiq, ordered by when they died. It supports the same basic operations as the others.

ds = Sidekiq::DeadSet.new
ds.size
ds.clear

Processes

Sidekiq::ProcessSet gives you access to near real-time info (updated every 5 seconds) about the set of Sidekiq processes currently running. You can also control the processes remotely:

ps = Sidekiq::ProcessSet.new
ps.size # => 2
ps.each do |process|
  p process['busy']     # => 3
  p process['hostname'] # => 'myhost.local'
  p process['pid']      # => 16131
end
ps.each(&:quiet!) # equivalent to the USR1 signal
ps.each(&:stop!) # equivalent to the TERM signal

Remote control can be useful in environments where signals are not supported, for instance Windows, JRuby on the JVM, or Heroku.
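As a small illustration of reading process data, the sketch below sums the 'busy' count across processes. `total_busy` is a hypothetical helper, and plain hashes stand in for process entries so it runs without Redis; it only assumes each entry supports `['busy']` as in the loop above.

```ruby
# Sums the 'busy' thread count across processes. Works with anything that
# yields entries supporting ['busy'], as in the each loop shown earlier.
def total_busy(process_set)
  total = 0
  process_set.each { |process| total += process['busy'] }
  total
end

# Stand-in data; a real caller would pass Sidekiq::ProcessSet.new instead.
processes = [
  { 'hostname' => 'myhost.local', 'pid' => 16131, 'busy' => 3 },
  { 'hostname' => 'myhost.local', 'pid' => 16140, 'busy' => 1 }
]
busy = total_busy(processes) # => 4
```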

Workers

Sidekiq::Workers provides programmatic access to the current active worker set for all Sidekiq processes. A 'worker' is defined as a thread currently processing a job.

WARNING:

This is live data that changes every millisecond. If you call #size => 5 and then expect #each to be called 5 times, you're going to have a bad time.

workers = Sidekiq::Workers.new
workers.size # => 2
workers.each do |process_id, thread_id, work|
  # process_id is a unique identifier per Sidekiq process
  # thread_id is a unique identifier per thread
  # work is a Hash which looks like:
  # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
  # run_at is an epoch Integer.
  # payload is a Hash which looks like:
  # { 'retry' => true,
  #   'queue' => 'default',
  #   'class' => 'Redacted',
  #   'args' => [1, 2, 'foo'],
  #   'jid' => '80b1e7e46381a20c0c567285',
  #   'enqueued_at' => 1427811033.2067106 }
end
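One use of this data is spotting jobs that have been running for a long time. The sketch below is an illustration under assumptions: `long_running` is a hypothetical helper, the snapshot is made-up stand-in data, and the helper relies only on `each` yielding `process_id, thread_id, work` with an integer `work['run_at']`, as described in the comments above.

```ruby
# Returns [process_id, thread_id, seconds_running] for each job that has
# been running longer than threshold seconds. run_at is an epoch Integer.
def long_running(workers, threshold, now = Time.now.to_i)
  slow = []
  workers.each do |process_id, thread_id, work|
    elapsed = now - work['run_at']
    slow << [process_id, thread_id, elapsed] if elapsed > threshold
  end
  slow
end

# Stand-in snapshot; a real caller would pass Sidekiq::Workers.new instead.
snapshot = [
  ['myhost:16131', 'ox1a2b', { 'queue' => 'default', 'run_at' => 1_427_811_000, 'payload' => {} }],
  ['myhost:16131', 'ox3c4d', { 'queue' => 'default', 'run_at' => 1_427_811_590, 'payload' => {} }]
]
slow = long_running(snapshot, 300, 1_427_811_600)
# => [["myhost:16131", "ox1a2b", 600]]
```

Because the data is live, the result is a snapshot and may be stale by the time it is read.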

Stats

Various stats about your Sidekiq installation.

stats = Sidekiq::Stats.new
stats.processed # => 100
stats.failed # => 3
stats.queues # => { "default" => 1001, "email" => 50 }

Gets the number of jobs enqueued in all queues (does NOT include retries and scheduled jobs).

stats.enqueued # => 5 
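These counters can be combined into derived figures. The sketch below computes a failure rate from `processed` and `failed`. `failure_rate` and `StatsStub` are hypothetical names, and reading the ratio as "share of executed jobs that failed" assumes that `processed` counts failed jobs too.

```ruby
# Hypothetical stand-in exposing the two counters used below.
StatsStub = Struct.new(:processed, :failed)

# Ratio of failed to processed jobs; 0.0 when nothing has been processed.
def failure_rate(stats)
  return 0.0 if stats.processed.zero?

  stats.failed.to_f / stats.processed
end

rate = failure_rate(StatsStub.new(100, 3)) # 3 failures out of 100 processed
```

With a live installation, the call would be `failure_rate(Sidekiq::Stats.new)`.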

Stats History

All dates are UTC and history stats are cleared after 5 years.

Get history of failed/processed stats:

s = Sidekiq::Stats::History.new(2) # Indicates how many days of data you want starting from today (UTC)
s.failed # => { "2012-12-05" => 120, "2012-12-04" => 234 }
s.processed # => { "2012-12-05" => 1010, "2012-12-04" => 1500 }

Start from a different date:

s = Sidekiq::Stats::History.new(3, Date.parse("2012-12-3"))
s.failed # => { "2012-12-03" => 10, "2012-12-02" => 24, "2012-12-01" => 4 }
s.processed # => { "2012-12-03" => 124, "2012-12-02" => 345, "2012-12-01" => 355 }
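The two arguments determine which date keys appear in the result: the start date, then each preceding day. The sketch below reproduces only that key calculation in plain Ruby. `history_dates` is a hypothetical helper, not a Sidekiq method.

```ruby
require 'date'

# Lists the date keys covered by a history window: the start date first,
# then each preceding day. Defaults to today's date in UTC.
def history_dates(days, start_date = Time.now.utc.to_date)
  (0...days).map { |offset| (start_date - offset).strftime('%Y-%m-%d') }
end

keys = history_dates(3, Date.parse('2012-12-3'))
# => ["2012-12-03", "2012-12-02", "2012-12-01"]
```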
