diff --git a/History.md b/History.md index 2fab78af9a..667ff59a91 100644 --- a/History.md +++ b/History.md @@ -24,6 +24,7 @@ * Rescue IO::WaitReadable instead of EAGAIN for blocking read (#2121) * Ensure `BUNDLE_GEMFILE` is unspecified in workers if unspecified in master when using `prune_bundler` (#2154) * Rescue and log exceptions in hooks defined by users (on_worker_boot, after_worker_fork etc) (#1551) + * Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177) * Refactor * Remove unused loader argument from Plugin initializer (#2095) diff --git a/lib/puma/server.rb b/lib/puma/server.rb index fc5e24fd55..8ca3fabb87 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -63,6 +63,9 @@ def initialize(app, events=Events.stdio, options={}) @status = :stop + @all_worker_threads_free = false + @all_worker_threads_free_mutex = Mutex.new + @min_threads = 0 @max_threads = 16 @auto_trim_time = 30 @@ -189,6 +192,7 @@ def run(background=true) @thread_pool = ThreadPool.new(@min_threads, @max_threads, + Proc.new { signal_all_worker_threads_free }, ::Puma::IOBuffer) do |client, buffer| # Advertise this server into the thread @@ -290,10 +294,6 @@ def handle_servers end pool << client - busy_threads = pool.wait_until_not_full - if busy_threads == 0 - @options[:out_of_band].each(&:call) if @options[:out_of_band] - end end rescue SystemCallError # nothing @@ -310,6 +310,19 @@ def handle_servers rescue Object => e @events.unknown_error self, e, "Listen loop" end + + if @all_worker_threads_free_mutex.try_lock # Do not block the critical path if mutex isn't available + if @all_worker_threads_free + begin + @options[:out_of_band].each(&:call) if @options[:out_of_band] + rescue Exception => e + STDERR.puts "Exception handling OOB callbacks: #{e.message} (#{e.class})" + STDERR.puts e.backtrace + end + @all_worker_threads_free = false + end + @all_worker_threads_free_mutex.unlock + end end @events.fire :state, @status @@ -344,6 +357,8 @@ def handle_check when RESTART_COMMAND @status = :restart return true + when NOTIFY_THREADS_FREE_COMMAND + @all_worker_threads_free = true end return false @@ -945,5 +960,12 @@ def possible_header_injection?(header_value) HTTP_INJECTION_REGEX =~ header_value.to_s end private :possible_header_injection? + + def signal_all_worker_threads_free + @all_worker_threads_free_mutex.synchronize do + @all_worker_threads_free = true + end + end + private :signal_all_worker_threads_free end end diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index 85ba68c96e..d90cf42189 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -29,7 +29,7 @@ class ForceShutdown < RuntimeError # The block passed is the work that will be performed in each # thread. # - def initialize(min, max, *extra, &block) + def initialize(min, max, all_threads_free_cb, *extra, &block) @not_empty = ConditionVariable.new @not_full = ConditionVariable.new @mutex = Mutex.new @@ -41,6 +41,7 @@ def initialize(min, max, *extra, &block) @min = Integer(min) @max = Integer(max) + @all_threads_free_cb = all_threads_free_cb @block = block @extra = extra @@ -116,6 +117,7 @@ def spawn_thread end @waiting += 1 + signal_threadpool_empty not_full.signal not_empty.wait mutex @waiting -= 1 @@ -213,6 +215,15 @@ def wait_until_not_full end end + # Call the callback +@all_threads_free_cb+ if all worker threads are free. + # + def signal_threadpool_empty + busy_threads = @spawned - @waiting + if busy_threads == 0 + @all_threads_free_cb.call + end + end + # If too many threads are in the pool, tell one to finish go ahead # and exit. If +force+ is true, then a trim request is requested # even if all threads are being utilized. diff --git a/test/test_thread_pool.rb b/test/test_thread_pool.rb index 18901d6a45..4815aca61a 100644 --- a/test/test_thread_pool.rb +++ b/test/test_thread_pool.rb @@ -10,9 +10,10 @@ def teardown def new_pool(min, max, &block) block = proc { } unless block + all_worker_threads_free_cb = proc { } @work_mutex = Mutex.new @work_done = ConditionVariable.new - @pool = Puma::ThreadPool.new(min, max, &block) + @pool = Puma::ThreadPool.new(min, max, all_worker_threads_free_cb, &block) end def pause