Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add thread reaping to thread pool
  • Loading branch information
OleMchls committed May 20, 2015
1 parent 6479e6b commit 5e95791
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
6 changes: 6 additions & 0 deletions lib/puma/server.rb
Expand Up @@ -39,6 +39,7 @@ class Server
attr_accessor :max_threads
attr_accessor :persistent_timeout
attr_accessor :auto_trim_time
attr_accessor :reaping_time
attr_accessor :first_data_timeout

# Create a server for the rack app +app+.
Expand All @@ -60,6 +61,7 @@ def initialize(app, events=Events.stdio, options={})
@min_threads = 0
@max_threads = 16
@auto_trim_time = 1
@reaping_time = 1

@thread = nil
@thread_pool = nil
Expand Down Expand Up @@ -274,6 +276,10 @@ def run(background=true)
@reactor.run_in_thread
end

if @reaping_time
@thread_pool.auto_reap!(@reaping_time)
end

if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
Expand Down
46 changes: 46 additions & 0 deletions lib/puma/thread_pool.rb
Expand Up @@ -33,6 +33,7 @@ def initialize(min, max, *extra, &block)
@workers = []

@auto_trim = nil
@reaper = nil

@mutex.synchronize do
@min.times { spawn_thread }
Expand Down Expand Up @@ -155,6 +156,21 @@ def trim(force=false)
end
end

# If there are dead threads in the pool make them go away while decreasing
# spwaned counter so that new healty threads could be created again.
def reap
@mutex.synchronize do
dead_workers = @workers.reject(&:alive?)

dead_workers.each do |worker|
worker.kill
@spawned -= 1
end

@workers -= dead_workers
end
end

class AutoTrim
def initialize(pool, timeout)
@pool = pool
Expand Down Expand Up @@ -184,6 +200,35 @@ def auto_trim!(timeout=5)
@auto_trim.start!
end

class Reaper
def initialize(pool, timeout)
@pool = pool
@timeout = timeout
@running = false
end

def start!
@running = true

@thread = Thread.new do
while @running
@pool.reap
sleep @timeout
end
end
end

def stop
@running = false
@thread.wakeup
end
end

def auto_reap!(timeout=5)
@reaper = Reaper.new(self, timeout)
@reaper.start!
end

# Tell all threads in the pool to exit and wait for them to finish.
#
def shutdown
Expand All @@ -193,6 +238,7 @@ def shutdown
@not_full.broadcast

@auto_trim.stop if @auto_trim
@reaper.stop if @reaper
end

# Use this instead of #each so that we don't stop in the middle
Expand Down
47 changes: 47 additions & 0 deletions test/test_thread_pool.rb
Expand Up @@ -179,4 +179,51 @@ def test_cleanliness

assert_equal [], values.compact
end

def test_reap_only_dead_threads
pool = new_pool(2,2) { Thread.current.kill }

assert_equal 2, pool.spawned

pool << 1

pause

assert_equal 2, pool.spawned

pool.reap

assert_equal 1, pool.spawned

pool << 2

pause

assert_equal 1, pool.spawned

pool.reap

assert_equal 0, pool.spawned
end

def test_auto_reap_dead_threads
pool = new_pool(2,2) { Thread.current.kill }

assert_equal 2, pool.spawned

pool << 1
pool << 2

pause

assert_equal 2, pool.spawned

pool.auto_reap! 1

sleep 1

pause

assert_equal 0, pool.spawned
end
end

0 comments on commit 5e95791

Please sign in to comment.