Skip to content

Commit

Permalink
fix: make the error identification middleware optional because of the…
Browse files Browse the repository at this point in the history
… use case (#344)
  • Loading branch information
supercaracal committed Apr 13, 2024
1 parent 8af34d0 commit 5d49a47
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 35 deletions.
8 changes: 7 additions & 1 deletion lib/redis_client/cluster/error_identification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ class RedisClient
class Cluster
module ErrorIdentification
def self.client_owns_error?(err, client)
err.is_a?(TaggedError) && err.from?(client)
return true unless identifiable?(err)

err.from?(client)
end

def self.identifiable?(err)
err.is_a?(TaggedError)
end

module TaggedError
Expand Down
11 changes: 2 additions & 9 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

require 'redis_client'
require 'redis_client/config'
require 'redis_client/cluster/error_identification'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/node/primary_only'
require 'redis_client/cluster/node/random_replica'
Expand Down Expand Up @@ -79,11 +78,9 @@ def []=(index, element)
end

class Config < ::RedisClient::Config
def initialize(scale_read: false, middlewares: nil, **kwargs)
def initialize(scale_read: false, **kwargs)
@scale_read = scale_read
middlewares ||= []
middlewares.unshift ErrorIdentification::Middleware
super(middlewares: middlewares, **kwargs)
super(**kwargs)
end

private
Expand Down Expand Up @@ -217,10 +214,6 @@ def reload!
end
end

def owns_error?(err)
any? { |c| ErrorIdentification.client_owns_error?(err, c) }
end

private

def make_topology_class(with_replica, replica_affinity)
Expand Down
9 changes: 6 additions & 3 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
require 'redis_client/cluster/normalized_cmd_name'
require 'redis_client/cluster/transaction'
require 'redis_client/cluster/optimistic_locking'
require 'redis_client/cluster/error_identification'

class RedisClient
class Cluster
Expand Down Expand Up @@ -68,7 +69,9 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)

update_cluster_info! if e.errors.values.any? do |err|
@node.owns_error?(err) && err.message.start_with?('CLUSTERDOWN Hash slot not served')
next false if ::RedisClient::Cluster::ErrorIdentification.identifiable?(err) && @node.none? { |c| ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(err, c) }

err.message.start_with?('CLUSTERDOWN Hash slot not served')
end

raise
Expand Down Expand Up @@ -97,7 +100,7 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
rescue ::RedisClient::CommandError => e
raise unless ErrorIdentification.client_owns_error?(e, node)
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)

if e.message.start_with?('MOVED')
node = assign_redirection_node(e.message)
Expand All @@ -117,7 +120,7 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
end
raise
rescue ::RedisClient::ConnectionError => e
raise unless ErrorIdentification.client_owns_error?(e, node)
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)

update_cluster_info!

Expand Down
22 changes: 22 additions & 0 deletions test/redirection_emulation_middleware.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

module RedirectionEmulationMiddleware
Setting = Struct.new(
'RedirectionEmulationMiddlewareSetting',
:slot, :to, :command, keyword_init: true
)

def call(cmd, cfg)
s = cfg.custom.fetch(:redirect)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
end

def call_pipelined(cmd, cfg)
s = cfg.custom.fetch(:redirect)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
end
end
68 changes: 46 additions & 22 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -739,21 +739,45 @@ def test_circuit_breakers
end

def test_only_reshards_own_errors
@client.call_v(%w[SADD testkey testvalue1])
@client.call_v(%w[SADD testkey testvalue2])
slot = ::RedisClient::Cluster::KeySlotConverter.convert('testkey')
router = @client.instance_variable_get(:@router)
correct_primary_key = router.find_node_key_by_key('testkey', primary: true)
broken_primary_key = (router.node_keys - [correct_primary_key]).first

client1 = new_test_client(
middlewares: [
::RedisClient::Cluster::ErrorIdentification::Middleware
]
)

client2 = new_test_client(
middlewares: [
::RedisClient::Cluster::ErrorIdentification::Middleware,
RedirectionEmulationMiddleware
],
custom: {
redirect: RedirectionEmulationMiddleware::Setting.new(
slot: slot, to: broken_primary_key, command: %w[SET testkey client2]
)
}
)

assert_raises(RedisClient::CommandError) do
@client.sscan('testkey', retry_count: 0) do
raise RedisClient::CommandError, "MOVED #{slot} #{broken_primary_key}"
client1.call('SET', 'testkey', 'client1') do |got|
assert_equal('OK', got)
client2.call('SET', 'testkey', 'client2')
end
end

# The exception should not have causes @client to update its shard mappings, because it didn't
# come from a RedisClient instance that @client knows about.
assert_equal correct_primary_key, router.find_node_key_by_key('testkey', primary: true)
# The exception should not have causes client1 to update its shard mappings, because it didn't
# come from a RedisClient instance that client1 knows about.
assert_equal(
correct_primary_key,
client1.instance_variable_get(:@router).find_node_key_by_key('testkey', primary: true)
)

client1.close
client2.close
end

def test_pinning_single_key
Expand Down Expand Up @@ -832,12 +856,12 @@ def hiredis_used?
class PrimaryOnly < TestingWrapper
include Mixin

def new_test_client(capture_buffer: @captured_commands, **opts)
def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts)
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: capture_buffer },
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
**opts
)
Expand All @@ -848,14 +872,14 @@ def new_test_client(capture_buffer: @captured_commands, **opts)
class ScaleReadRandom < TestingWrapper
include Mixin

def new_test_client(capture_buffer: @captured_commands, **opts)
def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts)
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :random,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: capture_buffer },
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
**opts
)
Expand All @@ -866,14 +890,14 @@ def new_test_client(capture_buffer: @captured_commands, **opts)
class ScaleReadRandomWithPrimary < TestingWrapper
include Mixin

def new_test_client(capture_buffer: @captured_commands, **opts)
def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts)
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :random_with_primary,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: capture_buffer },
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
**opts
)
Expand All @@ -884,14 +908,14 @@ def new_test_client(capture_buffer: @captured_commands, **opts)
class ScaleReadLatency < TestingWrapper
include Mixin

def new_test_client(capture_buffer: @captured_commands, **opts)
def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts)
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :latency,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: capture_buffer },
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
**opts
)
Expand All @@ -902,12 +926,12 @@ def new_test_client(capture_buffer: @captured_commands, **opts)
class Pooled < TestingWrapper
include Mixin

def new_test_client(capture_buffer: @captured_commands, **opts)
def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts)
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: capture_buffer },
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
**opts
)
Expand Down
1 change: 1 addition & 0 deletions test/testing_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'testing_constants'
require 'cluster_controller'
require 'command_capture_middleware'
require 'redirection_emulation_middleware'

case ENV.fetch('REDIS_CONNECTION_DRIVER', 'ruby')
when 'hiredis' then require 'hiredis-client'
Expand Down

0 comments on commit 5d49a47

Please sign in to comment.