Skip to content

Commit

Permalink
RUBY-2326 Redo cursor reaper to track cursor ids (#2289)
Browse files Browse the repository at this point in the history
Co-authored-by: Oleg Pudeyev <code@olegp.name>
  • Loading branch information
p-mongo and p committed Jul 5, 2021
1 parent 906bc22 commit 1462568
Show file tree
Hide file tree
Showing 20 changed files with 242 additions and 406 deletions.
2 changes: 1 addition & 1 deletion lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@connected = true

if options[:cleanup] != false
@cursor_reaper = CursorReaper.new
@cursor_reaper = CursorReaper.new(self)
@socket_reaper = SocketReaper.new(self)
@periodic_executor = PeriodicExecutor.new([
@cursor_reaper, @socket_reaper,
Expand Down
7 changes: 4 additions & 3 deletions lib/mongo/cluster/periodic_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ class PeriodicExecutor
#
# @example Create a PeriodicExecutor.
# Mongo::Cluster::PeriodicExecutor.new([reaper, reaper2])
#
# @param [ Array<Object> ] executors The executors. Each must respond
# to #execute and #flush.
# @param [ Hash ] options The options.
#
# @option options [ Logger ] :logger A custom logger to use.
#
# @api private
#
# @since 2.5.0
def initialize(executors = [], options = {})
def initialize(executors, options = {})
@thread = nil
@executors = executors
@stop_semaphore = Semaphore.new
Expand Down
119 changes: 75 additions & 44 deletions lib/mongo/cluster/reapers/cursor_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,38 @@ class Cluster
class CursorReaper
include Retryable

# The default time interval for the cursor reaper to send pending kill cursors operations.
# The default time interval for the cursor reaper to send pending
# kill cursors operations.
#
# @since 2.3.0
FREQUENCY = 1.freeze

# Create a cursor reaper.
#
# @example Create a CursorReaper.
# Mongo::Cluster::CursorReaper.new(cluster)
# @param [ Cluster ] cluster The cluster.
#
# @api private
#
# @since 2.3.0
def initialize
def initialize(cluster)
@cluster = cluster
@to_kill = {}
@active_cursors = Set.new
@active_cursor_ids = Set.new
@mutex = Mutex.new
end

attr_reader :cluster

# Schedule a kill cursors operation to be eventually executed.
#
# @example Schedule a kill cursors operation.
# cursor_reaper.schedule_kill_cursor(id, op_spec, server)
#
# @param [ Integer ] id The id of the cursor to kill.
# @param [ Hash ] op_spec The spec for the kill cursors op.
# @param [ Mongo::Server ] server The server to send the kill cursors operation to.
# @param [ Cursor::KillSpec ] kill_spec The kill specification.
# @param [ Mongo::Server ] server The server to send the kill cursors
# operation to.
#
# @api private
#
# @since 2.3.0
def schedule_kill_cursor(id, op_spec, server)
def schedule_kill_cursor(kill_spec, server)
@mutex.synchronize do
if @active_cursors.include?(id)
@to_kill[server] ||= Set.new
@to_kill[server] << op_spec
if @active_cursor_ids.include?(kill_spec.cursor_id)
@to_kill[server.address.seed] ||= Set.new
@to_kill[server.address.seed] << kill_spec
end
end
end
Expand All @@ -87,7 +83,7 @@ def register_cursor(id)
end

@mutex.synchronize do
@active_cursors << id
@active_cursor_ids << id
end
end

Expand All @@ -110,7 +106,7 @@ def unregister_cursor(id)
end

@mutex.synchronize do
@active_cursors.delete(id)
@active_cursor_ids.delete(id)
end
end

Expand All @@ -123,34 +119,69 @@ def unregister_cursor(id)
#
# @since 2.3.0
def kill_cursors
to_kill_copy = {}
active_cursors_copy = []
# TODO optimize this to batch kill cursor operations for the same
# server/database/collection instead of killing each cursor
# individually.

@mutex.synchronize do
to_kill_copy = @to_kill.dup
active_cursors_copy = @active_cursors.dup
@to_kill = {}
end
loop do
server_address_str = nil

kill_spec = @mutex.synchronize do
# Find a server that has any cursors scheduled for destruction.
server_address_str, specs =
@to_kill.detect { |server_address_str, specs| specs.any? }

if specs.nil?
# All servers have empty specs, nothing to do.
return
end

# Note that this mutates the spec in the queue.
# If the kill cursor operation fails, we don't attempt to
# kill that cursor again.
spec = specs.take(1).tap do |arr|
specs.subtract(arr)
end.first

unless @active_cursor_ids.include?(spec.cursor_id)
# The cursor was already killed, typically because it has
# been iterated to completion. Remove the kill spec from
# our records without doing any more work.
spec = nil
end

spec
end

# If there was a spec to kill but its cursor was already killed,
# look for another spec.
next unless kill_spec

# We could also pass kill_spec directly into the KillCursors
# operation, though this would make that operation have a
# different API from all of the other ones which accept hashes.
spec = {
cursor_ids: [kill_spec.cursor_id],
coll_name: kill_spec.coll_name,
db_name: kill_spec.db_name,
}
op = Operation::KillCursors.new(spec)

server = cluster.servers.detect do |server|
server.address.seed == server_address_str
end

unless server
# TODO We currently don't have a server for the address that the
# cursor is associated with. We should leave the cursor in the
# queue to be killed at a later time (when the server comes back).
next
end

to_kill_copy.each do |server, op_specs|
options = {
server_api: server.options[:server_api],
}
context = Operation::Context.new(options: options)
op_specs.each do |op_spec|
# TODO use connection properties, see RUBY-2326
if server.load_balancer? || server.features.find_command_enabled?
Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a)
if Cursor::Builder::KillCursorsCommand.get_cursors_list(op_spec).size > 0
Operation::KillCursors.new(op_spec).execute(server, context: context)
end
else
Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a)
if Cursor::Builder::OpKillCursors.get_cursors_list(op_spec).size > 0
Operation::KillCursors.new(op_spec).execute(server, context: context)
end
end
end
op.execute(server, context: Operation::Context.new(options: options))
end
end
alias :execute :kill_cursors
Expand Down
7 changes: 7 additions & 0 deletions lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ class View
# @since 2.0.0
module Iterable

# Returns the cursor associated with this view, if any.
#
# @return [ nil | Cursor ] The cursor, if any.
#
# @api private
attr_reader :cursor

# Iterate through documents returned by a query with this +View+.
#
# @example Iterate through the result of the view.
Expand Down
51 changes: 26 additions & 25 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/cursor/builder'

module Mongo

# Client-side representation of an iterator over a query result set on
Expand Down Expand Up @@ -82,9 +80,8 @@ def initialize(view, result, server, options = {})
@session = @options[:session]
unless closed?
register
ObjectSpace.define_finalizer(self, self.class.finalize(@cursor_id,
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec,
cluster,
kill_cursors_op_spec,
server,
@session))
end
Expand All @@ -96,20 +93,19 @@ def initialize(view, result, server, options = {})
# Finalize the cursor for garbage collection. Schedules this cursor to be included
# in a killCursors operation executed by the Cluster's CursorReaper.
#
# @example Finalize the cursor.
# Cursor.finalize(id, cluster, op, server)
#
# @param [ Integer ] cursor_id The cursor's id.
# @param [ Cursor::KillSpec ] kill_spec The KillCursor operation specification.
# @param [ Mongo::Cluster ] cluster The cluster associated with this cursor and its server.
# @param [ Hash ] op_spec The killCursors operation specification.
# @param [ Mongo::Server ] server The server to send the killCursors operation to.
#
# @return [ Proc ] The Finalizer.
#
# @since 2.3.0
def self.finalize(cursor_id, cluster, op_spec, server, session)
# @api private
def self.finalize(kill_spec, cluster, server, session)
unless KillSpec === kill_spec
raise ArgumentError, "First argument must be a KillSpec: #{kill_spec.inspect}"
end
proc do
cluster.schedule_kill_cursor(cursor_id, op_spec, server)
cluster.schedule_kill_cursor(kill_spec, server)
session.end_session if session && session.implicit?
end
end
Expand Down Expand Up @@ -274,7 +270,13 @@ def close

unregister
read_with_one_retry do
kill_cursors_operation.execute(@server, context: Operation::Context.new(client: client, session: @session))
spec = {
coll_name: collection_name,
db_name: database.name,
cursor_ids: [id],
}
op = Operation::KillCursors.new(spec)
op.execute(@server, context: Operation::Context.new(client: client, session: @session))
end

nil
Expand Down Expand Up @@ -352,6 +354,15 @@ def get_more
process(get_more_operation.execute(@server, context: Operation::Context.new(client: client, session: @session)))
end

# @api private
def kill_spec
KillSpec.new(
cursor_id: id,
coll_name: collection_name,
db_name: database.name,
)
end

private

def exhausted?
Expand Down Expand Up @@ -394,18 +405,6 @@ def end_session
@session.end_session if @session && @session.implicit?
end

def kill_cursors_operation
Operation::KillCursors.new(kill_cursors_op_spec)
end

def kill_cursors_op_spec
if @server.with_connection { |connection| connection.features }.find_command_enabled?
Builder::KillCursorsCommand.new(self).specification
else
Builder::OpKillCursors.new(self).specification
end
end

def limited?
limit ? limit > 0 : false
end
Expand Down Expand Up @@ -448,3 +447,5 @@ def unregister
end
end
end

require 'mongo/cursor/kill_spec'

0 comments on commit 1462568

Please sign in to comment.