Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
6f5fd4d
Event modules
durran Jan 22, 2016
be5f89a
Adding Rdoc
durran Jan 24, 2016
9d7b95b
Adding SDAM Monitoring topics
durran Jan 24, 2016
53c1c82
Add monitoring into topologies
durran Jan 24, 2016
39bff45
Fire topology changed events
durran Jan 24, 2016
0011d59
Adding the log subscribers
durran Jan 25, 2016
fc77c39
SPEC-222: Implementing more SDAM events
durran Feb 4, 2016
3c56e74
SPEC-222: Implementing test runner
durran Feb 8, 2016
aeac609
SPEC-222: Publishing description changed events
durran Feb 9, 2016
ca7bed7
SPEC-222: Monitoring server closed events
durran Feb 9, 2016
b458c99
Removing non-important fields for tests, defaults are correct
durran Feb 18, 2016
b031787
Adding secondary server to previous description
durran Feb 19, 2016
2a4931a
SPEC-222: Fixing specs
durran Mar 6, 2016
5fe745c
SPEC-222: Ensure events are generated
durran Mar 10, 2016
49545aa
SPEC-222: Starting work on matchers
durran Mar 10, 2016
6bc6684
SPEC-222: Adding topology & description matchers
durran Mar 11, 2016
60ee5c7
SPEC-222: Remove old logging
durran Mar 15, 2016
77d8ee8
SPEC-222: Unknown topology read/write check
durran Mar 27, 2016
b229284
SPEC-222: Single topology read/write check
durran Mar 27, 2016
9a726e8
SPEC-222: Sharded topology read/write check
durran Mar 27, 2016
c8e0d0a
SPEC-222: Replica set topology read/write check
durran Mar 27, 2016
d89240a
SPEC-222: Replica set check uses server selector
durran Mar 27, 2016
e962402
SPEC-222: Delegate to topology from cluster
durran Mar 27, 2016
cbd0f82
RUBY-1096 Add MemberDiscovered event and updated SDAM Monitoring impl…
estolfo Oct 31, 2016
6a7e360
RUBY-1096 Update #has_readable_server? and #has_writable_server? impl…
estolfo Nov 25, 2016
1344ba6
RUBY-1096 Change montioring message to say members changed in topology
estolfo Nov 28, 2016
8b673c4
RUBY-1096 Remove extra candidates method definition
estolfo Nov 28, 2016
7b89da3
RUBY-1096 Single always returns true for has_readable_server? and has…
estolfo Nov 28, 2016
11ee31e
RUBY-1096 Clean up spec
estolfo Nov 28, 2016
9fe5a5a
RUBY-1096 Clean up SDAM spec
estolfo Nov 28, 2016
a7fa87d
RUBY-1096 Small typo fixes and spec cleanup
estolfo Nov 28, 2016
d6bf77f
RUBY-1096 Deprecate PrimaryElected event in favor of MemberDiscovered…
estolfo Nov 28, 2016
dc53a07
RUBY-1096 Increase heartbeat_frequency on client used in sdam test
estolfo Nov 29, 2016
004c871
RUBY-1096 Don't refer to localhost:27017 in SDAM test, as there proba…
estolfo Nov 29, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ module Mongo
# @since 2.0.0
class Cluster
extend Forwardable
include Monitoring::Publishable
include Event::Subscriber
include Loggable

Expand All @@ -45,6 +46,9 @@ class Cluster
# @return [ Hash ] The options hash.
attr_reader :options

# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

# @return [ Object ] The cluster topology.
attr_reader :topology

Expand All @@ -54,7 +58,8 @@ class Cluster
# @since 2.4.0
attr_reader :app_metadata

def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown?
def_delegators :topology, :replica_set?, :replica_set_name, :sharded?,
:single?, :unknown?, :member_discovered
def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor

# Determine if this cluster of servers is equal to another object. Checks the
Expand Down Expand Up @@ -89,7 +94,6 @@ def add(host)
address = Address.new(host)
if !addresses.include?(address)
if addition_allowed?(address)
log_debug("Adding #{address.to_s} to the cluster.")
@update_lock.synchronize { @addresses.push(address) }
server = Server.new(address, self, @monitoring, event_listeners, options)
@update_lock.synchronize { @servers.push(server) }
Expand All @@ -98,6 +102,34 @@ def add(host)
end
end

# Determine if the cluster would select a readable server for the
# provided read preference.
#
# @example Is a readable server present?
# topology.has_readable_server?(server_selector)
#
# @param [ ServerSelector ] server_selector The server
# selector.
#
# @return [ true, false ] If a readable server is present.
#
# @since 2.4.0
def has_readable_server?(server_selector = nil)
topology.has_readable_server?(self, server_selector)
end

# Determine if the cluster would select a writable server.
#
# @example Is a writable server present?
# topology.has_writable_server?
#
# @return [ true, false ] If a writable server is present.
#
# @since 2.4.0
def has_writable_server?
topology.has_writable_server?(self)
end

# Instantiate the new cluster.
#
# @api private
Expand All @@ -119,16 +151,26 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@event_listeners = Event::Listeners.new
@options = options.freeze
@app_metadata ||= AppMetadata.new(self)
@topology = Topology.initial(seeds, options)
@update_lock = Mutex.new
@pool_lock = Mutex.new
@topology = Topology.initial(seeds, monitoring, options)

publish_sdam_event(
Monitoring::TOPOLOGY_OPENING,
Monitoring::Event::TopologyOpening.new(@topology)
)

subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))
subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))

seeds.each{ |seed| add(seed) }

publish_sdam_event(
Monitoring::TOPOLOGY_CHANGED,
Monitoring::Event::TopologyChanged.new(@topology, @topology)
) if @servers.size > 1

@cursor_reaper = CursorReaper.new
@cursor_reaper.run!

Expand Down Expand Up @@ -264,11 +306,14 @@ def standalone_discovered
#
# @since 2.0.0
def remove(host)
log_debug("#{host} being removed from the cluster.")
address = Address.new(host)
removed_servers = @servers.select { |s| s.address == address }
@update_lock.synchronize { @servers = @servers - removed_servers }
removed_servers.each{ |server| server.disconnect! } if removed_servers
publish_sdam_event(
Monitoring::SERVER_CLOSED,
Monitoring::Event::ServerClosed.new(address, topology)
)
@update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end

Expand Down
13 changes: 7 additions & 6 deletions lib/mongo/cluster/topology.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,34 @@ class Cluster
module Topology
extend self

# The 2 various topologies for server selection.
# The various topologies for server selection.
#
# @since 2.0.0
OPTIONS = {
replica_set: ReplicaSet,
sharded: Sharded,
direct: Single
}
}.freeze

# Get the initial cluster topology for the provided options.
#
# @example Get the initial cluster topology.
# Topology.initial(topology: :replica_set)
#
# @param [ Array<String> ] seeds The addresses of the configured servers.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Hash ] options The cluster options.
#
# @return [ ReplicaSet, Sharded, Single ] The topology.
#
# @since 2.0.0
def initial(seeds, options)
def initial(seeds, monitoring, options)
if options.has_key?(:connect)
OPTIONS.fetch(options[:connect]).new(options, seeds)
OPTIONS.fetch(options[:connect]).new(options, monitoring, seeds)
elsif options.has_key?(:replica_set)
ReplicaSet.new(options, seeds)
ReplicaSet.new(options, monitoring, options)
else
Unknown.new(options, seeds)
Unknown.new(options, monitoring, seeds)
end
end
end
Expand Down
50 changes: 48 additions & 2 deletions lib/mongo/cluster/topology/replica_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Topology
# @since 2.0.0
class ReplicaSet
include Loggable
include Monitoring::Publishable

# Constant for the replica set name configuration option.
#
Expand All @@ -30,6 +31,9 @@ class ReplicaSet
# @return [ Hash ] options The options.
attr_reader :options

# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

# The display name for the topology.
#
# @since 2.0.0
Expand Down Expand Up @@ -61,7 +65,6 @@ def display_name
def elect_primary(description, servers)
if description.replica_set_name == replica_set_name
unless detect_stale_primary!(description)
log_debug("Server #{description.address.to_s} elected as primary in #{replica_set_name}.")
servers.each do |server|
if server.primary? && server.address != description.address
server.description.unknown!
Expand All @@ -79,16 +82,51 @@ def elect_primary(description, servers)
self
end

# Determine if the topology would select a readable server for the
# provided candidates and read preference.
#
# @example Is a readable server present?
# topology.has_readable_server?(cluster, server_selector)
#
# @param [ Cluster ] cluster The cluster.
# @param [ ServerSelector ] server_selector The server
# selector.
#
# @return [ true, false ] If a readable server is present.
#
# @since 2.4.0
def has_readable_server?(cluster, server_selector = nil)
(server_selector || ServerSelector.get(mode: :primary)).candidates(cluster).any?
end

# Determine if the topology would select a writable server for the
# provided candidates.
#
# @example Is a writable server present?
# topology.has_writable_server?(servers)
#
# @param [ Cluster ] cluster The cluster.
#
# @return [ true, false ] If a writable server is present.
#
# @since 2.4.0
def has_writable_server?(cluster)
cluster.servers.any?{ |server| server.primary? }
end

# Initialize the topology with the options.
#
# @example Initialize the topology.
# ReplicaSet.new(options)
#
# @param [ Hash ] options The options.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Array<String> ] seeds The seeds.
#
# @since 2.0.0
def initialize(options, seeds = [])
def initialize(options, monitoring, seeds = [])
@options = options
@monitoring = monitoring
@max_election_id = nil
@max_set_version = nil
end
Expand Down Expand Up @@ -222,6 +260,14 @@ def unknown?; false; end
# @since 2.0.6
def standalone_discovered; self; end

# Notify the topology that a member was discovered.
#
# @example Notify the topology that a member was discovered.
# topology.member_discovered
#
# @since 2.4.0
def member_discovered; end;

private

def update_max_election_id(description)
Expand Down
48 changes: 47 additions & 1 deletion lib/mongo/cluster/topology/sharded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ module Topology
#
# @since 2.0.0
class Sharded
include Monitoring::Publishable

# The display name for the topology.
#
# @since 2.0.0
NAME = 'Sharded'.freeze

# @return [ Hash ] options The options.
attr_reader :options

# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

# Get the display name.
#
# @example Get the display name.
Expand All @@ -51,16 +58,47 @@ def display_name
# @return [ Sharded ] The topology.
def elect_primary(description, servers); self; end

# Determine if the topology would select a readable server for the
# provided candidates and read preference.
#
# @example Is a readable server present?
# topology.has_readable_server?(cluster, server_selector)
#
# @param [ Cluster ] cluster The cluster.
# @param [ ServerSelector ] server_selector The server
# selector.
#
# @return [ true ] A Sharded cluster always has a readable server.
#
# @since 2.4.0
def has_readable_server?(cluster, server_selector = nil); true; end

# Determine if the topology would select a writable server for the
# provided candidates.
#
# @example Is a writable server present?
# topology.has_writable_server?(servers)
#
# @param [ Cluster ] cluster The cluster.
#
# @return [ true ] A Sharded cluster always has a writable server.
#
# @since 2.4.0
def has_writable_server?(cluster); true; end

# Initialize the topology with the options.
#
# @example Initialize the topology.
# Sharded.new(options)
#
# @param [ Hash ] options The options.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Array<String> ] seeds The seeds.
#
# @since 2.0.0
def initialize(options, seeds = [])
def initialize(options, monitoring, seeds = [])
@options = options
@monitoring = monitoring
end

# A sharded topology is not a replica set.
Expand Down Expand Up @@ -181,6 +219,14 @@ def unknown?; false; end
# @since 2.0.6
def standalone_discovered; self; end

# Notify the topology that a member was discovered.
#
# @example Notify the cluster that a member was discovered.
# topology.member_discovered
#
# @since 2.4.0
def member_discovered; end;

private

def remove_self?(description, server)
Expand Down
Loading