Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

attempt to no longer wrap ZK instance

  • Loading branch information...
commit b8e6742c80139f00166090634082aa621e66670b 1 parent f166261
@ryanlecompte authored
View
1  lib/redis_failover.rb
@@ -16,6 +16,5 @@
require 'redis_failover/client'
require 'redis_failover/runner'
require 'redis_failover/version'
-require 'redis_failover/zk_client'
require 'redis_failover/node_manager'
require 'redis_failover/node_watcher'
View
17 lib/redis_failover/client.rb
@@ -134,22 +134,25 @@ def inspect
private
def setup_zookeeper_client
- @zkclient = ZkClient.new(@zkservers) do |client|
+ @zkclient = ZK.new(@zkservers).tap do |client|
# when session expires, purge client list
- client.on_session_expiration do
+ client.on_expired_session do
+ logger.info('ZK session expired callback received')
purge_clients
+ client.reopen
+ end
+
+ client.on_connected do
+ logger.info('ZK connected callback received')
+ build_clients
end
# when we are disconnected, purge client list
client.event_handler.register_state_handler(:connecting) do
+ logger.info('ZK connecting callback received')
purge_clients
end
- # when session is recovered, watch again
- client.on_session_recovered do
- client.stat(@znode, :watch => true)
- end
-
# register a watcher for future changes
client.watcher.register(@znode) do |event|
if event.node_created? || event.node_changed?
View
7 lib/redis_failover/node_manager.rb
@@ -26,10 +26,15 @@ def initialize(options)
@znode = @options[:znode_path] || Util::DEFAULT_ZNODE_PATH
@unavailable = []
@queue = Queue.new
+ @zkclient = ZK.new(@options[:zkservers]).tap do |client|
+ client.on_expired_session do
+ logger.info('ZK session expired callback received')
+ client.reopen
+ end
+ end
end
def start
- @zkclient = ZkClient.new(@options[:zkservers])
logger.info('Waiting to become master Node Manager ...')
@zkclient.with_lock(LOCK_PATH) do
logger.info('Acquired master Node Manager lock')
View
118 lib/redis_failover/zk_client.rb
@@ -1,118 +0,0 @@
-module RedisFailover
- # ZkClient is a thin wrapper over the ZK client to gracefully handle reconnects
- # when a session expires.
- class ZkClient
- include Util
-
- # Time to sleep before retrying a failed operation.
- TIMEOUT = 2
-
- # Maximum reconnect attempts.
- MAX_RECONNECTS = 3
-
- # Errors that are candidates for rebuilding the underlying ZK client.
- RECONNECTABLE_ERRORS = [
- ZookeeperExceptions::ZookeeperException::SessionExpired,
- ZookeeperExceptions::ZookeeperException::SystemError,
- ZookeeperExceptions::ZookeeperException::ConnectionLoss,
- ZookeeperExceptions::ZookeeperException::OperationTimeOut,
- ZookeeperExceptions::ZookeeperException::AuthFailed,
- ZookeeperExceptions::ZookeeperException::SessionMoved,
- ZookeeperExceptions::ZookeeperException::ConnectionClosed,
- ZookeeperExceptions::ZookeeperException::NotConnected
- ].freeze
-
- # ZK methods that are wrapped with reconnect logic.
- WRAPPED_ZK_METHODS = [
- :get,
- :set,
- :watcher,
- :event_handler,
- :stat,
- :create,
- :delete,
- :with_lock].freeze
-
- def initialize(servers, &setup_block)
- @servers = servers
- @setup_block = setup_block
- @lock = Monitor.new
-
- @on_session_expiration = @on_session_recovered = nil
- @session_expiration_sub = @session_recovered_sub = nil
-
- build_client
- end
-
- def on_session_expiration(&block)
- @lock.synchronize do
- @on_session_expiration = block
- end
- end
-
- def on_session_recovered(&block)
- @lock.synchronize do
- @on_session_recovered = block
- end
- end
-
- # method_missing?
- WRAPPED_ZK_METHODS.each do |zk_method|
- class_eval(<<-RUBY, __FILE__, __LINE__ + 1)
- def #{zk_method}(*args, &block)
- perform_with_reconnect do
- client = @lock.synchronize { @client }
- client.#{zk_method}(*args, &block)
- end
- end
- RUBY
- end
-
- private
- def handle_expired_session_event(*ignored)
- cb = @lock.synchronize { @on_session_expiration }
- cb.call if cb
- end
-
- def handle_connected_event(*ignored)
- cb = @lock.synchronize { @on_session_recovered }
- cb.call if cb
- end
-
- def perform_with_reconnect
- tries = 0
- begin
- yield
- rescue *RECONNECTABLE_ERRORS => ex
- logger.error("ZooKeeper connection error, rebuilding client: #{ex.inspect}")
- logger.error(ex.backtrace.join("\n"))
- if tries < MAX_RECONNECTS
- tries += 1
- @on_session_expiration.call if @on_session_expiration
- build_client
- @on_session_recovered.call if @on_session_recovered
- sleep(TIMEOUT)
- retry
- end
-
- raise
- end
- end
-
- def build_client
- @lock.synchronize do
- @session_expiration_sub.unregister if @session_expiration_sub
- @session_recovered_sub.unregister if @session_recovered_sub
- @client.close! if @client
-
- @client = ZK.new(@servers)
-
- @session_expiration_sub = @client.on_expired_session(&method(:handle_expired_session_event))
- @session_recovered_sub = @client.on_connected(&method(:handle_connected_event))
-
- @setup_block.call(self) if @setup_block
- logger.info("Communicating with ZooKeeper servers #{@servers}")
- end
- end
- end
-end
Please sign in to comment.
Something went wrong with that request. Please try again.