Skip to content

Commit

Permalink
threadpool passing specs, much nicer shutdown conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
slyphon committed May 17, 2012
1 parent 30b3e5f commit 9a9e576
Showing 1 changed file with 101 additions and 40 deletions.
141 changes: 101 additions & 40 deletions lib/zk/threadpool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ def initialize(size=nil)
@size = size || self.class.default_size

@threadpool = []
@threadqueue = ::Queue.new
@state = :new
@queue = []

@mutex = Monitor.new
@mutex = Mutex.new
@cond = ConditionVariable.new

@error_callbacks = []

Expand All @@ -36,45 +38,83 @@ def initialize(size=nil)
def defer(callable=nil, &blk)
callable ||= blk

# XXX(slyphon): do we care if the threadpool is not running?
# raise Exceptions::ThreadpoolIsNotRunningException unless running?
raise ArgumentError, "Argument to Threadpool#defer must respond_to?(:call)" unless callable.respond_to?(:call)

@threadqueue << callable
@mutex.lock
begin
@queue << callable
@cond.broadcast
ensure
@mutex.unlock rescue nil
end

nil
end

def running?
@mutex.synchronize { @running }
@mutex.lock
begin
@state == :running
ensure
@mutex.unlock rescue nil
end
end

# returns true if the current thread is one of the threadpool threads
def on_threadpool?
tp = @mutex.synchronize { @threadpool.dup }
tp and tp.respond_to?(:include?) and tp.include?(Thread.current)
tp = nil

@mutex.synchronize do
return false unless @threadpool # you can't dup nil
tp = @threadpool.dup
end

tp.respond_to?(:include?) and tp.include?(Thread.current)
end

# starts the threadpool if not already running
def start!
@mutex.synchronize do
return false if @running
@running = true
spawn_threadpool
return false if @state == :running
@state = :running
end

spawn_threadpool
true
end

# like the start! method, but checks for dead threads in the threadpool
# (which will happen after a fork())
#
# This will reset the state of the pool and any blocks registered will be
# lost
#
# @private
def reopen_after_fork!
return false unless @running
@mutex = Monitor.new
@threadqueue = Queue.new
return false unless @state == :running
@mutex = Mutex.new
@cond = ConditionVariable.new
@queue = []
prune_dead_threads
spawn_threadpool
end

def pause_before_fork_in_parent
@mutex.lock
begin


ensure
@mutex.unlock rescue nil
end
end

def pause_before_fork_in_parent
end

def resume_after_fork_in_parent
end

# register a block to be called back with unhandled exceptions that occur
# in the threadpool.
#
Expand All @@ -95,24 +135,26 @@ def on_exception(&blk)
# the default timeout is 2 seconds per thread
#
def shutdown(timeout=2)
@mutex.synchronize do
return unless @running
@running = false
@threadqueue.clear
@size.times { @threadqueue << KILL_TOKEN }
threads = []

@mutex.lock
begin
return unless @state == :running
@state = :shutdown
@queue.clear
threads, @threadpool = @threadpool, []
@cond.broadcast
ensure
@mutex.unlock rescue nil
end

while th = threads.shift
begin
th.join(timeout)
rescue Exception => e
logger.error { "Caught exception shutting down threadpool" }
logger.error { e.to_std_format }
end
while th = threads.shift
begin
th.join(timeout)
rescue Exception => e
logger.error { "Caught exception shutting down threadpool" }
logger.error { e.to_std_format }
end

@threadqueue = ::Queue.new
end

nil
Expand Down Expand Up @@ -150,7 +192,8 @@ def default_exception_handler(e, msg=nil)
end

def prune_dead_threads
@mutex.synchronize do
@mutex.lock
begin
threads, @threadpool = @threadpool, []
return if threads.empty?

Expand All @@ -164,25 +207,43 @@ def prune_dead_threads
logger.error { e.to_std_format }
end
end
ensure
@mutex.unlock rescue nil
end
end

def spawn_threadpool #:nodoc:
@mutex.synchronize do
until @threadpool.size >= @size.to_i
thread = Thread.new do
while @running
begin
op = @threadqueue.pop
break if op == KILL_TOKEN
op.call
rescue Exception => e
dispatch_to_error_handler(e)
end
end
@threadpool << Thread.new(&method(:worker_thread_body))
end
logger.debug { "spawn threadpool complete" }
end
end

def worker_thread_body
while true
op = nil

@mutex.lock
begin
return if @state != :running

unless op = @queue.shift
@cond.wait(@mutex) if @queue.empty? and (@state == :running)
end
ensure
@mutex.unlock rescue nil
end

@threadpool << thread
next unless op

logger.debug { "got #{op.inspect} in thread" }

begin
op.call if op
rescue Exception => e
dispatch_to_error_handler(e)
end
end
end
Expand Down

0 comments on commit 9a9e576

Please sign in to comment.