Skip to content

Commit

Permalink
Avoid calling processor during hard shutdown, fixes #997
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Jun 11, 2013
1 parent 84172d5 commit 06acbd4
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 13 deletions.
1 change: 1 addition & 0 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

- Revert back to Celluloid's TaskFiber for job processing which has proven to be more
stable than TaskThread. [#985]
- Avoid possible lockup during hard shutdown [#997]

At this point, if you are experiencing stability issues with Sidekiq in
Ruby 1.9, please try Ruby 2.0. It seems to be more stable.
Expand Down
22 changes: 17 additions & 5 deletions lib/sidekiq/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ def initialize(options={})
@done_callback = nil

@in_progress = {}
@threads = {}
@done = false
@busy = []
@fetcher = Fetcher.new(current_actor, options)
@ready = @count.times.map { Processor.new_link(current_actor) }
@ready = @count.times.map { Processor.new_link(current_actor).tap {|p| p.proxy_id = p.object_id} }
end

def stop(options={})
Expand Down Expand Up @@ -63,6 +64,7 @@ def processor_done(processor)
watchdog('Manager#processor_done died') do
@done_callback.call(processor) if @done_callback
@in_progress.delete(processor.object_id)
@threads.delete(processor.object_id)
@busy.delete(processor)
if stopped?
processor.terminate if processor.alive?
Expand All @@ -77,10 +79,13 @@ def processor_done(processor)
def processor_died(processor, reason)
watchdog("Manager#processor_died died") do
@in_progress.delete(processor.object_id)
@threads.delete(processor.object_id)
@busy.delete(processor)

unless stopped?
@ready << Processor.new_link(current_actor)
@ready << Processor.new_link(current_actor).tap do |p|
p.proxy_id = p.object_id
end
dispatch
else
signal(:shutdown) if @busy.empty?
Expand All @@ -105,6 +110,14 @@ def assign(work)
end
end

# A hack worthy of Rube Goldberg. We need to be able
# to hard stop a working thread. But there's no way for us to
# get handle to the underlying thread performing work for a processor
# so we have it call us and tell us.
def real_thread(proxy_id, thr)
@threads[proxy_id] = thr
end

def procline(tag)
"sidekiq #{Sidekiq::VERSION} #{tag}[#{@busy.size} of #{@count} busy]#{stopped? ? ' stopping' : ''}"
end
Expand Down Expand Up @@ -145,10 +158,9 @@ def hard_shutdown_in(delay)
# it is worse to lose a job than to run it twice.
Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values)

logger.debug { "Terminating worker threads" }
logger.debug { "Terminating #{@busy.size} busy worker threads" }
@busy.each do |processor|
if processor.alive?
t = processor.bare_object.actual_work_thread
if processor.alive? && t = @threads.delete(processor.object_id)
t.raise Shutdown
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq/middleware/server/retry_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ module Server
class RetryJobs
include Sidekiq::Util

# delayed_job uses the same basic formula
DEFAULT_MAX_RETRY_ATTEMPTS = 25

def call(worker, msg, queue)
yield
rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue
# ignore, will be pushed back onto queue during hard_shutdown
raise
rescue Exception => e
raise e unless msg['retry']
Expand Down Expand Up @@ -110,6 +109,7 @@ def retry_attempts_from(msg_retry, default)
end
end

# delayed_job uses the same basic formula
def seconds_to_delay(count)
(count ** 4) + 15 + (rand(30)*(count+1))
end
Expand Down
8 changes: 3 additions & 5 deletions lib/sidekiq/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ def self.default_middleware
end
end

# store the actual working thread so we
# can later kill if it necessary during
# hard shutdown.
attr_accessor :actual_work_thread
attr_accessor :proxy_id

def initialize(boss)
@boss = boss
Expand All @@ -37,8 +34,9 @@ def process(work)
msgstr = work.message
queue = work.queue_name

@actual_work_thread = Thread.current
do_defer do
@boss.async.real_thread(proxy_id, Thread.current)

begin
msg = Sidekiq.load_json(msgstr)
klass = msg['class'].constantize
Expand Down
2 changes: 1 addition & 1 deletion myapp/app/controllers/work_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def bulk

def long
50.times do |x|
HardWorker.perform_async('bob', 10, x)
HardWorker.perform_async('bob', 15, x)
end
render :text => 'enqueued'
end
Expand Down
2 changes: 2 additions & 0 deletions test/test_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def call(*args)
processor = Sidekiq::Processor.new(boss)
actor = Minitest::Mock.new
actor.expect(:processor_done, nil, [processor])
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
boss.expect(:async, actor, [])
boss.expect(:async, actor, [])
processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg))
assert_equal %w(2 before 3 before 0 before work_performed 0 after 3 after 2 after), $recorder.flatten
Expand Down
15 changes: 15 additions & 0 deletions test/test_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ def work(msg, queue='queue:default')
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
actor = Minitest::Mock.new
actor.expect(:processor_done, nil, [@processor])
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, [])
@processor.process(work(msg))
@boss.verify
assert_equal 1, $invokes
end

it 'passes exceptions to ExceptionHandler' do
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin
@processor.process(work(msg))
Expand All @@ -51,6 +56,9 @@ def work(msg, queue='queue:default')
it 're-raises exceptions after handling' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
re_raise = false
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])

begin
@processor.process(work(msg))
Expand All @@ -67,6 +75,8 @@ def work(msg, queue='queue:default')
processor = ::Sidekiq::Processor.new(@boss)
actor = Minitest::Mock.new
actor.expect(:processor_done, nil, [processor])
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, [])
processor.process(work(msgstr))
assert_equal [['myarg']], msg['args']
Expand All @@ -93,8 +103,10 @@ def with_expire(time)
def successful_job
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
actor.expect(:processor_done, nil, [@processor])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, [])
@processor.process(work(msg))
end

Expand All @@ -118,6 +130,9 @@ def successful_job
let(:failed_today_key) { "stat:failed:#{Time.now.utc.to_date}" }

def failed_job
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin
@processor.process(work(msg))
Expand Down

1 comment on commit 06acbd4

@kevinzen
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In each of the areas above where you use actor.expect(:real_thread, nil, [nil, Celluloid::Thread]), I get a failure in JRuby 1.7.2, 1.7.4 and 1.7.6 -- but it passes on MRI 1.9.3.

The error is:

MockExpectationError: mocked method :real_thread called with unexpected arguments [nil, #<Thread:0x2f54bdb run>]

When I change it to be actor.expect(:real_thread, nil, [nil, Thread]) it works.

All I did was change it to expect Thread instead of Celluloid::Thread.

Any idea what could be different in my configuration? Did this pass on JRuby for you?

Please sign in to comment.