From a288ad38b38e5e24bf06436b69444f824d6545f9 Mon Sep 17 00:00:00 2001 From: Oleg Pudeyev Date: Fri, 12 Oct 2018 23:16:05 -0400 Subject: [PATCH] Reorder methods in cluster to be more logically organized --- lib/mongo/cluster.rb | 658 +++++++++++++++++++++---------------------- 1 file changed, 328 insertions(+), 330 deletions(-) diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 174971cc43..a53ef96d04 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -120,6 +120,28 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) @connected = true end + # Create a cluster for the provided client, for use when we don't want the + # client's original cluster instance to be the same. + # + # @api private + # + # @example Create a cluster for the client. + # Cluster.create(client) + # + # @param [ Client ] client The client to create on. + # + # @return [ Cluster ] The cluster. + # + # @since 2.0.0 + def self.create(client) + cluster = Cluster.new( + client.cluster.addresses.map(&:to_s), + Monitoring.new, + client.options + ) + client.instance_variable_set(:@cluster, cluster) + end + # @return [ Hash ] The options hash. attr_reader :options @@ -157,73 +179,99 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) :single?, :unknown? def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor - # Determine if this cluster of servers is equal to another object. Checks the - # servers currently in the cluster, not what was configured. + # Get the maximum number of times the cluster can retry a read operation on + # a mongos. # - # @example Is the cluster equal to the object? - # cluster == other + # @example Get the max read retries. + # cluster.max_read_retries # - # @param [ Object ] other The object to compare to. + # @return [ Integer ] The maximum retries. # - # @return [ true, false ] If the objects are equal. + # @since 2.1.1 + def max_read_retries + options[:max_read_retries] || MAX_READ_RETRIES + end + + # Get the interval, in seconds, in which a mongos read operation is + # retried. # - # @since 2.0.0 - def ==(other) - return false unless other.is_a?(Cluster) - addresses == other.addresses && options == other.options + # @example Get the read retry interval. + # cluster.read_retry_interval + # + # @return [ Float ] The interval. + # + # @since 2.1.1 + def read_retry_interval + options[:read_retry_interval] || READ_RETRY_INTERVAL end - # Add a server to the cluster with the provided address. Useful in - # auto-discovery of new servers when an existing server executes an ismaster - # and potentially non-configured servers were included. + # Whether the cluster object is connected to its cluster. # - # @example Add the server for the address to the cluster. - # cluster.add('127.0.0.1:27018') + # @return [ true|false ] Whether the cluster is connected. # - # @param [ String ] host The address of the server to add. + # @api private + # @since 2.7.0 + def connected? + !!@connected + end + + # Get a list of server candidates from the cluster that can have operations + # executed on them. # - # @return [ Server ] The newly added server, if not present already. + # @example Get the server candidates for an operation. + # cluster.servers + # + # @return [ Array ] The candidate servers. # # @since 2.0.0 - def add(host) - address = Address.new(host, options) - if !addresses.include?(address) - if addition_allowed?(address) - server = Server.new(address, self, @monitoring, event_listeners, options.merge( - monitor: false)) - @update_lock.synchronize { @servers.push(server) } - server.start_monitoring - server - end - end + def servers + topology.servers(servers_list.compact).compact end - # Determine if the cluster would select a readable server for the - # provided read preference. + # The addresses in the cluster. # - # @example Is a readable server present? - # topology.has_readable_server?(server_selector) + # @example Get the addresses in the cluster. + # cluster.addresses # - # @param [ ServerSelector ] server_selector The server - # selector. + # @return [ Array ] The addresses. # - # @return [ true, false ] If a readable server is present. + # @since 2.0.6 + def addresses + servers_list.map(&:address).dup + end + + # The logical session timeout value in minutes. # - # @since 2.4.0 - def has_readable_server?(server_selector = nil) - topology.has_readable_server?(self, server_selector) + # @example Get the logical session timeout in minutes. + # cluster.logical_session_timeout + # + # @return [ Integer, nil ] The logical session timeout. + # + # @since 2.5.0 + def logical_session_timeout + servers.inject(nil) do |min, server| + break unless timeout = server.logical_session_timeout + [timeout, (min || timeout)].min + end end - # Determine if the cluster would select a writable server. + # Get the nicer formatted string for use in inspection. # - # @example Is a writable server present? - # topology.has_writable_server? + # @example Inspect the cluster. + # cluster.inspect # - # @return [ true, false ] If a writable server is present. + # @return [ String ] The cluster inspection. # - # @since 2.4.0 - def has_writable_server? - topology.has_writable_server?(self) + # @since 2.0.0 + def inspect + "#" + end + + # @api experimental + def summary + "#" end # Finalize the cluster for garbage collection. Disconnects all the scoped @@ -249,142 +297,133 @@ def self.finalize(pools, periodic_executor, session_pool) end end - # Get the nicer formatted string for use in inspection. + # Disconnect all servers. # - # @example Inspect the cluster. - # cluster.inspect + # @note Applications should call Client#close to disconnect from + # the cluster rather than calling this method. This method is for + # internal driver use only. # - # @return [ String ] The cluster inspection. + # @example Disconnect the cluster's servers. + # cluster.disconnect! # - # @since 2.0.0 - def inspect - "#" + # @return [ true ] Always true. + # + # @since 2.1.0 + def disconnect! + unless @connecting || @connected + return true + end + @periodic_executor.stop! + @servers.each do |server| + server.disconnect! + publish_sdam_event( + Monitoring::SERVER_CLOSED, + Monitoring::Event::ServerClosed.new(server.address, topology) + ) + end + publish_sdam_event( + Monitoring::TOPOLOGY_CLOSED, + Monitoring::Event::TopologyClosed.new(topology) + ) + @connecting = @connected = false + true end - # @api experimental - def summary - "#" + # Reconnect all servers. + # + # @example Reconnect the cluster's servers. + # cluster.reconnect! + # + # @return [ true ] Always true. + # + # @since 2.1.0 + # @deprecated Use Client#reconnect to reconnect to the cluster instead of + # calling this method. This method does not send SDAM events. + def reconnect! + @connecting = true + scan! + servers.each do |server| + server.reconnect! + end + @periodic_executor.restart! + @connecting = false + @connected = true end - # Get the next primary server we can send an operation to. - # - # @example Get the next primary server. - # cluster.next_primary + # Force a scan of all known servers in the cluster. # - # @param [ true, false ] ping Whether to ping the server before selection. Deprecated, - # not necessary with the implementation of the Server Selection specification. + # @example Force a full cluster scan. + # cluster.scan! # + # @note This operation is done synchronously. If servers in the cluster are + # down or slow to respond this can potentially be a slow operation. # - # @return [ Mongo::Server ] A primary server. + # @return [ true ] Always true. # # @since 2.0.0 - def next_primary(ping = true) - @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY) - @primary_selector.select_server(self) + def scan! + servers_list.each{ |server| server.scan! } and true end - # Elect a primary server from the description that has just changed to a - # primary. + # Determine if this cluster of servers is equal to another object. Checks the + # servers currently in the cluster, not what was configured. # - # @example Elect a primary server. - # cluster.elect_primary!(description) + # @example Is the cluster equal to the object? + # cluster == other # - # @param [ Server::Description ] description The newly elected primary. + # @param [ Object ] other The object to compare to. # - # @return [ Topology ] The cluster topology. + # @return [ true, false ] If the objects are equal. # # @since 2.0.0 - def elect_primary!(description) - # Update descriptions - temporary hack until - # https://jira.mongodb.org/browse/RUBY-1509 is implemented. - # There are multiple placet that can update descriptions, these should - # be DRYed to a single one. - servers = servers_list - servers.each do |server| - if server.address == description.address - server.update_description(description) - end - end - - new_topology = nil - if topology.unknown? - new_topology = if description.mongos? - Topology::Sharded.new(topology.options, topology.monitoring, self) - else - initialize_replica_set(description, servers) - end - elsif topology.replica_set? - if description.replica_set_name == replica_set_name - if detect_stale_primary!(description) - # Since detect_stale_primary! can mutate description, - # we need another pass of updating descriptions on our servers. - # https://jira.mongodb.org/browse/RUBY-1509 - servers.each do |server| - if server.address == description.address - server.update_description(description) - end - end - else - # If we had another server marked as primary, mark that one - # unknown. - servers.each do |server| - if server.primary? && server.address != description.address - server.description.unknown! - end - end - - # This mutates the old topology. - # Instead of this the old topology should be left untouched - # and the new values should only be given to the new topology. - # But since there is some logic in these methods, - # this will be addressed by https://jira.mongodb.org/browse/RUBY-1511 - topology.update_max_election_id(description) - topology.update_max_set_version(description) + def ==(other) + return false unless other.is_a?(Cluster) + addresses == other.addresses && options == other.options + end - cls = if servers.any?(&:primary?) - Topology::ReplicaSetWithPrimary - else - Topology::ReplicaSetNoPrimary - end - new_topology = cls.new(topology.options, - topology.monitoring, - self, - topology.max_election_id, - topology.max_set_version) - end - else - log_warn( - "Server #{description.address.to_s} has incorrect replica set name: " + - "'#{description.replica_set_name}'. The current replica set name is '#{topology.replica_set_name}'." - ) - end - end + # Determine if the cluster would select a readable server for the + # provided read preference. + # + # @example Is a readable server present? + # topology.has_readable_server?(server_selector) + # + # @param [ ServerSelector ] server_selector The server + # selector. + # + # @return [ true, false ] If a readable server is present. + # + # @since 2.4.0 + def has_readable_server?(server_selector = nil) + topology.has_readable_server?(self, server_selector) + end - if new_topology - update_topology(new_topology) - # Even though the topology class selection above already attempts - # to figure out if the topology has a primary, in some cases - # we already are in a replica set topology and an additional - # primary check must be performed here. - if topology.replica_set? - check_if_has_primary - end - end + # Determine if the cluster would select a writable server. + # + # @example Is a writable server present? + # topology.has_writable_server? + # + # @return [ true, false ] If a writable server is present. + # + # @since 2.4.0 + def has_writable_server? + topology.has_writable_server?(self) end - # Get the maximum number of times the cluster can retry a read operation on - # a mongos. + # Get the next primary server we can send an operation to. # - # @example Get the max read retries. - # cluster.max_read_retries + # @example Get the next primary server. + # cluster.next_primary # - # @return [ Integer ] The maximum retries. + # @param [ true, false ] ping Whether to ping the server before selection. Deprecated, + # not necessary with the implementation of the Server Selection specification. # - # @since 2.1.1 - def max_read_retries - options[:max_read_retries] || MAX_READ_RETRIES + # + # @return [ Mongo::Server ] A primary server. + # + # @since 2.0.0 + def next_primary(ping = true) + @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY) + @primary_selector.select_server(self) end # Get the scoped connection pool for the server. @@ -403,36 +442,51 @@ def pool(server) end end - # Get the interval, in seconds, in which a mongos read operation is - # retried. + # Update the max cluster time seen in a response. # - # @example Get the read retry interval. - # cluster.read_retry_interval + # @example Update the cluster time. + # cluster.update_cluster_time(result) # - # @return [ Float ] The interval. + # @param [ Operation::Result ] result The operation result containing the cluster time. # - # @since 2.1.1 - def read_retry_interval - options[:read_retry_interval] || READ_RETRY_INTERVAL + # @return [ Object ] The cluster time. + # + # @since 2.5.0 + def update_cluster_time(result) + if cluster_time_doc = result.cluster_time + @cluster_time_lock.synchronize do + if @cluster_time.nil? + @cluster_time = cluster_time_doc + elsif cluster_time_doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME] + @cluster_time = cluster_time_doc + end + end + end end - # Notify the cluster that a standalone server was discovered so that the - # topology can be updated accordingly. + # Add a server to the cluster with the provided address. Useful in + # auto-discovery of new servers when an existing server executes an ismaster + # and potentially non-configured servers were included. # - # @example Notify the cluster that a standalone server was discovered. - # cluster.standalone_discovered + # @example Add the server for the address to the cluster. + # cluster.add('127.0.0.1:27018') # - # @return [ Topology ] The cluster topology. + # @param [ String ] host The address of the server to add. # - # @since 2.0.6 - def standalone_discovered - if topology.unknown? - if seeds.length == 1 - update_topology( - Topology::Single.new(topology.options, topology.monitoring, self)) + # @return [ Server ] The newly added server, if not present already. + # + # @since 2.0.0 + def add(host) + address = Address.new(host, options) + if !addresses.include?(address) + if addition_allowed?(address) + server = Server.new(address, self, @monitoring, event_listeners, options.merge( + monitor: false)) + @update_lock.synchronize { @servers.push(server) } + server.start_monitoring + server end end - topology end # Remove the server from the cluster for the provided address, if it @@ -460,97 +514,6 @@ def remove(host) removed_servers.any? end - # Force a scan of all known servers in the cluster. - # - # @example Force a full cluster scan. - # cluster.scan! - # - # @note This operation is done synchronously. If servers in the cluster are - # down or slow to respond this can potentially be a slow operation. - # - # @return [ true ] Always true. - # - # @since 2.0.0 - def scan! - servers_list.each{ |server| server.scan! } and true - end - - # Get a list of server candidates from the cluster that can have operations - # executed on them. - # - # @example Get the server candidates for an operation. - # cluster.servers - # - # @return [ Array ] The candidate servers. - # - # @since 2.0.0 - def servers - topology.servers(servers_list.compact).compact - end - - # Disconnect all servers. - # - # @note Applications should call Client#close to disconnect from - # the cluster rather than calling this method. This method is for - # internal driver use only. - # - # @example Disconnect the cluster's servers. - # cluster.disconnect! - # - # @return [ true ] Always true. - # - # @since 2.1.0 - def disconnect! - unless @connecting || @connected - return true - end - @periodic_executor.stop! - @servers.each do |server| - server.disconnect! - publish_sdam_event( - Monitoring::SERVER_CLOSED, - Monitoring::Event::ServerClosed.new(server.address, topology) - ) - end - publish_sdam_event( - Monitoring::TOPOLOGY_CLOSED, - Monitoring::Event::TopologyClosed.new(topology) - ) - @connecting = @connected = false - true - end - - # Reconnect all servers. - # - # @example Reconnect the cluster's servers. - # cluster.reconnect! - # - # @return [ true ] Always true. - # - # @since 2.1.0 - # @deprecated Use Client#reconnect to reconnect to the cluster instead of - # calling this method. This method does not send SDAM events. - def reconnect! - @connecting = true - scan! - servers.each do |server| - server.reconnect! - end - @periodic_executor.restart! - @connecting = false - @connected = true - end - - # Whether the cluster object is connected to its cluster. - # - # @return [ true|false ] Whether the cluster is connected. - # - # @api private - # @since 2.7.0 - def connected? - !!@connected - end - # Add hosts in a description to the cluster. # # @example Add hosts in a description to the cluster. @@ -581,75 +544,112 @@ def remove_hosts(description) end end - # Create a cluster for the provided client, for use when we don't want the - # client's original cluster instance to be the same. - # - # @api private + # Elect a primary server from the description that has just changed to a + # primary. # - # @example Create a cluster for the client. - # Cluster.create(client) + # @example Elect a primary server. + # cluster.elect_primary!(description) # - # @param [ Client ] client The client to create on. + # @param [ Server::Description ] description The newly elected primary. # - # @return [ Cluster ] The cluster. + # @return [ Topology ] The cluster topology. # # @since 2.0.0 - def self.create(client) - cluster = Cluster.new( - client.cluster.addresses.map(&:to_s), - Monitoring.new, - client.options - ) - client.instance_variable_set(:@cluster, cluster) - end + def elect_primary!(description) + # Update descriptions - temporary hack until + # https://jira.mongodb.org/browse/RUBY-1509 is implemented. + # There are multiple placet that can update descriptions, these should + # be DRYed to a single one. + servers = servers_list + servers.each do |server| + if server.address == description.address + server.update_description(description) + end + end - # The addresses in the cluster. - # - # @example Get the addresses in the cluster. - # cluster.addresses - # - # @return [ Array ] The addresses. - # - # @since 2.0.6 - def addresses - servers_list.map(&:address).dup - end + new_topology = nil + if topology.unknown? + new_topology = if description.mongos? + Topology::Sharded.new(topology.options, topology.monitoring, self) + else + initialize_replica_set(description, servers) + end + elsif topology.replica_set? + if description.replica_set_name == replica_set_name + if detect_stale_primary!(description) + # Since detect_stale_primary! can mutate description, + # we need another pass of updating descriptions on our servers. + # https://jira.mongodb.org/browse/RUBY-1509 + servers.each do |server| + if server.address == description.address + server.update_description(description) + end + end + else + # If we had another server marked as primary, mark that one + # unknown. + servers.each do |server| + if server.primary? && server.address != description.address + server.description.unknown! + end + end - # The logical session timeout value in minutes. - # - # @example Get the logical session timeout in minutes. - # cluster.logical_session_timeout - # - # @return [ Integer, nil ] The logical session timeout. - # - # @since 2.5.0 - def logical_session_timeout - servers.inject(nil) do |min, server| - break unless timeout = server.logical_session_timeout - [timeout, (min || timeout)].min + # This mutates the old topology. + # Instead of this the old topology should be left untouched + # and the new values should only be given to the new topology. + # But since there is some logic in these methods, + # this will be addressed by https://jira.mongodb.org/browse/RUBY-1511 + topology.update_max_election_id(description) + topology.update_max_set_version(description) + + cls = if servers.any?(&:primary?) + Topology::ReplicaSetWithPrimary + else + Topology::ReplicaSetNoPrimary + end + new_topology = cls.new(topology.options, + topology.monitoring, + self, + topology.max_election_id, + topology.max_set_version) + end + else + log_warn( + "Server #{description.address.to_s} has incorrect replica set name: " + + "'#{description.replica_set_name}'. The current replica set name is '#{topology.replica_set_name}'." + ) + end + end + + if new_topology + update_topology(new_topology) + # Even though the topology class selection above already attempts + # to figure out if the topology has a primary, in some cases + # we already are in a replica set topology and an additional + # primary check must be performed here. + if topology.replica_set? + check_if_has_primary + end end end - # Update the max cluster time seen in a response. - # - # @example Update the cluster time. - # cluster.update_cluster_time(result) + # Notify the cluster that a standalone server was discovered so that the + # topology can be updated accordingly. # - # @param [ Operation::Result ] result The operation result containing the cluster time. + # @example Notify the cluster that a standalone server was discovered. + # cluster.standalone_discovered # - # @return [ Object ] The cluster time. + # @return [ Topology ] The cluster topology. # - # @since 2.5.0 - def update_cluster_time(result) - if cluster_time_doc = result.cluster_time - @cluster_time_lock.synchronize do - if @cluster_time.nil? - @cluster_time = cluster_time_doc - elsif cluster_time_doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME] - @cluster_time = cluster_time_doc - end + # @since 2.0.6 + def standalone_discovered + if topology.unknown? + if seeds.length == 1 + update_topology( + Topology::Single.new(topology.options, topology.monitoring, self)) end end + topology end # Handles a change in server description. @@ -715,6 +715,14 @@ def server_description_changed(previous_description, updated_description) end end + # @api private + def member_discovered + if topology.unknown? || topology.single? + publish_sdam_event(Monitoring::TOPOLOGY_CHANGED, + Monitoring::Event::TopologyChanged.new(topology, topology)) + end + end + private # Checks if the cluster has a primary, and if not, transitions the topology @@ -837,15 +845,5 @@ def pools def servers_list @update_lock.synchronize { @servers.dup } end - - public - - # @api private - def member_discovered - if topology.unknown? || topology.single? - publish_sdam_event(Monitoring::TOPOLOGY_CHANGED, - Monitoring::Event::TopologyChanged.new(topology, topology)) - end - end end end