Skip to content

Commit

Permalink
more cleanup after merging with master
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanlecompte committed Oct 4, 2012
1 parent 8a7ce27 commit 7503686
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 168 deletions.
189 changes: 29 additions & 160 deletions lib/redis_failover/node_manager.rb
Expand Up @@ -10,18 +10,14 @@ class NodeManager
include Util

# Number of seconds to wait before retrying bootstrap process.
<<<<<<< HEAD
TIMEOUT = 5
# Number of seconds for checking node snapshots.
CHECK_INTERVAL = 10
=======
TIMEOUT = 3

# ZK Errors that the Node Manager cares about.
ZK_ERRORS = [
ZK::Exceptions::LockAssertionFailedError,
ZK::Exceptions::InterruptedSession,
ZKDisconnectedError
ZK::Exceptions::InterruptedSession
].freeze

# Errors that can happen during the node discovery process.
Expand All @@ -31,7 +27,6 @@ class NodeManager
NoMasterError,
MultipleMastersError
].freeze
>>>>>>> master

# Creates a new instance.
#
Expand All @@ -44,50 +39,26 @@ class NodeManager
def initialize(options)
logger.info("Redis Node Manager v#{VERSION} starting (#{RUBY_DESCRIPTION})")
@options = options
<<<<<<< HEAD
@root_znode = options.fetch(:znode_path, Util::DEFAULT_ROOT_ZNODE_PATH)
@decision_mode = options.fetch(:decision_mode, :majority)
@lock = Mutex.new
@master_manager = false
=======
@znode = @options[:znode_path] || Util::DEFAULT_ZNODE_PATH
@manual_znode = ManualFailover::ZNODE_PATH
@mutex = Mutex.new
@lock = Mutex.new
@shutdown = false
@leader = false
@master = nil
@slaves = []
@unavailable = []
@lock_path = "#{@znode}_lock".freeze
>>>>>>> master
end

# Starts the node manager.
#
# @note This method does not return until the manager terminates.
def start
<<<<<<< HEAD
return unless running?
setup_zk
discover_nodes
spawn_watchers
wait_until_master
rescue ZK::Exceptions::InterruptedSession => ex
=======
return unless running?
@queue = Queue.new
setup_zk
logger.info('Waiting to become master Node Manager ...')
with_lock do
@leader = true
logger.info('Acquired master Node Manager lock')
if discover_nodes
initialize_path
spawn_watchers
handle_state_reports
end
end
rescue *ZK_ERRORS => ex
>>>>>>> master
logger.error("ZK error while attempting to manage nodes: #{ex.inspect}")
reset
retry
Expand All @@ -107,31 +78,21 @@ def notify_state(node, state = nil)
logger.error(ex.backtrace.join("\n"))
end

<<<<<<< HEAD
# Performs a graceful shutdown of the manager.
def shutdown
@master_manager = false
@watchers.each(&:shutdown) if @watchers
=======
# Performs a reset of the manager.
def reset
@leader = false
@master_manager = false
@watchers.each(&:shutdown) if @watchers
@queue.clear
@zk.close! if @zk
@zk_lock = nil
end

# Initiates a graceful shutdown.
def shutdown
logger.info('Shutting down ...')
@mutex.synchronize do
@lock.synchronize do
@shutdown = true
unless @leader
reset
end
exit(0)
end
>>>>>>> master
end

private
Expand All @@ -140,52 +101,16 @@ def shutdown
def setup_zk
@zk.close! if @zk
@zk = ZK.new("#{@options[:zkservers]}#{@options[:chroot] || ''}")
<<<<<<< HEAD
create_path(@root_znode)
create_path(current_state_root)
@zk.register(manual_failover_path) do |event|
handle_manual_failover_update(event)
=======
@zk.on_expired_session { notify_state(:zk_disconnected, nil) }

@zk.register(@manual_znode) do |event|
@zk.register(manual_failover_path) do |event|
if event.node_created? || event.node_changed?
perform_manual_failover
end
end

@zk.on_connected { @zk.stat(@manual_znode, :watch => true) }
@zk.stat(@manual_znode, :watch => true)
end

# Handles periodic state reports from {RedisFailover::NodeWatcher} instances.
def handle_state_reports
while running? && (state_report = @queue.pop)
begin
@mutex.synchronize do
return unless running?
@zk_lock.assert!
node, state = state_report
case state
when :unavailable then handle_unavailable(node)
when :available then handle_available(node)
when :syncing then handle_syncing(node)
when :zk_disconnected then raise ZKDisconnectedError
else raise InvalidNodeStateError.new(node, state)
end

# flush current state
write_state
end
rescue *ZK_ERRORS
# fail hard if this is a ZK connection-related error
raise
rescue => ex
logger.error("Error handling #{state_report.inspect}: #{ex.inspect}")
logger.error(ex.backtrace.join("\n"))
end
>>>>>>> master
end
@zk.on_connected { @zk.stat(manual_failover_path, :watch => true) }
@zk.stat(manual_failover_path, :watch => true)
end

Expand Down Expand Up @@ -245,8 +170,6 @@ def handle_syncing(node)
handle_available(node)
end

<<<<<<< HEAD
=======
# Handles a manual failover request to the given node.
#
# @param [Node] node the candidate node for failover
Expand All @@ -261,7 +184,6 @@ def handle_manual_failover(node)
promote_new_master(node)
end

>>>>>>> master
# Promotes a new master.
#
# @param [Node] node the optional node to promote
Expand All @@ -288,28 +210,18 @@ def promote_new_master(node = nil)
# Discovers the current master and slave nodes.
# @return [Boolean] true if nodes successfully discovered, false otherwise
def discover_nodes
<<<<<<< HEAD
@monitored_available, @monitored_unavailable, @unavailable = [], [], []
nodes = @options[:nodes].map { |opts| Node.new(opts) }.uniq
@master = find_master(nodes)
@slaves = nodes - [@master]
logger.info("Monitoring master (#{@master}) and slaves" +
" (#{@slaves.map(&:to_s).join(', ')})")
=======
@mutex.synchronize do
return false unless running?
@lock.synchronize do
return unless running?
@monitored_available, @monitored_unavailable, @unavailable = [], [], []
nodes = @options[:nodes].map { |opts| Node.new(opts) }.uniq
if @master = find_existing_master
logger.info("Using master #{@master} from existing znode config.")
elsif @master = guess_master(nodes)
logger.info("Guessed master #{@master} from known redis nodes.")
end
@slaves = nodes - [@master]
logger.info("Managing master (#{@master}) and slaves " +
logger.info("Monitoring master (#{@master}) and slaves " +
"(#{@slaves.map(&:to_s).join(', ')})")
# ensure that slaves are correctly pointing to this master
redirect_slaves_to(@master)
true
end
rescue *NODE_DISCOVERY_ERRORS => ex
msg = <<-MSG.gsub(/\s+/, ' ')
Expand All @@ -327,7 +239,7 @@ def discover_nodes

# Seeds the initial node master from an existing znode config.
def find_existing_master
if data = @zk.get(@znode).first
if data = @zk.get(redis_nodes_path).first
nodes = symbolize_keys(decode(data))
master = node_from(nodes[:master])
logger.info("Master from existing znode config: #{master || 'none'}")
Expand All @@ -352,17 +264,12 @@ def node_from(node_string)
return if node_string.nil?
host, port = node_string.split(':', 2)
Node.new(:host => host, :port => port, :password => @options[:password])
>>>>>>> master
end

# Spawns the {RedisFailover::NodeWatcher} instances for each managed node.
def spawn_watchers
@watchers = [@master, @slaves, @unavailable].flatten.compact.map do |node|
<<<<<<< HEAD
NodeWatcher.new(self, node, @options.fetch(:max_failures, 3))
=======
NodeWatcher.new(self, node, @options[:max_failures] || 3)
>>>>>>> master
end
@watchers.each(&:watch)
end
Expand Down Expand Up @@ -495,34 +402,6 @@ def handle_manual_failover_update(event)
@zk.stat(@manual_znode, :watch => true)
end

<<<<<<< HEAD
# Schedules a manual failover to a redis node.
def perform_manual_failover
return unless @master_manager
new_master = @zk.get(manual_failover_path, :watch => true).first
return unless new_master && new_master.size > 0
logger.info("Received manual failover request for: #{new_master}")
logger.info("Current nodes: #{current_nodes.inspect}")

node = if new_master == ManualFailover::ANY_SLAVE
@slaves.shuffle.first
else
node_from(new_master)
end

if node.nil?
logger.error('Failed to perform manual failover, no candidate found.')
elsif node == @master
logger.error("Node #{new_master} is already the current master.")
else
logger.info("Performing manual failover")
# make current master a slave, and promote new master
@slaves << @master
@slaves.delete(node)
promote_new_master(node)
end
end

# Produces a FQDN id for this Node Manager.
#
# @return [String] the FQDN for this Node Manager
Expand Down Expand Up @@ -653,24 +532,25 @@ def current_node_snapshots
# Waits until this node manager becomes the master.
def wait_until_master
logger.info('Waiting to become master Node Manager ...')
@zk.with_lock('master_manager_lock') do

with_lock do
@master_manager = true
logger.info("Acquired master Node Manager lock. " +
"Managing nodes in #{@decision_mode} mode.")
seed_initial_node_states
# Re-discover nodes, since the state of the world may have been changed
# by the time we've become the primary node manager.
discover_nodes

# Periodically update master config state.
while master_manager?
while running? && master_manager?
@lock.synchronize do
begin
snapshots = current_node_snapshots
snapshots.each do |node, snapshot|
update_master_state(node, snapshot)
end

# flush current master state
write_current_redis_nodes
snapshots = current_node_snapshots
snapshots.each do |node, snapshot|
update_master_state(node, snapshot)
end

# flush current master state
write_current_redis_nodes
end

sleep(CHECK_INTERVAL)
Expand All @@ -688,19 +568,9 @@ def node_from(node_string)
Node.new(:host => host, :port => port, :password => @options[:password])
end

# Seeds the initial node states when this manager becomes the master.
def seed_initial_node_states
if data = @zk.get(redis_nodes_path).first
logger.info('Seeding initial node states ...')
nodes = symbolize_keys(decode(data))
@master = node_from(nodes[:master])
@slaves = nodes[:slaves].map { |n| node_from(n) }
@unavailable = nodes[:unavailable].map { |n| node_from(n) }
logger.info("Seeded initial node states: #{current_nodes}")
=======
# Executes a block wrapped in a ZK exclusive lock.
def with_lock
@zk_lock = @zk.locker(@lock_path)
@zk_lock = @zk.locker('master_redis_node_manager_lock')

begin
@zk_lock.lock!(true)
Expand All @@ -718,8 +588,8 @@ def with_lock

# Perform a manual failover to a redis node.
def perform_manual_failover
@mutex.synchronize do
return unless running? && @leader && @zk_lock
@lock.synchronize do
return unless running? && @master_manager && @zk_lock
@zk_lock.assert!
new_master = @zk.get(@manual_znode, :watch => true).first
return unless new_master && new_master.size > 0
Expand All @@ -732,13 +602,12 @@ def perform_manual_failover
else
logger.error('Failed to perform manual failover, no candidate found.')
end
>>>>>>> master
end
rescue => ex
logger.error("Error handling a manual failover: #{ex.inspect}")
logger.error(ex.backtrace.join("\n"))
ensure
@zk.stat(@manual_znode, :watch => true)
@zk.stat(manual_failover_path, :watch => true)
end

# @return [Boolean] true if running, false otherwise
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_failover/node_snapshot.rb
@@ -1,5 +1,5 @@
module RedisFailover
# Represents a snapshot of a particular node as seen by all currently running
# Represents a snapshot of a particular redis node as seen by all currently running
# redis node managers.
class NodeSnapshot
# @return [String] the redis node
Expand Down
9 changes: 2 additions & 7 deletions lib/redis_failover/node_watcher.rb
Expand Up @@ -52,13 +52,8 @@ def monitor_node
sleep(WATCHER_SLEEP_TIME)
@node.ping
failures = 0

if @node.syncing_with_master?
notify(:syncing)
else
notify(:available)
@node.wait
end
notify(:available)
@node.wait
rescue NodeUnavailableError => ex
logger.debug("Failed to communicate with node #{@node}: #{ex.inspect}")
failures += 1
Expand Down

0 comments on commit 7503686

Please sign in to comment.