Skip to content

Commit

Permalink
finish job queue functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
gnufied committed Jun 22, 2008
1 parent d48b840 commit 7c53f8d
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 5 deletions.
3 changes: 1 addition & 2 deletions app/controller/backgroundrb_status_controller.rb
@@ -1,5 +1,4 @@
require "application"
class BackgroundrbStatusController < ApplicationController
class BackgroundrbStatusController < ActionController::Base
def index
status = MiddleMan.all_worker_info
render :text => status
Expand Down
1 change: 1 addition & 0 deletions lib/backgroundrb.rb
Expand Up @@ -8,6 +8,7 @@
require "backgroundrb/bdrb_config"
BDRB_CONFIG = BackgrounDRb::Config.read_config("#{BACKGROUNDRB_ROOT}/config/backgroundrb.yml")

require "backgroundrb/bdrb_job_queue"
require "backgroundrb/bdrb_conn_error"
require "backgroundrb/rails_worker_proxy"
require "backgroundrb/bdrb_connection"
Expand Down
5 changes: 4 additions & 1 deletion lib/backgroundrb/bdrb_connection.rb
Expand Up @@ -158,7 +158,6 @@ def return_result_from_memcache options = {}
end

def read_from_bdrb(timeout = 3)
# @tokenizer = Packet::BinParser.new
begin
ret_val = select([@connection],nil,nil,timeout)
return nil unless ret_val
Expand All @@ -170,6 +169,10 @@ def read_from_bdrb(timeout = 3)
end
end

def enqueue_task options = {}
BdrbJobQueue.insert_job(options)
end

def send_request(p_data)
p_data[:type] = :sync_invoke
dump_object(p_data)
Expand Down
46 changes: 46 additions & 0 deletions lib/backgroundrb/bdrb_job_queue.rb
@@ -0,0 +1,46 @@
class BdrbJobQueue < ActiveRecord::Base
validates_uniqueness_of :job_key,:scope => [:worker_name,:worker_key]
def self.find_next(worker_name,worker_key = nil)
returned_job = nil
transaction do
unless worker_key
t_job = find(:first,:conditions => { :worker_name => worker_name,:taken => 0},:lock => true)
else
t_job = find(:first,:conditions => { :worker_name => worker_name,:taken => 0,:worker_key => worker_key },:lock => true)
end
if t_job
t_job.taken = 1
t_job.started_at = Time.now
t_job.save
returned_job = t_job
end
end
returned_job
end

def release_job
self.class.transaction do
self.taken = 0
self.started_at = nil
self.save
end
end

def self.insert_job(options = { })
transaction do
p options
options.merge!(:submitted_at => Time.now,:finished => 0,:taken => 0)
t_job = new(options)
t_job.save
end
end

def finish!
self.class.transaction do
self.finished = 1
self.finished_at = Time.now
self.save
end
end
end

6 changes: 5 additions & 1 deletion lib/backgroundrb/rails_worker_proxy.rb
Expand Up @@ -30,12 +30,16 @@ def choose_method worker_method,data
if worker_method =~ /^async_(\w+)/
method_name = $1
middle_man.ask_work(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => method_name,:job_key => job_key, :arg => data))
elsif worker_method =~ /^enq_(\w+)/i
method_name = $1
args = Marshal.dump([data[0]])
options = data[1]
middle_man.enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,:worker_method => method_name.to_s,:job_key => job_key.to_s, :args => args,:timeout => options[:timeout]))
else
middle_man.send_request(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => worker_method,:job_key => job_key,:arg => data))
end
end


def compact(options = { })
options.delete_if { |key,value| value.nil? }
options
Expand Down
1 change: 1 addition & 0 deletions script/backgroundrb
Expand Up @@ -23,6 +23,7 @@ BackgrounDRb::Config.parse_cmd_options ARGV
BDRB_CONFIG = BackgrounDRb::Config.read_config("#{RAILS_HOME}/config/backgroundrb.yml")

require RAILS_HOME + "/config/environment"
require "bdrb_job_queue"
require "backgroundrb_server"

pid_file = "#{RAILS_HOME}/tmp/pids/backgroundrb_#{BDRB_CONFIG[:backgroundrb][:port]}.pid"
Expand Down
23 changes: 22 additions & 1 deletion server/lib/meta_worker.rb
Expand Up @@ -130,6 +130,7 @@ def worker_init
end

def job_key; Thread.current[:job_key]; end
def worker_key; worker_options[:worker_key]; end

# loads workers schedule from options supplied from rails
# a user may pass trigger arguments to dynamically define the schedule
Expand Down Expand Up @@ -240,6 +241,26 @@ def unbind; end

def connection_completed; end

def check_for_enqueued_tasks
if worker_key && !worker_key.empty?
task = BdrbJobQueue.find_next(worker_name.to_s,worker_key.to_s)
else
task = BdrbJobQueue.find_next(worker_name.to_s)
end
return unless task
if self.respond_to? task.worker_method
called_method_arity = self.method(task.worker_method).arity
args = load_data(task.args)
if called_method_arity != 0
self.send(task.worker_method,*args)
else
self.send(task.worker_method)
end
else
task.release_job
end
end

def check_for_timer_events
begin
ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
Expand All @@ -248,7 +269,7 @@ def check_for_timer_events
logger.info($!.to_s)
logger.info($!.backtrace.join("\n"))
end

check_for_enqueued_tasks
return if @worker_method_triggers.nil? or @worker_method_triggers.empty?
@worker_method_triggers.delete_if { |key,value| value[:trigger].respond_to?(:end_time) && value[:trigger].end_time <= Time.now }

Expand Down
38 changes: 38 additions & 0 deletions tasks/backgroundrb_tasks.rake
@@ -1,4 +1,41 @@
namespace :backgroundrb do
def setup_queue_migration
config_file = "#{RAILS_ROOT}/config/database.yml"
require "erb"
require "active_record"
config = YAML.load(ERB.new(IO.read(config_file)).result)
env = ENV["env"] || 'development'
ActiveRecord::Base.establish_connection(config[env])

table_creation =<<-EOD
create table bdrb_job_queues(
id integer not null auto_increment primary key,
args blob,
worker_name varchar(255),
worker_method varchar(255),
job_key varchar(255),
taken tinyint,
finished tinyint,
timeout int,
priority int,
submitted_at datetime,
started_at datetime,
finished_at datetime,
archived_at datetime,
tag varchar(255),
submitter_info varchar(255),
runner_info varchar(255),
worker_key varchar(255)
) ENGINE=InnoDB;
EOD
connection = ActiveRecord::Base.connection
begin
connection.execute(table_creation)
rescue ActiveRecord::StatementInvalid => e
#puts e.message
end
end

require 'yaml'
desc 'Setup backgroundrb in your rails application'
task :setup do
Expand Down Expand Up @@ -40,6 +77,7 @@ namespace :backgroundrb do
puts "Copying Worker envionment loader file #{worker_env_loader_dest}"
FileUtils.cp_r(worker_env_loader_src,worker_env_loader_dest)
end
setup_queue_migration
end

desc 'Remove backgroundrb from your rails application'
Expand Down

0 comments on commit 7c53f8d

Please sign in to comment.