diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index dc0fe7e..c2b58d0 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -125,7 +125,6 @@ def close private - # This API is called by redis-clustering/redis-rb, but requries further refinement before we commit # to making it part of redis-cluster-client's official public API. def watch(keys) diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index cfdfa2b..d818309 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -44,8 +44,7 @@ def start_watch(keys) # We have not yet selected a node for this transaction, initially, which means we can handle # redirections freely initially (i.e. for the first WATCH call) - node = @router.find_primary_node_by_slot(@slot) - handle_redirection(node, retry_count: 1) do |nd| + handle_redirection(retry_count: 1) do |nd| nd.with do |c| c.ensure_connected_cluster_scoped(retryable: false) do @conn = c @@ -72,10 +71,20 @@ def add_to_watch(keys) @conn.call('WATCH', *keys) end - def handle_redirection(node, retry_count: 1, &blk) - @router.handle_redirection(node, retry_count: retry_count) do |nd| + def handle_redirection(retry_count: 1, &blk) + times_block_executed = 0 + node = @router.find_primary_node_by_slot(@slot) + @router.handle_redirection(node, nil, retry_count: retry_count) do |nd| + times_block_executed += 1 handle_asking_once(nd, &blk) end + rescue ::RedisClient::ConnectionError + # Deduct the number of attempts that happened _inside_ router#handle_redirection from our remaining + # _external_ retries. Always deduct at least one (hence max, not min) so this rescue/retry loop is bounded. 
+ retry_count -= [times_block_executed, 1].max + raise if retry_count < 0 + + retry end def handle_asking_once(node) diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 898d3f3..e531cff 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -75,7 +75,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi # @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding def try_send(node, method, command, args, retry_count: 3, &block) - handle_redirection(node, retry_count: retry_count) do |on_node| + handle_redirection(node, command, retry_count: retry_count) do |on_node| if args.empty? # prevent memory allocation for variable-length args on_node.public_send(method, command, &block) @@ -86,12 +86,12 @@ def try_send(node, method, command, args, retry_count: 3, &block) end def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) - handle_redirection(node, retry_count: retry_count) do |on_node| + handle_redirection(node, nil, retry_count: retry_count) do |on_node| on_node.public_send(method, *args, **kwargs, &block) end end - def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity yield node rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise @@ -120,7 +120,19 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me update_cluster_info! - raise if retry_count <= 0 + # if command.nil?, then we don't have a way to know what node this command should be done on + # under the new topology. It might be e.g. part of a transaction which needs to be retried at a + # higher level. + raise if retry_count <= 0 || command.nil? 
+ + # Find the node to use for this command - if this fails for some reason, though, re-raise + # the original connection error. + node = begin + find_node(find_node_key(command), retry_count: 0) + rescue StandardError + nil + end + raise unless node retry_count -= 1 retry