Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delay initialization to the first query #364

Merged
merged 4 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 37 additions & 24 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,47 @@ class Cluster
def initialize(config, pool: nil, concurrency: nil, **kwargs)
@config = config
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
@router = ::RedisClient::Cluster::Router.new(config, @concurrent_worker, pool: pool, **kwargs)
@command_builder = config.command_builder

@pool = pool
@kwargs = kwargs
@router = nil
@mutex = Mutex.new
end

def inspect
"#<#{self.class.name} #{@router.node_keys.join(', ')}>"
node_keys = @router.nil? ? @config.startup_nodes.keys : router.node_keys
"#<#{self.class.name} #{node_keys.join(', ')}>"
end

def call(*args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
@router.send_command(:call_v, command, &block)
router.send_command(:call_v, command, &block)
end

def call_v(command, &block)
command = @command_builder.generate(command)
@router.send_command(:call_v, command, &block)
router.send_command(:call_v, command, &block)
end

def call_once(*args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
@router.send_command(:call_once_v, command, &block)
router.send_command(:call_once_v, command, &block)
end

def call_once_v(command, &block)
command = @command_builder.generate(command)
@router.send_command(:call_once_v, command, &block)
router.send_command(:call_once_v, command, &block)
end

def blocking_call(timeout, *args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
@router.send_command(:blocking_call_v, command, timeout, &block)
router.send_command(:blocking_call_v, command, timeout, &block)
end

def blocking_call_v(timeout, command, &block)
command = @command_builder.generate(command)
@router.send_command(:blocking_call_v, command, timeout, &block)
router.send_command(:blocking_call_v, command, timeout, &block)
end

def scan(*args, **kwargs, &block)
Expand All @@ -60,31 +65,31 @@ def scan(*args, **kwargs, &block)
seed = Random.new_seed
cursor = ZERO_CURSOR_FOR_SCAN
loop do
cursor, keys = @router.scan('SCAN', cursor, *args, seed: seed, **kwargs)
cursor, keys = router.scan('SCAN', cursor, *args, seed: seed, **kwargs)
keys.each(&block)
break if cursor == ZERO_CURSOR_FOR_SCAN
end
end

def sscan(key, *args, **kwargs, &block)
node = @router.assign_node(['SSCAN', key])
@router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
node = router.assign_node(['SSCAN', key])
router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
end

def hscan(key, *args, **kwargs, &block)
node = @router.assign_node(['HSCAN', key])
@router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
node = router.assign_node(['HSCAN', key])
router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
end

def zscan(key, *args, **kwargs, &block)
node = @router.assign_node(['ZSCAN', key])
@router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
node = router.assign_node(['ZSCAN', key])
router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
end

def pipelined(exception: true)
seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
pipeline = ::RedisClient::Cluster::Pipeline.new(
@router,
router,
@command_builder,
@concurrent_worker,
exception: exception,
Expand All @@ -99,48 +104,56 @@ def pipelined(exception: true)

def multi(watch: nil)
if watch.nil? || watch.empty?
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
transaction = ::RedisClient::Cluster::Transaction.new(router, @command_builder)
yield transaction
return transaction.execute
end

::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking|
::RedisClient::Cluster::OptimisticLocking.new(router).watch(watch) do |c, slot, asking|
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: c, slot: slot, asking: asking
router, @command_builder, node: c, slot: slot, asking: asking
)
yield transaction
transaction.execute
end
end

def pubsub
::RedisClient::Cluster::PubSub.new(@router, @command_builder)
::RedisClient::Cluster::PubSub.new(router, @command_builder)
end

def with(...)
raise NotImplementedError, 'No way to use'
end

def close
@router&.close
@concurrent_worker.close
@router.close
nil
end

private

def router
return @router unless @router.nil?

@mutex.synchronize do
@router ||= ::RedisClient::Cluster::Router.new(@config, @concurrent_worker, pool: @pool, **@kwargs)
end
end

def method_missing(name, *args, **kwargs, &block)
if @router.command_exists?(name)
if router.command_exists?(name)
args.unshift(name)
command = @command_builder.generate(args, kwargs)
return @router.send_command(:call_v, command, &block)
return router.send_command(:call_v, command, &block)
end

super
end

def respond_to_missing?(name, include_private = false)
return true if @router.command_exists?(name)
return true if router.command_exists?(name)

super
end
Expand Down
7 changes: 7 additions & 0 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,13 @@ def test_only_reshards_own_errors
client2.close
end

def test_initialization_delayed
config = ::RedisClient::ClusterConfig.new(nodes: 'redis://127.0.0.1:11211')
client = ::RedisClient::Cluster.new(config)
assert_instance_of(::RedisClient::Cluster, client)
assert_raises(RedisClient::Cluster::InitialSetupError) { client.call('PING') }
end

private

def wait_for_replication
Expand Down
1 change: 1 addition & 0 deletions test/test_against_cluster_broken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def setup
fixed_hostname: TEST_FIXED_HOSTNAME,
**TEST_GENERIC_OPTIONS
).new_client
@client.call('echo', 'init')
@controller = ClusterController.new(
TEST_NODE_URIS,
replica_size: TEST_REPLICA_SIZE,
Expand Down
1 change: 1 addition & 0 deletions test/test_against_cluster_scale.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def setup
fixed_hostname: TEST_FIXED_HOSTNAME,
**TEST_GENERIC_OPTIONS
).new_client
@client.call('echo', 'init')
end

def teardown
Expand Down
1 change: 1 addition & 0 deletions test/test_against_cluster_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def setup
)
@controller.rebuild
@client = new_test_client
@client.call('echo', 'init')
end

def teardown
Expand Down