Skip to content

Commit

Permalink
Assign a new node after calling update_cluster_info!
Browse files Browse the repository at this point in the history
If we catch a connection error and refresh the cluster topology, we need
to re-calculate what node to send the command to in the router; the node
we're using might not even be a valid node any longer.
  • Loading branch information
KJTsanaktsidis committed Mar 5, 2024
1 parent 3a7b7dd commit 60bb509
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
1 change: 0 additions & 1 deletion lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def close

private


# This API is called by redis-clustering/redis-rb, but requries further refinement before we commit
# to making it part of redis-cluster-client's official public API.
def watch(keys)
Expand Down
17 changes: 13 additions & 4 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def start_watch(keys)

# We have not yet selected a node for this transaction, initially, which means we can handle
# redirections freely initially (i.e. for the first WATCH call)
node = @router.find_primary_node_by_slot(@slot)
handle_redirection(node, retry_count: 1) do |nd|
handle_redirection(retry_count: 1) do |nd|
nd.with do |c|
c.ensure_connected_cluster_scoped(retryable: false) do
@conn = c
Expand All @@ -72,10 +71,20 @@ def add_to_watch(keys)
@conn.call('WATCH', *keys)
end

def handle_redirection(node, retry_count: 1, &blk)
@router.handle_redirection(node, retry_count: retry_count) do |nd|
def handle_redirection(retry_count: 1, &blk)
node = @router.find_primary_node_by_slot(@slot)
times_block_executed = 0
@router.handle_redirection(node, nil, retry_count: retry_count) do |nd|
times_block_executed = 1
handle_asking_once(nd, &blk)
end
rescue ::RedisClient::ConnectionError
# Deduct the number of retries that happened _inside_ router#handle_redirection from our remaining
# _external_ retries. Always deduct at least one in case handle_redirection raises without trying the block.
retry_count -= [times_block_executed, 1].min
raise if retry_count < 0

retry
end

def handle_asking_once(node)
Expand Down
20 changes: 16 additions & 4 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi

# @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
def try_send(node, method, command, args, retry_count: 3, &block)
handle_redirection(node, retry_count: retry_count) do |on_node|
handle_redirection(node, command, retry_count: retry_count) do |on_node|
if args.empty?
# prevent memory allocation for variable-length args
on_node.public_send(method, command, &block)
Expand All @@ -86,12 +86,12 @@ def try_send(node, method, command, args, retry_count: 3, &block)
end

def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
handle_redirection(node, retry_count: retry_count) do |on_node|
handle_redirection(node, nil, retry_count: retry_count) do |on_node|
on_node.public_send(method, *args, **kwargs, &block)
end
end

def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
yield node
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
Expand Down Expand Up @@ -120,7 +120,19 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me

update_cluster_info!

raise if retry_count <= 0
# if command.nil?, then we don't have a way to know what node this command should be done on
# under the new topology. It might be e.g. part of a transaction which needs to be retried at a
# higher level.
raise if retry_count <= 0 || command.nil?

# Find the node to use for this command - if this fails for some reason, though, re-raise
# the original connection error.
node = begin
find_node(find_node_key(command), retry_count: 0)
rescue StandardError
nil
end
raise unless node

retry_count -= 1
retry
Expand Down

0 comments on commit 60bb509

Please sign in to comment.