Skip to content
5 changes: 5 additions & 0 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class Cluster
# @since 2.1.1
READ_RETRY_INTERVAL = 5

# How often an idle primary writes a no-op to the oplog.
#
# @since 2.4.0
IDLE_WRITE_PERIOD_SECONDS = 10

# @return [ Hash ] The options hash.
attr_reader :options

Expand Down
4 changes: 3 additions & 1 deletion lib/mongo/error/invalid_server_preference.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class InvalidServerPreference < Error
# Error message for when the max staleness is not at least twice the heartbeat frequency.
#
# @since 2.4.0
INVALID_MAX_STALENESS = "max_staleness must be at least twice the client's heartbeat frequency.".freeze
INVALID_MAX_STALENESS = "`max_staleness` value is too small. It must be at least " +
"`ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` and (the cluster's heartbeat_frequency " +
"setting + `Cluster::IDLE_WRITE_PERIOD_SECONDS`).".freeze

# Error message when max staleness cannot be used because one or more servers has version < 3.4.
#
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Server

# Get the description from the monitor and scan on monitor.
def_delegators :monitor, :description, :scan!, :heartbeat_frequency, :last_scan
alias :heartbeat_frequency_seconds :heartbeat_frequency

# Delegate convenience methods to the monitor description.
def_delegators :description,
Expand Down
5 changes: 5 additions & 0 deletions lib/mongo/server_selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ module ServerSelector
# @since 2.0.0
SERVER_SELECTION_TIMEOUT = 30.freeze

# The smallest allowed max staleness value, in seconds.
#
# @since 2.4.0
SMALLEST_MAX_STALENESS_SECONDS = 90

# Primary read preference.
#
# @since 2.1.0
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/server_selector/nearest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def tags_allowed?
def to_mongos
preference = { :mode => 'nearest' }
preference.merge!({ :tags => tag_sets }) unless tag_sets.empty?
preference.merge!({ maxStalenessMS: max_staleness * 1000 }) if max_staleness
preference.merge!({ maxStalenessSeconds: max_staleness }) if max_staleness
preference
end

Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/server_selector/primary_preferred.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def tags_allowed?
def to_mongos
preference = { :mode => 'primaryPreferred' }
preference.merge!({ :tags => tag_sets }) unless tag_sets.empty?
preference.merge!({ maxStalenessMS: max_staleness * 1000 }) if max_staleness
preference.merge!({ maxStalenessSeconds: max_staleness }) if max_staleness
preference
end

Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/server_selector/secondary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def tags_allowed?
def to_mongos
preference = { :mode => 'secondary' }
preference.merge!({ :tags => tag_sets }) unless tag_sets.empty?
preference.merge!({ maxStalenessMS: max_staleness * 1000 }) if max_staleness
preference.merge!({ maxStalenessSeconds: max_staleness }) if max_staleness
preference
end

Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/server_selector/secondary_preferred.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def to_mongos
return nil if tag_sets.empty? && max_staleness.nil?
preference = { mode: 'secondaryPreferred' }
preference.merge!({ tags: tag_sets }) unless tag_sets.empty?
preference.merge!({ maxStalenessMS: max_staleness * 1000 }) if max_staleness
preference.merge!({ maxStalenessSeconds: max_staleness }) if max_staleness
preference
end

Expand Down
20 changes: 11 additions & 9 deletions lib/mongo/server_selector/selectable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module Selectable
# @return [ Array ] tag_sets The tag sets used to select servers.
attr_reader :tag_sets

# @return [ Float ] max_staleness The maximum replication lag, in seconds, that a
# @return [ Integer ] max_staleness The maximum replication lag, in seconds, that a
# secondary can suffer and still be eligible for a read.
#
# @since 2.4.0
Expand Down Expand Up @@ -68,7 +68,7 @@ def ==(other)
def initialize(options = {})
@options = (options || {}).freeze
@tag_sets = (options[:tag_sets] || []).freeze
@max_staleness = options[:max_staleness] if options[:max_staleness] && options[:max_staleness] > 0
@max_staleness = options[:max_staleness] unless options[:max_staleness] == -1
validate!
end

Expand Down Expand Up @@ -154,7 +154,7 @@ def candidates(cluster)
elsif cluster.sharded?
near_servers(cluster.servers).each { |server| validate_max_staleness_support!(server) }
else
validate_max_staleness_value!(cluster)
validate_max_staleness_value!(cluster) unless cluster.unknown?
select(cluster.servers)
end
end
Expand Down Expand Up @@ -230,14 +230,14 @@ def filter_stale_servers(candidates, primary = nil)
validate_max_staleness_support!(server)
staleness = (server.last_scan - server.last_write_date) -
(primary.last_scan - primary.last_write_date) +
(server.heartbeat_frequency * 1000)
(server.heartbeat_frequency_seconds * 1000)
staleness <= max_staleness_ms
end
else
max_write_date = candidates.collect(&:last_write_date).max
candidates.select do |server|
validate_max_staleness_support!(server)
staleness = max_write_date - server.last_write_date + (server.heartbeat_frequency * 1000)
staleness = max_write_date - server.last_write_date + (server.heartbeat_frequency_seconds * 1000)
staleness <= max_staleness_ms
end
end
Expand All @@ -258,10 +258,12 @@ def validate_max_staleness_support!(server)
end

def validate_max_staleness_value!(cluster)
return unless @max_staleness
heartbeat_frequency = cluster.options[:heartbeat_frequency] || Server::Monitor::HEARTBEAT_FREQUENCY
if @max_staleness < heartbeat_frequency * 2
raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::INVALID_MAX_STALENESS)
if @max_staleness
heartbeat_frequency_seconds = cluster.options[:heartbeat_frequency] || Server::Monitor::HEARTBEAT_FREQUENCY
unless @max_staleness >= [ SMALLEST_MAX_STALENESS_SECONDS,
(heartbeat_frequency_seconds + Cluster::IDLE_WRITE_PERIOD_SECONDS) ].max
raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::INVALID_MAX_STALENESS)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/uri.rb
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def self.uri_option(uri_key, name, extra = {})
# Read Options
uri_option 'readpreference', :mode, :group => :read, :type => :read_mode
uri_option 'readpreferencetags', :tag_sets, :group => :read, :type => :read_tags
uri_option 'maxstalenessms', :max_staleness, :group => :read, :type => :ms_convert
uri_option 'maxstalenessseconds', :max_staleness, :group => :read

# Pool options
uri_option 'minpoolsize', :min_pool_size
Expand Down
56 changes: 35 additions & 21 deletions spec/mongo/max_staleness_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
if spec.heartbeat_frequency
TEST_OPTIONS.merge(heartbeat_frequency: spec.heartbeat_frequency)
else
copy = TEST_OPTIONS.dup
copy.delete(:heartbeat_frequency)
copy
end.merge!(server_selection_timeout: 0.2)
TEST_OPTIONS.dup.tap do |opts|
opts.delete(:heartbeat_frequency)
end
end.merge!(server_selection_timeout: 0.2, connect_timeout: 0.1)
end

let(:cluster) do
Expand All @@ -38,7 +38,8 @@
allow(c).to receive(:single?).and_return(topology.single?)
allow(c).to receive(:sharded?).and_return(topology.sharded?)
allow(c).to receive(:replica_set?).and_return(topology.replica_set?)
allow(c).to receive(:options).and_return(options.merge(server_selection_timeout: 0.2))
allow(c).to receive(:unknown?).and_return(topology.unknown?)
allow(c).to receive(:options).and_return(options)
allow(c).to receive(:scan!).and_return(true)
allow(c).to receive(:app_metadata).and_return(app_metadata)
end
Expand All @@ -47,25 +48,25 @@
let(:candidate_servers) do
spec.candidate_servers.collect do |server|
features = double('features').tap do |feat|
allow(feat).to receive(:max_staleness_enabled?).and_return(server['maxWireVersion'] >= 5)
allow(feat).to receive(:max_staleness_enabled?).and_return(server['maxWireVersion'] && server['maxWireVersion'] >= 5)
end
address = Mongo::Address.new(server['address'])
Mongo::Server.new(address, cluster, monitoring, listeners, options).tap do |s|
allow(s).to receive(:average_round_trip_time).and_return(server['avg_rtt_ms'] / 1000.0)
allow(s).to receive(:average_round_trip_time).and_return(server['avg_rtt_ms'] / 1000.0) if server['avg_rtt_ms']
allow(s).to receive(:tags).and_return(server['tags'])
allow(s).to receive(:secondary?).and_return(server['type'] == 'RSSecondary')
allow(s).to receive(:primary?).and_return(server['type'] == 'RSPrimary')
allow(s).to receive(:connectable?).and_return(true)
allow(s).to receive(:last_write_date).and_return(server['lastWrite']['lastWriteDate']['$numberLong'].to_i * 1000)
allow(s).to receive(:last_scan).and_return(server['lastUpdateTime'] * 1000)
allow(s).to receive(:last_write_date).and_return(server['lastWrite']['lastWriteDate']['$numberLong'].to_i) if server['lastWrite']
allow(s).to receive(:last_scan).and_return(server['lastUpdateTime'])
allow(s).to receive(:features).and_return(features)
end
end
end

let(:in_latency_window) do
spec.in_latency_window.collect do |server|
Mongo::Server.new(Mongo::Address.new(server['address']), cluster, monitoring, listeners)
Mongo::Server.new(Mongo::Address.new(server['address']), cluster, monitoring, listeners, options)
end
end

Expand All @@ -84,23 +85,36 @@
allow(cluster).to receive(:servers).and_return(candidate_servers)
end

context 'Valid read preference and matching server available', unless: spec.invalid_max_staleness? do
context 'when the max staleness is invalid' do

it 'Finds all suitable servers in the latency window', if: spec.replica_set? do
expect(server_selector.send(:select, cluster.servers)).to match_array(in_latency_window)
end
it 'Raises an InvalidServerPreference exception', if: spec.invalid_max_staleness? do

it 'Finds the most suitable server in the latency window' do
expect(in_latency_window).to include(server_selector.select_server(cluster))
expect do
server_selector.select_server(cluster)
end.to raise_exception(Mongo::Error::InvalidServerPreference)
end
end

context 'when the max staleness cannot be applied', if: spec.invalid_max_staleness? do
context 'when the max staleness is valid' do

it 'Raises exception' do
expect do
server_selector.select_server(cluster)
end.to raise_exception(Mongo::Error::InvalidServerPreference)
context 'when there are available servers' do

it 'Finds all suitable servers in the latency window', if: (spec.replica_set? && !spec.invalid_max_staleness? && spec.server_available?) do
expect(server_selector.send(:select, cluster.servers)).to match_array(in_latency_window)
end

it 'Finds the most suitable server in the latency window', if: (!spec.invalid_max_staleness? && spec.server_available?) do
expect(in_latency_window).to include(server_selector.select_server(cluster))
end
end

context 'when there are no available servers', if: (!spec.invalid_max_staleness? && !spec.server_available?) do

it 'Raises a NoServerAvailable Exception' do
expect do
server_selector.select_server(cluster)
end.to raise_exception(Mongo::Error::NoServerAvailable)
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions spec/mongo/server_selection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
allow(c).to receive(:single?).and_return(topology.single?)
allow(c).to receive(:sharded?).and_return(topology.sharded?)
allow(c).to receive(:replica_set?).and_return(topology.replica_set?)
allow(c).to receive(:unknown?).and_return(topology.unknown?)
allow(c).to receive(:app_metadata).and_return(app_metadata)
end
end
Expand Down
10 changes: 5 additions & 5 deletions spec/mongo/server_selector/nearest_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
context 'when max_staleness is provided' do

let(:options) do
{ max_staleness: 60 }
{ max_staleness: 95 }
end

it 'sets the max_staleness option' do
Expand All @@ -32,7 +32,7 @@
context 'when max staleness is the same' do

let(:options) do
{ max_staleness: 60 }
{ max_staleness: 95 }
end

let(:other) do
Expand All @@ -47,7 +47,7 @@
context 'when max staleness is different' do

let(:other_options) do
{ max_staleness: 30 }
{ max_staleness: 100 }
end

let(:other) do
Expand Down Expand Up @@ -100,11 +100,11 @@
context 'max staleness provided' do

let(:max_staleness) do
60
100
end

let(:expected) do
{ :mode => 'nearest', maxStalenessMS: 60000 }
{ :mode => 'nearest', maxStalenessSeconds: 100 }
end

it 'returns a read preference formatted for mongos' do
Expand Down
10 changes: 5 additions & 5 deletions spec/mongo/server_selector/primary_preferred_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
context 'when max_staleness is provided' do

let(:options) do
{ max_staleness: 60 }
{ max_staleness: 95 }
end

it 'sets the max_staleness option' do
Expand All @@ -32,7 +32,7 @@
context 'when max staleness is the same' do

let(:options) do
{ max_staleness: 60 }
{ max_staleness: 95 }
end

let(:other) do
Expand All @@ -47,7 +47,7 @@
context 'when max staleness is different' do

let(:other_options) do
{ max_staleness: 30 }
{ max_staleness: 100 }
end

let(:other) do
Expand Down Expand Up @@ -93,11 +93,11 @@
context 'max staleness provided' do

let(:max_staleness) do
60
100
end

let(:expected) do
{ :mode => 'primaryPreferred', maxStalenessMS: 60000 }
{ :mode => 'primaryPreferred', maxStalenessSeconds: 100 }
end

it 'returns a read preference formatted for mongos' do
Expand Down
4 changes: 2 additions & 2 deletions spec/mongo/server_selector/primary_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
context 'when max_staleness is provided' do

let(:options) do
{ max_staleness: 60 }
{ max_staleness: 100 }
end

it 'raises an exception' do
Expand Down Expand Up @@ -66,7 +66,7 @@
context 'max staleness provided' do

let(:max_staleness) do
60
100
end

it 'raises an error' do
Expand Down
8 changes: 4 additions & 4 deletions spec/mongo/server_selector/secondary_preferred_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
context 'when max_staleness is provided' do

let(:options) do
{ max_staleness: 60 }
{ max_staleness: 95 }
end

it 'sets the max_staleness option' do
Expand All @@ -32,7 +32,7 @@
context 'when max staleness is the same' do

let(:options) do
{ max_staleness: 60 }
{ max_staleness: 90 }
end

let(:other) do
Expand All @@ -47,7 +47,7 @@
context 'when max staleness is different' do

let(:other_options) do
{ max_staleness: 30 }
{ max_staleness: 100 }
end

let(:other) do
Expand Down Expand Up @@ -100,7 +100,7 @@
end

let(:expected) do
{ :mode => 'secondaryPreferred', maxStalenessMS: 60000 }
{ :mode => 'secondaryPreferred', maxStalenessSeconds: 60 }
end

it 'returns a read preference formatted for mongos' do
Expand Down
Loading