Skip to content

RUBY-2132 push monitor #2012

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,14 @@ def scan!(sync=true)
# respective server is cleared. Set this option to true to keep the
# existing connection pool (required when handling not master errors
# on 4.2+ servers).
# @option aptions [ true | false ] :awaited Whether the updated description
# was a result of processing an awaited ismaster.
#
# @api private
def run_sdam_flow(previous_desc, updated_desc, options = {})
@sdam_flow_lock.synchronize do
flow = SdamFlow.new(self, previous_desc, updated_desc)
flow = SdamFlow.new(self, previous_desc, updated_desc,
awaited: options[:awaited])
flow.server_description_changed

# SDAM flow may alter the updated description - grab the final
Expand Down
8 changes: 7 additions & 1 deletion lib/mongo/cluster/sdam_flow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ class Mongo::Cluster
class SdamFlow
extend Forwardable

def initialize(cluster, previous_desc, updated_desc)
def initialize(cluster, previous_desc, updated_desc, awaited: false)
@cluster = cluster
@topology = cluster.topology
@original_desc = @previous_desc = previous_desc
@updated_desc = updated_desc
@servers_to_disconnect = []
@awaited = !!awaited
end

attr_reader :cluster
Expand All @@ -51,6 +52,10 @@ def initialize(cluster, previous_desc, updated_desc)
attr_reader :updated_desc
attr_reader :original_desc

def awaited?
@awaited
end

def_delegators :topology, :replica_set_name

# Updates descriptions on all servers whose address matches
Expand Down Expand Up @@ -453,6 +458,7 @@ def publish_description_change_event
topology,
previous_desc,
updated_desc,
awaited: awaited?,
)
)
@previous_desc = updated_desc
Expand Down
27 changes: 25 additions & 2 deletions lib/mongo/monitoring/event/server_description_changed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ class ServerDescriptionChanged < Mongo::Event::Base
# description.
attr_reader :new_description

# @return [ true | false ] Whether the heartbeat was awaited.
#
# @api experimental
def awaited?
@awaited
end

# Create the event.
#
# @example Create the event.
Expand All @@ -44,13 +51,19 @@ class ServerDescriptionChanged < Mongo::Event::Base
# @param [ Integer ] topology The topology.
# @param [ Server::Description ] previous_description The previous description.
# @param [ Server::Description ] new_description The new description.
# @param [ true | false ] awaited Whether the server description was
# a result of processing an awaited ismaster response.
#
# @since 2.4.0
def initialize(address, topology, previous_description, new_description)
# @api private
def initialize(address, topology, previous_description, new_description,
awaited: false
)
@address = address
@topology = topology
@previous_description = previous_description
@new_description = new_description
@awaited = !!awaited
end

# Returns a concise yet useful summary of the event.
Expand All @@ -65,7 +78,17 @@ def summary
"#<#{short_class_name}" +
" address=#{address}" +
# TODO Add summaries to descriptions and use them here
" prev=#{previous_description.server_type.upcase} new=#{new_description.server_type.upcase}>"
" prev=#{previous_description.server_type.upcase} new=#{new_description.server_type.upcase}#{awaited_indicator}>"
end

private

def awaited_indicator
if awaited?
' [awaited]'
else
''
end
end
end
end
Expand Down
9 changes: 8 additions & 1 deletion lib/mongo/monitoring/event/server_heartbeat_failed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,27 @@ class ServerHeartbeatFailed < Mongo::Event::Base
# Alias of error for SDAM spec compliance.
alias :failure :error

# @return [ true | false ] Whether the heartbeat was awaited.
def awaited?
@awaited
end

# Create the event.
#
# @example Create the event.
# ServerHeartbeatSucceeded.new(address, duration)
#
# @param [ Address ] address The server address.
# @param [ Float ] round_trip_time Duration of ismaster call in seconds.
# @param [ true | false ] awaited Whether the heartbeat was awaited.
#
# @since 2.7.0
# @api private
def initialize(address, round_trip_time, error)
def initialize(address, round_trip_time, error, awaited: false)
@address = address
@round_trip_time = round_trip_time
@error = error
@awaited = !!awaited
end

# Returns a concise yet useful summary of the event.
Expand Down
9 changes: 8 additions & 1 deletion lib/mongo/monitoring/event/server_heartbeat_started.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@ class ServerHeartbeatStarted < Mongo::Event::Base
# @return [ Address ] address The server address.
attr_reader :address

# @return [ true | false ] Whether the heartbeat was awaited.
def awaited?
@awaited
end

# Create the event.
#
# @example Create the event.
# ServerHeartbeatStarted.new(address)
#
# @param [ Address ] address The server address.
# @param [ true | false ] awaited Whether the heartbeat was awaited.
#
# @since 2.7.0
# @api private
def initialize(address)
def initialize(address, awaited: false)
@address = address
@awaited = !!awaited
end

# Returns a concise yet useful summary of the event.
Expand Down
9 changes: 8 additions & 1 deletion lib/mongo/monitoring/event/server_heartbeat_succeeded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,26 @@ class ServerHeartbeatSucceeded < Mongo::Event::Base
# Alias of round_trip_time.
alias :duration :round_trip_time

# @return [ true | false ] Whether the heartbeat was awaited.
def awaited?
@awaited
end

# Create the event.
#
# @example Create the event.
# ServerHeartbeatSucceeded.new(address, duration)
#
# @param [ Address ] address The server address.
# @param [ Float ] round_trip_time Duration of ismaster call in seconds.
# @param [ true | false ] awaited Whether the heartbeat was awaited.
#
# @since 2.7.0
# @api private
def initialize(address, round_trip_time)
def initialize(address, round_trip_time, awaited: false)
@address = address
@round_trip_time = round_trip_time
@awaited = !!awaited
end

# Returns a concise yet useful summary of the event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@ class ServerDescriptionChangedLogSubscriber < SDAMLogSubscriber
def log_event(event)
log_debug(
"Server description for #{event.address} changed from " +
"'#{event.previous_description.server_type}' to '#{event.new_description.server_type}'."
"'#{event.previous_description.server_type}' to '#{event.new_description.server_type}'#{awaited_indicator(event)}."
)
end

def awaited_indicator(event)
if event.awaited?
' [awaited]'
else
''
end
end
end
end
end
13 changes: 10 additions & 3 deletions lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def initialize(address, cluster, monitoring, event_listeners, options = {})
@connection_id_gen = Class.new do
include Id
end
@scan_semaphore = Semaphore.new
@scan_semaphore = DistinguishingSemaphore.new
@round_trip_time_averager = RoundTripTimeAverager.new
@description = Description.new(address, {})
@last_scan = nil
Expand Down Expand Up @@ -432,7 +432,7 @@ def with_connection(&block)
def handle_handshake_failure!
yield
rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError => e
unknown!(generation: e.generation)
unknown!(generation: e.generation, stop_push_monitor: true)
raise
end

Expand All @@ -455,7 +455,7 @@ def handle_auth_failure!
raise
rescue Mongo::Error::SocketError => e
# non-timeout network error
unknown!(generation: e.generation)
unknown!(generation: e.generation, stop_push_monitor: true)
raise
rescue Auth::Unauthorized
# auth error, keep server description and topology as they are
Expand Down Expand Up @@ -503,6 +503,8 @@ def retry_writes?
# on 4.2+ servers).
# @option options [ TopologyVersion ] :topology_version Topology version
# of the error response that is causing the server to be marked unknown.
# @option options [ true | false ] :stop_push_monitor Whether to stop
# the PushMonitor associated with the server, if any.
#
# @since 2.4.0, SDAM events are sent as of version 2.7.0
def unknown!(options = {})
Expand All @@ -516,6 +518,10 @@ def unknown!(options = {})
return
end

if options[:stop_push_monitor]
monitor&.stop_push_monitor!
end

# SDAM flow will update description on the server without in-place
# mutations and invoke SDAM transitions as needed.
config = {}
Expand Down Expand Up @@ -562,3 +568,4 @@ def update_last_scan
require 'mongo/server/description'
require 'mongo/server/monitor'
require 'mongo/server/round_trip_time_averager'
require 'mongo/server/push_monitor'
2 changes: 1 addition & 1 deletion lib/mongo/server/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def handle_errors
yield
rescue Error::SocketError => e
@error = e
@server.unknown!(generation: e.generation)
@server.unknown!(generation: e.generation, stop_push_monitor: true)
raise
rescue Error::SocketTimeoutError => e
@error = e
Expand Down
Loading