Permalink
Browse files

Initial commit.

git-svn-id: svn+ssh://rubyforge.org/var/svn/async-observer/async_observer@1 f42208c8-597e-4dc9-a40e-02aa0bb3b44f
  • Loading branch information...
0 parents commit 069cb7e5d7662a3fadf117f95f90ba33fb3e01a7 Keith Rarick committed Dec 14, 2007
Showing with 458 additions and 0 deletions.
  1. +7 −0 README
  2. +10 −0 bin/run-job
  3. +21 −0 bin/worker
  4. +1 −0 init.rb
  5. +45 −0 lib/async_observer/extend.rb
  6. +129 −0 lib/async_observer/queue.rb
  7. +27 −0 lib/async_observer/util.rb
  8. +218 −0 lib/async_observer/worker.rb
7 README
@@ -0,0 +1,7 @@
+This is Async Observer -- a Rails plugin that provides deep integration with
+Beanstalk.
+
+For more information, see http://async-observer.rubyforge.org/.
+
+For more information on Beanstalk, see its home page at
+http://xph.us/software/beanstalkd/.
@@ -0,0 +1,10 @@
+#!/usr/bin/env ruby
+
+# Rails initialization.
+# We do this here instead of using script/runner because script/runner
+# breaks __FILE__, which we use below.
+require File.expand_path(File.dirname(__FILE__) + '/../../../../config/boot')
+require RAILS_ROOT + '/config/environment'
+
+require 'async_observer/worker'
+AsyncObserver::Worker.new(binding).run_stdin_job()
@@ -0,0 +1,21 @@
+#!/usr/bin/env ruby
+
+# Use the same pointer (and therefore same buffer) for stdout and stderr.
+$VERBOSE = nil; STDERR = $stderr = STDOUT = $stdout; $VERBOSE = false
+
+require 'time'
+
+# Rails initialization.
+# We do this here instead of using script/runner because script/runner
+# breaks __FILE__, which we use below.
+begin
+ puts "#!load-rails!begin!#{Time.now.utc.xmlschema(6)}"
+ require File.expand_path(File.dirname(__FILE__) + '/../../../../config/boot')
+ puts "RAILS_ROOT=#{RAILS_ROOT.inspect}"
+ require RAILS_ROOT + '/config/environment'
+ensure
+ puts "#!load-rails!end!#{Time.now.utc.xmlschema(6)}"
+end
+
+require 'async_observer/worker'
+AsyncObserver::Worker.new(binding).run()
@@ -0,0 +1 @@
+require 'async_observer/extend'
@@ -0,0 +1,45 @@
+require 'async_observer/queue'
+
+module AsyncObserver::Extensions
+ def async_send(selector, *args)
+ AsyncObserver::Queue.put_call!(self, selector, args)
+ end
+end
+[Symbol, Module, Numeric, String, Array, Hash, ActiveRecord::Base].each do |c|
+ c.send :include, AsyncObserver::Extensions
+end
+
+HOOKS = [:after_create, :after_update, :after_save]
+
+class << ActiveRecord::Base
+ HOOKS.each do |hook|
+ code = %Q{def async_#{hook}(&b) add_async_hook(#{hook.inspect}, b) end}
+ class_eval(code, "generated code from #{__FILE__}:#{__LINE__ - 1}", 1)
+ end
+
+ def add_async_hook(hook, block)
+ prepare_async_hook_list(hook) << block
+ end
+
+ def prepare_async_hook_list(hook)
+ (@async_hooks ||= {})[hook] ||= new_async_hook_list(hook)
+ end
+
+ def new_async_hook_list(hook)
+ ahook = :"_async_#{hook}"
+
+ # This is for the producer's benefit
+ send(hook){|o| o.async_send(ahook)}
+
+ # This is for the worker's benefit
+ define_method(ahook) do
+ self.class.run_async_hooks(hook, self)
+ end
+
+ return []
+ end
+
+ def run_async_hooks(hook, o)
+ @async_hooks[hook].each{|b| b.call(o)}
+ end
+end
@@ -0,0 +1,129 @@
+
+module AsyncObserver; end
+
+class AsyncObserver::Queue; end
+
+class << AsyncObserver::Queue
+ DEFAULT_PRI = 512
+ attr_accessor :queue, :app_version
+
+ # This is a fake worker instance for running jobs synchronously.
+ def sync_worker()
+ @sync_worker ||= AsyncObserver::Worker.new(binding)
+ end
+
+ # This runs jobs synchronously; it's used when no queue is configured.
+ def sync_run(obj, pri=DEFAULT_PRI)
+ body = YAML.dump(obj)
+ job = Beanstalk::Job.new(AsyncObserver::FakeConn.new(), 0, pri, body)
+ sync_worker.dispatch(job)
+ sync_worker.do_all_work()
+ end
+
+ def put!(obj, pri=DEFAULT_PRI)
+ return sync_run(obj, pri) if !queue
+ queue.connect()
+ queue.yput(obj, pri)
+ end
+
+ def put_call!(obj, sel, args=[])
+ code = gen(obj, sel, args)
+ put!(pkg(code), DEFAULT_PRI)
+ RAILS_DEFAULT_LOGGER.info("put #{DEFAULT_PRI} #{code}")
+ end
+
+ def pkg(code)
+ {
+ :type => :rails,
+ :code => code,
+ :appver => AsyncObserver::Queue.app_version,
+ }
+ end
+
+ # Be careful not to pass in a selector that's not valid ruby source.
+ def gen(obj, selector, args)
+ obj.rrepr + '.' + selector.to_s + '(' + gen_args(args) + ')'
+ end
+
+ def gen_args(args)
+ args.rrepr[1...-1]
+ end
+end
+
+class AsyncObserver::FakeConn
+ def delete(x)
+ end
+
+ def release(x, y, z)
+ end
+
+ def bury(x, y)
+ end
+
+ def addr
+ '<none>'
+ end
+
+ def job_stats(id)
+ {
+ 'id' => id,
+ 'state' => 'reserved',
+ 'age' => 0,
+ 'delay' => 0,
+ 'time-left' => 5000,
+ 'timeouts' => 0,
+ 'releases' => 0,
+ 'buries' => 0,
+ 'kicks' => 0,
+ }
+ end
+end
+
+# This is commented out to workaround (what we think is) a ruby bug in method
+# lookup. Somehow the methods defined here are being used instead of ones in
+# ActiveRecord::Base.
+#class Object
+# def rrepr()
+# raise ArgumentError.new('no consistent external repr for ' + self.inspect)
+# end
+#end
+
+class Symbol
+ def rrepr() inspect end
+end
+
+class Module
+ def rrepr() name end
+end
+
+class NilClass
+ def rrepr() inspect end
+end
+
+class FalseClass
+ def rrepr() inspect end
+end
+
+class TrueClass
+ def rrepr() inspect end
+end
+
+class Numeric
+ def rrepr() inspect end
+end
+
+class String
+ def rrepr() inspect end
+end
+
+class Array
+ def rrepr() '[' + map(&:rrepr).join(', ') + ']' end
+end
+
+class Hash
+ def rrepr() '{' + map{|k,v| k.rrepr + '=>' + v.rrepr}.join(', ') + '}' end
+end
+
+module AsyncObserver::Extensions
+ def rrepr() "#{self.class.rrepr}.find(#{id.rrepr})" end
+end
@@ -0,0 +1,27 @@
+
+require 'open3'
+
+module AsyncObserver; end
+module AsyncObserver::Util
+ def plumb(outio, inios)
+ loop do
+ IO.select(inios)[0].each do |inio|
+ data = inio.read()
+ if data.nil? or data == ''
+ inios -= [inio] # EOF
+ else
+ outio.write(data)
+ end
+ end
+ break if inios.empty?
+ end
+ end
+
+ def plopen(cmd, io)
+ Open3.popen3(cmd) do |pin,pout,perr|
+ yield(pin)
+ pin.close()
+ plumb(io, [pout, perr])
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit 069cb7e

Please sign in to comment.