Skip to content

Commit

Permalink
Fix stability of front server and child command connections.
Browse files Browse the repository at this point in the history
  • Loading branch information
keita committed Dec 1, 2013
1 parent a83722f commit 431e04e
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 28 deletions.
21 changes: 17 additions & 4 deletions lib/pione/command/basic-command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ def initialize(argv)
Global.command = self
end

# Return current phase name.
#
# @return [Symbol]
# :init, :setup, :execution, or :termination
def current_phase
@__phase_name__
end

# Run 4 phase lifecycle of the command. This fires actions in each phase.
def run
@running_thread = Thread.current
Expand Down Expand Up @@ -355,7 +363,7 @@ module CommonCommandAction
define_action(:terminate_child_process) do |cmd|
if Global.front
# send signal TERM to the child process
Global.front.child.each do |pid, uri|
Global.front.child_pids.each do |pid|
Util.ignore_exception {Process.kill(:TERM, pid)}
end

Expand All @@ -368,14 +376,19 @@ module CommonCommandAction
end

define_action(:setup_parent_process_connection) do |cmd|
cmd.option[:parent_front].add_child(Process.pid, Global.front.uri)
ParentFrontWatchDog.new(self) # start to watch parent process
begin
cmd.option[:parent_front].register_child(Process.pid, Global.front.uri)
ParentFrontWatchDog.new(self) # start to watch parent process
rescue Front::ChildRegistrationError
# terminate if the registration failed
Global.command.terminate
end
end

define_action(:terminate_parent_process_connection) do |cmd|
# maybe parent process is dead in this timing
Util.ignore_exception do
cmd.option[:parent_front].remove_child(Process.pid)
cmd.option[:parent_front].unregister_child(Process.pid)
end
end
end
Expand Down
5 changes: 1 addition & 4 deletions lib/pione/command/spawner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ def spawn
# create a new process and watch it
pid = Process.spawn(@name, *@args)

# register PID to front server for termination
Global.front.child[pid] = nil if Global.front

# keep to watch child process
thread = Process.detach(pid)

Expand Down Expand Up @@ -63,7 +60,7 @@ def when_terminated(&b)
# URI to children table of my front, so we get it from my front and create
# the reference.
def find_child_front(pid)
if child_front_uri = Global.front.child[pid]
if child_front_uri = Global.front.child_front_uri(pid)
return DRbObject.new_with_uri(child_front_uri).tap do |front|
timeout(1) {front.ping} # test connection
end
Expand Down
90 changes: 71 additions & 19 deletions lib/pione/front/basic-front.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
module Pione
module Front
# This is base class for all PIONE front classes. PIONE fronts exist in each
# command and behave as remote interface.
# command and behave as remote control interface.
class BasicFront < PioneObject
include DRbUndumped

attr_reader :uri # front server's URI string
attr_reader :attrs
attr_reader :child # child process table

# Creates a front server as druby's service.
def initialize(port)
@uri = start_service(port, {}) # port is number or range
@attrs = {}
@attr = {}
@child = {}
@child_lock = Mutex.new
@child_watchers = ThreadGroup.new
end

# Return PID of the process.
Expand All @@ -22,26 +22,78 @@ def pid
end

def [](name)
@attrs[name]
@attr[name]
end

def []=(name, val)
@attrs[name] = val
@attr[name] = val
end

# Add child process.
def add_child(pid, front_uri)
@child[pid] = front_uri
# Register the process as a child of this process. It is unregistered when
# the child is terminated.
#
# @param pid [String]
# child process's PID
# @param front_uri [String]
# child process's front URI
# @return [void]
def register_child(pid, front_uri)
unless Global.command.current_phase == :termination
# register child's PID
@child_lock.synchronize {@child[pid] = front_uri}

# unregister automatically when the child is terminated
thread = Thread.new do
Util.ignore_exception(Errno::ECHILD) do
Process.waitpid(pid)
unregister_child(pid)
end
end
thread[:pid] = pid
@child_watchers.add(thread)

return nil
else
raise ChildRegistrationError.new
end
end

# Unregister the child process.
#
# @param pid [String]
# child's PID to be removed
def unregister_child(pid)
# unregister the pid
@child_lock.synchronize {@child.delete(pid)}

# kill the watcher thread
@child_watchers.list.each do |thread|
if thread[:pid] == pid
thread.kill
break
end
end
end

# Return list of child process's PIDs.
#
# @return [Array]
# list of child command's PIDs.
def child_pids
@child.keys
end

# Delete child process.
def remove_child(pid)
@child.delete(pid)
# Return a front URI of the child PID.
#
# @return [String]
# URI of the child PID
def child_front_uri(pid)
@child[pid]
end

# Terminate the front server. This method assumes to be not called from
# other process. Note that front servers have no responsibility of killing
# child processes.
# other processes. Note that front servers have no responsibility of
# killing child processes.
def terminate
DRb.stop_service
end
Expand All @@ -61,24 +113,24 @@ def start_service(port, config)
if port.kind_of?(Range)
enum = port.each
begin
DRb.start_service(build_front_server_uri(enum.next), self, config)
DRb.start_service(build_uri(enum.next), self, config)
rescue StopIteration => e
raise
rescue
retry
end
else
DRb.start_service(build_front_server_uri(port), self, config)
DRb.start_service(build_uri(port), self, config)
end

return DRb.uri
rescue => e
raise FrontError.new(self, e)
raise FrontServerError.new(self, e)
end

# Build front server URI. Note that the address is configured by
# +Global.my_ip_address+.
def build_front_server_uri(port)
# `Global.my_ip_address`.
def build_uri(port)
"druby://%s:%s" % [Global.my_ip_address, port]
end
end
Expand Down
8 changes: 7 additions & 1 deletion lib/pione/front/front-exception.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
module Pione
module Front
# `Front::Error` is a base exception class.
class Error < StandardError; end

# FrontError is raised when front server cannnot start.
class FrontError < StandardError
class FrontServerError < Error
def initialize(front, exception)
@front = front
@exception = exception
Expand All @@ -12,5 +15,8 @@ def message
end
end

# `ChildRegistrationError` is raised when child process failed to register to
# parent front.
class ChildRegistrationError < Error; end
end
end

0 comments on commit 431e04e

Please sign in to comment.