Skip to content
Browse files

call it a 'handle' instead of a lock.

  • Loading branch information...
1 parent 214ede2 commit ced9e7cca662360494383d3bbf0cd875b0561651 @jakedouglas committed
Showing with 132 additions and 129 deletions.
  1. +8 −8 README.rdoc
  2. +99 −96 lib/cassandra_lock.rb
  3. +25 −25 spec/cassandra_lock_spec.rb
View
16 README.rdoc
@@ -23,23 +23,23 @@ It supports any number of locks because they are just Cassandra rows.
# Initialize a lock called "foo" with a maximum of 10 workers
CassandraLock.setup_lock("foo", 10)
- # Create a corresponding lock object with worker id 1 (ids start at 1, not 0)
- lock = CassandraLock.new("foo", 1)
+ # Create a corresponding lock handle with worker id 1 (ids start at 1, not 0)
+ handle = CassandraLock::Handle.new("foo", 1)
- # Normal lock/unlock
- lock.lock
+ # Normal lock/unlock (waits if the lock is not available)
+ handle.lock
...stuff...
- lock.unlock
+ handle.unlock
# Acquire, yield, and release
- lock.synchronize {
+ handle.synchronize {
...stuff...
}
# Only do something if the lock is immediately available
- if lock.try_lock
+ if handle.try_lock
...stuff...
- lock.unlock
+ handle.unlock
else
raise "couldn't acquire lock!"
end
View
195 lib/cassandra_lock.rb
@@ -74,138 +74,141 @@ def client
end
end
- def initialize(lock_id, my_worker_id, keyspace=nil)
- @keyspace = keyspace || self.class.keyspace
- @lock_id = lock_id
- @my_worker_id = my_worker_id
- @client = Cassandra.new(@keyspace)
+ class Handle
+ def initialize(lock_id, my_worker_id, keyspace=nil)
+ @keyspace = keyspace || CassandraLock.keyspace
+ @lock_id = lock_id
+ @my_worker_id = my_worker_id
+ @client = Cassandra.new(@keyspace)
- lock_data = get(@lock_id)
+ lock_data = get(@lock_id)
- unless lock_data[NUMBERS]
- raise "Lock '#{lock_id}' does not exist"
- end
+ unless lock_data[NUMBERS]
+ raise "Lock '#{lock_id}' does not exist"
+ end
- workers = lock_data[NUMBERS].keys.map{|n| n.to_i }
- range = (workers.min)..(workers.max)
+ workers = lock_data[NUMBERS].keys.map{|n| n.to_i }
+ range = (workers.min)..(workers.max)
- unless range.include?(my_worker_id)
- raise ArgumentError, "Worker ID #{my_worker_id} out of range. Valid range for this lock: #{range}"
+ unless range.include?(my_worker_id)
+ raise ArgumentError, "Worker ID #{my_worker_id} out of range. Valid range for this lock: #{range}"
+ end
end
- end
- def lock
- worker_count, my_number = get_worker_count_and_current_number(@lock_id, @my_worker_id)
- acquire(@lock_id, worker_count, my_number, @my_worker_id, true)
- end
+ def lock
+ worker_count, my_number = get_worker_count_and_current_number(@lock_id, @my_worker_id)
+ acquire(@lock_id, worker_count, my_number, @my_worker_id, true)
+ end
- def try_lock
- worker_count, my_number = get_worker_count_and_current_number(@lock_id, @my_worker_id, true)
+ def try_lock
+ worker_count, my_number = get_worker_count_and_current_number(@lock_id, @my_worker_id, true)
- # bail early if someone was already holding/waiting
- unless my_number
- return false
- end
+ # bail early if someone was already holding/waiting
+ unless my_number
+ return false
+ end
- lock_acquired = acquire(@lock_id, worker_count, my_number, @my_worker_id, false)
+ lock_acquired = acquire(@lock_id, worker_count, my_number, @my_worker_id, false)
- unless lock_acquired
- get_out_of_line
+ unless lock_acquired
+ get_out_of_line
+ end
+
+ lock_acquired
end
- lock_acquired
- end
+ def unlock
+ get_out_of_line
+ nil
+ end
- def unlock
- get_out_of_line
- nil
- end
+ def synchronize
+ unless block_given?
+ raise ArgumentError, "Block required"
+ end
- def synchronize
- unless block_given?
- raise ArgumentError, "Block required"
+ lock()
+ yield
+ ensure
+ unlock()
end
- lock()
- yield
- ensure
- unlock()
- end
-
- private
+ private
- def acquire(lock_id, worker_count, my_number, my_worker_id, should_wait)
- lock_data = get(lock_id)
+ def acquire(lock_id, worker_count, my_number, my_worker_id, should_wait)
+ lock_data = get(lock_id)
- (1..worker_count).each do |worker_id|
- while is_worker_choosing?(lock_data[CHOOSING], worker_id)
- if should_wait
- sleep SLEEP_DURATION
- lock_data = get(lock_id)
- else
- return false
+ (1..worker_count).each do |worker_id|
+ while is_worker_choosing?(lock_data[CHOOSING], worker_id)
+ if should_wait
+ sleep SLEEP_DURATION
+ lock_data = get(lock_id)
+ else
+ return false
+ end
end
- end
- while am_i_waiting_behind_worker?(lock_data[NUMBERS], worker_id, my_number, my_worker_id)
- if should_wait
- sleep SLEEP_DURATION
- lock_data = get(lock_id)
- else
- return false
+ while am_i_waiting_behind_worker?(lock_data[NUMBERS], worker_id, my_number, my_worker_id)
+ if should_wait
+ sleep SLEEP_DURATION
+ lock_data = get(lock_id)
+ else
+ return false
+ end
end
end
- end
- true
- end
+ true
+ end
- def get_worker_count_and_current_number(lock_id, worker_id, fail_fast=false)
- worker_id = worker_id.to_s
+ def get_worker_count_and_current_number(lock_id, worker_id, fail_fast=false)
+ worker_id = worker_id.to_s
- # indicate that we are in the process of picking a number
- set(lock_id, { CHOOSING => { worker_id => TRUE } })
+ # indicate that we are in the process of picking a number
+ set(lock_id, { CHOOSING => { worker_id => TRUE } })
- # get the current highest number, add 1, insert it as our number
- numbers = get(lock_id)[NUMBERS]
+ # get the current highest number, add 1, insert it as our number
+ numbers = get(lock_id)[NUMBERS]
- # for try_lock, just bail if anyone is holding/waiting
- if fail_fast
- if numbers.values.any?{|v| v != "0" }
- set(lock_id, { CHOOSING => { worker_id => FALSE } })
- return false
+ # for try_lock, just bail if anyone is holding/waiting
+ if fail_fast
+ if numbers.values.any?{|v| v != "0" }
+ set(lock_id, { CHOOSING => { worker_id => FALSE } })
+ return false
+ end
end
- end
- my_number = numbers.values.map{|n| n.to_i }.max + 1
- set(lock_id, { NUMBERS => { worker_id => my_number.to_s } })
+ my_number = numbers.values.map{|n| n.to_i }.max + 1
+ set(lock_id, { NUMBERS => { worker_id => my_number.to_s } })
- # indicate that we are done choosing our number
- set(lock_id, { CHOOSING => { worker_id => FALSE } })
+ # indicate that we are done choosing our number
+ set(lock_id, { CHOOSING => { worker_id => FALSE } })
- # return the total count of workers and our current number
- [numbers.size, my_number]
- end
+ # return the total count of workers and our current number
+ [numbers.size, my_number]
+ end
- def get_out_of_line
- set(@lock_id, { NUMBERS => { @my_worker_id.to_s => "0" } })
- end
+ def get_out_of_line
+ set(@lock_id, { NUMBERS => { @my_worker_id.to_s => "0" } })
+ end
- def is_worker_choosing?(choosing, worker_id)
- choosing[worker_id.to_s] == TRUE
- end
+ def is_worker_choosing?(choosing, worker_id)
+ choosing[worker_id.to_s] == TRUE
+ end
- def am_i_waiting_behind_worker?(numbers, his_worker_id, my_number, my_worker_id)
- his_number = numbers[his_worker_id.to_s].to_i
- his_number != 0 && (his_number < my_number || (his_number == my_number && his_worker_id < my_worker_id))
- end
+ def am_i_waiting_behind_worker?(numbers, his_worker_id, my_number, my_worker_id)
+ his_number = numbers[his_worker_id.to_s].to_i
+ his_number != 0 && (his_number < my_number || (his_number == my_number && his_worker_id < my_worker_id))
+ end
- def get(key)
- @client.get(CF, key, :consistency => Cassandra::Consistency::QUORUM)
- end
+ def get(key)
+ @client.get(CF, key, :consistency => Cassandra::Consistency::QUORUM)
+ end
+
+ def set(key, value)
+ @client.insert(CF, key, value, :consistency => Cassandra::Consistency::QUORUM)
+ end
- def set(key, value)
- @client.insert(CF, key, value, :consistency => Cassandra::Consistency::QUORUM)
end
end
View
50 spec/cassandra_lock_spec.rb
@@ -7,48 +7,48 @@
before(:each) do
CassandraLock.reset_lock("test_lock", 10)
- @lock1 = CassandraLock.new("test_lock", 1)
- @lock2 = CassandraLock.new("test_lock", 2)
+ @handle1 = CassandraLock::Handle.new("test_lock", 1)
+ @handle2 = CassandraLock::Handle.new("test_lock", 2)
@scratchpad = []
end
- context "CassandraLock.new" do
+ context "CassandraLock::Handle.new" do
it "raises a RuntimeError when given an uninitialized lock id" do
- lambda { CassandraLock.new("non_existent_lock", 1) }.should raise_error(RuntimeError)
+ lambda { CassandraLock::Handle.new("non_existent_lock", 1) }.should raise_error(RuntimeError)
end
it "raises an ArgumentError when given a worker ID that is out of range of the lock" do
- lambda { CassandraLock.new("test_lock", 0) }.should raise_error(ArgumentError)
- lambda { CassandraLock.new("test_lock", 11) }.should raise_error(ArgumentError)
+ lambda { CassandraLock::Handle.new("test_lock", 0) }.should raise_error(ArgumentError)
+ lambda { CassandraLock::Handle.new("test_lock", 11) }.should raise_error(ArgumentError)
end
end
- context "CassandraLock#lock" do
+ context "CassandraLock::Handle#lock" do
it "waits if the lock is not available" do
- @lock1.lock
+ @handle1.lock
th = Thread.new do
- @lock2.lock
+ @handle2.lock
@scratchpad << :after_lock
end
Thread.pass while th.status && th.status != "sleep"
@scratchpad.should == []
- @lock1.unlock
+ @handle1.unlock
th.join
@scratchpad.should == [:after_lock]
end
end
- context "CassandraLock#synchronize" do
+ context "CassandraLock::Handle#synchronize" do
it "raises an ArgumentError when called without a block" do
- lambda { @lock1.synchronize }.should raise_error(ArgumentError)
+ lambda { @handle1.synchronize }.should raise_error(ArgumentError)
end
it "acquires the lock before yielding and releases it after" do
th = Thread.new do
- @lock1.synchronize {
+ @handle1.synchronize {
@scratchpad << :after_lock
sleep 0.5
}
@@ -57,27 +57,27 @@
Thread.pass while @scratchpad.empty?
start = Time.now
- @lock2.lock
+ @handle2.lock
time = Time.now - start
time.should be_close(0.5, 0.2)
end
end
- context "CassandraLock#try_lock" do
+ context "CassandraLock::Handle#try_lock" do
it "acquires the lock and returns true if it is available immediately" do
- @lock1.try_lock.should == true
+ @handle1.try_lock.should == true
end
it "returns false if it cannot acquire the lock immediately" do
- @lock1.lock
- @lock2.try_lock.should == false
+ @handle1.lock
+ @handle2.try_lock.should == false
end
it "removes itself from the waiting line if it cannot acquire the lock" do
- @lock1.lock
- @lock2.try_lock.should == false
- @lock1.unlock
- @lock1.lock
+ @handle1.lock
+ @handle2.try_lock.should == false
+ @handle1.unlock
+ @handle1.lock
end
end
@@ -88,12 +88,12 @@
1.upto(10).each do |worker_id|
threads << Thread.new {
- lock = CassandraLock.new("test_lock", worker_id)
- lock.lock
+ handle = CassandraLock::Handle.new("test_lock", worker_id)
+ handle.lock
start = Time.now
sleep 0.2
finish = Time.now
- lock.unlock
+ handle.unlock
holding_periods << [start, finish]
}
end

0 comments on commit ced9e7c

Please sign in to comment.
Something went wrong with that request. Please try again.