Permalink
Browse files

Merge pull request #177 from dbalatero/resilent_2.4_locking

Resilient 2.4 locking
  • Loading branch information...
bvandenbos committed Aug 24, 2012
2 parents cb65088 + 6d955b6 commit 787787c1cf9c3cffb50bde8403676874a02f8572
@@ -0,0 +1,3 @@
+%w[base basic resilient].each do |file|
+ require "resque/scheduler/lock/#{file}"
+end
@@ -0,0 +1,53 @@
+module Resque
+ class Scheduler
+ module Lock
+ class Base
+ attr_reader :key
+ attr_accessor :timeout
+
+ def initialize(key, options = {})
+ @key = key
+
+ # 3 minute default timeout
+ @timeout = options[:timeout] || 60 * 3
+ end
+
+ # Attempts to acquire the lock. Returns true if successfully acquired.
+ def acquire!
+ raise NotImplementedError
+ end
+
+ def value
+ @value ||= [hostname, process_id].join(':')
+ end
+
+ # Returns true if you currently hold the lock.
+ def locked?
+ raise NotImplementedError
+ end
+
+ # Releases the lock.
+ def release!
+ puts "releasing #{key}"
+ Resque.redis.del(key) == 1
+ end
+
+ private
+
+ # Extends the lock by `timeout` seconds.
+ def extend_lock!
+ Resque.redis.expire(key, timeout)
+ end
+
+ def hostname
+ local_hostname = Socket.gethostname
+ Socket.gethostbyname(local_hostname).first rescue local_hostname
+ end
+
+ def process_id
+ Process.pid
+ end
+ end
+ end
+ end
+end
@@ -0,0 +1,28 @@
+require 'resque/scheduler/lock/base'
+
+module Resque
+ class Scheduler
+ module Lock
+ class Basic < Base
+ def acquire!
+ if Resque.redis.setnx(key, value)
+ extend_lock!
+ true
+ end
+ end
+
+ def locked?
+ if Resque.redis.get(key) == value
+ extend_lock!
+
+ if Resque.redis.get(key) == value
+ return true
+ end
+ end
+
+ false
+ end
+ end
+ end
+ end
+end
@@ -0,0 +1,69 @@
+require 'resque/scheduler/lock/base'
+
+module Resque
+ class Scheduler
+ # Lock strategy backed by server-side Lua scripts (requires a redis
+ # with scripting support). The check-and-extend sequence runs atomically
+ # inside redis, closing the race window that Basic leaves open.
+ class Resilient < Base
+ # Atomically SETNX + EXPIRE via the cached Lua script.
+ # Returns true when the lock was acquired.
+ def acquire!
+ Resque.redis.evalsha(
+ acquire_sha,
+ :keys => [key],
+ :argv => [value]
+ ).to_i == 1
+ end
+
+ # Atomically checks ownership, extends the TTL, and re-checks
+ # ownership. Returns true only if this process holds the lock.
+ def locked?
+ Resque.redis.evalsha(
+ locked_sha,
+ :keys => [key],
+ :argv => [value]
+ ).to_i == 1
+ end
+
+ private
+
+ # Loads (once) and returns the SHA of the ownership-check script.
+ # NOTE(review): `timeout` is interpolated into the script body at load
+ # time, so the memoized SHA bakes in whatever timeout was set then;
+ # changing `timeout` afterwards has no effect unless a caller passes
+ # refresh = true — confirm callers do so.
+ def locked_sha(refresh = false)
+ @locked_sha = nil if refresh
+
+ @locked_sha ||= begin
+ Resque.redis.script(
+ :load,
+ <<-EOF
+if redis.call('GET', KEYS[1]) == ARGV[1]
+then
+ redis.call('EXPIRE', KEYS[1], #{timeout})
+
+ if redis.call('GET', KEYS[1]) == ARGV[1]
+ then
+ return 1
+ end
+end
+
+return 0
+EOF
+ )
+ end
+ end
+
+ # Loads (once) and returns the SHA of the acquire script
+ # (SETNX then EXPIRE, executed atomically server-side).
+ # Same load-time timeout caveat as locked_sha above.
+ def acquire_sha(refresh = false)
+ @acquire_sha = nil if refresh
+
+ @acquire_sha ||= begin
+ Resque.redis.script(
+ :load,
+ <<-EOF
+if redis.call('SETNX', KEYS[1], ARGV[1]) == 1
+then
+ redis.call('EXPIRE', KEYS[1], #{timeout})
+ return 1
+else
+ return 0
+end
+EOF
+ )
+ end
+ end
+ end
+ end
+ end
+end
@@ -2,13 +2,13 @@
# ### Locking the scheduler process
#
# There are two places in resque-scheduler that need to be synchronized
-# in order to be able to run redundant scheduler processes while ensuring jobs don't
+# in order to be able to run redundant scheduler processes while ensuring jobs don't
# get queued multiple times when the master process changes.
-#
+#
# 1) Processing the delayed queues (jobs that are created from enqueue_at/enqueue_in, etc)
# 2) Processing the scheduled (cron-like) jobs from rufus-scheduler
#
-# Protecting the delayed queues (#1) is relatively easy. A simple SETNX in
+# Protecting the delayed queues (#1) is relatively easy. A simple SETNX in
# redis would suffice. However, protecting the scheduled jobs is trickier
# because the clocks on machines could be slightly off or actual firing times
# could vary slightly due to load. If scheduler A's clock is slightly ahead
@@ -49,67 +49,42 @@
# like cron - if you stop cron, no jobs fire while it's stopped and it doesn't
# fire jobs that were missed when it starts up again.
-module Resque
+require 'resque/scheduler/lock'
+module Resque
module SchedulerLocking
-
- # The TTL (in seconds) for the master lock
- def lock_timeout=(v)
- @lock_timeout = v
+ # Lazily builds and memoizes the lock object guarding the master role.
+ def master_lock
+ @master_lock ||= build_master_lock
end
- def lock_timeout
- @lock_timeout ||= 60 * 3 # 3 minutes
+ # Whether the redis server can run Lua scripts.
+ # NOTE(review): Lua scripting shipped in redis 2.6; presumably the 2.5
+ # cutoff is meant to include 2.5.x development builds — confirm.
+ def supports_lua?
+ redis_master_version >= 2.5
end
- def hostname
- Socket.gethostbyname(Socket.gethostname).first
+ # Master if we can acquire the lock, or if we already hold it.
+ def is_master?
+ master_lock.acquire! || master_lock.locked?
end
- def process_id
- Process.pid
+ def release_master_lock!
+ master_lock.release!
end
- def is_master?
- acquire_master_lock! || has_master_lock?
- end
+ private
- def master_lock_value
- [hostname, process_id].join(':')
+ # Picks the lock strategy the connected redis server can support.
+ def build_master_lock
+ if supports_lua?
+ Resque::Scheduler::Lock::Resilient.new(master_lock_key)
+ else
+ Resque::Scheduler::Lock::Basic.new(master_lock_key)
+ end
end
def master_lock_key
:resque_scheduler_master_lock
end
- def extend_lock!
- # If the master fails to checkin for 3 minutes, the lock is released and is up for grabs
- Resque.redis.expire(master_lock_key, lock_timeout)
- end
-
- def release_master_lock!
- Resque.redis.del(master_lock_key)
- end
-
- def acquire_master_lock!
- if Resque.redis.setnx(master_lock_key, master_lock_value)
- extend_lock!
- true
- end
- end
-
- def has_master_lock?
- if Resque.redis.get(master_lock_key) == master_lock_value
- extend_lock!
- # Since this process could lose the lock between checking
- # if it has it and extending the lock, check again to make
- # sure it still has it.
- if Resque.redis.get(master_lock_key) == master_lock_value
- true
- end
- end
+ # NOTE(review): to_f on e.g. "2.6.10" yields 2.6, dropping the patch
+ # level — fine for this major.minor comparison.
+ def redis_master_version
+ Resque.redis.info['redis_version'].to_f
end
-
end
-
-end
+end
View
@@ -1,115 +0,0 @@
-# Redis configuration file example
-
-# By default Redis does not run as a daemon. Use 'yes' if you need it.
-# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
-daemonize yes
-
-# When run as a daemon, Redis write a pid file in /var/run/redis.pid by default.
-# You can specify a custom pid file location here.
-pidfile ./test/redis-test.pid
-
-# Accept connections on the specified port, default is 6379
-port 9736
-
-# If you want you can bind a single interface, if the bind option is not
-# specified all the interfaces will listen for connections.
-#
-# bind 127.0.0.1
-
-# Close the connection after a client is idle for N seconds (0 to disable)
-timeout 300
-
-# Save the DB on disk:
-#
-# save <seconds> <changes>
-#
-# Will save the DB if both the given number of seconds and the given
-# number of write operations against the DB occurred.
-#
-# In the example below the behaviour will be to save:
-# after 900 sec (15 min) if at least 1 key changed
-# after 300 sec (5 min) if at least 10 keys changed
-# after 60 sec if at least 10000 keys changed
-save 900 1
-save 300 10
-save 60 10000
-
-# The filename where to dump the DB
-dbfilename dump.rdb
-
-# For default save/load DB in/from the working directory
-# Note that you must specify a directory not a file name.
-dir ./test/
-
-# Set server verbosity to 'debug'
-# it can be one of:
-# debug (a lot of information, useful for development/testing)
-# notice (moderately verbose, what you want in production probably)
-# warning (only very important / critical messages are logged)
-loglevel debug
-
-# Specify the log file name. Also 'stdout' can be used to force
-# the demon to log on the standard output. Note that if you use standard
-# output for logging but daemonize, logs will be sent to /dev/null
-logfile stdout
-
-# Set the number of databases. The default database is DB 0, you can select
-# a different one on a per-connection basis using SELECT <dbid> where
-# dbid is a number between 0 and 'databases'-1
-databases 16
-
-################################# REPLICATION #################################
-
-# Master-Slave replication. Use slaveof to make a Redis instance a copy of
-# another Redis server. Note that the configuration is local to the slave
-# so for example it is possible to configure the slave to save the DB with a
-# different interval, or to listen to another port, and so on.
-
-# slaveof <masterip> <masterport>
-
-################################## SECURITY ###################################
-
-# Require clients to issue AUTH <PASSWORD> before processing any other
-# commands. This might be useful in environments in which you do not trust
-# others with access to the host running redis-server.
-#
-# This should stay commented out for backward compatibility and because most
-# people do not need auth (e.g. they run their own servers).
-
-# requirepass foobared
-
-################################### LIMITS ####################################
-
-# Set the max number of connected clients at the same time. By default there
-# is no limit, and it's up to the number of file descriptors the Redis process
-# is able to open. The special value '0' means no limts.
-# Once the limit is reached Redis will close all the new connections sending
-# an error 'max number of clients reached'.
-
-# maxclients 128
-
-# Don't use more memory than the specified amount of bytes.
-# When the memory limit is reached Redis will try to remove keys with an
-# EXPIRE set. It will try to start freeing keys that are going to expire
-# in little time and preserve keys with a longer time to live.
-# Redis will also try to remove objects from free lists if possible.
-#
-# If all this fails, Redis will start to reply with errors to commands
-# that will use more memory, like SET, LPUSH, and so on, and will continue
-# to reply to most read-only commands like GET.
-#
-# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a
-# 'state' server or cache, not as a real DB. When Redis is used as a real
-# database the memory usage will grow over the weeks, it will be obvious if
-# it is going to use too much memory in the long run, and you'll have the time
-# to upgrade. With maxmemory after the limit is reached you'll start to get
-# errors for write operations, and this may even lead to DB inconsistency.
-
-# maxmemory <bytes>
-
-############################### ADVANCED CONFIG ###############################
-
-# Glue small output buffers together in order to send small replies in a
-# single TCP packet. Uses a bit more CPU but most of the times it is a win
-# in terms of number of queries per second. Use 'yes' if unsure.
-glueoutputbuf yes
Oops, something went wrong.

0 comments on commit 787787c

Please sign in to comment.