Skip to content

Commit

Permalink
fix: use the slot for the validation in the transaction (#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Feb 18, 2024
1 parent 9944228 commit 966ae6c
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 42 deletions.
4 changes: 2 additions & 2 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ def multi(watch: nil)
return transaction.execute
end

::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, resharding|
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: c, resharding: resharding
@router, @command_builder, node: c, slot: slot
)
yield transaction
transaction.execute
Expand Down
37 changes: 10 additions & 27 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/key_slot_converter'
require 'redis_client/cluster/transaction'

class RedisClient
Expand All @@ -12,15 +11,14 @@ def initialize(router)
end

def watch(keys)
ensure_safe_keys(keys)
node = find_node(keys)
cnt = 0 # We assume redirects occurred when incrementing it.
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

node = @router.find_primary_node_by_slot(slot)
@router.handle_redirection(node, retry_count: 1) do |nd|
cnt += 1
nd.with do |c|
c.call('WATCH', *keys)
reply = yield(c, cnt > 1)
reply = yield(c, slot)
c.call('UNWATCH')
reply
end
Expand All @@ -29,29 +27,14 @@ def watch(keys)

private

def ensure_safe_keys(keys)
return if safe?(keys)
def find_slot(keys)
return if keys.empty?
return if keys.any? { |k| k.nil? || k.empty? }

raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}"
end

def safe?(keys)
return false if keys.empty?

slots = keys.map do |k|
return false if k.nil? || k.empty?

::RedisClient::Cluster::KeySlotConverter.convert(k)
end

slots.uniq.size == 1
end

def find_node(keys)
node_key = @router.find_primary_node_key(['WATCH', *keys])
return @router.find_node(node_key) unless node_key.nil?
slots = keys.map { |k| @router.find_slot_by_key(k) }
return if slots.uniq.size != 1

raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node"
slots.first
end
end
end
Expand Down
10 changes: 10 additions & 0 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ def find_primary_node_key(command)
find_node_key_by_key(key, primary: true)
end

def find_slot(command)
find_slot_by_key(@command.extract_first_key(command))
end

def find_slot_by_key(key)
return if key.empty?

::RedisClient::Cluster::KeySlotConverter.convert(key)
end

def find_node(node_key, retry_count: 3)
@node.find_by(node_key)
rescue ::RedisClient::Cluster::Node::ReloadNeeded
Expand Down
23 changes: 10 additions & 13 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@

require 'redis_client'
require 'redis_client/cluster/pipeline'
require 'redis_client/cluster/node_key'

class RedisClient
class Cluster
class Transaction
ConsistencyError = Class.new(::RedisClient::Error)
MAX_REDIRECTION = 2

def initialize(router, command_builder, node: nil, resharding: false)
def initialize(router, command_builder, node: nil, slot: nil)
@router = router
@command_builder = command_builder
@retryable = true
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
@pending_commands = []
@node = node
prepare_tx unless @node.nil?
@resharding_state = resharding
@watching_slot = slot
end

def call(*command, **kwargs, &block)
Expand Down Expand Up @@ -111,7 +110,7 @@ def send_pipeline(client, redirect:)
client.middlewares.call_pipelined(commands, client.config) do
connection.call_pipelined(commands, nil)
rescue ::RedisClient::CommandError => e
return handle_command_error!(client, commands, e, redirect: redirect) unless redirect.zero?
return handle_command_error!(commands, e, redirect: redirect) unless redirect.zero?

raise
end
Expand Down Expand Up @@ -140,28 +139,26 @@ def coerce_results!(results, offset: 1)
results
end

def handle_command_error!(client, commands, err, redirect:) # rubocop:disable Metrics/AbcSize
def handle_command_error!(commands, err, redirect:) # rubocop:disable Metrics/AbcSize
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
elsif err.message.start_with?('MOVED')
ensure_the_same_node!(client, commands)
ensure_the_same_slot!(commands)
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: redirect - 1)
elsif err.message.start_with?('ASK')
ensure_the_same_node!(client, commands)
ensure_the_same_slot!(commands)
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
else
raise err
end
end

def ensure_the_same_node!(client, commands)
node_keys = commands.map { |command| @router.find_primary_node_key(command) }.compact.uniq
expected_node_key = ::RedisClient::Cluster::NodeKey.build_from_client(client)

return if !@resharding_state && node_keys.size == 1 && node_keys.first == expected_node_key
return if @resharding_state && node_keys.size == 1
def ensure_the_same_slot!(commands)
slots = commands.map { |command| @router.find_slot(command) }.compact.uniq
return if slots.size == 1 && @watching_slot.nil?
return if slots.size == 1 && @watching_slot == slots.first

raise(ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}")
end
Expand Down

0 comments on commit 966ae6c

Please sign in to comment.