Skip to content

Commit

Permalink
Removed the global lock. Jobs can now be processed in parallel by runnin…
Browse files Browse the repository at this point in the history
…g multiple job runners on the same machine or across your server farm.

This requires two new columns in the job table: locked_by and locked_until
  • Loading branch information
Tobias Lütke committed Mar 23, 2008
1 parent 2309a94 commit 8ec934e
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 166 deletions.
71 changes: 36 additions & 35 deletions README
Expand Up @@ -12,6 +12,12 @@ It is a direct extraction from Shopify where the job table is responsible for a
* updating solr, our search server, after product changes
* batch imports
* spam checks

== Changes ==

1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us
to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
1.0 Initial release

== Setup ==

Expand All @@ -22,7 +28,9 @@ The library evolves around a delayed_jobs table which looks as follows:
table.integer :attempts, :default => 0
table.text :handler
table.string :last_error
table.datetime :run_at
table.datetime :run_at
table.datetime :locked_until
table.string :locked_by
table.timestamps
end

Expand Down Expand Up @@ -58,46 +66,39 @@ At Shopify we run the tasks from a simple script/job_runner which is being i
#!/usr/bin/env ruby
require File.dirname(__FILE__) + '/../config/environment'

SLEEP = 15
RESTART_AFTER = 1000
SLEEP = 5

trap('TERM') { puts 'Exiting...'; $exit = true }
trap('INT') { puts 'Exiting...'; $exit = true }

# this script dies after several runs to prevent memory leaks.
# runnit will immediately start it again.
count, runs_left = 0, RESTART_AFTER

loop do

count = 0

# this requires the locking plugin, also from jadedPixel
ActiveRecord::base.aquire_lock("jobs table worker", 10) do
puts 'got lock'

realtime = Benchmark.realtime do
count = Delayed::Job.work_off
end
end
puts "*** Staring job worker #{Delayed::Job.worker_name}"

begin

loop do
result = nil

runs_left -= 1
realtime = Benchmark.realtime do
result = Delayed::Job.work_off
end

break if $exit
count = result.sum

break if $exit

if count.zero?
sleep(SLEEP)
else
status = "#{count} jobs completed at %.2f j/s ..." % [count / realtime]
RAILS_DEFAULT_LOGGER.info status
puts status
end
if count.zero?
sleep(SLEEP)
puts 'Waiting for more jobs...'
else
status = "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
RAILS_DEFAULT_LOGGER.info status
puts status
end

if $exit or runs_left <= 0
break
break if $exit
end
end

== Todo ==
Work out a locking mechanism which would allow several job runners to run at the same time, spreading the load between them.

ensure
Delayed::Job.clear_locks!
end

228 changes: 139 additions & 89 deletions lib/delayed/job.rb
Expand Up @@ -4,142 +4,192 @@ class DeserializationError < StandardError
end

class Job < ActiveRecord::Base
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

set_table_name :delayed_jobs

class Runner
attr_accessor :logger, :jobs
attr_accessor :runs, :success, :failure

cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"

def initialize(jobs, logger = nil)
@jobs = jobs
@logger = logger
self.runs = self.success = self.failure = 0
end

def run

ActiveRecord::Base.cache do
ActiveRecord::Base.transaction do
@jobs.each do |job|
self.runs += 1
begin
time = Benchmark.measure do
job.perform
ActiveRecord::Base.uncached { job.destroy }
self.success += 1
end
logger.debug "Executed job in #{time.real}"
rescue DeserializationError, StandardError, RuntimeError => e
if logger
logger.error "Job #{job.id}: #{e.class} #{e.message}"
logger.error e.backtrace.join("\n")
end
ActiveRecord::Base.uncached { job.reshedule e.message }
self.failure += 1
end
end
end
end

self
end
end

def self.enqueue(object, priority = 0)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform' unless object.respond_to?(:perform)

Job.create(:handler => object, :priority => priority)
end

def handler=(object)
self['handler'] = object.to_yaml
NextTaskSQL = '`run_at` <= ? AND (`locked_until` IS NULL OR `locked_until` < ?) OR (`locked_by`=?)'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

class LockError < StandardError
end

# Release every lock held by this worker: nulls out locked_by/locked_until
# on all rows claimed under our worker_name, so other runners can pick the
# jobs up immediately instead of waiting for the lock timeout to expire.
# Called from the job runner's ensure block on shutdown.
# NOTE(review): backtick identifier quoting is MySQL-specific -- confirm
# the target database.
def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_until`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end

def handler
@handler ||= deserialize(self['handler'])
# The deserialized job payload -- the object whose #perform will be invoked.
# Memoized per instance; deserialize raises DeserializationError if the
# stored YAML cannot be reconstituted into something responding to #perform.
def payload_object
@payload_object ||= deserialize(self['handler'])
end

def perform
handler.perform
# Store the job payload by serializing the given object to YAML into the
# `handler` column. (The #perform requirement is enforced by Job.enqueue,
# not here.)
def payload_object=(object)
self['handler'] = object.to_yaml
end

def reshedule(message)
self.attempts += 1
self.run_at = self.class.time_now + (attempts ** 4).seconds
self.last_error = message
# Push a failed job back onto the queue: record the error message, bump
# the attempt counter, release our lock and persist the row (save! raises
# on validation failure). Unless an explicit +time+ is supplied, the next
# run is delayed by attempts**4 seconds -- exponential backoff, computed
# from the attempt count as it stood *before* this failure was recorded.
def reshedule(message, time = nil)
next_run = time || (Job.db_time_now + (attempts ** 4).seconds + 1)

self.attempts += 1
self.last_error = message
self.run_at = next_run
self.unlock
save!
end

def self.peek(limit = 1)
if limit == 1
find(:first, :order => "priority DESC, run_at ASC", :conditions => ['run_at <= ?', time_now])
else
find(:all, :order => "priority DESC, run_at ASC", :limit => limit, :conditions => ['run_at <= ?', time_now])
end


def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
end

def self.work_off(limit = 100)
jobs = Job.find(:all, :conditions => ['run_at <= ?', time_now], :order => "priority DESC, run_at ASC", :limit => limit)
Job.create(:payload_object => object, :priority => priority)
end

Job::Runner.new(jobs, logger).run
# Fetch up to +limit+ jobs that are ready to run: due (run_at in the past)
# and either unlocked, holding an expired lock, or already locked by this
# very worker (NextTaskSQL). Ordered by priority then run_at (NextTaskOrder).
#
# Bug fix: the +limit+ parameter was previously ignored -- the query
# hard-coded `:limit => 5`, so callers asking for a different batch size
# silently got five rows.
def self.find_available(limit = 5)
time_now = db_time_now
find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => limit)
end

protected

# Reserve and run the next job we can get an exclusive lock on, yielding
# its payload object to the given block. Returns the Job that was handled:
# destroyed on success, rescheduled (via #reshedule) when the block raises
# a StandardError. Returns nil when no job could be reserved.
# +timeout+ is the lock duration in seconds (default 5 minutes).
def self.reserve(timeout = 5 * 60)

# We fetch up to 5 candidate jobs from the db. In case we cannot get
# exclusive access to one job we simply try the next; this spreads jobs
# more evenly across competing worker processes.
find_available(5).each do |job|
begin
job.lock_exclusively!(self.db_time_now + timeout, worker_name)
yield job.payload_object
job.destroy
return job
rescue LockError
# Another worker grabbed this job first -- not an error, try the next one.
puts "failed to aquire exclusive lock for #{job.id}"
rescue StandardError => e
# The job itself failed: record the error and push it back for retry.
job.reshedule e.message
return job
end
end

# Queue is drained (or everything was locked by other workers).
nil
end

# Used internally by reserve to ensure exclusive access to this job.
# Raises LockError if the lock cannot be obtained; on success mirrors the
# new lock state onto this in-memory instance.
def lock_exclusively!(lock_until, worker = worker_name)

affected_rows = if locked_by != worker

# We don't own this job yet: claim it by writing our name and the lock
# deadline, but only if no other worker holds an unexpired lock. The
# WHERE clause makes this an atomic compare-and-set; affected_rows
# tells us whether we won the race.
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_until`=#{quote_value(lock_until)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_until`<#{quote_value(self.class.db_time_now)} OR `locked_until` IS NULL)
end_sql

else

# We already own this job; this may happen if the job queue crashes.
# Simply extend the lock timeout.
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_until`=#{quote_value(lock_until)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
end_sql

end

# The UPDATE matched no row => someone else holds the lock.
unless affected_rows == 1
raise LockError, "Attempted to aquire exclusive lock failed"
end

self.locked_until = lock_until
self.locked_by = worker
end

def self.time_now
(ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
# Drop this instance's claim on the job by clearing both lock attributes.
# In-memory only: the caller persists the change (e.g. reshedule calls
# save! right after unlocking).
def unlock
self.locked_until = self.locked_by = nil
end

def before_save
self.run_at ||= self.class.time_now
end

# Reserve and execute up to +num+ jobs, stopping early once no further job
# can be reserved. Returns a two-element array: [successes, failures].
def self.work_off(num = 100)
completed = 0
errored = 0

num.times do
reserved = self.reserve do |job|
begin
job.perform
completed += 1
rescue
errored += 1
# Re-raise so reserve records the failure and reschedules the job.
raise
end
end

# reserve returns nil when nothing could be locked -- queue is drained.
break if reserved.nil?
end

[completed, errored]
end

private

# Reconstitute a payload object from its YAML +source+ string.
# A plain YAML.load can fail (or return a placeholder YAML::Object) when
# the payload's class has not been loaded yet, so on failure we constantize
# the class name extracted from the YAML (triggering Rails autoloading via
# attempt_to_load) and retry the load once. Raises DeserializationError
# when no handler responding to #perform can be produced.
# NOTE(review): attempt_to_load_file is assigned but never used.
def deserialize(source)
attempt_to_load_file = true

begin
handler = YAML.load(source) rescue nil
return handler if handler.respond_to?(:perform)

if handler.nil?
if source =~ ParseObjectFromYaml

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
end

if handler.is_a?(YAML::Object)

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end

raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'

rescue TypeError, LoadError, NameError => e

raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
end

end
# Resolve a class from its name via constantize, which lets ActiveSupport
# attempt its autoloading magic. Raises (NameError/LoadError, rescued by
# deserialize) when the constant cannot be resolved.
def attempt_to_load(klass)
klass.constantize
end

# Current time in the timezone ActiveRecord writes to the database:
# UTC when default_timezone is :utc, local time otherwise.
def self.db_time_now
if ActiveRecord::Base.default_timezone == :utc
Time.now.utc
else
Time.now
end
end

protected

# Default run_at to now so freshly enqueued jobs are immediately runnable.
def before_save
self.run_at ||= self.class.db_time_now
end

end
end
3 changes: 3 additions & 0 deletions lib/delayed/performable_method.rb
Expand Up @@ -12,6 +12,9 @@ def initialize(object, method, args)

def perform
load(object).send(method, *args.map{|a| load(a)})
rescue ActiveRecord::RecordNotFound
# We cannot do anything about objects which were deleted in the meantime
true
end

private
Expand Down

0 comments on commit 8ec934e

Please sign in to comment.