Skip to content
Browse files

add support for failover strategies

  • Loading branch information...
1 parent db1d77a commit cf4ce5418bee9049161ac1cce66401b6cfdf9984 @ryanlecompte committed Oct 4, 2012
View
2 lib/redis_failover.rb
@@ -22,4 +22,4 @@
require 'redis_failover/node_watcher'
require 'redis_failover/node_snapshot'
require 'redis_failover/manual_failover'
-
+require 'redis_failover/failover_strategy'
View
20 lib/redis_failover/cli.rb
@@ -49,10 +49,15 @@ def self.parse(source)
end
opts.on('--node-strategy STRATEGY',
- 'Strategy used when determining availability of nodes (single, majority or consensus)') do |strategy|
+ 'Strategy used when determining availability of nodes (default: majority)') do |strategy|
options[:node_strategy] = strategy
end
+ opts.on('--failover-strategy STRATEGY',
+ 'Strategy used when failing over to a new node (default: latency)') do |strategy|
+ options[:failover_strategy] = strategy
+ end
+
opts.on('-h', '--help', 'Display all options') do
puts opts
exit
@@ -76,11 +81,6 @@ def self.parse(source)
def self.invalid_options?(options)
return true if options.empty?
return true unless options.values_at(:nodes, :zkservers).all?
- if (strategy = options[:node_strategy]) &&
- !%w(single majority consensus).include?(strategy)
- return true
- end
-
false
end
@@ -123,8 +123,12 @@ def self.prepare(options)
options[:nodes].each { |opts| opts.update(:password => password) }
end
- if strategy = options[:node_strategy]
- options[:node_strategy] = strategy.to_sym
+ if node_strategy = options[:node_strategy]
+ options[:node_strategy] = node_strategy.to_sym
+ end
+
+ if failover_strategy = options[:failover_strategy]
+ options[:failover_strategy] = failover_strategy.to_sym
end
options
View
15 lib/redis_failover/failover_strategy.rb
@@ -0,0 +1,15 @@
module RedisFailover
  # Loads various strategies for determining which node is used during failover.
  module FailoverStrategy
    # Loads a strategy based on the given name.
    #
    # The name is lower-cased and must be a plain identifier (a letter followed
    # by letters, digits or underscores). It is checked before being
    # interpolated into the require path, so a value such as "../foo" cannot
    # load a file from outside the failover_strategy directory.
    #
    # @param [String, Symbol] name the strategy name
    # @return [Object] a new strategy instance
    # @raise [RuntimeError] if the name is malformed, the strategy file cannot
    #   be loaded, or the file does not define the expected class
    def self.for(name)
      key = name.to_s.downcase
      unless key =~ /\A[a-z][a-z0-9_]*\z/
        raise "Failed to find failover strategy: #{name}"
      end

      require "redis_failover/failover_strategy/#{key}"
      # inherit = false: only accept a constant defined directly in this
      # module, never a same-named top-level constant (e.g. ::String).
      const_get(key.capitalize, false).new
    rescue LoadError, NameError
      raise "Failed to find failover strategy: #{name}"
    end
  end
end
View
31 lib/redis_failover/failover_strategy/latency.rb
@@ -0,0 +1,31 @@
module RedisFailover
  module FailoverStrategy
    # Failover strategy that selects an available node that is both seen by all
    # node managers and has the lowest reported health check latency.
    class Latency
      include Util

      # Returns a candidate node as determined by this strategy.
      #
      # A snapshot qualifies only when its available count equals the number of
      # distinct node managers found across all given snapshots, and it has a
      # latency figure to compare. Among the qualifying snapshots the one with
      # the smallest average latency wins; the first one wins a tie.
      #
      # @param [Array<NodeSnapshot>] snapshots the node snapshots
      # @return [Node] the candidate node or nil if one couldn't be found
      def find_candidate(snapshots)
        logger.info('Attempting to find candidate from snapshots:')
        logger.info(snapshots.join("\n"))

        manager_count = snapshots.map(&:node_managers).flatten.uniq.size
        eligible = snapshots.select do |snapshot|
          # A nil average means no manager supplied a latency; such a node
          # cannot be ranked and must not be promoted.
          snapshot.available_count == manager_count && snapshot.avg_latency
        end

        best = eligible.min_by(&:avg_latency)
        best && best.node
      end
    end
  end
end
View
15 lib/redis_failover/node_manager.rb
@@ -41,6 +41,7 @@ def initialize(options)
@options = options
@root_znode = options.fetch(:znode_path, Util::DEFAULT_ROOT_ZNODE_PATH)
@node_strategy = NodeStrategy.for(options.fetch(:node_strategy, :majority))
+ @failover_strategy = FailoverStrategy.for(options.fetch(:failover_strategy, :latency))
@nodes = Array(@options[:nodes]).map { |opts| Node.new(opts) }.uniq
@master_manager = false
@lock = Mutex.new
@@ -191,8 +192,8 @@ def promote_new_master(node = nil)
delete_path(redis_nodes_path)
@master = nil
- # make a specific node or slave the new master
- candidate = node || @slaves.pop
+ # make a specific node or selected candidate the new master
+ candidate = node || @failover_strategy.find_candidate(current_node_snapshots.values)
unless candidate
logger.error('Failed to promote a new master, no candidate available.')
return
@@ -535,7 +536,10 @@ def wait_until_master
with_lock do
@master_manager = true
- logger.info("Acquired master Node Manager lock. Using strategy #{strategy_name}")
+ logger.info('Acquired master Node Manager lock.')
+ logger.info("Configured node strategy #{@node_strategy.class}")
+ logger.info("Configured failover strategy #{@failover_strategy.class}")
+
# Re-discover nodes, since the state of the world may have been changed
# by the time we've become the primary node manager.
discover_nodes
@@ -617,11 +621,6 @@ def running?
!@shutdown
end
- # @return [String] the name of the node strategy
- def strategy_name
- @node_strategy.class.to_s.split('::').last.upcase
- end
-
# @return [String] a stringified version of redis nodes
def stringify_nodes(nodes)
"(#{nodes.map(&:to_s).join(', ')})"
View
11 lib/redis_failover/node_snapshot.rb
@@ -42,6 +42,17 @@ def unavailable_count
@unavailable.size
end
+ # @return [Integer] the average available latency
+ def avg_latency
+ return if @available.empty?
+ @available.values.inject(0) { |sum, n| sum + n } / @available.size
+ end
+
+ # @return [Array<String>] all node managers involved in this snapshot
+ def node_managers
+ (@available.keys + @unavailable).uniq
+ end
+
# @return [Boolean] true if all node managers indicated that this
# node was viewable
def all_available?
View
2 lib/redis_failover/node_strategy.rb
@@ -9,7 +9,7 @@ def self.for(name)
require "redis_failover/node_strategy/#{name.downcase}"
const_get(name.capitalize).new
rescue LoadError, NameError
- raise "Unknown redis failover node strategy: #{name}"
+ raise "Failed to find node strategy: #{name}"
end
end
end
View
41 spec/failover_strategy/latency_spec.rb
@@ -0,0 +1,41 @@
+require 'spec_helper'
+
module RedisFailover
  module FailoverStrategy
    # Load the strategy eagerly so that the Latency constant is defined before
    # the describe call below refers to it.
    FailoverStrategy.for(:latency)

    describe Latency do
      let(:strategy) { FailoverStrategy.for(:latency) }

      describe '#find_candidate' do
        it 'returns only candidates seen by all node managers' do
          # Each node is viewable by one manager and unviewable by the other,
          # so neither node is seen by every manager.
          snapshot_1 = NodeSnapshot.new(Node.new(:host => 'localhost', :port => '123'))
          snapshot_1.viewable_by('nm1', 0)
          snapshot_1.unviewable_by('nm2')

          snapshot_2 = NodeSnapshot.new(Node.new(:host => 'localhost', :port => '456'))
          snapshot_2.viewable_by('nm2', 0)
          snapshot_2.unviewable_by('nm1')

          strategy.find_candidate([snapshot_1, snapshot_2]).should be_nil
        end

        it 'returns the candidate with the lowest average latency' do
          # Both nodes are viewable by all three managers; the second node
          # has the lower latencies.
          snapshot_1 = NodeSnapshot.new(Node.new(:host => 'localhost', :port => '123'))
          snapshot_1.viewable_by('nm1', 5)
          snapshot_1.viewable_by('nm2', 4)
          snapshot_1.viewable_by('nm3', 3)

          snapshot_2 = NodeSnapshot.new(Node.new(:host => 'localhost', :port => '456'))
          snapshot_2.viewable_by('nm1', 1)
          snapshot_2.viewable_by('nm2', 1)
          snapshot_2.viewable_by('nm3', 2)

          strategy.find_candidate([snapshot_1, snapshot_2]).should == snapshot_2.node
        end
      end
    end
  end
end
View
17 spec/failover_strategy_spec.rb
@@ -0,0 +1,17 @@
+require 'spec_helper'
+
module RedisFailover
  describe FailoverStrategy do
    describe '.for' do
      # A known strategy name yields an instance of the matching class.
      it 'creates a new latency strategy instance' do
        strategy = FailoverStrategy.for('latency')
        strategy.should be_a(RedisFailover::FailoverStrategy::Latency)
      end

      # A name with no matching strategy surfaces as a RuntimeError.
      it 'rejects unknown strategies' do
        expect do
          FailoverStrategy.for('foobar')
        end.to raise_error(RuntimeError)
      end
    end
  end
end
View
24 spec/support/node_manager_stub.rb
@@ -17,6 +17,11 @@ def discover_nodes
@unavailable = []
@master = master
@slaves = [slave]
+ @failover_strategy = Object.new
+ slaves = @slaves
+ @failover_strategy.define_singleton_method(:find_candidate) do |*args|
+ slaves.pop
+ end
@nodes_discovered = true
end
@@ -40,23 +45,35 @@ def stop_processing
def force_unavailable(node)
start_processing
node.redis.make_unavailable!
- snapshot = OpenStruct.new(:available_count => 0, :unavailable_count => 1)
+ snapshot = OpenStruct.new(
+ :node => node,
+ :available_count => 0,
+ :unavailable_count => 1,
+ :node_managers => ['nm'])
update_master_state(node, snapshot)
stop_processing
end
def force_available(node)
start_processing
node.redis.make_available!
- snapshot = OpenStruct.new(:available_count => 1, :unavailable_count => 0)
+ snapshot = OpenStruct.new(
+ :node => node,
+ :available_count => 1,
+ :unavailable_count => 0,
+ :node_managers => ['nm'])
update_master_state(node, snapshot)
stop_processing
end
def force_syncing(node, serve_stale_reads)
start_processing
node.redis.force_sync_with_master(serve_stale_reads)
- snapshot = OpenStruct.new(:available_count => 1, :unavailable_count => 0)
+ snapshot = OpenStruct.new(
+ :node => node,
+ :available_count => 1,
+ :unavailable_count => 0,
+ :node_managers => ['nm'])
update_master_state(node, snapshot)
stop_processing
end
@@ -65,5 +82,6 @@ def delete_path(*args); end
def create_path(*args); end
def write_state(*args); end
def wait_until_master; end
+ def current_node_snapshots; {} end
end
end

0 comments on commit cf4ce54

Please sign in to comment.
Something went wrong with that request. Please try again.