Permalink
Browse files

Pulling brianjlandau/resque-scheduler.git

  • Loading branch information...
1 parent cb32cbc commit 08ff38c73b86a611e0f98db335fa4395383d1cbd @bvandenbos bvandenbos committed Nov 12, 2010
Showing with 309 additions and 24 deletions.
  1. +22 −0 README.markdown
  2. +86 −24 lib/resque/scheduler.rb
  3. +44 −0 lib/resque_scheduler.rb
  4. +1 −0 lib/resque_scheduler/server.rb
  5. +156 −0 test/scheduler_test.rb
View
@@ -138,6 +138,28 @@ Multiple envs are allowed, separated by commas:
NOTE: If you specify the `rails_env` arg without setting RAILS_ENV as an
environment variable, the job won't be loaded.
+### Dynamic Schedules
+
+If needed you can also have schedules that are dynamically defined and updated inside of your application. This can be completed by loading the schedule initially wherever you configure Resque and setting `Resque::Scheduler.dynamic` to `true`. Then subsequently updating the "`schedules`" key in redis, namespaced to the Resque namespace. The "`schedules`" key is expected to be a redis hash data type, where the key is the name of the schedule and the value is a JSON encoded hash of the schedule configuration.
+
+When the scheduler loops it will look for differences between the existing schedule and the current schedule in redis. If there are differences it will make the necessary changes to the running schedule.
+
+To force the scheduler to reload the schedule you just send it the `USR2` signal.
+
+Convenience methods are provided to add/update, delete, and retrieve individual schedule items from the `schedules` in redis:
+
+* `Resque.set_schedule(name, config)`
+* `Resque.get_schedule(name)`
+* `Resque.remove_schedule(name)`
+
+For example:
+
+ Resque.set_schedule("create_fake_leaderboards", {
+ :cron => "30 6 * * 1",
+ :class => "CreateFakeLeaderboards",
+ :queue => scoring
+ })
+
### Support for customized Job classes
Some Resque extensions like [resque-status](http://github.com/quirkey/resque-status) use custom job classes with a slightly different API signature.
View
@@ -14,19 +14,29 @@ class << self
# If set, produces no output
attr_accessor :mute
+
+ # If set, will try to update the schulde in the loop
+ attr_accessor :dynamic
+
+ # the Rufus::Scheduler jobs that are scheduled
+ def scheduled_jobs
+ @@scheduled_jobs
+ end
# Schedule all jobs and continually look for delayed jobs (never returns)
def run
-
+ $0 = "resque-scheduler: Starting"
# trap signals
register_signal_handlers
# Load the schedule into rufus
+ procline "Loading Schedule"
load_schedule!
# Now start the scheduling part of the loop.
loop do
handle_delayed_items
+ update_schedule if dynamic
poll_sleep
end
@@ -43,39 +53,53 @@ def register_signal_handlers
begin
trap('QUIT') { shutdown }
trap('USR1') { kill_child }
+ trap('USR2') { reload_schedule! }
rescue ArgumentError
- warn "Signals QUIT and USR1 not supported."
+ warn "Signals QUIT and USR1 and USR2 not supported."
end
end
# Pulls the schedule from Resque.schedule and loads it into the
# rufus scheduler instance
def load_schedule!
log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?
-
+
+ @@scheduled_jobs = {}
+
Resque.schedule.each do |name, config|
- # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
- # required for the jobs to be scheduled. If rails_env is missing, the
- # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
- # to.
- if config['rails_env'].nil? || rails_env_matches?(config)
- log! "Scheduling #{name} "
- interval_defined = false
- interval_types = %w{cron every}
- interval_types.each do |interval_type|
- if !config[interval_type].nil? && config[interval_type].length > 0
- rufus_scheduler.send(interval_type, config[interval_type]) do
+ load_schedule_job(name, config)
+ end
+ Resque.redis.del(:schedules_changed)
+ procline "Schedules Loaded"
+ end
+
+ # Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
+ def load_schedule_job(name, config)
+ # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
+ # required for the jobs to be scheduled. If rails_env is missing, the
+ # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
+ # to.
+ if config['rails_env'].nil? || rails_env_matches?(config)
+ log! "Scheduling #{name} "
+ interval_defined = false
+ interval_types = %w{cron every}
+ interval_types.each do |interval_type|
+ if !config[interval_type].nil? && config[interval_type].length > 0
+ begin
+ @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, config[interval_type]) do
log! "queueing #{config['class']} (#{name})"
enqueue_from_config(config)
end
- interval_defined = true
- break
+ rescue Exception => e
+ log! "#{e.class.name}: #{e.message}"
end
- end
- unless interval_defined
- log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
+ interval_defined = true
+ break
end
end
+ unless interval_defined
+ log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
+ end
end
end
@@ -88,13 +112,14 @@ def rails_env_matches?(config)
# Handles queueing delayed items
# at_time - Time to start scheduling items (default: now).
def handle_delayed_items(at_time=nil)
- timestamp = nil
- begin
- if timestamp = Resque.next_delayed_timestamp(at_time)
+ item = nil
+ if timestamp = Resque.next_delayed_timestamp(at_time)
+ procline "Processing Delayed Items"
+ while !timestamp.nil?
enqueue_delayed_items_for_timestamp(timestamp)
+ timestamp = Resque.next_delayed_timestamp(at_time)
end
- # continue processing until there are no more ready timestamps
- end while !timestamp.nil?
+ end
end
# Enqueues all delayed jobs for a timestamp
@@ -148,8 +173,40 @@ def rufus_scheduler
def clear_schedule!
rufus_scheduler.stop
@rufus_scheduler = nil
+ @@scheduled_jobs = {}
rufus_scheduler
end
+
+ def reload_schedule!
+ procline "Reloading Schedule"
+ clear_schedule!
+ Resque.reload_schedule!
+ load_schedule!
+ end
+
+ def update_schedule
+ if Resque.redis.scard(:schedules_changed) > 0
+ procline "Updating schedule"
+ Resque.reload_schedule!
+ while schedule_name = Resque.redis.spop(:schedules_changed)
+ if Resque.schedule.keys.include?(schedule_name)
+ unschedule_job(schedule_name)
+ load_schedule_job(schedule_name, Resque.schedule[schedule_name])
+ else
+ unschedule_job(schedule_name)
+ end
+ end
+ end
+ procline "Schedules Loaded"
+ end
+
+ def unschedule_job(name)
+ if scheduled_jobs[name]
+ log "Removing schedule #{name}"
+ scheduled_jobs[name].unschedule
+ @@scheduled_jobs.delete(name)
+ end
+ end
# Sleeps and returns true
def poll_sleep
@@ -173,6 +230,11 @@ def log(msg)
# add "verbose" logic later
log!(msg) if verbose
end
+
+ def procline(string)
+ $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}"
+ log! $0
+ end
end
View
@@ -28,13 +28,57 @@ module ResqueScheduler
# an array, each element in the array is passed as a separate param,
# otherwise params is passed in as the only parameter to perform.
def schedule=(schedule_hash)
+ if Resque::Scheduler.dynamic
+ schedule_hash.each do |name, job_spec|
+ set_schedule(name, job_spec)
+ end
+ end
@schedule = schedule_hash
end
# Returns the schedule hash
def schedule
@schedule ||= {}
end
+
+ # reloads the schedule from redis
+ def reload_schedule!
+ @schedule = get_schedules
+ end
+
+ # gets the schedule as it exists in redis
+ def get_schedules
+ if redis.exists(:schedules)
+ redis.hgetall(:schedules).tap do |h|
+ h.each do |name, config|
+ h[name] = decode(config)
+ end
+ end
+ else
+ nil
+ end
+ end
+
+ # create or update a schedule with the provided name and configuration
+ def set_schedule(name, config)
+ existing_config = get_schedule(name)
+ unless existing_config && existing_config == config
+ redis.hset(:schedules, name, encode(config))
+ redis.sadd(:schedules_changed, name)
+ end
+ config
+ end
+
+ # retrive the schedule configuration for the given name
+ def get_schedule(name)
+ decode(redis.hget(:schedules, name))
+ end
+
+ # remove a given schedule by name
+ def remove_schedule(name)
+ redis.hdel(:schedules, name)
+ redis.sadd(:schedules_changed, name)
+ end
# This method is nearly identical to +enqueue+ only it also
# takes a timestamp which will be used to schedule the job
@@ -19,6 +19,7 @@ def queue_from_class_name(class_name)
end
get "/schedule" do
+ Resque.reload_schedule! if Resque::Scheduler.dynamic
# Is there a better way to specify alternate template locations with sinatra?
erb File.read(File.join(File.dirname(__FILE__), 'server/views/scheduler.erb'))
end
Oops, something went wrong.

0 comments on commit 08ff38c

Please sign in to comment.