Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5169047
Event modules
durran Jan 22, 2016
256f91b
Adding Rdoc
durran Jan 24, 2016
f746b96
Adding SDAM Monitoring topics
durran Jan 24, 2016
1be6d83
Add monitoring into topologies
durran Jan 24, 2016
63f75cb
Fire topology changed events
durran Jan 24, 2016
9a04dfc
Adding the log subscribers
durran Jan 25, 2016
bf934f8
SPEC-222: Implementing more SDAM events
durran Feb 4, 2016
111c46c
SPEC-222: Implementing test runner
durran Feb 8, 2016
614eb4c
SPEC-222: Publishing description changed events
durran Feb 9, 2016
b31e7ad
SPEC-222: Monitoring server closed events
durran Feb 9, 2016
c8256c7
Removing non-important fields for tests, defaults are correct
durran Feb 18, 2016
5b0954a
Adding secondary server to previous description
durran Feb 19, 2016
108b4ed
SPEC-222: Fixing specs
durran Mar 6, 2016
0cb07df
SPEC-222: Ensure events are generated
durran Mar 10, 2016
c0c7f6c
SPEC-222: Starting work on matchers
durran Mar 10, 2016
1b8f539
SPEC-222: Adding topology & description matchers
durran Mar 11, 2016
d17ebb6
SPEC-222: Remove old logging
durran Mar 15, 2016
e3928b2
SPEC-222: Unknown topology read/write check
durran Mar 27, 2016
851cf43
SPEC-222: Single topology read/write check
durran Mar 27, 2016
bc6feb2
SPEC-222: Sharded topology read/write check
durran Mar 27, 2016
86cbd47
SPEC-222: Replica set topology read/write check
durran Mar 27, 2016
be8a12e
SPEC-222: Replica set check uses server selector
durran Mar 27, 2016
c130941
SPEC-222: Delegate to topology from cluster
durran Mar 27, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Mongo
# @since 2.0.0
class Cluster
extend Forwardable
include Monitoring::Publishable
include Event::Subscriber
include Loggable

Expand All @@ -38,6 +39,9 @@ class Cluster
# @return [ Hash ] The options hash.
attr_reader :options

# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

# @return [ Object ] The cluster topology.
attr_reader :topology

Expand Down Expand Up @@ -75,7 +79,6 @@ def add(host)
address = Address.new(host)
if !addresses.include?(address)
if addition_allowed?(address)
log_debug("Adding #{address.to_s} to the cluster.")
@update_lock.synchronize { @addresses.push(address) }
server = Server.new(address, self, @monitoring, event_listeners, options)
@update_lock.synchronize { @servers.push(server) }
Expand All @@ -84,6 +87,34 @@ def add(host)
end
end

# Determine if the cluster would select a readable server for the
# provided read preference.
#
# @example Is a readable server present?
# topology.has_readable_server?(server_selector)
#
# @param [ ServerSelector ] server_selector The server
# selector.
#
# @return [ true, false ] If a readable server is present.
#
# @since 2.3.0
def has_readable_server?(server_selector)
topology.has_readable_server?(self, server_selector)
end

# Determine if the cluster would select a writable server.
#
# @example Is a writable server present?
# topology.has_writable_server?
#
# @return [ true, false ] If a writable server is present.
#
# @since 2.3.0
def has_writable_server?
topology.has_writable_server?(self)
end

# Instantiate the new cluster.
#
# @api private
Expand All @@ -104,7 +135,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@monitoring = monitoring
@event_listeners = Event::Listeners.new
@options = options.freeze
@topology = Topology.initial(seeds, options)
@topology = Topology.initial(seeds, monitoring, options)
@update_lock = Mutex.new
@pool_lock = Mutex.new

Expand Down Expand Up @@ -242,11 +273,14 @@ def standalone_discovered
#
# @since 2.0.0
def remove(host)
log_debug("#{host} being removed from the cluster.")
address = Address.new(host)
removed_servers = @servers.select { |s| s.address == address }
@update_lock.synchronize { @servers = @servers - removed_servers }
removed_servers.each{ |server| server.disconnect! } if removed_servers
publish_sdam_event(
Monitoring::SERVER_CLOSED,
Monitoring::Event::ServerClosed.new(address, topology)
)
@update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end

Expand Down
14 changes: 8 additions & 6 deletions lib/mongo/cluster/topology.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,29 @@ module Topology
replica_set: ReplicaSet,
sharded: Sharded,
direct: Single
}
}.freeze

# Get the initial cluster topology for the provided options.
#
# @example Get the initial cluster topology.
# Topology.initial(topology: :replica_set)
#
# @param [ Array<String> ] seeds The addresses of the configured servers.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Hash ] options The cluster options.
#
# @return [ ReplicaSet, Sharded, Single ] The topology.
#
# @since 2.0.0
def initial(seeds, options)
if options.has_key?(:connect)
OPTIONS.fetch(options[:connect]).new(options, seeds)
def initial(seeds, monitoring, options)
topology = if options.has_key?(:connect)
OPTIONS.fetch(options[:connect]).new(options, monitoring, seeds)
elsif options.has_key?(:replica_set)
ReplicaSet.new(options, seeds)
ReplicaSet.new(options, monitoring, options)
else
Unknown.new(options, seeds)
Unknown.new(options, monitoring, seeds)
end
topology
end
end
end
Expand Down
45 changes: 44 additions & 1 deletion lib/mongo/cluster/topology/replica_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Topology
# @since 2.0.0
class ReplicaSet
include Loggable
include Monitoring::Publishable

# Constant for the replica set name configuration option.
#
Expand All @@ -30,6 +31,9 @@ class ReplicaSet
# @return [ Hash ] options The options.
attr_reader :options

# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

# The display name for the topology.
#
# @since 2.0.0
Expand Down Expand Up @@ -79,18 +83,57 @@ def elect_primary(description, servers)
self
end

# Determine if the topology would select a readable server for the
# provided candidates and read preference.
#
# @example Is a readable server present?
# topology.has_readable_server?(cluster, server_selector)
#
# @param [ Cluster ] cluster The cluster.
# @param [ ServerSelector ] server_selector The server
# selector.
#
# @return [ true, false ] If a readable server is present.
#
# @since 2.3.0
def has_readable_server?(cluster, server_selector)
server_selector.candidates(cluster).any?
end

# Determine if the topology would select a writable server for the
# provided candidates.
#
# @example Is a writable server present?
# topology.has_writable_server?(servers)
#
# @param [ Cluster ] cluster The cluster.
#
# @return [ true, false ] If a writable server is present.
#
# @since 2.3.0
def has_writable_server?(cluster)
cluster.servers.any?{ |server| server.primary? }
end

# Initialize the topology with the options.
#
# @example Initialize the topology.
# ReplicaSet.new(options)
#
# @param [ Hash ] options The options.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Array<String> ] seeds The seeds.
#
# @since 2.0.0
def initialize(options, seeds = [])
def initialize(options, monitoring, seeds = [])
@options = options
@monitoring = monitoring
@max_election_id = nil
@max_set_version = nil
publish_sdam_event(
Monitoring::TOPOLOGY_OPENING,
Monitoring::Event::TopologyOpening.new(self)
)
end

# A replica set topology is a replica set.
Expand Down
44 changes: 43 additions & 1 deletion lib/mongo/cluster/topology/sharded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ module Topology
#
# @since 2.0.0
class Sharded
include Monitoring::Publishable

# The display name for the topology.
#
# @since 2.0.0
NAME = 'Sharded'.freeze

# @return [ Hash ] options The options.
attr_reader :options

# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

# Get the display name.
#
# @example Get the display name.
Expand All @@ -51,16 +58,51 @@ def display_name
# @return [ Sharded ] The topology.
def elect_primary(description, servers); self; end

# Determine if the topology would select a readable server for the
# provided candidates and read preference.
#
# @example Is a readable server present?
# topology.has_readable_server?(cluster, server_selector)
#
# @param [ Cluster ] cluster The cluster.
# @param [ ServerSelector ] server_selector The server
# selector.
#
# @return [ true, false ] If a readable server is present.
#
# @since 2.3.0
def has_readable_server?(cluster, server_selector); true; end

# Determine if the topology would select a writable server for the
# provided candidates.
#
# @example Is a writable server present?
# topology.has_writable_server?(servers)
#
# @param [ Cluster ] cluster The cluster.
#
# @return [ true, false ] If a writable server is present.
#
# @since 2.3.0
def has_writable_server?(cluster); true; end

# Initialize the topology with the options.
#
# @example Initialize the topology.
# Sharded.new(options)
#
# @param [ Hash ] options The options.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Array<String> ] seeds The seeds.
#
# @since 2.0.0
def initialize(options, seeds = [])
def initialize(options, monitoring, seeds = [])
@options = options
@monitoring = monitoring
publish_sdam_event(
Monitoring::TOPOLOGY_OPENING,
Monitoring::Event::TopologyOpening.new(self)
)
end

# A sharded topology is not a replica set.
Expand Down
54 changes: 50 additions & 4 deletions lib/mongo/cluster/topology/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@ module Topology
#
# @since 2.0.0
class Single

# @return [ String ] seed The seed address.
attr_reader :seed
include Monitoring::Publishable

# The display name for the topology.
#
# @since 2.0.0
NAME = 'Single'.freeze

# @return [ Hash ] options The options.
attr_reader :options

# @return [ String ] seed The seed address.
attr_reader :seed

# @return [ monitoring ] monitoring the monitoring.
attr_reader :monitoring

# Get the display name.
#
# @example Get the display name.
Expand All @@ -54,17 +61,56 @@ def display_name
# @return [ Single ] The topology.
def elect_primary(description, servers); self; end

# Determine if the topology would select a readable server for the
# provided candidates and read preference.
#
# @example Is a readable server present?
# topology.has_readable_server?(cluster, server_selector)
#
# @param [ Cluster ] cluster The cluster.
# @param [ ServerSelector ] server_selector The server
# selector.
#
# @return [ true, false ] If a readable server is present.
#
# @since 2.3.0
def has_readable_server?(cluster, server_selector)
server_selector.candidates(cluster).any?
end

# Determine if the topology would select a writable server for the
# provided candidates.
#
# @example Is a writable server present?
# topology.has_writable_server?(servers)
#
# @param [ Cluster ] cluster The cluster.
#
# @return [ true, false ] If a writable server is present.
#
# @since 2.3.0
def has_writable_server?(cluster)
cluster.servers.any?{ |server| server.primary? }
end

# Initialize the topology with the options.
#
# @example Initialize the topology.
# Single.new(options)
#
# @param [ Hash ] options The options.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Array<String> ] seeds The seeds.
#
# @since 2.0.0
def initialize(options, seeds = [])
def initialize(options, monitoring, seeds = [])
@options = options
@monitoring = monitoring
@seed = seeds.first
publish_sdam_event(
Monitoring::TOPOLOGY_OPENING,
Monitoring::Event::TopologyOpening.new(self)
)
end

# A single topology is not a replica set.
Expand Down
Loading