Skip to content

Commit

Permalink
Enable to separate tuple space from client side.
Browse files Browse the repository at this point in the history
  • Loading branch information
keita committed Feb 18, 2015
1 parent e2926fa commit a61e4f4
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 59 deletions.
8 changes: 8 additions & 0 deletions bin/pione-tuple-space-broker
@@ -0,0 +1,8 @@
#!/usr/bin/env ruby
# -*- ruby -*-

require 'pione'
require 'pione/front/tuple-space-broker-front'
require 'pione/command/pione-tuple-space-broker'

Pione::Command::PioneTupleSpaceBroker.run(ARGV)
2 changes: 1 addition & 1 deletion lib/pione/agent/notification-listener.rb
Expand Up @@ -75,7 +75,7 @@ def handle_notification(transmitter_host, message)
begin
Timeout.timeout(3) {DRb::DRbObject.new_with_uri(uri).notify(message)}
rescue Timeout::Error, DRb::DRbConnError, DRbPatch::ReplyReaderError => e
Log::Debug.notification("Notification recipient %s disconnected: %s" % [uri, e.message])
Log::SystemLog.debug("Notification recipient %s disconnected." % uri, e)
bad_recipients << uri
end
end
Expand Down
30 changes: 30 additions & 0 deletions lib/pione/agent/tuple-space-terminator.rb
@@ -0,0 +1,30 @@
module Pione
module Agent
# TupleSpaceTerminator is an agent that terminates tuple space's life.
class TupleSpaceTerminator < TupleSpaceClient
set_agent_type :tuple_space_terminator, self

def initialize(tuple_space, &b)
super(tuple_space)
@action = b
end

#
# transition definitions
#

define_transition :wait

chain :init => :wait
chain :wait => :terminate

#
# transition methods
#

def transit_to_wait
read(TupleSpace::CommandTuple.new(name: "terminate-tuple-space"))
end
end
end
end
125 changes: 78 additions & 47 deletions lib/pione/command/pione-client.rb
Expand Up @@ -155,6 +155,14 @@ class PioneClient < BasicCommand
item.desc = "Type of the client's user interface"
end

option(:delegate_tuple_space) do |item|
item.type = :boolean
item.long = '--delegate-tuple-space'
item.desc = 'Delegate tuple space to broker'
item.init = false
item.default = true
end

option_post(:validate_task_worker_size) do |item|
item.desc = "Validate task worker size"
item.process do
Expand Down Expand Up @@ -182,6 +190,7 @@ class PioneClient < BasicCommand
seq << ProcessAction.connect_parent
seq << :spawner_thread_group
seq << :ftp_server
seq << :tuple_space_provider
seq << :tuple_space
seq << :base_location
seq << :lang_environment
Expand Down Expand Up @@ -214,17 +223,63 @@ class PioneClient < BasicCommand
end
end

setup(:tuple_space) do |item|
item.desc = "Make a tuple space"
# Spawn a tuple space provider
setup(:tuple_space_provider_spawner) do |item|
item.process do
spawner = Command::PioneTupleSpaceProvider.spawn(cmd)
model[:tuple_space_provider_spawner] = spawner
model[:tuple_space_provider] = spawner.child_front

spawner.when_terminated do
if cmd.current_phase == :setup or cmd.current_phase == :execution
cmd.abort("%s is terminated because child tuple space provider is maybe dead." % cmd.name)
end
end
end

item.exception(SpawnError) do |e|
if cmd.current_phase == :termination
Log::Debug.system(e.message)
else
cmd.abort(e)
end
end
end

item.assign(:tuple_space) do
TupleSpace::TupleSpaceServer.new(task_worker_resource: model[:request_task_worker])
# Spawn a tuple space provider.
setup(:tuple_space_provider) do |item|
item.process do
if model[:delegate_tuple_space]
# get tuple space from tuple space broker
begin
Log::SystemLog.info("Find tuple space broker...")
Timeout.timeout(5) do
message = Notification::Message.new("CLIENT", "FIND_TUPLE_SPACE_BROKER", {"front" => model[:front].uri})
Notification::Transmitter.transmit(message)
model[:thread] = Thread.current
Thread.stop
end
Log::SystemLog.info('Tuple space broker "%s" has found.' % model[:tuple_space_broker_front_uri])
rescue Timeout::Error
cmd.abort("Tuple space broker has not found.")
end
else
# create tuple space by myself
cmd.phase(:setup).find_item(:tuple_space_provider_spawner).execute(cmd)
model[:tuple_space] = model[:tuple_space_provider].tuple_space()
end
end
end

setup(:tuple_space) do |item|
item.desc = "Make a tuple space"

item.process do
model[:front].set_tuple_space(model[:tuple_space])

# write tuples
resource = model[:request_task_worker] || 1
model[:tuple_space].write(TupleSpace::TaskWorkerResourceTuple.new(number: resource))
model[:tuple_space].write(TupleSpace::ProcessInfoTuple.new('standalone', 'Standalone'))
model[:tuple_space].write(TupleSpace::DryRunTuple.new(model[:dry_run]))

Expand Down Expand Up @@ -340,7 +395,6 @@ class PioneClient < BasicCommand
seq << :messenger
seq << :logger
seq << :input_generator
seq << :tuple_space_provider
seq << :task_worker
seq << :job_manager
seq << :check_rehearsal_result
Expand Down Expand Up @@ -396,41 +450,6 @@ class PioneClient < BasicCommand
end
end

execution(:tuple_space_provider_spawner) do |item|
item.desc = "Spawn a tuple space provider"

item.assign(:tuple_space_provider) do
spawner = Command::PioneTupleSpaceProvider.spawn(cmd)
spawner.when_terminated do
if cmd.current_phase == :execution
cmd.abort("%s is terminated because child tuple space provider is maybe dead." % cmd.name)
end
end
spawner.child_front
end

item.exception(SpawnError) do |e|
if cmd.current_phase == :termination
Log::Debug.system(e.message)
else
cmd.abort(e)
end
end
end

execution(:tuple_space_provider) do |item|
item.desc = "Spawn a tuple space provider"

item.process do
test(not(model[:without_tuple_space_provider]))

thread = Thread.new do
cmd.phase(:execution).find_item(:tuple_space_provider_spawner).execute(cmd)
end
model[:spawner_threads].add(thread)
end
end

# Spawn a task worker command. This is used from `task_worker` action.
execution(:task_worker_spawner) do |item|
item.desc = "Spawn a task worker"
Expand All @@ -441,7 +460,7 @@ class PioneClient < BasicCommand
:tuple_space_id => model[:tuple_space].uuid
}

Command::PioneTaskWorker.spawn(model, param)
model[:task_worker_spawners] << Command::PioneTaskWorker.spawn(model, param)
end

item.exception(SpawnError) do |e|
Expand Down Expand Up @@ -478,6 +497,8 @@ class PioneClient < BasicCommand
item.process do
test(not(model[:stand_alone]))

model[:task_worker_spawners] = []

# spawn task worker commands
model[:task_worker_size].times do
# we don't wait workers start up because of performance
Expand Down Expand Up @@ -554,14 +575,15 @@ class PioneClient < BasicCommand
seq.configure(:timeout => 10)

seq << :spawner_thread
seq << ProcessAction.terminate_children
seq << :job_manager
seq << :job_terminator
seq << :task_worker
seq << :input_generator
seq << :logger
seq << :messenger
seq << :tuple_space
seq << :kill_task_worker_processes
seq << ProcessAction.terminate_children
seq << ProcessAction.disconnect_parent
end

Expand All @@ -577,12 +599,10 @@ class PioneClient < BasicCommand
end
end

# Be careful that main thread of `pione-client` command waits to stop the
# job manager's chain thread, so pione-client cannot terminate until the
# thread terminated.
# Terminate job manager agent. Be careful that main thread of
# `pione-client` command waits to stop the job manager's chain thread, so
# pione-client cannot terminate until the thread terminated.
termination(:job_manager) do |item|
item.desc = "Terminate job manager agent"

item.process do
test(model[:job_manager])
test(not(model[:job_manager].terminated?))
Expand Down Expand Up @@ -655,6 +675,17 @@ class PioneClient < BasicCommand
model[:tuple_space].terminate
end
end

# Kill task worker processes.
termination(:kill_task_worker_processes) do |item|
item.process do
unless model[:task_worker_spawners].nil?
model[:task_worker_spawners].each do |child|
Util.ignore_exception {Process.kill(:TERM, child.pid)}
end
end
end
end
end
end
end
71 changes: 71 additions & 0 deletions lib/pione/command/pione-tuple-space-broker.rb
@@ -0,0 +1,71 @@
module Pione
module Command
# `PioneTupleSpaceBroker` is for `pione-tuple-space-broker` command.
class PioneTupleSpaceBroker < BasicCommand
#
# informations
#

define(:toplevel, true)
define(:name, "pione-tuple-space-broker")
define(:desc, "run tuple space provider agent")
define(:front, Pione::Front::TupleSpaceBrokerFront)

#
# options
#

option CommonOption.color
option CommonOption.debug
option CommonOption.communication_address
option CommonOption.parent_front
option NotificationOption.notification_targets

#
# command lifecycle: setup phase
#

phase(:setup) do |item|
item.configure(:timeout => 5)

item << ProcessAction.connect_parent
item << :tuple_space_manager
end

setup(:tuple_space_manager) do |item|
item.process do
model[:tuple_space_manager] = TupleSpace::Manager.new(self)
end
end

#
# command lifecycle: execution phase
#

phase(:execution) do |seq|
seq << :banner
seq << :sleep
end

execution(:banner) do |item|
item.process do
Log::SystemLog.info("Start tuple space broker.")
end
end

execution(:sleep) do |item|
item.process { sleep }
end

#
# command lifecycle: termination phase
#

phase(:termination) do |seq|
seq.configure(:timeout => 5)

seq << ProcessAction.terminate_children
end
end
end
end
19 changes: 12 additions & 7 deletions lib/pione/command/pione-tuple-space-provider.rb
Expand Up @@ -56,29 +56,34 @@ def self.spawn(cmd)
phase(:setup) do |item|
item.configure(:timeout => 5)

item << :tuple_space
item << ProcessAction.connect_parent
end

setup(:tuple_space) do |item|
item.process do
model[:tuple_space] = TupleSpace::TupleSpaceServer.new()
end
end

#
# command lifecycle: execution phase
#

phase(:execution) do |item|
item << :start_agent
item << :wait_agent
item << :sleep
end

# Start agent's activity.
execution(:start_agent) do |item|
item.desc = "Start an agent activity"

item.assign(:agent) do
Agent::TupleSpaceProvider.start(model[:front].uri)
end
end

execution(:wait_agent) do |item|
item.desc = "Wait agent to terminate"

# Sleep until agent is terminated.
execution(:sleep) do |item|
item.process do
model[:agent].wait_until_terminated(nil)
end
Expand All @@ -100,7 +105,7 @@ def self.spawn(cmd)

item.process do
test(model[:agent])
test(model[:agent].terminated?)
test(not(model[:agent].terminated?))

model[:agent].terminate
model[:agent].wait_until_terminated(nil)
Expand Down

0 comments on commit a61e4f4

Please sign in to comment.