Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: soulware/distlock
base: cf6eaba315
...
head fork: soulware/distlock
compare: 0ebcf3a4f8
Checking mergeability… Don't worry, you can still create the pull request.
  • 9 commits
  • 8 files changed
  • 0 commit comments
  • 1 contributor
View
1  .gitignore
@@ -1,4 +1,5 @@
*.gem
.bundle
+.rvmrc
Gemfile.lock
pkg/*
View
3  lib/distlock.rb
@@ -1,10 +1,11 @@
require 'distlock/version'
+require 'distlock/lock_error'
require 'distlock/zk/zk'
module Distlock
end
-# convenience so we can use either name
+# convenience so we can use either Distlock or DistLock
module DistLock
include Distlock
end
View
4 lib/distlock/lock_error.rb
@@ -0,0 +1,4 @@
+module Distlock
+ class LockError < StandardError
+ end
+end
View
2  lib/distlock/version.rb
@@ -1,3 +1,3 @@
module Distlock
- VERSION = "0.0.6"
+ VERSION = "0.0.7"
end
View
13 lib/distlock/zk/affinity_pool.rb
@@ -1,13 +0,0 @@
-module Distlock
- module ZK
- class AffinityPool
- include Distlock::ZK::Common
-
- def initialize(options={})
- defaults = {:host => "localhost:2181", :timeout => 10, :root_path => "/affinity/pool/test"}
- @options = defaults.merge(options)
- end
-
- end
- end
-end
View
24 lib/distlock/zk/common.rb
@@ -1,3 +1,5 @@
+require 'logger'
+
module Distlock
module ZK
module Common
@@ -7,6 +9,14 @@ def zk
end
end
+ def logger
+ @logger ||= Logger.new(STDOUT)
+ end
+
+ def logger=(logger)
+ @logger = logger
+ end
+
# does a node exist for the given path?
def exists?(path)
puts "checking if #{path} exists"
@@ -42,7 +52,19 @@ def create_sequenced_ephemeral(path, prefix="lock")
puts lock_path
result = zk.create(:path => lock_path, :sequence => true, :ephemeral => true)
puts result
- end
+ result[:path]
+ end
+
+ # access @zk directly here, we don't want to lazy instantiate again if closed already
+ def close
+ begin
+ @zk.close if @zk
+ rescue StandardError, ZookeeperExceptions::ZookeeperException => error
+ logger.error("Distlock::ZK::Common#close: caught and squashed error while closing connection - #{error}")
+ end
+
+ @zk = nil
+ end
end
end
end
View
154 lib/distlock/zk/exclusive_lock.rb
@@ -0,0 +1,154 @@
+module Distlock
+ module ZK
+ class ExclusiveLock
+ include Distlock::ZK::Common
+
+ WATCHER_TIMEOUT = 60 #seconds
+
+ def initialize(options={})
+ defaults = {:host => "localhost:2181", :timeout => 10, :root_path => "/lock/exclusive/default"}
+ @options = defaults.merge(options)
+ end
+
+ def owner?
+ @owner
+ end
+
+ def my_lock
+ @my_lock
+ end
+
+ def do_watcher(watcher, lock)
+ if watcher.type == ZookeeperConstants::ZOO_DELETED_EVENT
+ logger.debug "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for delete of node - #{lock}"
+ else
+ if watcher.type == ZookeeperConstants::ZOO_CREATED_EVENT
+ logger.error "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for creation of node (should never happen for an ephemeral node?) - #{lock}"
+ elsif watcher.type == ZookeeperConstants::ZOO_SESSION_EVENT
+ logger.error "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for zoo session event, closing the session for this client - #{lock}"
+ else
+ logger.error "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for unexpected event, closing the session for this client - #{lock}, event - #{watcher.type}"
+ end
+
+ close
+ raise LockError.new("Distlock::ZK::ExclusiveLock#do_watcher: got an unexpected watcher type - #{watcher.type}")
+ end
+
+ @watcher_called=true
+ end
+
+ def check_for_existing_lock(path)
+ children = zk.get_children(:path => path)[:children].sort{|a,b|a.split('-').last <=> b.split('-').last}
+ children.detect do |child|
+ logger.debug "checking existing lock for our client_id - #{child} vs. #{zk.client_id}"
+ if child.split('-')[1] == "#{zk.client_id}"
+ logger.debug "found existing lock for client_id #{zk.client_id}, lock - #{child}, reusing"
+ return "#{path}/#{child}"
+ end
+ end
+ end
+
+ def lock(path)
+ @owner = false
+
+ safe_create(path)
+
+ # TODO - combine these into a single method like find_or_create
+ lock = check_for_existing_lock(path)
+ lock = create_sequenced_ephemeral(path) unless lock
+
+ logger.debug "my lock path - #{lock}"
+ @my_lock = lock
+ result = _get_lock(lock)
+ logger.info("Distlock::ZK::ExclusiveLock#lock: lock acquired - #{lock}")
+
+ result
+ end
+
+ def unlock(lock = @my_lock)
+ return unless lock
+
+ logger.debug "unlocking - #{lock}"
+
+ zk.delete(:path => lock)
+
+ if lock == @my_lock
+ @my_lock = nil
+ @owner = false
+ end
+ end
+
+ def _get_lock(lock)
+ logger.debug "_get_lock: entered for #{lock}"
+
+ while !@owner
+
+ path = lock.split('/')[0...-1].join('/')
+
+ # TODO - pass this in as parameter?
+ children = zk.get_children(:path => path)[:children].sort{|a,b|a.split('-').last <=> b.split('-').last}
+ lock_last = lock.split('/').last
+ lock_idx = children.index(lock_last)
+
+ if lock_idx.nil?
+ logger.error("Distlock::ZK::ExclusiveLock#_get_lock: failed to find our lock in the node children (connection reset?)")
+ raise LockError.new("failed to find our lock in the node children (connection reset?)")
+ elsif lock_idx == 0
+ logger.debug "lock acquired (client id - #{zk.client_id}), lock - #{lock}"
+ @owner = true
+ return true
+ else
+ logger.debug "Distlock::ZK::ExclusiveLock#_get_lock: lock contention for #{lock} - #{children.inspect} (my client id - #{zk.client_id})"
+ logger.info "Distlock::ZK::ExclusiveLock#_get_lock: lock contention - #{lock}"
+
+ to_watch = "#{path}/#{children[lock_idx-1]}"
+ logger.debug "about to set watch on - #{to_watch}"
+
+ # 2-step process so we minimise the chance of setting watches on the node if it does not exist for any reason
+ @watcher_called=false
+ @watcher = Zookeeper::WatcherCallback.new { do_watcher(@watcher, lock) }
+ resp = zk.stat(:path => to_watch)
+ resp = zk.stat(:path => to_watch, :watcher => @watcher) if resp[:stat].exists
+
+ if resp[:stat].exists
+ logger.info "Distlock::ZK::ExclusiveLock#_get_lock: watcher set, node exists, watching - #{to_watch}, our lock - #{lock}"
+ start_time = Time.now
+ while !@watcher_called
+ sleep 0.1
+
+ if (start_time + WATCHER_TIMEOUT) < Time.now
+ logger.error("Distlock::ZK::ExclusiveLock#_get_lock: timed out while watching - #{to_watch}, our lock - #{lock}, closing session and bombing out")
+ close
+ raise LockError.new("Distlock::ZK::ExclusiveLock#_get_lock timed out while waiting for watcher")
+ end
+ end
+ else
+ logger.error("Distlock::ZK::ExclusiveLock#_get_lock: node we are watching does not exist, closing session, lock - #{lock}")
+ close
+ raise LockError.new("node we tried to watch does not exist")
+ end
+ end
+ end
+ end
+
+ def with_lock(path="/distlock/zk/exclusive_lock/default")
+ begin
+ lock(path)
+ yield if block_given?
+ rescue ZookeeperExceptions::ZookeeperException::SessionExpired => e
+ close
+ raise LockError.new("error encountered while attempting to obtain lock - #{e}, zookeeper session has been closed")
+ rescue ZookeeperExceptions::ZookeeperException => e
+ raise LockError.new("error encountered while attempting to obtain lock - #{e}")
+ ensure
+ begin
+ unlock
+ rescue ZookeeperExceptions::ZookeeperException => e
+ logger.error("Distlock::ZK::ExclusiveLock#with_lock: error while unlocking - #{e}, closing session to clean up our lock")
+ close
+ end
+ end
+ end
+ end
+ end
+end
View
2  lib/distlock/zk/zk.rb
@@ -1,6 +1,6 @@
require 'zookeeper'
require 'distlock/zk/common'
-require 'distlock/zk/affinity_pool'
+require 'distlock/zk/exclusive_lock'
module Distlock
module ZK

No commit comments for this range

Something went wrong with that request. Please try again.