Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'better_locking'

  • Loading branch information...
commit 6ffdcc60b04d12589abe9913b349ae52e4e7fa10 2 parents c45570a + 07b1f0a
@mloughran authored
View
118 lib/em-hiredis/lock.rb
@@ -1,35 +1,52 @@
+require 'securerandom'
+
module EM::Hiredis
- # Distributed lock built on redis
+ # Cross-process re-entrant lock, backed by redis
class Lock
- # Register an callback which will be called 1s before the lock expires
+
+ EM::Hiredis::Client.load_scripts_from(File.expand_path("../lock_lua", __FILE__))
+
+ # Register a callback which will be called 1s before the lock expires
+ # This is an informational callback, there is no hard guarantee on the timing
+ # of its invocation because the callback firing and lock key expiry are handled
+ # by different clocks (the client process and redis server respectively)
def onexpire(&blk); @onexpire = blk; end
def initialize(redis, key, timeout)
+ unless timeout.kind_of?(Fixnum) && timeout >= 1
+ raise "Timeout must be an integer and >= 1s"
+ end
@redis, @key, @timeout = redis, key, timeout
- @locked = false
- @expiry = nil
+ @token = SecureRandom.hex
end
# Acquire the lock
#
- # It is ok to call acquire again before the lock expires, which will attempt to extend the existing lock.
+ # This is a re-entrant lock, re-acquiring will succeed and extend the timeout
#
- # Returns a deferrable which either succeeds if the lock can be acquired, or fails if it cannot. In both cases the expiry timestamp is returned (for the new lock or for the expired one respectively)
+ # Returns a deferrable which either succeeds if the lock can be acquired, or fails if it cannot.
def acquire
df = EM::DefaultDeferrable.new
- expiry = new_expiry
- @redis.setnx(@key, expiry).callback { |setnx|
- if setnx == 1
- lock_acquired(expiry)
- EM::Hiredis.logger.debug "#{to_s} Acquired new lock"
- df.succeed(expiry)
+ @redis.lock_acquire([@key], [@token, @timeout]).callback { |success|
+ if (success)
+ EM::Hiredis.logger.debug "#{to_s} acquired"
+
+ EM.cancel_timer(@expire_timer) if @expire_timer
+ @expire_timer = EM.add_timer(@timeout - 1) {
+ EM::Hiredis.logger.debug "#{to_s} Expires in 1s"
+ @onexpire.call if @onexpire
+ }
+
+ df.succeed
else
- attempt_to_acquire_existing_lock(df)
+ EM::Hiredis.logger.debug "#{to_s} failed to acquire"
+ df.fail("Lock is not available")
end
}.errback { |e|
+ EM::Hiredis.logger.error "#{to_s} Error acquiring lock #{e}"
df.fail(e)
}
- return df
+ df
end
# Release the lock
@@ -37,23 +54,29 @@ def acquire
# Returns a deferrable
def unlock
EM.cancel_timer(@expire_timer) if @expire_timer
-
- unless active
- df = EM::DefaultDeferrable.new
- df.fail Error.new("Cannot unlock, lock not active")
- return df
- end
- @redis.del(@key)
- end
-
- # Lock has been acquired and we're within it's expiry time
- def active
- @locked && Time.now.to_i < @expiry
+ df = EM::DefaultDeferrable.new
+ @redis.lock_release([@key], [@token]).callback { |keys_removed|
+ if keys_removed > 0
+ EM::Hiredis.logger.debug "#{to_s} released"
+ df.succeed
+ else
+ EM::Hiredis.logger.debug "#{to_s} could not release, not held"
+ df.fail("Cannot release a lock we do not hold")
+ end
+ }.errback { |e|
+ EM::Hiredis.logger.error "#{to_s} Error releasing lock #{e}"
+ df.fail(e)
+ }
+ df
end
- # This should not be used in normal operation - force clear
+ # This should not be used in normal operation.
+ # Force clear without regard to who owns the lock.
def clear
+ EM::Hiredis.logger.warn "#{to_s} Force clearing lock (unsafe)"
+ EM.cancel_timer(@expire_timer) if @expire_timer
+
@redis.del(@key)
end
@@ -61,46 +84,5 @@ def to_s
"[lock #{@key}]"
end
- private
-
- def attempt_to_acquire_existing_lock(df)
- @redis.get(@key) { |expiry_1|
- expiry_1 = expiry_1.to_i
- if expiry_1 == @expiry || expiry_1 < Time.now.to_i
- # Either the lock was ours or the lock has already expired
- expiry = new_expiry
- @redis.getset(@key, expiry) { |expiry_2|
- expiry_2 = expiry_2.to_i
- if expiry_2 == @expiry || expiry_2 < Time.now.to_i
- lock_acquired(expiry)
- EM::Hiredis.logger.debug "#{to_s} Acquired existing lock"
- df.succeed(expiry)
- else
- # Another client got there first
- EM::Hiredis.logger.debug "#{to_s} Could not acquire - another process acquired while we were in the process of acquiring"
- df.fail(expiry_2)
- end
- }
- else
- # Someone else has an active lock
- EM::Hiredis.logger.debug "#{to_s} Could not acquire - held by another process"
- df.fail(expiry_1)
- end
- }
- end
-
- def new_expiry
- Time.now.to_i + @timeout + 1
- end
-
- def lock_acquired(expiry)
- @locked = true
- @expiry = expiry
- EM.cancel_timer(@expire_timer) if @expire_timer
- @expire_timer = EM.add_timer(@timeout) {
- EM::Hiredis.logger.debug "#{to_s} Expires in 1s"
- @onexpire.call if @onexpire
- }
- end
end
end
View
17 lib/em-hiredis/lock_lua/lock_acquire.lua
@@ -0,0 +1,17 @@
+-- Set key to token with expiry of timeout, if:
+-- - It doesn't exist
+-- - It exists and already has value of token (further set extends timeout)
+-- Used to implement a re-entrant lock.
+local key = KEYS[1]
+local token = ARGV[1]
+local timeout = ARGV[2]
+
+local value = redis.call('get', key)
+
+if value == token or not value then
+ -- Great, either we hold the lock or it's free for us to take
+ return redis.call('setex', key, timeout, token)
+else
+ -- Someone else has it
+ return false
+end
View
9 lib/em-hiredis/lock_lua/lock_release.lua
@@ -0,0 +1,9 @@
+-- Deletes a key only if it has the value supplied as token
+local key = KEYS[1]
+local token = ARGV[1]
+
+if redis.call('get', key) == token then
+ return redis.call('del', key)
+else
+ return 0
+end
View
14 lib/em-hiredis/persistent_lock.rb
@@ -21,11 +21,8 @@ def initialize(redis, key, options = {})
@redis, @key = redis, key
@timeout = options[:lock_timeout] || 100
@retry_timeout = options[:retry_interval] || 60
+
@lock = EM::Hiredis::Lock.new(redis, key, @timeout)
- @lock.onexpire {
- # When the lock is about to expire, extend (called 1s before expiry)
- acquire()
- }
@locked = false
EM.next_tick {
@running = true
@@ -42,6 +39,11 @@ def acquire
@onlocked.call if @onlocked
@locked = true
end
+
+ # Re-acquire lock near the end of the period
+ @extend_timer = EM.add_timer(@timeout.to_f * 2 / 3) {
+ acquire()
+ }
}.errback { |e|
if @locked
# We were previously locked
@@ -54,7 +56,7 @@ def acquire
EM::Hiredis.logger.warn "Unexpected error acquiring #{@lock} #{err}"
end
- EM.add_timer(@retry_timeout) {
+ @retry_timer = EM.add_timer(@retry_timeout) {
acquire() unless @locked
}
}
@@ -62,6 +64,8 @@ def acquire
def stop
@running = false
+ EM.cancel_timer(@extend_timer) if @extend_timer
+ EM.cancel_timer(@retry_timer) if @retry_timer
if @locked
# We were previously locked
@onunlocked.call if @onunlocked
View
137 spec/lock_spec.rb
@@ -0,0 +1,137 @@
+require 'spec_helper'
+
+describe EventMachine::Hiredis::Lock do
+
+ def start(timeout = 1)
+ connect(timeout) do |redis|
+ @redis = redis
+ yield
+ end
+ end
+
+ def new_lock
+ EventMachine::Hiredis::Lock.new(@redis, "test-lock", 2)
+ end
+
+ it "can be acquired" do
+ start {
+ new_lock.acquire.callback {
+ done
+ }.errback { |e|
+ fail e
+ }
+ }
+ end
+
+ it "is re-entrant" do
+ start {
+ lock = new_lock
+ lock.acquire.callback {
+ lock.acquire.callback {
+ done
+ }.errback { |e|
+ fail e
+ }
+ }.errback { |e|
+ fail e
+ }
+ }
+ end
+
+ it "is exclusive" do
+ start {
+ new_lock.acquire.callback {
+ new_lock.acquire.errback {
+ done
+ }.callback {
+ fail "Should not be able to acquire lock from different client"
+ }
+ }.errback { |e|
+ fail e
+ }
+ }
+ end
+
+ it "can be released and taken by another instance" do
+ start {
+ lock = new_lock
+ lock.acquire.callback {
+ lock.unlock.callback {
+ new_lock.acquire.callback {
+ done
+ }.errback { |e|
+ fail e
+ }
+ }.errback { |e|
+ fail e
+ }
+ }.errback { |e|
+ fail e
+ }
+ }
+ end
+
+ it "times out" do
+ start(3) {
+ new_lock.acquire.callback {
+ EM.add_timer(2) {
+ new_lock.acquire.callback {
+ done
+ }.errback { |e|
+ fail e
+ }
+ }
+ }.errback { |e|
+ fail e
+ }
+ }
+ end
+
+ it "extends timeout on re-entry" do
+ start(4) {
+ lock = new_lock
+ lock.acquire.callback {
+ EM.add_timer(1) {
+ lock.acquire.callback {
+ EM.add_timer(1.5) {
+ # Check it's still locked by initial instance
+ new_lock.acquire.errback {
+ done
+ }.callback { |e|
+ fail e
+ }
+ }
+ }.errback { |e|
+ fail e
+ }
+ }
+ }.errback { |e|
+ fail e
+ }
+ }
+ end
+
+ it "fails to release if it has not been taken" do
+ start {
+ new_lock.unlock.errback {
+ done
+ }.callback {
+ fail "Released lock which had not been taken"
+ }
+ }
+ end
+
+ it "fails to release if taken by another instance" do
+ start {
+ new_lock.acquire.callback {
+ new_lock.unlock.errback {
+ done
+ }.callback {
+ fail "Released lock belonging to another instance"
+ }
+ }.errback { |e|
+ fail e
+ }
+ }
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.