Navigation Menu

Skip to content

Commit

Permalink
Initial commit.
Browse files Browse the repository at this point in the history
git-svn-id: svn+ssh://rubyforge.org/var/svn/async-observer/async_observer@1 f42208c8-597e-4dc9-a40e-02aa0bb3b44f
  • Loading branch information
Keith Rarick committed Dec 14, 2007
0 parents commit 069cb7e
Show file tree
Hide file tree
Showing 8 changed files with 458 additions and 0 deletions.
7 changes: 7 additions & 0 deletions README
@@ -0,0 +1,7 @@
This is Async Observer -- a Rails plugin that provides deep integration with
Beanstalk.

For more information, see http://async-observer.rubyforge.org/.

For more information on Beanstalk, see its home page at
http://xph.us/software/beanstalkd/.
10 changes: 10 additions & 0 deletions bin/run-job
@@ -0,0 +1,10 @@
#!/usr/bin/env ruby

# Rails initialization.
# We do this here instead of using script/runner because script/runner
# breaks __FILE__, which we use below.
require File.expand_path(File.dirname(__FILE__) + '/../../../../config/boot')
require RAILS_ROOT + '/config/environment'

require 'async_observer/worker'
AsyncObserver::Worker.new(binding).run_stdin_job()
21 changes: 21 additions & 0 deletions bin/worker
@@ -0,0 +1,21 @@
#!/usr/bin/env ruby

# Use the same pointer (and therefore same buffer) for stdout and stderr.
$VERBOSE = nil; STDERR = $stderr = STDOUT = $stdout; $VERBOSE = false

require 'time'

# Rails initialization.
# We do this here instead of using script/runner because script/runner
# breaks __FILE__, which we use below.
begin
puts "#!load-rails!begin!#{Time.now.utc.xmlschema(6)}"
require File.expand_path(File.dirname(__FILE__) + '/../../../../config/boot')
puts "RAILS_ROOT=#{RAILS_ROOT.inspect}"
require RAILS_ROOT + '/config/environment'
ensure
puts "#!load-rails!end!#{Time.now.utc.xmlschema(6)}"
end

require 'async_observer/worker'
AsyncObserver::Worker.new(binding).run()
1 change: 1 addition & 0 deletions init.rb
@@ -0,0 +1 @@
require 'async_observer/extend'
45 changes: 45 additions & 0 deletions lib/async_observer/extend.rb
@@ -0,0 +1,45 @@
require 'async_observer/queue'

module AsyncObserver::Extensions
def async_send(selector, *args)
AsyncObserver::Queue.put_call!(self, selector, args)
end
end
[Symbol, Module, Numeric, String, Array, Hash, ActiveRecord::Base].each do |c|
c.send :include, AsyncObserver::Extensions
end

HOOKS = [:after_create, :after_update, :after_save]

class << ActiveRecord::Base
HOOKS.each do |hook|
code = %Q{def async_#{hook}(&b) add_async_hook(#{hook.inspect}, b) end}
class_eval(code, "generated code from #{__FILE__}:#{__LINE__ - 1}", 1)
end

def add_async_hook(hook, block)
prepare_async_hook_list(hook) << block
end

def prepare_async_hook_list(hook)
(@async_hooks ||= {})[hook] ||= new_async_hook_list(hook)
end

def new_async_hook_list(hook)
ahook = :"_async_#{hook}"

# This is for the producer's benefit
send(hook){|o| o.async_send(ahook)}

# This is for the worker's benefit
define_method(ahook) do
self.class.run_async_hooks(hook, self)
end

return []
end

def run_async_hooks(hook, o)
@async_hooks[hook].each{|b| b.call(o)}
end
end
129 changes: 129 additions & 0 deletions lib/async_observer/queue.rb
@@ -0,0 +1,129 @@

module AsyncObserver; end

class AsyncObserver::Queue; end

class << AsyncObserver::Queue
DEFAULT_PRI = 512
attr_accessor :queue, :app_version

# This is a fake worker instance for running jobs synchronously.
def sync_worker()
@sync_worker ||= AsyncObserver::Worker.new(binding)
end

# This runs jobs synchronously; it's used when no queue is configured.
def sync_run(obj, pri=DEFAULT_PRI)
body = YAML.dump(obj)
job = Beanstalk::Job.new(AsyncObserver::FakeConn.new(), 0, pri, body)
sync_worker.dispatch(job)
sync_worker.do_all_work()
end

def put!(obj, pri=DEFAULT_PRI)
return sync_run(obj, pri) if !queue
queue.connect()
queue.yput(obj, pri)
end

def put_call!(obj, sel, args=[])
code = gen(obj, sel, args)
put!(pkg(code), DEFAULT_PRI)
RAILS_DEFAULT_LOGGER.info("put #{DEFAULT_PRI} #{code}")
end

def pkg(code)
{
:type => :rails,
:code => code,
:appver => AsyncObserver::Queue.app_version,
}
end

# Be careful not to pass in a selector that's not valid ruby source.
def gen(obj, selector, args)
obj.rrepr + '.' + selector.to_s + '(' + gen_args(args) + ')'
end

def gen_args(args)
args.rrepr[1...-1]
end
end

class AsyncObserver::FakeConn
def delete(x)
end

def release(x, y, z)
end

def bury(x, y)
end

def addr
'<none>'
end

def job_stats(id)
{
'id' => id,
'state' => 'reserved',
'age' => 0,
'delay' => 0,
'time-left' => 5000,
'timeouts' => 0,
'releases' => 0,
'buries' => 0,
'kicks' => 0,
}
end
end

# This is commented out to workaround (what we think is) a ruby bug in method
# lookup. Somehow the methods defined here are being used instead of ones in
# ActiveRecord::Base.
#class Object
# def rrepr()
# raise ArgumentError.new('no consistent external repr for ' + self.inspect)
# end
#end

class Symbol
def rrepr() inspect end
end

class Module
def rrepr() name end
end

class NilClass
def rrepr() inspect end
end

class FalseClass
def rrepr() inspect end
end

class TrueClass
def rrepr() inspect end
end

class Numeric
def rrepr() inspect end
end

class String
def rrepr() inspect end
end

class Array
def rrepr() '[' + map(&:rrepr).join(', ') + ']' end
end

class Hash
def rrepr() '{' + map{|k,v| k.rrepr + '=>' + v.rrepr}.join(', ') + '}' end
end

module AsyncObserver::Extensions
def rrepr() "#{self.class.rrepr}.find(#{id.rrepr})" end
end
27 changes: 27 additions & 0 deletions lib/async_observer/util.rb
@@ -0,0 +1,27 @@

require 'open3'

module AsyncObserver; end
module AsyncObserver::Util
def plumb(outio, inios)
loop do
IO.select(inios)[0].each do |inio|
data = inio.read()
if data.nil? or data == ''
inios -= [inio] # EOF
else
outio.write(data)
end
end
break if inios.empty?
end
end

def plopen(cmd, io)
Open3.popen3(cmd) do |pin,pout,perr|
yield(pin)
pin.close()
plumb(io, [pout, perr])
end
end
end

0 comments on commit 069cb7e

Please sign in to comment.