Skip to content

Commit

Permalink
RUBY-1563 Poll SRV records for mongos discovery (#1441)
Browse files Browse the repository at this point in the history
* RUBY-1563 Implement polling SRV records for mongos

* RUBY-1563 Poll SRV records for mongos discovery

* Ensure host mismatch does not affect mongos status

* RUBY-1563 verify last_scan start as nil

* Omit rubydns on jruby since we do not use it

* jruby hack

* Stop creating srv monitor in topology instances

* Try locking srv monitor writes for jruby
  • Loading branch information
p-mongo committed Sep 12, 2019
1 parent 90f09a8 commit 485ee7b
Show file tree
Hide file tree
Showing 24 changed files with 1,204 additions and 19 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ end
group :testing do
gem 'timecop'
gem 'ice_nine'
gem 'rubydns', platforms: :mri
gem 'rspec-retry'
gem 'rspec-expectations', '~> 3.0'
gem 'rspec-mocks-diag', '~> 3.0'
Expand Down
22 changes: 20 additions & 2 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Client
:read_concern,
:read_retry_interval,
:replica_set,
:resolv_options,
:retry_reads,
:retry_writes,
:scan,
Expand Down Expand Up @@ -365,6 +366,8 @@ def hash
# Can be :w => Integer|String, :fsync => Boolean, :j => Boolean.
# @option options [ Integer ] :zlib_compression_level The Zlib compression level to use, if using compression.
# See Ruby's Zlib module for valid levels.
# @option options [ Hash ] :resolv_options For internal driver use only.
# Options to pass through to Resolv::DNS constructor for SRV lookups.
#
# @since 2.0.0
def initialize(addresses_or_uri, options = nil)
Expand All @@ -374,8 +377,14 @@ def initialize(addresses_or_uri, options = nil)
options = {}
end

srv_uri = nil
if addresses_or_uri.is_a?(::String)
uri = URI.get(addresses_or_uri, options)
if uri.is_a?(URI::SRVProtocol)
# If the URI is an SRV URI, note this so that we can start
# SRV polling if the topology is a sharded cluster.
srv_uri = uri
end
addresses = uri.servers
uri_options = uri.client_options.dup
# Special handing for :write and :write_concern: allow client Ruby
Expand All @@ -386,8 +395,10 @@ def initialize(addresses_or_uri, options = nil)
uri_options.delete(:write_concern)
end
options = uri_options.merge(options)
@srv_records = uri.srv_records
else
addresses = addresses_or_uri
@srv_records = nil
end

unless options[:retry_reads] == false
Expand Down Expand Up @@ -423,7 +434,7 @@ def initialize(addresses_or_uri, options = nil)
sdam_proc.call(self)
end

@cluster = Cluster.new(addresses, @monitoring, cluster_options)
@cluster = Cluster.new(addresses, @monitoring, cluster_options.merge(srv_uri: srv_uri))

# Unset monitoring, it will be taken out of cluster from now on
remove_instance_variable('@monitoring')
Expand All @@ -447,7 +458,14 @@ def cluster_options
# applications should read these values from client, not from cluster
max_read_retries: options[:max_read_retries],
read_retry_interval: options[:read_retry_interval],
)
).tap do |options|
# If the client has a cluster already, forward srv_uri to the new
# cluster to maintain SRV monitoring. If the client is brand new,
# its constructor sets srv_uri manually.
if cluster
options.update(srv_uri: cluster.options[:srv_uri])
end
end
end

# Get the maximum number of times the client can retry a read operation
Expand Down
61 changes: 61 additions & 0 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class Cluster
# :cleanup automatically defaults to false as well.
# @option options [ Float ] :heartbeat_frequency The interval, in seconds,
# for the server monitor to refresh its description via ismaster.
# @option options [ Hash ] :resolv_options For internal driver use only.
# Options to pass through to Resolv::DNS constructor for SRV lookups.
#
# @since 2.0.0
def initialize(seeds, monitoring, options = Options::Redacted.new)
Expand All @@ -116,6 +118,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@sdam_flow_lock = Mutex.new
@cluster_time = nil
@cluster_time_lock = Mutex.new
@srv_monitor_lock = Mutex.new
@server_selection_semaphore = Semaphore.new
@topology = Topology.initial(self, monitoring, options)
Session::SessionPool.create(self)
Expand Down Expand Up @@ -280,6 +283,9 @@ def self.create(client)
end
end

# @api private
attr_reader :srv_monitor

# Get the maximum number of times the client can retry a read operation
# when using legacy read retries.
#
Expand Down Expand Up @@ -439,6 +445,11 @@ def disconnect!(wait=false)
session_pool.end_sessions
@periodic_executor.stop!
end
@srv_monitor_lock.synchronize do
if @srv_monitor
@srv_monitor.stop!
end
end
@servers.each do |server|
if server.connected?
server.disconnect!(wait)
Expand Down Expand Up @@ -569,6 +580,38 @@ def run_sdam_flow(previous_desc, updated_desc, options = {})
unless updated_desc.unknown?
server_selection_semaphore.broadcast
end

check_and_start_srv_monitor
end

# Sets the list of servers to the addresses in the provided list of address
# strings.
#
# This method is called by the SRV monitor after receiving new DNS records
# for the monitored hostname.
#
# Removes servers in the cluster whose addresses are not in the passed
# list of server addresses, and adds servers for any addresses in the
# argument which are not already in the cluster.
#
# @param [ Array<String> ] server_address_strs List of server addresses
# to sync the cluster servers to.
#
# @api private
def set_server_list(server_address_strs)
@sdam_flow_lock.synchronize do
server_address_strs.each do |address_str|
unless servers_list.any? { |server| server.address.seed == address_str }
add(address_str)
end
end

servers_list.each do |server|
unless server_address_strs.any? { |address_str| server.address.seed == address_str }
remove(server.address.seed)
end
end
end
end

# Determine if this cluster of servers is equal to another object. Checks the
Expand Down Expand Up @@ -781,7 +824,25 @@ def sessions_supported?
false
end
end

# @api private
def check_and_start_srv_monitor
return unless topology.is_a?(Topology::Sharded) && options[:srv_uri]
@srv_monitor_lock.synchronize do
unless @srv_monitor
monitor_options = options.merge(
timeout: options[:connect_timeout] || Server::CONNECT_TIMEOUT)
@srv_monitor = _srv_monitor = SrvMonitor.new(self, monitor_options)
finalizer = lambda do
_srv_monitor.stop!
end
ObjectSpace.define_finalizer(self, finalizer)
end
@srv_monitor.run!
end
end
end
end

require 'mongo/cluster/sdam_flow'
require 'mongo/cluster/srv_monitor'
127 changes: 127 additions & 0 deletions lib/mongo/cluster/srv_monitor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright (C) 2019 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Cluster

# Periodically retrieves SRV records for the cluster's SRV URI, and
# sets the cluster's server list to the SRV lookup result.
#
# If an error is encountered during SRV lookup or an SRV record is invalid
# or disallowed for security reasons, a warning is logged and monitoring
# continues.
#
# @api private
class SrvMonitor
include Loggable
include BackgroundThread

MIN_SCAN_INTERVAL = 60

DEFAULT_TIMEOUT = 10

# Creates the SRV monitor.
#
# @param [ Cluster ] cluster The cluster.
# @param [ Hash ] options The cluster options.
#
# @option options [ Float ] :timeout The timeout to use for DNS lookups.
# @option options [ URI::SRVProtocol ] :srv_uri The SRV URI to monitor.
# @option options [ Hash ] :resolv_options For internal driver use only.
# Options to pass through to Resolv::DNS constructor for SRV lookups.
def initialize(cluster, options = nil)
options = if options
options.dup
else
{}
end
@cluster = cluster
@resolver = Srv::Resolver.new(options)
unless @srv_uri = options.delete(:srv_uri)
raise ArgumentError, 'SRV URI is required'
end
@options = options.freeze
@last_result = @srv_uri.srv_result
@stop_semaphore = Semaphore.new
end

attr_reader :options

attr_reader :cluster

# @return [ Srv::Result ] Last known SRV lookup result. Used for
# determining intervals between SRV lookups, which depend on SRV DNS
# records' TTL values.
attr_reader :last_result

def start!
super
ObjectSpace.define_finalizer(self, self.class.finalize(@thread))
end

private

def do_work
scan!
@stop_semaphore.wait(scan_interval)
end

def scan!
old_hosts = last_result.address_strs

begin
last_result = Timeout.timeout(timeout) do
@resolver.get_records(@srv_uri.query_hostname)
end
rescue Resolv::ResolvTimeout => e
log_warn("SRV monitor: timed out trying to resolve hostname #{@srv_uri.query_hostname}: #{e.class}: #{e}")
return
rescue Timeout::Error
log_warn("SRV monitor: timed out trying to resolve hostname #{@srv_uri.query_hostname} (timeout=#{timeout})")
return
rescue Resolv::ResolvError => e
log_warn("SRV monitor: unable to resolve hostname #{@srv_uri.query_hostname}: #{e.class}: #{e}")
return
end

if last_result.empty?
log_warn("SRV monitor: hostname #{@srv_uri.query_hostname} resolved to zero records")
return
end

@cluster.set_server_list(last_result.address_strs)
end

def self.finalize(thread)
Proc.new do
thread.kill
end
end

def scan_interval
if last_result.empty?
[cluster.heartbeat_interval, MIN_SCAN_INTERVAL].min
elsif last_result.min_ttl.nil?
MIN_SCAN_INTERVAL
else
[last_result.min_ttl, MIN_SCAN_INTERVAL].max
end
end

def timeout
options[:timeout] || DEFAULT_TIMEOUT
end
end
end
end
2 changes: 2 additions & 0 deletions lib/mongo/server/description.rb
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ def wire_versions
# @return [ true, false ] If the description is from the server.
#
# @since 2.0.6
# @deprecated
def is_server?(server)
address == server.address
end
Expand All @@ -632,6 +633,7 @@ def is_server?(server)
# of servers.
#
# @since 2.0.6
# @deprecated
def lists_server?(server)
servers.include?(server.address.to_s)
end
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/srv.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

require 'mongo/srv/result'
require 'mongo/srv/resolver'
require 'mongo/srv/monitor'
Loading

0 comments on commit 485ee7b

Please sign in to comment.