Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a scheduler locking mechanism #165

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 38 additions & 3 deletions lib/resque/scheduler.rb
@@ -1,5 +1,6 @@
require 'rufus/scheduler'
require 'thwait'
require 'socket'

module Resque

Expand All @@ -8,7 +9,6 @@ class Scheduler
extend Resque::Helpers

class << self

# If true, logs more stuff...
attr_accessor :verbose

Expand All @@ -31,6 +31,38 @@ def poll_sleep_amount
@poll_sleep_amount ||= 5 # seconds
end

def hostname
Socket.gethostbyname(Socket.gethostname).first
end

def process_id
Process.pid
end

def identifier
[hostname, process_id].join ':'
end

def lock_timeout
60
end

def lock_key
'schedule:lock'
end

def has_lock?
Resque.redis.get(lock_key) == identifier
end

def try_lock?
Resque.redis.setnx lock_key, identifier
end

def update_lock_expiry
Resque.redis.expire lock_key, lock_timeout
end

# Schedule all jobs and continually look for delayed jobs (never returns)
def run
$0 = "resque-scheduler: Starting"
Expand All @@ -48,8 +80,11 @@ def run
# Now start the scheduling part of the loop.
loop do
begin
handle_delayed_items
update_schedule if dynamic
if has_lock? || try_lock?
update_lock_expiry
handle_delayed_items
update_schedule if dynamic
end
rescue Errno::EAGAIN, Errno::ECONNRESET => e
warn e.message
end
Expand Down