Permalink
Browse files

Merge branch 'no_robust_interruption'

  • Loading branch information...
2 parents 878a900 + a00cdf3 commit d6f7dd126887e2d38d9a0a68fb8d2b93ae341526 @FooBarWidget FooBarWidget committed May 8, 2013
@@ -30,7 +30,6 @@
require 'phusion_passenger/message_client'
require 'phusion_passenger/debug_logging'
require 'phusion_passenger/utils'
-require 'phusion_passenger/utils/robust_interruption'
require 'phusion_passenger/utils/tmpdir'
require 'phusion_passenger/ruby_core_enhancements'
require 'phusion_passenger/request_handler/thread_handler'
@@ -203,7 +202,6 @@ def main_loop
end
install_useful_signal_handlers
- RobustInterruption.install
start_threads
wait_until_termination
terminate_threads
@@ -440,17 +438,14 @@ def start_threads
@concurrency.times do |i|
thread = Thread.new(i) do |number|
Thread.current.abort_on_exception = true
- RobustInterruption.install
- RobustInterruption.disable_interruptions do
- begin
- Thread.current[:name] = "Worker #{number + 1}"
- handler = thread_handler.new(self, main_socket_options)
- handler.install
- handler.main_loop(set_initialization_state_to_true)
- ensure
- set_initialization_state.call(false)
- unregister_current_thread
- end
+ begin
+ Thread.current[:name] = "Worker #{number + 1}"
+ handler = thread_handler.new(self, main_socket_options)
+ handler.install
+ handler.main_loop(set_initialization_state_to_true)
+ ensure
+ set_initialization_state.call(false)
+ unregister_current_thread
end
end
@threads << thread
@@ -459,17 +454,14 @@ def start_threads
thread = Thread.new do
Thread.current.abort_on_exception = true
- RobustInterruption.install
- RobustInterruption.disable_interruptions do
- begin
- Thread.current[:name] = "HTTP helper worker"
- handler = thread_handler.new(self, http_socket_options)
- handler.install
- handler.main_loop(set_initialization_state_to_true)
- ensure
- set_initialization_state.call(false)
- unregister_current_thread
- end
+ begin
+ Thread.current[:name] = "HTTP helper worker"
+ handler = thread_handler.new(self, http_socket_options)
+ handler.install
+ handler.main_loop(set_initialization_state_to_true)
+ ensure
+ set_initialization_state.call(false)
+ unregister_current_thread
end
end
@threads << thread
@@ -530,12 +522,34 @@ def wait_until_termination
def terminate_threads
debug("Stopping all threads")
+
+ # Mark all threads as interrupted.
+ @threads_mutex.synchronize do
+ @threads.each do |thread|
+ thread[:passenger_thread_handler].set_interrupted!
+ end
+ end
+
+ # Wake up all threads by connecting to the sockets.
+ @concurrency.times do
+ Thread.new(@server_sockets[:main][:address]) do |address|
+ begin
+ connect_to_server(address).close
+ rescue SystemCalLError, IOError
+ end
+ end
+ end
+ Thread.new(@server_sockets[:http][:address]) do |address|
+ begin
+ connect_to_server(address).close
+ rescue SystemCalLError, IOError
+ end
+ end
+
+ # Wait until threads have unregistered themselves.
done = false
while !done
@threads_mutex.synchronize do
- @threads.each do |thread|
- Utils::RobustInterruption.raise(thread)
- end
done = @threads.empty?
end
sleep 0.02 if !done
@@ -549,7 +563,7 @@ def wait_until_all_threads_are_idle
while !done
@threads_mutex.synchronize do
done = @threads.all? do |thread|
- thread[:handler].idle?
+ thread[:passenger_thread_handler].idle?
end
end
sleep 0.02 if !done
@@ -26,7 +26,6 @@
require 'phusion_passenger/message_channel'
require 'phusion_passenger/utils'
require 'phusion_passenger/utils/unseekable_socket'
-require 'phusion_passenger/utils/robust_interruption'
module PhusionPassenger
class RequestHandler
@@ -36,7 +35,9 @@ class RequestHandler
class ThreadHandler
include DebugLogging
include Utils
- include Utils::RobustInterruption
+
+ class Interrupted < StandardError
+ end
REQUEST_METHOD = 'REQUEST_METHOD'.freeze
PING = 'PING'.freeze
@@ -89,8 +90,7 @@ def initialize(request_handler, options = {})
end
def install
- Thread.current[:handler] = self
- install_robust_interruption
+ Thread.current[:passenger_thread_handler] = self
PhusionPassenger.call_event(:starting_request_handler_thread)
end
@@ -102,16 +102,20 @@ def main_loop(finish_callback)
begin
finish_callback.call
- while !Utils::RobustInterruption.interrupted?
+ while !stats_mutex.synchronize { @interrupted }
hijacked = accept_and_process_next_request(socket_wrapper, channel, buffer)
socket_wrapper = Utils::UnseekableSocket.new if hijacked
end
- rescue Utils::RobustInterruption::Interrupted
+ rescue Interrupted
# Do nothing.
end
debug("Thread handler main loop exited normally")
end
+ def set_interrupted!
+ @stats_mutex.synchronize { @interrupted = true }
+ end
+
def idle?
@stats_mutex.synchronize { return !@processing }
end
@@ -120,8 +124,11 @@ def idle?
# Returns true if the socket has been hijacked, false otherwise.
def accept_and_process_next_request(socket_wrapper, channel, buffer)
@stats_mutex.synchronize { @iterations += 1 }
- connection = enable_interruptions { socket_wrapper.wrap(@server_socket.accept) }
- @stats_mutex.synchronize { @processing = true }
+ connection = socket_wrapper.wrap(@server_socket.accept)
+ @stats_mutex.synchronize do
+ raise Interrupted if @interrupted
+ @processing = true
+ end
trace(3, "Accepted new request on socket #{@socket_name}")
channel.io = connection
if headers = parse_request(connection, channel, buffer)
@@ -1,173 +0,0 @@
-require 'thread'
-
-module PhusionPassenger
-module Utils
-
-module RobustInterruption
- class Interrupted < StandardError
- def initialize
- @origin_thread = Thread.current
- @origin = caller
- end
-
- def set_backtrace(bt)
- @origin.reverse.each do |line|
- if bt.last == line
- bt.pop
- else
- break
- end
- end
- bt << "interruption initiator thread: #{RobustInterruption._get_thread_display_name @origin_thread}"
- bt.concat(@origin)
- super(bt)
- end
- end
-
- class NotInstalled < StandardError
- end
-
- class Data
- attr_reader :interruption_flags
- attr_accessor :interrupted
- alias interrupted? interrupted
-
- def initialize
- @mutex = Mutex.new
- @interruption_flags = [true]
- @interrupted = false
- end
-
- def lock
- @mutex.lock
- end
-
- def try_lock
- return @mutex.try_lock
- end
-
- def unlock
- @mutex.unlock
- end
-
- def interruptable?
- return @interruption_flags.last
- end
-
- def push_interruption_flag(value)
- @interruption_flags.push(value)
- end
-
- def pop_interruption_flag
- Kernel.raise "BUG - cannot pop interruption flag in this state" if @interruption_flags.size <= 1
- @interruption_flags.pop
- end
- end
-
- def self.install(thread = Thread.current)
- thread[:robust_interruption] = Data.new
- end
-
- def install_robust_interruption
- RobustInterruption.install
- end
-
- def interrupted?(thread = Thread.current)
- if data = thread[:robust_interruption]
- return data.interrupted?
- else
- Kernel.raise NotInstalled, "RobustThreadInterruption not installed for #{_get_thread_display_name thread}"
- end
- end
- module_function :interrupted?
-
- def self.raise(thread, exception = Interrupted)
- if data = thread[:robust_interruption]
- RobustInterruption.disable_interruptions(Thread.current) do
- data.interrupted = true
- if data.try_lock
- begin
- thread.raise(exception)
- ensure
- data.unlock
- end
- end
- end
- else
- Kernel.raise NotInstalled, "RobustThreadInterruption not installed for #{_get_thread_display_name thread}"
- end
- end
-
- def disable_interruptions(thread = Thread.current)
- data = thread[:robust_interruption]
- if data
- was_interruptable = data.interruptable?
- data.lock if was_interruptable
- data.push_interruption_flag(false)
- begin
- yield
- ensure
- data.pop_interruption_flag
- data.unlock if was_interruptable
- end
- else
- Kernel.raise NotInstalled, "RobustThreadInterruption not installed for #{_get_thread_display_name thread}"
- end
- end
- module_function :disable_interruptions
-
- def enable_interruptions(thread = Thread.current)
- data = thread[:robust_interruption]
- if data
- was_interruptable = data.interruptable?
- data.push_interruption_flag(true)
- data.unlock if !was_interruptable
- begin
- yield
- ensure
- data.lock if !was_interruptable
- data.pop_interruption_flag
- end
- else
- Kernel.raise NotInstalled, "RobustThreadInterruption not installed for #{_get_thread_display_name thread}"
- end
- end
- module_function :enable_interruptions
-
- def restore_interruptions(thread = Thread.current)
- data = thread[:robust_interruption]
- if data
- if data.interruption_flags.size < 2
- Kernel.raise NotInstalled, "Cannot restore interruptions state to previous value - no previous value exists"
- end
- if data.interruption_flags[-2]
- enable_interruptions do
- yield
- end
- else
- disable_interruptions do
- yield
- end
- end
- else
- Kernel.raise NotInstalled, "RobustThreadInterruption not installed for #{_get_thread_display_name thread}"
- end
- end
- module_function :restore_interruptions
-
-private
- def _get_thread_display_name(thread)
- if !(thread_id = thread[:id])
- thread.to_s =~ /:(0x[0-9a-f]+)/i
- thread_id = $1 || '?'
- end
- if thread_name = thread[:name]
- thread_name = "(#{thread_name})"
- end
- return "#{thread_id}#{thread_name}"
- end
- module_function :_get_thread_display_name
-end
-
-end # module Utils
-end # module PhusionPassenger

0 comments on commit d6f7dd1

Please sign in to comment.