diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 7158b47..9d94552 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -16,42 +16,47 @@ class Cluster def initialize(config, pool: nil, concurrency: nil, **kwargs) @config = config @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {})) - @router = ::RedisClient::Cluster::Router.new(config, @concurrent_worker, pool: pool, **kwargs) @command_builder = config.command_builder + + @pool = pool + @kwargs = kwargs + @router = nil + @mutex = Mutex.new end def inspect - "#<#{self.class.name} #{@router.node_keys.join(', ')}>" + node_keys = @router.nil? ? @config.startup_nodes.keys : router.node_keys + "#<#{self.class.name} #{node_keys.join(', ')}>" end def call(*args, **kwargs, &block) command = @command_builder.generate(args, kwargs) - @router.send_command(:call_v, command, &block) + router.send_command(:call_v, command, &block) end def call_v(command, &block) command = @command_builder.generate(command) - @router.send_command(:call_v, command, &block) + router.send_command(:call_v, command, &block) end def call_once(*args, **kwargs, &block) command = @command_builder.generate(args, kwargs) - @router.send_command(:call_once_v, command, &block) + router.send_command(:call_once_v, command, &block) end def call_once_v(command, &block) command = @command_builder.generate(command) - @router.send_command(:call_once_v, command, &block) + router.send_command(:call_once_v, command, &block) end def blocking_call(timeout, *args, **kwargs, &block) command = @command_builder.generate(args, kwargs) - @router.send_command(:blocking_call_v, command, timeout, &block) + router.send_command(:blocking_call_v, command, timeout, &block) end def blocking_call_v(timeout, command, &block) command = @command_builder.generate(command) - @router.send_command(:blocking_call_v, command, timeout, &block) + router.send_command(:blocking_call_v, command, timeout, &block) end def scan(*args, **kwargs, &block) @@ -60,31 +65,31 @@ def scan(*args, **kwargs, &block) seed = Random.new_seed cursor = ZERO_CURSOR_FOR_SCAN loop do - cursor, keys = @router.scan('SCAN', cursor, *args, seed: seed, **kwargs) + cursor, keys = router.scan('SCAN', cursor, *args, seed: seed, **kwargs) keys.each(&block) break if cursor == ZERO_CURSOR_FOR_SCAN end end def sscan(key, *args, **kwargs, &block) - node = @router.assign_node(['SSCAN', key]) - @router.try_delegate(node, :sscan, key, *args, **kwargs, &block) + node = router.assign_node(['SSCAN', key]) + router.try_delegate(node, :sscan, key, *args, **kwargs, &block) end def hscan(key, *args, **kwargs, &block) - node = @router.assign_node(['HSCAN', key]) - @router.try_delegate(node, :hscan, key, *args, **kwargs, &block) + node = router.assign_node(['HSCAN', key]) + router.try_delegate(node, :hscan, key, *args, **kwargs, &block) end def zscan(key, *args, **kwargs, &block) - node = @router.assign_node(['ZSCAN', key]) - @router.try_delegate(node, :zscan, key, *args, **kwargs, &block) + node = router.assign_node(['ZSCAN', key]) + router.try_delegate(node, :zscan, key, *args, **kwargs, &block) end def pipelined(exception: true) seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed pipeline = ::RedisClient::Cluster::Pipeline.new( - @router, + router, @command_builder, @concurrent_worker, exception: exception, @@ -99,14 +104,14 @@ def pipelined(exception: true) def multi(watch: nil) if watch.nil? || watch.empty? - transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder) + transaction = ::RedisClient::Cluster::Transaction.new(router, @command_builder) yield transaction return transaction.execute end - ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking| + ::RedisClient::Cluster::OptimisticLocking.new(router).watch(watch) do |c, slot, asking| transaction = ::RedisClient::Cluster::Transaction.new( - @router, @command_builder, node: c, slot: slot, asking: asking + router, @command_builder, node: c, slot: slot, asking: asking ) yield transaction transaction.execute @@ -114,7 +119,7 @@ def multi(watch: nil) end def pubsub - ::RedisClient::Cluster::PubSub.new(@router, @command_builder) + ::RedisClient::Cluster::PubSub.new(router, @command_builder) end def with(...) @@ -122,25 +127,33 @@ def with(...) end def close + @router&.close @concurrent_worker.close - @router.close nil end private + def router + return @router unless @router.nil? + + @mutex.synchronize do + @router ||= ::RedisClient::Cluster::Router.new(@config, @concurrent_worker, pool: @pool, **@kwargs) + end + end + def method_missing(name, *args, **kwargs, &block) - if @router.command_exists?(name) + if router.command_exists?(name) args.unshift(name) command = @command_builder.generate(args, kwargs) - return @router.send_command(:call_v, command, &block) + return router.send_command(:call_v, command, &block) end super end def respond_to_missing?(name, include_private = false) - return true if @router.command_exists?(name) + return true if router.command_exists?(name) super end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 44aa820..e147b28 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -863,6 +863,13 @@ def test_only_reshards_own_errors client2.close end + def test_initialization_delayed + config = ::RedisClient::ClusterConfig.new(nodes: 'redis://127.0.0.1:11211') + client = ::RedisClient::Cluster.new(config) + assert_instance_of(::RedisClient::Cluster, client) + assert_raises(RedisClient::Cluster::InitialSetupError) { client.call('PING') } + end + private def wait_for_replication diff --git a/test/test_against_cluster_broken.rb b/test/test_against_cluster_broken.rb index 9ce25d9..3038b94 100644 --- a/test/test_against_cluster_broken.rb +++ b/test/test_against_cluster_broken.rb @@ -12,6 +12,7 @@ def setup fixed_hostname: TEST_FIXED_HOSTNAME, **TEST_GENERIC_OPTIONS ).new_client + @client.call('echo', 'init') @controller = ClusterController.new( TEST_NODE_URIS, replica_size: TEST_REPLICA_SIZE, diff --git a/test/test_against_cluster_scale.rb b/test/test_against_cluster_scale.rb index 1bad31c..a0400bb 100644 --- a/test/test_against_cluster_scale.rb +++ b/test/test_against_cluster_scale.rb @@ -16,6 +16,7 @@ def setup fixed_hostname: TEST_FIXED_HOSTNAME, **TEST_GENERIC_OPTIONS ).new_client + @client.call('echo', 'init') end def teardown diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 27ec09b..0db5182 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -14,6 +14,7 @@ def setup ) @controller.rebuild @client = new_test_client + @client.call('echo', 'init') end def teardown