Skip to content

Commit

Permalink
Refactor Processor to use bare threads
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Oct 5, 2015
1 parent b182b11 commit 182db32
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 24 deletions.
63 changes: 46 additions & 17 deletions lib/sidekiq/processor.rb
@@ -1,5 +1,4 @@
require 'sidekiq/util'
require 'sidekiq/actor'

require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/middleware/server/logging'
Expand All @@ -10,12 +9,12 @@ module Sidekiq
# processes it. It instantiates the worker, runs the middleware
# chain and then calls Sidekiq::Worker#perform.
class Processor
# To prevent a memory leak, ensure that stats expire. However, they should take up a minimal amount of storage
# so keep them around for a long time
# To prevent a memory leak, ensure that stats expire. However, they
# should take up a minimal amount of storage so keep them around
# for a long time.
STATS_TIMEOUT = 24 * 60 * 60 * 365 * 5

include Util
include Actor

def self.default_middleware
Middleware::Chain.new do |m|
Expand All @@ -28,18 +27,54 @@ def self.default_middleware
end
end

attr_accessor :proxy_id
attr_reader :thread

def initialize(boss)
@boss = boss
def initialize(mgr)
@mgr = mgr
@done = false
@work = ::Queue.new
@thread = safe_thread("processor", &method(:run))
end

def terminate(wait=false)
@done = true
@work << nil
# unlike the other actors, terminate does not wait
# for the thread to finish because we don't know how
# long the job will take to finish. Instead we
# provide a `kill` method to call after the shutdown
# timeout passes.
@thread.value if wait
end

def kill(wait=false)
@thread.raise ::Sidekiq::Shutdown
@thread.value if wait
end

def process(work)
raise ArgumentError, "Processor is shut down!" if @done
@work << work
end

private

def run
begin
while !@done
job = @work.pop
go(job) if job
end
rescue Exception => ex
Sidekiq.logger.warn(ex.message)
@mgr.processor_died(self, ex)
end
end

def go(work)
msgstr = work.message
queue = work.queue_name

@boss.async.real_thread(proxy_id, Thread.current)

ack = false
begin
msg = Sidekiq.load_json(msgstr)
Expand Down Expand Up @@ -69,19 +104,13 @@ def process(work)
work.acknowledge if ack
end

@boss.async.processor_done(current_actor)
end

def inspect
"<Processor##{object_id.to_s(16)}>"
@mgr.processor_done(self)
end

def execute_job(worker, cloned_args)
worker.perform(*cloned_args)
end

private

def thread_identity
@str ||= Thread.current.object_id.to_s(36)
end
Expand All @@ -94,7 +123,7 @@ def stats(worker, msg, queue)
Sidekiq.redis do |conn|
conn.multi do
conn.hmset("#{identity}:workers", thread_identity, hash)
conn.expire("#{identity}:workers", 60*60*4)
conn.expire("#{identity}:workers", 14400)
end
end
end
Expand Down
107 changes: 100 additions & 7 deletions test/test_actors.rb
@@ -1,9 +1,16 @@
require_relative 'helper'
require 'sidekiq/cli'
require 'sidekiq/fetch'
require 'sidekiq/processor'

class TestActors < Sidekiq::Test
class SomeWorker
include Sidekiq::Worker
def perform(slp)
raise "boom" if slp == "boom"
sleep(slp) if slp > 0
$count += 1
end
end

describe 'fetcher' do
Expand All @@ -14,7 +21,7 @@ class SomeWorker
end

it 'can fetch' do
SomeWorker.perform_async
SomeWorker.perform_async(0)

mgr = Minitest::Mock.new
mgr.expect(:assign, nil, [Sidekiq::BasicFetch::UnitOfWork])
Expand All @@ -24,8 +31,6 @@ class SomeWorker
sleep 0.001
f.terminate
mgr.verify

#assert_equal Sidekiq::BasicFetch::UnitOfWork, job.class
end
end

Expand All @@ -37,13 +42,12 @@ class SomeWorker
end

it 'can schedule' do
ss = Sidekiq::ScheduledSet.new
ss.clear
Sidekiq.redis {|c| c.flushdb}

ss = Sidekiq::ScheduledSet.new
q = Sidekiq::Queue.new
q.clear

SomeWorker.perform_in(0.01)
SomeWorker.perform_in(0.01, 0)

assert_equal 0, q.size
assert_equal 1, ss.size
Expand All @@ -57,4 +61,93 @@ class SomeWorker
end
end

describe 'processor' do
before do
$count = 0
end

it 'can start and stop' do
f = Sidekiq::Processor.new(nil)
f.terminate
end

class Mgr
attr_reader :mutex
attr_reader :cond
def initialize
@mutex = ::Mutex.new
@cond = ::ConditionVariable.new
end
def processor_done(inst)
@mutex.synchronize do
@cond.signal
end
end
def processor_died(inst, err)
@mutex.synchronize do
@cond.signal
end
end
end

it 'can process' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
SomeWorker.perform_async(0)

job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count
mgr.mutex.synchronize do
p.process(uow)
mgr.cond.wait(mgr.mutex)
end
b = $count
assert_equal a + 1, b

assert_equal "sleep", p.thread.status
p.terminate(true)
assert_equal false, p.thread.status
end

it 'deals with errors' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
SomeWorker.perform_async("boom")

job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count
mgr.mutex.synchronize do
p.process(uow)
mgr.cond.wait(mgr.mutex)
end
b = $count
assert_equal a, b

assert_equal false, p.thread.status
p.terminate(true)
end

it 'gracefully kills' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
SomeWorker.perform_async(0.1)

job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count
p.process(uow)
sleep(0.02)
p.terminate
p.kill(true)

b = $count
assert_equal a, b
assert_equal false, p.thread.status
end
end
end

0 comments on commit 182db32

Please sign in to comment.