From 5bdb9e8c55cc4cfbd311eda9f12ab0505954ccd9 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Wed, 21 Feb 2024 13:52:23 +1100 Subject: [PATCH] Implement #watch and #multi specially for cluster-client This PR makes watch & multi work more or less the same way for clustering as they do for normal redis. Since it's supposed to be valid to perform your multi call on the original redis object, like this: ``` redis.watch('key') do redis.multi do |tx| # tx is performed on the same connection as the watch end end ``` we need to keeps some state in an ivar @active_watcher so we know to call MULTI on the same actual connection as WATCH (and appropriately fail if the keys got redirected or the node went down). This is technically threadsafe, because the watch/multi implementation is wrapped in the `synchronize` monitor; however, for good performance in multithreaded environments, you will most likely want to use a connection pool of Redis::Cluster instances. --- cluster/lib/redis/cluster.rb | 56 +++++++++++++++ cluster/lib/redis/cluster/client.rb | 4 ++ cluster/test/client_transactions_test.rb | 69 +++++++++++++++++++ cluster/test/commands_on_transactions_test.rb | 6 +- 4 files changed, 131 insertions(+), 4 deletions(-) diff --git a/cluster/lib/redis/cluster.rb b/cluster/lib/redis/cluster.rb index 58c775f56..1309d7892 100644 --- a/cluster/lib/redis/cluster.rb +++ b/cluster/lib/redis/cluster.rb @@ -96,6 +96,62 @@ def cluster(subcommand, *args) send_command([:cluster, subcommand] + args, &block) end + # Transactions need different implementations in cluster mode, using purpose-built + # primitives available in redis-cluster-client. These methods (watch and multii + # implement the same interface as the methods in ::Redis::Commands::Transactions. + + def watch(*keys) + synchronize do |client| + # client is a ::Redis::Cluster::Client instance, which is a subclass of + # ::RedisClient::Cluster + + if @active_watcher + # We're already within a #watch block, just add keys to the existing watch + @active_watcher.watch(keys) + else + unless block_given? + raise ArgumentError, "#{self.class.name} requires that the initial #watch call of a transaction " \ + "passes a block" + end + + client.watch(keys) do |watcher| + @active_watcher = watcher + yield self + ensure + @active_watcher = nil + end + + end + end + end + + def multi + synchronize do |client| + if @active_watcher + # If we're inside a #watch block, use that to execute the transaction + @active_watcher.multi do |tx| + yield MultiConnection.new(tx) + end + else + # Make a new transaction from whole cloth. + client.multi do |tx| + yield MultiConnection.new(tx) + end + end + end + end + + def unwatch + synchronize do + if @active_watcher + @active_watcher.unwatch + else + # This will raise an AmbiguiousNodeError + super + end + end + end + private def initialize_client(options) diff --git a/cluster/lib/redis/cluster/client.rb b/cluster/lib/redis/cluster/client.rb index a14845c2b..e68d07703 100644 --- a/cluster/lib/redis/cluster/client.rb +++ b/cluster/lib/redis/cluster/client.rb @@ -98,6 +98,10 @@ def multi(watch: nil, &block) handle_errors { super(watch: watch, &block) } end + def watch(keys, &block) + handle_errors { super(keys, &block) } + end + private def handle_errors diff --git a/cluster/test/client_transactions_test.rb b/cluster/test/client_transactions_test.rb index b60a8bc4d..bd6f002e1 100644 --- a/cluster/test/client_transactions_test.rb +++ b/cluster/test/client_transactions_test.rb @@ -48,4 +48,73 @@ def test_cluster_client_does_not_support_transaction_by_multiple_keys assert_nil(redis.get("key#{i}")) end end + + def test_cluster_client_does_support_transaction_with_optimistic_locking + redis.mset('{key}1', '1', '{key}2', '2') + + another = Fiber.new do + cli = build_another_client + cli.mset('{key}1', '3', '{key}2', '4') + cli.close + end + + redis.watch('{key}1', '{key}2') do + another.resume + v1 = redis.get('{key}1') + v2 = redis.get('{key}2') + redis.multi do |tx| + tx.set('{key}1', v2) + tx.set('{key}2', v1) + end + end + + assert_equal %w[3 4], redis.mget('{key}1', '{key}2') + end + + def test_cluster_client_can_unwatch_transaction + redis.set('key1', 'initial_value') + + another = Fiber.new do + cli = build_another_client + cli.set('key1', 'another_value') + end + + redis.watch('key1') do + another.resume + redis.unwatch + end + # After calling unwatch, the same connection can be used to open a transaction which + # isn't conditional and so will commit + got = redis.multi do |tx| + tx.set('key1', 'final_value') + end + + assert_equal ['OK'], got + assert_equal 'final_value', redis.get('key1') + end + + def test_cluster_client_unwatches_on_exception + redis.set('key1', 'initial_value') + + another = Fiber.new do + cli = build_another_client + cli.set('key1', 'another_value') + end + + assert_raises(RuntimeError) do + redis.watch('key1') do + another.resume + raise 'bang' + end + end + # After catching the exception, the same connection can be used to open a transaction which + # isn't conditional and so will commit + # n.b. the actual behaviour which ensures this is actually in redis-cluster-client + got = redis.multi do |tx| + tx.set('key1', 'final_value') + end + + assert_equal ['OK'], got + assert_equal 'final_value', redis.get('key1') + end end diff --git a/cluster/test/commands_on_transactions_test.rb b/cluster/test/commands_on_transactions_test.rb index 7935ba4a8..bf2f2e554 100644 --- a/cluster/test/commands_on_transactions_test.rb +++ b/cluster/test/commands_on_transactions_test.rb @@ -38,10 +38,8 @@ def test_unwatch end def test_watch - assert_raises(Redis::CommandError, "CROSSSLOT Keys in request don't hash to the same slot") do - redis.watch('key1', 'key2') + assert_raises(Redis::Cluster::TransactionConsistencyError) do + redis.watch('key1', 'key2') {} end - - assert_equal 'OK', redis.watch('{key}1', '{key}2') end end