Name your workers with ease #35

Open
2 commits to merge

2 participants

@benfyvie

Allows a worker_name option to be passed in when creating a worker. This simplifies implementing workers that can restart the jobs they were working on if the worker is killed or dies.
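
A rough sketch of why a stable worker name helps, assuming the resume branch of lock_exclusively! shown in the diff. `LockSketchJob` is a hypothetical in-memory stand-in, not delayed_job's ActiveRecord-backed class, and the worker names are made up for illustration:

```ruby
# Hypothetical in-memory stand-in for a job row (not the real Delayed::Job).
class LockSketchJob
  attr_reader :locked_by, :locked_at

  # Try to take the lock for `worker`. Returns true if the lock is held afterwards.
  def lock_exclusively!(max_run_time, worker, now = Time.now)
    if locked_by == worker
      # Same name as the previous owner: resume and refresh the lock.
      @locked_at = now
      true
    elsif locked_at.nil? || locked_at < now - max_run_time
      # Unlocked, or the previous lock has expired: take it over.
      @locked_by = worker
      @locked_at = now
      true
    else
      # Someone else holds a lock that has not expired yet.
      false
    end
  end
end

job = LockSketchJob.new
t0 = Time.at(0)
first = job.lock_exclusively!(4 * 3600, "worker-a", t0)
# "worker-a" dies and comes back a minute later under the same name.
resumed = job.lock_exclusively!(4 * 3600, "worker-a", t0 + 60)
# A differently named worker has to wait for the lock to expire.
stolen = job.lock_exclusively!(4 * 3600, "worker-b", t0 + 120)
```

With a generated name that changes on every restart, the restarted worker would land in the last branch and wait out max_run_time before picking its own job back up.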

Ben Fyvie added some commits
Ben Fyvie allowing a worker_name option to be passed in when creating a worker. This simplifies implementing workers that can restart jobs they were working on if the worker should happen to be killed or die.
21024d3
Ben Fyvie Increment the "attempt" count of a job when a lock is retrieved (prior to invoking the job) and also check that the max_attempts is not exceeded prior to invoking the job.
d022b38
@latentflip latentflip referenced this pull request from a commit in latentflip/delayed_job
@bkeepers bkeepers List the daemons library as a dependency (for now, until I can kill it). closes #35
bac3c4d
@sodabrew

This looks specific to your site I think.

Commits on Jan 14, 2011
  1. allowing a worker_name option to be passed in when creating a worker. This simplifies implementing workers that can restart jobs they were working on if the worker should happen to be killed or die.

    Ben Fyvie authored
Commits on May 6, 2011
  1. Increment the "attempt" count of a job when a lock is retrieved (prior to invoking the job) and also check that the max_attempts is not exceeded prior to invoking the job.

    Ben Fyvie authored
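
The idea in the second commit, counting an attempt when the lock is taken rather than when a failure is rescheduled, can be sketched roughly like this. `CountedJob` and its limit of 3 are hypothetical and only for illustration; the diff itself keeps the default of 25 and does the counting inside lock_exclusively!:

```ruby
# Hypothetical stand-in that counts attempts up front (not the real Delayed::Job).
class CountedJob
  MAX_ATTEMPTS = 3 # illustrative limit; the diff defaults max_attempts to 25

  attr_reader :attempts

  def initialize(&work)
    @attempts = 0
    @work = work
  end

  def max_attempts_exceeded?
    attempts > MAX_ATTEMPTS
  end

  # Count the attempt as soon as the "lock" is taken, then refuse to run
  # the work once the limit has been passed.
  def run_with_lock
    @attempts += 1
    raise "attempt #{attempts} exceeds the max of #{MAX_ATTEMPTS}" if max_attempts_exceeded?
    @work.call
    true
  rescue StandardError
    false
  end
end

calls = 0
job = CountedJob.new do
  calls += 1
  raise "worker killed mid-job"
end
results = 5.times.map { job.run_with_lock }
```

Because the counter moves before the work starts, a job that keeps taking its worker down still uses up its attempts, and the work itself stops being invoked once the limit is passed.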
Showing with 37 additions and 20 deletions.
  1. +36 −20 lib/delayed/job.rb
  2. +1 −0  lib/delayed/worker.rb
56 lib/delayed/job.rb
@@ -1,3 +1,5 @@
+require 'timeout'
+
module Delayed
class DeserializationError < StandardError
@@ -6,8 +8,11 @@ class DeserializationError < StandardError
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ActiveRecord::Base
- MAX_ATTEMPTS = 25
- MAX_RUN_TIME = 4.hours
+ @@max_attempts = 25
+ @@max_run_time = 4.hours
+
+ cattr_accessor :max_attempts, :max_run_time
+
set_table_name :delayed_jobs
# By default failed jobs are destroyed after too many attempts.
@@ -63,33 +68,40 @@ def payload_object=(object)
# Reschedule the job in the future (when a job fails).
# Uses an exponential scale depending on the number of failed attempts.
def reschedule(message, backtrace = [], time = nil)
- if self.attempts < MAX_ATTEMPTS
+ if max_attempts_exceeded?
+ max_attempts_exceeded
+ else
time ||= Job.db_time_now + (attempts ** 4) + 5
- self.attempts += 1
self.run_at = time
self.last_error = message + "\n" + backtrace.join("\n")
self.unlock
- save!
- else
- logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
- destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
+ save!
end
end
+ def max_attempts_exceeded?
+ self.attempts > max_attempts
+ end
+
+ def max_attempts_exceeded
+ logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
+ destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
+ end
# Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.
def run_with_lock(max_run_time, worker_name)
- logger.info "* [JOB] aquiring lock on #{name}"
+ logger.info "* [JOB] acquiring lock on #{name}"
unless lock_exclusively!(max_run_time, worker_name)
# We did not get the lock, some other worker process must have
- logger.warn "* [JOB] failed to aquire exclusive lock for #{name}"
+ logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
return nil # no work done
end
begin
+ raise "Attempted to run this job for a #{attempts} time which exceeds the max allowed of #{max_attempts}" if max_attempts_exceeded?
runtime = Benchmark.realtime do
- invoke_job # TODO: raise error if takes longer than max_run_time
+ Timeout.timeout(max_run_time.to_i) { invoke_job }
destroy
end
# TODO: warn if runtime > max_run_time ?
@@ -117,8 +129,7 @@ def self.enqueue(*args, &block)
end
# Find a few candidate jobs to run (in case some immediately get locked by others).
- # Return in random order prevent everyone trying to do same head job at once.
- def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
+ def self.find_available(limit = 5, max_run_time = max_run_time)
time_now = db_time_now
@@ -138,16 +149,14 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
conditions.unshift(sql)
- records = ActiveRecord::Base.silence do
+ ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
-
- records.sort_by { rand() }
end
# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
- def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)
+ def self.reserve_and_run_one_job(max_run_time = max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
@@ -165,11 +174,11 @@ def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
- self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
+ self.class.update_all(["locked_at = ?, locked_by = ?, attempts = ?", now, worker, self.attempts += 1], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
- self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
+ self.class.update_all(["locked_at = ?, attempts = ?", now, self.attempts += 1], ["id = ? and locked_by = ?", id, worker])
end
if affected_rows == 1
self.locked_at = now
@@ -190,6 +199,13 @@ def unlock
def log_exception(error)
logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
logger.error(error)
+ begin
+ scout_response = RailzScout.submit_bug(error, nil, nil)
+ logger.error("* [JOB] #{name} posted case #{scout_response[:case_number]} to Fogbugz")
+ rescue => e
+ logger.error("* [JOB] #{name} FogBugz post raised an error: #{e}")
+ logger.error(error.backtrace)
+ end
end
# Do num jobs and return stats on success/failure.
@@ -233,7 +249,7 @@ def deserialize(source)
return handler if handler.respond_to?(:perform)
raise DeserializationError,
- 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
+ 'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
rescue TypeError, LoadError, NameError => e
raise DeserializationError,
"Job failed to load: #{e.message}. Try to manually require the required file."
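
For reference, a minimal sketch of how the Timeout.timeout wrapper around invoke_job behaves, using only Ruby's standard timeout library. `run_with_time_limit` and its return symbols are hypothetical names for illustration and are not part of this change:

```ruby
require 'timeout'

# Hypothetical helper: run the block, giving up after max_run_time seconds.
def run_with_time_limit(max_run_time)
  Timeout.timeout(max_run_time) { yield }
  :finished
rescue Timeout::Error
  # Raised by Timeout.timeout when the block runs past the limit.
  :timed_out
end

fast = run_with_time_limit(1) { :ok }
slow = run_with_time_limit(0.1) { sleep 1 }
```

In the diff the Timeout::Error is not rescued at the call site, so it would presumably flow through the same failure path as any other exception raised by the job.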
1  lib/delayed/worker.rb
@@ -11,6 +11,7 @@ class Worker
def initialize(options={})
@quiet = options[:quiet]
+ Delayed::Job.worker_name = options[:worker_name] if options.has_key?(:worker_name)
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end