Skip to content
Browse files

Initial commit

  • Loading branch information...
0 parents commit 95d09a065b228ce71763e639d1006f5b1856340f @mperham committed Apr 10, 2011
4 .gitignore
@@ -0,0 +1,4 @@
+*.gem
+.bundle
+Gemfile.lock
+pkg/*
1 .rvmrc
@@ -0,0 +1 @@
+rvm use rbx
4 Gemfile
@@ -0,0 +1,4 @@
+source "http://rubygems.org"
+
+# Specify your gem's dependencies in girl_friday.gemspec
+gemspec
69 README.md
@@ -0,0 +1,69 @@
+girl_friday
+====================
+
+Have a task you want to get done sometime soon but don't want to do it yourself? Give it to your [girl friday](http://en.wikipedia.org/wiki/Girl_Friday)!
+
+girl_friday is a Ruby library for performing asynchronous tasks. Often times you don't want to block a web response by performing some task, like sending an email, so you can just use this gem to perform it in the background. It works with any Ruby application, including Rails 3 applications.
+
+
+Installation
+------------------
+
+We recommend using [JRuby](http://jruby.org) or [Rubinius](http://rubini.us) with girl_friday. Both are excellent options for executing Ruby these days.
+
+ gem install girl_friday
+
+Open your Rails application's Gemfile and add:
+
+ gem 'girl_friday'
+
+
+Usage
+--------------------
+
+Put girl_friday in your Gemfile:
+
+ gem 'girl_friday'
+
+In your Rails app, create a `config/initializers/queues.rb` which defines your queues:
+
+ QUEUE = GirlFriday::WorkQueue.new('user_email') do |msg|
+ UserMailer.registration_email(msg).deliver
+ end
+
+In your controller action or model, you can call `#push(msg)`
+
+ QUEUE.push(:type => 'registration_email', :user => { :email => @user.email, :name => @user.name }))
+
+The msg parameter to push is just a Hash whose contents are completely up to you.
+
+Your message processing block should **NOT** access any instance data or variables outside of the block. That's shared mutable state and dangerous to touch! I also strongly recommend your queue processor block be **VERY** short, ideally just a method call or two. You can unit test those methods easily but not the processor block itself.
+
+
+Error Handling
+--------------------
+
+Your processor block can raise any error; don't worry about needing a begin..rescue block. Each queue contains a supervisor who will log any exceptions (to stderr or Hoptoad Notifier) and restart a new worker.
+
+
+More Detail
+--------------------
+
+But why not use any of the zillions of other async solutions (Resque, dj, etc)? Because girl\_friday is easier and more efficient than those solutions: girl_friday runs in your Rails process and uses the actor pattern for safe concurrency. Because it runs in the same process, you don't have to monitor a separate set of processes, deploy a separate codebase, buy extra memory for those processes, etc.
+
+You do need to write thread-safe code. This is not hard to do: the actor pattern means that you get a message and process that message. There is no shared data which requires locks and could lead to deadlock in your application code. Because girl\_friday does use Threads under the covers, you do need to ensure that your VM can execute Threads efficiently: today this means JRuby or Rubinius. **To be clear: this gem will work but not scale well on Ruby 1.9.**
+
+
+
+Thanks
+--------------------
+
+[Carbon Five](http://carbonfive.com), I write and maintain girl_friday on their clock.
+
+This gem contains a copy of the Rubinius Actor API, modified to work on any Ruby VM. Thanks to Evan Phoenix, MenTaLguY and the Rubinius project for permission to use and distribute this code.
+
+
+Author
+--------------------
+
+Mike Perham, [@mperham](https://twitter.com/mperham), [mikeperham.com](http://mikeperham.com)
2 Rakefile
@@ -0,0 +1,2 @@
+require 'bundler'
+Bundler::GemHelper.install_tasks
19 girl_friday.gemspec
@@ -0,0 +1,19 @@
+# -*- encoding: utf-8 -*-
+require "./lib/girl_friday/version"
+
+Gem::Specification.new do |s|
+ s.name = "girl_friday"
+ s.version = GirlFriday::VERSION
+ s.platform = Gem::Platform::RUBY
+ s.authors = ["Mike Perham"]
+ s.email = ["mperham@gmail.com"]
+ s.homepage = ""
+ s.summary = s.description = %q{Background processing via Rubinius's actor API}
+
+ s.rubyforge_project = "girl_friday"
+
+ s.files = `git ls-files`.split("\n")
+ s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
+ s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
+ s.require_paths = ["lib"]
+end
10 lib/girl_friday.rb
@@ -0,0 +1,10 @@
+begin
+ require 'actor'
+ require 'girl_friday/monkey_patches'
+rescue LoadError
+ require 'girl_friday/actor'
+end
+
+require 'girl_friday/version'
+require 'girl_friday/work_queue'
+require 'girl_friday/error_handler'
463 lib/girl_friday/actor.rb
@@ -0,0 +1,463 @@
+# actor.rb - implementation of the actor model
+#
+# Copyright 2007-2008 MenTaLguY <mental@rydia.net>
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice,
+# thi slist of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice
+# this list of conditions and the following disclaimer in the documentatio
+# and/or other materials provided with the distribution.
+# * Neither the name of the Evan Phoenix nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+require 'thread' # for Queue
+
+class Actor
+ class DeadActorError < RuntimeError
+ attr_reader :actor
+ attr_reader :reason
+ def initialize(actor, reason)
+ super(reason)
+ @actor = actor
+ @reason = reason
+ end
+ end
+
+ ANY = Object.new
+ def ANY.===(other)
+ true
+ end
+
+ class << self
+ alias_method :private_new, :new
+ private :private_new
+
+ @@registered_lock = Queue.new
+ @@registered = {}
+ @@registered_lock << nil
+
+ def current
+ Thread.current[:__current_actor__] ||= private_new
+ end
+
+ # Spawn a new Actor that will run in its own thread
+ def spawn(*args, &block)
+ raise ArgumentError, "no block given" unless block
+ spawned = Queue.new
+ Thread.new do
+ private_new do |actor|
+ Thread.current[:__current_actor__] = actor
+ spawned << actor
+ block.call *args
+ end
+ end
+ spawned.pop
+ end
+ alias_method :new, :spawn
+
+ # Atomically spawn an actor and link it to the current actor
+ def spawn_link(*args, &block)
+ current = self.current
+ link_complete = Queue.new
+ spawn do
+ begin
+ Actor.link(current)
+ ensure
+ link_complete << Actor.current
+ end
+ block.call *args
+ end
+ link_complete.pop
+ end
+
+ # Polls for exit notifications
+ def check_for_interrupt
+ current._check_for_interrupt
+ self
+ end
+
+ # Waits until a matching message is received in the current actor's
+ # mailbox, and executes the appropriate action. May be interrupted by
+ # exit notifications.
+ def receive #:yields: filter
+ filter = Filter.new
+ if block_given?
+ yield filter
+ else
+ filter.when(ANY) { |m| m }
+ end
+ current._receive(filter)
+ end
+
+ # Send a "fake" exit notification to another actor, as if the current
+ # actor had exited with +reason+
+ def send_exit(recipient, reason)
+ recipient.notify_exited(current, reason)
+ self
+ end
+
+ # Link the current Actor to another one.
+ def link(actor)
+ current = self.current
+ current.notify_link actor
+ actor.notify_link current
+ self
+ end
+
+ # Unlink the current Actor from another one
+ def unlink(actor)
+ current = self.current
+ current.notify_unlink actor
+ actor.notify_unlink current
+ self
+ end
+
+ # Actors trapping exit do not die when an error occurs in an Actor they
+ # are linked to. Instead the exit message is sent to their regular
+ # mailbox in the form [:exit, actor, reason]. This allows certain
+ # Actors to supervise sets of others and restart them in the event
+ # of an error. Setting the trap flag may be interrupted by pending
+ # exit notifications.
+ #
+ def trap_exit=(value)
+ current._trap_exit = value
+ self
+ end
+
+ # Is the Actor trapping exit?
+ def trap_exit
+ current._trap_exit
+ end
+ alias_method :trap_exit?, :trap_exit
+
+ # Lookup a locally named service
+ def lookup(name)
+ raise ArgumentError, "name must be a symbol" unless Symbol === name
+ @@registered_lock.receive
+ begin
+ @@registered[name]
+ ensure
+ @@registered_lock << nil
+ end
+ end
+ alias_method :[], :lookup
+
+ # Register an Actor locally as a named service
+ def register(name, actor)
+ raise ArgumentError, "name must be a symbol" unless Symbol === name
+ unless actor.nil? or actor.is_a?(Actor)
+ raise ArgumentError, "only actors may be registered"
+ end
+
+ @@registered_lock.receive
+ begin
+ if actor.nil?
+ @@registered.delete(name)
+ else
+ @@registered[name] = actor
+ end
+ ensure
+ @@registered_lock << nil
+ end
+ end
+ alias_method :[]=, :register
+
+ def _unregister(actor) #:nodoc:
+ @@registered_lock.receive
+ begin
+ @@registered.delete_if { |n, a| actor.equal? a }
+ ensure
+ @@registered_lock << nil
+ end
+ end
+ end
+
+ def initialize
+ @lock = Queue.new
+
+ @filter = nil
+ @ready = Queue.new
+ @action = nil
+ @message = nil
+
+ @mailbox = []
+ @interrupts = []
+ @links = []
+ @alive = true
+ @exit_reason = nil
+ @trap_exit = false
+ @thread = Thread.current
+
+ @lock << nil
+
+ if block_given?
+ watchdog { yield self }
+ else
+ Thread.new { watchdog { @thread.join } }
+ end
+ end
+
+ def send(message)
+ @lock.pop
+ begin
+ return self unless @alive
+ if @filter
+ @action = @filter.action_for(message)
+ if @action
+ @filter = nil
+ @message = message
+ @ready << nil
+ else
+ @mailbox << message
+ end
+ else
+ @mailbox << message
+ end
+ ensure
+ @lock << nil
+ end
+ self
+ end
+ alias_method :<<, :send
+
+ def _check_for_interrupt #:nodoc:
+ check_thread
+ @lock.pop
+ begin
+ raise @interrupts.shift unless @interrupts.empty?
+ ensure
+ @lock << nil
+ end
+ end
+
+ def _receive(filter) #:nodoc:
+ check_thread
+
+ action = nil
+ message = nil
+ timed_out = false
+
+ @lock.pop
+ begin
+ raise @interrupts.shift unless @interrupts.empty?
+
+ for i in 0...(@mailbox.size)
+ message = @mailbox[i]
+ action = filter.action_for(message)
+ if action
+ @mailbox.delete_at(i)
+ break
+ end
+ end
+
+ unless action
+ @filter = filter
+ @lock << nil
+ begin
+ if filter.timeout?
+ timed_out = @ready.receive_timeout(filter.timeout) == false
+ else
+ @ready.pop
+ end
+ ensure
+ @lock.pop
+ end
+
+ if !timed_out and @interrupts.empty?
+ action = @action
+ message = @message
+ else
+ @mailbox << @message if @action
+ end
+
+ @action = nil
+ @message = nil
+
+ raise @interrupts.shift unless @interrupts.empty?
+ end
+ ensure
+ @lock << nil
+ end
+
+ if timed_out
+ filter.timeout_action.call
+ else
+ action.call message
+ end
+ end
+
+ # Notify this actor that it's now linked to the given one; this is not
+ # intended to be used directly except by actor implementations. Most
+ # users will want to use Actor.link instead.
+ #
+ def notify_link(actor)
+ @lock.pop
+ alive = nil
+ exit_reason = nil
+ begin
+ alive = @alive
+ exit_reason = @exit_reason
+ @links << actor if alive and not @links.include? actor
+ ensure
+ @lock << nil
+ end
+ actor.notify_exited(self, exit_reason) unless alive
+ self
+ end
+
+ # Notify this actor that it's now unlinked from the given one; this is
+ # not intended to be used directly except by actor implementations. Most
+ # users will want to use Actor.unlink instead.
+ #
+ def notify_unlink(actor)
+ @lock.pop
+ begin
+ return self unless @alive
+ @links.delete(actor)
+ ensure
+ @lock << nil
+ end
+ self
+ end
+
+ # Notify this actor that one of the Actors it's linked to has exited;
+ # this is not intended to be used directly except by actor implementations.
+ # Most users will want to use Actor.send_exit instead.
+ #
+ def notify_exited(actor, reason)
+ exit_message = nil
+ @lock.pop
+ begin
+ return self unless @alive
+ @links.delete(actor)
+ if @trap_exit
+ exit_message = DeadActorError.new(actor, reason)
+ elsif reason
+ @interrupts << DeadActorError.new(actor, reason)
+ if @filter
+ @filter = nil
+ @ready << nil
+ end
+ end
+ ensure
+ @lock << nil
+ end
+ send exit_message if exit_message
+ self
+ end
+
+ def watchdog
+ reason = nil
+ begin
+ yield
+ rescue Exception => reason
+ ensure
+ links = nil
+ Actor._unregister(self)
+ @lock.pop
+ begin
+ @alive = false
+ @mailbox = nil
+ @interrupts = nil
+ @exit_reason = reason
+ links = @links
+ @links = nil
+ ensure
+ @lock << nil
+ end
+ links.each do |actor|
+ begin
+ actor.notify_exited(self, reason)
+ rescue Exception
+ end
+ end
+ end
+ end
+ private :watchdog
+
+ def check_thread
+ unless Thread.current == @thread
+ raise ThreadError, "illegal cross-actor call"
+ end
+ end
+ private :check_thread
+
+ def _trap_exit=(value) #:nodoc:
+ check_thread
+ @lock.pop
+ begin
+ raise @interrupts.shift unless @interrupts.empty?
+ @trap_exit = !!value
+ ensure
+ @lock << nil
+ end
+ end
+
+ def _trap_exit #:nodoc:
+ check_thread
+ @lock.pop
+ begin
+ @trap_exit
+ ensure
+ @lock << nil
+ end
+ end
+end
+
+
+class Actor
+class Filter
+ attr_reader :timeout
+ attr_reader :timeout_action
+
+ def initialize
+ @pairs = []
+ @timeout = nil
+ @timeout_action = nil
+ end
+
+ def timeout?
+ not @timeout.nil?
+ end
+
+ def when(pattern, &action)
+ raise ArgumentError, "no block given" unless action
+ @pairs.push [pattern, action]
+ self
+ end
+
+ def after(seconds, &action)
+ raise ArgumentError, "no block given" unless action
+
+ seconds = seconds.to_f
+ if !@timeout or seconds < @timeout
+ @timeout = seconds
+ @timeout_action = action
+ end
+ self
+ end
+
+ def action_for(value)
+ pair = @pairs.find { |pattern, action| pattern === value }
+ pair ? pair.last : nil
+ end
+end
+end
22 lib/girl_friday/error_handler.rb
@@ -0,0 +1,22 @@
+module GirlFriday
+ class ErrorHandler
+ def handle(ex)
+ $stderr.puts(ex)
+ $stderr.puts(ex.backtrace.join("\n"))
+ end
+
+ def self.default
+ defined?(HoptoadNotifier) ? Hoptoad : self
+ end
+ end
+end
+
+module GirlFriday
+ class ErrorHandler
+ class Hoptoad
+ def handle(ex)
+ HoptoadNotifier.notify(ex)
+ end
+ end
+ end
+end
31 lib/girl_friday/monkey_patches.rb
@@ -0,0 +1,31 @@
+if RUBY_ENGINE == 'rbx' && Rubinius::VERSION < '1.2.4'
+
+ class Actor
+
+ # Monkeypatch so this works with Rubinius 1.2.3 (latest).
+ # 1.2.4 should have the necessary fix included.
+ def notify_exited(actor, reason)
+ exit_message = nil
+ @lock.receive
+ begin
+ return self unless @alive
+ @links.delete(actor)
+ if @trap_exit
+ exit_message = DeadActorError.new(actor, reason)
+ elsif reason
+ @interrupts << DeadActorError.new(actor, reason)
+ if @filter
+ @filter = nil
+ @ready << nil
+ end
+ end
+ ensure
+ @lock << nil
+ end
+ send exit_message if exit_message
+ self
+ end
+
+ end
+
+end
3 lib/girl_friday/version.rb
@@ -0,0 +1,3 @@
+module GirlFriday
+ VERSION = "0.0.1"
+end
83 lib/girl_friday/work_queue.rb
@@ -0,0 +1,83 @@
+
+module GirlFriday
+ class WorkQueue
+ Ready = Struct.new(:this)
+ Work = Struct.new(:msg)
+
+ attr_reader :name
+ def initialize(name, options={}, &block)
+ @name = name
+ @error_handler = (options[:error_handler] || ErrorHandler.default).new
+ create_pool(options[:size] || 5, block)
+ end
+
+ def push(work)
+ @supervisor << Work[work]
+ end
+ alias_method :<<, :push
+
+ private
+
+ def drain(ready, work)
+ # give as much work to as many ready workers as possible
+ todo = ready.size < work.size ? ready.size : work.size
+ todo.times do
+ ready.pop << work.pop
+ end
+ end
+
+ def create_pool(size, processor)
+ @supervisor = Actor.spawn do
+ supervisor = Actor.current
+ ready_workers = []
+ extra_work = []
+
+ Actor.trap_exit = true
+ size.times do |x|
+ # start N workers
+ ready_workers << Actor.spawn_link do
+ loop do
+ work = Actor.receive
+ processor.call(work.msg)
+ supervisor << Ready[Actor.current]
+ end
+ end
+ end
+
+ begin
+
+ loop do
+ Actor.receive do |f|
+ f.when(Ready) do |who|
+ if work = extra_work.pop
+ who.this << work
+ drain(ready_workers, extra_work)
+ else
+ ready_workers << who.this
+ end
+ end
+ f.when(Work) do |work|
+ if worker = ready_workers.pop
+ worker << work
+ drain(ready_workers, extra_work)
+ else
+ extra_work << work
+ end
+ end
+ f.when(Actor::DeadActorError) do |exit|
+ print "Actor exited due to: #{exit.reason}\n"
+ # TODO need to respawn crashed worker
+ end
+ end
+ end
+
+ rescue Exception => ex
+ $stderr.print "Fatal error in girl_friday: supervisor for #{name} died.\n"
+ $stderr.print("#{ex}\n")
+ $stderr.print("#{ex.backtrace.join("\n")}\n")
+ end
+
+ end
+
+ end
+end

0 comments on commit 95d09a0

Please sign in to comment.
Something went wrong with that request. Please try again.