Removed the global lock. Jobs can now be processed in parallel by running multiple job runners on the same machine or across your server farm.

This requires two new columns in the job table: locked_by and locked_until
Tobias Lütke committed Mar 23, 2008
1 parent 2309a94 commit 8ec934e
Showing 6 changed files with 224 additions and 166 deletions.
71 changes: 36 additions & 35 deletions README
@@ -12,6 +12,12 @@ It is a direct extraction from Shopify where the job table is responsible for a
 * updating solr, our search server, after product changes
 * batch imports
 * spam checks

+== Changes ==
+
+1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us
+    to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
+1.0 Initial release
+
 == Setup ==


@@ -22,7 +28,9 @@ The library revolves around a delayed_jobs table which looks as follows:
       table.integer :attempts, :default => 0
       table.text :handler
       table.string :last_error
       table.datetime :run_at
+      table.datetime :locked_until
+      table.string :locked_by
       table.timestamps
     end
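Editor's note: the hunks shown on this page do not include a migration for the two new columns. A minimal sketch of one, assuming the Rails 2.x migration API of the period (the class name AddLockingToDelayedJobs is hypothetical):

  class AddLockingToDelayedJobs < ActiveRecord::Migration
    def self.up
      # The two columns the new pessimistic locking scheme relies on.
      add_column :delayed_jobs, :locked_until, :datetime
      add_column :delayed_jobs, :locked_by, :string
    end

    def self.down
      remove_column :delayed_jobs, :locked_until
      remove_column :delayed_jobs, :locked_by
    end
  end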


@@ -58,46 +66,39 @@ At Shopify we run the tasks from a simple script/job_runner which is being i
 #!/usr/bin/env ruby
 require File.dirname(__FILE__) + '/../config/environment'

-SLEEP = 15
-RESTART_AFTER = 1000
+SLEEP = 5

 trap('TERM') { puts 'Exiting...'; $exit = true }
 trap('INT')  { puts 'Exiting...'; $exit = true }

-# this script dies after several runs to prevent memory leaks.
-# runnit will immediately start it again.
-count, runs_left = 0, RESTART_AFTER
-
-loop do
-
-  count = 0
-
-  # this requires the locking plugin, also from jadedPixel
-  ActiveRecord::base.aquire_lock("jobs table worker", 10) do
-    puts 'got lock'
-
-    realtime = Benchmark.realtime do
-      count = Delayed::Job.work_off
-    end
-  end
-
-  runs_left -= 1
-
-  break if $exit
-
-  if count.zero?
-    sleep(SLEEP)
-  else
-    status = "#{count} jobs completed at %.2f j/s ..." % [count / realtime]
-    RAILS_DEFAULT_LOGGER.info status
-    puts status
-  end
-
-  if $exit or runs_left <= 0
-    break
-  end
-end
-
-== Todo ==
-
-Work out a locking mechanism which would allow several job runners to run at the same time, spreading the load between them.
+puts "*** Starting job worker #{Delayed::Job.worker_name}"
+
+begin
+
+  loop do
+    result = nil
+
+    realtime = Benchmark.realtime do
+      result = Delayed::Job.work_off
+    end
+
+    count = result.sum
+
+    break if $exit
+
+    if count.zero?
+      sleep(SLEEP)
+      puts 'Waiting for more jobs...'
+    else
+      status = "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
+      RAILS_DEFAULT_LOGGER.info status
+      puts status
+    end
+
+    break if $exit
+  end
+
+ensure
+  Delayed::Job.clear_locks!
+end
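Editor's note: with the global lock gone, several copies of this runner can work the same table at once. A hedged launcher sketch — the script path matches the README above, everything else is illustrative:

  # Start three workers on one machine; each process reports its own
  # worker_name ("pid:<pid>") and only touches rows it managed to lock.
  3.times do
    fork { exec 'ruby', 'script/job_runner' }
  end
  Process.waitall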
228 changes: 139 additions & 89 deletions lib/delayed/job.rb
@@ -4,142 +4,192 @@ class DeserializationError < StandardError
 end

 class Job < ActiveRecord::Base
+  ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
+
   set_table_name :delayed_jobs

-  class Runner
-    attr_accessor :logger, :jobs
-    attr_accessor :runs, :success, :failure
-
-    def initialize(jobs, logger = nil)
-      @jobs = jobs
-      @logger = logger
-      self.runs = self.success = self.failure = 0
-    end
-
-    def run
-
-      ActiveRecord::Base.cache do
-        ActiveRecord::Base.transaction do
-          @jobs.each do |job|
-            self.runs += 1
-            begin
-              time = Benchmark.measure do
-                job.perform
-                ActiveRecord::Base.uncached { job.destroy }
-                self.success += 1
-              end
-              logger.debug "Executed job in #{time.real}"
-            rescue DeserializationError, StandardError, RuntimeError => e
-              if logger
-                logger.error "Job #{job.id}: #{e.class} #{e.message}"
-                logger.error e.backtrace.join("\n")
-              end
-              ActiveRecord::Base.uncached { job.reshedule e.message }
-              self.failure += 1
-            end
-          end
-        end
-      end
-
-      self
-    end
-  end
-
-  def self.enqueue(object, priority = 0)
-    raise ArgumentError, 'Cannot enqueue items which do not respond to perform' unless object.respond_to?(:perform)
-
-    Job.create(:handler => object, :priority => priority)
-  end
-
-  ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
-
-  def handler=(object)
-    self['handler'] = object.to_yaml
-  end
-
-  def handler
-    @handler ||= deserialize(self['handler'])
-  end
-
-  def perform
-    handler.perform
-  end
-
-  def reshedule(message)
-    self.attempts += 1
-    self.run_at = self.class.time_now + (attempts ** 4).seconds
-    self.last_error = message
-    save!
-  end
-
-  def self.peek(limit = 1)
-    if limit == 1
-      find(:first, :order => "priority DESC, run_at ASC", :conditions => ['run_at <= ?', time_now])
-    else
-      find(:all, :order => "priority DESC, run_at ASC", :limit => limit, :conditions => ['run_at <= ?', time_now])
-    end
-  end
-
-  def self.work_off(limit = 100)
-    jobs = Job.find(:all, :conditions => ['run_at <= ?', time_now], :order => "priority DESC, run_at ASC", :limit => limit)
-
-    Job::Runner.new(jobs, logger).run
-  end
+  cattr_accessor :worker_name
+  self.worker_name = "pid:#{Process.pid}"
+
+  NextTaskSQL = '`run_at` <= ? AND (`locked_until` IS NULL OR `locked_until` < ?) OR (`locked_by`=?)'
+  NextTaskOrder = 'priority DESC, run_at ASC'
+
+  class LockError < StandardError
+  end
+
+  def self.clear_locks!
+    connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_until`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
+  end
+
+  def payload_object
+    @payload_object ||= deserialize(self['handler'])
+  end
+
+  def payload_object=(object)
+    self['handler'] = object.to_yaml
+  end
+
+  def reshedule(message, time = nil)
+    time ||= Job.db_time_now + (attempts ** 4).seconds + 1
+
+    self.attempts += 1
+    self.run_at = time
+    self.last_error = message
+    self.unlock
+    save!
+  end
+
+  def self.enqueue(object, priority = 0)
+    unless object.respond_to?(:perform)
+      raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
+    end
+
+    Job.create(:payload_object => object, :priority => priority)
+  end
+
+  def self.find_available(limit = 5)
+    time_now = db_time_now
+    find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => 5)
+  end


-  protected
-
-  def self.time_now
-    (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
-  end
-
-  def before_save
-    self.run_at ||= self.class.time_now
-  end
+  # Get the payload of the next job we can get an exclusive lock on.
+  # If no jobs are left we return nil
+  def self.reserve(timeout = 5 * 60)
+
+    # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
+    # This leads to a more even distribution of jobs across the worker processes.
+    find_available(5).each do |job|
+      begin
+        job.lock_exclusively!(self.db_time_now + timeout, worker_name)
+        yield job.payload_object
+        job.destroy
+        return job
+      rescue LockError
+        # We did not get the lock, some other worker process must have
+        puts "failed to acquire exclusive lock for #{job.id}"
+      rescue StandardError => e
+        job.reshedule e.message
+        return job
+      end
+    end
+
+    nil
+  end
+
+  # This method is used internally by the reserve method to ensure exclusive access
+  # to the given job. It will raise a LockError if it cannot get this lock.
+  def lock_exclusively!(lock_until, worker = worker_name)
+
+    affected_rows = if locked_by != worker
+
+      # We don't own this job so we will update the locked_by name and the locked_until
+      connection.update(<<-end_sql, "#{self.class.name} Update to acquire exclusive lock")
+        UPDATE #{self.class.table_name}
+        SET `locked_until`=#{quote_value(lock_until)}, `locked_by`=#{quote_value(worker)}
+        WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_until`<#{quote_value(self.class.db_time_now)} OR `locked_until` IS NULL)
+      end_sql

+    else
+
+      # We already own this job; this may happen if the job queue crashes.
+      # Simply update the lock timeout.
+      connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
+        UPDATE #{self.class.table_name}
+        SET `locked_until`=#{quote_value(lock_until)}
+        WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
+      end_sql
+
+    end
+
+    unless affected_rows == 1
+      raise LockError, "Attempt to acquire exclusive lock failed"
+    end
+
+    self.locked_until = lock_until
+    self.locked_by = worker
+  end
+
+  def unlock
+    self.locked_until = nil
+    self.locked_by = nil
+  end
+
+  def self.work_off(num = 100)
+    success, failure = 0, 0
+
+    num.times do
+
+      job = self.reserve do |j|
+        begin
+          j.perform
+          success += 1
+        rescue
+          failure += 1
+          raise
+        end
+      end
+
+      break if job.nil?
+    end
+
+    return [success, failure]
+  end

   private

   def deserialize(source)
     attempt_to_load_file = true

     begin
       handler = YAML.load(source) rescue nil
       return handler if handler.respond_to?(:perform)

       if handler.nil?
         if source =~ ParseObjectFromYaml

           # Constantize the object so that ActiveSupport can attempt
           # its auto loading magic. Will raise LoadError if not successful.
           attempt_to_load($1)

           # If successful, retry the yaml.load
           handler = YAML.load(source)
           return handler if handler.respond_to?(:perform)
         end
       end

       if handler.is_a?(YAML::Object)

         # Constantize the object so that ActiveSupport can attempt
         # its auto loading magic. Will raise LoadError if not successful.
         attempt_to_load(handler.class)

         # If successful, retry the yaml.load
         handler = YAML.load(source)
         return handler if handler.respond_to?(:perform)
       end

       raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropriate file.'

     rescue TypeError, LoadError, NameError => e

       raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
     end
   end

   def attempt_to_load(klass)
     klass.constantize
   end

+  def self.db_time_now
+    (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+  end
+
+  protected
+
+  def before_save
+    self.run_at ||= self.class.db_time_now
+  end
+
 end
end
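Editor's note: a hedged usage sketch of the API above. NewsletterJob, Subscriber, and deliver_newsletter are made up for illustration; enqueue, work_off, and the priority argument come from the diff.

  # Any object that responds to #perform can be enqueued.
  class NewsletterJob < Struct.new(:subscriber_ids)
    def perform
      subscriber_ids.each { |id| Subscriber.find(id).deliver_newsletter }
    end
  end

  Delayed::Job.enqueue NewsletterJob.new([1, 2, 3]), 1   # priority 1

  # Processes up to 100 jobs; rows locked by another worker (locked_until in
  # the future, locked_by set to a different worker_name) are skipped.
  success, failure = Delayed::Job.work_off
  puts "#{success} succeeded, #{failure} failed"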
3 changes: 3 additions & 0 deletions lib/delayed/performable_method.rb
@@ -12,6 +12,9 @@ def initialize(object, method, args)

   def perform
     load(object).send(method, *args.map{|a| load(a)})
+  rescue ActiveRecord::RecordNotFound
+    # We cannot do anything about objects which were deleted in the meantime
+    true
   end

   private
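Editor's note: a hedged sketch of what this change means in practice. The constructor signature comes from the hunk header above; Delayed::PerformableMethod as the constant and the User model are assumptions.

  # Queue a method call on a record. If the user row is deleted before the
  # job runs, perform now rescues ActiveRecord::RecordNotFound and returns
  # true, so the job counts as done instead of being retried repeatedly.
  user = User.find(42)
  job  = Delayed::PerformableMethod.new(user, :activate!, [])
  Delayed::Job.enqueue(job)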
