Permalink
Browse files

Added status and worker_enabled to worker

  • Loading branch information...
1 parent 0f911d5 commit 73a3f645c653ea251fc55f5033007345a6815623 Horaci Cuevas committed Apr 26, 2010
Showing with 98 additions and 2 deletions.
  1. +36 −0 examples/worker_signals.rb
  2. +9 −2 lib/gearman/worker.rb
  3. +53 −0 test/mock_worker_test.rb
View
@@ -0,0 +1,36 @@
+require 'rubygems'
+#require 'gearman'
+require '../lib/gearman'
+
+Gearman::Util.debug = true
+
+servers = ['localhost:4730', 'localhost:4731']
+w = Gearman::Worker.new(servers)
+
+# Add a handler for a "sleep" function that takes a single argument, the
+# number of seconds to sleep before reporting success.
+w.add_ability('sleep') do |data,job|
+ seconds = data
+ (1..seconds.to_i).each do |i|
+ sleep 1
+ Gearman::Util.logger.info i
+ # Report our progress to the job server every second.
+ job.report_status(i, seconds)
+ end
+ # Report success.
+ true
+end
+
+# Trap signals while is working
+%w(HUP USR1 ALRM TERM).each do |signal|
+ trap(signal) do
+ puts "Received signal #{signal} - setting worker_enabled to false. Worker status is [#{w.status}]"
+ w.worker_enabled = false
+ if w.status == :waiting
+ trap(signal, "DEFAULT")
+ Process.kill( signal, $$ )
+ end
+ end
+end
+
+loop { w.work or break }
View
@@ -120,10 +120,12 @@ def initialize(job_servers=nil, opts={})
end
@reconnect_sec = 30 if not @reconnect_sec
@network_timeout_sec = 5 if not @network_timeout_sec
+ @worker_enabled = true
+ @status = :preparing
self.job_servers = job_servers if job_servers
start_reconnect_thread
end
- attr_accessor :client_id, :reconnect_sec, :network_timeout_sec, :bad_servers
+ attr_accessor :client_id, :reconnect_sec, :network_timeout_sec, :bad_servers, :worker_enabled, :status
# Start a thread to repeatedly attempt to connect to down job servers.
def start_reconnect_thread
@@ -309,6 +311,7 @@ def handle_job_assign(data, sock, hostport)
def work
req = Util.pack_request(:grab_job)
loop do
+ @status = :preparing
bad_servers = []
# We iterate through the servers in sorted order to make testing
# easier.
@@ -330,7 +333,8 @@ def work
Util.logger.debug "GearmanRuby: Got no_job from #{hostport}"
break
when :job_assign
- return if handle_job_assign(data, sock, hostport)
+ @status = :working
+ return worker_enabled if handle_job_assign(data, sock, hostport)
break
else
Util.logger.debug "GearmanRuby: Got #{type.to_s} from #{hostport}"
@@ -358,6 +362,9 @@ def work
end
end
+ return false unless worker_enabled
+ @status = :waiting
+
# FIXME: We could optimize things the next time through the 'each' by
# sending the first grab_job to one of the servers that had a socket
# with data in it. Not bothering with it for now.
View
@@ -265,4 +265,57 @@ def test_exception
s.wait
end
+
+ def test_worker_enabled
+ @server = FakeJobServer.new(self)
+ worker = nil
+ sock = nil
+ @result = nil
+
+ s = TestScript.new
+ w = TestScript.new
+
+ server_thread = Thread.new { s.loop_forever }.run
+ worker_thread = Thread.new { w.loop_forever }.run
+
+ # Create a worker and wait for it to connect to us.
+ w.exec {
+ worker = Gearman::Worker.new(
+ "localhost:#{@server.port}", { :client_id => 'test' })
+ }
+ s.exec { sock = @server.expect_connection }
+ s.wait
+
+ # After it connects, it should send its ID, and it should tell us its
+ # abilities when we report them.
+ s.exec { @server.expect_request(sock, :set_client_id, 'test') }
+ w.exec do
+ worker.add_ability('echo') do |data, job|
+ job.report_status(1, 1);
+ part1, part2 = data.split(//, 2)
+ job.send_data(part1) # send partial data first
+ part2
+ end
+ end
+ s.exec { @server.expect_request(sock, :can_do, 'echo') }
+
+ # When we set to false worker_enabled, worker.work shall return false
+ s.exec { @server.send_response(sock, :job_assign, "a\0echo\0foo") }
+ w.exec { worker.worker_enabled = false; @result = worker.work }
+ w.wait
+ assert_equal false, @result
+
+ # When we set to true worker_enabled, worker.work shall return true
+ s.exec { @server.send_response(sock, :job_assign, "a\0echo\0foo") }
+ w.exec { worker.worker_enabled = true; @result = worker.work }
+ w.wait
+ assert_equal true, @result
+
+ # when there are no jobs pending, and set to false worker_enabled, worker.work shall return false
+ s.exec { @server.send_response(sock, :no_job) }
+ w.exec { worker.worker_enabled = false; @result = worker.work }
+ w.wait
+ assert_equal false, @result
+ end
+
end

0 comments on commit 73a3f64

Please sign in to comment.