diff --git a/lib/async/redis/endpoint.rb b/lib/async/redis/endpoint.rb index 8cfb31f..a76e334 100644 --- a/lib/async/redis/endpoint.rb +++ b/lib/async/redis/endpoint.rb @@ -58,11 +58,19 @@ def self.parse(string, endpoint = nil, **options) end # Construct an endpoint with a specified scheme, hostname, optional path, and options. + # If no scheme is provided, it will be auto-detected based on SSL context. # - # @parameter scheme [String] The scheme to use, e.g. "redis" or "rediss". + # @parameter scheme [String, nil] The scheme to use, e.g. "redis" or "rediss". If nil, will auto-detect. # @parameter hostname [String] The hostname to connect to (or bind to). # @parameter options [Hash] Additional options, passed to {#initialize}. def self.for(scheme, host, credentials: nil, port: nil, database: nil, **options) + # Auto-detect scheme if not provided: + if default_scheme = options.delete(:scheme) + scheme ||= default_scheme + end + + scheme ||= options.key?(:ssl_context) ? "rediss" : "redis" + uri_klass = SCHEMES.fetch(scheme.downcase) do raise ArgumentError, "Unsupported scheme: #{scheme.inspect}" end diff --git a/lib/async/redis/sentinel_client.rb b/lib/async/redis/sentinel_client.rb index d44dc60..5de8d62 100644 --- a/lib/async/redis/sentinel_client.rb +++ b/lib/async/redis/sentinel_client.rb @@ -21,13 +21,15 @@ class SentinelClient # # @property endpoints [Array(Endpoint)] The list of sentinel endpoints. # @property master_name [String] The name of the master instance, defaults to 'mymaster'. + # @property master_options [Hash] Connection options for master instances. + # @property slave_options [Hash] Connection options for slave instances (defaults to master_options if not specified). # @property role [Symbol] The role of the instance that you want to connect to, either `:master` or `:slave`. - # @property protocol [Protocol] The protocol to use when connecting to the actual Redis server, defaults to {Protocol::RESP2}. - def initialize(endpoints, master_name: DEFAULT_MASTER_NAME, role: :master, protocol: Protocol::RESP2, **options) + def initialize(endpoints, master_name: DEFAULT_MASTER_NAME, master_options: nil, slave_options: nil, role: :master, **options) @endpoints = endpoints @master_name = master_name + @master_options = master_options || {} + @slave_options = slave_options || @master_options @role = role - @protocol = protocol # A cache of sentinel connections. @sentinels = {} @@ -94,9 +96,11 @@ def master(name = @master_name) end end + private + # Resolve the master endpoint address. # @returns [Endpoint | Nil] The master endpoint or nil if not found. - def resolve_master + def resolve_master(options = @master_options) sentinels do |client| begin address = client.call("SENTINEL", "GET-MASTER-ADDR-BY-NAME", @master_name) @@ -104,7 +108,7 @@ def resolve_master next end - return Endpoint.remote(address[0], address[1]) if address + return Endpoint.for(nil, address[0], port: address[1], **options) if address end return nil @@ -112,7 +116,7 @@ def resolve_master # Resolve a slave endpoint address. # @returns [Endpoint | Nil] A slave endpoint or nil if not found. - def resolve_slave + def resolve_slave(options = @slave_options) sentinels do |client| begin reply = client.call("SENTINEL", "SLAVES", @master_name) @@ -124,16 +128,14 @@ def resolve_slave next if slaves.empty? slave = select_slave(slaves) - return Endpoint.remote(slave["ip"], slave["port"]) + return Endpoint.for(nil, slave["ip"], port: slave["port"], **options) end return nil end - protected - def assign_default_tags(tags) - tags[:protocol] = @protocol.to_s + tags[:role] ||= @role end # Override the parent method. The only difference is that this one needs to resolve the master/slave address. @@ -145,7 +147,7 @@ def make_pool(**options) peer = endpoint.connect stream = ::IO::Stream(peer) - @protocol.client(stream) + endpoint.protocol.client(stream) end end diff --git a/releases.md b/releases.md index b6e0b55..38caa63 100644 --- a/releases.md +++ b/releases.md @@ -5,6 +5,7 @@ - Add agent context. - Add support for pattern pub/sub. - Add support for sharded pub/sub. + - Add support for `master_options` and `slave_options` (and removed `protocol`) from `SentinelClient`. ## v0.11.2 diff --git a/sentinel/test/async/redis/sentinel_client.rb b/sentinel/test/async/redis/sentinel_client.rb index aae2068..8fd3971 100644 --- a/sentinel/test/async/redis/sentinel_client.rb +++ b/sentinel/test/async/redis/sentinel_client.rb @@ -7,6 +7,7 @@ require "async/redis/sentinel_client" require "sus/fixtures/async" require "securerandom" +require "openssl" describe Async::Redis::SentinelClient do include Sus::Fixtures::Async::ReactorContext @@ -32,13 +33,18 @@ expect(client.get(key)).to be == value end + def wait_until(duration: 0.001, repeats: 100) + repeats.times do + break if yield + sleep duration + end + end + it "should resolve slave address" do client.set(key, value) - # It takes a while to replicate: - while true - break if slave_client.get(key) == value - sleep 0.01 + wait_until do + slave_client.get(key) == value end expect(slave_client.get(key)).to be == value @@ -51,4 +57,65 @@ client.set(key, value) expect(client.get(key)).to be == value end + + with "endpoint options" do + it "uses master_options for master connections" do + master_options = {database: 1} + client_with_options = subject.new(sentinels, master_options: master_options, role: :master) + + # Verify the client can set/get values (basic connectivity): + client_with_options.set(key, value) + expect(client_with_options.get(key)).to be == value + end + + it "uses slave_options for slave connections when specified" do + slave_options = {database: 2} + slave_client_with_options = subject.new(sentinels, slave_options: slave_options, role: :slave) + + # Set data via master that also uses the same database: + master_client_with_options = subject.new(sentinels, master_options: slave_options, role: :master) + master_client_with_options.set(key, value) + + # Wait for replication and verify slave can read (basic connectivity): + wait_until do + slave_client_with_options.get(key) == value + end + + expect(slave_client_with_options.get(key)).to be == value + end + + it "falls back to master_options for slave connections when slave_options not specified" do + master_options = {database: 3} + slave_client_fallback = subject.new(sentinels, master_options: master_options, role: :slave) + + # Set data via master first: + client_with_master_options = subject.new(sentinels, master_options: master_options, role: :master) + client_with_master_options.set(key, value) + + # Wait for replication and verify slave can read using master options: + wait_until do + slave_client_fallback.get(key) == value + end + + expect(slave_client_fallback.get(key)).to be == value + end + + it "provides correct endpoint options for master role" do + master_options = {database: 1, timeout: 5} + slave_options = {database: 2, timeout: 10} + + client_with_both = subject.new(sentinels, master_options: master_options, slave_options: slave_options) + + expect(client_with_both.instance_variable_get(:@master_options)).to be == master_options + end + + it "provides correct endpoint options for slave role" do + master_options = {database: 1, timeout: 5} + slave_options = {database: 2, timeout: 10} + + client_with_both = subject.new(sentinels, master_options: master_options, slave_options: slave_options) + + expect(client_with_both.instance_variable_get(:@slave_options)).to be == slave_options + end + end end diff --git a/test/async/redis/endpoint.rb b/test/async/redis/endpoint.rb index 385a0c0..f1ea469 100644 --- a/test/async/redis/endpoint.rb +++ b/test/async/redis/endpoint.rb @@ -143,4 +143,26 @@ expect(endpoint.database).to be == 3 end end + + with "scheme auto-detection" do + it "auto-detects redis scheme when no ssl_context provided" do + endpoint = Async::Redis::Endpoint.for(nil, "localhost", port: 6379) + expect(endpoint.scheme).to be == "redis" + expect(endpoint).not.to be(:secure?) + end + + it "auto-detects rediss scheme when ssl_context provided" do + ssl_context = OpenSSL::SSL::SSLContext.new + endpoint = Async::Redis::Endpoint.for(nil, "localhost", ssl_context: ssl_context) + expect(endpoint.scheme).to be == "rediss" + expect(endpoint).to be(:secure?) + end + + it "respects explicit scheme even when ssl_context provided" do + ssl_context = OpenSSL::SSL::SSLContext.new + endpoint = Async::Redis::Endpoint.for("redis", "localhost", ssl_context: ssl_context) + expect(endpoint.scheme).to be == "redis" + expect(endpoint).not.to be(:secure?) # scheme takes precedence + end + end end diff --git a/test/async/redis/sentinel_client.rb b/test/async/redis/sentinel_client.rb new file mode 100644 index 0000000..500be78 --- /dev/null +++ b/test/async/redis/sentinel_client.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024-2025, by Samuel Williams. + +require "async/redis/sentinel_client" +require "sus/fixtures/async" +require "openssl" + +describe Async::Redis::SentinelClient do + include Sus::Fixtures::Async::ReactorContext + + let(:sentinels) {[ + Async::Redis::Endpoint.parse("redis://localhost:26379") + ]} + + with "initialization" do + it "can be created with basic parameters" do + client = subject.new(sentinels) + expect(client.master_name).to be == "mymaster" + expect(client.role).to be == :master + end + + it "accepts master_options parameter" do + master_options = {database: 1, timeout: 5} + client = subject.new(sentinels, master_options: master_options) + + # Test that the options are used by checking the instance variables + expect(client.instance_variable_get(:@master_options)).to be == master_options + end + + it "accepts slave_options parameter" do + master_options = {database: 1} + slave_options = {database: 2, timeout: 10} + client = subject.new(sentinels, master_options: master_options, slave_options: slave_options) + + expect(client.instance_variable_get(:@slave_options)).to be == slave_options + end + + it "uses master_options as fallback for slaves when slave_options not provided" do + master_options = {database: 1, timeout: 5} + client = subject.new(sentinels, master_options: master_options) + + expect(client.instance_variable_get(:@slave_options)).to be == master_options + end + + it "handles empty options gracefully" do + client = subject.new(sentinels) + + expect(client.instance_variable_get(:@master_options)).to be == {} + expect(client.instance_variable_get(:@slave_options)).to be == {} + end + end + + with "endpoint options by role" do + let(:master_options) {{database: 1, timeout: 5}} + let(:slave_options) {{database: 2, timeout: 10}} + let(:client) {subject.new(sentinels, master_options: master_options, slave_options: slave_options)} + + it "stores master options correctly" do + expect(client.instance_variable_get(:@master_options)).to be == master_options + end + + it "stores slave options correctly" do + expect(client.instance_variable_get(:@slave_options)).to be == slave_options + end + end + + with "role-specific scheme handling" do + it "creates correct endpoint scheme for master with SSL" do + ssl_context = OpenSSL::SSL::SSLContext.new + master_options = {ssl_context: ssl_context} + + endpoint = Async::Redis::Endpoint.for(nil, "localhost", **master_options) + expect(endpoint.scheme).to be == "rediss" + end + + it "creates different schemes for master vs slave when options differ" do + ssl_context = OpenSSL::SSL::SSLContext.new + master_options = {ssl_context: ssl_context} # SSL enabled + slave_options = {database: 1} # No SSL + + master_endpoint = Async::Redis::Endpoint.for(nil, "localhost", **master_options) + slave_endpoint = Async::Redis::Endpoint.for(nil, "localhost", **slave_options) + + expect(master_endpoint.scheme).to be == "rediss" + expect(slave_endpoint.scheme).to be == "redis" + end + end +end