From 069cb7e5d7662a3fadf117f95f90ba33fb3e01a7 Mon Sep 17 00:00:00 2001 From: Keith Rarick Date: Fri, 14 Dec 2007 00:08:20 +0000 Subject: [PATCH] Initial commit. git-svn-id: svn+ssh://rubyforge.org/var/svn/async-observer/async_observer@1 f42208c8-597e-4dc9-a40e-02aa0bb3b44f --- README | 7 ++ bin/run-job | 10 ++ bin/worker | 21 ++++ init.rb | 1 + lib/async_observer/extend.rb | 45 ++++++++ lib/async_observer/queue.rb | 129 +++++++++++++++++++++ lib/async_observer/util.rb | 27 +++++ lib/async_observer/worker.rb | 218 +++++++++++++++++++++++++++++++++++ 8 files changed, 458 insertions(+) create mode 100644 README create mode 100755 bin/run-job create mode 100755 bin/worker create mode 100644 init.rb create mode 100644 lib/async_observer/extend.rb create mode 100644 lib/async_observer/queue.rb create mode 100644 lib/async_observer/util.rb create mode 100644 lib/async_observer/worker.rb diff --git a/README b/README new file mode 100644 index 0000000..1601221 --- /dev/null +++ b/README @@ -0,0 +1,7 @@ +This is Async Observer -- a Rails plugin that provides deep integration with +Beanstalk. + +For more information, see http://async-observer.rubyforge.org/. + +For more information on Beanstalk, see its home page at +http://xph.us/software/beanstalkd/. diff --git a/bin/run-job b/bin/run-job new file mode 100755 index 0000000..799b610 --- /dev/null +++ b/bin/run-job @@ -0,0 +1,10 @@ +#!/usr/bin/env ruby + +# Rails initialization. +# We do this here instead of using script/runner because script/runner +# breaks __FILE__, which we use below. +require File.expand_path(File.dirname(__FILE__) + '/../../../../config/boot') +require RAILS_ROOT + '/config/environment' + +require 'async_observer/worker' +AsyncObserver::Worker.new(binding).run_stdin_job() diff --git a/bin/worker b/bin/worker new file mode 100755 index 0000000..662dd6c --- /dev/null +++ b/bin/worker @@ -0,0 +1,21 @@ +#!/usr/bin/env ruby + +# Use the same pointer (and therefore same buffer) for stdout and stderr. +$VERBOSE = nil; STDERR = $stderr = STDOUT = $stdout; $VERBOSE = false + +require 'time' + +# Rails initialization. +# We do this here instead of using script/runner because script/runner +# breaks __FILE__, which we use below. +begin + puts "#!load-rails!begin!#{Time.now.utc.xmlschema(6)}" + require File.expand_path(File.dirname(__FILE__) + '/../../../../config/boot') + puts "RAILS_ROOT=#{RAILS_ROOT.inspect}" + require RAILS_ROOT + '/config/environment' +ensure + puts "#!load-rails!end!#{Time.now.utc.xmlschema(6)}" +end + +require 'async_observer/worker' +AsyncObserver::Worker.new(binding).run() diff --git a/init.rb b/init.rb new file mode 100644 index 0000000..3b9c1f0 --- /dev/null +++ b/init.rb @@ -0,0 +1 @@ +require 'async_observer/extend' diff --git a/lib/async_observer/extend.rb b/lib/async_observer/extend.rb new file mode 100644 index 0000000..b8e22b3 --- /dev/null +++ b/lib/async_observer/extend.rb @@ -0,0 +1,45 @@ +require 'async_observer/queue' + +module AsyncObserver::Extensions + def async_send(selector, *args) + AsyncObserver::Queue.put_call!(self, selector, args) + end +end +[Symbol, Module, Numeric, String, Array, Hash, ActiveRecord::Base].each do |c| + c.send :include, AsyncObserver::Extensions +end + +HOOKS = [:after_create, :after_update, :after_save] + +class << ActiveRecord::Base + HOOKS.each do |hook| + code = %Q{def async_#{hook}(&b) add_async_hook(#{hook.inspect}, b) end} + class_eval(code, "generated code from #{__FILE__}:#{__LINE__ - 1}", 1) + end + + def add_async_hook(hook, block) + prepare_async_hook_list(hook) << block + end + + def prepare_async_hook_list(hook) + (@async_hooks ||= {})[hook] ||= new_async_hook_list(hook) + end + + def new_async_hook_list(hook) + ahook = :"_async_#{hook}" + + # This is for the producer's benefit + send(hook){|o| o.async_send(ahook)} + + # This is for the worker's benefit + define_method(ahook) do + self.class.run_async_hooks(hook, self) + end + + return [] + end + + def run_async_hooks(hook, o) + @async_hooks[hook].each{|b| b.call(o)} + end +end diff --git a/lib/async_observer/queue.rb b/lib/async_observer/queue.rb new file mode 100644 index 0000000..835d446 --- /dev/null +++ b/lib/async_observer/queue.rb @@ -0,0 +1,129 @@ + +module AsyncObserver; end + +class AsyncObserver::Queue; end + +class << AsyncObserver::Queue + DEFAULT_PRI = 512 + attr_accessor :queue, :app_version + + # This is a fake worker instance for running jobs synchronously. + def sync_worker() + @sync_worker ||= AsyncObserver::Worker.new(binding) + end + + # This runs jobs synchronously; it's used when no queue is configured. + def sync_run(obj, pri=DEFAULT_PRI) + body = YAML.dump(obj) + job = Beanstalk::Job.new(AsyncObserver::FakeConn.new(), 0, pri, body) + sync_worker.dispatch(job) + sync_worker.do_all_work() + end + + def put!(obj, pri=DEFAULT_PRI) + return sync_run(obj, pri) if !queue + queue.connect() + queue.yput(obj, pri) + end + + def put_call!(obj, sel, args=[]) + code = gen(obj, sel, args) + put!(pkg(code), DEFAULT_PRI) + RAILS_DEFAULT_LOGGER.info("put #{DEFAULT_PRI} #{code}") + end + + def pkg(code) + { + :type => :rails, + :code => code, + :appver => AsyncObserver::Queue.app_version, + } + end + + # Be careful not to pass in a selector that's not valid ruby source. + def gen(obj, selector, args) + obj.rrepr + '.' + selector.to_s + '(' + gen_args(args) + ')' + end + + def gen_args(args) + args.rrepr[1...-1] + end +end + +class AsyncObserver::FakeConn + def delete(x) + end + + def release(x, y, z) + end + + def bury(x, y) + end + + def addr + '' + end + + def job_stats(id) + { + 'id' => id, + 'state' => 'reserved', + 'age' => 0, + 'delay' => 0, + 'time-left' => 5000, + 'timeouts' => 0, + 'releases' => 0, + 'buries' => 0, + 'kicks' => 0, + } + end +end + +# This is commented out to workaround (what we think is) a ruby bug in method +# lookup. Somehow the methods defined here are being used instead of ones in +# ActiveRecord::Base. +#class Object +# def rrepr() +# raise ArgumentError.new('no consistent external repr for ' + self.inspect) +# end +#end + +class Symbol + def rrepr() inspect end +end + +class Module + def rrepr() name end +end + +class NilClass + def rrepr() inspect end +end + +class FalseClass + def rrepr() inspect end +end + +class TrueClass + def rrepr() inspect end +end + +class Numeric + def rrepr() inspect end +end + +class String + def rrepr() inspect end +end + +class Array + def rrepr() '[' + map(&:rrepr).join(', ') + ']' end +end + +class Hash + def rrepr() '{' + map{|k,v| k.rrepr + '=>' + v.rrepr}.join(', ') + '}' end +end + +module AsyncObserver::Extensions + def rrepr() "#{self.class.rrepr}.find(#{id.rrepr})" end +end diff --git a/lib/async_observer/util.rb b/lib/async_observer/util.rb new file mode 100644 index 0000000..dac23ab --- /dev/null +++ b/lib/async_observer/util.rb @@ -0,0 +1,27 @@ + +require 'open3' + +module AsyncObserver; end +module AsyncObserver::Util + def plumb(outio, inios) + loop do + IO.select(inios)[0].each do |inio| + data = inio.read() + if data.nil? or data == '' + inios -= [inio] # EOF + else + outio.write(data) + end + end + break if inios.empty? + end + end + + def plopen(cmd, io) + Open3.popen3(cmd) do |pin,pout,perr| + yield(pin) + pin.close() + plumb(io, [pout, perr]) + end + end +end diff --git a/lib/async_observer/worker.rb b/lib/async_observer/worker.rb new file mode 100644 index 0000000..e1fd35c --- /dev/null +++ b/lib/async_observer/worker.rb @@ -0,0 +1,218 @@ + +require 'beanstalk' +require 'async_observer/queue' +require 'async_observer/util' +require 'lib/joey_3000_compliant_logger' + +module AsyncObserver; end + +class AsyncObserver::Worker + extend AsyncObserver::Util + + SLEEP_TIME = 60 if !defined?(SLEEP_TIME) # rails loads this file twice + + class << self + attr_accessor :finish + attr_writer :handle, :run_version + + def handle + @handle or raise 'no custom handler is defined' + end + + def run_version + @run_version or raise 'no alternate version runner is defined' + end + end + + def initialize(top_binding) + @top_binding = top_binding + end + + def main_loop() + loop do + safe_dispatch(get_job()) + end + end + + def startup() + logb('worker-startup') do + RAILS_DEFAULT_LOGGER.info "pid is #{$$}" + RAILS_DEFAULT_LOGGER.info "app version is #{AsyncObserver::Queue.app_version}" + mark_db_socket_close_on_exec() + if AsyncObserver::Queue.queue.nil? + RAILS_DEFAULT_LOGGER.info 'no queue has been configured' + exit(1) + end + end + end + + # This prevents us from leaking fds when we exec. Only works for mysql. + def mark_db_socket_close_on_exec() + ActiveRecord::Base.connection.set_close_on_exec() + rescue NoMethodError + end + + def shutdown() + logb('worker-shutdown') do + do_all_work() + end + end + + def run() + startup() + main_loop() + rescue Interrupt + shutdown() + end + + def run_stdin_job() + job = Beanstalk::Job.new(nil, 0, 0, $stdin.read()) + raise 'Fatal version mismatch' if !version_matches?(job) + run_code(job) + end + + def self.run_job_in(root, job) + RAILS_DEFAULT_LOGGER.info "run job #{job.id} in #{root}" + plopen(root + '/vendor/plugins/async_observer/bin/run-job', $stdout) do |io| + io.write(job.body) + end + raise 'job failed for some reason. check the log.' if !$?.success? + job.delete() + end + + def q_hint() + @q_hint || AsyncObserver::Queue.queue + end + + # This heuristic is to help prevent one queue from starving. The idea is that + # if the connection returns a job right away, it probably has more available. + # But if it takes time, then it's probably empty. So reuse the same + # connection as long as it stays fast. Otherwise, have no preference. + def reserve_and_set_hint() + t1 = Time.now.utc + return job = q_hint().reserve() + ensure + t2 = Time.now.utc + @q_hint = if brief?(t1, t2) and job then job.conn else nil end + end + + def brief?(t1, t2) + ((t2 - t1) * 100).to_i.abs < 10 + end + + def get_job() + logb('worker-get-job') do + loop do + begin + AsyncObserver::Queue.queue.connect() + return reserve_and_set_hint() + rescue Interrupt => ex + raise ex + rescue Exception => ex + @q_hint = nil # in case there's something wrong with this conn + RAILS_DEFAULT_LOGGER.info( + "#{ex.class}: #{ex}\n" + ex.fixed_backtrace.join("\n")) + RAILS_DEFAULT_LOGGER.info 'something is wrong. We failed to get a job.' + RAILS_DEFAULT_LOGGER.info "sleeping for #{SLEEP_TIME}s..." + sleep(SLEEP_TIME) + end + end + end + end + + def dispatch(job) + return run_ao_job(job) if async_observer_job?(job) + return run_other(job) + end + + def safe_dispatch(job) + logb('worker-dispatch') do + RAILS_DEFAULT_LOGGER.info "got #{job.inspect}:\n" + job.body + logb('job-stats') do + job.stats.each do |k,v| + RAILS_DEFAULT_LOGGER.info "#{k}=#{v}" + end + end + begin + return dispatch(job) + rescue Interrupt => ex + begin job.release() rescue :ok end + raise ex + rescue Exception => ex + RAILS_DEFAULT_LOGGER.info '#!oops' + RAILS_DEFAULT_LOGGER.info "Job #{job.id} FAILED: #{job.inspect}" + RAILS_DEFAULT_LOGGER.info( + "#{ex.class}: #{ex}\n" + ex.fixed_backtrace.join("\n")) + begin + job.decay() + rescue Beanstalk::UnexpectedResponse + :ok + end + end + end + end + + def run_ao_job(job) + RAILS_DEFAULT_LOGGER.info 'running as async observer job' + if version_matches?(job) + run_code(job) + job.delete() + else + RAILS_DEFAULT_LOGGER.info "mismatch; running alternate app version #{job.ybody[:appver]}" + self.class.run_version.call(job.ybody[:appver], job) + end + rescue ActiveRecord::RecordNotFound => ex + if job.age > 60 + job.delete() # it's old; this error is most likely permanent + else + raise ex # it could be replication delay so retry + end + end + + def run_code(job) + eval(job.ybody[:code], @top_binding, "(beanstalk job #{job.id})", 1) + end + + def version_matches?(job) + return true if job.ybody[:appver].nil? # always run versionless jobs + job.ybody[:appver] == AsyncObserver::Queue.app_version + end + + def async_observer_job?(job) + begin job.ybody[:type] == :rails rescue false end + end + + def run_other(job) + RAILS_DEFAULT_LOGGER.info 'trying custom handler' + self.class.handle.call(job) + end + + def do_all_work() + RAILS_DEFAULT_LOGGER.info 'finishing all running jobs. interrupt again to kill them.' + f = self.class.finish + f.call() if f + end +end + +class ActiveRecord::ConnectionAdapters::MysqlAdapter + def set_close_on_exec() + @connection.set_close_on_exec() + end +end + +class Mysql + def set_close_on_exec() + if @net + @net.set_close_on_exec() + else + # we are in the c mysql binding + RAILS_DEFAULT_LOGGER.info "Warning: we are using the C mysql binding, can't set close-on-exec" + end + end +end + +class Mysql::Net + def set_close_on_exec() + @sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) + end +end