Permalink
Browse files

Made a gem out of delayed_job so I can use it in my services. Had to …

…modify Worker to not infer DJ is running in a Rails instance.
  • Loading branch information...
Justin Knowlden
Justin Knowlden committed Nov 29, 2008
1 parent bcf8d1d commit 9ba6b0872375b55eadf9979b8773efe9132bda2c
Showing with 82 additions and 36 deletions.
  1. +1 −0 .gitignore
  2. +2 −0 HISTORY.txt
  3. +39 −0 delayed_job.gemspec
  4. +1 −5 init.rb
  5. +23 −23 lib/delayed/job.rb
  6. +10 −8 lib/delayed/worker.rb
  7. +6 −0 lib/delayed_job.rb
View
@@ -0,0 +1 @@
*.gem
View
@@ -0,0 +1,2 @@
== 0.1.0 / 2008-11-28
* First of many versions
View
@@ -0,0 +1,39 @@
Gem::Specification.new do |s|
s.name = "delayed_job"
s.version = "0.1.0"
s.date = "2008-11-28"
s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
s.email = "tobi@leetsoft.com"
s.homepage = "http://github.com/tobi/delayed_job/tree/master"
s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks."
s.authors = ["Tobias Lütke", "Justin Knowlden"]
# s.bindir = "bin"
# s.executables = ["delayed_job"]
# s.default_executable = "delayed_job"
s.has_rdoc = false
s.rdoc_options = ["--main", "README.textile"]
s.extra_rdoc_files = ["HISTORY.txt", "README.textile"]
# run git ls-files to get an updated list
s.files = %w[
HISTORY.txt
MIT-LICENSE
README.textile
delayed_job.gemspec
init.rb
lib/delayed/job.rb
lib/delayed/message_sending.rb
lib/delayed/performable_method.rb
lib/delayed/worker.rb
lib/delayed_job.rb
tasks/jobs.rake
]
s.test_files = %w[
spec/database.rb
spec/delayed_method_spec.rb
spec/job_spec.rb
spec/story_spec.rb
]
end
View
@@ -1,5 +1 @@
require File.dirname(__FILE__) + '/lib/delayed/message_sending'
require File.dirname(__FILE__) + '/lib/delayed/performable_method'
require File.dirname(__FILE__) + '/lib/delayed/job'
Object.send(:include, Delayed::MessageSending)
require File.dirname(__FILE__) + '/lib/delayed_job'
View
@@ -15,8 +15,8 @@ class Job < ActiveRecord::Base
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true
# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
@@ -25,10 +25,10 @@ class Job < ActiveRecord::Base
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
cattr_accessor :min_priority, :max_priority
self.min_priority = nil
self.max_priority = nil
self.max_priority = nil
class LockError < StandardError
end
@@ -45,8 +45,8 @@ def failed?
def payload_object
@payload_object ||= deserialize(self['handler'])
end
def name
def name
@name ||= begin
payload = payload_object
if payload.respond_to?(:display_name)
@@ -80,13 +80,13 @@ def self.enqueue(*args, &block)
if block_given?
priority = args.first || 0
run_at = args.second
Job.create(:payload_object => EvaledJob.new(&block), :priority => priority.to_i, :run_at => run_at)
else
object = args.first
priority = args.second || 0
run_at = args.third
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
@@ -96,32 +96,32 @@ def self.enqueue(*args, &block)
end
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
time_now = db_time_now
time_now = db_time_now
sql = NextTaskSQL.dup
conditions = [time_now, time_now - max_run_time, worker_name]
if self.min_priority
sql << ' AND (priority >= ?)'
conditions << min_priority
end
if self.max_priority
sql << ' AND (priority <= ?)'
conditions << max_priority
conditions << max_priority
end
conditions.unshift(sql)
conditions.unshift(sql)
records = ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
records.sort { rand() }
end
end
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = MAX_RUN_TIME, &block)
@@ -142,7 +142,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block)
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
rescue StandardError => e
rescue StandardError => e
job.reschedule e.message, e.backtrace
log_exception(job, e)
return job
@@ -160,16 +160,16 @@ def lock_exclusively!(max_run_time, worker = worker_name)
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
# We already own this job, this may happen if the job queue crashes.
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
self.locked_at = now
self.locked_by = worker
end
def unlock
self.locked_at = nil
self.locked_by = nil
View
@@ -2,19 +2,21 @@ module Delayed
class Worker
SLEEP = 5
cattr_accessor :logger
self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER)
def initialize(options={})
@quiet = options[:quiet]
@quiet = options[:quiet]
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end
end
def start
say "*** Starting job worker #{Delayed::Job.worker_name}"
trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
loop do
result = nil
@@ -33,15 +35,15 @@ def start
end
break if $exit
end
end
ensure
Delayed::Job.clear_locks!
end
def say(text)
puts text unless @quiet
RAILS_DEFAULT_LOGGER.info text
logger.info text if logger
end
end
View
@@ -0,0 +1,6 @@
require File.dirname(__FILE__) + '/delayed/message_sending'
require File.dirname(__FILE__) + '/delayed/performable_method'
require File.dirname(__FILE__) + '/delayed/job'
require File.dirname(__FILE__) + '/delayed/worker'
Object.send(:include, Delayed::MessageSending)

0 comments on commit 9ba6b08

Please sign in to comment.