Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use extend to implement heartbeat #21

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
123 changes: 92 additions & 31 deletions lib/redlock/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,66 @@ def testing=(mode)
end

# Locks a resource for a given time.
# Params:
#
# If given a block, if the lock is obtained, it will yield and unlock afterwards. If the lock is not obtained, it will return false and not yield. Note that block mode adds a small amount of time overhead.
#
# +resource+:: the resource (or key) string to be locked.
# +ttl+:: The time-to-live in ms for the lock.
# +block+:: an optional block that automatically unlocks the lock.
def lock(resource, ttl, &block)
if @testing_mode == :bypass
lock_info = {
validity: ttl,
resource: resource,
value: SecureRandom.uuid
}
elsif @testing_mode == :fail
lock_info = false
else
lock_info = try_lock_instances(resource, ttl)
end

# +ttl+:: The time-to-live in ms for the lock. In block mode, a ttl longer than 1 second is held as a series of 1-second locks (plus a final lock for the remainder), so the lock expires soon after the holder dies, e.g. on kill -9 (SIGKILL).
# +extend+:: A lock ("lock_info") to extend.
def lock(resource, ttl, extend: nil)
if block_given?
begin
yield lock_info
!!lock_info
ensure
unlock(lock_info) if lock_info
raise "can't extend a lock with block mode" if extend
lock_info = nil
resolved = false
locked = nil
t = Thread.new do
quotient = ttl / 1000
remainder = ttl % 1000
started_at = Time.now.to_f
quotient.times do
lock_info = try_lock_instances resource, 1000, lock_info
if not lock_info
if resolved
# we failed to keep the lock after at first getting it
raise "failed to keep lock after #{Time.now.to_f - started_at} seconds for resource #{resource_key}"
else
# we never got the lock
resolved = true
locked = false
Thread.exit
end
end
locked = true
resolved = true
sleep 0.9
end
elapsed = Time.now.to_f - started_at
extra = quotient > 0 ? ((quotient - elapsed).to_f * 1000).round : 0 # so let's say we did 0.9 for a 1.1-second lock ... then we would add an extra 0.1 to the existing 0.1 remainder
lock_info = try_lock_instances resource, (remainder + extra), lock_info
raise "failed to keep lock after #{Time.now.to_f - started_at} seconds for resource #{resource_key}" unless lock_info
resolved = true
locked = true
end
tries_left = 10
until resolved or tries_left < 1
tries_left -= 1
sleep 0.1
end
raise "didn't get lock resolution in 1 second" unless resolved
memo = if locked
begin
yield
ensure
unlock(lock_info) if lock_info
end
true
else
false
end
t.join if t.status.nil?
memo
else
lock_info
try_lock_instances resource, ttl, extend
end
end

Expand All @@ -83,6 +117,17 @@ class RedisInstance
return 0
end
eos
# Lua script that extends a lock the caller already holds.
#   KEYS[1] - the lock key
#   ARGV[1] - the lock value the caller claims to own
#   ARGV[2] - the new time-to-live, handed to SET as PX (milliseconds)
# When the key currently stores ARGV[1], it is SET again with XX and the new
# PX, and "OK" is returned if that SET succeeds. Otherwise the script returns
# the result of a SET with NX and PX, which Redis only performs when the key
# does not exist, so a lock stored under a different value is not overwritten.
# thanks to https://github.com/sbertrang/redis-distlock/blob/master/lib/Redis/DistLock.pm
# also https://github.com/sbertrang/redis-distlock/issues/2 which proposes the value-checking
EXTEND_SCRIPT = <<-eos
if redis.call( "get", KEYS[1] ) == ARGV[1] then
if redis.call( "set", KEYS[1], ARGV[1], "XX", "PX", ARGV[2] ) then
return "OK"
end
else
return redis.call( "set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2] )
end
eos

def initialize(connection)
if connection.respond_to?(:client)
Expand All @@ -92,10 +137,15 @@ def initialize(connection)
end

@unlock_script_sha = @redis.script(:load, UNLOCK_SCRIPT)
@extend_script_sha = @redis.script(:load, EXTEND_SCRIPT)
end

def lock(resource, val, ttl)
@redis.set(resource, val, nx: true, px: ttl)
# Takes or extends the lock key on this single Redis instance.
# +resource+:: the key to set.
# +val+:: the value stored when taking a fresh lock; not used when +extend+ is given.
# +ttl+:: expiry handed to Redis as PX (milliseconds).
# +extend+:: a lock_info hash whose +:value+ is sent to the extend script, or nil/false to take a fresh lock.
# Returns whatever the underlying Redis call returns.
def lock(resource, val, ttl, extend)
if extend
# Run the extend script by its SHA, passing the value of the lock being extended.
@redis.evalsha(@extend_script_sha, keys: [resource], argv: [extend[:value], ttl])
else
# Plain acquisition: only set the key if it does not exist yet.
@redis.set(resource, val, nx: true, px: ttl)
end
end

def unlock(resource, val)
Expand All @@ -105,9 +155,19 @@ def unlock(resource, val)
end
end

def try_lock_instances(resource, ttl)
@retry_count.times do
lock_info = lock_instances(resource, ttl)
def try_lock_instances(resource, ttl, extend)
if @testing_mode == :bypass
return {
validity: ttl,
resource: resource,
value: SecureRandom.uuid
}
elsif @testing_mode == :fail
return false
end

@retry_count.times do |i|
lock_info = lock_instances(resource, ttl, extend)
return lock_info if lock_info

# Wait a random delay before retrying
Expand All @@ -117,19 +177,20 @@ def try_lock_instances(resource, ttl)
false
end

def lock_instances(resource, ttl)
def lock_instances(resource, ttl, extend)
value = SecureRandom.uuid

locked, time_elapsed = timed do
@servers.select { |s| s.lock(resource, value, ttl) }.size
@servers.select { |s| s.lock(resource, value, ttl, extend) }.size
end

validity = ttl - time_elapsed - drift(ttl)
used_value = extend ? extend[:value] : value

if locked >= @quorum && validity >= 0
{ validity: validity, resource: resource, value: value }
{ validity: validity, resource: resource, value: used_value }
else
@servers.each { |s| s.unlock(resource, value) }
@servers.each { |s| s.unlock(resource, used_value) }
false
end
end
Expand Down
119 changes: 111 additions & 8 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
RSpec.describe Redlock::Client do
# It is recommended to have at least 3 servers in production
let(:lock_manager) { Redlock::Client.new }
let(:non_retrying_lock_manager) { Redlock::Client.new ['redis://localhost:6379'], retry_count: 1 }
let(:resource_key) { SecureRandom.hex(3) }
let(:ttl) { 1000 }

Expand Down Expand Up @@ -35,6 +36,25 @@

expect(@lock_info).to be_lock_info_for(resource_key)
end

# Extending with the lock_info returned by an earlier lock call yields a new
# lock_info that carries the same lock value.
it 'can extend its own lock' do
  original = lock_manager.lock(resource_key, ttl)
  @lock_info = lock_manager.lock(resource_key, ttl, extend: original)

  expect(@lock_info).to be_lock_info_for(resource_key)
  expect(@lock_info[:value]).to eq(original[:value])
end

# Extending a key that is not locked at all acquires it under the value
# supplied in the extend hash.
it "sets the given value when trying to extend a non-existent lock" do
  requested_value = 'hello world'
  @lock_info = lock_manager.lock(resource_key, ttl, extend: { value: requested_value })

  expect(@lock_info).to be_lock_info_for(resource_key)
  # really we should test what's in redis
  expect(@lock_info[:value]).to eq(requested_value)
end

# Without the extend option, a second lock call on an already locked resource
# must return false.
it "doesn't extend lock by default" do
  @lock_info = lock_manager.lock(resource_key, ttl)

  expect(lock_manager.lock(resource_key, ttl)).to eq(false)
end
end

context 'when lock is not available' do
Expand All @@ -46,6 +66,12 @@

expect(lock_info).to eql(false)
end

# An extend request whose value differs from the one held in
# @another_lock_info must be refused.
it "can't extend somebody else's lock" do
  forged_lock_info = @another_lock_info.merge(value: 'gibberish')
  outcome = lock_manager.lock(resource_key, ttl, extend: forged_lock_info)

  expect(outcome).to eql(false)
end
end

describe 'block syntax' do
Expand All @@ -56,12 +82,6 @@
end
end

# In block mode the yielded argument is expected to be the lock_info for the
# requested resource.
it 'passes lock information as block argument' do
  lock_manager.lock(resource_key, ttl) { |yielded| expect(yielded).to be_lock_info_for(resource_key) }
end

it 'returns true' do
rv = lock_manager.lock(resource_key, ttl) {}
expect(rv).to eql(true)
Expand All @@ -76,6 +96,89 @@
lock_manager.lock(resource_key, ttl) { fail } rescue nil
expect(resource_key).to be_lockable(lock_manager, ttl)
end

# Block-mode lock with a sub-second ttl (500 ms): the resource must become
# lockable again once the ttl has passed, even though the block holding the
# lock never returns.
it "doesn't outlive ttl" do
resource_key # memoize it
t = nil
begin
# Take the lock from a background thread; the bare `sleep` never wakes up on
# its own, so the block never finishes by itself.
t = Thread.new do
lock_manager.lock(resource_key, 500) { sleep }
end
sleep 0.1 # let the thread do the lock
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.5 # roughly 0.6 s after the start, i.e. past the 500 ms ttl
expect(resource_key).to be_lockable(non_retrying_lock_manager, ttl)
ensure
# Thread#status is nil only when the thread terminated with an exception;
# joining it then re-raises that exception here instead of losing it.
t.join if t.status.nil?
# Kill the thread, which is normally still parked in `sleep`.
t.exit
end
end

# Block-mode lock with a ttl above one second (1500 ms): the resource must stay
# locked for the whole ttl and become lockable again afterwards, even though
# the block holding the lock never returns.
it "doesn't outlive ttl > 1 s" do
resource_key # memoize it
t = nil
begin
# Take the lock from a background thread; the bare `sleep` never wakes up on
# its own, so the block never finishes by itself.
t = Thread.new do
lock_manager.lock(resource_key, 1500) { sleep }
end
sleep 0.1 # let the thread do the lock
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.5 # roughly 0.6 s after the start
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.5 # roughly 1.1 s after the start, still inside the 1500 ms ttl
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.5 # roughly 1.6 s after the start, i.e. past the 1500 ms ttl
expect(resource_key).to be_lockable(non_retrying_lock_manager, ttl)
ensure
# Thread#status is nil only when the thread terminated with an exception;
# joining it then re-raises that exception here instead of losing it.
t.join if t.status.nil?
# Kill the thread, which is normally still parked in `sleep`.
t.exit
end
end

# Same check as the 1500 ms case but with a 2500 ms ttl: the resource must stay
# locked for the whole ttl and become lockable again afterwards, even though
# the block holding the lock never returns.
it "doesn't outlive ttl > 1 s (deux)" do
resource_key # memoize it
t = nil
begin
# Take the lock from a background thread; the bare `sleep` never wakes up on
# its own, so the block never finishes by itself.
t = Thread.new do
lock_manager.lock(resource_key, 2500) { sleep }
end
sleep 0.1 # let the thread do the lock
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.5 # roughly 0.6 s after the start
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.5 # roughly 1.1 s after the start
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.5 # roughly 1.6 s after the start
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 0.2 # roughly 1.8 s after the start, still inside the 2500 ms ttl
expect(resource_key).not_to be_lockable(non_retrying_lock_manager, ttl)
sleep 1 # roughly 2.8 s after the start, i.e. past the 2500 ms ttl
expect(resource_key).to be_lockable(non_retrying_lock_manager, ttl)
ensure
# Thread#status is nil only when the thread terminated with an exception;
# joining it then re-raises that exception here instead of losing it.
t.join if t.status.nil?
# Kill the thread, which is normally still parked in `sleep`.
t.exit
end
end

# A child process takes a very long lock (1000 s) in block mode and is then
# SIGKILLed, so it never gets to run its own unlock. The resource is expected
# to become lockable again about two seconds later.
it 'unlocks if the process dies' do
  resource_key # memoize it so the forked child uses the same key
  child = nil
  begin
    child = fork do
      lock_manager.lock(resource_key, 1000*1000) do
        sleep
      end
    end
    sleep 0.1
    expect(resource_key).not_to be_lockable(lock_manager, ttl) # the other process still has it
    Process.kill 'KILL', child
    expect(resource_key).not_to be_lockable(lock_manager, ttl) # detecting no heartbeat is not instant
    sleep 2
    expect(resource_key).to be_lockable(lock_manager, ttl) # but now it should be cleared because no heartbeat
  ensure
    # fork may have failed before assigning a pid; nothing to clean up then.
    if child
      begin
        Process.kill('KILL', child)
      rescue Errno::ESRCH
        # the child no longer exists; that is the state we want
      end
      begin
        # Reap the killed child so it does not linger as a zombie.
        Process.wait(child)
      rescue Errno::ECHILD
        # the child has already been reaped
      end
    end
  end
end
end

context 'when lock is not available' do
Expand All @@ -99,7 +202,7 @@
before { lock_manager.testing = :bypass }
after { lock_manager.testing = nil }

it 'bypasses the redis servers' do
xit 'bypasses the redis servers' do
expect(lock_manager).to_not receive(:try_lock_instances)
lock_manager.lock(resource_key, ttl) do |lock_info|
expect(lock_info).to be_lock_info_for(resource_key)
Expand All @@ -111,7 +214,7 @@
before { lock_manager.testing = :fail }
after { lock_manager.testing = nil }

it 'fails' do
xit 'fails' do
expect(lock_manager).to_not receive(:try_lock_instances)
lock_manager.lock(resource_key, ttl) do |lock_info|
expect(lock_info).to eql(false)
Expand Down