Skip to content
Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
287 lines (248 sloc) 8.72 KB
# frozen_string_literal: true
require_relative 'errors'
require_relative 'client'
require_relative 'cluster/command'
require_relative 'cluster/command_loader'
require_relative 'cluster/key_slot_converter'
require_relative 'cluster/node'
require_relative 'cluster/node_key'
require_relative 'cluster/node_loader'
require_relative 'cluster/option'
require_relative 'cluster/slot'
require_relative 'cluster/slot_loader'
class Redis
# Redis Cluster client
#
# @see https://github.com/antirez/redis-rb-cluster POC implementation
# @see https://redis.io/topics/cluster-spec Redis Cluster specification
# @see https://redis.io/topics/cluster-tutorial Redis Cluster tutorial
#
# Copyright (C) 2013 Salvatore Sanfilippo <antirez@gmail.com>
class Cluster
def initialize(options = {})
@option = Option.new(options)
@node, @slot = fetch_cluster_info!(@option)
@command = fetch_command_details(@node)
end
def id
@node.map(&:id).sort.join(' ')
end
# db feature is disabled in cluster mode
def db
0
end
# db feature is disabled in cluster mode
def db=(_db); end
def timeout
@node.first.timeout
end
def connected?
@node.any?(&:connected?)
end
def disconnect
@node.each(&:disconnect)
true
end
def connection_info
@node.sort_by(&:id).map do |client|
{
host: client.host,
port: client.port,
db: client.db,
id: client.id,
location: client.location
}
end
end
def with_reconnect(val = true, &block)
try_send(@node.sample, :with_reconnect, val, &block)
end
def call(command, &block)
send_command(command, &block)
end
def call_loop(command, timeout = 0, &block)
node = assign_node(command)
try_send(node, :call_loop, command, timeout, &block)
end
def call_pipeline(pipeline)
node_keys, command_keys = extract_keys_in_pipeline(pipeline)
raise CrossSlotPipeliningError, command_keys if node_keys.size > 1
node = find_node(node_keys.first)
try_send(node, :call_pipeline, pipeline)
end
def call_with_timeout(command, timeout, &block)
node = assign_node(command)
try_send(node, :call_with_timeout, command, timeout, &block)
end
def call_without_timeout(command, &block)
call_with_timeout(command, 0, &block)
end
def process(commands, &block)
if commands.size == 1 &&
%w[unsubscribe punsubscribe].include?(commands.first.first.to_s.downcase) &&
commands.first.size == 1
# Node is indeterminate. We do just a best-effort try here.
@node.process_all(commands, &block)
else
node = assign_node(commands.first)
try_send(node, :process, commands, &block)
end
end
private
def fetch_cluster_info!(option)
node = Node.new(option.per_node_key)
available_slots = SlotLoader.load(node)
node_flags = NodeLoader.load_flags(node)
available_node_urls = NodeKey.to_node_urls(available_slots.keys, secure: option.secure?)
option.update_node(available_node_urls)
[Node.new(option.per_node_key, node_flags, option.use_replica?),
Slot.new(available_slots, node_flags, option.use_replica?)]
ensure
node.map(&:disconnect)
end
def fetch_command_details(nodes)
details = CommandLoader.load(nodes)
Command.new(details)
end
def send_command(command, &block)
cmd = command.first.to_s.downcase
case cmd
when 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
@node.call_all(command, &block).first
when 'flushall', 'flushdb'
@node.call_master(command, &block).first
when 'wait' then @node.call_master(command, &block).reduce(:+)
when 'keys' then @node.call_slave(command, &block).flatten.sort
when 'dbsize' then @node.call_slave(command, &block).reduce(:+)
when 'lastsave' then @node.call_all(command, &block).sort
when 'role' then @node.call_all(command, &block)
when 'config' then send_config_command(command, &block)
when 'client' then send_client_command(command, &block)
when 'cluster' then send_cluster_command(command, &block)
when 'readonly', 'readwrite', 'shutdown'
raise OrchestrationCommandNotSupported, cmd
when 'memory' then send_memory_command(command, &block)
when 'script' then send_script_command(command, &block)
when 'pubsub' then send_pubsub_command(command, &block)
when 'discard', 'exec', 'multi', 'unwatch'
raise AmbiguousNodeError, cmd
else
node = assign_node(command)
try_send(node, :call, command, &block)
end
end
def send_config_command(command, &block)
case command[1].to_s.downcase
when 'resetstat', 'rewrite', 'set'
@node.call_all(command, &block).first
else assign_node(command).call(command, &block)
end
end
def send_memory_command(command, &block)
case command[1].to_s.downcase
when 'stats' then @node.call_all(command, &block)
when 'purge' then @node.call_all(command, &block).first
else assign_node(command).call(command, &block)
end
end
def send_client_command(command, &block)
case command[1].to_s.downcase
when 'list' then @node.call_all(command, &block).flatten
when 'pause', 'reply', 'setname'
@node.call_all(command, &block).first
else assign_node(command).call(command, &block)
end
end
def send_cluster_command(command, &block)
subcommand = command[1].to_s.downcase
case subcommand
when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate',
'reset', 'set-config-epoch', 'setslot'
raise OrchestrationCommandNotSupported, 'cluster', subcommand
when 'saveconfig' then @node.call_all(command, &block).first
else assign_node(command).call(command, &block)
end
end
def send_script_command(command, &block)
case command[1].to_s.downcase
when 'debug', 'kill'
@node.call_all(command, &block).first
when 'flush', 'load'
@node.call_master(command, &block).first
else assign_node(command).call(command, &block)
end
end
def send_pubsub_command(command, &block)
case command[1].to_s.downcase
when 'channels' then @node.call_all(command, &block).flatten.uniq.sort
when 'numsub'
@node.call_all(command, &block).reject(&:empty?).map { |e| Hash[*e] }
.reduce({}) { |a, e| a.merge(e) { |_, v1, v2| v1 + v2 } }
when 'numpat' then @node.call_all(command, &block).reduce(:+)
else assign_node(command).call(command, &block)
end
end
# @see https://redis.io/topics/cluster-spec#redirection-and-resharding
# Redirection and resharding
def try_send(node, method_name, *args, retry_count: 3, &block)
node.public_send(method_name, *args, &block)
rescue CommandError => err
if err.message.start_with?('MOVED')
assign_redirection_node(err.message).public_send(method_name, *args, &block)
elsif err.message.start_with?('ASK')
raise if retry_count <= 0
node = assign_asking_node(err.message)
node.call(%i[asking])
retry_count -= 1
retry
else
raise
end
end
def assign_redirection_node(err_msg)
_, slot, node_key = err_msg.split(' ')
slot = slot.to_i
@slot.put(slot, node_key)
find_node(node_key)
end
def assign_asking_node(err_msg)
_, _, node_key = err_msg.split(' ')
find_node(node_key)
end
def assign_node(command)
node_key = find_node_key(command)
find_node(node_key)
end
def find_node_key(command)
key = @command.extract_first_key(command)
return if key.empty?
slot = KeySlotConverter.convert(key)
return unless @slot.exists?(slot)
if @command.should_send_to_master?(command)
@slot.find_node_key_of_master(slot)
else
@slot.find_node_key_of_slave(slot)
end
end
def find_node(node_key)
return @node.sample if node_key.nil?
@node.find_by(node_key)
rescue Node::ReloadNeeded
update_cluster_info!(node_key)
@node.find_by(node_key)
end
def update_cluster_info!(node_key = nil)
unless node_key.nil?
host, port = NodeKey.split(node_key)
@option.add_node(host, port)
end
@node.map(&:disconnect)
@node, @slot = fetch_cluster_info!(@option)
end
def extract_keys_in_pipeline(pipeline)
node_keys = pipeline.commands.map { |cmd| find_node_key(cmd) }.compact.uniq
command_keys = pipeline.commands.map { |cmd| @command.extract_first_key(cmd) }.reject(&:empty?)
[node_keys, command_keys]
end
end
end
You can’t perform that action at this time.