Browse files

changes for taking care of error popping out

  • Loading branch information...
1 parent 2d38299 commit 22d365a4df4b49d29774efdb5a9335e0300a3ac9 @gnufied gnufied committed Mar 28, 2009
View
1 lib/backgroundrb.rb
@@ -17,6 +17,7 @@
require "backgroundrb/bdrb_connection"
require "backgroundrb/bdrb_cluster_connection"
require "backgroundrb/bdrb_start_stop"
+require "backgroundrb/bdrb_result"
MiddleMan = BackgrounDRb::ClusterConnection.new
View
4 lib/backgroundrb/bdrb_cluster_connection.rb
@@ -129,9 +129,11 @@ def all_worker_info
def new_worker(options = {})
update_stats
succeeded = false
+ result = nil
+
@backend_connections.each do |connection|
begin
- connection.new_worker(options)
+ result = connection.new_worker(options)
succeeded = true
rescue BdrbConnError; end
end
View
8 lib/backgroundrb/bdrb_conn_error.rb
@@ -18,4 +18,12 @@ def initialize(message)
# raised, when said task was submitted without a job key, whereas
# nature of the task requires a job key
class NoJobKey < RuntimeError; end
+
+ # raised if worker throws some error
+ class RemoteWorkerError < RuntimeError
+ attr_accessor :message
+ def initialize message
+ @message = message
+ end
+ end
end
View
5 lib/backgroundrb/bdrb_connection.rb
@@ -84,7 +84,10 @@ def close_connection
def ask_work p_data
p_data[:type] = :async_invoke
dump_object(p_data)
+ bdrb_response = nil
+ @mutex.synchronize { bdrb_response = read_from_bdrb() }
close_connection
+ bdrb_response
end
def new_worker p_data
@@ -170,7 +173,7 @@ def send_request(p_data)
bdrb_response = nil
@mutex.synchronize { bdrb_response = read_from_bdrb(nil) }
close_connection
- bdrb_response ? bdrb_response[:data] : nil
+ bdrb_response
end
end
end
View
19 lib/backgroundrb/bdrb_result.rb
@@ -0,0 +1,19 @@
+module BackgrounDRb
+ class Result
+ def initialize results
+ @results = resuls
+ end
+
+ def async_response?
+ !(@results[:result] == true)
+ end
+
+ def sync_response?
+ (@results[:result] == true)
+ end
+
+ def error?
+ !(@results[:result_flag] == "ok")
+ end
+ end
+end
View
12 lib/backgroundrb/rails_worker_proxy.rb
@@ -80,8 +80,16 @@ def run_method host_info,method_name,worker_options = {}
retry
end
end
- return nil if method_name == :ask_work
- return_result(result)
+ #return nil if method_name == :ask_work
+ process_result(return_result(result))
+ end
+
+ def process_result t_result
+ case t_result
+ when Hash
+ when Array
+ t_result
+ end
end
# choose a backgroundrb server connection and invoke worker method on it.
View
14 server/lib/master_worker.rb
@@ -103,19 +103,32 @@ def start_worker_request(p_data)
def async_method_invoke(t_data)
worker_name = t_data[:worker]
worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
+
+ unless worker_methods(worker_name_key).include?(t_data[:worker_method])
+ send_object(:result_flag => "error")
+ return
+ end
+
t_data.delete(:worker)
t_data.delete(:type)
begin
ask_worker(worker_name_key,:data => t_data, :type => :request, :result => false)
+ send_object(:result_flag => "ok")
rescue Packet::DisconnectError => sock_error
+ send_object(:result_flag => "error")
reactor.live_workers.delete(worker_name_key)
rescue
+ send_object(:result_flag => "error")
debug_logger.info($!.message)
debug_logger.info($!.backtrace.join("\n"))
return
end
end
+ def worker_methods worker_name_key
+ reactor.live_workers[worker_name_key].invokable_worker_methods
+ end
+
# Given a cache key, ask the worker for result stored in it.
# If you are using Memcache for result storage, this method won't be
# called at all and bdrb client library will directly fetch
@@ -155,6 +168,7 @@ def method_invoke(t_data)
# Receieve responses from workers and dispatch them back to the client
def worker_receive p_data
+ p_data[:result_flag] ||= "ok"
send_object(p_data)
end
View
19 server/lib/meta_worker.rb
@@ -234,19 +234,23 @@ def process_request(p_data)
if (user_input[:worker_method]).nil? or !respond_to?(user_input[:worker_method])
result = nil
puts "Trying to invoke invalid worker method on worker #{worker_name}"
- send_response(p_data,result)
+ send_response(p_data,result,"error")
return
end
result = nil
Thread.current[:job_key] = user_input[:job_key]
- result = invoke_user_method(user_input[:worker_method],user_input[:arg])
+ result,result_flag = invoke_user_method(user_input[:worker_method],user_input[:arg])
if p_data[:result]
result = "dummy_result" if result.nil?
- send_response(p_data,result) if can_dump?(result)
+ if can_dump?(result)
+ send_response(p_data,result,result_flag)
+ else
+ send_response(p_data,"dummy_result","error")
+ end
end
end
@@ -280,14 +284,16 @@ def new_load_schedule
# send the response back to master process and hence to the client
# if there is an error while dumping the object, send "invalid_result_dump_check_log"
- def send_response input,output
+ def send_response input,output,result_flag = "ok"
input[:data] = output
input[:type] = :response
+ input[:result_flag] = result_flag
begin
send_data(input)
rescue Object => bdrb_error
log_exception(bdrb_error)
input[:data] = "invalid_result_dump_check_log"
+ input[:result_flag] = "error"
send_data(input)
end
end
@@ -308,14 +314,15 @@ def invoke_user_method user_method,args
else
t_result = self.send(user_method)
end
+ [t_result,"ok"]
rescue Object => bdrb_error
puts "Error calling method #{user_method} with #{args} on worker #{worker_name}"
log_exception(bdrb_error)
+ [t_result,"error"]
end
- t_result
else
puts "Trying to invoke method #{user_method} with #{args} on worker #{worker_name} failed because no such method is defined on the worker"
- nil
+ [nil,"error"]
end
end

0 comments on commit 22d365a

Please sign in to comment.