Skip to content

Commit

Permalink
Merge commit 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
gnufied committed Mar 30, 2009
2 parents 3ab89f7 + 8e7af36 commit f391764
Show file tree
Hide file tree
Showing 20 changed files with 408 additions and 127 deletions.
12 changes: 2 additions & 10 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ require 'rake/testtask'
require 'rake/rdoctask'
require 'spec/rake/spectask'
require 'rake/contrib/sshpublisher'
require "darkfish-rdoc"

desc 'Default: run unit tests.'
task :default => :test
Expand Down Expand Up @@ -32,21 +31,14 @@ desc 'Generate documentation for the backgroundrb plugin.'
Rake::RDocTask.new(:rdoc) do |rdoc|
rdoc.rdoc_dir = 'doc/output/manual'
rdoc.title = 'Backgroundrb'
#rdoc.options << '--line-numbers' << '--inline-source'
rdoc.options << '--line-numbers' << '--inline-source'
rdoc.rdoc_files.include('README')
rdoc.rdoc_files.include('LICENSE')
rdoc.rdoc_files.include('lib/*.rb')
rdoc.rdoc_files.include('lib/backgroundrb/*.rb')
rdoc.rdoc_files.include('server/*.rb')
rdoc.rdoc_files.include('server/lib/*.rb')
#rdoc.template = 'jamis'
rdoc.options += [
'-w', '4',
'-SHN',
'-f', 'darkfish', # This bit
'-m', 'README',
]

rdoc.template = 'jamis'
end

module Rake
Expand Down
2 changes: 1 addition & 1 deletion doc/content/content.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ p(sub-title). Installation using Piston

%(entry-title)<a name="configuration"> Configuration </a>%

After getting the plugin, you must copy it into your vendor/rails and
After getting the plugin, you must copy it into your vendor/plugins and
then configure it for use. _BackgrounDRb_ comes with a rake task for
automating plugin configuration. Before running rake task, remove if
any old @backgroundrb@ or @load_worker_env.rb@ script is there in script folder of your rails
Expand Down
2 changes: 1 addition & 1 deletion examples/backgroundrb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
:persistent_disabled: false # turn this off if your application doesn't use backgroundrb's persistent/enqueued tasks system
:persistent_delay: 10 # the time (seconds) between each time backgroundrb checks the database for enqueued tasks

:memcache: "10.0.0.1:11211,10.0.0.2:11211" #=> location of mecache clusters seperated by comma
:memcache: "10.0.0.1:11211,10.0.0.2:11211" #=> location of memcache clusters separated by comma

# following section is totally optional, and only useful if you are trying to cluster of backgroundrb server
# if you do not specify this section backgroundrb will assume that, from rails you are connecting to the
Expand Down
1 change: 1 addition & 0 deletions lib/backgroundrb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require "backgroundrb/bdrb_connection"
require "backgroundrb/bdrb_cluster_connection"
require "backgroundrb/bdrb_start_stop"
require "backgroundrb/bdrb_result"
MiddleMan = BackgrounDRb::ClusterConnection.new


4 changes: 3 additions & 1 deletion lib/backgroundrb/bdrb_cluster_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ def all_worker_info
def new_worker(options = {})
update_stats
succeeded = false
result = nil

@backend_connections.each do |connection|
begin
connection.new_worker(options)
result = connection.new_worker(options)
succeeded = true
rescue BdrbConnError; end
end
Expand Down
8 changes: 8 additions & 0 deletions lib/backgroundrb/bdrb_conn_error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ def initialize(message)
# raised, when said task was submitted without a job key, whereas
# nature of the task requires a job key
class NoJobKey < RuntimeError; end

# raised if worker throws some error
class RemoteWorkerError < RuntimeError
attr_accessor :message
def initialize message
@message = message
end
end
end
5 changes: 4 additions & 1 deletion lib/backgroundrb/bdrb_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ def close_connection
def ask_work p_data
p_data[:type] = :async_invoke
dump_object(p_data)
bdrb_response = nil
@mutex.synchronize { bdrb_response = read_from_bdrb() }
close_connection
bdrb_response
end

def new_worker p_data
Expand Down Expand Up @@ -170,7 +173,7 @@ def send_request(p_data)
bdrb_response = nil
@mutex.synchronize { bdrb_response = read_from_bdrb(nil) }
close_connection
bdrb_response ? bdrb_response[:data] : nil
bdrb_response
end
end
end
5 changes: 5 additions & 0 deletions lib/backgroundrb/bdrb_job_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class BdrbJobQueue < ActiveRecord::Base
# find next task from the table
def self.find_next(worker_name,worker_key = nil)
returned_job = nil
ActiveRecord::Base.verify_active_connections!
transaction do
unless worker_key
#use ruby time stamps for time calculations as db might have different times than what is calculated by ruby/rails
Expand All @@ -25,6 +26,7 @@ def self.find_next(worker_name,worker_key = nil)
# release a job and mark it to be unfinished and free.
# useful, if inside a worker, processing of this job failed and you want it to process later
def release_job
ActiveRecord::Base.verify_active_connections!
self.class.transaction do
self.taken = 0
self.started_at = nil
Expand All @@ -34,6 +36,7 @@ def release_job

# insert a new job for processing. jobs added will be automatically picked by the appropriate worker
def self.insert_job(options = { })
ActiveRecord::Base.verify_active_connections!
transaction do
options.merge!(:submitted_at => Time.now.utc,:finished => 0,:taken => 0)
t_job = new(options)
Expand All @@ -43,6 +46,7 @@ def self.insert_job(options = { })

# remove a job from table
def self.remove_job(options = { })
ActiveRecord::Base.verify_active_connections!
transaction do
t_job_id = find(:first, :conditions => options.merge(:finished => 0,:taken => 0),:lock => true)
delete(t_job_id)
Expand All @@ -51,6 +55,7 @@ def self.remove_job(options = { })

# Mark a job as finished
def finish!
ActiveRecord::Base.verify_active_connections!
self.class.transaction do
self.finished = 1
self.finished_at = Time.now.utc
Expand Down
19 changes: 19 additions & 0 deletions lib/backgroundrb/bdrb_result.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module BackgrounDRb
class Result
def initialize results
@results = resuls
end

def async_response?
!(@results[:result] == true)
end

def sync_response?
(@results[:result] == true)
end

def error?
!(@results[:result_flag] == "ok")
end
end
end
26 changes: 23 additions & 3 deletions lib/backgroundrb/bdrb_start_stop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,34 @@ def start
op.write(Process.pid().to_s)
op.close
if BDRB_CONFIG[:backgroundrb][:log].nil? or BDRB_CONFIG[:backgroundrb][:log] != 'foreground'
log_file = File.open(SERVER_LOGGER,"w+")
[STDIN, STDOUT, STDERR].each {|desc| desc.reopen(log_file)}
redirect_io(SERVER_LOGGER)
end

BackgrounDRb::MasterProxy.new()
end
end

# Free file descriptors and
# point them somewhere sensible
# STDOUT/STDERR should go to a logfile
def redirect_io(logfile_name)
begin; STDIN.reopen "/dev/null"; rescue ::Exception; end

if logfile_name
begin
STDOUT.reopen logfile_name, "a"
STDOUT.sync = true
rescue ::Exception
begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end
end
else
begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end
end

begin; STDERR.reopen STDOUT; rescue ::Exception; end
STDERR.sync = true
end


def stop
pid_files = Dir["#{RAILS_HOME}/tmp/pids/backgroundrb_*.pid"]
pid_files.each { |x| kill_process(x) }
Expand Down
27 changes: 25 additions & 2 deletions lib/backgroundrb/rails_worker_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def method_missing(method_id,*args)
arguments = args.first

arg,job_key,host_info,scheduled_at = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at)

# allow both arg and args
arg ||= arguments[:args]

new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc

if worker_method =~ /^async_(\w+)/
Expand Down Expand Up @@ -76,8 +80,27 @@ def run_method host_info,method_name,worker_options = {}
retry
end
end
return nil if method_name == :ask_work
return_result(result)
#return nil if method_name == :ask_work
process_result(return_result(result))
end

def process_result t_result
case t_result
when Hash
if(t_result[:result] == true && t_result[:type] = :response)
if(t_result[:result_flag] == "ok")
return t_result[:data]
else
raise RemoteWorkerError.new("Error while executing worker method")
end
elsif(t_result[:result_flag] == "ok")
"ok"
elsif(t_result[:result_flag] == "error")
raise RemoteWorkerError.new("Error while executing worker method")
end
when Array
t_result
end
end

# choose a backgroundrb server connection and invoke worker method on it.
Expand Down
48 changes: 48 additions & 0 deletions release_notes.org
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
Hi,

BackgrounDRb 1.2 is being unleashed.

* New features:

** Exceptions/errors are now popped out at the earliest moment
in the client side itself. For example:

>> MiddleMan.worker(:foo_worker).async_bar(:args => {:age => 10})
BackgrounDRb::RemoteWorkerError: BackgrounDRb::RemoteWorkerError

Above exception is thrown because remote worker doesn't have method
"bar" defined on it.

Similarly:

>> MiddleMan.worker(:foo_worker).checksum(:args => {:age => "lolz"})
BackgrounDRb::RemoteWorkerError: BackgrounDRb::RemoteWorkerError

Above exception is thrown because remote worker's checksum method
expects an integer as an argument.

For asynchronous method calls, BackgrounDRb doesn't check if method
ran successfully, it only checks existence of methods on remote
worker. For sync method calls it checks if method ran successfully
or not.

** Its possible to have per worker configuration options now.

* Bug Fixes

** Much better error/exception handling. Rogue worker methods shouldn't
crash the worker now. All the unhandled exceptions and dispatch
errors can be found in debug log file.

** Fixes for postgres db with persistent job queues.

** Switched to lightweight Queue implementation for tasks enqueued
to thread pool.

** Fixes for database dropped connections while running tasks from
persistent queues.

** Fixes for newer Rails versions.

**

65 changes: 53 additions & 12 deletions server/lib/bdrb_thread_pool.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
module BackgrounDRb

class InterruptedException < RuntimeError ; end

class WorkData
attr_accessor :args,:block,:job_method,:persistent_job_id,:job_key
def initialize(args,job_key,job_method,persistent_job_id)
Expand All @@ -18,7 +21,9 @@ def initialize(master,size,logger)
@logger = logger
@size = size
@threads = []
@work_queue = Queue.new
@work_queue = []
@mutex = Monitor.new
@cv = @mutex.new_cond
@size.times { add_thread }
end

Expand All @@ -43,9 +48,13 @@ def initialize(master,size,logger)
# assuming method is defined in rss_worker

def defer(method_name,args = nil)
job_key = Thread.current[:job_key]
persistent_job_id = Thread.current[:persistent_job_id]
@work_queue << WorkData.new(args,job_key,method_name,persistent_job_id)
@mutex.synchronize do
job_key = Thread.current[:job_key]
persistent_job_id = Thread.current[:persistent_job_id]
@cv.wait_while { @work_queue.size >= size }
@work_queue.push(WorkData.new(args,job_key,method_name,persistent_job_id))
@cv.broadcast
end
end

# Start worker threads
Expand All @@ -54,10 +63,22 @@ def add_thread
Thread.current[:job_key] = nil
Thread.current[:persistent_job_id] = nil
while true
task = @work_queue.pop
Thread.current[:job_key] = task.job_key
Thread.current[:persistent_job_id] = task.persistent_job_id
block_result = run_task(task)
begin
task = nil
@mutex.synchronize do
@cv.wait_while { @work_queue.size == 0 }
task = @work_queue.pop
@cv.broadcast
end
if task
Thread.current[:job_key] = task.job_key
Thread.current[:persistent_job_id] = task.persistent_job_id
block_result = run_task(task)
end
rescue BackgrounDRb::InterruptedException
STDERR.puts("BackgrounDRb thread interrupted: #{Thread.current.inspect}")
STDERR.flush
end
end
end
end
Expand All @@ -66,7 +87,7 @@ def add_thread
def run_task task
block_arity = master.method(task.job_method).arity
begin
ActiveRecord::Base.verify_active_connections!
check_db_connection
t_data = task.args
result = nil
if block_arity != 0
Expand All @@ -75,12 +96,32 @@ def run_task task
result = master.send(task.job_method)
end
return result
rescue
logger.info($!.to_s)
logger.info($!.backtrace.join("\n"))
rescue BackgrounDRb::InterruptedException => e
# Don't log, just re-raise
raise e
rescue Object => bdrb_error
log_exception(bdrb_error)
return nil
end
end

def log_exception exception_object
STDERR.puts exception_object.to_s
STDERR.puts exception_object.backtrace.join("\n")
STDERR.flush
end


# Periodic check for lost database connections and closed connections
def check_db_connection
begin
ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
rescue Object => bdrb_error
log_exception(bdrb_error)
end
end


end #end of class ThreadPool
end # end of module BackgrounDRb

Loading

0 comments on commit f391764

Please sign in to comment.