Initial extraction

commit 75b49dc1c281ffa934a4eb2c7e840291e8b4a5ff 0 parents
Tobias Lütke authored
20 MIT-LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2005 Tobias Luetke
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
103 README
@@ -0,0 +1,103 @@
+Delayed::Job
+============
+
+Delayed_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background.
+
+It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks. Amongst those tasks are:
+
+* sending massive newsletters
+* image resizing
+* http downloads
+* updating smart collections
+* updating solr, our search server, after product changes
+* batch imports
+* spam checks
+
+== Setup ==
+
+The library revolves around a delayed_jobs table which looks as follows:
+
+ create_table :delayed_jobs, :force => true do |table|
+ table.integer :priority, :default => 0
+ table.integer :attempts, :default => 0
+ table.text :handler
+ table.string :last_error
+ table.datetime :run_at
+ table.timestamps
+ end
+
+== Usage ==
+
+Jobs are simple Ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table.
+Job objects are serialized to YAML so that they can later be resurrected by the job runner.
+
+ class NewsletterJob < Struct.new(:text, :emails)
+ def perform
+ emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) }
+ end
+ end
+
+ Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.find(:all).collect(&:email))
+
+There is also a second way to get jobs in the queue: send_later.
+
+
+ BatchImporter.new(Shop.find(1)).send_later(:import_massive_csv, massive_csv)
+
+
+This will simply create a Delayed::PerformableMethod job in the jobs table which serializes all the parameters you pass to it. Active record objects get special treatment:
+they are stored as a short text reference (class name and id) and loaded fresh from the database when the job is actually run later.
+
+
+== Running the tasks ==
+
+You can invoke rake jobs:work which will start working off jobs. You can cancel the rake task with CTRL-C.
+
+At Shopify we run the tasks from a simple script/job_runner which is being invoked by runnit:
+
+ #!/usr/bin/env ruby
+ require File.dirname(__FILE__) + '/../config/environment'
+
+ SLEEP = 15
+ RESTART_AFTER = 1000
+
+ trap('TERM') { puts 'Exiting...'; $exit = true }
+ trap('INT') { puts 'Exiting...'; $exit = true }
+
+ # this script dies after several runs to prevent memory leaks.
+ # runnit will immediately start it again.
+ count, realtime, runs_left = 0, 0.0, RESTART_AFTER # realtime is declared here so it is still visible after the lock block
+
+ loop do
+
+ count = 0
+
+ # this requires the locking plugin, also from jadedPixel
+ ActiveRecord::Base.aquire_lock("jobs table worker", 10) do
+ puts 'got lock'
+
+ realtime = Benchmark.realtime do
+ count = Delayed::Job.work_off.runs # work_off returns the Runner, not a number
+ end
+ end
+
+ runs_left -= 1
+
+ break if $exit
+
+ if count.zero?
+ sleep(SLEEP)
+ else
+ status = "#{count} jobs completed at %.2f j/s ..." % [count / realtime]
+ RAILS_DEFAULT_LOGGER.info status
+ puts status
+ end
+
+ if $exit or runs_left <= 0
+ break
+ end
+ end
+
+== Todo ==
+
+Work out a locking mechanism which would allow several job runners to run at the same time, spreading the load between them.
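
Review note on the README's Usage section: the round trip it describes (a job object is dumped to YAML, stored, and later revived and performed) can be tried without Rails. What follows is only a minimal sketch, assuming a current Ruby with the bundled Psych YAML library. `GreetingJob` and its fields are hypothetical stand-ins for `NewsletterJob`, and `revive` is an illustrative helper that is not part of this commit.

```ruby
require 'yaml'

# Hypothetical job: any object that responds to #perform qualifies.
class GreetingJob < Struct.new(:text, :names)
  def perform
    names.map { |name| "#{text}, #{name}" }
  end
end

# Illustrative helper: newer Psych versions refuse to build arbitrary
# objects in YAML.load, so prefer unsafe_load where it exists.
def revive(yaml)
  YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(yaml) : YAML.load(yaml)
end

stored = GreetingJob.new('hello', %w[ann bob]).to_yaml # what the handler column would hold
job    = revive(stored)                                 # what the runner would do later
result = job.perform
```

The stored text carries the class name in a YAML tag, which is why the runner can only revive a job whose class is loadable at that point.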
5 init.rb
@@ -0,0 +1,5 @@
+require File.dirname(__FILE__) + '/lib/delayed/message_sending'
+require File.dirname(__FILE__) + '/lib/delayed/performable_method'
+require File.dirname(__FILE__) + '/lib/delayed/job'
+
+Object.send(:include, Delayed::MessageSending)
145 lib/delayed/job.rb
@@ -0,0 +1,145 @@
+module Delayed
+
+ class DeserializationError < StandardError
+ end
+
+ class Job < ActiveRecord::Base
+ ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
+
+ set_table_name :delayed_jobs
+
+ class Runner
+ attr_accessor :logger, :jobs
+ attr_accessor :runs, :success, :failure
+
+ def initialize(jobs, logger = nil)
+ @jobs = jobs
+ @logger = logger
+ self.runs = self.success = self.failure = 0
+ end
+
+ def run
+
+ ActiveRecord::Base.cache do
+ ActiveRecord::Base.transaction do
+ @jobs.each do |job|
+ self.runs += 1
+ begin
+ time = Benchmark.measure do
+ job.perform
+ ActiveRecord::Base.uncached { job.destroy }
+ self.success += 1
+ end
+ logger.debug "Executed job in #{time.real}" if logger
+ rescue DeserializationError, StandardError, RuntimeError => e
+ if logger
+ logger.error "Job #{job.id}: #{e.class} #{e.message}"
+ logger.error e.backtrace.join("\n")
+ end
+ ActiveRecord::Base.uncached { job.reshedule e.message }
+ self.failure += 1
+ end
+ end
+ end
+ end
+
+ self
+ end
+ end
+
+ def self.enqueue(object, priority = 0)
+ raise ArgumentError, 'Cannot enqueue items which do not respond to perform' unless object.respond_to?(:perform)
+
+ Job.create(:handler => object, :priority => priority)
+ end
+
+ def handler=(object)
+ self['handler'] = object.to_yaml
+ end
+
+ def handler
+ @handler ||= deserialize(self['handler'])
+ end
+
+ def perform
+ handler.perform
+ end
+
+ def reshedule(message)
+ self.attempts += 1
+ self.run_at = self.class.time_now + 5.minutes
+ self.last_error = message
+ save!
+ end
+
+ def self.peek(limit = 1)
+ if limit == 1
+ find(:first, :order => "priority DESC, run_at ASC", :conditions => ['run_at <= ?', time_now])
+ else
+ find(:all, :order => "priority DESC, run_at ASC", :limit => limit, :conditions => ['run_at <= ?', time_now])
+ end
+ end
+
+ def self.work_off(limit = 100)
+ jobs = Job.find(:all, :conditions => ['run_at <= ?', time_now], :order => "priority DESC, run_at ASC", :limit => limit)
+
+ Job::Runner.new(jobs, logger).run
+ end
+
+ protected
+
+ def self.time_now
+ (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+ end
+
+ def before_save
+ self.run_at ||= self.class.time_now
+ end
+
+ private
+
+ def deserialize(source)
+ attempt_to_load_file = true
+
+ begin
+ handler = YAML.load(source) rescue nil
+ return handler if handler.respond_to?(:perform)
+
+ if handler.nil?
+ if source =~ ParseObjectFromYaml
+
+ # Constantize the object so that ActiveSupport can attempt
+ # its auto loading magic. Will raise LoadError if not successful.
+ attempt_to_load($1)
+
+ # If successful, retry the yaml.load
+ handler = YAML.load(source)
+ return handler if handler.respond_to?(:perform)
+ end
+ end
+
+ if handler.is_a?(YAML::Object)
+
+ # Constantize the object so that ActiveSupport can attempt
+ # its auto loading magic. Will raise LoadError if not successful.
+ attempt_to_load(handler.class)
+
+ # If successful, retry the yaml.load
+ handler = YAML.load(source)
+ return handler if handler.respond_to?(:perform)
+ end
+
+ raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
+
+ rescue TypeError, LoadError, NameError => e
+
+ raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
+ end
+ end
+
+ def attempt_to_load(klass)
+ klass.constantize
+ end
+
+ end
+end
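
Review note on lib/delayed/job.rb: the selection and retry rules in `work_off` and `reshedule` (only rows whose run_at has passed, highest priority first, failures pushed back by five minutes with the error recorded) can be exercised in memory. This is a sketch under stated assumptions, not the plugin's code: `Row`, `RETRY_DELAY`, `due_rows` and this `work_off` are hypothetical names, handlers are plain lambdas, and there is no database or transaction.

```ruby
# In-memory stand-in for a delayed_jobs row (hypothetical, no database).
Row = Struct.new(:handler, :priority, :run_at, :attempts, :last_error)

RETRY_DELAY = 5 * 60 # seconds; mirrors the 5.minutes used in the diff

# Rows that are due, highest priority first, oldest run_at first.
def due_rows(rows, now)
  rows.select { |r| r.run_at <= now }
      .sort_by { |r| [-r.priority, r.run_at] }
end

# Perform each due row; drop it on success, push it back on failure.
def work_off(rows, now)
  success = failure = 0
  due_rows(rows, now).each do |row|
    begin
      row.handler.call
      rows.delete_if { |r| r.equal?(row) }
      success += 1
    rescue StandardError => e
      row.attempts += 1
      row.run_at = now + RETRY_DELAY
      row.last_error = e.message
      failure += 1
    end
  end
  [success, failure]
end

now  = Time.now
log  = []
rows = [
  Row.new(-> { log << :low },          0, now - 10, 0, nil),
  Row.new(-> { log << :high },         5, now - 5,  0, nil),
  Row.new(-> { raise 'did not work' }, 1, now - 1,  0, nil),
  Row.new(-> { log << :future },       9, now + 60, 0, nil)
]
counts = work_off(rows, now)
```

After the call, the failing row and the not-yet-due row are the only ones left in `rows`.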
7 lib/delayed/message_sending.rb
@@ -0,0 +1,7 @@
+module Delayed
+ module MessageSending
+ def send_later(method, *args)
+ Delayed::Job.enqueue Delayed::PerformableMethod.new(self, method.to_sym, args)
+ end
+ end
+end
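
Review note on lib/delayed/message_sending.rb: `send_later` records a call instead of making it. The sketch below shows that idea in plain Ruby, assuming an in-memory array in place of the jobs table. `QUEUE`, `DeferredCall`, `DeferredSending` and `Greeter` are hypothetical names, and the module is mixed into a single class here rather than into `Object` as init.rb does.

```ruby
# Hypothetical in-memory queue standing in for the delayed_jobs table.
QUEUE = []

# A recorded call: receiver, method name, and arguments.
DeferredCall = Struct.new(:object, :method_name, :args) do
  def perform
    object.public_send(method_name, *args)
  end
end

module DeferredSending
  # Record the call instead of making it; refuse unknown methods up front.
  def send_later(method_name, *args)
    unless respond_to?(method_name)
      raise NoMethodError, "undefined method `#{method_name}' for #{inspect}"
    end
    QUEUE << DeferredCall.new(self, method_name.to_sym, args)
    nil
  end
end

class Greeter
  include DeferredSending

  def greet(name)
    "hello #{name}"
  end
end

Greeter.new.send_later(:greet, 'world') # nothing runs yet
result = QUEUE.shift.perform            # the runner's side
```

Checking `respond_to?` at enqueue time surfaces a misspelled method name immediately instead of minutes later inside the runner.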
37 lib/delayed/performable_method.rb
@@ -0,0 +1,37 @@
+module Delayed
+ class PerformableMethod < Struct.new(:object, :method, :args)
+ AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/
+
+ def initialize(object, method, args)
+ raise NoMethodError, "undefined method `#{method}' for #{object.inspect}" unless object.respond_to?(method)
+
+ self.object = dump(object)
+ self.args = args.map { |a| dump(a) }
+ self.method = method.to_sym
+ end
+
+ def perform
+ load(object).send(method, *args.map{|a| load(a)})
+ end
+
+ private
+
+ def load(arg)
+ case arg
+ when AR_STRING_FORMAT then $1.constantize.find($2)
+ else arg
+ end
+ end
+
+ def dump(arg)
+ case arg
+ when ActiveRecord::Base then ar_to_string(arg)
+ else arg
+ end
+ end
+
+ def ar_to_string(obj)
+ "AR:#{obj.class}:#{obj.id}"
+ end
+ end
+end
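
Review note on lib/delayed/performable_method.rb: records are swapped for an "AR:Class:id" string when the job is built and looked up again when it runs. A minimal sketch of that swap, assuming an in-memory store instead of ActiveRecord: `Note`, `REFERENCE`, `to_reference` and `from_reference` are hypothetical names, and `Object.const_get` stands in for ActiveSupport's `constantize`.

```ruby
# Hypothetical record class with an in-memory "table"; stands in for an
# ActiveRecord model so the sketch runs without a database.
class Note
  STORE = {}
  attr_reader :id, :text

  def initialize(id, text)
    @id, @text = id, text
    STORE[id] = self
  end

  def self.find(id)
    STORE.fetch(Integer(id))
  end
end

# Same shape as AR_STRING_FORMAT in the diff.
REFERENCE = /\AAR:([A-Z]\w+):(\d+)\z/

# Replace a record with a short text reference; pass anything else through.
def to_reference(arg)
  arg.is_a?(Note) ? "AR:#{arg.class}:#{arg.id}" : arg
end

# Turn a reference back into a freshly looked-up record.
def from_reference(arg)
  match = arg.is_a?(String) && REFERENCE.match(arg)
  match ? Object.const_get(match[1]).find(match[2]) : arg
end

note = Note.new(1, 'Once upon...')
ref  = to_reference(note)
text = from_reference(ref).text
```

Note that a pattern of this shape does not match a namespaced class name such as `Shop::Product`; such a reference would be passed through as a plain string.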
33 spec/database.rb
@@ -0,0 +1,33 @@
+$:.unshift(File.dirname(__FILE__) + '/../lib')
+
+require 'rubygems'
+require 'active_record'
+require File.dirname(__FILE__) + '/../init'
+
+ActiveRecord::Base.logger = Logger.new(nil)
+ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => '/tmp/jobs.sqlite')
+ActiveRecord::Migration.verbose = false
+
+def reset_db
+ ActiveRecord::Schema.define do
+
+ create_table :delayed_jobs, :force => true do |table|
+ table.integer :priority, :default => 0
+ table.integer :attempts, :default => 0
+ table.text :handler
+ table.string :last_error
+ table.datetime :run_at
+ table.timestamps
+ end
+
+ create_table :stories, :force => true do |table|
+ table.string :text
+ end
+
+ end
+end
+
+# Purely useful for test cases...
+class Story < ActiveRecord::Base
+ def tell; text; end
+end
81 spec/delayed_method_spec.rb
@@ -0,0 +1,81 @@
+require File.dirname(__FILE__) + '/database'
+
+
+class SimpleJob
+ cattr_accessor :runs; self.runs = 0
+ def perform; @@runs += 1; end
+end
+
+class RandomRubyObject
+ def say_hello
+ 'hello'
+ end
+end
+
+class StoryReader
+
+ def read(story)
+ "Epilog: #{story.tell}"
+ end
+
+end
+
+
+describe 'random ruby objects' do
+
+ before { reset_db }
+
+ it "should respond_to :send_later method" do
+
+ RandomRubyObject.new.should respond_to(:send_later)
+
+ end
+
+ it "should raise a NoMethodError if send_later is called but the target method doesn't exist" do
+ lambda { RandomRubyObject.new.send_later(:method_that_does_not_exist) }.should raise_error(NoMethodError)
+ end
+
+ it "should add a new entry to the job table when send_later is called on it" do
+ Delayed::Job.count.should == 0
+
+ RandomRubyObject.new.send_later(:to_s)
+
+ Delayed::Job.count.should == 1
+ end
+
+ it "should get the original method executed when the job is performed" do
+
+ RandomRubyObject.new.send_later(:say_hello)
+
+ Delayed::Job.count.should == 1
+ Delayed::Job.peek.perform.should == 'hello'
+ end
+
+ it "should store the object as string if it's an active record" do
+
+ story = Story.create :text => 'Once upon...'
+ story.send_later(:tell)
+
+ job = Delayed::Job.peek
+ job.handler.class.should == Delayed::PerformableMethod
+ job.handler.object.should == 'AR:Story:1'
+ job.handler.method.should == :tell
+ job.handler.args.should == []
+ job.perform.should == 'Once upon...'
+ end
+
+ it "should store arguments as string if they are active records" do
+
+ story = Story.create :text => 'Once upon...'
+
+ reader = StoryReader.new
+ reader.send_later(:read, story)
+
+ job = Delayed::Job.peek
+ job.handler.class.should == Delayed::PerformableMethod
+ job.handler.method.should == :read
+ job.handler.args.should == ['AR:Story:1']
+ job.perform.should == 'Epilog: Once upon...'
+ end
+
+end
122 spec/job_spec.rb
@@ -0,0 +1,122 @@
+require File.dirname(__FILE__) + '/database'
+
+
+class SimpleJob
+ cattr_accessor :runs; self.runs = 0
+ def perform; @@runs += 1; end
+end
+
+class ErrorJob
+ cattr_accessor :runs; self.runs = 0
+ def perform; raise 'did not work'; end
+end
+
+describe Delayed::Job do
+
+ before :each do
+ reset_db
+ end
+
+ it "should set run_at automatically" do
+ Delayed::Job.create.run_at.should_not == nil
+ end
+
+ it "should raise ArgumentError when handler doesn't respond_to :perform" do
+ lambda { Delayed::Job.enqueue(Object.new) }.should raise_error(ArgumentError)
+ end
+
+ it "should increase count after enqueuing items" do
+ Delayed::Job.enqueue SimpleJob.new
+ Delayed::Job.count.should == 1
+ end
+
+ it "should return nil when peeking on empty table" do
+ Delayed::Job.peek.should == nil
+ end
+
+ it "should return a job when peeking a table with jobs in it" do
+ Delayed::Job.enqueue SimpleJob.new
+ Delayed::Job.peek.class.should == Delayed::Job
+ end
+
+ it "should return an array of jobs when peek is called with a count larger than one" do
+ Delayed::Job.enqueue SimpleJob.new
+ Delayed::Job.peek(2).class.should == Array
+ end
+
+ it "should call perform on jobs when running work_off" do
+ SimpleJob.runs.should == 0
+
+ Delayed::Job.enqueue SimpleJob.new
+ Delayed::Job.work_off(1)
+
+ SimpleJob.runs.should == 1
+ end
+
+ it "should re-schedule by about 5 minutes when it fails to execute properly" do
+ Delayed::Job.enqueue ErrorJob.new
+ runner = Delayed::Job.work_off(1)
+ runner.success.should == 0
+ runner.failure.should == 1
+
+ job = Delayed::Job.find(:first)
+ job.last_error.should == 'did not work'
+ job.attempts.should == 1
+ job.run_at.should > Time.now + 4.minutes
+ job.run_at.should < Time.now + 6.minutes
+ end
+
+ it "should raise a DeserializationError when the job class is totally unknown" do
+
+ job = Delayed::Job.new
+ job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
+
+ lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
+ it "should try to load the class when it is unknown at the time of the deserialization" do
+ job = Delayed::Job.new
+ job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
+
+ job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
+
+ lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
+ it "should try to include the namespace when loading unknown objects" do
+ job = Delayed::Job.new
+ job['handler'] = "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
+ job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+ lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
+
+ it "should also try to load structs when they are unknown (raises TypeError)" do
+ job = Delayed::Job.new
+ job['handler'] = "--- !ruby/struct:JobThatDoesNotExist {}"
+
+ job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
+
+ lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
+ it "should try to include the namespace when loading unknown structs" do
+ job = Delayed::Job.new
+ job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
+ job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+ lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
+end
+
+
+
+
+
+
+
+
+
+
+
+
18 spec/story_spec.rb
@@ -0,0 +1,18 @@
+require File.dirname(__FILE__) + '/database'
+
+describe "A story" do
+
+ before do
+ reset_db
+ Story.create :text => "Once upon a time..."
+ end
+
+ it "should be shared" do
+ Story.find(:first).tell.should == 'Once upon a time...'
+ end
+
+ it "should not return its result if its storytelling is delayed" do
+ Story.find(:first).send_later(:tell).should_not == 'Once upon a time...'
+ end
+
+end
29 tasks/jobs.rake
@@ -0,0 +1,29 @@
+namespace :jobs do
+
+ task :work => :environment do
+
+ SLEEP = 5
+
+ trap('TERM') { puts 'Exiting...'; $exit = true }
+ trap('INT') { puts 'Exiting...'; $exit = true }
+
+ loop do
+
+ count = 0
+
+ realtime = Benchmark.realtime do
+ count = Delayed::Job.work_off.runs # work_off returns the Runner, not a number
+ end
+
+ break if $exit
+
+ if count.zero?
+ sleep(SLEEP)
+ else
+ RAILS_DEFAULT_LOGGER.info "#{count} jobs completed at %.2f j/s ..." % [count / realtime]
+ end
+
+ break if $exit
+ end
+ end
+end