From c81f86bdbc51a30ab40ed5679d178be384faa00d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 13 Aug 2025 20:29:24 +1200 Subject: [PATCH 01/11] 100% documentation coverage. --- lib/async/redis/endpoint.rb | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/lib/async/redis/endpoint.rb b/lib/async/redis/endpoint.rb index 0ca9596..8cfb31f 100644 --- a/lib/async/redis/endpoint.rb +++ b/lib/async/redis/endpoint.rb @@ -210,6 +210,8 @@ def database end end + # Get the credentials for authentication. + # @returns [Array(String) | Nil] The username and password credentials or nil if not specified. def credentials @options[:credentials] || extract_userinfo(@url.userinfo) end @@ -224,6 +226,8 @@ def credentials end end + # Check if the endpoint is connecting to localhost. + # @returns [Boolean] True if connecting to localhost. def localhost? @url.hostname =~ /^(.*?\.)?localhost\.?$/ end @@ -237,6 +241,8 @@ def ssl_verify_mode end end + # Get the SSL context for secure connections. + # @returns [OpenSSL::SSL::SSLContext] The SSL context configured for this endpoint. def ssl_context @options[:ssl_context] || OpenSSL::SSL::SSLContext.new.tap do |context| context.set_params( @@ -245,6 +251,9 @@ def ssl_context end end + # Build the underlying endpoint with optional SSL wrapping. + # @parameter endpoint [IO::Endpoint] Optional base endpoint to wrap. + # @returns [IO::Endpoint] The built endpoint, potentially wrapped with SSL. def build_endpoint(endpoint = nil) endpoint ||= tcp_endpoint @@ -260,22 +269,33 @@ def build_endpoint(endpoint = nil) return endpoint end + # Get the underlying endpoint, building it if necessary. + # @returns [IO::Endpoint] The underlying endpoint for connections. def endpoint @endpoint ||= build_endpoint end + # Set the underlying endpoint. + # @parameter endpoint [IO::Endpoint] The endpoint to wrap and use. def endpoint=(endpoint) @endpoint = build_endpoint(endpoint) end + # Bind to the endpoint and yield the server socket. + # @parameter arguments [Array] Arguments to pass to the underlying endpoint bind method. + # @yields [IO] The bound server socket. def bind(*arguments, &block) endpoint.bind(*arguments, &block) end + # Connect to the endpoint and yield the client socket. + # @yields [IO] The connected client socket. def connect(&block) endpoint.connect(&block) end + # Iterate over each possible endpoint variation. + # @yields [Endpoint] Each endpoint variant. def each return to_enum unless block_given? @@ -284,14 +304,21 @@ def each end end + # Get the key for hashing and equality comparison. + # @returns [Array] The key components for this endpoint. def key [@url, @options] end + # Check if this endpoint is equal to another. + # @parameter other [Endpoint] The other endpoint to compare with. + # @returns [Boolean] True if the endpoints are equal. def eql? other self.key.eql? other.key end + # Get the hash code for this endpoint. + # @returns [Integer] The hash code based on the endpoint's key. def hash self.key.hash end From b733c2038290f3f78593cb38f3a0328cdae30677 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 13 Aug 2025 21:54:53 +1200 Subject: [PATCH 02/11] WIP --- async-redis.gemspec | 2 +- .../test/async/redis/cluster_subscriptions.rb | 269 ++++++++++++++++ cluster/test/async/redis/context/subscribe.rb | 300 ++++++++++++++++++ gems.rb | 2 + guides/getting-started/readme.md | 35 +- guides/links.yaml | 3 + guides/subscriptions/readme.md | 240 ++++++++++++++ lib/async/redis/client.rb | 38 +++ lib/async/redis/cluster_client.rb | 63 ++++ lib/async/redis/context/subscribe.rb | 34 +- releases.md | 6 + 11 files changed, 959 insertions(+), 33 deletions(-) create mode 100644 cluster/test/async/redis/cluster_subscriptions.rb create mode 100644 cluster/test/async/redis/context/subscribe.rb create mode 100644 guides/subscriptions/readme.md diff --git a/async-redis.gemspec b/async-redis.gemspec index 936c443..bc41697 100644 --- a/async-redis.gemspec +++ b/async-redis.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |spec| "source_code_uri" => "https://github.com/socketry/async-redis.git", } - spec.files = Dir.glob(["{lib}/**/*", "*.md"], File::FNM_DOTMATCH, base: __dir__) + spec.files = Dir.glob(["{context,lib}/**/*", "*.md"], File::FNM_DOTMATCH, base: __dir__) spec.required_ruby_version = ">= 3.2" diff --git a/cluster/test/async/redis/cluster_subscriptions.rb b/cluster/test/async/redis/cluster_subscriptions.rb new file mode 100644 index 0000000..d35ca64 --- /dev/null +++ b/cluster/test/async/redis/cluster_subscriptions.rb @@ -0,0 +1,269 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "async/redis/cluster_client" +require "sus/fixtures/async" +require "securerandom" + +describe Async::Redis::ClusterClient do + include Sus::Fixtures::Async::ReactorContext + + let(:node_a) {"redis://redis-a:6379"} + let(:node_b) {"redis://redis-b:6379"} + let(:node_c) {"redis://redis-c:6379"} + + let(:endpoints) {[ + Async::Redis::Endpoint.parse(node_a), + Async::Redis::Endpoint.parse(node_b), + Async::Redis::Endpoint.parse(node_c) + ]} + + let(:cluster) {subject.new(endpoints)} + + with "sharded subscriptions" do + let(:shard_channel) {"cluster-shard:test:#{SecureRandom.uuid}"} + let(:shard_message) {"cluster sharded message"} + + it "can subscribe to sharded channels and receive messages" do + received_message = nil + condition = Async::Condition.new + + # Get a client for the sharded channel + slot = cluster.slot_for(shard_channel) + subscriber_client = cluster.client_for(slot) + + # Set up the subscriber + subscriber_task = reactor.async do + subscriber_client.ssubscribe(shard_channel) do |context| + condition.signal # Signal that we're ready + + type, name, message = context.listen + + expect(type).to be == "smessage" + expect(name).to be == shard_channel + received_message = message + end + end + + # Set up the publisher + publisher_task = reactor.async do + condition.wait # Wait for subscriber to be ready + + # For sharded pub/sub, we need to use SPUBLISH + # Get a client (can be any node) to publish the message + publisher_client = cluster.client_for(slot) + + begin + # Try to use SPUBLISH if available (Redis 7.0+) + publisher_client.call("SPUBLISH", shard_channel, shard_message) + rescue => error + # If SPUBLISH is not available, skip this test + Console.warn("SPUBLISH not available, skipping sharded pub/sub test: #{error}") + subscriber_task.stop + return + end + end + + publisher_task.wait + subscriber_task.stop + + expect(received_message).to be == shard_message + end + + it "distributes sharded messages across cluster nodes" do + # This test verifies that sharded pub/sub properly distributes + # messages across different cluster nodes based on channel hash + + channels = [ + "shard:node:a:#{SecureRandom.uuid}", + "shard:node:b:#{SecureRandom.uuid}", + "shard:node:c:#{SecureRandom.uuid}" + ] + + # Find channels that map to different slots/nodes + channel_slots = channels.map {|ch| [ch, cluster.slot_for(ch)]} + unique_slots = channel_slots.map(&:last).uniq + + # We should have channels distributed across different slots + expect(unique_slots.size).to be > 1 + + received_messages = [] + condition = Async::Condition.new + subscriber_count = 0 + target_count = channels.size + + # Set up subscribers for each channel + subscriber_tasks = channels.map do |channel| + reactor.async do + slot = cluster.slot_for(channel) + client = cluster.client_for(slot) + + client.ssubscribe(channel) do |context| + subscriber_count += 1 + condition.signal if subscriber_count == target_count + + type, name, message = context.listen + received_messages << {channel: name, message: message, slot: slot} + end + end + end + + # Set up publisher + publisher_task = reactor.async do + condition.wait # Wait for all subscribers + + channels.each_with_index do |channel, index| + slot = cluster.slot_for(channel) + client = cluster.client_for(slot) + + begin + client.call("SPUBLISH", channel, "message-#{index}") + rescue => error + Console.warn("SPUBLISH failed for #{channel}: #{error}") + # Clean up and skip if SPUBLISH not available + subscriber_tasks.each(&:stop) + return + end + end + end + + publisher_task.wait + sleep(0.1) # Allow time for message delivery + subscriber_tasks.each(&:stop) + + # Verify we received messages for different channels + expect(received_messages.size).to be == channels.size + + # Verify messages were distributed to different slots + received_slots = received_messages.map {|msg| msg[:slot]}.uniq + expect(received_slots.size).to be > 1 + end + + it "can mix sharded and regular subscriptions on different nodes" do + regular_channel = "regular:#{SecureRandom.uuid}" + shard_channel = "shard:#{SecureRandom.uuid}" + + regular_slot = cluster.slot_for(regular_channel) + shard_slot = cluster.slot_for(shard_channel) + + received_messages = [] + condition = Async::Condition.new + ready_count = 0 + + # Regular subscription on one node + regular_task = reactor.async do + client = cluster.client_for(regular_slot) + client.subscribe(regular_channel) do |context| + ready_count += 1 + condition.signal if ready_count == 2 + + type, name, message = context.listen + received_messages << {type: type, channel: name, message: message} + end + end + + # Sharded subscription on another node (if different) + shard_task = reactor.async do + client = cluster.client_for(shard_slot) + client.ssubscribe(shard_channel) do |context| + ready_count += 1 + condition.signal if ready_count == 2 + + type, name, message = context.listen + received_messages << {type: type, channel: name, message: message} + end + end + + # Publisher + publisher_task = reactor.async do + condition.wait # Wait for both subscribers + + # Publish to regular channel + regular_client = cluster.client_for(regular_slot) + regular_client.publish(regular_channel, "regular message") + + # Publish to sharded channel + shard_client = cluster.client_for(shard_slot) + begin + shard_client.call("SPUBLISH", shard_channel, "sharded message") + rescue => error + Console.warn("SPUBLISH not available: #{error}") + regular_task.stop + shard_task.stop + return + end + end + + publisher_task.wait + sleep(0.1) # Allow time for message delivery + regular_task.stop + shard_task.stop + + # Should have received both messages + expect(received_messages.size).to be == 2 + + # Verify message types + message_types = received_messages.map {|msg| msg[:type]} + expect(message_types).to include("message") # Regular pub/sub + expect(message_types).to include("smessage") # Sharded pub/sub + end + + it "handles sharded subscription on same connection as regular subscription" do + # Test that the unified Subscribe context works in cluster environment + channel = "unified:test:#{SecureRandom.uuid}" + shard_channel = "shard:unified:#{SecureRandom.uuid}" + + slot = cluster.slot_for(channel) + client = cluster.client_for(slot) + + received_messages = [] + condition = Async::Condition.new + + # Set up unified subscription + subscriber_task = reactor.async do + client.subscribe(channel) do |context| + # Add sharded subscription to same context + context.ssubscribe([shard_channel]) + + condition.signal # Ready to receive + + # Listen for both message types + 2.times do + response = context.listen + received_messages << response + end + end + end + + # Publisher + publisher_task = reactor.async do + condition.wait + + # Publish regular message + client.publish(channel, "unified regular") + + # Publish sharded message + begin + client.call("SPUBLISH", shard_channel, "unified sharded") + rescue => error + Console.warn("SPUBLISH not available: #{error}") + subscriber_task.stop + return + end + end + + publisher_task.wait + sleep(0.1) # Allow message delivery + subscriber_task.stop + + # Should receive both message types on same context + expect(received_messages.size).to be == 2 + + message_types = received_messages.map(&:first) + expect(message_types).to include("message") # Regular + expect(message_types).to include("smessage") # Sharded + end + end +end diff --git a/cluster/test/async/redis/context/subscribe.rb b/cluster/test/async/redis/context/subscribe.rb new file mode 100644 index 0000000..1e19817 --- /dev/null +++ b/cluster/test/async/redis/context/subscribe.rb @@ -0,0 +1,300 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "async/redis/cluster_client" +require "sus/fixtures/async" +require "securerandom" + +describe Async::Redis::Context::Subscribe do + include Sus::Fixtures::Async::ReactorContext + + with "in cluster environment" do + let(:node_a) {"redis://redis-a:6379"} + let(:node_b) {"redis://redis-b:6379"} + let(:node_c) {"redis://redis-c:6379"} + + let(:endpoints) {[ + Async::Redis::Endpoint.parse(node_a), + Async::Redis::Endpoint.parse(node_b), + Async::Redis::Endpoint.parse(node_c) + ]} + + let(:cluster) {Async::Redis::ClusterClient.new(endpoints)} + + with "sharded subscriptions" do + let(:shard_channel) {"cluster-shard:test:#{SecureRandom.uuid}"} + let(:shard_message) {"cluster sharded message"} + + it "can subscribe to sharded channels and receive messages" do + received_message = nil + condition = Async::Condition.new + spublish_available = false + + # Set up the subscriber using cluster client's ssubscribe method + subscriber_task = reactor.async do + begin + cluster.ssubscribe(shard_channel) do |context| + condition.signal # Signal that we're ready + spublish_available = true + + type, name, message = context.listen + + expect(type).to be == "smessage" + expect(name).to be == shard_channel + received_message = message + end + rescue Protocol::Redis::ServerError => error + if error.message.include?("unknown command") + Console.warn("SSUBSCRIBE not available on this Redis version") + condition.signal + else + raise + end + end + end + + # Set up the publisher + publisher_task = reactor.async do + condition.wait # Wait for subscriber to be ready + + if spublish_available + begin + # Get a client on the same node as the subscriber for SPUBLISH + slot = cluster.slot_for(shard_channel) + publisher_client = cluster.client_for(slot) + publisher_client.call("SPUBLISH", shard_channel, shard_message) + rescue => error + Console.warn("SPUBLISH failed: #{error}") + end + end + end + + publisher_task.wait + sleep(0.1) # Allow message delivery + subscriber_task.stop + + # Only check message if sharded pub/sub was available + if spublish_available && received_message + expect(received_message).to be == shard_message + else + Console.warn("Skipping assertion - sharded pub/sub not available on this Redis version") + end + end + + it "distributes sharded messages across cluster nodes" do + # This test verifies that sharded pub/sub properly distributes + # messages across different cluster nodes based on channel hash + + channels = [ + "shard:node:a:#{SecureRandom.uuid}", + "shard:node:b:#{SecureRandom.uuid}", + "shard:node:c:#{SecureRandom.uuid}" + ] + + # Find channels that map to different slots/nodes + channel_slots = channels.map {|ch| [ch, cluster.slot_for(ch)]} + unique_slots = channel_slots.map(&:last).uniq + + # We should have channels distributed across different slots + expect(unique_slots.size).to be > 1 + + received_messages = [] + condition = Async::Condition.new + subscriber_count = 0 + target_count = channels.size + + # Set up subscribers for each channel + subscriber_tasks = channels.map do |channel| + reactor.async do + slot = cluster.slot_for(channel) + client = cluster.client_for(slot) + + client.ssubscribe(channel) do |context| + subscriber_count += 1 + condition.signal if subscriber_count == target_count + + type, name, message = context.listen + received_messages << {channel: name, message: message, slot: slot} + end + end + end + + # Set up publisher + publisher_task = reactor.async do + condition.wait # Wait for all subscribers + + channels.each_with_index do |channel, index| + slot = cluster.slot_for(channel) + client = cluster.client_for(slot) + + begin + client.call("SPUBLISH", channel, "message-#{index}") + rescue => error + Console.warn("SPUBLISH failed for #{channel}: #{error}") + # Clean up and skip if SPUBLISH not available + subscriber_tasks.each(&:stop) + return + end + end + end + + publisher_task.wait + sleep(0.1) # Allow time for message delivery + subscriber_tasks.each(&:stop) + + # Verify we received messages for different channels + expect(received_messages.size).to be == channels.size + + # Verify messages were distributed to different slots + received_slots = received_messages.map {|msg| msg[:slot]}.uniq + expect(received_slots.size).to be > 1 + end + + it "can mix sharded and regular subscriptions on different nodes" do + regular_channel = "regular:#{SecureRandom.uuid}" + shard_channel = "shard:#{SecureRandom.uuid}" + + regular_slot = cluster.slot_for(regular_channel) + shard_slot = cluster.slot_for(shard_channel) + + received_messages = [] + condition = Async::Condition.new + ready_count = 0 + + # Regular subscription on one node + regular_task = reactor.async do + client = cluster.client_for(regular_slot) + client.subscribe(regular_channel) do |context| + ready_count += 1 + condition.signal if ready_count == 2 + + type, name, message = context.listen + received_messages << {type: type, channel: name, message: message} + end + end + + # Sharded subscription on another node (if different) + shard_task = reactor.async do + client = cluster.client_for(shard_slot) + client.ssubscribe(shard_channel) do |context| + ready_count += 1 + condition.signal if ready_count == 2 + + type, name, message = context.listen + received_messages << {type: type, channel: name, message: message} + end + end + + # Publisher + publisher_task = reactor.async do + condition.wait # Wait for both subscribers + + # Publish to regular channel + regular_client = cluster.client_for(regular_slot) + regular_client.publish(regular_channel, "regular message") + + # Publish to sharded channel + shard_client = cluster.client_for(shard_slot) + begin + shard_client.call("SPUBLISH", shard_channel, "sharded message") + rescue => error + Console.warn("SPUBLISH not available: #{error}") + regular_task.stop + shard_task.stop + return + end + end + + publisher_task.wait + sleep(0.1) # Allow time for message delivery + regular_task.stop + shard_task.stop + + # Should have received both messages + expect(received_messages.size).to be == 2 + + # Verify message types + message_types = received_messages.map {|msg| msg[:type]} + expect(message_types).to be(:include?, "message") # Regular pub/sub + expect(message_types).to be(:include?, "smessage") # Sharded pub/sub + end + + it "handles sharded subscription on same connection as regular subscription" do + # Test that the unified Subscribe context works in cluster environment + channel = "unified:test:#{SecureRandom.uuid}" + shard_channel = "shard:unified:#{SecureRandom.uuid}" + + # Check if both channels hash to the same slot + channel_slot = cluster.slot_for(channel) + shard_slot = cluster.slot_for(shard_channel) + + # For this test to work, both channels must be on the same node + # If they're not, we need to use the same hash tag to force them to the same slot + if channel_slot != shard_slot + # Use hash tags to force both channels to the same slot + base_key = "{unified:#{SecureRandom.uuid}}" + channel = "#{base_key}:regular" + shard_channel = "#{base_key}:shard" + + # Verify they now hash to the same slot + channel_slot = cluster.slot_for(channel) + shard_slot = cluster.slot_for(shard_channel) + expect(channel_slot).to be == shard_slot + end + + client = cluster.client_for(channel_slot) + + received_messages = [] + condition = Async::Condition.new + + # Set up unified subscription + subscriber_task = reactor.async do + client.subscribe(channel) do |context| + # Add sharded subscription to same context + context.ssubscribe([shard_channel]) + + condition.signal # Ready to receive + + # Listen for both message types + 2.times do + response = context.listen + received_messages << response + end + end + end + + # Publisher + publisher_task = reactor.async do + condition.wait + + # Both messages must be published from the same node (same slot) + publisher_client = cluster.client_for(channel_slot) + + # Publish regular message + publisher_client.publish(channel, "unified regular") + + # Publish sharded message + begin + publisher_client.call("SPUBLISH", shard_channel, "unified sharded") + rescue => error + Console.warn("SPUBLISH not available: #{error}") + # Skip sharded part if not available + end + end + + publisher_task.wait + sleep(0.1) # Allow message delivery + subscriber_task.stop + + # Should receive both message types on same context + expect(received_messages.size).to be == 2 + + message_types = received_messages.map(&:first) + expect(message_types).to be(:include?, "message") # Regular + expect(message_types).to be(:include?, "smessage") # Sharded + end + end + end +end diff --git a/gems.rb b/gems.rb index 33f28a0..3c3e552 100644 --- a/gems.rb +++ b/gems.rb @@ -14,6 +14,8 @@ gem "bake-gem" gem "bake-releases" + gem "agent-context" + gem "utopia-project" end diff --git a/guides/getting-started/readme.md b/guides/getting-started/readme.md index e728239..b599258 100644 --- a/guides/getting-started/readme.md +++ b/guides/getting-started/readme.md @@ -27,6 +27,9 @@ Async do client = Async::Redis::Client.new(endpoint) puts client.info + + client.set("mykey", "myvalue") + puts client.get("mykey") end ``` @@ -86,34 +89,6 @@ ensure end ``` -### Subscriptions - -``` ruby -require 'async' -require 'async/redis' - -endpoint = Async::Redis.local_endpoint -client = Async::Redis::Client.new(endpoint) +## Next Steps -Async do |task| - condition = Async::Condition.new - - publisher = task.async do - condition.wait - - client.publish 'status.frontend', 'good' - end - - subscriber = task.async do - client.subscribe 'status.frontend' do |context| - condition.signal # We are waiting for messages. - - type, name, message = context.listen - - pp type, name, message - end - end -ensure - client.close -end -``` \ No newline at end of file +- [Subscriptions](../subscriptions/) - Learn how to use Redis pub/sub functionality for real-time messaging. \ No newline at end of file diff --git a/guides/links.yaml b/guides/links.yaml index 7f527b0..e83ae52 100644 --- a/guides/links.yaml +++ b/guides/links.yaml @@ -1,2 +1,5 @@ getting-started: order: 1 + +subscriptions: + order: 2 diff --git a/guides/subscriptions/readme.md b/guides/subscriptions/readme.md new file mode 100644 index 0000000..5a56150 --- /dev/null +++ b/guides/subscriptions/readme.md @@ -0,0 +1,240 @@ +# Subscriptions + +This guide explains how to use Redis pub/sub functionality with `async-redis` to publish and subscribe to messages. + +## Usage + +First, let's create a simple listener that subscribes to messages on a channel: + +``` ruby +require 'async' +require 'async/redis' + +endpoint = Async::Redis.local_endpoint +client = Async::Redis::Client.new(endpoint) + +Async do + client.subscribe 'status.frontend' do |context| + puts "Listening for messages on 'status.frontend'..." + + type, name, message = context.listen + + puts "Received: #{message}" + end +ensure + client.close +end +``` + +Now, let's create a publisher that sends messages to the same channel: + +``` ruby +require 'async' +require 'async/redis' + +endpoint = Async::Redis.local_endpoint +client = Async::Redis::Client.new(endpoint) + +Async do + puts "Publishing message..." + client.publish 'status.frontend', 'good' + puts "Message sent!" +ensure + client.close +end +``` + +To see pub/sub in action, you can run the listener in one terminal and the publisher in another. The listener will receive any messages sent by the publisher to the `status.frontend` channel: + +```bash +$ ruby listener.rb +Listening for messages on 'status.frontend'... +Received: good +``` + +### How It Works + +**Listener:** +- Uses the `subscribe` method with a channel name and block. +- The block receives a context object for listening to messages. +- `context.listen` returns an array: `[type, name, message]`. +- Runs continuously waiting for messages. + +**Publisher:** +- Uses the `publish` method to send messages to a channel. +- Takes a channel name and message content. +- Sends the message and exits. + +**Channel Communication:** +- Both listener and publisher use the same channel name (`'status.frontend'`). +- Messages are delivered in real-time when published. +- Multiple listeners can subscribe to the same channel. + +## Message Format + +When you call `context.listen`, it returns an array with three elements: + +```ruby +type, name, message = context.listen +``` + +- **`type`**: The type of Redis pub/sub event. Common values include: + - `"message"` - A regular published message. + - `"subscribe"` - Confirmation that you've subscribed to a channel. + - `"unsubscribe"` - Confirmation that you've unsubscribed from a channel. + - `"pmessage"` - A message from a pattern subscription. + - `"smessage"` - A message from a sharded channel subscription. + +- **`name`**: The channel name where the message was received (e.g., `"status.frontend"`). + +- **`message`**: The actual message content that was published. + +**Note**: For pattern subscriptions (`pmessage`), the format is slightly different: +```ruby +type, pattern, name, message = context.listen +``` +Where `pattern` is the pattern that matched, and `name` is the actual channel name. + +### Example Output + +```ruby +client.subscribe 'notifications' do |context| + type, name, message = context.listen + puts "Type: #{type}, Channel: #{name}, Message: #{message}" +end + +# When someone publishes: client.publish('notifications', 'Hello World!') +# Output: Type: message, Channel: notifications, Message: Hello World! +``` + +## Multiple Channels + +You can subscribe to multiple channels at once: + +``` ruby +client.subscribe 'channel1', 'channel2', 'channel3' do |context| + while true + type, name, message = context.listen + puts "Received on #{name}: #{message}" + end +end +``` + +## Pattern Subscriptions + +Redis also supports pattern-based subscriptions using `psubscribe`: + +``` ruby +client.psubscribe 'status.*' do |context| + while true + response = context.listen + + if response.first == "pmessage" + type, pattern, name, message = response + puts "Pattern #{pattern} matched channel #{name}: #{message}" + end + end +end +``` + +## Mixing Regular and Pattern Subscriptions + +You can mix regular channel subscriptions and pattern subscriptions on the same context: + +``` ruby +client.subscribe 'exact-channel' do |context| + # Add pattern subscription to the same context: + context.psubscribe(['pattern.*']) + + while true + response = context.listen + + case response.first + when "message" + type, name, message = response + puts "Regular message on #{name}: #{message}" + when "pmessage" + type, pattern, name, message = response + puts "Pattern #{pattern} matched #{name}: #{message}" + end + end +end +``` + +## Sharded Subscriptions + +Redis 7.0 introduced sharded pub/sub for better scalability in cluster environments. You can use sharded subscriptions with the same `Subscribe` context: + +``` ruby +client.ssubscribe 'user-notifications' do |context| + while true + type, name, message = context.listen + puts "Sharded message on #{name}: #{message}" + end +end +``` + +**Key differences from regular pub/sub:** +- Messages are distributed across cluster nodes for better performance. +- Only supports exact channel names (no pattern matching). +- Same message format as regular subscriptions: `[type, channel, message]`. +- Requires Redis 7.0+ and works best in cluster mode. + +## Mixing All Subscription Types + +Since all subscription types use the same `Subscribe` context, you can mix them freely: + +``` ruby +client.subscribe 'exact-channel' do |context| + # Add pattern and sharded subscriptions to the same context: + context.psubscribe(['pattern.*']) + context.ssubscribe(['shard-channel']) + + while true + response = context.listen + + case response.first + when "message" + type, name, message = response + puts "Regular message on #{name}: #{message}" + when "pmessage" + type, pattern, name, message = response + puts "Pattern #{pattern} matched #{name}: #{message}" + when "smessage" + type, name, message = response + puts "Sharded message on #{name}: #{message}" + end + end +end +``` + +## Important: Subscription Type Behavior + +Redis supports mixing different subscription types on the same connection: + +- ✅ **SUBSCRIBE + PSUBSCRIBE + SSUBSCRIBE**: All can be mixed on the same connection/context +- 🎯 **Unified Interface**: `async-redis` uses a single `Subscribe` context for all subscription types + +**Benefits of the unified approach:** +- **Simplicity**: One context handles all subscription types +- **Flexibility**: Mix any combination of subscription types as needed +- **Consistency**: Same `listen` method handles all message types +- **Convenience**: No need to manage multiple contexts for different subscription types + +## Error Handling + +Always ensure proper cleanup of Redis connections: + +``` ruby +Async do + begin + client.subscribe 'my-channel' do |context| + # Handle messages... + end + rescue => error + puts "Subscription error: #{error}" + ensure + client.close + end +end +``` diff --git a/lib/async/redis/client.rb b/lib/async/redis/client.rb index 6621a44..83e0103 100755 --- a/lib/async/redis/client.rb +++ b/lib/async/redis/client.rb @@ -47,6 +47,44 @@ def subscribe(*channels) end end + # Subscribe to one or more channel patterns for pub/sub messaging. + # @parameter patterns [Array(String)] The channel patterns to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [Context::Subscribe] The subscription context. + # @returns [Object] The result of the block if block given. + # @returns [Context::Subscribe] The subscription context if no block given. + def psubscribe(*patterns) + context = Context::Subscribe.new(@pool, []) + context.psubscribe(patterns) + + return context unless block_given? + + begin + yield context + ensure + context.close + end + end + + # Subscribe to one or more sharded channels for pub/sub messaging (Redis 7.0+). + # @parameter channels [Array(String)] The sharded channels to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [Context::Subscribe] The subscription context. + # @returns [Object] The result of the block if block given. + # @returns [Context::Subscribe] The subscription context if no block given. + def ssubscribe(*channels) + context = Context::Subscribe.new(@pool, []) + context.ssubscribe(channels) + + return context unless block_given? + + begin + yield context + ensure + context.close + end + end + # Execute commands within a Redis transaction. # @yields {|context| ...} If a block is given, it will be executed within the transaction context. # @parameter context [Context::Transaction] The transaction context. diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index d57659b..5fbd901 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -243,6 +243,69 @@ def slots_for(keys) return slots end + + # Subscribe to one or more channels for pub/sub messaging in cluster environment. + # The subscription will be created on the node responsible for the first channel. + # @parameter channels [Array(String)] The channels to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [Context::Subscribe] The subscription context. + # @returns [Object] The result of the block if block given. + # @returns [Context::Subscribe] The subscription context if no block given. + def subscribe(*channels) + # For regular pub/sub, route to node based on first channel + slot = channels.any? ? slot_for(channels.first) : 0 + client = client_for(slot) + + client.subscribe(*channels) do |context| + if block_given? + yield context + else + return context + end + end + end + + # Subscribe to one or more channel patterns for pub/sub messaging in cluster environment. + # The subscription will be created on the node responsible for a deterministic slot. + # @parameter patterns [Array(String)] The channel patterns to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [Context::Subscribe] The subscription context. + # @returns [Object] The result of the block if block given. + # @returns [Context::Subscribe] The subscription context if no block given. + def psubscribe(*patterns) + # For pattern subscriptions, use a deterministic slot since patterns can match any channel + slot = patterns.any? ? slot_for(patterns.first) : 0 + client = client_for(slot) + + client.psubscribe(*patterns) do |context| + if block_given? + yield context + else + return context + end + end + end + + # Subscribe to one or more sharded channels for pub/sub messaging in cluster environment (Redis 7.0+). + # The subscription will be created on the node responsible for the channel's hash slot. + # @parameter channels [Array(String)] The sharded channels to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [Context::Subscribe] The subscription context. + # @returns [Object] The result of the block if block given. + # @returns [Context::Subscribe] The subscription context if no block given. + def ssubscribe(*channels) + # For sharded subscriptions, route to appropriate node based on channel hash + slot = channels.any? ? slot_for(channels.first) : 0 + client = client_for(slot) + + client.ssubscribe(*channels) do |context| + if block_given? + yield context + else + return context + end + end + end end end end diff --git a/lib/async/redis/context/subscribe.rb b/lib/async/redis/context/subscribe.rb index dfbe8e4..bb88740 100644 --- a/lib/async/redis/context/subscribe.rb +++ b/lib/async/redis/context/subscribe.rb @@ -12,6 +12,8 @@ module Context # Context for Redis pub/sub subscription operations. class Subscribe < Generic MESSAGE = "message" + PMESSAGE = "pmessage" + SMESSAGE = "smessage" # Initialize a new subscription context. # @parameter pool [Pool] The connection pool to use. @@ -19,7 +21,7 @@ class Subscribe < Generic def initialize(pool, channels) super(pool) - subscribe(channels) + subscribe(channels) if channels.any? end # Close the subscription context. @@ -34,7 +36,7 @@ def close # @returns [Array] The next message response, or nil if connection closed. def listen while response = @connection.read_response - return response if response.first == MESSAGE + return response if response.first == MESSAGE || response.first == PMESSAGE || response.first == SMESSAGE end end @@ -62,6 +64,34 @@ def unsubscribe(channels) @connection.write_request ["UNSUBSCRIBE", *channels] @connection.flush end + + # Subscribe to channel patterns. + # @parameter patterns [Array(String)] The channel patterns to subscribe to. + def psubscribe(patterns) + @connection.write_request ["PSUBSCRIBE", *patterns] + @connection.flush + end + + # Unsubscribe from channel patterns. + # @parameter patterns [Array(String)] The channel patterns to unsubscribe from. + def punsubscribe(patterns) + @connection.write_request ["PUNSUBSCRIBE", *patterns] + @connection.flush + end + + # Subscribe to sharded channels (Redis 7.0+). + # @parameter channels [Array(String)] The sharded channels to subscribe to. + def ssubscribe(channels) + @connection.write_request ["SSUBSCRIBE", *channels] + @connection.flush + end + + # Unsubscribe from sharded channels (Redis 7.0+). + # @parameter channels [Array(String)] The sharded channels to unsubscribe from. + def sunsubscribe(channels) + @connection.write_request ["SUNSUBSCRIBE", *channels] + @connection.flush + end end end end diff --git a/releases.md b/releases.md index 426e6c9..b6e0b55 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,11 @@ # Releases +## Unreleased + + - Add agent context. + - Add support for pattern pub/sub. + - Add support for sharded pub/sub. + ## v0.11.2 - Fix handling of IPv6 address literals, including those returned by Redis Cluster / Sentinel. From 4e21c262af5fdc81c3ad3f010749432a94f1b3d5 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 13 Aug 2025 23:29:27 +1200 Subject: [PATCH 03/11] Remove duplicate test. --- .../test/async/redis/cluster_subscriptions.rb | 269 ------------------ 1 file changed, 269 deletions(-) delete mode 100644 cluster/test/async/redis/cluster_subscriptions.rb diff --git a/cluster/test/async/redis/cluster_subscriptions.rb b/cluster/test/async/redis/cluster_subscriptions.rb deleted file mode 100644 index d35ca64..0000000 --- a/cluster/test/async/redis/cluster_subscriptions.rb +++ /dev/null @@ -1,269 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2025, by Samuel Williams. - -require "async/redis/cluster_client" -require "sus/fixtures/async" -require "securerandom" - -describe Async::Redis::ClusterClient do - include Sus::Fixtures::Async::ReactorContext - - let(:node_a) {"redis://redis-a:6379"} - let(:node_b) {"redis://redis-b:6379"} - let(:node_c) {"redis://redis-c:6379"} - - let(:endpoints) {[ - Async::Redis::Endpoint.parse(node_a), - Async::Redis::Endpoint.parse(node_b), - Async::Redis::Endpoint.parse(node_c) - ]} - - let(:cluster) {subject.new(endpoints)} - - with "sharded subscriptions" do - let(:shard_channel) {"cluster-shard:test:#{SecureRandom.uuid}"} - let(:shard_message) {"cluster sharded message"} - - it "can subscribe to sharded channels and receive messages" do - received_message = nil - condition = Async::Condition.new - - # Get a client for the sharded channel - slot = cluster.slot_for(shard_channel) - subscriber_client = cluster.client_for(slot) - - # Set up the subscriber - subscriber_task = reactor.async do - subscriber_client.ssubscribe(shard_channel) do |context| - condition.signal # Signal that we're ready - - type, name, message = context.listen - - expect(type).to be == "smessage" - expect(name).to be == shard_channel - received_message = message - end - end - - # Set up the publisher - publisher_task = reactor.async do - condition.wait # Wait for subscriber to be ready - - # For sharded pub/sub, we need to use SPUBLISH - # Get a client (can be any node) to publish the message - publisher_client = cluster.client_for(slot) - - begin - # Try to use SPUBLISH if available (Redis 7.0+) - publisher_client.call("SPUBLISH", shard_channel, shard_message) - rescue => error - # If SPUBLISH is not available, skip this test - Console.warn("SPUBLISH not available, skipping sharded pub/sub test: #{error}") - subscriber_task.stop - return - end - end - - publisher_task.wait - subscriber_task.stop - - expect(received_message).to be == shard_message - end - - it "distributes sharded messages across cluster nodes" do - # This test verifies that sharded pub/sub properly distributes - # messages across different cluster nodes based on channel hash - - channels = [ - "shard:node:a:#{SecureRandom.uuid}", - "shard:node:b:#{SecureRandom.uuid}", - "shard:node:c:#{SecureRandom.uuid}" - ] - - # Find channels that map to different slots/nodes - channel_slots = channels.map {|ch| [ch, cluster.slot_for(ch)]} - unique_slots = channel_slots.map(&:last).uniq - - # We should have channels distributed across different slots - expect(unique_slots.size).to be > 1 - - received_messages = [] - condition = Async::Condition.new - subscriber_count = 0 - target_count = channels.size - - # Set up subscribers for each channel - subscriber_tasks = channels.map do |channel| - reactor.async do - slot = cluster.slot_for(channel) - client = cluster.client_for(slot) - - client.ssubscribe(channel) do |context| - subscriber_count += 1 - condition.signal if subscriber_count == target_count - - type, name, message = context.listen - received_messages << {channel: name, message: message, slot: slot} - end - end - end - - # Set up publisher - publisher_task = reactor.async do - condition.wait # Wait for all subscribers - - channels.each_with_index do |channel, index| - slot = cluster.slot_for(channel) - client = cluster.client_for(slot) - - begin - client.call("SPUBLISH", channel, "message-#{index}") - rescue => error - Console.warn("SPUBLISH failed for #{channel}: #{error}") - # Clean up and skip if SPUBLISH not available - subscriber_tasks.each(&:stop) - return - end - end - end - - publisher_task.wait - sleep(0.1) # Allow time for message delivery - subscriber_tasks.each(&:stop) - - # Verify we received messages for different channels - expect(received_messages.size).to be == channels.size - - # Verify messages were distributed to different slots - received_slots = received_messages.map {|msg| msg[:slot]}.uniq - expect(received_slots.size).to be > 1 - end - - it "can mix sharded and regular subscriptions on different nodes" do - regular_channel = "regular:#{SecureRandom.uuid}" - shard_channel = "shard:#{SecureRandom.uuid}" - - regular_slot = cluster.slot_for(regular_channel) - shard_slot = cluster.slot_for(shard_channel) - - received_messages = [] - condition = Async::Condition.new - ready_count = 0 - - # Regular subscription on one node - regular_task = reactor.async do - client = cluster.client_for(regular_slot) - client.subscribe(regular_channel) do |context| - ready_count += 1 - condition.signal if ready_count == 2 - - type, name, message = context.listen - received_messages << {type: type, channel: name, message: message} - end - end - - # Sharded subscription on another node (if different) - shard_task = reactor.async do - client = cluster.client_for(shard_slot) - client.ssubscribe(shard_channel) do |context| - ready_count += 1 - condition.signal if ready_count == 2 - - type, name, message = context.listen - received_messages << {type: type, channel: name, message: message} - end - end - - # Publisher - publisher_task = reactor.async do - condition.wait # Wait for both subscribers - - # Publish to regular channel - regular_client = cluster.client_for(regular_slot) - regular_client.publish(regular_channel, "regular message") - - # Publish to sharded channel - shard_client = cluster.client_for(shard_slot) - begin - shard_client.call("SPUBLISH", shard_channel, "sharded message") - rescue => error - Console.warn("SPUBLISH not available: #{error}") - regular_task.stop - shard_task.stop - return - end - end - - publisher_task.wait - sleep(0.1) # Allow time for message delivery - regular_task.stop - shard_task.stop - - # Should have received both messages - expect(received_messages.size).to be == 2 - - # Verify message types - message_types = received_messages.map {|msg| msg[:type]} - expect(message_types).to include("message") # Regular pub/sub - expect(message_types).to include("smessage") # Sharded pub/sub - end - - it "handles sharded subscription on same connection as regular subscription" do - # Test that the unified Subscribe context works in cluster environment - channel = "unified:test:#{SecureRandom.uuid}" - shard_channel = "shard:unified:#{SecureRandom.uuid}" - - slot = cluster.slot_for(channel) - client = cluster.client_for(slot) - - received_messages = [] - condition = Async::Condition.new - - # Set up unified subscription - subscriber_task = reactor.async do - client.subscribe(channel) do |context| - # Add sharded subscription to same context - context.ssubscribe([shard_channel]) - - condition.signal # Ready to receive - - # Listen for both message types - 2.times do - response = context.listen - received_messages << response - end - end - end - - # Publisher - publisher_task = reactor.async do - condition.wait - - # Publish regular message - client.publish(channel, "unified regular") - - # Publish sharded message - begin - client.call("SPUBLISH", shard_channel, "unified sharded") - rescue => error - Console.warn("SPUBLISH not available: #{error}") - subscriber_task.stop - return - end - end - - publisher_task.wait - sleep(0.1) # Allow message delivery - subscriber_task.stop - - # Should receive both message types on same context - expect(received_messages.size).to be == 2 - - message_types = received_messages.map(&:first) - expect(message_types).to include("message") # Regular - expect(message_types).to include("smessage") # Sharded - end - end -end From 82dbcf5e871a345ebc04048b13f19f026b147469 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 13 Aug 2025 23:49:49 +1200 Subject: [PATCH 04/11] Fix tests again. --- cluster/test/async/redis/context/subscribe.rb | 107 +++++------------- lib/async/redis/cluster_client.rb | 68 +++++++++-- 2 files changed, 91 insertions(+), 84 deletions(-) diff --git a/cluster/test/async/redis/context/subscribe.rb b/cluster/test/async/redis/context/subscribe.rb index 1e19817..58fcd50 100644 --- a/cluster/test/async/redis/context/subscribe.rb +++ b/cluster/test/async/redis/context/subscribe.rb @@ -3,6 +3,7 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. +require "async/variable" require "async/redis/cluster_client" require "sus/fixtures/async" require "securerandom" @@ -29,58 +30,34 @@ it "can subscribe to sharded channels and receive messages" do received_message = nil - condition = Async::Condition.new - spublish_available = false + ready = Async::Variable.new # Set up the subscriber using cluster client's ssubscribe method - subscriber_task = reactor.async do - begin - cluster.ssubscribe(shard_channel) do |context| - condition.signal # Signal that we're ready - spublish_available = true - - type, name, message = context.listen - - expect(type).to be == "smessage" - expect(name).to be == shard_channel - received_message = message - end - rescue Protocol::Redis::ServerError => error - if error.message.include?("unknown command") - Console.warn("SSUBSCRIBE not available on this Redis version") - condition.signal - else - raise - end + subscriber_task = Async do + cluster.ssubscribe(shard_channel) do |context| + ready.resolve + + type, name, message = context.listen + + expect(type).to be == "smessage" + expect(name).to be == shard_channel + received_message = message end end # Set up the publisher - publisher_task = reactor.async do - condition.wait # Wait for subscriber to be ready + publisher_task = Async do + ready.wait - if spublish_available - begin - # Get a client on the same node as the subscriber for SPUBLISH - slot = cluster.slot_for(shard_channel) - publisher_client = cluster.client_for(slot) - publisher_client.call("SPUBLISH", shard_channel, shard_message) - rescue => error - Console.warn("SPUBLISH failed: #{error}") - end - end + slot = cluster.slot_for(shard_channel) + publisher_client = cluster.client_for(slot) + publisher_client.call("SPUBLISH", shard_channel, shard_message) end publisher_task.wait - sleep(0.1) # Allow message delivery - subscriber_task.stop + subscriber_task.wait - # Only check message if sharded pub/sub was available - if spublish_available && received_message - expect(received_message).to be == shard_message - else - Console.warn("Skipping assertion - sharded pub/sub not available on this Redis version") - end + expect(received_message).to be == shard_message end it "distributes sharded messages across cluster nodes" do @@ -94,26 +71,26 @@ ] # Find channels that map to different slots/nodes - channel_slots = channels.map {|ch| [ch, cluster.slot_for(ch)]} + channel_slots = channels.map {|channel| [channel, cluster.slot_for(channel)]} unique_slots = channel_slots.map(&:last).uniq # We should have channels distributed across different slots expect(unique_slots.size).to be > 1 received_messages = [] - condition = Async::Condition.new + ready = Async::Variable.new subscriber_count = 0 target_count = channels.size # Set up subscribers for each channel subscriber_tasks = channels.map do |channel| - reactor.async do + Async do slot = cluster.slot_for(channel) client = cluster.client_for(slot) client.ssubscribe(channel) do |context| subscriber_count += 1 - condition.signal if subscriber_count == target_count + ready.resolve if subscriber_count == target_count type, name, message = context.listen received_messages << {channel: name, message: message, slot: slot} @@ -122,27 +99,19 @@ end # Set up publisher - publisher_task = reactor.async do - condition.wait # Wait for all subscribers + publisher_task = Async do + ready.wait # Wait for all subscribers channels.each_with_index do |channel, index| slot = cluster.slot_for(channel) client = cluster.client_for(slot) - begin - client.call("SPUBLISH", channel, "message-#{index}") - rescue => error - Console.warn("SPUBLISH failed for #{channel}: #{error}") - # Clean up and skip if SPUBLISH not available - subscriber_tasks.each(&:stop) - return - end + client.call("SPUBLISH", channel, "message-#{index}") end end publisher_task.wait - sleep(0.1) # Allow time for message delivery - subscriber_tasks.each(&:stop) + subscriber_tasks.each(&:wait) # Verify we received messages for different channels expect(received_messages.size).to be == channels.size @@ -197,20 +166,12 @@ # Publish to sharded channel shard_client = cluster.client_for(shard_slot) - begin - shard_client.call("SPUBLISH", shard_channel, "sharded message") - rescue => error - Console.warn("SPUBLISH not available: #{error}") - regular_task.stop - shard_task.stop - return - end + shard_client.call("SPUBLISH", shard_channel, "sharded message") end publisher_task.wait - sleep(0.1) # Allow time for message delivery - regular_task.stop - shard_task.stop + regular_task.wait + shard_task.wait # Should have received both messages expect(received_messages.size).to be == 2 @@ -276,17 +237,11 @@ publisher_client.publish(channel, "unified regular") # Publish sharded message - begin - publisher_client.call("SPUBLISH", shard_channel, "unified sharded") - rescue => error - Console.warn("SPUBLISH not available: #{error}") - # Skip sharded part if not available - end + publisher_client.call("SPUBLISH", shard_channel, "unified sharded") end publisher_task.wait - sleep(0.1) # Allow message delivery - subscriber_task.stop + subscriber_task.wait # Should receive both message types on same context expect(received_messages.size).to be == 2 diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index 5fbd901..391c06e 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -63,6 +63,16 @@ def each end end + # Get a random value from the map. + # @returns [Object] A randomly selected value, or nil if map is empty. + def sample + return nil if @ranges.empty? + + range, value = @ranges.sample + + return value + end + # Clear all ranges from the map. def clear @ranges.clear @@ -127,6 +137,38 @@ def client_for(slot, role = :master) end end + # Get any available client from the cluster. + # This is useful for operations that don't require slot-specific routing, + # such as global pub/sub operations, INFO commands, or other cluster-wide operations. + # @parameter role [Symbol] The role of node to get (:master or :slave). + # @returns [Client] A Redis client for any available node. + def any_client(role = :master) + unless @shards + reload_cluster! + end + + # Sample a random shard to get better load distribution + if nodes = @shards.sample + nodes = nodes.select{|node| node.role == role} + + if node = nodes.sample + return (node.client ||= Client.new(node.endpoint, **@options)) + end + end + + # Fallback to slot 0 if sampling fails + client_for(0, role) + end + + # Execute a Redis command on any available cluster node. + # This is useful for commands that don't require slot-specific routing. + # @parameter command [String] The Redis command to execute. + # @parameter arguments [Array] The command arguments. + # @returns [Object] The result of the Redis command. + def call(command, *arguments) + any_client.call(command, *arguments) + end + protected def reload_cluster!(endpoints = @endpoints) @@ -245,16 +287,22 @@ def slots_for(keys) end # Subscribe to one or more channels for pub/sub messaging in cluster environment. - # The subscription will be created on the node responsible for the first channel. + # + # NOTE: Regular pub/sub in Redis Cluster is GLOBAL - messages propagate to all nodes. + # This method is a convenience that subscribes via an arbitrary cluster node. + # The choice of node does not affect which messages you receive, since regular + # pub/sub messages are broadcast to all nodes in the cluster. + # + # For slot-aware pub/sub, use ssubscribe() instead (Redis 7.0+). + # # @parameter channels [Array(String)] The channels to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. # @parameter context [Context::Subscribe] The subscription context. # @returns [Object] The result of the block if block given. # @returns [Context::Subscribe] The subscription context if no block given. def subscribe(*channels) - # For regular pub/sub, route to node based on first channel - slot = channels.any? ? slot_for(channels.first) : 0 - client = client_for(slot) + # For regular pub/sub, use any available node since messages are global + client = any_client client.subscribe(*channels) do |context| if block_given? @@ -266,16 +314,20 @@ def subscribe(*channels) end # Subscribe to one or more channel patterns for pub/sub messaging in cluster environment. - # The subscription will be created on the node responsible for a deterministic slot. + # + # NOTE: Pattern subscriptions in Redis Cluster are GLOBAL - they match channels + # across all nodes. This method is a convenience that subscribes via an arbitrary + # cluster node. Pattern matching works across the entire cluster regardless of + # which node you subscribe from. + # # @parameter patterns [Array(String)] The channel patterns to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. # @parameter context [Context::Subscribe] The subscription context. # @returns [Object] The result of the block if block given. # @returns [Context::Subscribe] The subscription context if no block given. def psubscribe(*patterns) - # For pattern subscriptions, use a deterministic slot since patterns can match any channel - slot = patterns.any? ? slot_for(patterns.first) : 0 - client = client_for(slot) + # For pattern subscriptions, use any available node since patterns are global + client = any_client client.psubscribe(*patterns) do |context| if block_given? From 7ffa7cc482ffe00a7b0c77e461815ca746a55eab Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 Aug 2025 00:02:38 +1200 Subject: [PATCH 05/11] Introduce `SharedSubscribe` for state management. --- lib/async/redis/cluster_client.rb | 34 +++--- lib/async/redis/context/shard_subscribe.rb | 122 +++++++++++++++++++++ 2 files changed, 135 insertions(+), 21 deletions(-) create mode 100644 lib/async/redis/context/shard_subscribe.rb diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index 391c06e..533ebe9 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -5,6 +5,7 @@ # Copyright, 2025, by Travis Bell. require_relative "client" +require_relative "context/shard_subscribe" require "io/stream" module Async @@ -138,8 +139,7 @@ def client_for(slot, role = :master) end # Get any available client from the cluster. - # This is useful for operations that don't require slot-specific routing, - # such as global pub/sub operations, INFO commands, or other cluster-wide operations. + # This is useful for operations that don't require slot-specific routing, such as global pub/sub operations, INFO commands, or other cluster-wide operations. # @parameter role [Symbol] The role of node to get (:master or :slave). # @returns [Client] A Redis client for any available node. def any_client(role = :master) @@ -160,15 +160,6 @@ def any_client(role = :master) client_for(0, role) end - # Execute a Redis command on any available cluster node. - # This is useful for commands that don't require slot-specific routing. - # @parameter command [String] The Redis command to execute. - # @parameter arguments [Array] The command arguments. - # @returns [Object] The result of the Redis command. - def call(command, *arguments) - any_client.call(command, *arguments) - end - protected def reload_cluster!(endpoints = @endpoints) @@ -339,23 +330,24 @@ def psubscribe(*patterns) end # Subscribe to one or more sharded channels for pub/sub messaging in cluster environment (Redis 7.0+). - # The subscription will be created on the node responsible for the channel's hash slot. + # The subscription will be created on the appropriate nodes responsible for each channel's hash slot. # @parameter channels [Array(String)] The sharded channels to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::Subscribe] The subscription context. + # @parameter context [Context::ShardSubscribe] The shard subscription context. # @returns [Object] The result of the block if block given. - # @returns [Context::Subscribe] The subscription context if no block given. + # @returns [Context::ShardSubscribe] The shard subscription context if no block given. def ssubscribe(*channels) - # For sharded subscriptions, route to appropriate node based on channel hash - slot = channels.any? ? slot_for(channels.first) : 0 - client = client_for(slot) + context = Context::ShardSubscribe.new(self) + context.subscribe(channels) if channels.any? - client.ssubscribe(*channels) do |context| - if block_given? + if block_given? + begin yield context - else - return context + ensure + context.close end + else + return context end end end diff --git a/lib/async/redis/context/shard_subscribe.rb b/lib/async/redis/context/shard_subscribe.rb new file mode 100644 index 0000000..c41fd28 --- /dev/null +++ b/lib/async/redis/context/shard_subscribe.rb @@ -0,0 +1,122 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require_relative "subscribe" + +module Async + module Redis + module Context + # Context for managing sharded subscriptions across multiple Redis cluster nodes. + # This class handles the complexity of subscribing to channels that may be distributed + # across different shards in a Redis cluster. + class ShardSubscribe + # Initialize a new shard subscription context. + # @parameter cluster_client [ClusterClient] The cluster client to use. + def initialize(cluster_client) + @cluster_client = cluster_client + @subscriptions = {} + @channels = [] + end + + # Close all shard subscriptions. + def close + @subscriptions.each_value(&:close) + @subscriptions.clear + end + + # Listen for the next message from any subscribed shard. + # This uses a simple round-robin approach to check each shard. + # @returns [Array] The next message response, or nil if all connections closed. + def listen + return nil if @subscriptions.empty? + + # Simple round-robin checking of subscriptions + @subscriptions.each_value do |subscription| + # Non-blocking check for messages + begin + if response = subscription.listen + return response + end + rescue => error + # Handle connection errors gracefully + Console.warn(self, "Error reading from shard subscription: #{error}") + end + end + + # If no immediate messages, do a blocking wait on the first subscription + if first_subscription = @subscriptions.values.first + first_subscription.listen + end + end + + # Iterate over all messages from all subscribed shards. + # @yields {|response| ...} Block called for each message. + # @parameter response [Array] The message response. + def each + return to_enum unless block_given? + + while response = self.listen + yield response + end + end + + # Subscribe to additional sharded channels. + # @parameter channels [Array(String)] The channels to subscribe to. + def subscribe(channels) + slots = @cluster_client.slots_for(channels) + + slots.each do |slot, channels_for_slot| + if subscription = @subscriptions[slot] + # Add to existing subscription for this shard + subscription.ssubscribe(channels_for_slot) + else + # Create new subscription for this shard + client = @cluster_client.client_for(slot) + @subscriptions[slot] = client.ssubscribe(*channels_for_slot) + end + end + + @channels.concat(channels) + end + + # Unsubscribe from sharded channels. + # @parameter channels [Array(String)] The channels to unsubscribe from. + def unsubscribe(channels) + slots = @cluster_client.slots_for(channels) + + slots.each do |slot, channels_for_slot| + if subscription = @subscriptions[slot] + subscription.sunsubscribe(channels_for_slot) + + # Remove channels from our tracking + @channels -= channels_for_slot + + # Check if this shard still has channels + remaining_channels_for_slot = @channels.select { |ch| @cluster_client.slot_for(ch) == slot } + + # If no channels left for this shard, close and remove it + if remaining_channels_for_slot.empty? + subscription.close + @subscriptions.delete(slot) + end + end + end + end + + # Get the list of currently subscribed channels. + # @returns [Array(String)] The list of subscribed channels. + def channels + @channels.dup + end + + # Get the number of active shard subscriptions. + # @returns [Integer] The number of shard connections. + def shard_count + @subscriptions.size + end + end + end + end +end From 2263c69a54131f3502770198368a23eebf2f42d4 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 Aug 2025 00:26:04 +1200 Subject: [PATCH 06/11] Better orgnization. --- lib/async/redis/cluster_client.rb | 70 ++---------- lib/async/redis/cluster_subscribe.rb | 120 ++++++++++++++++++++ lib/async/redis/context/shard_subscribe.rb | 122 --------------------- lib/async/redis/range_map.rb | 58 ++++++++++ 4 files changed, 186 insertions(+), 184 deletions(-) create mode 100644 lib/async/redis/cluster_subscribe.rb delete mode 100644 lib/async/redis/context/shard_subscribe.rb create mode 100644 lib/async/redis/range_map.rb diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index 533ebe9..58249a2 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -5,7 +5,8 @@ # Copyright, 2025, by Travis Bell. require_relative "client" -require_relative "context/shard_subscribe" +require_relative "cluster_subscribe" +require_relative "range_map" require "io/stream" module Async @@ -22,64 +23,6 @@ class SlotError < StandardError Node = Struct.new(:id, :endpoint, :role, :health, :client) - # A map that stores ranges and their associated values for efficient lookup. - class RangeMap - # Initialize a new RangeMap. - def initialize - @ranges = [] - end - - # Add a range-value pair to the map. - # @parameter range [Range] The range to map. - # @parameter value [Object] The value to associate with the range. - # @returns [Object] The added value. - def add(range, value) - @ranges << [range, value] - - return value - end - - # Find the value associated with a key within any range. - # @parameter key [Object] The key to find. - # @yields {...} Block called if no range contains the key. - # @returns [Object] The value if found, result of block if given, or nil. - def find(key) - @ranges.each do |range, value| - return value if range.include?(key) - end - - if block_given? - return yield - end - - return nil - end - - # Iterate over all values in the map. - # @yields {|value| ...} Block called for each value. - # @parameter value [Object] The value from the range-value pair. - def each - @ranges.each do |range, value| - yield value - end - end - - # Get a random value from the map. - # @returns [Object] A randomly selected value, or nil if map is empty. - def sample - return nil if @ranges.empty? - - range, value = @ranges.sample - - return value - end - - # Clear all ranges from the map. - def clear - @ranges.clear - end - end - # Create a new instance of the cluster client. # # @property endpoints [Array(Endpoint)] The list of cluster endpoints. @@ -166,7 +109,7 @@ def reload_cluster!(endpoints = @endpoints) @endpoints.each do |endpoint| client = Client.new(endpoint, **@options) - shards = RangeMap.new + shards = Async::Redis::RangeMap.new endpoints = [] client.call("CLUSTER", "SHARDS").each do |shard| @@ -337,9 +280,12 @@ def psubscribe(*patterns) # @returns [Object] The result of the block if block given. # @returns [Context::ShardSubscribe] The shard subscription context if no block given. def ssubscribe(*channels) - context = Context::ShardSubscribe.new(self) - context.subscribe(channels) if channels.any? + context = ClusterSubscribe.new(self) + if channels.any? + context.subscribe(channels) + end + if block_given? begin yield context diff --git a/lib/async/redis/cluster_subscribe.rb b/lib/async/redis/cluster_subscribe.rb new file mode 100644 index 0000000..795c054 --- /dev/null +++ b/lib/async/redis/cluster_subscribe.rb @@ -0,0 +1,120 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require_relative "subscribe" + +module Async + module Redis + # Context for managing sharded subscriptions across multiple Redis cluster nodes. + # This class handles the complexity of subscribing to channels that may be distributed + # across different shards in a Redis cluster. + class ClusterSubscribe + # Initialize a new shard subscription context. + # @parameter cluster_client [ClusterClient] The cluster client to use. + def initialize(cluster_client) + @cluster_client = cluster_client + @subscriptions = {} + @channels = [] + end + + # Close all shard subscriptions. + def close + @subscriptions.each_value(&:close) + @subscriptions.clear + end + + # Listen for the next message from any subscribed shard. + # This uses a simple round-robin approach to check each shard. + # @returns [Array] The next message response, or nil if all connections closed. + def listen + return nil if @subscriptions.empty? + + # Simple round-robin checking of subscriptions + @subscriptions.each_value do |subscription| + # Non-blocking check for messages + begin + if response = subscription.listen + return response + end + rescue => error + # Handle connection errors gracefully + Console.warn(self, "Error reading from shard subscription: #{error}") + end + end + + # If no immediate messages, do a blocking wait on the first subscription + if first_subscription = @subscriptions.values.first + first_subscription.listen + end + end + + # Iterate over all messages from all subscribed shards. + # @yields {|response| ...} Block called for each message. + # @parameter response [Array] The message response. + def each + return to_enum unless block_given? + + while response = self.listen + yield response + end + end + + # Subscribe to additional sharded channels. + # @parameter channels [Array(String)] The channels to subscribe to. + def subscribe(channels) + slots = @cluster_client.slots_for(channels) + + slots.each do |slot, channels_for_slot| + if subscription = @subscriptions[slot] + # Add to existing subscription for this shard + subscription.ssubscribe(channels_for_slot) + else + # Create new subscription for this shard + client = @cluster_client.client_for(slot) + @subscriptions[slot] = client.ssubscribe(*channels_for_slot) + end + end + + @channels.concat(channels) + end + + # Unsubscribe from sharded channels. + # @parameter channels [Array(String)] The channels to unsubscribe from. + def unsubscribe(channels) + slots = @cluster_client.slots_for(channels) + + slots.each do |slot, channels_for_slot| + if subscription = @subscriptions[slot] + subscription.sunsubscribe(channels_for_slot) + + # Remove channels from our tracking + @channels -= channels_for_slot + + # Check if this shard still has channels + remaining_channels_for_slot = @channels.select { |ch| @cluster_client.slot_for(ch) == slot } + + # If no channels left for this shard, close and remove it + if remaining_channels_for_slot.empty? + subscription.close + @subscriptions.delete(slot) + end + end + end + end + + # Get the list of currently subscribed channels. + # @returns [Array(String)] The list of subscribed channels. + def channels + @channels.dup + end + + # Get the number of active shard subscriptions. + # @returns [Integer] The number of shard connections. + def shard_count + @subscriptions.size + end + end + end +end diff --git a/lib/async/redis/context/shard_subscribe.rb b/lib/async/redis/context/shard_subscribe.rb deleted file mode 100644 index c41fd28..0000000 --- a/lib/async/redis/context/shard_subscribe.rb +++ /dev/null @@ -1,122 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2025, by Samuel Williams. - -require_relative "subscribe" - -module Async - module Redis - module Context - # Context for managing sharded subscriptions across multiple Redis cluster nodes. - # This class handles the complexity of subscribing to channels that may be distributed - # across different shards in a Redis cluster. - class ShardSubscribe - # Initialize a new shard subscription context. - # @parameter cluster_client [ClusterClient] The cluster client to use. - def initialize(cluster_client) - @cluster_client = cluster_client - @subscriptions = {} - @channels = [] - end - - # Close all shard subscriptions. - def close - @subscriptions.each_value(&:close) - @subscriptions.clear - end - - # Listen for the next message from any subscribed shard. - # This uses a simple round-robin approach to check each shard. - # @returns [Array] The next message response, or nil if all connections closed. - def listen - return nil if @subscriptions.empty? - - # Simple round-robin checking of subscriptions - @subscriptions.each_value do |subscription| - # Non-blocking check for messages - begin - if response = subscription.listen - return response - end - rescue => error - # Handle connection errors gracefully - Console.warn(self, "Error reading from shard subscription: #{error}") - end - end - - # If no immediate messages, do a blocking wait on the first subscription - if first_subscription = @subscriptions.values.first - first_subscription.listen - end - end - - # Iterate over all messages from all subscribed shards. - # @yields {|response| ...} Block called for each message. - # @parameter response [Array] The message response. - def each - return to_enum unless block_given? - - while response = self.listen - yield response - end - end - - # Subscribe to additional sharded channels. - # @parameter channels [Array(String)] The channels to subscribe to. - def subscribe(channels) - slots = @cluster_client.slots_for(channels) - - slots.each do |slot, channels_for_slot| - if subscription = @subscriptions[slot] - # Add to existing subscription for this shard - subscription.ssubscribe(channels_for_slot) - else - # Create new subscription for this shard - client = @cluster_client.client_for(slot) - @subscriptions[slot] = client.ssubscribe(*channels_for_slot) - end - end - - @channels.concat(channels) - end - - # Unsubscribe from sharded channels. - # @parameter channels [Array(String)] The channels to unsubscribe from. - def unsubscribe(channels) - slots = @cluster_client.slots_for(channels) - - slots.each do |slot, channels_for_slot| - if subscription = @subscriptions[slot] - subscription.sunsubscribe(channels_for_slot) - - # Remove channels from our tracking - @channels -= channels_for_slot - - # Check if this shard still has channels - remaining_channels_for_slot = @channels.select { |ch| @cluster_client.slot_for(ch) == slot } - - # If no channels left for this shard, close and remove it - if remaining_channels_for_slot.empty? - subscription.close - @subscriptions.delete(slot) - end - end - end - end - - # Get the list of currently subscribed channels. - # @returns [Array(String)] The list of subscribed channels. - def channels - @channels.dup - end - - # Get the number of active shard subscriptions. - # @returns [Integer] The number of shard connections. - def shard_count - @subscriptions.size - end - end - end - end -end diff --git a/lib/async/redis/range_map.rb b/lib/async/redis/range_map.rb new file mode 100644 index 0000000..cede5dd --- /dev/null +++ b/lib/async/redis/range_map.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +module Async + module Redis + # A map that stores ranges and their associated values for efficient lookup. + class RangeMap + # Initialize a new RangeMap. + def initialize + @ranges = [] + end + + # Add a range-value pair to the map. + # @parameter range [Range] The range to map. + # @parameter value [Object] The value to associate with the range. + # @returns [Object] The added value. + def add(range, value) + @ranges << [range, value] + return value + end + + # Find the value associated with a key within any range. + # @parameter key [Object] The key to find. + # @yields {...} Block called if no range contains the key. + # @returns [Object] The value if found, result of block if given, or nil. + def find(key) + @ranges.each do |range, value| + return value if range.include?(key) + end + if block_given? + return yield + end + return nil + end + + # Iterate over all values in the map. + # @yields {|value| ...} Block called for each value. + # @parameter value [Object] The value from the range-value pair. + def each + @ranges.each do |range, value| + yield value + end + end + + # Get a random value from the map. + # @returns [Object] A randomly selected value, or nil if map is empty. + def sample + return nil if @ranges.empty? + range, value = @ranges.sample + return value + end + + # Clear all ranges from the map. + def clear + @ranges.clear + end + end + end +end From 143bf771cf15b4902fe93c1941b3c224a3016ae1 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 Aug 2025 00:31:53 +1200 Subject: [PATCH 07/11] Fixes. --- lib/async/redis/cluster_client.rb | 2 +- lib/async/redis/cluster_subscribe.rb | 4 +--- lib/async/redis/range_map.rb | 10 +++++----- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index 58249a2..102172b 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -285,7 +285,7 @@ def ssubscribe(*channels) if channels.any? context.subscribe(channels) end - + if block_given? begin yield context diff --git a/lib/async/redis/cluster_subscribe.rb b/lib/async/redis/cluster_subscribe.rb index 795c054..acb55ee 100644 --- a/lib/async/redis/cluster_subscribe.rb +++ b/lib/async/redis/cluster_subscribe.rb @@ -3,8 +3,6 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. -require_relative "subscribe" - module Async module Redis # Context for managing sharded subscriptions across multiple Redis cluster nodes. @@ -93,7 +91,7 @@ def unsubscribe(channels) @channels -= channels_for_slot # Check if this shard still has channels - remaining_channels_for_slot = @channels.select { |ch| @cluster_client.slot_for(ch) == slot } + remaining_channels_for_slot = @channels.select {|ch| @cluster_client.slot_for(ch) == slot} # If no channels left for this shard, close and remove it if remaining_channels_for_slot.empty? diff --git a/lib/async/redis/range_map.rb b/lib/async/redis/range_map.rb index cede5dd..782af27 100644 --- a/lib/async/redis/range_map.rb +++ b/lib/async/redis/range_map.rb @@ -8,7 +8,7 @@ class RangeMap def initialize @ranges = [] end - + # Add a range-value pair to the map. # @parameter range [Range] The range to map. # @parameter value [Object] The value to associate with the range. @@ -17,7 +17,7 @@ def add(range, value) @ranges << [range, value] return value end - + # Find the value associated with a key within any range. # @parameter key [Object] The key to find. # @yields {...} Block called if no range contains the key. @@ -31,7 +31,7 @@ def find(key) end return nil end - + # Iterate over all values in the map. # @yields {|value| ...} Block called for each value. # @parameter value [Object] The value from the range-value pair. @@ -40,7 +40,7 @@ def each yield value end end - + # Get a random value from the map. # @returns [Object] A randomly selected value, or nil if map is empty. def sample @@ -48,7 +48,7 @@ def sample range, value = @ranges.sample return value end - + # Clear all ranges from the map. def clear @ranges.clear From 464f3d37a399425b31c8fd349677db997f19955d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 Aug 2025 10:04:23 +1200 Subject: [PATCH 08/11] Fix coverage test ruby version. --- .github/workflows/test-coverage.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-coverage.yaml b/.github/workflows/test-coverage.yaml index cf46137..5396f1c 100644 --- a/.github/workflows/test-coverage.yaml +++ b/.github/workflows/test-coverage.yaml @@ -60,7 +60,7 @@ jobs: - ubuntu ruby: - - ruby + - "3.4" steps: - uses: actions/checkout@v4 @@ -93,7 +93,7 @@ jobs: - ubuntu ruby: - - ruby + - "3.4" steps: - uses: actions/checkout@v4 From 7a22ef1b6374b4eaac1adb5711aba722241aa260 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 Aug 2025 10:05:13 +1200 Subject: [PATCH 09/11] Rename `Subscribe` -> `Subscription`. --- async-redis.gemspec | 2 +- .../context/{subscribe.rb => subscription.rb} | 8 +- guides/subscriptions/readme.md | 221 +++++------------- lib/async/redis/client.rb | 20 +- lib/async/redis/cluster_client.rb | 64 +---- ...r_subscribe.rb => cluster_subscription.rb} | 2 +- .../context/{subscribe.rb => subscription.rb} | 2 +- .../context/{subscribe.rb => subscription.rb} | 2 +- 8 files changed, 82 insertions(+), 239 deletions(-) rename cluster/test/async/redis/context/{subscribe.rb => subscription.rb} (96%) rename lib/async/redis/{cluster_subscribe.rb => cluster_subscription.rb} (99%) rename lib/async/redis/context/{subscribe.rb => subscription.rb} (98%) rename test/async/redis/context/{subscribe.rb => subscription.rb} (97%) diff --git a/async-redis.gemspec b/async-redis.gemspec index bc41697..0f11fc6 100644 --- a/async-redis.gemspec +++ b/async-redis.gemspec @@ -28,5 +28,5 @@ Gem::Specification.new do |spec| spec.add_dependency "async-pool", "~> 0.2" spec.add_dependency "io-endpoint", "~> 0.10" spec.add_dependency "io-stream", "~> 0.4" - spec.add_dependency "protocol-redis", "~> 0.9" + spec.add_dependency "protocol-redis", "~> 0.11" end diff --git a/cluster/test/async/redis/context/subscribe.rb b/cluster/test/async/redis/context/subscription.rb similarity index 96% rename from cluster/test/async/redis/context/subscribe.rb rename to cluster/test/async/redis/context/subscription.rb index 58fcd50..46796d9 100644 --- a/cluster/test/async/redis/context/subscribe.rb +++ b/cluster/test/async/redis/context/subscription.rb @@ -8,7 +8,7 @@ require "sus/fixtures/async" require "securerandom" -describe Async::Redis::Context::Subscribe do +describe Async::Redis::Context::Subscription do include Sus::Fixtures::Async::ReactorContext with "in cluster environment" do @@ -34,7 +34,7 @@ # Set up the subscriber using cluster client's ssubscribe method subscriber_task = Async do - cluster.ssubscribe(shard_channel) do |context| + cluster.subscribe(shard_channel) do |context| ready.resolve type, name, message = context.listen @@ -49,9 +49,7 @@ publisher_task = Async do ready.wait - slot = cluster.slot_for(shard_channel) - publisher_client = cluster.client_for(slot) - publisher_client.call("SPUBLISH", shard_channel, shard_message) + cluster.publish(shard_channel, shard_message) end publisher_task.wait diff --git a/guides/subscriptions/readme.md b/guides/subscriptions/readme.md index 5a56150..2efed28 100644 --- a/guides/subscriptions/readme.md +++ b/guides/subscriptions/readme.md @@ -2,7 +2,13 @@ This guide explains how to use Redis pub/sub functionality with `async-redis` to publish and subscribe to messages. -## Usage +## Overview + +Redis actually has 3 mechanisms to support pub/sub - a general `SUBSCRIBE` command, a pattern-based `PSUBSCRIBE` command, and a sharded `SSUBSCRIBE` command for cluster environments. They mostly work the same way, but have different use cases. + +## Subscribe + +The `SUBSCRIBE` command is used to subscribe to one or more channels. When a message is published to a subscribed channel, the client receives the message in real-time. First, let's create a simple listener that subscribes to messages on a channel: @@ -21,8 +27,6 @@ Async do puts "Received: #{message}" end -ensure - client.close end ``` @@ -37,10 +41,8 @@ client = Async::Redis::Client.new(endpoint) Async do puts "Publishing message..." - client.publish 'status.frontend', 'good' + cluster_client.publish 'status.frontend', 'good' puts "Message sent!" -ensure - client.close end ``` @@ -52,189 +54,80 @@ Listening for messages on 'status.frontend'... Received: good ``` -### How It Works - -**Listener:** -- Uses the `subscribe` method with a channel name and block. -- The block receives a context object for listening to messages. -- `context.listen` returns an array: `[type, name, message]`. -- Runs continuously waiting for messages. - -**Publisher:** -- Uses the `publish` method to send messages to a channel. -- Takes a channel name and message content. -- Sends the message and exits. - -**Channel Communication:** -- Both listener and publisher use the same channel name (`'status.frontend'`). -- Messages are delivered in real-time when published. -- Multiple listeners can subscribe to the same channel. - -## Message Format - -When you call `context.listen`, it returns an array with three elements: - -```ruby -type, name, message = context.listen -``` +## Pattern Subscribe -- **`type`**: The type of Redis pub/sub event. Common values include: - - `"message"` - A regular published message. - - `"subscribe"` - Confirmation that you've subscribed to a channel. - - `"unsubscribe"` - Confirmation that you've unsubscribed from a channel. - - `"pmessage"` - A message from a pattern subscription. - - `"smessage"` - A message from a sharded channel subscription. - -- **`name`**: The channel name where the message was received (e.g., `"status.frontend"`). - -- **`message`**: The actual message content that was published. - -**Note**: For pattern subscriptions (`pmessage`), the format is slightly different: -```ruby -type, pattern, name, message = context.listen -``` -Where `pattern` is the pattern that matched, and `name` is the actual channel name. - -### Example Output - -```ruby -client.subscribe 'notifications' do |context| - type, name, message = context.listen - puts "Type: #{type}, Channel: #{name}, Message: #{message}" -end - -# When someone publishes: client.publish('notifications', 'Hello World!') -# Output: Type: message, Channel: notifications, Message: Hello World! -``` +The `PSUBSCRIBE` command is used to subscribe to channels that match a given pattern. This allows clients to receive messages from multiple channels without subscribing to each one individually. -## Multiple Channels - -You can subscribe to multiple channels at once: +Let's replace the receiver in the above example: ``` ruby -client.subscribe 'channel1', 'channel2', 'channel3' do |context| - while true - type, name, message = context.listen - puts "Received on #{name}: #{message}" - end -end -``` - -## Pattern Subscriptions +require 'async' +require 'async/redis' -Redis also supports pattern-based subscriptions using `psubscribe`: +endpoint = Async::Redis.local_endpoint +client = Async::Redis::Client.new(endpoint) -``` ruby -client.psubscribe 'status.*' do |context| - while true - response = context.listen +Async do + client.psubscribe 'status.*' do |context| + puts "Listening for messages on 'status.*'..." - if response.first == "pmessage" - type, pattern, name, message = response - puts "Pattern #{pattern} matched channel #{name}: #{message}" - end - end -end -``` - -## Mixing Regular and Pattern Subscriptions - -You can mix regular channel subscriptions and pattern subscriptions on the same context: - -``` ruby -client.subscribe 'exact-channel' do |context| - # Add pattern subscription to the same context: - context.psubscribe(['pattern.*']) - - while true - response = context.listen + type, pattern, name, message = context.listen - case response.first - when "message" - type, name, message = response - puts "Regular message on #{name}: #{message}" - when "pmessage" - type, pattern, name, message = response - puts "Pattern #{pattern} matched #{name}: #{message}" - end + puts "Received: #{message}" end end ``` -## Sharded Subscriptions +Note that an extra field, `pattern` is returned when using `PSUBSCRIBE`. This field indicates the pattern that was matched for the incoming message. This can be useful for logging or debugging purposes, as it allows you to see which pattern triggered the message delivery. -Redis 7.0 introduced sharded pub/sub for better scalability in cluster environments. You can use sharded subscriptions with the same `Subscribe` context: +## Shard Subscribe -``` ruby -client.ssubscribe 'user-notifications' do |context| - while true - type, name, message = context.listen - puts "Sharded message on #{name}: #{message}" - end -end -``` +If you are working with a clustered environment, you can improve performance by limiting the scope of your subscriptions to specific shards. This can help reduce the amount of data that needs to be sent over the network and improve overall throughput. -**Key differences from regular pub/sub:** -- Messages are distributed across cluster nodes for better performance. -- Only supports exact channel names (no pattern matching). -- Same message format as regular subscriptions: `[type, channel, message]`. -- Requires Redis 7.0+ and works best in cluster mode. +To use sharded subscriptions, you can use the `SSUBSCRIBE` command, which allows you to subscribe to a specific shard: -## Mixing All Subscription Types +``` ruby +require 'async' +require 'async/redis' -Since all subscription types use the same `Subscribe` context, you can mix them freely: +# endpoints = ... +cluster_client = Async::Redis::ClusterClient.new(endpoints) -``` ruby -client.subscribe 'exact-channel' do |context| - # Add pattern and sharded subscriptions to the same context: - context.psubscribe(['pattern.*']) - context.ssubscribe(['shard-channel']) - - while true - response = context.listen +Async do + cluster_client.subscribe 'status.frontend' do |context| + puts "Listening for messages on 'status.frontend'..." + + type, name, message = context.listen - case response.first - when "message" - type, name, message = response - puts "Regular message on #{name}: #{message}" - when "pmessage" - type, pattern, name, message = response - puts "Pattern #{pattern} matched #{name}: #{message}" - when "smessage" - type, name, message = response - puts "Sharded message on #{name}: #{message}" - end + puts "Received: #{message}" end end ``` -## Important: Subscription Type Behavior - -Redis supports mixing different subscription types on the same connection: - -- ✅ **SUBSCRIBE + PSUBSCRIBE + SSUBSCRIBE**: All can be mixed on the same connection/context -- 🎯 **Unified Interface**: `async-redis` uses a single `Subscribe` context for all subscription types - -**Benefits of the unified approach:** -- **Simplicity**: One context handles all subscription types -- **Flexibility**: Mix any combination of subscription types as needed -- **Consistency**: Same `listen` method handles all message types -- **Convenience**: No need to manage multiple contexts for different subscription types - -## Error Handling +``` ruby +require 'async' +require 'async/redis' -Always ensure proper cleanup of Redis connections: +# endpoints = ... +cluster_client = Async::Redis::ClusterClient.new(endpoints) -``` ruby Async do - begin - client.subscribe 'my-channel' do |context| - # Handle messages... - end - rescue => error - puts "Subscription error: #{error}" - ensure - client.close - end + puts "Publishing message..." + cluster_client.publish('status.frontend', 'good') + puts "Message sent!" end ``` + +To see pub/sub in action, you can run the listener in one terminal and the publisher in another. The listener will receive any messages sent by the publisher to the `status.frontend` channel: + +```bash +$ ruby listener.rb +Listening for messages on 'status.frontend'... +Received: good +``` + +```bash +$ ruby publisher.rb +Publishing message... +Message sent! +``` \ No newline at end of file diff --git a/lib/async/redis/client.rb b/lib/async/redis/client.rb index 83e0103..10b5d21 100755 --- a/lib/async/redis/client.rb +++ b/lib/async/redis/client.rb @@ -9,7 +9,7 @@ require_relative "context/pipeline" require_relative "context/transaction" -require_relative "context/subscribe" +require_relative "context/subscription" require_relative "endpoint" require "io/endpoint/host_endpoint" @@ -32,11 +32,11 @@ module Methods # Subscribe to one or more channels for pub/sub messaging. # @parameter channels [Array(String)] The channels to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::Subscribe] The subscription context. + # @parameter context [Context::Subscription] The subscription context. # @returns [Object] The result of the block if block given. - # @returns [Context::Subscribe] The subscription context if no block given. + # @returns [Context::Subscription] The subscription context if no block given. def subscribe(*channels) - context = Context::Subscribe.new(@pool, channels) + context = Context::Subscription.new(@pool, channels) return context unless block_given? @@ -50,11 +50,11 @@ def subscribe(*channels) # Subscribe to one or more channel patterns for pub/sub messaging. # @parameter patterns [Array(String)] The channel patterns to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::Subscribe] The subscription context. + # @parameter context [Context::Subscription] The subscription context. # @returns [Object] The result of the block if block given. - # @returns [Context::Subscribe] The subscription context if no block given. + # @returns [Context::Subscription] The subscription context if no block given. def psubscribe(*patterns) - context = Context::Subscribe.new(@pool, []) + context = Context::Subscription.new(@pool, []) context.psubscribe(patterns) return context unless block_given? @@ -69,11 +69,11 @@ def psubscribe(*patterns) # Subscribe to one or more sharded channels for pub/sub messaging (Redis 7.0+). # @parameter channels [Array(String)] The sharded channels to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::Subscribe] The subscription context. + # @parameter context [Context::Subscription] The subscription context. # @returns [Object] The result of the block if block given. - # @returns [Context::Subscribe] The subscription context if no block given. + # @returns [Context::Subscription] The subscription context if no block given. def ssubscribe(*channels) - context = Context::Subscribe.new(@pool, []) + context = Context::Subscription.new(@pool, []) context.ssubscribe(channels) return context unless block_given? diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index 102172b..2862b13 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -5,14 +5,18 @@ # Copyright, 2025, by Travis Bell. require_relative "client" -require_relative "cluster_subscribe" +require_relative "cluster_subscription" require_relative "range_map" require "io/stream" +require "protocol/redis/cluster/methods" + module Async module Redis # A Redis cluster client that manages multiple Redis instances and handles cluster operations. class ClusterClient + include ::Protocol::Redis::Cluster::Methods + # Raised when cluster configuration cannot be reloaded. class ReloadError < StandardError end @@ -220,67 +224,15 @@ def slots_for(keys) return slots end - # Subscribe to one or more channels for pub/sub messaging in cluster environment. - # - # NOTE: Regular pub/sub in Redis Cluster is GLOBAL - messages propagate to all nodes. - # This method is a convenience that subscribes via an arbitrary cluster node. - # The choice of node does not affect which messages you receive, since regular - # pub/sub messages are broadcast to all nodes in the cluster. - # - # For slot-aware pub/sub, use ssubscribe() instead (Redis 7.0+). - # - # @parameter channels [Array(String)] The channels to subscribe to. - # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::Subscribe] The subscription context. - # @returns [Object] The result of the block if block given. - # @returns [Context::Subscribe] The subscription context if no block given. - def subscribe(*channels) - # For regular pub/sub, use any available node since messages are global - client = any_client - - client.subscribe(*channels) do |context| - if block_given? - yield context - else - return context - end - end - end - - # Subscribe to one or more channel patterns for pub/sub messaging in cluster environment. - # - # NOTE: Pattern subscriptions in Redis Cluster are GLOBAL - they match channels - # across all nodes. This method is a convenience that subscribes via an arbitrary - # cluster node. Pattern matching works across the entire cluster regardless of - # which node you subscribe from. - # - # @parameter patterns [Array(String)] The channel patterns to subscribe to. - # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::Subscribe] The subscription context. - # @returns [Object] The result of the block if block given. - # @returns [Context::Subscribe] The subscription context if no block given. - def psubscribe(*patterns) - # For pattern subscriptions, use any available node since patterns are global - client = any_client - - client.psubscribe(*patterns) do |context| - if block_given? - yield context - else - return context - end - end - end - - # Subscribe to one or more sharded channels for pub/sub messaging in cluster environment (Redis 7.0+). + # Subscribe to one or more sharded channels for pub/sub messaging in cluster environment. # The subscription will be created on the appropriate nodes responsible for each channel's hash slot. # @parameter channels [Array(String)] The sharded channels to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. # @parameter context [Context::ShardSubscribe] The shard subscription context. # @returns [Object] The result of the block if block given. # @returns [Context::ShardSubscribe] The shard subscription context if no block given. - def ssubscribe(*channels) - context = ClusterSubscribe.new(self) + def subscribe(*channels) + context = ClusterSubscription.new(self) if channels.any? context.subscribe(channels) diff --git a/lib/async/redis/cluster_subscribe.rb b/lib/async/redis/cluster_subscription.rb similarity index 99% rename from lib/async/redis/cluster_subscribe.rb rename to lib/async/redis/cluster_subscription.rb index acb55ee..41fee54 100644 --- a/lib/async/redis/cluster_subscribe.rb +++ b/lib/async/redis/cluster_subscription.rb @@ -8,7 +8,7 @@ module Redis # Context for managing sharded subscriptions across multiple Redis cluster nodes. # This class handles the complexity of subscribing to channels that may be distributed # across different shards in a Redis cluster. - class ClusterSubscribe + class ClusterSubscription # Initialize a new shard subscription context. # @parameter cluster_client [ClusterClient] The cluster client to use. def initialize(cluster_client) diff --git a/lib/async/redis/context/subscribe.rb b/lib/async/redis/context/subscription.rb similarity index 98% rename from lib/async/redis/context/subscribe.rb rename to lib/async/redis/context/subscription.rb index bb88740..eb5b571 100644 --- a/lib/async/redis/context/subscribe.rb +++ b/lib/async/redis/context/subscription.rb @@ -10,7 +10,7 @@ module Async module Redis module Context # Context for Redis pub/sub subscription operations. - class Subscribe < Generic + class Subscription < Generic MESSAGE = "message" PMESSAGE = "pmessage" SMESSAGE = "smessage" diff --git a/test/async/redis/context/subscribe.rb b/test/async/redis/context/subscription.rb similarity index 97% rename from test/async/redis/context/subscribe.rb rename to test/async/redis/context/subscription.rb index 1a9a6d9..e9ca3cb 100644 --- a/test/async/redis/context/subscribe.rb +++ b/test/async/redis/context/subscription.rb @@ -7,7 +7,7 @@ require "sus/fixtures/async" require "securerandom" -describe Async::Redis::Context::Subscribe do +describe Async::Redis::Context::Subscription do include Sus::Fixtures::Async::ReactorContext let(:endpoint) {Async::Redis.local_endpoint} From 7b442ba2fb22cf0ed6619d6a5f71839cdf79e9bd Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 Aug 2025 15:37:52 +1200 Subject: [PATCH 10/11] Tidy up subscription handling. --- guides/subscriptions/readme.md | 26 +++------------ lib/async/redis/cluster_subscription.rb | 41 ++++++++++++------------ lib/async/redis/context/generic.rb | 4 +-- lib/async/redis/context/subscription.rb | 8 +++-- test/async/redis/context/subscription.rb | 20 ++++++++++++ 5 files changed, 53 insertions(+), 46 deletions(-) diff --git a/guides/subscriptions/readme.md b/guides/subscriptions/readme.md index 2efed28..577250f 100644 --- a/guides/subscriptions/readme.md +++ b/guides/subscriptions/readme.md @@ -16,8 +16,7 @@ First, let's create a simple listener that subscribes to messages on a channel: require 'async' require 'async/redis' -endpoint = Async::Redis.local_endpoint -client = Async::Redis::Client.new(endpoint) +client = Async::Redis::Client.new Async do client.subscribe 'status.frontend' do |context| @@ -36,12 +35,11 @@ Now, let's create a publisher that sends messages to the same channel: require 'async' require 'async/redis' -endpoint = Async::Redis.local_endpoint -client = Async::Redis::Client.new(endpoint) +client = Async::Redis::Client.new Async do puts "Publishing message..." - cluster_client.publish 'status.frontend', 'good' + client.publish 'status.frontend', 'good' puts "Message sent!" end ``` @@ -82,9 +80,9 @@ Note that an extra field, `pattern` is returned when using `PSUBSCRIBE`. This fi ## Shard Subscribe -If you are working with a clustered environment, you can improve performance by limiting the scope of your subscriptions to specific shards. This can help reduce the amount of data that needs to be sent over the network and improve overall throughput. +If you are working with a clustered environment, you can improve performance by limiting the scope of your subscriptions to specific shards. This can help reduce the amount of data that needs to be sent between shards and improve overall throughput. -To use sharded subscriptions, you can use the `SSUBSCRIBE` command, which allows you to subscribe to a specific shard: +To use sharded subscriptions, use a cluster client which supports sharded pub/sub: ``` ruby require 'async' @@ -117,17 +115,3 @@ Async do puts "Message sent!" end ``` - -To see pub/sub in action, you can run the listener in one terminal and the publisher in another. The listener will receive any messages sent by the publisher to the `status.frontend` channel: - -```bash -$ ruby listener.rb -Listening for messages on 'status.frontend'... -Received: good -``` - -```bash -$ ruby publisher.rb -Publishing message... -Message sent! -``` \ No newline at end of file diff --git a/lib/async/redis/cluster_subscription.rb b/lib/async/redis/cluster_subscription.rb index 41fee54..ed3b42b 100644 --- a/lib/async/redis/cluster_subscription.rb +++ b/lib/async/redis/cluster_subscription.rb @@ -3,6 +3,9 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. +require "async/limited_queue" +require "async/barrier" + module Async module Redis # Context for managing sharded subscriptions across multiple Redis cluster nodes. @@ -11,14 +14,22 @@ module Redis class ClusterSubscription # Initialize a new shard subscription context. # @parameter cluster_client [ClusterClient] The cluster client to use. - def initialize(cluster_client) + def initialize(cluster_client, queue: Async::LimitedQueue.new) @cluster_client = cluster_client @subscriptions = {} @channels = [] + + @barrier = Async::Barrier.new + @queue = queue end # Close all shard subscriptions. def close + if barrier = @barrier + @barrier = nil + barrier.stop + end + @subscriptions.each_value(&:close) @subscriptions.clear end @@ -27,25 +38,7 @@ def close # This uses a simple round-robin approach to check each shard. # @returns [Array] The next message response, or nil if all connections closed. def listen - return nil if @subscriptions.empty? - - # Simple round-robin checking of subscriptions - @subscriptions.each_value do |subscription| - # Non-blocking check for messages - begin - if response = subscription.listen - return response - end - rescue => error - # Handle connection errors gracefully - Console.warn(self, "Error reading from shard subscription: #{error}") - end - end - - # If no immediate messages, do a blocking wait on the first subscription - if first_subscription = @subscriptions.values.first - first_subscription.listen - end + @queue.pop end # Iterate over all messages from all subscribed shards. @@ -71,7 +64,13 @@ def subscribe(channels) else # Create new subscription for this shard client = @cluster_client.client_for(slot) - @subscriptions[slot] = client.ssubscribe(*channels_for_slot) + subscription = @subscriptions[slot] = client.ssubscribe(*channels_for_slot) + + @barrier.async do + while true + @queue << subscription.listen + end + end end end diff --git a/lib/async/redis/context/generic.rb b/lib/async/redis/context/generic.rb index 63ef471..70dc390 100644 --- a/lib/async/redis/context/generic.rb +++ b/lib/async/redis/context/generic.rb @@ -22,9 +22,9 @@ def initialize(pool, *arguments) # Close the context and release the connection back to the pool. def close - if @connection - @pool.release(@connection) + if connection = @connection @connection = nil + @pool.release(connection) end end diff --git a/lib/async/redis/context/subscription.rb b/lib/async/redis/context/subscription.rb index eb5b571..fe29abe 100644 --- a/lib/async/redis/context/subscription.rb +++ b/lib/async/redis/context/subscription.rb @@ -26,7 +26,7 @@ def initialize(pool, channels) # Close the subscription context. def close - # There is no way to reset subscription state. On Redis v6+ you can use RESET, but this is not supported in <= v6. + # This causes anyone calling `#listen` to exit, as `read_response` will fail. If we decided to use `RESET` instead, we'd need to take that into account. @connection&.close super @@ -36,7 +36,11 @@ def close # @returns [Array] The next message response, or nil if connection closed. def listen while response = @connection.read_response - return response if response.first == MESSAGE || response.first == PMESSAGE || response.first == SMESSAGE + type = response.first + + if type == MESSAGE || type == PMESSAGE || type == SMESSAGE + return response + end end end diff --git a/test/async/redis/context/subscription.rb b/test/async/redis/context/subscription.rb index e9ca3cb..848b68d 100644 --- a/test/async/redis/context/subscription.rb +++ b/test/async/redis/context/subscription.rb @@ -90,4 +90,24 @@ subscription.close end end + + with "#close" do + it "causes #listen to exit" do + skip_unless_minimum_ruby_version("3.5") + + subscription = client.subscribe(news_channel) + error = nil + + listener = reactor.async do + subscription.listen + rescue => error + # Ignore. + end + + subscription.close + listener.wait + + expect(error).to be_a(IOError) + end + end end From bb19689c155d722d4bd3b541a1c7fe2f7751c451 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 Aug 2025 17:34:35 +1200 Subject: [PATCH 11/11] Error handling. --- guides/subscriptions/readme.md | 44 +++++++++++++++++++++++++ lib/async/redis/cluster_client.rb | 5 +-- lib/async/redis/cluster_subscription.rb | 20 ++++++++--- lib/async/redis/context/generic.rb | 5 +++ 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/guides/subscriptions/readme.md b/guides/subscriptions/readme.md index 577250f..816ce1e 100644 --- a/guides/subscriptions/readme.md +++ b/guides/subscriptions/readme.md @@ -52,6 +52,31 @@ Listening for messages on 'status.frontend'... Received: good ``` +### Error Handling + +Subscriptions are at-most-once delivery. In addition, subscriptions are stateful, meaning that they maintain their own internal state and can be affected by network issues or server restarts. In order to improve resilience, it's important to implement error handling and reconnection logic. + +```ruby +require 'async' +require 'async/redis' + +client = Async::Redis::Client.new + +Async do + client.subscribe 'status.frontend' do |context| + puts "Listening for messages on 'status.frontend'..." + + context.each do |type, name, message| + puts "Received: #{message}" + end + end +rescue => error + Console.warn(self, "Subscription failed", error) + sleep 1 + retry +end +``` + ## Pattern Subscribe The `PSUBSCRIBE` command is used to subscribe to channels that match a given pattern. This allows clients to receive messages from multiple channels without subscribing to each one individually. @@ -115,3 +140,22 @@ Async do puts "Message sent!" end ``` + +### Clustered Subscriptions + +While general `PUBLISH` and `SUBSCRIBE` will work on a cluster, they are less efficient as they require inter-shard communication. By default, the {ruby Async::Redis::ClusterClient} subscription mechanism defaults to `SSUBSCRIBE` and `SPUBLISH`, which are optimized for sharded environments. However, if using multiple subscriptions, internally, several connections will be made to the relevant shards, which increases the complexity. + +#### Cluster Topology Changes and Subscription Invalidation + +If the cluster is re-configured (e.g. adding or removing nodes, resharding), the subscription state may need to be re-established to account for the new topology. During this process, messages may be lost. This is expected as subscriptions are stateless. + +**Important**: When any individual shard subscription fails (due to resharding, node failures, or network issues), the entire cluster subscription is invalidated and will stop delivering messages. This design ensures consistency and prevents partial subscription states that could lead to missed messages on some shards. + +Common scenarios that trigger subscription invalidation: + +- **Resharding operations**: When slots are migrated between nodes (`MOVED` errors) +- **Node failures**: When Redis nodes become unavailable +- **Network partitions**: When connections to specific shards are lost +- **Cluster reconfiguration**: When the cluster topology changes + +Applications should be prepared to handle subscription failures and implement appropriate retry strategies. diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index 2862b13..5c3db86 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -226,11 +226,12 @@ def slots_for(keys) # Subscribe to one or more sharded channels for pub/sub messaging in cluster environment. # The subscription will be created on the appropriate nodes responsible for each channel's hash slot. + # # @parameter channels [Array(String)] The sharded channels to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::ShardSubscribe] The shard subscription context. + # @parameter context [ClusterSubscription] The cluster subscription context. # @returns [Object] The result of the block if block given. - # @returns [Context::ShardSubscribe] The shard subscription context if no block given. + # @returns [ClusterSubscription] The cluster subscription context if no block given. def subscribe(*channels) context = ClusterSubscription.new(self) diff --git a/lib/async/redis/cluster_subscription.rb b/lib/async/redis/cluster_subscription.rb index ed3b42b..8ae20d2 100644 --- a/lib/async/redis/cluster_subscription.rb +++ b/lib/async/redis/cluster_subscription.rb @@ -12,6 +12,10 @@ module Redis # This class handles the complexity of subscribing to channels that may be distributed # across different shards in a Redis cluster. class ClusterSubscription + # Represents a failure in the subscription process, e.g. network issues, shard failures. + class SubscriptionError < StandardError + end + # Initialize a new shard subscription context. # @parameter cluster_client [ClusterClient] The cluster client to use. def initialize(cluster_client, queue: Async::LimitedQueue.new) @@ -35,10 +39,12 @@ def close end # Listen for the next message from any subscribed shard. - # This uses a simple round-robin approach to check each shard. - # @returns [Array] The next message response, or nil if all connections closed. + # @returns [Array] The next message response. + # @raises [SubscriptionError] If the subscription has failed for any reason. def listen @queue.pop + rescue => error + raise SubscriptionError, "Failed to read message!" end # Iterate over all messages from all subscribed shards. @@ -67,9 +73,15 @@ def subscribe(channels) subscription = @subscriptions[slot] = client.ssubscribe(*channels_for_slot) @barrier.async do - while true + # This is optimistic, in other words, subscription.listen will also fail on close. + until subscription.closed? @queue << subscription.listen end + ensure + # If we are exiting here for any reason OTHER than the subscription was closed, we need to re-create the subscription state: + unless subscription.closed? + @queue.close + end end end end @@ -94,8 +106,8 @@ def unsubscribe(channels) # If no channels left for this shard, close and remove it if remaining_channels_for_slot.empty? - subscription.close @subscriptions.delete(slot) + subscription.close end end end diff --git a/lib/async/redis/context/generic.rb b/lib/async/redis/context/generic.rb index 70dc390..60216e1 100644 --- a/lib/async/redis/context/generic.rb +++ b/lib/async/redis/context/generic.rb @@ -28,6 +28,11 @@ def close end end + # @returns [Boolean] Whether the context is closed. + def closed? + @connection.nil? + end + # Write a Redis command request to the connection. # @parameter command [String] The Redis command. # @parameter arguments [Array] The command arguments.