Permalink
Browse files

blam

  • Loading branch information...
0 parents commit c78468b992e26ad724dde8a679e6f99d42423fe4 @jakedouglas committed Sep 13, 2010
Showing with 441 additions and 0 deletions.
  1. +58 −0 README
  2. +56 −0 Rakefile
  3. +204 −0 lib/cassandra_lock.rb
  4. +123 −0 spec/cassandra_lock_spec.rb
@@ -0,0 +1,58 @@
+== CassandraLock
+
+Distributed mutexes using Cassandra
+
+== Summary
+
+This is just a proof of concept implementation of
+Lamport's_Bakery_Algorithm[http://en.wikipedia.org/wiki/Lamport's_bakery_algorithm].
+
+It uses Cassandra to store and operate on the locks, instead of
+local shared memory. This is feasible because the Bakery algorithm
+(awesomely?) does not rely on a CAS primitive.
+
+It supports any number of locks because they are just Cassandra rows.
+
+== Example
+
+ # Sets up column families used for the lock data
+ CassandraLock.keyspace = "MyApp"
+ CassandraLock.host = "127.0.0.1:9160"
+ CassandraLock.reset_cfs!
+
+ # Initialize a lock called "foo" with a maximum of 10 workers
+ CassandraLock.setup_lock("foo", 10)
+
+ # Create a corresponding lock object with worker id 1 (ids start at 1, not 0)
+ lock = CassandraLock.new("foo", 1)
+
+ # Normal lock/unlock
+ lock.lock
+ ...stuff...
+ lock.unlock
+
+ # Acquire, yield, and release
+ lock.synchronize {
+ ...stuff...
+ }
+
+ # Only do something if the lock is immediately available
+ if lock.try_lock
+ ...stuff...
+ lock.unlock
+ else
+ raise "couldn't acquire lock!"
+ end
+
+== Problems (lots)
+
+* The lock must have its maximum number of workers predefined.
+* Workers using the lock must know their unique worker ID.
+* It has to use quorum reads and writes to be coherent.
+* It's inefficient, taking 5 or so exchanges with Cassandra just to acquire a free lock.
+* Acquisition time increases significantly with some correlation to the number of active workers.
+* A crashed worker can leave the mutex locked. This is probably addressable with Cassandra TTLs.
+* Locking in a distributed database...hmm.
+
+
+Implemented by Jake Douglas at RightScale, Inc. Concept from Thorsten von Eicken.
@@ -0,0 +1,56 @@
+require 'rubygems'
+require 'spec/rake/spectask'
+require 'lib/cassandra_lock'
+
# Running `rake` with no task name runs the spec suite.
task :default => :spec

# RSpec (1.x) runner: colored, nested output with backtraces, over every
# file in spec/ (sorted for a stable run order).
Spec::Rake::SpecTask.new(:spec) do |runner|
  runner.spec_files = Dir['spec/*.rb'].sort
  runner.spec_opts = ["--colour --format=nested --backtrace"]
end
+
# Drops and re-creates the entire lock keyspace from scratch, destroying
# all existing lock state.
#
# Environment overrides:
#   KEYSPACE - keyspace name        (default "CassandraLock")
#   HOST     - host:port to contact (default "127.0.0.1:9160")
#   RF       - replication factor   (default "1")
#
# Requires the cassandra gem's 0.7-series API, the first to expose live
# keyspace manipulation.
task :reset_keyspace do
  # Compare with Gem::Version: a plain string comparison mis-orders
  # versions (e.g. "0.10" < "0.7" is true lexicographically).
  if Gem::Version.new(Cassandra.VERSION) < Gem::Version.new("0.7")
    raise "Need Cassandra 0.7 to manipulate keyspaces"
  end

  keyspace = ENV['KEYSPACE'] || "CassandraLock"
  host = ENV['HOST'] || "127.0.0.1:9160"
  rf = ENV['RF'] || "1"

  puts "Using values:"
  puts "KEYSPACE: #{keyspace}"
  puts "HOST: #{host}"
  puts "RF: #{rf}"
  puts "Dropping and re-adding keyspace..."

  # Schema operations go through the "system" keyspace. A failed drop just
  # means the keyspace did not exist yet, so ignore it.
  c = Cassandra.new("system", host, :timeout => 5)
  c.drop_keyspace(keyspace) rescue nil

  # CassandraLock.cf_defs reads CassandraLock.keyspace, so set it first.
  CassandraLock.keyspace = keyspace

  ks_def = Cassandra::Keyspace.new(:name => keyspace,
                                   :strategy_class => "org.apache.cassandra.locator.RackUnawareStrategy",
                                   :replication_factor => rf.to_i,
                                   :cf_defs => CassandraLock.cf_defs)

  c.add_keyspace(ks_def)

  puts "Done"
end
+
# Drops and re-creates just the two lock column families inside an
# existing keyspace. Override via the KEYSPACE/HOST env vars.
task :reset_cfs do
  keyspace = ENV.fetch('KEYSPACE', "CassandraLock")
  host = ENV.fetch('HOST', "127.0.0.1:9160")

  settings = { "KEYSPACE" => keyspace, "HOST" => host }
  puts "Using values:"
  settings.each { |name, value| puts "#{name}: #{value}" }
  puts "Resetting lock CFs..."

  CassandraLock.keyspace = keyspace
  CassandraLock.host = host
  CassandraLock.reset_cfs!

  puts "Done"
end
@@ -0,0 +1,204 @@
+begin
+ require 'cassandra/0.7'
+rescue LoadError
+ require 'cassandra'
+end
+
# Distributed mutex on top of Cassandra, implementing Lamport's bakery
# algorithm. Lock state lives in two column families, each keyed by the
# lock id with one column per worker id:
#
#   CHOOSING - "1"/"0" flag, set while that worker is picking a ticket
#   NUMBERS  - the worker's ticket number; "0" means "not waiting"
#
# All reads and writes use QUORUM consistency so every worker observes a
# coherent view of the lock row (the bakery algorithm needs no CAS, only
# coherent single-cell reads/writes). Worker ids are 1-based and must be
# unique among concurrent users of a given lock.
class CassandraLock
  # String booleans stored in the CHOOSING column family.
  TRUE = "1"
  FALSE = "0"

  # Column family names holding the lock state.
  CHOOSING = "_locks_choosing"
  NUMBERS = "_locks_numbers"

  # Seconds between polls while blocked behind another worker.
  SLEEP_DURATION = 0.1

  class << self
    attr_writer :keyspace, :host

    # Cassandra node ("host:port") used for class-level operations and as
    # the default for new instances.
    def host
      @host || "127.0.0.1:9160"
    end

    # Keyspace holding the lock column families.
    def keyspace
      @keyspace || "CassandraLock"
    end

    # Drops and re-creates the two lock CFs, destroying all lock state.
    # Live schema changes require the 0.7-series client API.
    def reset_cfs!
      # Compare with Gem::Version: a plain string comparison mis-orders
      # versions (e.g. "0.10" < "0.7" is true lexicographically).
      if Gem::Version.new(Cassandra.VERSION) < Gem::Version.new("0.7")
        raise "Need Cassandra 0.7 to manipulate CFs"
      end

      # Drops fail when the CF does not exist yet; fine on first run.
      client.drop_column_family(CHOOSING) rescue nil
      client.drop_column_family(NUMBERS) rescue nil

      cf_defs.each do |cf|
        client.add_column_family(cf)
      end

      nil
    end

    # Creates the rows for a new lock supporting worker ids
    # 1..max_workers: every worker starts not-choosing with ticket "0".
    # Raises if the lock already has state (use reset_lock to clobber).
    def setup_lock(lock_id, max_workers)
      if get(CHOOSING, lock_id).any? || get(NUMBERS, lock_id).any?
        raise "Lock #{lock_id} exists! use reset_lock()"
      end

      workers = (1..max_workers)
      empty = workers.inject({}){|acc, worker| acc.merge(worker.to_s => "0") }

      set(CHOOSING, lock_id, empty)
      set(NUMBERS, lock_id, empty)

      nil
    end

    # Deletes and re-creates a lock, forcibly releasing any holder.
    def reset_lock(lock_id, max_workers)
      delete_lock(lock_id)
      setup_lock(lock_id, max_workers)
    end

    # Removes all state for a lock.
    def delete_lock(lock_id)
      client.remove(CHOOSING, lock_id, :consistency => Cassandra::Consistency::QUORUM)
      client.remove(NUMBERS, lock_id, :consistency => Cassandra::Consistency::QUORUM)
    end

    # Column family definitions for the current keyspace (used both here
    # and by the Rakefile's keyspace-creation task).
    def cf_defs
      [Cassandra::ColumnFamily.new(:keyspace => self.keyspace,
                                   :name => CHOOSING),
       Cassandra::ColumnFamily.new(:keyspace => self.keyspace,
                                   :name => NUMBERS)]
    end

    # QUORUM read of an entire lock row.
    def get(cf, key)
      client.get(cf, key, :consistency => Cassandra::Consistency::QUORUM)
    end

    # QUORUM write of one or more columns in a lock row.
    def set(cf, key, value)
      client.insert(cf, key, value, :consistency => Cassandra::Consistency::QUORUM)
    end

    # Lazily-created shared class-level connection.
    def client
      @client ||= Cassandra.new(keyspace, host, :timeout => 5)
    end
  end

  # Binds this instance to +lock_id+ as worker +my_worker_id+. The lock
  # must already exist (see CassandraLock.setup_lock) and the worker id
  # must fall inside the id range the lock was created with.
  def initialize(lock_id, my_worker_id, keyspace=nil)
    @keyspace = keyspace || self.class.keyspace
    @lock_id = lock_id
    @my_worker_id = my_worker_id
    # Honor the configured host and use the same timeout as the
    # class-level client (previously the host setting was silently
    # ignored here and the gem's default host was always used).
    @client = Cassandra.new(@keyspace, self.class.host, :timeout => 5)

    numbers = get(NUMBERS, @lock_id)

    if numbers.empty?
      raise "Lock '#{lock_id}' does not exist"
    end

    workers = numbers.keys.map{|n| n.to_i }
    range = (workers.min)..(workers.max)

    unless range.include?(my_worker_id)
      raise ArgumentError, "Worker ID #{my_worker_id} out of range. Valid range for this lock: #{range}"
    end
  end

  # Blocks until the lock is acquired.
  def lock
    worker_count, my_number = get_worker_count_and_current_number(@lock_id, @my_worker_id)
    acquire(@lock_id, worker_count, my_number, @my_worker_id, true)
  end

  # Non-blocking acquire. Returns true if the lock was taken; on failure
  # our ticket is surrendered immediately and false is returned.
  def try_lock
    worker_count, my_number = get_worker_count_and_current_number(@lock_id, @my_worker_id)
    lock_acquired = acquire(@lock_id, worker_count, my_number, @my_worker_id, false)

    unless lock_acquired
      get_out_of_line
    end

    lock_acquired
  end

  # Releases the lock by zeroing our ticket number. Always returns nil.
  def unlock
    get_out_of_line
    nil
  end

  # Acquires the lock, yields, and releases it even if the block raises.
  def synchronize
    unless block_given?
      raise ArgumentError, "Block required"
    end

    lock()
    yield
  ensure
    unlock()
  end

  private

  # Core of the bakery algorithm: scan every worker id and wait until
  # (a) that worker is done choosing a ticket and (b) it is no longer
  # ahead of us in line. With should_wait false, bail out with false at
  # the first obstacle instead of polling. Returns true once the scan
  # completes, at which point we hold the lock.
  def acquire(lock_id, worker_count, my_number, my_worker_id, should_wait)
    choosing = get(CHOOSING, lock_id)
    numbers = get(NUMBERS, lock_id)

    (1..worker_count).each do |worker_id|
      while is_worker_choosing?(choosing, worker_id)
        if should_wait
          sleep SLEEP_DURATION
          choosing = get(CHOOSING, lock_id)
        else
          return false
        end
      end

      while am_i_waiting_behind_worker?(numbers, worker_id, my_number, my_worker_id)
        if should_wait
          sleep SLEEP_DURATION
          numbers = get(NUMBERS, lock_id)
        else
          return false
        end
      end
    end

    true
  end

  # Takes a ticket: flags us as "choosing", writes max(existing)+1 as our
  # number, then clears the flag. Returns [total worker count, our number].
  def get_worker_count_and_current_number(lock_id, worker_id)
    worker_id = worker_id.to_s

    # indicate that we are in the process of picking a number
    set(CHOOSING, lock_id, { worker_id => TRUE })

    # get the current highest number, add 1, insert it as our number
    numbers = get(NUMBERS, lock_id)
    my_number = numbers.values.map{|n| n.to_i }.max + 1
    set(NUMBERS, lock_id, { worker_id => my_number.to_s })

    # indicate that we are done choosing our number
    set(CHOOSING, lock_id, { worker_id => FALSE })

    # return the total count of workers and our current number
    [numbers.size, my_number]
  end

  # A ticket of "0" means "not waiting", so writing it releases the lock.
  def get_out_of_line
    set(NUMBERS, @lock_id, { @my_worker_id.to_s => "0" })
  end

  # True while +worker_id+ is midway through picking a ticket number.
  def is_worker_choosing?(choosing, worker_id)
    choosing[worker_id.to_s] == TRUE
  end

  # Bakery ordering: a worker with a nonzero ticket is ahead of us if its
  # number is smaller, or equal with a smaller worker id (the tie-break).
  def am_i_waiting_behind_worker?(numbers, his_worker_id, my_number, my_worker_id)
    his_number = numbers[his_worker_id.to_s].to_i
    his_number != 0 && (his_number < my_number || (his_number == my_number && his_worker_id < my_worker_id))
  end

  # Instance-level QUORUM read/write helpers (mirror the class-level ones).
  def get(cf, key)
    @client.get(cf, key, :consistency => Cassandra::Consistency::QUORUM)
  end

  def set(cf, key, value)
    @client.insert(cf, key, value, :consistency => Cassandra::Consistency::QUORUM)
  end

end
Oops, something went wrong.

0 comments on commit c78468b

Please sign in to comment.