Permalink
Browse files

upgrade to latest Telegraph; wait for agents to connect to CentralCom…

…mand before moving on to read results
  • Loading branch information...
1 parent 75d6684 commit af3efed593da67df338e75df26b8fe3404c06f40 @qxjit committed Jul 31, 2009
@@ -4,7 +4,7 @@ class ThreadAgent < Agent
def initialize(options)
super(0, options, ListenerList.new([]))
- @thread = Thread.new { execute }
+ @thread = Thread.new { execute(StringIO.new, StringIO.new) }
@work_done = 0
end
@@ -9,12 +9,17 @@ def initialize(number, options, listener)
@options = options
end
- def connect
+ def connect(stream_to_parent_process)
+ DeepTest.logger.debug { "Agent: Connecting to #{@options.origin_hostname}:#{@options.server_port}" }
@wire = Telegraph::Wire.connect(@options.origin_hostname, @options.server_port)
+ stream_to_parent_process.puts "Connected"
+ ensure
+ stream_to_parent_process.close rescue nil
end
- def execute
- connect
+ def execute(stream_from_child_process, stream_to_parent_process)
+ stream_from_child_process.close
+ connect stream_to_parent_process
reseed_random_numbers
reconnect_to_database
@@ -1,12 +1,5 @@
require 'set'
-Signal.trap("USR2") do
- caller.each do |line|
- puts line
- end
- raise
-end
-
module DeepTest
class CentralCommand
attr_reader :operator
@@ -39,8 +32,8 @@ def take_result
if @results.any?
return @results.shift
else
- raise NoAgentsRunningError unless @switchboard.any_live_wires?
@results_condvar.wait @results_mutex
+ raise NoAgentsRunningError unless @results.any? || @switchboard.any_live_wires?
end
end
end
@@ -63,18 +63,17 @@ def execute(innie, outie, grace_period)
loop do
begin
- message, wire = switchboard.next_message :timeout => 1
-
- case message.body
- when LoadFiles
- load_files message.body.files
- when DeployAgents
- deploy_agents
- wire.send_message Done
- operator.shutdown
+ switchboard.process_messages :timeout => 1 do |message, wire|
+ case message.body
+ when LoadFiles
+ load_files message.body.files
+ when DeployAgents
+ deploy_agents
+ wire.send_message Done
+ operator.shutdown
+ break
+ end
end
- rescue Telegraph::NoMessageAvailable
- retry
end
end
end
@@ -34,6 +34,7 @@ def load_files(filelist)
end
def deploy_agents
+ DeepTest.logger.debug { "RemoteDeployment deploying agents with #{@landing_fleet}" }
@landing_fleet.deploy_agents
rescue => e
raise if failed_over?
@@ -42,6 +43,7 @@ def deploy_agents
end
def fail_over(method, exception)
+ DeepTest.logger.debug { "RemoteDeployment failing over on #{method}." }
@options.ui_instance.distributed_failover_to_local(method, exception)
@landing_fleet = @failover_deployment
end
@@ -1,11 +1,19 @@
module DeepTest
module FailureMessage
def self.show(title, message, width = 70)
- puts " #{title} ".center(width, '*')
+ lines = [" #{title} ".center(width, '*')]
message.each do |line|
- puts "* #{line.strip}".ljust(width - 1) + "*"
+ lines << "* #{line.strip}".ljust(width - 1) + "*"
+ end
+ lines << "*" * width
+ string = lines.join("\n")
+ begin
+ puts string
+ rescue
+ IO.new(2) do |err|
+ err.puts string
+ end
end
- puts "*" * width
end
end
end
@@ -17,9 +17,20 @@ def load_files(files)
def deploy_agents
DeepTest.logger.debug { "Deploying #{number_of_agents} #{@agent_class}s" }
+ wait_for_connect_threads = []
each_agent do |agent_num|
- warlock.start "agent #{agent_num}", @agent_class.new(agent_num, @options, @options.new_listener_list)
+ stream_from_child_process, stream_to_parent_process = IO.pipe
+ warlock.start "agent #{agent_num}", @agent_class.new(agent_num, @options, @options.new_listener_list),
+ stream_from_child_process, stream_to_parent_process
+ wait_for_connect_threads << Thread.new do
+ stream_to_parent_process.close
+ message = stream_from_child_process.read
+ stream_from_child_process.close
+ raise "Agent was not able to connect: #{message}" unless message == "Connected\n"
+ end
end
+
+ wait_for_connect_threads.each { |t| t.join }
end
def number_of_agents
@@ -20,7 +20,7 @@ def run(exit_when_done = true)
@options.new_listener_list.before_starting_agents
@deployment.deploy_agents
begin
- DeepTest.logger.debug { "Loader Starting (#{$$})" }
+ DeepTest.logger.debug { "Main: About to process work units (#{$$})" }
passed = @runner.process_work_units(central_command)
ensure
shutdown
@@ -25,6 +25,7 @@ def start(name, demon, *demon_args)
#
@demons_semaphore.unlock if @demons_semaphore.locked?
+ close_open_network_connections
demon.forked name, @options, demon_args
exit
@@ -42,6 +43,15 @@ def start(name, demon, *demon_args)
end
end
+ def close_open_network_connections
+ ObjectSpace.each_object(BasicSocket) do |sock|
+ begin
+ sock.close
+ rescue IOError
+ end
+ end
+ end
+
def demon_count
@demons_semaphore.synchronize do
@demons.size
@@ -5,9 +5,9 @@ module Logging
def self.logger
@logger ||= begin
l = Logger.new($stdout)
- l.level = Logger::INFO
+ l.level = Logger.const_get((ENV['TELEGRAPH_LOG_LEVEL'] || 'info').upcase)
l.formatter = proc do |sev, time, progmane, msg|
- "[#{time.strftime "%T"}] #{msg}\n"
+ "[#{time.strftime "%T"}] (pid #{Process.pid}) #{msg}\n"
end
l
end
@@ -14,10 +14,22 @@ def initialize(socket, switchboard)
@socket = socket
@switchboard = switchboard
@accept_thread = Thread.new do
+ @socket.listen 100
loop do
- client = @socket.accept
- debug { "Accepted connection: #{client.inspect}" }
- @switchboard.add_wire Wire.new(client)
+ if @should_shutdown
+ @socket.close
+ @switchboard.close_all_wires
+ break
+ end
+
+ begin
+ client = @socket.accept_nonblock
+ debug { "Accepted connection: #{client.inspect}" }
+ @switchboard.add_wire Wire.new(client)
+ rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR
+ connection_ready, = IO.select([@socket], nil, nil, 0.25)
+ retry if connection_ready
+ end
end
end
end
@@ -28,11 +40,8 @@ def port
def shutdown
debug { "Shutting down" }
- begin
- @socket.close
- ensure
- @switchboard.close_all_wires
- end
+ @should_shutdown = true
+ @accept_thread.join
end
end
end
@@ -9,9 +9,10 @@ def process_messages(options = {:timeout => 0})
end
def next_message(options = {:timeout => 0})
- debug { "Waiting for next message on any wire" }
+ debug { "Waiting for next message on any of #{live_wires.size} wires for #{options[:timeout]} seconds" }
if live_wires.empty?
+ sleep 0.01
Thread.pass
raise NoMessageAvailable
end
@@ -20,8 +20,12 @@ def initialize(stream)
end
def close
- debug { "closing stream" }
- @stream.close
+ if @stream.closed?
+ debug { "stream already closed" }
+ else
+ debug { "closing stream #{@stream.inspect}" }
+ @stream.close
+ end
end
def closed?
@@ -8,7 +8,7 @@ module DeepTest
central_command.write_work Test::WorkUnit.new(TestFactory.passing_test)
central_command.done_with_work
- Agent.new(0, options, stub_everything).execute
+ Agent.new(0, options, stub_everything).execute(StringIO.new, StringIO.new)
assert_equal [], central_command.switchboard.live_wires.first.unacked_messages
assert_kind_of ::Test::Unit::TestResult, central_command.take_result
@@ -21,7 +21,7 @@ module DeepTest
central_command.write_work Test::WorkUnit.new(TestFactory.failing_test)
central_command.done_with_work
- Agent.new(0, options, stub_everything).execute
+ Agent.new(0, options, stub_everything).execute(StringIO.new, StringIO.new)
result_1 = central_command.take_result
result_2 = central_command.take_result
@@ -37,7 +37,7 @@ module DeepTest
listener = stub_everything
agent = Agent.new(0, options, listener)
listener.expects(:starting).with(agent)
- agent.execute
+ agent.execute(StringIO.new, StringIO.new)
end
test "notifies listener that it is about to do work" do
@@ -49,7 +49,7 @@ module DeepTest
listener = stub_everything
agent = Agent.new(0, options, listener)
listener.expects(:starting_work).with(agent, work_unit)
- agent.execute
+ agent.execute(StringIO.new, StringIO.new)
end
test "notifies listener that it has done work" do
@@ -61,7 +61,23 @@ module DeepTest
listener = stub_everything
agent = Agent.new(0, options, listener)
listener.expects(:finished_work).with(agent, work_unit, TestResult.new(:result))
- agent.execute
+ agent.execute(StringIO.new, StringIO.new)
+ end
+
+ test "connect indicates it has connected" do
+ options = Options.new({})
+ central_command = TestCentralCommand.start(options)
+ agent = Agent.new(0, options, stub_everything)
+ agent.connect(io = StringIO.new)
+ assert_equal "Connected\n", io.string
+ end
+
+ test "connect closes stream even if there is an error" do
+ options = Options.new({})
+ agent = Agent.new(0, options, stub_everything)
+ io = StringIO.new
+ assert_raises(Errno::EADDRNOTAVAIL) { agent.connect io }
+ assert_equal true, io.closed?
end
class ResultWorkUnit
@@ -103,7 +119,7 @@ def ==(other)
central_command.write_work work_unit
central_command.done_with_work
- Agent.new(0, options, stub_everything).execute
+ Agent.new(0, options, stub_everything).execute(StringIO.new, StringIO.new)
assert_equal Agent::Error.new(work_unit, exception), central_command.take_result
end
@@ -123,7 +139,7 @@ def o._dump; raise "error"; end
options = Options.new({})
central_command = TestCentralCommand.start(options)
- t = Thread.new { Agent.new(0, options, stub_everything).execute }
+ t = Thread.new { Agent.new(0, options, stub_everything).execute(StringIO.new, StringIO.new) }
Thread.pass
central_command.write_work stub(:run => TestResult.new(:result))
central_command.done_with_work
@@ -135,7 +151,7 @@ def o._dump; raise "error"; end
options = Options.new({})
central_command = TestCentralCommand.start(options)
begin
- t = Thread.new { Agent.new(0, options, stub_everything).execute }
+ t = Thread.new { Agent.new(0, options, stub_everything).execute(StringIO.new, StringIO.new) }
sleep 0.1
ensure
central_command.stop

0 comments on commit af3efed

Please sign in to comment.