Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/mperham/sidekiq
Browse files Browse the repository at this point in the history
  • Loading branch information
autouncle-ci-user committed Sep 7, 2012
2 parents fb53a27 + 941098a commit 07930b8
Show file tree
Hide file tree
Showing 45 changed files with 6,642 additions and 3,484 deletions.
14 changes: 13 additions & 1 deletion Changes.md
@@ -1,8 +1,20 @@
HEAD
2.3.0
-----------

- Upgrade Celluloid to 0.12
- Upgrade Twitter Bootstrap to 2.1.0
- Rescue more Exceptions
- Change Job ID to be Hex, rather than Base64, for HTTP safety
- Use `Airbrake#notify_or_ignore`

2.2.1
-----------

- Add support for custom tabs to Sidekiq::Web [#346]
- Change capistrano recipe to run 'quiet' before deploy:update\_code so
it is run upon both 'deploy' and 'deploy:migrations'. [#352]
- Rescue Exception rather than StandardError to catch and log any sort
of Processor death.

2.2.0
-----------
Expand Down
3 changes: 1 addition & 2 deletions Gemfile
@@ -1,9 +1,8 @@
source 'http://rubygems.org'
gemspec

gem 'celluloid'
gem 'celluloid', "~> 0.12.0"
gem 'slim'
gem 'sprockets'
gem 'sass'
gem 'rails', '3.2.8'
gem 'sqlite3'
Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -7,8 +7,8 @@ Sidekiq
Simple, efficient message processing for Ruby.

Sidekiq uses threads to handle many messages at the same time in the
same process. It integrates tightly with Rails 3 to make background
message processing dead simple.
same process. It does not require Rails but will integrate tightly with
Rails 3 to make background message processing dead simple.

Sidekiq is compatible with Resque. It uses the exact same
message format as Resque so it can integrate into an existing Resque processing farm.
Expand Down
4 changes: 2 additions & 2 deletions examples/sinkiq.rb
Expand Up @@ -19,8 +19,8 @@ def perform(msg="lulz you forgot a msg!")
end

get '/' do
@failed = $redis.get('stat:failed')
@processed = $redis.get('stat:processed')
@failed = Sidekiq::Stats.failed
@processed = Sidekiq::Stats.processed
@messages = $redis.lrange('sinkiq-example-messages', 0, -1)
erb :index
end
Expand Down
2 changes: 2 additions & 0 deletions lib/sidekiq.rb
Expand Up @@ -4,6 +4,7 @@
require 'sidekiq/worker'
require 'sidekiq/redis_connection'
require 'sidekiq/util'
require 'sidekiq/stats'

require 'sidekiq/extensions/class_methods'
require 'sidekiq/extensions/action_mailer'
Expand All @@ -13,6 +14,7 @@
require 'multi_json'

module Sidekiq
LICENSE = 'See LICENSE and the LGPL-3.0 for licensing details.'

DEFAULTS = {
:queues => [],
Expand Down
9 changes: 5 additions & 4 deletions lib/sidekiq/capistrano.rb
Expand Up @@ -5,24 +5,25 @@
after "deploy:restart", "sidekiq:restart"

_cset(:sidekiq_timeout) { 10 }
_cset(:sidekiq_role) { :app }
_cset(:sidekiq_role) { :app }
_cset(:sidekiq_pid) { "#{current_path}/tmp/pids/sidekiq.pid" }

namespace :sidekiq do

desc "Quiet sidekiq (stop accepting new work)"
task :quiet, :roles => lambda { fetch(:sidekiq_role) }, :on_no_matching_servers => :continue do
run "if [ -d #{current_path} ] && [ -f #{current_path}/tmp/pids/sidekiq.pid ]; then cd #{current_path} && #{fetch(:bundle_cmd, "bundle")} exec sidekiqctl quiet #{current_path}/tmp/pids/sidekiq.pid ; fi"
run "if [ -d #{current_path} ] && [ -f #{fetch :sidekiq_pid} ]; then cd #{current_path} && #{fetch(:bundle_cmd, "bundle")} exec sidekiqctl quiet #{fetch :sidekiq_pid} ; fi"
end

desc "Stop sidekiq"
task :stop, :roles => lambda { fetch(:sidekiq_role) }, :on_no_matching_servers => :continue do
run "if [ -d #{current_path} ] && [ -f #{current_path}/tmp/pids/sidekiq.pid ]; then cd #{current_path} && #{fetch(:bundle_cmd, "bundle")} exec sidekiqctl stop #{current_path}/tmp/pids/sidekiq.pid #{fetch :sidekiq_timeout} ; fi"
run "if [ -d #{current_path} ] && [ -f #{fetch :sidekiq_pid} ]; then cd #{current_path} && #{fetch(:bundle_cmd, "bundle")} exec sidekiqctl stop #{fetch :sidekiq_pid} #{fetch :sidekiq_timeout} ; fi"
end

desc "Start sidekiq"
task :start, :roles => lambda { fetch(:sidekiq_role) }, :on_no_matching_servers => :continue do
rails_env = fetch(:rails_env, "production")
run "cd #{current_path} ; nohup #{fetch(:bundle_cmd, "bundle")} exec sidekiq -e #{rails_env} -C #{current_path}/config/sidekiq.yml -P #{current_path}/tmp/pids/sidekiq.pid >> #{current_path}/log/sidekiq.log 2>&1 &", :pty => false
run "cd #{current_path} ; nohup #{fetch(:bundle_cmd, "bundle")} exec sidekiq -e #{rails_env} -C #{current_path}/config/sidekiq.yml -P #{fetch :sidekiq_pid} >> #{current_path}/log/sidekiq.log 2>&1 &", :pty => false
end

desc "Restart sidekiq"
Expand Down
4 changes: 4 additions & 0 deletions lib/sidekiq/cli.rb
Expand Up @@ -66,6 +66,10 @@ def parse(args=ARGV)
end

def run
logger.info "Booting Sidekiq #{Sidekiq::VERSION} with Redis at #{redis {|x| x.client.id}}"
logger.info "Running in #{RUBY_DESCRIPTION}"
logger.info Sidekiq::LICENSE

@manager = Sidekiq::Manager.new(options)
poller = Sidekiq::Scheduled::Poller.new
begin
Expand Down
4 changes: 3 additions & 1 deletion lib/sidekiq/client.rb
@@ -1,3 +1,5 @@
require 'securerandom'

require 'sidekiq/middleware/chain'

module Sidekiq
Expand Down Expand Up @@ -44,7 +46,7 @@ def self.push(item)
item = worker_class.get_sidekiq_options.merge(item)
item['retry'] = !!item['retry']
queue = item['queue']
item['jid'] = SecureRandom.base64
item['jid'] = SecureRandom.hex(12)

pushed = false
Sidekiq.client_middleware.invoke(worker_class, item, queue) do
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/exception_handler.rb
Expand Up @@ -13,7 +13,7 @@ def handle_exception(ex, msg)
private

def send_to_airbrake(msg, ex)
::Airbrake.notify(ex, :parameters => msg)
::Airbrake.notify_or_ignore(ex, :parameters => msg)
end

def send_to_exceptional(msg, ex)
Expand Down
3 changes: 1 addition & 2 deletions lib/sidekiq/manager.rb
Expand Up @@ -18,8 +18,6 @@ class Manager
trap_exit :processor_died

def initialize(options={})
logger.info "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis {|x| x.client.id}}"
logger.info "Running in #{RUBY_DESCRIPTION}"
logger.debug { options.inspect }
@count = options[:concurrency] || 25
@done_callback = nil
Expand Down Expand Up @@ -129,6 +127,7 @@ def hard_shutdown_in(delay)
# processor is an actor proxy and we can't call any methods
# that would go to the actor (since it's busy). Instead
# we'll use the object_id to track the worker's data here.
processor.terminate if processor.alive?
msg, queue = @in_progress[processor.object_id]
conn.lpush("queue:#{queue}", msg)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/middleware/server/logging.rb
Expand Up @@ -10,7 +10,7 @@ def call(*args)
logger.info { "start" }
yield
logger.info { "done: #{elapsed(start)} sec" }
rescue
rescue Exception
logger.info { "fail: #{elapsed(start)} sec" }
raise
end
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq/middleware/server/retry_jobs.rb
Expand Up @@ -40,8 +40,8 @@ class RetryJobs

def call(worker, msg, queue)
yield
rescue => e
raise unless msg['retry']
rescue Exception => e
raise e unless msg['retry']

msg['queue'] = queue
msg['error_message'] = e.message
Expand Down Expand Up @@ -72,7 +72,7 @@ def call(worker, msg, queue)
# Goodbye dear message, you (re)tried your best I'm sure.
logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
end
raise
raise e
end

end
Expand Down
27 changes: 12 additions & 15 deletions lib/sidekiq/processor.rb
Expand Up @@ -15,6 +15,8 @@ class Processor
include Util
include Celluloid

task_class Celluloid::TaskThread

def self.default_middleware
Middleware::Chain.new do |m|
m.add Middleware::Server::Logging
Expand All @@ -29,24 +31,19 @@ def initialize(boss)
end

def process(msgstr, queue)
# Defer worker execution to Celluloid's thread pool since all actor
# invocations are run within a Fiber, which dramatically limits
# our stack size.
defer do
begin
msg = Sidekiq.load_json(msgstr)
klass = constantize(msg['class'])
worker = klass.new
begin
msg = Sidekiq.load_json(msgstr)
klass = constantize(msg['class'])
worker = klass.new

stats(worker, msg, queue) do
Sidekiq.server_middleware.invoke(worker, msg, queue) do
worker.perform(*cloned(msg['args']))
end
stats(worker, msg, queue) do
Sidekiq.server_middleware.invoke(worker, msg, queue) do
worker.perform(*cloned(msg['args']))
end
rescue => ex
handle_exception(ex, msg || { :message => msgstr })
raise
end
rescue Exception => ex
handle_exception(ex, msg || { :message => msgstr })
raise
end
@boss.processor_done!(current_actor)
end
Expand Down
36 changes: 36 additions & 0 deletions lib/sidekiq/stats.rb
@@ -0,0 +1,36 @@
module Sidekiq
module_function

def info
results = {}
processed, failed, queues = Sidekiq.redis { |conn|
conn.multi do
conn.get('stat:processed')
conn.get('stat:failed')
conn.smembers('queues')
end
}
results[:queues_with_sizes] = Sidekiq.redis do |conn|
queues.inject({}) { |memo, q|
memo[q] = conn.llen("queue:#{q}")
memo
}.sort_by { |_, size| size }
end
results[:processed] = (processed || 0).to_i
results[:failed] = (failed || 0).to_i
results[:backlog] = results[:queues_with_sizes].
map {|_, size| size }.
inject(0) {|memo, val| memo + val }
results
end

def size(*queues)
return info[:backlog] if queues.empty?

Sidekiq.redis { |conn|
conn.multi {
queues.map { |q| conn.llen("queue:#{q}") }
}
}.inject(0) { |memo, count| memo += count }
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq/util.rb
Expand Up @@ -22,7 +22,7 @@ def constantize(camel_cased_word)

def watchdog(last_words)
yield
rescue => ex
rescue Exception => ex
handle_exception(ex, { :context => last_words })
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/version.rb
@@ -1,3 +1,3 @@
module Sidekiq
VERSION = "2.2.0"
VERSION = "2.3.0"
end
37 changes: 29 additions & 8 deletions lib/sidekiq/web.rb
Expand Up @@ -60,28 +60,28 @@ def workers
end
end

def info
@info ||= Sidekiq.info
end

def processed
Sidekiq.redis { |conn| conn.get('stat:processed') } || 0
info[:processed]
end

def failed
Sidekiq.redis { |conn| conn.get('stat:failed') } || 0
info[:failed]
end

def zcard(name)
Sidekiq.redis { |conn| conn.zcard(name) }
end

def queues
@queues ||= Sidekiq.redis do |conn|
conn.smembers('queues').map do |q|
[q, conn.llen("queue:#{q}") || 0]
end.sort { |x,y| x[1] <=> y[1] }
end
@queues ||= Sidekiq.info[:queues_with_sizes]
end

def backlog
queues.map {|name, size| size }.inject(0) {|memo, val| memo + val }
info[:backlog]
end

def retries_with_score(score)
Expand Down Expand Up @@ -111,6 +111,23 @@ def relative_time(time)
def display_args(args, count=100)
args.map { |arg| a = arg.inspect; a.size > count ? "#{a[0..count]}..." : a }.join(", ")
end

def tabs
self.class.tabs
end

def number_with_delimiter(number)
begin
Float(number)
rescue ArgumentError, TypeError
return number
end

options = {:delimiter => ',', :separator => '.'}
parts = number.to_s.to_str.split('.')
parts[0].gsub!(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1#{options[:delimiter]}")
parts.join(options[:separator])
end
end

get "/" do
Expand Down Expand Up @@ -223,6 +240,10 @@ def process_score(set, score, operation)
end
end

def self.tabs
@tabs ||= ["Queues", "Retries", "Scheduled"]
end

end

end
3 changes: 2 additions & 1 deletion sidekiq.gemspec
Expand Up @@ -17,12 +17,13 @@ Gem::Specification.new do |gem|
gem.add_dependency 'redis', '~> 3'
gem.add_dependency 'redis-namespace'
gem.add_dependency 'connection_pool', '~> 0.9.2'
gem.add_dependency 'celluloid', '~> 0.11.1'
gem.add_dependency 'celluloid', '~> 0.12.0'
gem.add_dependency 'multi_json', '~> 1'
gem.add_development_dependency 'minitest', '~> 3'
gem.add_development_dependency 'sinatra'
gem.add_development_dependency 'slim'
gem.add_development_dependency 'rake'
gem.add_development_dependency 'actionmailer', '~> 3'
gem.add_development_dependency 'activerecord', '~> 3'
gem.add_development_dependency 'pry'
end
2 changes: 2 additions & 0 deletions test/helper.rb
Expand Up @@ -4,6 +4,8 @@
SimpleCov.start
end

require 'pry'

require 'minitest/unit'
require 'minitest/pride'
require 'minitest/autorun'
Expand Down
2 changes: 1 addition & 1 deletion test/test_exception_handler.rb
Expand Up @@ -49,7 +49,7 @@ class TestExceptionHandler < MiniTest::Unit::TestCase
end

it "notifies Airbrake" do
::Airbrake.expect(:notify,nil,[TEST_EXCEPTION,:parameters => { :a => 1 }])
::Airbrake.expect(:notify_or_ignore,nil,[TEST_EXCEPTION,:parameters => { :a => 1 }])
Component.new.invoke_exception(:a => 1)
::Airbrake.verify
end
Expand Down

0 comments on commit 07930b8

Please sign in to comment.