Skip to content
10 changes: 9 additions & 1 deletion lib/async/redis/endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,19 @@ def self.parse(string, endpoint = nil, **options)
end

# Construct an endpoint with a specified scheme, hostname, optional path, and options.
# If no scheme is provided, it will be auto-detected based on SSL context.
#
# @parameter scheme [String] The scheme to use, e.g. "redis" or "rediss".
# @parameter scheme [String, nil] The scheme to use, e.g. "redis" or "rediss". If nil, will auto-detect.
# @parameter hostname [String] The hostname to connect to (or bind to).
# @parameter options [Hash] Additional options, passed to {#initialize}.
def self.for(scheme, host, credentials: nil, port: nil, database: nil, **options)
# Auto-detect scheme if not provided:
if default_scheme = options.delete(:scheme)
scheme ||= default_scheme
end

scheme ||= options.key?(:ssl_context) ? "rediss" : "redis"

uri_klass = SCHEMES.fetch(scheme.downcase) do
raise ArgumentError, "Unsupported scheme: #{scheme.inspect}"
end
Expand Down
24 changes: 13 additions & 11 deletions lib/async/redis/sentinel_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ class SentinelClient
#
# @property endpoints [Array(Endpoint)] The list of sentinel endpoints.
# @property master_name [String] The name of the master instance, defaults to 'mymaster'.
# @property master_options [Hash] Connection options for master instances.
# @property slave_options [Hash] Connection options for slave instances (defaults to master_options if not specified).
# @property role [Symbol] The role of the instance that you want to connect to, either `:master` or `:slave`.
# @property protocol [Protocol] The protocol to use when connecting to the actual Redis server, defaults to {Protocol::RESP2}.
def initialize(endpoints, master_name: DEFAULT_MASTER_NAME, role: :master, protocol: Protocol::RESP2, **options)
def initialize(endpoints, master_name: DEFAULT_MASTER_NAME, master_options: nil, slave_options: nil, role: :master, **options)
@endpoints = endpoints
@master_name = master_name
@master_options = master_options || {}
@slave_options = slave_options || @master_options
@role = role
@protocol = protocol

# A cache of sentinel connections.
@sentinels = {}
Expand Down Expand Up @@ -94,25 +96,27 @@ def master(name = @master_name)
end
end

private

# Resolve the master endpoint address.
# @returns [Endpoint | Nil] The master endpoint or nil if not found.
def resolve_master
def resolve_master(options = @master_options)
sentinels do |client|
begin
address = client.call("SENTINEL", "GET-MASTER-ADDR-BY-NAME", @master_name)
rescue Errno::ECONNREFUSED
next
end

return Endpoint.remote(address[0], address[1]) if address
return Endpoint.for(nil, address[0], port: address[1], **options) if address
end

return nil
end

# Resolve a slave endpoint address.
# @returns [Endpoint | Nil] A slave endpoint or nil if not found.
def resolve_slave
def resolve_slave(options = @slave_options)
sentinels do |client|
begin
reply = client.call("SENTINEL", "SLAVES", @master_name)
Expand All @@ -124,16 +128,14 @@ def resolve_slave
next if slaves.empty?

slave = select_slave(slaves)
return Endpoint.remote(slave["ip"], slave["port"])
return Endpoint.for(nil, slave["ip"], port: slave["port"], **options)
end

return nil
end

protected

def assign_default_tags(tags)
tags[:protocol] = @protocol.to_s
tags[:role] ||= @role
end

# Override the parent method. The only difference is that this one needs to resolve the master/slave address.
Expand All @@ -145,7 +147,7 @@ def make_pool(**options)
peer = endpoint.connect
stream = ::IO::Stream(peer)

@protocol.client(stream)
endpoint.protocol.client(stream)
end
end

Expand Down
1 change: 1 addition & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Add agent context.
- Add support for pattern pub/sub.
- Add support for sharded pub/sub.
- Add support for `master_options` and `slave_options` (and removed `protocol`) from `SentinelClient`.

## v0.11.2

Expand Down
75 changes: 71 additions & 4 deletions sentinel/test/async/redis/sentinel_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "async/redis/sentinel_client"
require "sus/fixtures/async"
require "securerandom"
require "openssl"

describe Async::Redis::SentinelClient do
include Sus::Fixtures::Async::ReactorContext
Expand All @@ -32,13 +33,18 @@
expect(client.get(key)).to be == value
end

def wait_until(duration: 0.001, repeats: 100)
repeats.times do
break if yield
sleep duration
end
end

it "should resolve slave address" do
client.set(key, value)

# It takes a while to replicate:
while true
break if slave_client.get(key) == value
sleep 0.01
wait_until do
slave_client.get(key) == value
end

expect(slave_client.get(key)).to be == value
Expand All @@ -51,4 +57,65 @@
client.set(key, value)
expect(client.get(key)).to be == value
end

with "endpoint options" do
it "uses master_options for master connections" do
master_options = {database: 1}
client_with_options = subject.new(sentinels, master_options: master_options, role: :master)

# Verify the client can set/get values (basic connectivity):
client_with_options.set(key, value)
expect(client_with_options.get(key)).to be == value
end

it "uses slave_options for slave connections when specified" do
slave_options = {database: 2}
slave_client_with_options = subject.new(sentinels, slave_options: slave_options, role: :slave)

# Set data via master that also uses the same database:
master_client_with_options = subject.new(sentinels, master_options: slave_options, role: :master)
master_client_with_options.set(key, value)

# Wait for replication and verify slave can read (basic connectivity):
wait_until do
slave_client_with_options.get(key) == value
end

expect(slave_client_with_options.get(key)).to be == value
end

it "falls back to master_options for slave connections when slave_options not specified" do
master_options = {database: 3}
slave_client_fallback = subject.new(sentinels, master_options: master_options, role: :slave)

# Set data via master first:
client_with_master_options = subject.new(sentinels, master_options: master_options, role: :master)
client_with_master_options.set(key, value)

# Wait for replication and verify slave can read using master options:
wait_until do
slave_client_fallback.get(key) == value
end

expect(slave_client_fallback.get(key)).to be == value
end

it "provides correct endpoint options for master role" do
master_options = {database: 1, timeout: 5}
slave_options = {database: 2, timeout: 10}

client_with_both = subject.new(sentinels, master_options: master_options, slave_options: slave_options)

expect(client_with_both.instance_variable_get(:@master_options)).to be == master_options
end

it "provides correct endpoint options for slave role" do
master_options = {database: 1, timeout: 5}
slave_options = {database: 2, timeout: 10}

client_with_both = subject.new(sentinels, master_options: master_options, slave_options: slave_options)

expect(client_with_both.instance_variable_get(:@slave_options)).to be == slave_options
end
end
end
22 changes: 22 additions & 0 deletions test/async/redis/endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,26 @@
expect(endpoint.database).to be == 3
end
end

with "scheme auto-detection" do
it "auto-detects redis scheme when no ssl_context provided" do
endpoint = Async::Redis::Endpoint.for(nil, "localhost", port: 6379)
expect(endpoint.scheme).to be == "redis"
expect(endpoint).not.to be(:secure?)
end

it "auto-detects rediss scheme when ssl_context provided" do
ssl_context = OpenSSL::SSL::SSLContext.new
endpoint = Async::Redis::Endpoint.for(nil, "localhost", ssl_context: ssl_context)
expect(endpoint.scheme).to be == "rediss"
expect(endpoint).to be(:secure?)
end

it "respects explicit scheme even when ssl_context provided" do
ssl_context = OpenSSL::SSL::SSLContext.new
endpoint = Async::Redis::Endpoint.for("redis", "localhost", ssl_context: ssl_context)
expect(endpoint.scheme).to be == "redis"
expect(endpoint).not.to be(:secure?) # scheme takes precedence
end
end
end
90 changes: 90 additions & 0 deletions test/async/redis/sentinel_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024-2025, by Samuel Williams.

require "async/redis/sentinel_client"
require "sus/fixtures/async"
require "openssl"

describe Async::Redis::SentinelClient do
include Sus::Fixtures::Async::ReactorContext

let(:sentinels) {[
Async::Redis::Endpoint.parse("redis://localhost:26379")
]}

with "initialization" do
it "can be created with basic parameters" do
client = subject.new(sentinels)
expect(client.master_name).to be == "mymaster"
expect(client.role).to be == :master
end

it "accepts master_options parameter" do
master_options = {database: 1, timeout: 5}
client = subject.new(sentinels, master_options: master_options)

# Test that the options are used by checking the instance variables
expect(client.instance_variable_get(:@master_options)).to be == master_options
end

it "accepts slave_options parameter" do
master_options = {database: 1}
slave_options = {database: 2, timeout: 10}
client = subject.new(sentinels, master_options: master_options, slave_options: slave_options)

expect(client.instance_variable_get(:@slave_options)).to be == slave_options
end

it "uses master_options as fallback for slaves when slave_options not provided" do
master_options = {database: 1, timeout: 5}
client = subject.new(sentinels, master_options: master_options)

expect(client.instance_variable_get(:@slave_options)).to be == master_options
end

it "handles empty options gracefully" do
client = subject.new(sentinels)

expect(client.instance_variable_get(:@master_options)).to be == {}
expect(client.instance_variable_get(:@slave_options)).to be == {}
end
end

with "endpoint options by role" do
let(:master_options) {{database: 1, timeout: 5}}
let(:slave_options) {{database: 2, timeout: 10}}
let(:client) {subject.new(sentinels, master_options: master_options, slave_options: slave_options)}

it "stores master options correctly" do
expect(client.instance_variable_get(:@master_options)).to be == master_options
end

it "stores slave options correctly" do
expect(client.instance_variable_get(:@slave_options)).to be == slave_options
end
end

with "role-specific scheme handling" do
it "creates correct endpoint scheme for master with SSL" do
ssl_context = OpenSSL::SSL::SSLContext.new
master_options = {ssl_context: ssl_context}

endpoint = Async::Redis::Endpoint.for(nil, "localhost", **master_options)
expect(endpoint.scheme).to be == "rediss"
end

it "creates different schemes for master vs slave when options differ" do
ssl_context = OpenSSL::SSL::SSLContext.new
master_options = {ssl_context: ssl_context} # SSL enabled
slave_options = {database: 1} # No SSL

master_endpoint = Async::Redis::Endpoint.for(nil, "localhost", **master_options)
slave_endpoint = Async::Redis::Endpoint.for(nil, "localhost", **slave_options)

expect(master_endpoint.scheme).to be == "rediss"
expect(slave_endpoint.scheme).to be == "redis"
end
end
end
Loading