diff --git a/Gemfile b/Gemfile index 124afb7f83..1674c1223f 100644 --- a/Gemfile +++ b/Gemfile @@ -26,6 +26,7 @@ end group :testing do gem 'timecop' gem 'ice_nine' + gem 'rubydns', platforms: :mri gem 'rspec-retry' gem 'rspec-expectations', '~> 3.0' gem 'rspec-mocks-diag', '~> 3.0' diff --git a/lib/mongo/client.rb b/lib/mongo/client.rb index f1fffefea9..7251792319 100644 --- a/lib/mongo/client.rb +++ b/lib/mongo/client.rb @@ -76,6 +76,7 @@ class Client :read_concern, :read_retry_interval, :replica_set, + :resolv_options, :retry_reads, :retry_writes, :scan, @@ -365,6 +366,8 @@ def hash # Can be :w => Integer|String, :fsync => Boolean, :j => Boolean. # @option options [ Integer ] :zlib_compression_level The Zlib compression level to use, if using compression. # See Ruby's Zlib module for valid levels. + # @option options [ Hash ] :resolv_options For internal driver use only. + # Options to pass through to Resolv::DNS constructor for SRV lookups. # # @since 2.0.0 def initialize(addresses_or_uri, options = nil) @@ -374,8 +377,14 @@ def initialize(addresses_or_uri, options = nil) options = {} end + srv_uri = nil if addresses_or_uri.is_a?(::String) uri = URI.get(addresses_or_uri, options) + if uri.is_a?(URI::SRVProtocol) + # If the URI is an SRV URI, note this so that we can start + # SRV polling if the topology is a sharded cluster. + srv_uri = uri + end addresses = uri.servers uri_options = uri.client_options.dup # Special handing for :write and :write_concern: allow client Ruby @@ -386,8 +395,10 @@ def initialize(addresses_or_uri, options = nil) uri_options.delete(:write_concern) end options = uri_options.merge(options) + @srv_records = uri.srv_records else addresses = addresses_or_uri + @srv_records = nil end unless options[:retry_reads] == false @@ -423,7 +434,7 @@ def initialize(addresses_or_uri, options = nil) sdam_proc.call(self) end - @cluster = Cluster.new(addresses, @monitoring, cluster_options) + @cluster = Cluster.new(addresses, @monitoring, cluster_options.merge(srv_uri: srv_uri)) # Unset monitoring, it will be taken out of cluster from now on remove_instance_variable('@monitoring') @@ -447,7 +458,14 @@ def cluster_options # applications should read these values from client, not from cluster max_read_retries: options[:max_read_retries], read_retry_interval: options[:read_retry_interval], - ) + ).tap do |options| + # If the client has a cluster already, forward srv_uri to the new + # cluster to maintain SRV monitoring. If the client is brand new, + # its constructor sets srv_uri manually. + if cluster + options.update(srv_uri: cluster.options[:srv_uri]) + end + end end # Get the maximum number of times the client can retry a read operation diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 0ca963d6a1..4236acfb79 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -95,6 +95,8 @@ class Cluster # :cleanup automatically defaults to false as well. # @option options [ Float ] :heartbeat_frequency The interval, in seconds, # for the server monitor to refresh its description via ismaster. + # @option options [ Hash ] :resolv_options For internal driver use only. + # Options to pass through to Resolv::DNS constructor for SRV lookups. # # @since 2.0.0 def initialize(seeds, monitoring, options = Options::Redacted.new) @@ -116,6 +118,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) @sdam_flow_lock = Mutex.new @cluster_time = nil @cluster_time_lock = Mutex.new + @srv_monitor_lock = Mutex.new @server_selection_semaphore = Semaphore.new @topology = Topology.initial(self, monitoring, options) Session::SessionPool.create(self) @@ -280,6 +283,9 @@ def self.create(client) end end + # @api private + attr_reader :srv_monitor + # Get the maximum number of times the client can retry a read operation # when using legacy read retries. # @@ -439,6 +445,11 @@ def disconnect!(wait=false) session_pool.end_sessions @periodic_executor.stop! end + @srv_monitor_lock.synchronize do + if @srv_monitor + @srv_monitor.stop! + end + end @servers.each do |server| if server.connected? server.disconnect!(wait) @@ -569,6 +580,38 @@ def run_sdam_flow(previous_desc, updated_desc, options = {}) unless updated_desc.unknown? server_selection_semaphore.broadcast end + + check_and_start_srv_monitor + end + + # Sets the list of servers to the addresses in the provided list of address + # strings. + # + # This method is called by the SRV monitor after receiving new DNS records + # for the monitored hostname. + # + # Removes servers in the cluster whose addresses are not in the passed + # list of server addresses, and adds servers for any addresses in the + # argument which are not already in the cluster. + # + # @param [ Array ] server_address_strs List of server addresses + # to sync the cluster servers to. + # + # @api private + def set_server_list(server_address_strs) + @sdam_flow_lock.synchronize do + server_address_strs.each do |address_str| + unless servers_list.any? { |server| server.address.seed == address_str } + add(address_str) + end + end + + servers_list.each do |server| + unless server_address_strs.any? { |address_str| server.address.seed == address_str } + remove(server.address.seed) + end + end + end end # Determine if this cluster of servers is equal to another object. Checks the @@ -781,7 +824,25 @@ def sessions_supported? false end end + + # @api private + def check_and_start_srv_monitor + return unless topology.is_a?(Topology::Sharded) && options[:srv_uri] + @srv_monitor_lock.synchronize do + unless @srv_monitor + monitor_options = options.merge( + timeout: options[:connect_timeout] || Server::CONNECT_TIMEOUT) + @srv_monitor = _srv_monitor = SrvMonitor.new(self, monitor_options) + finalizer = lambda do + _srv_monitor.stop! + end + ObjectSpace.define_finalizer(self, finalizer) + end + @srv_monitor.run! + end + end end end require 'mongo/cluster/sdam_flow' +require 'mongo/cluster/srv_monitor' diff --git a/lib/mongo/cluster/srv_monitor.rb b/lib/mongo/cluster/srv_monitor.rb new file mode 100644 index 0000000000..28aef7521f --- /dev/null +++ b/lib/mongo/cluster/srv_monitor.rb @@ -0,0 +1,127 @@ +# Copyright (C) 2019 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Cluster + + # Periodically retrieves SRV records for the cluster's SRV URI, and + # sets the cluster's server list to the SRV lookup result. + # + # If an error is encountered during SRV lookup or an SRV record is invalid + # or disallowed for security reasons, a warning is logged and monitoring + # continues. + # + # @api private + class SrvMonitor + include Loggable + include BackgroundThread + + MIN_SCAN_INTERVAL = 60 + + DEFAULT_TIMEOUT = 10 + + # Creates the SRV monitor. + # + # @param [ Cluster ] cluster The cluster. + # @param [ Hash ] options The cluster options. + # + # @option options [ Float ] :timeout The timeout to use for DNS lookups. + # @option options [ URI::SRVProtocol ] :srv_uri The SRV URI to monitor. + # @option options [ Hash ] :resolv_options For internal driver use only. + # Options to pass through to Resolv::DNS constructor for SRV lookups. + def initialize(cluster, options = nil) + options = if options + options.dup + else + {} + end + @cluster = cluster + @resolver = Srv::Resolver.new(options) + unless @srv_uri = options.delete(:srv_uri) + raise ArgumentError, 'SRV URI is required' + end + @options = options.freeze + @last_result = @srv_uri.srv_result + @stop_semaphore = Semaphore.new + end + + attr_reader :options + + attr_reader :cluster + + # @return [ Srv::Result ] Last known SRV lookup result. Used for + # determining intervals between SRV lookups, which depend on SRV DNS + # records' TTL values. + attr_reader :last_result + + def start! + super + ObjectSpace.define_finalizer(self, self.class.finalize(@thread)) + end + + private + + def do_work + scan! + @stop_semaphore.wait(scan_interval) + end + + def scan! + old_hosts = last_result.address_strs + + begin + last_result = Timeout.timeout(timeout) do + @resolver.get_records(@srv_uri.query_hostname) + end + rescue Resolv::ResolvTimeout => e + log_warn("SRV monitor: timed out trying to resolve hostname #{@srv_uri.query_hostname}: #{e.class}: #{e}") + return + rescue Timeout::Error + log_warn("SRV monitor: timed out trying to resolve hostname #{@srv_uri.query_hostname} (timeout=#{timeout})") + return + rescue Resolv::ResolvError => e + log_warn("SRV monitor: unable to resolve hostname #{@srv_uri.query_hostname}: #{e.class}: #{e}") + return + end + + if last_result.empty? + log_warn("SRV monitor: hostname #{@srv_uri.query_hostname} resolved to zero records") + return + end + + @cluster.set_server_list(last_result.address_strs) + end + + def self.finalize(thread) + Proc.new do + thread.kill + end + end + + def scan_interval + if last_result.empty? + [cluster.heartbeat_interval, MIN_SCAN_INTERVAL].min + elsif last_result.min_ttl.nil? + MIN_SCAN_INTERVAL + else + [last_result.min_ttl, MIN_SCAN_INTERVAL].max + end + end + + def timeout + options[:timeout] || DEFAULT_TIMEOUT + end + end + end +end diff --git a/lib/mongo/server/description.rb b/lib/mongo/server/description.rb index 63926d5a85..b96c58e1fb 100644 --- a/lib/mongo/server/description.rb +++ b/lib/mongo/server/description.rb @@ -619,6 +619,7 @@ def wire_versions # @return [ true, false ] If the description is from the server. # # @since 2.0.6 + # @deprecated def is_server?(server) address == server.address end @@ -632,6 +633,7 @@ def is_server?(server) # of servers. # # @since 2.0.6 + # @deprecated def lists_server?(server) servers.include?(server.address.to_s) end diff --git a/lib/mongo/srv.rb b/lib/mongo/srv.rb index 3c1209e815..081677ef7d 100644 --- a/lib/mongo/srv.rb +++ b/lib/mongo/srv.rb @@ -14,3 +14,4 @@ require 'mongo/srv/result' require 'mongo/srv/resolver' +require 'mongo/srv/monitor' diff --git a/lib/mongo/srv/monitor.rb b/lib/mongo/srv/monitor.rb new file mode 100644 index 0000000000..47fc0d1327 --- /dev/null +++ b/lib/mongo/srv/monitor.rb @@ -0,0 +1,92 @@ +# Copyright (C) 2014-2019 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module SRV + + class Monitor + include Loggable + + MIN_RESCAN_FREQUENCY = 60 + + attr_reader :options + + def initialize(cluster, resolver, srv_records, options = nil) + @options = options || {} + @cluster = cluster + @resolver = resolver + @records = srv_records + @no_records_found = false + end + + def start_monitor! + @thread = Thread.new do + loop do + sleep(rescan_frequency) + scan! + end + end + + ObjectSpace.define_finalizer(self, self.class.finalize(@thread)) + end + + def scan! + @old_hosts = @records.hosts + + begin + @records = @resolver.get_records(@records.hostname) + rescue Resolv::ResolvTimeout => e + log_warn("Timed out trying to resolve hostname #{@records.hostname}") + return + rescue Resolv::ResolvError => e + log_warn("Unable to resolve hostname #{@records.hostname}") + return + end + + if @records.empty? + @no_records_found = true + return + end + + @no_records_found = false + + (@old_hosts - @records.hosts).each do |host| + @cluster.remove(host) + end + + (@records.hosts - @old_hosts).each do |host| + @cluster.add(host) + end + end + + def self.finalize(thread) + Proc.new do + thread.kill + end + end + + private + + def rescan_frequency + if @no_records_found + Server:: Monitor::HEARTBEAT_FREQUENCY + elsif @records.min_ttl.nil? + MIN_RESCAN_FREQUENCY + else + [@records.min_ttl, MIN_RESCAN_FREQUENCY].max + end + end + end + end +end diff --git a/lib/mongo/srv/resolver.rb b/lib/mongo/srv/resolver.rb index 0fa4019a63..dfda0d2341 100644 --- a/lib/mongo/srv/resolver.rb +++ b/lib/mongo/srv/resolver.rb @@ -33,9 +33,15 @@ class Resolver # @option options [ Boolean ] :raise_on_invalid Whether or not to raise # an exception if either a record with a mismatched domain is found # or if no records are found. Defaults to true. + # @option options [ Hash ] :resolv_options For internal driver use only. + # Options to pass through to Resolv::DNS constructor for SRV lookups. def initialize(options = nil) - @options ||= {} - @resolver = Resolv::DNS.new + @options = if options + options.dup + else + {} + end.freeze + @resolver = Resolv::DNS.new(@options[:resolv_options]) end # Obtains all of the SRV records for a given hostname. diff --git a/lib/mongo/srv/warning_result.rb b/lib/mongo/srv/warning_result.rb new file mode 100644 index 0000000000..acdf5c1827 --- /dev/null +++ b/lib/mongo/srv/warning_result.rb @@ -0,0 +1,35 @@ +# Copyright (C) 2019 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + + module Srv + + # SRV record lookup result which warns on errors rather than raising + # exceptions. + # + # @api private + class WarningResult < Result + + # Adds a new record. + # + # @param [ Resolv::DNS::Resource ] record An SRV record found for the hostname. + def add_record(record) + super + rescue Error::InvalidAddress, Error::MismatchedDomain => e + log_warn(e.message) + end + end + end +end diff --git a/lib/mongo/uri.rb b/lib/mongo/uri.rb index 8582af488e..064e75f9ef 100644 --- a/lib/mongo/uri.rb +++ b/lib/mongo/uri.rb @@ -245,6 +245,10 @@ def client_options @user ? opts.merge(credentials) : opts end + def srv_records + nil + end + # Create the new uri from the provided string. # # @example Create the new URI. diff --git a/lib/mongo/uri/srv_protocol.rb b/lib/mongo/uri/srv_protocol.rb index 34d4afbfb0..a79c9b1093 100644 --- a/lib/mongo/uri/srv_protocol.rb +++ b/lib/mongo/uri/srv_protocol.rb @@ -33,6 +33,8 @@ class URI # @since 2.5.0 class SRVProtocol < URI + attr_reader :srv_records + # Gets the options hash that needs to be passed to a Mongo::Client on instantiation, so we # don't have to merge the txt record options, credentials, and database in at that point - # we only have a single point here. @@ -49,6 +51,23 @@ def client_options @user ? opts.merge(credentials) : opts end + # @return [ Srv::Result ] SRV lookup result. + # + # @api private + attr_reader :srv_result + + # The hostname that is specified in the URI and used to look up + # SRV records. + # + # This attribute needs to be defined because SRVProtocol changes + # #servers to be the result of the lookup rather than the hostname + # specified in the URI. + # + # @return [ String ] The hostname used in SRV lookup. + # + # @api private + attr_reader :query_hostname + private # @return [ String ] DOT_PARTITION The '.' character used to delineate the parts of a @@ -108,7 +127,10 @@ def raise_invalid_error!(details) # # @return [ Mongo::Srv::Resolver ] def resolver - @resolver ||= Srv::Resolver.new(raise_on_invalid: true) + @resolver ||= Srv::Resolver.new( + raise_on_invalid: true, + resolv_options: options[:resolv_options], + ) end # Parses the credentials from the URI and performs DNS queries to obtain @@ -123,9 +145,13 @@ def parse!(remaining) raise_invalid_error!(INVALID_HOST) end hostname = @servers.first - validate_hostname(hostname) + validate_srv_hostname(hostname) + @query_hostname = hostname - srv_result = resolver.get_records(hostname) + @srv_result = resolver.get_records(hostname) + if srv_result.empty? + raise Error::NoSRVRecords.new(NO_SRV_RECORDS % hostname) + end @txt_options = get_txt_options(hostname) || {} records = srv_result.address_strs records.each do |record| @@ -145,7 +171,7 @@ def parse!(remaining) # components (foo.bar.tld). # # Raises Error::InvalidURI if validation fails. - def validate_hostname(hostname) + def validate_srv_hostname(hostname) raise_invalid_error!(INVALID_PORT) if hostname.include?(HOST_PORT_DELIM) if hostname.start_with?('.') diff --git a/spec/README.md b/spec/README.md index 00aef76748..41dc5030a7 100644 --- a/spec/README.md +++ b/spec/README.md @@ -78,12 +78,17 @@ configuration is needed: A sharded cluster can be configured with mlaunch: - mlaunch init --replicaset --name ruby-driver-rs --sharded 1 \ + mlaunch init --replicaset --name ruby-driver-rs --sharded 1 --mongos 2 \ --dir /tmp/mdb-sc --setParameter enableTestCommands=1 As with the replica set, the test suite will automatically detect sharded cluster topology. +Note that some tests require a sharded cluster with exactly one shard and +other tests require a sharded cluster with more than one shard. Tests requiring +a single shard can be run against a deployment with multiple shards by +specifying only one mongos address in MONGODB_URI. + ## TLS With Verification The test suite includes a set of TLS certificates for configuring a server diff --git a/spec/integration/reconnect_spec.rb b/spec/integration/reconnect_spec.rb index 37f768ba9e..ac30d1a3ab 100644 --- a/spec/integration/reconnect_spec.rb +++ b/spec/integration/reconnect_spec.rb @@ -3,7 +3,7 @@ describe 'Client after reconnect' do let(:client) { authorized_client } - it 'works' do + it 'is a functioning client' do client['test'].insert_one('testk' => 'testv') client.reconnect @@ -28,4 +28,84 @@ expect(new_thread).not_to eq(thread) expect(new_thread).to be_alive end + + context 'with min_pool_size > 0' do + let(:client) { authorized_client.with(min_pool_size: 1) } + + it 'recreates connection pool populator thread' do + server = client.cluster.next_primary + thread = server.pool.populator.instance_variable_get('@thread') + expect(thread).to be_alive + + thread.kill + # context switch to let the thread get killed + sleep 0.1 + expect(thread).not_to be_alive + + client.reconnect + + new_server = client.cluster.next_primary + new_thread = new_server.pool.populator.instance_variable_get('@thread') + expect(new_thread).not_to eq(thread) + expect(new_thread).to be_alive + end + end + + context 'in sharded topology' do + require_topology :sharded + require_default_port_deployment + require_multi_shard + + let(:uri) do + "mongodb+srv://test1.test.build.10gen.cc/?tls=#{SpecConfig.instance.ssl?}&tlsInsecure=true".tap do |uri| + puts "Constructed URI: #{uri}" + end + end + + # Debug logging to troubleshoot failures in Evergreen + let(:logger) do + Logger.new(STDERR). tap do |logger| + logger.level = :debug + end + end + + let(:client) do + ClientRegistry.instance.register_local_client( + Mongo::Client.new(uri, server_selection_timeout: 3.86, + logger: logger)) + end + + it 'recreates srv monitor' do + client.cluster.next_primary + if BSON::Environment.jruby? + # Wait for jruby to start SRV monitor thread + sleep 1 + end + expect(client.cluster.topology).to be_a(Mongo::Cluster::Topology::Sharded) + thread = client.cluster.srv_monitor.instance_variable_get('@thread') + expect(thread).to be_alive + + thread.kill + # context switch to let the thread get killed + sleep 0.1 + if BSON::Environment.jruby? + # jruby takes a long time here as well + 15.times do + if thread.alive? + sleep 1 + else + break + end + end + end + expect(thread).not_to be_alive + + client.reconnect + + client.cluster.next_primary + new_thread = client.cluster.srv_monitor.instance_variable_get('@thread') + expect(new_thread).not_to eq(thread) + expect(new_thread).to be_alive + end + end end diff --git a/spec/integration/srv_monitoring_spec.rb b/spec/integration/srv_monitoring_spec.rb new file mode 100644 index 0000000000..6a21553422 --- /dev/null +++ b/spec/integration/srv_monitoring_spec.rb @@ -0,0 +1,179 @@ +require 'spec_helper' + +describe 'SRV Monitoring' do + context 'with SRV lookups mocked at Resolver' do + let(:srv_result) do + double('srv result').tap do |result| + allow(result).to receive(:empty?).and_return(false) + allow(result).to receive(:address_strs).and_return( + [ClusterConfig.instance.primary_address_str]) + end + end + + let(:client) do + allow_any_instance_of(Mongo::Srv::Resolver).to receive(:get_records).and_return(srv_result) + allow_any_instance_of(Mongo::Srv::Resolver).to receive(:get_txt_options_string) + + new_local_client_nmio('mongodb+srv://foo.a.b', server_selection_timeout: 3.15) + end + + context 'standalone/replica set' do + require_topology :single, :replica_set + + it 'does not create SRV monitor' do + expect(client.cluster.topology).to be_a(Mongo::Cluster::Topology::Unknown) + + client.cluster.run_sdam_flow( + Mongo::Server::Description.new(ClusterConfig.instance.primary_address_str), + ClusterConfig.instance.primary_description, + ) + + expect(client.cluster.topology).not_to be_a(Mongo::Cluster::Topology::Unknown) + + expect(client.cluster.instance_variable_get('@srv_monitor')).to be nil + end + end + + context 'sharded cluster' do + require_topology :sharded + + it 'creates SRV monitor' do + expect(client.cluster.topology).to be_a(Mongo::Cluster::Topology::Unknown) + + # Since we force the cluster to run sdam flow which creates a monitor, + # we need to manually adjust its state. + client.cluster.instance_variable_set('@connecting', true) + + client.cluster.run_sdam_flow( + Mongo::Server::Description.new(ClusterConfig.instance.primary_address_str), + ClusterConfig.instance.primary_description, + ) + + expect(client.cluster.topology).to be_a(Mongo::Cluster::Topology::Sharded) + + expect(client.cluster.instance_variable_get('@srv_monitor')).to be_a(Mongo::Cluster::SrvMonitor) + + # Close the client in the test rather than allowing our post-test cleanup + # to take care of it, since the client references test doubles. + client.close + end + end + end + + # These tests require a sharded cluster to be launched on localhost:27017 + # and localhost:27018, plus internet connectivity for SRV record lookups. + context 'end to end' do + require_default_port_deployment + require_topology :sharded + require_multi_shard + + # JRuby apparently does not implement non-blocking UDP I/O which is used + # by RubyDNS: + # NotImplementedError: recvmsg_nonblock is not implemented + fails_on_jruby + + before(:all) do + require 'support/dns' + end + + let(:uri) do + "mongodb+srv://test-fake.test.build.10gen.cc/?tls=#{SpecConfig.instance.ssl?}&tlsInsecure=true" + end + + let(:logger) do + Logger.new(STDERR, level: Logger::DEBUG) + end + + let(:client) do + new_local_client(uri, + server_selection_timeout: 3.16, + resolv_options: { + nameserver: 'localhost', + nameserver_port: [['localhost', 5300], ['127.0.0.1', 5300]], + }, + logger: logger, + ) + end + + context 'sharded cluster' do + it 'updates topology via SRV records' do + # Expedite the polling process + allow_any_instance_of(Mongo::Cluster::SrvMonitor).to receive(:scan_interval).and_return(1) + + rules = [ + ['_mongodb._tcp.test-fake.test.build.10gen.cc', :srv, + [0, 0, 27017, 'localhost.test.build.10gen.cc'], + ], + ] + + mock_dns(rules) do + client.cluster.next_primary + expect(client.cluster.topology).to be_a(Mongo::Cluster::Topology::Sharded) + + address_strs = client.cluster.servers.map(&:address).map(&:seed).sort + expect(address_strs).to eq(%w( + localhost.test.build.10gen.cc:27017 + )) + end + + # In Evergreen there are replica set nodes on the next port number + # after mongos nodes, therefore the addresses in DNS need to accurately + # reflect how many mongos we have. + + rules = [ + ['_mongodb._tcp.test-fake.test.build.10gen.cc', :srv, + [0, 0, 27018, 'localhost.test.build.10gen.cc'], + [0, 0, 27017, 'localhost.test.build.10gen.cc'], + ], + ] + + mock_dns(rules) do + 15.times do + address_strs = client.cluster.servers_list.map(&:address).map(&:seed).sort + if address_strs == %w( + localhost.test.build.10gen.cc:27017 + localhost.test.build.10gen.cc:27018 + ) + then + break + end + sleep 1 + end + + address_strs = client.cluster.servers_list.map(&:address).map(&:seed).sort + expect(address_strs).to eq(%w( + localhost.test.build.10gen.cc:27017 + localhost.test.build.10gen.cc:27018 + )) + end + + # And because we have only two mongos in Evergreen, test removal + # separately here. + + rules = [ + ['_mongodb._tcp.test-fake.test.build.10gen.cc', :srv, + [0, 0, 27018, 'localhost.test.build.10gen.cc'], + ], + ] + + mock_dns(rules) do + 15.times do + address_strs = client.cluster.servers_list.map(&:address).map(&:seed).sort + if address_strs == %w( + localhost.test.build.10gen.cc:27018 + ) + then + break + end + sleep 1 + end + + address_strs = client.cluster.servers_list.map(&:address).map(&:seed).sort + expect(address_strs).to eq(%w( + localhost.test.build.10gen.cc:27018 + )) + end + end + end + end +end diff --git a/spec/mongo/client_construction_spec.rb b/spec/mongo/client_construction_spec.rb index d0b55e315b..db5f06758b 100644 --- a/spec/mongo/client_construction_spec.rb +++ b/spec/mongo/client_construction_spec.rb @@ -118,9 +118,7 @@ end let(:logger) do - Logger.new(STDOUT).tap do |logger| - logger.level = Logger::DEBUG - end + Logger.new(STDOUT, level: Logger::DEBUG) end let(:subscriber) do diff --git a/spec/mongo/cluster/srv_monitor_spec.rb b/spec/mongo/cluster/srv_monitor_spec.rb new file mode 100644 index 0000000000..1cdf588e5a --- /dev/null +++ b/spec/mongo/cluster/srv_monitor_spec.rb @@ -0,0 +1,214 @@ +require 'lite_spec_helper' + +xdescribe Mongo::Cluster::SrvMonitor do + describe '#scan!' do + let(:hostname) do + 'test1.test.build.10gen.cc.' + end + + let(:hosts) do + [ + 'localhost.test.build.10gen.cc.:27017', + 'localhost.test.build.10gen.cc.:27018', + ] + end + + let(:records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:srv_uri) do + Mongo::URI.get("mongodb+srv://localhost.a.b") + end + + let(:cluster) do + Mongo::Cluster.new(records.hosts, Mongo::Monitoring.new, { monitoring_io: false }) + end + + let(:monitoring) do + described_class.new(cluster, srv_uri: srv_uri) + end + + before do + monitoring.scan! + end + + context 'when a new DNS record is added' do + let(:new_hosts) do + hosts + ['test1.test.build.10gen.cc.:27019'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when a DNS record is removed' do + let(:new_hosts) do + hosts - ['test1.test.build.10gen.cc.:27018'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when a single DNS record is replaced' do + let(:new_hosts) do + hosts - ['test1.test.build.10gen.cc.:27018'] + ['test1.test.build.10gen.cc.:27019'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when all DNS records are replaced with a single record' do + let(:new_hosts) do + ['test1.test.build.10gen.cc.:27019'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when all DNS records are replaced with multiple records' do + let(:new_hosts) do + [ + 'test1.test.build.10gen.cc.:27019', + 'test1.test.build.10gen.cc.:27020', + ] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when the DNS lookup times out' do + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_raise(Resolv::ResolvTimeout) + end + end + + it 'does not add or remove any hosts from the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(hosts.sort) + end + end + + context 'when the DNS lookup is unable to resolve the hostname' do + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_raise(Resolv::ResolvError) + end + end + + it 'does not add or remove any hosts from the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(hosts.sort) + end + end + + context 'when no DNS records are returned' do + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return([]) + allow(records).to receive(:empty?).and_return(true) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'does not add or remove any hosts from the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(hosts.sort) + end + end + end +end diff --git a/spec/mongo/cluster_spec.rb b/spec/mongo/cluster_spec.rb index 5c6bcf492f..7f0e1d7cdb 100644 --- a/spec/mongo/cluster_spec.rb +++ b/spec/mongo/cluster_spec.rb @@ -267,6 +267,27 @@ expect(cluster.servers[0].address.seed).to_not eq('a') end end + + context 'topology is Sharded' do + + let(:topology) do + Mongo::Cluster::Topology::Single.new({}, cluster) + end + + before do + cluster.add('a') + end + + it 'creates server with nil last_scan' do + server = cluster.servers_list.detect do |server| + server.address.seed == 'a' + end + + expect(server).not_to be nil + + expect(server.last_scan).to be nil + end + end end describe '#disconnect!' do diff --git a/spec/mongo/server/description_spec.rb b/spec/mongo/server/description_spec.rb index cd37c4e813..50082db264 100644 --- a/spec/mongo/server/description_spec.rb +++ b/spec/mongo/server/description_spec.rb @@ -433,6 +433,23 @@ it 'returns :sharded' do expect(description.server_type).to eq(:sharded) end + + context 'when client and server addresses are different' do + let(:config) do + { 'msg' => 'isdbgrid', 'ismaster' => true, + 'minWireVersion' => 2, 'maxWireVersion' => 3, 'ok' => 1, + 'me' => '127.0.0.1', + } + end + + let(:address) do + Mongo::Address.new('localhost') + end + + it 'returns :sharded' do + expect(description.server_type).to eq(:sharded) + end + end end context 'when the server is a primary' do diff --git a/spec/mongo/srv/monitor_spec.rb b/spec/mongo/srv/monitor_spec.rb new file mode 100644 index 0000000000..c8e63346fa --- /dev/null +++ b/spec/mongo/srv/monitor_spec.rb @@ -0,0 +1,211 @@ +require 'lite_spec_helper' + +describe Mongo::SRV::Monitor do + describe '#scan!' do + let(:hostname) do + 'test1.test.build.10gen.cc.' + end + + let(:hosts) do + [ + 'localhost.test.build.10gen.cc.:27017', + 'localhost.test.build.10gen.cc.:27018', + ] + end + + let(:records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + + let(:cluster) do + Mongo::Cluster.new(records.hosts, Mongo::Monitoring.new, { monitoring_io: false }) + end + + let(:monitoring) do + described_class.new(cluster, resolver, records) + end + + before do + monitoring.scan! + end + + context 'when a new DNS record is added' do + let(:new_hosts) do + hosts + ['test1.test.build.10gen.cc.:27019'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when a DNS record is removed' do + let(:new_hosts) do + hosts - ['test1.test.build.10gen.cc.:27018'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when a single DNS record is replaced' do + let(:new_hosts) do + hosts - ['test1.test.build.10gen.cc.:27018'] + ['test1.test.build.10gen.cc.:27019'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when all DNS records are replaced with a single record' do + let(:new_hosts) do + ['test1.test.build.10gen.cc.:27019'] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when all DNS records are replaced with multiple records' do + let(:new_hosts) do + [ + 'test1.test.build.10gen.cc.:27019', + 'test1.test.build.10gen.cc.:27020', + ] + end + + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return(new_hosts) + allow(records).to receive(:empty?).and_return(false) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'adds the new host to the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(new_hosts.sort) + end + end + + context 'when the DNS lookup times out' do + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_raise(Resolv::ResolvTimeout) + end + end + + it 'does not add or remove any hosts from the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(hosts.sort) + end + end + + context 'when the DNS lookup is unable to resolve the hostname' do + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_raise(Resolv::ResolvError) + end + end + + it 'does not add or remove any hosts from the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(hosts.sort) + end + end + + context 'when no DNS records are returned' do + let(:new_records) do + double('records').tap do |records| + allow(records).to receive(:hostname).and_return(hostname) + allow(records).to receive(:hosts).and_return([]) + allow(records).to receive(:empty?).and_return(true) + allow(records).to receive(:min_ttl).and_return(nil) + end + end + + let(:resolver) do + double('resolver').tap do |resolver| + allow(resolver).to receive(:get_records).and_return(new_records) + end + end + + it 'does not add or remove any hosts from the cluster' do + expect(cluster.addresses.map(&:to_s).sort).to eq(hosts.sort) + end + end + end +end \ No newline at end of file diff --git a/spec/mongo/uri/srv_protocol_spec.rb b/spec/mongo/uri/srv_protocol_spec.rb index ae7c95c11e..6f4daa8d16 100644 --- a/spec/mongo/uri/srv_protocol_spec.rb +++ b/spec/mongo/uri/srv_protocol_spec.rb @@ -962,7 +962,7 @@ end end - describe '#validate_hostname' do + describe '#validate_srv_hostname' do let(:valid_hostname) do end @@ -971,7 +971,7 @@ end let(:validate) do - dummy_uri.send(:validate_hostname, hostname) + dummy_uri.send(:validate_srv_hostname, hostname) end context 'when the hostname is valid' do diff --git a/spec/support/common_shortcuts.rb b/spec/support/common_shortcuts.rb index b66819127c..4e71d99c24 100644 --- a/spec/support/common_shortcuts.rb +++ b/spec/support/common_shortcuts.rb @@ -189,5 +189,54 @@ def stop_monitoring(*clients) client.cluster.disconnect! end end + + DNS_INTERFACES = [ + [:udp, "0.0.0.0", 5300], + [:tcp, "0.0.0.0", 5300], + ] + + def mock_dns(config) + semaphore = Mongo::Semaphore.new + + thread = Thread.new do + RubyDNS::run_server(DNS_INTERFACES) do + config.each do |(query, type, *answers)| + + resource_cls = Resolv::DNS::Resource::IN.const_get(type.to_s.upcase) + resources = answers.map do |answer| + resource_cls.new(*answer) + end + match(query, resource_cls) do |req| + req.add(resources) + end + end + + semaphore.signal + end + end + + semaphore.wait + + begin + yield + ensure + 10.times do + if $last_async_task + break + end + sleep 0.5 + end + + # Hack to stop the server - https://github.com/socketry/rubydns/issues/75 + if $last_async_task.nil? + STDERR.puts "No async task - server never started?" + else + $last_async_task.stop + end + + thread.kill + thread.join + end + end end end diff --git a/spec/support/constraints.rb b/spec/support/constraints.rb index 8af8f6fe64..581c6070a8 100644 --- a/spec/support/constraints.rb +++ b/spec/support/constraints.rb @@ -158,6 +158,14 @@ def require_set_write_concern end end + def require_multi_shard + before do + if ClusterConfig.instance.topology == :sharded && SpecConfig.instance.addresses.length == 1 + skip 'Test requires a minimum of two shards if run in sharded topology' + end + end + end + def require_no_multi_shard before do if ClusterConfig.instance.topology == :sharded && SpecConfig.instance.addresses.length > 1 @@ -191,4 +199,21 @@ def require_mmapv1 end end end + + # Integration tests for SRV polling require internet connectivity to + # look up SRV records and a sharded cluster configured on default port on + # localhost (localhost:27017, localhost:27018). + def require_default_port_deployment + # Because the DNS records at test1.test.build.10gen.cc point to + # localhost:27017 & localhost:27018, the test suite must have been + # configured to use these addresses + before(:all) do + have_default_port = SpecConfig.instance.addresses.any? do |address| + %w(127.0.0.1 127.0.0.1:27017 localhost localhost:27017).include?(address) + end + unless have_default_port + skip 'This test requires the test suite to be configured for localhost:27017' + end + end + end end diff --git a/spec/support/dns.rb b/spec/support/dns.rb new file mode 100644 index 0000000000..b7e36b0b86 --- /dev/null +++ b/spec/support/dns.rb @@ -0,0 +1,13 @@ +require 'rubydns' + +# Hack to stop the server - https://github.com/socketry/rubydns/issues/75 +module Async + class Task + alias :run_without_record :run + def run(*args) + run_without_record.tap do + $last_async_task = self + end + end + end +end diff --git a/spec/support/lite_constraints.rb b/spec/support/lite_constraints.rb index 65fe9f5d8d..16e6ffadb0 100644 --- a/spec/support/lite_constraints.rb +++ b/spec/support/lite_constraints.rb @@ -1,7 +1,7 @@ module LiteConstraints # Constrain tests that use TimeoutInterrupt to MRI (and Unix) def only_mri - before do + before(:all) do unless SpecConfig.instance.mri? skip "MRI required, we have #{SpecConfig.instance.platform}" end @@ -13,7 +13,7 @@ def only_mri # with jruby). # Often times these failures happen only in Evergreen. def fails_on_jruby(version=nil) - before do + before(:all) do if BSON::Environment.jruby? if version min_parts = version.split('.').map(&:to_i) @@ -30,7 +30,7 @@ def fails_on_jruby(version=nil) end def require_external_connectivity - before do + before(:all) do if ENV['EXTERNAL_DISABLED'] skip "Test requires external connectivity" end @@ -38,7 +38,7 @@ def require_external_connectivity end def require_mongo_kerberos - before do + before(:all) do skip 'KERBEROS_REQUIRED env var not specified' unless ENV['KERBEROS_REQUIRED'] require 'mongo_kerberos' end