diff --git a/.github/workflows/test-coverage.yaml b/.github/workflows/test-coverage.yaml index cf46137..5396f1c 100644 --- a/.github/workflows/test-coverage.yaml +++ b/.github/workflows/test-coverage.yaml @@ -60,7 +60,7 @@ jobs: - ubuntu ruby: - - ruby + - "3.4" steps: - uses: actions/checkout@v4 @@ -93,7 +93,7 @@ jobs: - ubuntu ruby: - - ruby + - "3.4" steps: - uses: actions/checkout@v4 diff --git a/async-redis.gemspec b/async-redis.gemspec index 936c443..0f11fc6 100644 --- a/async-redis.gemspec +++ b/async-redis.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |spec| "source_code_uri" => "https://github.com/socketry/async-redis.git", } - spec.files = Dir.glob(["{lib}/**/*", "*.md"], File::FNM_DOTMATCH, base: __dir__) + spec.files = Dir.glob(["{context,lib}/**/*", "*.md"], File::FNM_DOTMATCH, base: __dir__) spec.required_ruby_version = ">= 3.2" @@ -28,5 +28,5 @@ Gem::Specification.new do |spec| spec.add_dependency "async-pool", "~> 0.2" spec.add_dependency "io-endpoint", "~> 0.10" spec.add_dependency "io-stream", "~> 0.4" - spec.add_dependency "protocol-redis", "~> 0.9" + spec.add_dependency "protocol-redis", "~> 0.11" end diff --git a/cluster/test/async/redis/context/subscription.rb b/cluster/test/async/redis/context/subscription.rb new file mode 100644 index 0000000..46796d9 --- /dev/null +++ b/cluster/test/async/redis/context/subscription.rb @@ -0,0 +1,253 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "async/variable" +require "async/redis/cluster_client" +require "sus/fixtures/async" +require "securerandom" + +describe Async::Redis::Context::Subscription do + include Sus::Fixtures::Async::ReactorContext + + with "in cluster environment" do + let(:node_a) {"redis://redis-a:6379"} + let(:node_b) {"redis://redis-b:6379"} + let(:node_c) {"redis://redis-c:6379"} + + let(:endpoints) {[ + Async::Redis::Endpoint.parse(node_a), + Async::Redis::Endpoint.parse(node_b), + Async::Redis::Endpoint.parse(node_c) + ]} + + let(:cluster) {Async::Redis::ClusterClient.new(endpoints)} + + with "sharded subscriptions" do + let(:shard_channel) {"cluster-shard:test:#{SecureRandom.uuid}"} + let(:shard_message) {"cluster sharded message"} + + it "can subscribe to sharded channels and receive messages" do + received_message = nil + ready = Async::Variable.new + + # Set up the subscriber using cluster client's ssubscribe method + subscriber_task = Async do + cluster.subscribe(shard_channel) do |context| + ready.resolve + + type, name, message = context.listen + + expect(type).to be == "smessage" + expect(name).to be == shard_channel + received_message = message + end + end + + # Set up the publisher + publisher_task = Async do + ready.wait + + cluster.publish(shard_channel, shard_message) + end + + publisher_task.wait + subscriber_task.wait + + expect(received_message).to be == shard_message + end + + it "distributes sharded messages across cluster nodes" do + # This test verifies that sharded pub/sub properly distributes + # messages across different cluster nodes based on channel hash + + channels = [ + "shard:node:a:#{SecureRandom.uuid}", + "shard:node:b:#{SecureRandom.uuid}", + "shard:node:c:#{SecureRandom.uuid}" + ] + + # Find channels that map to different slots/nodes + channel_slots = channels.map {|channel| [channel, cluster.slot_for(channel)]} + unique_slots = channel_slots.map(&:last).uniq + + # We should have channels distributed across different slots + expect(unique_slots.size).to be > 1 + + received_messages = [] + ready = Async::Variable.new + subscriber_count = 0 + target_count = channels.size + + # Set up subscribers for each channel + subscriber_tasks = channels.map do |channel| + Async do + slot = cluster.slot_for(channel) + client = cluster.client_for(slot) + + client.ssubscribe(channel) do |context| + subscriber_count += 1 + ready.resolve if subscriber_count == target_count + + type, name, message = context.listen + received_messages << {channel: name, message: message, slot: slot} + end + end + end + + # Set up publisher + publisher_task = Async do + ready.wait # Wait for all subscribers + + channels.each_with_index do |channel, index| + slot = cluster.slot_for(channel) + client = cluster.client_for(slot) + + client.call("SPUBLISH", channel, "message-#{index}") + end + end + + publisher_task.wait + subscriber_tasks.each(&:wait) + + # Verify we received messages for different channels + expect(received_messages.size).to be == channels.size + + # Verify messages were distributed to different slots + received_slots = received_messages.map {|msg| msg[:slot]}.uniq + expect(received_slots.size).to be > 1 + end + + it "can mix sharded and regular subscriptions on different nodes" do + regular_channel = "regular:#{SecureRandom.uuid}" + shard_channel = "shard:#{SecureRandom.uuid}" + + regular_slot = cluster.slot_for(regular_channel) + shard_slot = cluster.slot_for(shard_channel) + + received_messages = [] + condition = Async::Condition.new + ready_count = 0 + + # Regular subscription on one node + regular_task = reactor.async do + client = cluster.client_for(regular_slot) + client.subscribe(regular_channel) do |context| + ready_count += 1 + condition.signal if ready_count == 2 + + type, name, message = context.listen + received_messages << {type: type, channel: name, message: message} + end + end + + # Sharded subscription on another node (if different) + shard_task = reactor.async do + client = cluster.client_for(shard_slot) + client.ssubscribe(shard_channel) do |context| + ready_count += 1 + condition.signal if ready_count == 2 + + type, name, message = context.listen + received_messages << {type: type, channel: name, message: message} + end + end + + # Publisher + publisher_task = reactor.async do + condition.wait # Wait for both subscribers + + # Publish to regular channel + regular_client = cluster.client_for(regular_slot) + regular_client.publish(regular_channel, "regular message") + + # Publish to sharded channel + shard_client = cluster.client_for(shard_slot) + shard_client.call("SPUBLISH", shard_channel, "sharded message") + end + + publisher_task.wait + regular_task.wait + shard_task.wait + + # Should have received both messages + expect(received_messages.size).to be == 2 + + # Verify message types + message_types = received_messages.map {|msg| msg[:type]} + expect(message_types).to be(:include?, "message") # Regular pub/sub + expect(message_types).to be(:include?, "smessage") # Sharded pub/sub + end + + it "handles sharded subscription on same connection as regular subscription" do + # Test that the unified Subscribe context works in cluster environment + channel = "unified:test:#{SecureRandom.uuid}" + shard_channel = "shard:unified:#{SecureRandom.uuid}" + + # Check if both channels hash to the same slot + channel_slot = cluster.slot_for(channel) + shard_slot = cluster.slot_for(shard_channel) + + # For this test to work, both channels must be on the same node + # If they're not, we need to use the same hash tag to force them to the same slot + if channel_slot != shard_slot + # Use hash tags to force both channels to the same slot + base_key = "{unified:#{SecureRandom.uuid}}" + channel = "#{base_key}:regular" + shard_channel = "#{base_key}:shard" + + # Verify they now hash to the same slot + channel_slot = cluster.slot_for(channel) + shard_slot = cluster.slot_for(shard_channel) + expect(channel_slot).to be == shard_slot + end + + client = cluster.client_for(channel_slot) + + received_messages = [] + condition = Async::Condition.new + + # Set up unified subscription + subscriber_task = reactor.async do + client.subscribe(channel) do |context| + # Add sharded subscription to same context + context.ssubscribe([shard_channel]) + + condition.signal # Ready to receive + + # Listen for both message types + 2.times do + response = context.listen + received_messages << response + end + end + end + + # Publisher + publisher_task = reactor.async do + condition.wait + + # Both messages must be published from the same node (same slot) + publisher_client = cluster.client_for(channel_slot) + + # Publish regular message + publisher_client.publish(channel, "unified regular") + + # Publish sharded message + publisher_client.call("SPUBLISH", shard_channel, "unified sharded") + end + + publisher_task.wait + subscriber_task.wait + + # Should receive both message types on same context + expect(received_messages.size).to be == 2 + + message_types = received_messages.map(&:first) + expect(message_types).to be(:include?, "message") # Regular + expect(message_types).to be(:include?, "smessage") # Sharded + end + end + end +end diff --git a/gems.rb b/gems.rb index 33f28a0..3c3e552 100644 --- a/gems.rb +++ b/gems.rb @@ -14,6 +14,8 @@ gem "bake-gem" gem "bake-releases" + gem "agent-context" + gem "utopia-project" end diff --git a/guides/getting-started/readme.md b/guides/getting-started/readme.md index e728239..b599258 100644 --- a/guides/getting-started/readme.md +++ b/guides/getting-started/readme.md @@ -27,6 +27,9 @@ Async do client = Async::Redis::Client.new(endpoint) puts client.info + + client.set("mykey", "myvalue") + puts client.get("mykey") end ``` @@ -86,34 +89,6 @@ ensure end ``` -### Subscriptions - -``` ruby -require 'async' -require 'async/redis' - -endpoint = Async::Redis.local_endpoint -client = Async::Redis::Client.new(endpoint) +## Next Steps -Async do |task| - condition = Async::Condition.new - - publisher = task.async do - condition.wait - - client.publish 'status.frontend', 'good' - end - - subscriber = task.async do - client.subscribe 'status.frontend' do |context| - condition.signal # We are waiting for messages. - - type, name, message = context.listen - - pp type, name, message - end - end -ensure - client.close -end -``` \ No newline at end of file +- [Subscriptions](../subscriptions/) - Learn how to use Redis pub/sub functionality for real-time messaging. \ No newline at end of file diff --git a/guides/links.yaml b/guides/links.yaml index 7f527b0..e83ae52 100644 --- a/guides/links.yaml +++ b/guides/links.yaml @@ -1,2 +1,5 @@ getting-started: order: 1 + +subscriptions: + order: 2 diff --git a/guides/subscriptions/readme.md b/guides/subscriptions/readme.md new file mode 100644 index 0000000..816ce1e --- /dev/null +++ b/guides/subscriptions/readme.md @@ -0,0 +1,161 @@ +# Subscriptions + +This guide explains how to use Redis pub/sub functionality with `async-redis` to publish and subscribe to messages. + +## Overview + +Redis actually has 3 mechanisms to support pub/sub - a general `SUBSCRIBE` command, a pattern-based `PSUBSCRIBE` command, and a sharded `SSUBSCRIBE` command for cluster environments. They mostly work the same way, but have different use cases. + +## Subscribe + +The `SUBSCRIBE` command is used to subscribe to one or more channels. When a message is published to a subscribed channel, the client receives the message in real-time. + +First, let's create a simple listener that subscribes to messages on a channel: + +``` ruby +require 'async' +require 'async/redis' + +client = Async::Redis::Client.new + +Async do + client.subscribe 'status.frontend' do |context| + puts "Listening for messages on 'status.frontend'..." + + type, name, message = context.listen + + puts "Received: #{message}" + end +end +``` + +Now, let's create a publisher that sends messages to the same channel: + +``` ruby +require 'async' +require 'async/redis' + +client = Async::Redis::Client.new + +Async do + puts "Publishing message..." + client.publish 'status.frontend', 'good' + puts "Message sent!" +end +``` + +To see pub/sub in action, you can run the listener in one terminal and the publisher in another. The listener will receive any messages sent by the publisher to the `status.frontend` channel: + +```bash +$ ruby listener.rb +Listening for messages on 'status.frontend'... +Received: good +``` + +### Error Handling + +Subscriptions are at-most-once delivery. In addition, subscriptions are stateful, meaning that they maintain their own internal state and can be affected by network issues or server restarts. In order to improve resilience, it's important to implement error handling and reconnection logic. + +```ruby +require 'async' +require 'async/redis' + +client = Async::Redis::Client.new + +Async do + client.subscribe 'status.frontend' do |context| + puts "Listening for messages on 'status.frontend'..." + + context.each do |type, name, message| + puts "Received: #{message}" + end + end +rescue => error + Console.warn(self, "Subscription failed", error) + sleep 1 + retry +end +``` + +## Pattern Subscribe + +The `PSUBSCRIBE` command is used to subscribe to channels that match a given pattern. This allows clients to receive messages from multiple channels without subscribing to each one individually. + +Let's replace the receiver in the above example: + +``` ruby +require 'async' +require 'async/redis' + +endpoint = Async::Redis.local_endpoint +client = Async::Redis::Client.new(endpoint) + +Async do + client.psubscribe 'status.*' do |context| + puts "Listening for messages on 'status.*'..." + + type, pattern, name, message = context.listen + + puts "Received: #{message}" + end +end +``` + +Note that an extra field, `pattern` is returned when using `PSUBSCRIBE`. This field indicates the pattern that was matched for the incoming message. This can be useful for logging or debugging purposes, as it allows you to see which pattern triggered the message delivery. + +## Shard Subscribe + +If you are working with a clustered environment, you can improve performance by limiting the scope of your subscriptions to specific shards. This can help reduce the amount of data that needs to be sent between shards and improve overall throughput. + +To use sharded subscriptions, use a cluster client which supports sharded pub/sub: + +``` ruby +require 'async' +require 'async/redis' + +# endpoints = ... +cluster_client = Async::Redis::ClusterClient.new(endpoints) + +Async do + cluster_client.subscribe 'status.frontend' do |context| + puts "Listening for messages on 'status.frontend'..." + + type, name, message = context.listen + + puts "Received: #{message}" + end +end +``` + +``` ruby +require 'async' +require 'async/redis' + +# endpoints = ... +cluster_client = Async::Redis::ClusterClient.new(endpoints) + +Async do + puts "Publishing message..." + cluster_client.publish('status.frontend', 'good') + puts "Message sent!" +end +``` + +### Clustered Subscriptions + +While general `PUBLISH` and `SUBSCRIBE` will work on a cluster, they are less efficient as they require inter-shard communication. By default, the {ruby Async::Redis::ClusterClient} subscription mechanism defaults to `SSUBSCRIBE` and `SPUBLISH`, which are optimized for sharded environments. However, if using multiple subscriptions, internally, several connections will be made to the relevant shards, which increases the complexity. + +#### Cluster Topology Changes and Subscription Invalidation + +If the cluster is re-configured (e.g. adding or removing nodes, resharding), the subscription state may need to be re-established to account for the new topology. During this process, messages may be lost. This is expected as subscriptions are stateless. + +**Important**: When any individual shard subscription fails (due to resharding, node failures, or network issues), the entire cluster subscription is invalidated and will stop delivering messages. This design ensures consistency and prevents partial subscription states that could lead to missed messages on some shards. + +Common scenarios that trigger subscription invalidation: + +- **Resharding operations**: When slots are migrated between nodes (`MOVED` errors) +- **Node failures**: When Redis nodes become unavailable +- **Network partitions**: When connections to specific shards are lost +- **Cluster reconfiguration**: When the cluster topology changes + +Applications should be prepared to handle subscription failures and implement appropriate retry strategies. diff --git a/lib/async/redis/client.rb b/lib/async/redis/client.rb index 6621a44..10b5d21 100755 --- a/lib/async/redis/client.rb +++ b/lib/async/redis/client.rb @@ -9,7 +9,7 @@ require_relative "context/pipeline" require_relative "context/transaction" -require_relative "context/subscribe" +require_relative "context/subscription" require_relative "endpoint" require "io/endpoint/host_endpoint" @@ -32,11 +32,49 @@ module Methods # Subscribe to one or more channels for pub/sub messaging. # @parameter channels [Array(String)] The channels to subscribe to. # @yields {|context| ...} If a block is given, it will be executed within the subscription context. - # @parameter context [Context::Subscribe] The subscription context. + # @parameter context [Context::Subscription] The subscription context. # @returns [Object] The result of the block if block given. - # @returns [Context::Subscribe] The subscription context if no block given. + # @returns [Context::Subscription] The subscription context if no block given. def subscribe(*channels) - context = Context::Subscribe.new(@pool, channels) + context = Context::Subscription.new(@pool, channels) + + return context unless block_given? + + begin + yield context + ensure + context.close + end + end + + # Subscribe to one or more channel patterns for pub/sub messaging. + # @parameter patterns [Array(String)] The channel patterns to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [Context::Subscription] The subscription context. + # @returns [Object] The result of the block if block given. + # @returns [Context::Subscription] The subscription context if no block given. + def psubscribe(*patterns) + context = Context::Subscription.new(@pool, []) + context.psubscribe(patterns) + + return context unless block_given? + + begin + yield context + ensure + context.close + end + end + + # Subscribe to one or more sharded channels for pub/sub messaging (Redis 7.0+). + # @parameter channels [Array(String)] The sharded channels to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [Context::Subscription] The subscription context. + # @returns [Object] The result of the block if block given. + # @returns [Context::Subscription] The subscription context if no block given. + def ssubscribe(*channels) + context = Context::Subscription.new(@pool, []) + context.ssubscribe(channels) return context unless block_given? diff --git a/lib/async/redis/cluster_client.rb b/lib/async/redis/cluster_client.rb index d57659b..5c3db86 100644 --- a/lib/async/redis/cluster_client.rb +++ b/lib/async/redis/cluster_client.rb @@ -5,12 +5,18 @@ # Copyright, 2025, by Travis Bell. require_relative "client" +require_relative "cluster_subscription" +require_relative "range_map" require "io/stream" +require "protocol/redis/cluster/methods" + module Async module Redis # A Redis cluster client that manages multiple Redis instances and handles cluster operations. class ClusterClient + include ::Protocol::Redis::Cluster::Methods + # Raised when cluster configuration cannot be reloaded. class ReloadError < StandardError end @@ -21,54 +27,6 @@ class SlotError < StandardError Node = Struct.new(:id, :endpoint, :role, :health, :client) - # A map that stores ranges and their associated values for efficient lookup. - class RangeMap - # Initialize a new RangeMap. - def initialize - @ranges = [] - end - - # Add a range-value pair to the map. - # @parameter range [Range] The range to map. - # @parameter value [Object] The value to associate with the range. - # @returns [Object] The added value. - def add(range, value) - @ranges << [range, value] - - return value - end - - # Find the value associated with a key within any range. - # @parameter key [Object] The key to find. - # @yields {...} Block called if no range contains the key. - # @returns [Object] The value if found, result of block if given, or nil. - def find(key) - @ranges.each do |range, value| - return value if range.include?(key) - end - - if block_given? - return yield - end - - return nil - end - - # Iterate over all values in the map. - # @yields {|value| ...} Block called for each value. - # @parameter value [Object] The value from the range-value pair. - def each - @ranges.each do |range, value| - yield value - end - end - - # Clear all ranges from the map. - def clear - @ranges.clear - end - end - # Create a new instance of the cluster client. # # @property endpoints [Array(Endpoint)] The list of cluster endpoints. @@ -127,13 +85,35 @@ def client_for(slot, role = :master) end end + # Get any available client from the cluster. + # This is useful for operations that don't require slot-specific routing, such as global pub/sub operations, INFO commands, or other cluster-wide operations. + # @parameter role [Symbol] The role of node to get (:master or :slave). + # @returns [Client] A Redis client for any available node. + def any_client(role = :master) + unless @shards + reload_cluster! + end + + # Sample a random shard to get better load distribution + if nodes = @shards.sample + nodes = nodes.select{|node| node.role == role} + + if node = nodes.sample + return (node.client ||= Client.new(node.endpoint, **@options)) + end + end + + # Fallback to slot 0 if sampling fails + client_for(0, role) + end + protected def reload_cluster!(endpoints = @endpoints) @endpoints.each do |endpoint| client = Client.new(endpoint, **@options) - shards = RangeMap.new + shards = Async::Redis::RangeMap.new endpoints = [] client.call("CLUSTER", "SHARDS").each do |shard| @@ -243,6 +223,32 @@ def slots_for(keys) return slots end + + # Subscribe to one or more sharded channels for pub/sub messaging in cluster environment. + # The subscription will be created on the appropriate nodes responsible for each channel's hash slot. + # + # @parameter channels [Array(String)] The sharded channels to subscribe to. + # @yields {|context| ...} If a block is given, it will be executed within the subscription context. + # @parameter context [ClusterSubscription] The cluster subscription context. + # @returns [Object] The result of the block if block given. + # @returns [ClusterSubscription] The cluster subscription context if no block given. + def subscribe(*channels) + context = ClusterSubscription.new(self) + + if channels.any? + context.subscribe(channels) + end + + if block_given? + begin + yield context + ensure + context.close + end + else + return context + end + end end end end diff --git a/lib/async/redis/cluster_subscription.rb b/lib/async/redis/cluster_subscription.rb new file mode 100644 index 0000000..8ae20d2 --- /dev/null +++ b/lib/async/redis/cluster_subscription.rb @@ -0,0 +1,129 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "async/limited_queue" +require "async/barrier" + +module Async + module Redis + # Context for managing sharded subscriptions across multiple Redis cluster nodes. + # This class handles the complexity of subscribing to channels that may be distributed + # across different shards in a Redis cluster. + class ClusterSubscription + # Represents a failure in the subscription process, e.g. network issues, shard failures. + class SubscriptionError < StandardError + end + + # Initialize a new shard subscription context. + # @parameter cluster_client [ClusterClient] The cluster client to use. + def initialize(cluster_client, queue: Async::LimitedQueue.new) + @cluster_client = cluster_client + @subscriptions = {} + @channels = [] + + @barrier = Async::Barrier.new + @queue = queue + end + + # Close all shard subscriptions. + def close + if barrier = @barrier + @barrier = nil + barrier.stop + end + + @subscriptions.each_value(&:close) + @subscriptions.clear + end + + # Listen for the next message from any subscribed shard. + # @returns [Array] The next message response. + # @raises [SubscriptionError] If the subscription has failed for any reason. + def listen + @queue.pop + rescue => error + raise SubscriptionError, "Failed to read message!" + end + + # Iterate over all messages from all subscribed shards. + # @yields {|response| ...} Block called for each message. + # @parameter response [Array] The message response. + def each + return to_enum unless block_given? + + while response = self.listen + yield response + end + end + + # Subscribe to additional sharded channels. + # @parameter channels [Array(String)] The channels to subscribe to. + def subscribe(channels) + slots = @cluster_client.slots_for(channels) + + slots.each do |slot, channels_for_slot| + if subscription = @subscriptions[slot] + # Add to existing subscription for this shard + subscription.ssubscribe(channels_for_slot) + else + # Create new subscription for this shard + client = @cluster_client.client_for(slot) + subscription = @subscriptions[slot] = client.ssubscribe(*channels_for_slot) + + @barrier.async do + # This is optimistic, in other words, subscription.listen will also fail on close. + until subscription.closed? + @queue << subscription.listen + end + ensure + # If we are exiting here for any reason OTHER than the subscription was closed, we need to re-create the subscription state: + unless subscription.closed? + @queue.close + end + end + end + end + + @channels.concat(channels) + end + + # Unsubscribe from sharded channels. + # @parameter channels [Array(String)] The channels to unsubscribe from. + def unsubscribe(channels) + slots = @cluster_client.slots_for(channels) + + slots.each do |slot, channels_for_slot| + if subscription = @subscriptions[slot] + subscription.sunsubscribe(channels_for_slot) + + # Remove channels from our tracking + @channels -= channels_for_slot + + # Check if this shard still has channels + remaining_channels_for_slot = @channels.select {|ch| @cluster_client.slot_for(ch) == slot} + + # If no channels left for this shard, close and remove it + if remaining_channels_for_slot.empty? + @subscriptions.delete(slot) + subscription.close + end + end + end + end + + # Get the list of currently subscribed channels. + # @returns [Array(String)] The list of subscribed channels. + def channels + @channels.dup + end + + # Get the number of active shard subscriptions. + # @returns [Integer] The number of shard connections. + def shard_count + @subscriptions.size + end + end + end +end diff --git a/lib/async/redis/context/generic.rb b/lib/async/redis/context/generic.rb index 63ef471..60216e1 100644 --- a/lib/async/redis/context/generic.rb +++ b/lib/async/redis/context/generic.rb @@ -22,12 +22,17 @@ def initialize(pool, *arguments) # Close the context and release the connection back to the pool. def close - if @connection - @pool.release(@connection) + if connection = @connection @connection = nil + @pool.release(connection) end end + # @returns [Boolean] Whether the context is closed. + def closed? + @connection.nil? + end + # Write a Redis command request to the connection. # @parameter command [String] The Redis command. # @parameter arguments [Array] The command arguments. diff --git a/lib/async/redis/context/subscribe.rb b/lib/async/redis/context/subscription.rb similarity index 53% rename from lib/async/redis/context/subscribe.rb rename to lib/async/redis/context/subscription.rb index dfbe8e4..fe29abe 100644 --- a/lib/async/redis/context/subscribe.rb +++ b/lib/async/redis/context/subscription.rb @@ -10,8 +10,10 @@ module Async module Redis module Context # Context for Redis pub/sub subscription operations. - class Subscribe < Generic + class Subscription < Generic MESSAGE = "message" + PMESSAGE = "pmessage" + SMESSAGE = "smessage" # Initialize a new subscription context. # @parameter pool [Pool] The connection pool to use. @@ -19,12 +21,12 @@ class Subscribe < Generic def initialize(pool, channels) super(pool) - subscribe(channels) + subscribe(channels) if channels.any? end # Close the subscription context. def close - # There is no way to reset subscription state. On Redis v6+ you can use RESET, but this is not supported in <= v6. + # This causes anyone calling `#listen` to exit, as `read_response` will fail. If we decided to use `RESET` instead, we'd need to take that into account. @connection&.close super @@ -34,7 +36,11 @@ def close # @returns [Array] The next message response, or nil if connection closed. def listen while response = @connection.read_response - return response if response.first == MESSAGE + type = response.first + + if type == MESSAGE || type == PMESSAGE || type == SMESSAGE + return response + end end end @@ -62,6 +68,34 @@ def unsubscribe(channels) @connection.write_request ["UNSUBSCRIBE", *channels] @connection.flush end + + # Subscribe to channel patterns. + # @parameter patterns [Array(String)] The channel patterns to subscribe to. + def psubscribe(patterns) + @connection.write_request ["PSUBSCRIBE", *patterns] + @connection.flush + end + + # Unsubscribe from channel patterns. + # @parameter patterns [Array(String)] The channel patterns to unsubscribe from. + def punsubscribe(patterns) + @connection.write_request ["PUNSUBSCRIBE", *patterns] + @connection.flush + end + + # Subscribe to sharded channels (Redis 7.0+). + # @parameter channels [Array(String)] The sharded channels to subscribe to. + def ssubscribe(channels) + @connection.write_request ["SSUBSCRIBE", *channels] + @connection.flush + end + + # Unsubscribe from sharded channels (Redis 7.0+). + # @parameter channels [Array(String)] The sharded channels to unsubscribe from. + def sunsubscribe(channels) + @connection.write_request ["SUNSUBSCRIBE", *channels] + @connection.flush + end end end end diff --git a/lib/async/redis/endpoint.rb b/lib/async/redis/endpoint.rb index 0ca9596..8cfb31f 100644 --- a/lib/async/redis/endpoint.rb +++ b/lib/async/redis/endpoint.rb @@ -210,6 +210,8 @@ def database end end + # Get the credentials for authentication. + # @returns [Array(String) | Nil] The username and password credentials or nil if not specified. def credentials @options[:credentials] || extract_userinfo(@url.userinfo) end @@ -224,6 +226,8 @@ def credentials end end + # Check if the endpoint is connecting to localhost. + # @returns [Boolean] True if connecting to localhost. def localhost? @url.hostname =~ /^(.*?\.)?localhost\.?$/ end @@ -237,6 +241,8 @@ def ssl_verify_mode end end + # Get the SSL context for secure connections. + # @returns [OpenSSL::SSL::SSLContext] The SSL context configured for this endpoint. def ssl_context @options[:ssl_context] || OpenSSL::SSL::SSLContext.new.tap do |context| context.set_params( @@ -245,6 +251,9 @@ def ssl_context end end + # Build the underlying endpoint with optional SSL wrapping. + # @parameter endpoint [IO::Endpoint] Optional base endpoint to wrap. + # @returns [IO::Endpoint] The built endpoint, potentially wrapped with SSL. def build_endpoint(endpoint = nil) endpoint ||= tcp_endpoint @@ -260,22 +269,33 @@ def build_endpoint(endpoint = nil) return endpoint end + # Get the underlying endpoint, building it if necessary. + # @returns [IO::Endpoint] The underlying endpoint for connections. def endpoint @endpoint ||= build_endpoint end + # Set the underlying endpoint. + # @parameter endpoint [IO::Endpoint] The endpoint to wrap and use. def endpoint=(endpoint) @endpoint = build_endpoint(endpoint) end + # Bind to the endpoint and yield the server socket. + # @parameter arguments [Array] Arguments to pass to the underlying endpoint bind method. + # @yields [IO] The bound server socket. def bind(*arguments, &block) endpoint.bind(*arguments, &block) end + # Connect to the endpoint and yield the client socket. + # @yields [IO] The connected client socket. def connect(&block) endpoint.connect(&block) end + # Iterate over each possible endpoint variation. + # @yields [Endpoint] Each endpoint variant. def each return to_enum unless block_given? @@ -284,14 +304,21 @@ def each end end + # Get the key for hashing and equality comparison. + # @returns [Array] The key components for this endpoint. def key [@url, @options] end + # Check if this endpoint is equal to another. + # @parameter other [Endpoint] The other endpoint to compare with. + # @returns [Boolean] True if the endpoints are equal. def eql? other self.key.eql? other.key end + # Get the hash code for this endpoint. + # @returns [Integer] The hash code based on the endpoint's key. def hash self.key.hash end diff --git a/lib/async/redis/range_map.rb b/lib/async/redis/range_map.rb new file mode 100644 index 0000000..782af27 --- /dev/null +++ b/lib/async/redis/range_map.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +module Async + module Redis + # A map that stores ranges and their associated values for efficient lookup. + class RangeMap + # Initialize a new RangeMap. + def initialize + @ranges = [] + end + + # Add a range-value pair to the map. + # @parameter range [Range] The range to map. + # @parameter value [Object] The value to associate with the range. + # @returns [Object] The added value. + def add(range, value) + @ranges << [range, value] + return value + end + + # Find the value associated with a key within any range. + # @parameter key [Object] The key to find. + # @yields {...} Block called if no range contains the key. + # @returns [Object] The value if found, result of block if given, or nil. + def find(key) + @ranges.each do |range, value| + return value if range.include?(key) + end + if block_given? + return yield + end + return nil + end + + # Iterate over all values in the map. + # @yields {|value| ...} Block called for each value. + # @parameter value [Object] The value from the range-value pair. + def each + @ranges.each do |range, value| + yield value + end + end + + # Get a random value from the map. + # @returns [Object] A randomly selected value, or nil if map is empty. + def sample + return nil if @ranges.empty? + range, value = @ranges.sample + return value + end + + # Clear all ranges from the map. + def clear + @ranges.clear + end + end + end +end diff --git a/releases.md b/releases.md index 426e6c9..b6e0b55 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,11 @@ # Releases +## Unreleased + + - Add agent context. + - Add support for pattern pub/sub. + - Add support for sharded pub/sub. + ## v0.11.2 - Fix handling of IPv6 address literals, including those returned by Redis Cluster / Sentinel. diff --git a/test/async/redis/context/subscribe.rb b/test/async/redis/context/subscription.rb similarity index 85% rename from test/async/redis/context/subscribe.rb rename to test/async/redis/context/subscription.rb index 1a9a6d9..848b68d 100644 --- a/test/async/redis/context/subscribe.rb +++ b/test/async/redis/context/subscription.rb @@ -7,7 +7,7 @@ require "sus/fixtures/async" require "securerandom" -describe Async::Redis::Context::Subscribe do +describe Async::Redis::Context::Subscription do include Sus::Fixtures::Async::ReactorContext let(:endpoint) {Async::Redis.local_endpoint} @@ -90,4 +90,24 @@ subscription.close end end + + with "#close" do + it "causes #listen to exit" do + skip_unless_minimum_ruby_version("3.5") + + subscription = client.subscribe(news_channel) + error = nil + + listener = reactor.async do + subscription.listen + rescue => error + # Ignore. + end + + subscription.close + listener.wait + + expect(error).to be_a(IOError) + end + end end